Skip to main content

Making Hyperliquid gRPC Requests with Python

Updated on
Jan 08, 2026

Overview

Python is a high-level, interpreted programming language known for its simplicity and readability. Follow the official installation guide to install Python 3.8+. Verify the installation:

python --version
pip --version

Initiating the Python Project for Hyperliquid gRPC

Step 1: Initialize Python Project

Create a dedicated directory for your Hyperliquid gRPC project and navigate into it:

mkdir hyperliquid-grpc-python
cd hyperliquid-grpc-python

Create and activate a virtual environment to isolate project dependencies:

# Create virtual environment
python3 -m venv venv

# Activate virtual environment
# On macOS/Linux:
source venv/bin/activate
# On Windows:
# venv\Scripts\activate

Step 3: Install Dependencies

Install the necessary Python dependencies for gRPC and Protocol Buffers:

# Install gRPC and Protocol Buffers packages
pip install grpcio grpcio-tools

# For development and testing
pip install asyncio

Step 4: Create Proto Files

Create the proto directory and add the Hyperliquid streaming protocol definition:

# Create proto directory
mkdir -p proto

Create a new file named proto/streaming.proto and add the following content:

syntax = "proto3";
package hyperliquid;

service Streaming {
// Bi-directional streaming
rpc StreamData (stream SubscribeRequest) returns (stream SubscribeUpdate);
rpc Ping (PingRequest) returns (PingResponse);
}

service BlockStreaming {
// Stream replica_cmds (raw blocks)
rpc StreamBlocks (Timestamp) returns (stream Block);
}

// --- Requests ---
message SubscribeRequest {
oneof request {
StreamSubscribe subscribe = 1;
Ping ping = 3;
}
reserved 2;
}

message StreamSubscribe {
StreamType stream_type = 1;
uint64 start_block = 2;

// Generic filters - field name to allowed values
// Recursively searches each event for matching field/value pairs
// Example: {"coin": ["ETH", "BTC"], "user": ["0x123..."], "type": ["deposit"]}
map<string, FilterValues> filters = 3;

// Optional name for this filter
// Allows multiple independent filters per stream (OR logic)
string filter_name = 4;
}

// Container for filter values
message FilterValues {
repeated string values = 1;
}

message Ping { int64 timestamp = 1; }

// --- Responses ---
message SubscribeUpdate {
oneof update {
StreamResponse data = 1;
Pong pong = 2;
}
}

message StreamResponse {
uint64 block_number = 1;
uint64 timestamp = 2; // Server ingress timestamp
// Raw JSON data from the file (Exact replica of source)
string data = 3;
}

// --- Data Types ---
enum StreamType {
UNKNOWN = 0;
TRADES = 1;
ORDERS = 2;
BOOK_UPDATES = 3;
TWAP = 4;
EVENTS = 5;
BLOCKS = 6;
WRITER_ACTIONS = 7;
}

message Block {
string data_json = 1;
}

message Pong { int64 timestamp = 1; }
message Timestamp { int64 timestamp = 1; }
message PingRequest { int32 count = 1; }
message PingResponse { int32 count = 1; }

Step 5: Generate Python Code from Proto Files

Generate Python client code from the protocol buffer definition:

# Create pb directory for generated files
mkdir -p pb

# Generate Python code from proto files
python -m grpc_tools.protoc --proto_path=proto --python_out=pb --grpc_python_out=pb proto/streaming.proto

# Create __init__.py to make pb a proper Python package
touch pb/__init__.py

This will create:

  • pb/streaming_pb2.py - Message classes
  • pb/streaming_pb2_grpc.py - gRPC service classes
  • pb/__init__.py - Makes pb directory a Python package

Step 6: Fix Import Issues (Important!)

After generating the files, fix the import in pb/streaming_pb2_grpc.py:

Change this line:

import streaming_pb2 as streaming__pb2

To this:

from . import streaming_pb2 as streaming__pb2

You can do this manually or run:

# Fix the import automatically (macOS/Linux)
sed -i 's/import streaming_pb2 as streaming__pb2/from . import streaming_pb2 as streaming__pb2/' pb/streaming_pb2_grpc.py

# For Windows PowerShell:
# (Get-Content pb/streaming_pb2_grpc.py) -replace 'import streaming_pb2 as streaming__pb2', 'from . import streaming_pb2 as streaming__pb2' | Set-Content pb/streaming_pb2_grpc.py

Step 7: Create Simple Ping Example

