@@ -22,6 +22,7 @@ def __init__(self, username, password, server="openfeed.aws.barchart.com", debug
2222
2323 self .instrument_definitions = {}
2424 self .instruments_by_symbol = {}
25+
2526 self .symbol_handlers = {}
2627 self .exchange_handlers = {}
2728 self .heartbeat_handlers = []
@@ -106,50 +107,61 @@ def handleSubscriptionResponse(msg):
106107 if msg .subscriptionResponse .status .result > 1 :
107108 raise Exception ("Subscription has failed" , msg )
108109
109- self .__notify_symbol_listeners (
110- msg .subscriptionResponse .symbol , msg )
110+ if len (msg .subscriptionResponse .symbol ) > 0 :
111+ self .__notify_symbol_listeners (
112+ msg .subscriptionResponse .symbol , msg )
113+ else :
114+ self .__notify_exchange_listeners (
115+ msg .subscriptionResponse .exchange , msg )
116+
111117 return msg
112118
113119 def handleInstrumentDefinition (msg ):
114120 self .instrument_definitions [msg .instrumentDefinition .marketId ] = msg
115121 self .instruments_by_symbol [msg .instrumentDefinition .symbol ] = msg
122+
116123 return msg
117124
118125 def handleMarketUpdate (msg ):
119- symbol = self .instrument_definitions [msg .marketUpdate .marketId ].instrumentDefinition . symbol
126+ inst = self .instrument_definitions [msg .marketUpdate .marketId ].instrumentDefinition
120127
121- self .__notify_symbol_listeners (symbol , msg )
128+ self .__notify_exchange_listeners (inst .barchartExchangeCode , msg )
129+ self .__notify_symbol_listeners (inst .symbol , msg )
122130
123131 return msg
124132
125133 def handleMarketSnapshot (msg ):
126- symbol = self .instrument_definitions [msg .marketSnapshot .marketId ].instrumentDefinition .symbol
134+ inst = self .instrument_definitions [msg .marketSnapshot .marketId ].instrumentDefinition
135+
136+ self .__notify_exchange_listeners (inst .barchartExchangeCode , msg )
137+ self .__notify_symbol_listeners (inst .symbol , msg )
127138
128- self .__notify_symbol_listeners (symbol , msg )
129139 return msg
130140
141+ handlers = {
142+ "loginResponse" : handleLogin ,
143+ "heartBeat" : handleHeartbeat ,
144+ "subscriptionResponse" : handleSubscriptionResponse ,
145+ "instrumentDefinition" : handleInstrumentDefinition ,
146+ "marketSnapshot" : handleMarketSnapshot ,
147+ "marketUpdate" : handleMarketUpdate ,
148+ }
149+
131150 def on_message (ws , message ):
132151
133152 msg = openfeed_api_pb2 .OpenfeedGatewayMessage ()
134153 msg .ParseFromString (message )
135154
136- handlers = {
137- "loginResponse" : handleLogin ,
138- "heartBeat" : handleHeartbeat ,
139- "subscriptionResponse" : handleSubscriptionResponse ,
140- "instrumentDefinition" : handleInstrumentDefinition ,
141- "marketSnapshot" : handleMarketSnapshot ,
142- "marketUpdate" : handleMarketUpdate ,
143- }
155+ msg_type = msg .WhichOneof ("data" )
144156
145- handler = handlers .get (msg . WhichOneof (
146- "data" ) , lambda x : print ("Unhandled Message: " , x ))
157+ handler = handlers .get (
158+ msg_type , lambda x : print ("Unhandled Message: " , x ))
147159
148160 try :
149161 handler (msg )
150162 except Exception as e :
151163 if self .debug :
152- print ("Failed to handling incoming message:" , e )
164+ print ("Failed handling incoming message:" , msg_type , e )
153165 self .__callback (self .on_error , e )
154166
155167 def on_error (ws , error ):
@@ -181,6 +193,9 @@ def on_open(ws):
181193 self .ws .run_forever ()
182194
183195 def __notify_symbol_listeners (self , symbol , msg ):
196+ if symbol not in self .symbol_handlers :
197+ return
198+
184199 for cb in self .symbol_handlers [symbol ]:
185200 try :
186201 cb (msg )
@@ -189,6 +204,18 @@ def __notify_symbol_listeners(self, symbol, msg):
189204 print ("Failed to notify `symbol` callback" , e )
190205 self .__callback (self .on_error , e )
191206
207+ def __notify_exchange_listeners (self , exchange , msg ):
208+ if exchange not in self .exchange_handlers :
209+ return
210+
211+ for cb in self .exchange_handlers [exchange ]:
212+ try :
213+ cb (msg )
214+ except Exception as e :
215+ if self .debug :
216+ print ("Failed to notify `exchange` callback" , e )
217+ self .__callback (self .on_error , e )
218+
192219 def __notify_heartbeat_listeners (self , msg ):
193220 for cb in self .heartbeat_handlers :
194221 try :
@@ -256,8 +283,8 @@ def handle_heartbeat(msg):
256283 of_client = OpenfeedClient ("username" , "password" , debug = False )
257284
258285 of_client .add_symbol_subscription (symbol = "AAPL" , callback = handle_message )
259- # of_client.add_exchange_subscription(
260- # exchange="NYSE ", callback=handle_message)
286+ of_client .add_exchange_subscription (
287+ exchange = "FOREX " , callback = handle_message )
261288 of_client .add_heartbeat_subscription (callback = handle_heartbeat )
262289
263290 of_client .on_error = lambda x : print ("of-client: something went wrong:" , x )
0 commit comments