1+ import traceback
12import sys
23import inspect
34from generated import openfeed_api_pb2
@@ -52,7 +53,8 @@ def add_symbol_subscription(self, symbol, callback):
5253 self .symbol_handlers [symbol ] = []
5354
5455 if self .token is not None :
55- self ._send_message (self .__create_symbol_request ([symbol ]))
56+ self ._send_message (
57+ self .__create_subscription_request (None , [symbol ]))
5658
5759 self .symbol_handlers [symbol ].append (callback )
5860
@@ -61,17 +63,23 @@ def add_exchange_subscription(self, exchange, callback):
6163 self .exchange_handlers [exchange ] = []
6264
6365 if self .token is not None :
64- self ._send_message (self .__create_exchange_request ([exchange ]))
66+ self ._send_message (
67+ self .__create_subscription_request ([exchange ], None ))
6568
6669 self .exchange_handlers [exchange ].append (callback )
6770
71+ def get_instrument_definitions (self ):
72+ return self .instrument_definitions
73+
6874 def get_instrument_definition (self , id ):
6975 return self .instrument_definitions [id ]
7076
7177 def get_instrument_definition_by_symbol (self , symbol ):
7278 return self .instruments_by_symbol [symbol ]
7379
7480 def _send_message (self , msg ):
81+ if self .debug :
82+ print ("Sending:" , msg )
7583 self .ws .send (msg .SerializeToString (), websocket .ABNF .OPCODE_BINARY )
7684
7785 def __reset (self ):
@@ -89,13 +97,9 @@ def handleLogin(msg):
8997 self .token = msg .loginResponse .token
9098
9199 # sub to all existing interest
92- self ._send_message (self .__create_symbol_request (
93- self .symbol_handlers .keys ()))
94-
95- # sub to all existing exchanges
96- self ._send_message (self .__create_exchange_request (
97- self .exchange_handlers .keys ()))
98-
100+ self ._send_message (self .__create_subscription_request (
101+ self .exchange_handlers .keys (), self .symbol_handlers .keys ()))
102+
99103 return msg
100104
101105 def handleHeartbeat (msg ):
@@ -134,7 +138,10 @@ def handleMarketSnapshot(msg):
134138 inst = self .instrument_definitions [msg .marketSnapshot .marketId ].instrumentDefinition
135139
136140 self .__notify_exchange_listeners (inst .barchartExchangeCode , msg )
137- self .__notify_symbol_listeners (inst .symbol , msg )
141+
142+ # TODO review symbology handling, subbing by one and keying off the other can create unexpected results
143+ # for example subscribing to "ZCYAIA40.CM" will come back with OF symbol (less the suffix) in `instrument.symbol`
144+ self .__notify_symbol_listeners (inst .symbols [0 ].symbol , msg )
138145
139146 return msg
140147
@@ -167,7 +174,7 @@ def on_message(ws, message):
167174 def on_error (ws , error ):
168175 if self .debug :
169176 print ("WS Error: " , error )
170-
177+ traceback . print_exc ()
171178 self .__callback (self .on_error , error )
172179
173180 def on_close (ws ):
@@ -225,30 +232,22 @@ def __notify_heartbeat_listeners(self, msg):
225232 print ("Failed to notify `heartbeat` callback" , e )
226233 self .__callback (self .on_error , e )
227234
228- def __create_symbol_request (self , symbols ):
235+ def __create_subscription_request (self , exchanges , symbols ):
229236 requests = []
230237
231- for sym in symbols :
232- requests .append (openfeed_api_pb2 .SubscriptionRequest .Request (
233- symbol = sym ,
234- ))
238+ if len (exchanges ) > 0 :
239+ for exch in exchanges :
240+ requests .append (openfeed_api_pb2 .SubscriptionRequest .Request (
241+ exchange = exch ,
242+ subscriptionType = [
243+ openfeed_api_pb2 .SubscriptionType .Value ("QUOTE" )]
244+ ))
235245
236- of_req = openfeed_api_pb2 .OpenfeedGatewayRequest (
237- subscriptionRequest = openfeed_api_pb2 .SubscriptionRequest (
238- token = self .token ,
239- service = openfeed_pb2 .Service .Value ("REAL_TIME" ),
240- requests = requests
241- ))
242-
243- return of_req
244-
245- def __create_exchange_request (self , exchanges ):
246- requests = []
247-
248- for exch in exchanges :
249- requests .append (openfeed_api_pb2 .SubscriptionRequest .Request (
250- exchange = exch ,
251- ))
246+ if len (symbols ) > 0 :
247+ for sym in symbols :
248+ requests .append (openfeed_api_pb2 .SubscriptionRequest .Request (
249+ symbol = sym ,
250+ ))
252251
253252 of_req = openfeed_api_pb2 .OpenfeedGatewayRequest (
254253 subscriptionRequest = openfeed_api_pb2 .SubscriptionRequest (
@@ -282,7 +281,6 @@ def handle_heartbeat(msg):
282281
283282 of_client = OpenfeedClient ("username" , "password" , debug = False )
284283
285- of_client .add_symbol_subscription (symbol = "AAPL" , callback = handle_message )
286284 of_client .add_exchange_subscription (
287285 exchange = "FOREX" , callback = handle_message )
288286 of_client .add_heartbeat_subscription (callback = handle_heartbeat )
@@ -292,7 +290,8 @@ def handle_heartbeat(msg):
292290 of_client .on_connected = lambda x : print ("of-client: connected" )
293291
294292 # blocking mode
295- of_client .start (blocking = True )
293+ of_client .start (blocking = False )
296294
297- # while True:
298- # time.sleep(3)
295+ while True :
296+ print ("Number of Instruments:" , len (of_client .get_instrument_definitions ()))
297+ time .sleep (10 )
0 commit comments