First, create a simple ping-only example to test basic connectivity (ping_example.py):

import grpc
import sys
from pb import streaming_pb2, streaming_pb2_grpc

# Configuration
GRPC_ENDPOINT = 'your-grpc-endpoint:port'
AUTH_TOKEN = 'your-auth-token'

def create_channel():
"""Create gRPC channel with TLS"""
credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel(
GRPC_ENDPOINT,
credentials,
options=[
('grpc.max_receive_message_length', 100 * 1024 * 1024), # 100MB
('grpc.max_send_message_length', 100 * 1024 * 1024), # 100MB
]
)
return channel

def create_metadata():
"""Create metadata with auth token"""
return [('x-token', AUTH_TOKEN)]

def test_ping(stub):
"""Test connectivity with ping"""
try:
metadata = create_metadata()
request = streaming_pb2.PingRequest(count=1)

response = stub.Ping(request, metadata=metadata)
print(f"✅ Ping successful: {response}")
return True
except grpc.RpcError as e:
print(f"❌ Ping failed: {e.details()}")
return False

def main():
"""Main function"""
try:
print(f"Testing connection to: {GRPC_ENDPOINT}")

# Create channel and stub
channel = create_channel()
streaming_stub = streaming_pb2_grpc.StreamingStub(channel)

# Test connectivity
if test_ping(streaming_stub):
print("✅ Connection test completed successfully!")
else:
print("❌ Connection test failed!")

# Close channel
channel.close()

except KeyboardInterrupt:
print("\n👋 Goodbye!")
except Exception as e:
print(f"❌ Error: {e}")
sys.exit(1)

if __name__ == "__main__":
main()

Step 8: Create Full Application with StreamData

Create the full application to handle streaming data (main.py):

import grpc
import json
import time
import signal
import sys
import threading
from pb import streaming_pb2, streaming_pb2_grpc

# Configuration
GRPC_ENDPOINT = 'your-grpc-endpoint:port'
AUTH_TOKEN = 'your-auth-token'

def create_channel():
"""Create gRPC channel with TLS"""
credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel(
GRPC_ENDPOINT,
credentials,
options=[
('grpc.max_receive_message_length', 100 * 1024 * 1024), # 100MB
('grpc.max_send_message_length', 100 * 1024 * 1024), # 100MB
]
)
return channel

def create_metadata():
"""Create metadata with auth token"""
return [('x-token', AUTH_TOKEN)]

def test_ping(stub):
"""Test connectivity with ping"""
try:
metadata = create_metadata()
request = streaming_pb2.PingRequest(count=1)

response = stub.Ping(request, metadata=metadata)
print(f"✅ Ping successful: {response}")
return True
except grpc.RpcError as e:
print(f"❌ Ping failed: {e.details()}")
return False

def stream_trades(stub):
"""Stream trades data"""
try:
metadata = create_metadata()

print("🔄 Starting trades stream...")

# Create request generator for bidirectional stream
def request_generator():
# Send initial subscription request
subscribe_request = streaming_pb2.SubscribeRequest()
subscribe_request.subscribe.stream_type = streaming_pb2.StreamType.TRADES
subscribe_request.subscribe.start_block = 0
# Empty lists for user_addresses and coins means get all
yield subscribe_request

# Keep sending pings to maintain connection
while True:
time.sleep(30)
ping_request = streaming_pb2.SubscribeRequest()
ping_request.ping.timestamp = int(time.time() * 1000)
yield ping_request

# Create bidirectional stream
stream = stub.StreamData(request_generator(), metadata=metadata)

# Handle stream responses
for response in stream:
try:
if response.HasField('data'):
data = json.loads(response.data.data)
trades = data.get('trades', [])

print(f"📈 Block {response.data.block_number}: {len(trades)} trades")

# Pretty print first trade if available
if trades:
print(f" Example trade: {json.dumps(trades[0], indent=2)}")

elif response.HasField('pong'):
print(f"🏓 Pong received: {response.pong.timestamp}")

except json.JSONDecodeError as e:
print(f"⚠️ Failed to parse data: {e}")
except KeyboardInterrupt:
print("\n⏹️ Stream interrupted by user")
break

except grpc.RpcError as e:
print(f"❌ Stream error: {e.details()}")
except KeyboardInterrupt:
print("\n⏹️ Stream interrupted by user")

def stream_orders(stub):
"""Stream orders data"""
try:
metadata = create_metadata()

print("🔄 Starting orders stream...")

