Skip to main content

StreamData gRPC Method

Loading...

Updated on
Jan 07, 2026

StreamData gRPC Method

Parameters

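request
stream<SubscribeRequest>
Client-to-server message stream. Each SubscribeRequest carries either a subscribe payload (stream type, start block, and user-address and coin filter lists) or a ping with a millisecond timestamp.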

Returns

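stream
stream<SubscribeUpdate>
Server-to-client message stream. Each SubscribeUpdate carries one of the following:
data
StreamResponse
A data update containing the block number and a JSON-encoded payload.
pong
Pong
A pong message carrying a timestamp.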
Request
1
package main
2
3
import (
4
"context"
5
"crypto/tls"
6
"encoding/json"
7
"fmt"
8
"io"
9
"log"
10
"time"
11
12
"google.golang.org/grpc"
13
"google.golang.org/grpc/credentials"
14
"google.golang.org/grpc/metadata"
15
16
pb "hyperliquid-grpc-go/pb"
17
)
18
19
// grpcEndpoint is the host:port of the Quicknode gRPC endpoint to dial.
// Replace the placeholder with your own endpoint.
const grpcEndpoint = "your-grpc-endpoint:port"

// authToken is the value sent as the "x-token" metadata entry on each call.
// Replace the placeholder with your own token.
const authToken = "your-auth-token"
24
25
// Create gRPC connection with TLS
26
func createConnection() (*grpc.ClientConn, error) {
27
tlsConfig := &tls.Config{
28
InsecureSkipVerify: false, // Set to true only for testing
29
}
30
creds := credentials.NewTLS(tlsConfig)
31
32
conn, err := grpc.Dial(
33
grpcEndpoint,
34
grpc.WithTransportCredentials(creds),
35
grpc.WithDefaultCallOptions(
36
grpc.MaxCallRecvMsgSize(100*1024*1024), // 100MB
37
),
38
)
39
40
return conn, err
41
}
42
43
// Create context with auth metadata
44
func createContext() context.Context {
45
md := metadata.Pairs("x-token", authToken)
46
return metadata.NewOutgoingContext(context.Background(), md)
47
}
48
49
50
// Stream trades
51
func streamTrades(client pb.StreamingClient) error {
52
ctx := createContext()
53
54
stream, err := client.StreamData(ctx)
55
if err != nil {
56
return fmt.Errorf("failed to create stream: %v", err)
57
}
58
59
// Send subscription request
60
err = stream.Send(&pb.SubscribeRequest{
61
Request: &pb.SubscribeRequest_Subscribe{
62
Subscribe: &pb.StreamSubscribe{
63
StreamType: pb.StreamType_TRADES,
64
StartBlock: 0,
65
UserAddresses: []string{},
66
Coins: []string{},
67
},
68
},
69
})
70
if err != nil {
71
return fmt.Errorf("failed to subscribe: %v", err)
72
}
73
74
// Start ping goroutine
75
go func() {
76
ticker := time.NewTicker(30 * time.Second)
77
defer ticker.Stop()
78
79
for range ticker.C {
80
err := stream.Send(&pb.SubscribeRequest{
81
Request: &pb.SubscribeRequest_Ping{
82
Ping: &pb.Ping{
83
Timestamp: time.Now().UnixMilli(),
84
},
85
},
86
})
87
if err != nil {
88
log.Printf("Failed to send ping: %v", err)
89
return
90
}
91
}
92
}()
93
94
// Receive messages
95
log.Println("šŸ”„ Starting trades stream...")
96
for {
97
msg, err := stream.Recv()
98
if err == io.EOF {
99
log.Println("Stream ended")
100
break
101
}
102
if err != nil {
103
return fmt.Errorf("receive error: %v", err)
104
}
105
106
switch update := msg.Update.(type) {
107
case *pb.SubscribeUpdate_Data:
108
var data map[string]interface{}
109
if err := json.Unmarshal([]byte(update.Data.Data), &data); err != nil {
110
log.Printf("āš ļø Failed to parse data: %v", err)
111
continue
112
}
113
114
trades, ok := data["trades"].([]interface{})
115
tradesCount := 0
116
if ok {
117
tradesCount = len(trades)
118
}
119
120
log.Printf("šŸ“ˆ Block %d: %d trades", update.Data.BlockNumber, tradesCount)
121
122
// Pretty print first trade if available
123
if tradesCount > 0 {
124
firstTrade := trades[0]
125
tradeJSON, _ := json.MarshalIndent(firstTrade, " ", " ")
126
fmt.Printf(" Example trade:\n%s\n", tradeJSON)
127
}
128
129
case *pb.SubscribeUpdate_Pong:
130
log.Printf("šŸ“ Pong received: %d", update.Pong.Timestamp)
131
}
132
}
133
134
return nil
135
}
136
137
// Stream filtered trades
138
func streamFilteredTrades(client pb.StreamingClient, coins []string) error {
139
ctx := createContext()
140
141
stream, err := client.StreamData(ctx)
142
if err != nil {
143
return fmt.Errorf("failed to create stream: %v", err)
144
}
145
146
// Send filtered subscription
147
err = stream.Send(&pb.SubscribeRequest{
148
Request: &pb.SubscribeRequest_Subscribe{
149
Subscribe: &pb.StreamSubscribe{
150
StreamType: pb.StreamType_TRADES,
151
StartBlock: 0,
152
UserAddresses: []string{},
153
Coins: coins,
154
},
155
},
156
})
157
if err != nil {
158
return fmt.Errorf("failed to subscribe: %v", err)
159
}
160
161
log.Printf("šŸ”„ Starting filtered trades stream for coins: %v", coins)
162
163
// Receive messages
164
for {
165
msg, err := stream.Recv()
166
if err == io.EOF {
167
log.Println("Filtered stream ended")
168
break
169
}
170
if err != nil {
171
return fmt.Errorf("receive error: %v", err)
172
}
173
174
switch update := msg.Update.(type) {
175
case *pb.SubscribeUpdate_Data:
176
var data map[string]interface{}
177
if err := json.Unmarshal([]byte(update.Data.Data), &data); err != nil {
178
log.Printf("āš ļø Failed to parse data: %v", err)
179
continue
180
}
181
182
if trades, ok := data["trades"].([]interface{}); ok && len(trades) > 0 {
183
log.Printf("šŸ“ˆ Filtered Block %d: %d trades", update.Data.BlockNumber, len(trades))
184
prettyJSON, _ := json.MarshalIndent(data, "", " ")
185
fmt.Println(string(prettyJSON))
186
}
187
}
188
}
189
190
return nil
191
}
192
193
// Stream orders
194
func streamOrders(client pb.StreamingClient) error {
195
ctx := createContext()
196
197
stream, err := client.StreamData(ctx)
198
if err != nil {
199
return fmt.Errorf("failed to create stream: %v", err)
200
}
201
202
// Send subscription request for orders
203
err = stream.Send(&pb.SubscribeRequest{
204
Request: &pb.SubscribeRequest_Subscribe{
205
Subscribe: &pb.StreamSubscribe{
206
StreamType: pb.StreamType_ORDERS,
207
StartBlock: 0,
208
UserAddresses: []string{},
209
Coins: []string{},
210
},
211
},
212
})
213
if err != nil {
214
return fmt.Errorf("failed to subscribe: %v", err)
215
}
216
217
log.Println("šŸ”„ Starting orders stream...")
218
219
// Receive messages
220
for {
221
msg, err := stream.Recv()
222
if err == io.EOF {
223
log.Println("Orders stream ended")
224
break
225
}
226
if err != nil {
227
return fmt.Errorf("receive error: %v", err)
228
}
229
230
switch update := msg.Update.(type) {
231
case *pb.SubscribeUpdate_Data:
232
var data map[string]interface{}
233
if err := json.Unmarshal([]byte(update.Data.Data), &data); err != nil {
234
log.Printf("āš ļø Failed to parse order data: %v", err)
235
continue
236
}
237
238
log.Printf("šŸ“‹ Orders Block %d:", update.Data.BlockNumber)
239
prettyJSON, _ := json.MarshalIndent(data, "", " ")
240
fmt.Println(string(prettyJSON))
241
}
242
}
243
244
return nil
245
}
246
247
248
// Interactive menu
249
func runInteractiveMenu(conn *grpc.ClientConn) {
250
client := pb.NewStreamingClient(conn)
251
252
fmt.Println("\nšŸŽ‰ Connected successfully!")
253
fmt.Println("\nAvailable streams:")
254
fmt.Println("1. All trades")
255
fmt.Println("2. Filtered trades (BTC, ETH)")
256
fmt.Println("3. Orders")
257
fmt.Println("4. Exit")
258
259
for {
260
fmt.Print("\nSelect stream (1-4): ")
261
var choice string
262
fmt.Scanln(&choice)
263
264
switch choice {
265
case "1":
266
if err := streamTrades(client); err != nil {
267
log.Printf("āŒ Stream error: %v", err)
268
}
269
case "2":
270
if err := streamFilteredTrades(client, []string{"BTC", "ETH"}); err != nil {
271
log.Printf("āŒ Filtered stream error: %v", err)
272
}
273
case "3":
274
if err := streamOrders(client); err != nil {
275
log.Printf("āŒ Orders stream error: %v", err)
276
}
277
case "4":
278
fmt.Println("šŸ‘‹ Goodbye!")
279
return
280
default:
281
fmt.Println("āŒ Invalid choice. Please select 1-4.")
282
}
283
}
284
}
285
286
func main() {
287
// Create connection
288
conn, err := createConnection()
289
if err != nil {
290
log.Fatalf("āŒ Failed to connect: %v", err)
291
}
292
defer conn.Close()
293
294
fmt.Printf("Connected to: %s\n", grpcEndpoint)
295
296
// Run interactive menu
297
runInteractiveMenu(conn)
298
}
Don't have an account yet?
Create your Quicknode endpoint in seconds and start building
Get started for free