# Create request generator for bidirectional stream
def request_generator():
# Send initial subscription request for orders
subscribe_request = streaming_pb2.SubscribeRequest()
subscribe_request.subscribe.stream_type = streaming_pb2.StreamType.ORDERS
subscribe_request.subscribe.start_block = 0
yield subscribe_request

# Keep sending pings to maintain connection
while True:
time.sleep(30)
ping_request = streaming_pb2.SubscribeRequest()
ping_request.ping.timestamp = int(time.time() * 1000)
yield ping_request

# Create bidirectional stream
stream = stub.StreamData(request_generator(), metadata=metadata)

# Handle stream responses
for response in stream:
try:
if response.HasField('data'):
data = json.loads(response.data.data)
print(f"📋 Orders Block {response.data.block_number}:")
print(json.dumps(data, indent=2))

except json.JSONDecodeError as e:
print(f"⚠️ Failed to parse order data: {e}")
except KeyboardInterrupt:
print("\n⏹️ Stream interrupted by user")
break

except grpc.RpcError as e:
print(f"❌ Stream error: {e.details()}")
except KeyboardInterrupt:
print("\n⏹️ Stream interrupted by user")

def stream_filtered_trades(stub, coins):
"""Stream filtered trades for specific coins"""
try:
metadata = create_metadata()

print(f"🔄 Starting filtered trades stream for coins: {', '.join(coins)}")

# Create request generator for bidirectional stream
def request_generator():
# Send filtered subscription request
subscribe_request = streaming_pb2.SubscribeRequest()
subscribe_request.subscribe.stream_type = streaming_pb2.StreamType.TRADES
subscribe_request.subscribe.start_block = 0
subscribe_request.subscribe.coins.extend(coins)
yield subscribe_request

# Keep sending pings to maintain connection
while True:
time.sleep(30)
ping_request = streaming_pb2.SubscribeRequest()
ping_request.ping.timestamp = int(time.time() * 1000)
yield ping_request

# Create bidirectional stream
stream = stub.StreamData(request_generator(), metadata=metadata)

# Handle stream responses
for response in stream:
try:
if response.HasField('data'):
data = json.loads(response.data.data)
trades = data.get('trades', [])

if trades:
print(f"📈 Filtered Block {response.data.block_number}: {len(trades)} trades")
print(json.dumps(data, indent=2))

except json.JSONDecodeError as e:
print(f"⚠️ Failed to parse data: {e}")
except KeyboardInterrupt:
print("\n⏹️ Stream interrupted by user")
break

except grpc.RpcError as e:
print(f"❌ Stream error: {e.details()}")
except KeyboardInterrupt:
print("\n⏹️ Stream interrupted by user")

def show_menu():
"""Display interactive menu"""
print("\n🎉 Connected successfully!")
print("\nAvailable streams:")
print("1. All trades")
print("2. Filtered trades (BTC, ETH)")
print("3. Orders")
print("4. Test ping")
print("5. Exit")

while True:
try:
choice = input("\nSelect stream (1-5): ").strip()
if choice in ['1', '2', '3', '4', '5']:
return choice
else:
print("❌ Invalid choice. Please select 1-5.")
except KeyboardInterrupt:
return '5'

def main():
"""Main function"""
try:
print(f"Testing connection to: {GRPC_ENDPOINT}")

# Create channel and stub
channel = create_channel()
streaming_stub = streaming_pb2_grpc.StreamingStub(channel)

# Test initial connectivity
if not test_ping(streaming_stub):
print("❌ Connection test failed!")
return

# Interactive menu loop
while True:
choice = show_menu()

if choice == '1':
stream_trades(streaming_stub)
elif choice == '2':
stream_filtered_trades(streaming_stub, ['BTC', 'ETH'])
elif choice == '3':
stream_orders(streaming_stub)
elif choice == '4':
test_ping(streaming_stub)
elif choice == '5':
print("👋 Goodbye!")
break

# Close channel
channel.close()

except KeyboardInterrupt:
print("\n👋 Goodbye!")
except Exception as e:
print(f"❌ Error: {e}")
sys.exit(1)

if __name__ == "__main__":
main()

Step 9: Create Async Version (Optional)

For better performance with streaming, create an asynchronous version (async_main.py):

import asyncio
import grpc
import json
import time
import signal
from pb import streaming_pb2, streaming_pb2_grpc

# Configuration
GRPC_ENDPOINT = 'your-grpc-endpoint:port'
AUTH_TOKEN = 'your-auth-token'

def create_channel():
"""Create async gRPC channel with TLS"""
credentials = grpc.ssl_channel_credentials()
channel = grpc.aio.secure_channel(
GRPC_ENDPOINT,
credentials,
options=[
('grpc.max_receive_message_length', 100 * 1024 * 1024), # 100MB
('grpc.max_send_message_length', 100 * 1024 * 1024), # 100MB
]
)
return channel

def create_metadata():
"""Create metadata with auth token"""
return [('x-token', AUTH_TOKEN)]

async def test_ping(stub):
"""Test connectivity with ping"""
try:
metadata = create_metadata()
request = streaming_pb2.PingRequest(count=1)

response = await stub.Ping(request, metadata=metadata)
print(f"✅ Ping successful: {response}")
return True
except grpc.RpcError as e:
print(f"❌ Ping failed: {e.details()}")
return False

async def stream_blocks(stub):
"""Stream blocks (raw L1 data) asynchronously"""
try:
metadata = create_metadata()
request = streaming_pb2.Timestamp(timestamp=int(time.time() * 1000))

print("🔄 Starting blocks stream...")

# Create async stream
stream = stub.StreamBlocks(request, metadata=metadata)

# Handle stream data
async for block in stream:
try:
block_data = json.loads(block.data_json)

if 'abci_block' in block_data and 'round' in block_data['abci_block']:
block_round = block_data['abci_block']['round']
print(f"🧱 Received block round: {block_round}")

# Pretty print the full block data (optional - can be very verbose)
# print(json.dumps(block_data, indent=2))

except json.JSONDecodeError as e:
print(f"⚠️ Failed to parse block: {e}")
except asyncio.CancelledError:
print("\n⏹️ Stream cancelled")
break

except grpc.RpcError as e:
print(f"❌ Stream error: {e.details()}")
except asyncio.CancelledError:
print("\n⏹️ Stream cancelled")

def show_menu():
"""Display interactive menu"""
print("\n🎉 Connected successfully!")
print("\nAvailable options:")
print("1. Stream blocks")
print("2. Test ping")
print("3. Exit")

while True:
try:
choice = input("\nSelect option (1-3): ").strip()
if choice in ['1', '2', '3']:
return choice
else:
print("❌ Invalid choice. Please select 1-3.")
except KeyboardInterrupt:
return '3'

async def main():
"""Main async function"""
try:
print(f"Testing connection to: {GRPC_ENDPOINT}")

# Create channel and stubs
channel = create_channel()
streaming_stub = streaming_pb2_grpc.StreamingStub(channel)
block_streaming_stub = streaming_pb2_grpc.BlockStreamingStub(channel)

# Test initial connectivity
if not await test_ping(streaming_stub):
print("❌ Connection test failed!")
return

# Interactive menu loop
while True:
choice = show_menu()

if choice == '1':
await stream_blocks(block_streaming_stub)
elif choice == '2':
await test_ping(streaming_stub)
elif choice == '3':
print("👋 Goodbye!")
break

# Close channel
await channel.close()

except KeyboardInterrupt:
print("\n👋 Goodbye!")
except Exception as e:
print(f"❌ Error: {e}")

def run_async_main():
"""Run the async main function"""
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n👋 Goodbye!")

if __name__ == "__main__":
run_async_main()

Step 10: Create Requirements File

Create requirements.txt to manage dependencies:

grpcio==1.60.0
grpcio-tools==1.60.0
protobuf==4.25.1

Step 11: Run the Application

Test with simple ping example first to verify basic connectivity:

python ping_example.py

For the full application with StreamData:

python main.py

For the asynchronous version:

python async_main.py

Install from requirements file:

pip install -r requirements.txt
python ping_example.py # Test connectivity first
python main.py # Run full application

Step 12: Verify Setup

If everything is set up correctly, you should see:

Testing connection to: docs-demo.hype-mainnet.quiknode.pro:10000
Ping successful: count: 1
Connection test completed successfully!
  • Connection established to the gRPC endpoint
  • Successful ping response
  • No compilation or runtime errors

Project Structure

hyperliquid-grpc-python/
├── requirements.txt
├── ping_example.py
├── main.py
├── async_main.py (optional)
├── proto/
│ └── streaming.proto
├── pb/
│ ├── __init__.py
│ ├── streaming_pb2.py
│ └── streaming_pb2_grpc.py
└── venv/ (if using virtual environment)

We ❤️ Feedback!

If you have any feedback or questions about this documentation, let us know. We'd love to hear from you!

Share this doc