2
2
from generated import openfeed_api_pb2
3
3
from generated import openfeed_pb2
4
4
from generated import openfeed_instrument_pb2
5
+ from struct import unpack
6
+ from io import BytesIO
7
+ import platform
5
8
import time
6
9
import traceback
7
10
import sys
8
11
import inspect
9
12
import collections
10
13
import websocket
11
14
import random
15
+ from .version import VERSION
12
16
try :
13
17
import thread
14
18
except ImportError :
@@ -109,7 +113,7 @@ def add_symbol_subscription(self, symbol: Union[str, list], callback, service="R
109
113
110
114
return self
111
115
112
- def add_exchange_subscription (self , exchange : Union [str , list ], callback , service = "REAL_TIME" , subscription_type = ["QUOTE" ], instrument_type = [], snapshot_interval_seconds = 60 ):
116
+ def add_exchange_subscription (self , exchange : Union [str , list ], callback , service = "REAL_TIME" , subscription_type = ["QUOTE" ], instrument_type = [], snapshot_interval_seconds = 60 , bulk_subscription_filters = [] ):
113
117
"""Subscribe to [Market Data] by Barchart Exchange code(s).
114
118
115
119
Complete list of [SubscriptionTypes]. List of [Service] types.
@@ -121,18 +125,21 @@ def add_exchange_subscription(self, exchange: Union[str, list], callback, servic
121
125
exchange: str, list
122
126
Barchart Exchange code(s)
123
127
service: str, optional
124
- Default is `REAL_TIME` for delayed market data, set it to `DELAYED`, or for snapshots set it as one of `REAL_TIME_SNAPSHOT` or `DELAYED_SNAPSHOT'
128
+ [ServiceType]. Default is `REAL_TIME` for delayed market data, set it to `DELAYED`, or for snapshots set it as one of `REAL_TIME_SNAPSHOT` or `DELAYED_SNAPSHOT'
125
129
callback: Callable
126
130
Your callback function for Market Data messages
127
131
subscription_type: list, optional
128
- Default is ['QUOTE']. Can contain any of: 'ALL', 'QUOTE', 'QUOTE_PARTICIPANT', 'DEPTH_PRICE', 'DEPTH_ORDER', 'TRADES', 'OHLC'
132
+ [SubscriptionTypes]. Default is ['QUOTE']. Can contain any of: 'ALL', 'QUOTE', 'QUOTE_PARTICIPANT', 'DEPTH_PRICE', 'DEPTH_ORDER', 'TRADES', 'OHLC'
129
133
instrument_type: list, optional
130
- Spreads and Options must be explicitly requested. Can contain any of: 'SPREAD', 'OPTION', 'FUTURE', 'FOREX', 'EQUITY', 'INDEX', 'MUTUAL_FUND', 'MONEY_MARKET', 'MONEY_MARKET_FUND'
134
+ [InstrumentTypes]. Spreads and Options must be explicitly requested. Can contain any of: 'SPREAD', 'OPTION', 'FUTURE', 'FOREX', 'EQUITY', 'INDEX', 'MUTUAL_FUND', 'MONEY_MARKET', 'MONEY_MARKET_FUND'
135
+ bulk_subscription_filters: list of BulkSubscriptionFilter, optional
136
+ List of [BulkSubscriptionFilter]
131
137
132
138
[Market Data]: https://docs.barchart.com/openfeed/#/proto?id=marketupdate
133
139
[SubscriptionTypes]: https://docs.barchart.com/openfeed/#/proto?id=subscriptiontype
134
- [Service ]: https://docs.barchart.com/openfeed/#/proto?id=service
140
+ [ServiceType ]: https://docs.barchart.com/openfeed/#/proto?id=service
135
141
[InstrumentTypes]: https://docs.barchart.com/openfeed/#/proto?id=instrumentdefinitioninstrumenttype
142
+ [BulkSubscriptionFilter]: https://docs.barchart.com/openfeed/#/proto?id=bulksubscriptionfilter
136
143
"""
137
144
exchanges = []
138
145
@@ -146,11 +153,11 @@ def add_exchange_subscription(self, exchange: Union[str, list], callback, servic
146
153
self .exchange_handlers [exch ] = []
147
154
148
155
self .exchange_handlers [exch ].append (Listener (
149
- exchange = exch , callback = callback , service = service , subscription_type = subscription_type , instrument_type = instrument_type , snapshot_interval_seconds = snapshot_interval_seconds ))
156
+ exchange = exch , callback = callback , service = service , subscription_type = subscription_type , instrument_type = instrument_type , snapshot_interval_seconds = snapshot_interval_seconds , bulk_subscription_filters = bulk_subscription_filters ))
150
157
151
158
if self .token is not None :
152
- self ._send_message (
153
- self .__create_subscription_request (exchanges = exchanges , service = service , subscription_type = subscription_type , instrument_type = instrument_type , snapshot_interval_seconds = snapshot_interval_seconds ))
159
+ self ._send_message (
160
+ self .__create_subscription_request (exchanges = exchanges , service = service , subscription_type = subscription_type , instrument_type = instrument_type , snapshot_interval_seconds = snapshot_interval_seconds , bulk_subscription_filters = bulk_subscription_filters ))
154
161
155
162
return self
156
163
@@ -371,22 +378,34 @@ def handleOHLC(msg):
371
378
}
372
379
373
380
def on_message (ws : websocket .WebSocketApp , message ):
381
+ byte_buffer = BytesIO (message )
382
+ total = byte_buffer .getbuffer ().nbytes
374
383
375
- msg = openfeed_api_pb2 .OpenfeedGatewayMessage ()
376
- msg .ParseFromString (message )
384
+ msg_count = 0
377
385
378
- msg_type = msg .WhichOneof ("data" )
386
+ while byte_buffer .tell () != total :
387
+ msg_len = int .from_bytes (byte_buffer .read (
388
+ 2 ), byteorder = 'big' , signed = True )
379
389
380
- handler = handlers . get (
381
- msg_type , lambda x : print ( "Unhandled Message:" , x ))
390
+ msg = openfeed_api_pb2 . OpenfeedGatewayMessage ()
391
+ msg . ParseFromString ( byte_buffer . read ( msg_len ))
382
392
383
- try :
384
- handler (msg )
385
- except Exception as e :
386
393
if self .debug :
387
- print ("Failed handling incoming message:" , msg_type , e )
388
- traceback .print_exc ()
389
- self .__callback (self .on_error , e )
394
+ msg_count = msg_count + 1
395
+ print ("msg len:" , msg_len , "number of messages:" , msg_count )
396
+
397
+ msg_type = msg .WhichOneof ("data" )
398
+
399
+ handler = handlers .get (
400
+ msg_type , lambda x : print ("Unhandled Message:" , x ))
401
+
402
+ try :
403
+ handler (msg )
404
+ except Exception as e :
405
+ if self .debug :
406
+ print ("Failed handling incoming message:" , msg_type , e )
407
+ traceback .print_exc ()
408
+ self .__callback (self .on_error , e )
390
409
391
410
def on_error (ws , error ):
392
411
if self .debug :
@@ -473,7 +492,7 @@ def __send_existing_interest(self):
473
492
listeners_by_service = interest [l .service ]
474
493
if l .key () not in listeners_by_service :
475
494
listeners_by_service [l .key ()] = Listener (
476
- symbol = l .symbol , exchange = l .exchange , service = l .service , subscription_type = l .subscription_type , instrument_type = l .instrument_type , snapshot_interval_seconds = l .snapshot_interval_seconds )
495
+ symbol = l .symbol , exchange = l .exchange , service = l .service , subscription_type = l .subscription_type , instrument_type = l .instrument_type , snapshot_interval_seconds = l .snapshot_interval_seconds , bulk_subscription_filters = l . bulk_subscription_filters )
477
496
else :
478
497
existing = listeners_by_service [l .key ()]
479
498
existing .subscription_type = list (set (
@@ -483,13 +502,19 @@ def __send_existing_interest(self):
483
502
for service in interest .keys ():
484
503
for i in interest [service ].values ():
485
504
self ._send_message (
486
- self .__create_subscription_request (exchanges = i .exchanges (), symbols = i .symbols (), service = service , subscription_type = i .subscription_type , instrument_type = i .get_instrument_types (), snapshot_interval_seconds = i .snapshot_interval_seconds ))
505
+ self .__create_subscription_request (exchanges = i .exchanges (),
506
+ symbols = i .symbols (),
507
+ service = service ,
508
+ subscription_type = i .subscription_type ,
509
+ instrument_type = i .get_instrument_types (),
510
+ snapshot_interval_seconds = i .snapshot_interval_seconds ,
511
+ bulk_subscription_filters = i .bulk_subscription_filters ))
487
512
488
513
# send other rpc requests
489
514
for req in self .request_id_handlers .values ():
490
515
req .send (self )
491
516
492
- def __create_subscription_request (self , exchanges = [], symbols = [], service = "REAL_TIME" , subscription_type = ["QUOTE" ], instrument_type = [], snapshot_interval_seconds = 60 ):
517
+ def __create_subscription_request (self , exchanges = [], symbols = [], service = "REAL_TIME" , subscription_type = ["QUOTE" ], instrument_type = [], snapshot_interval_seconds = 60 , bulk_subscription_filters = [] ):
493
518
requests = []
494
519
495
520
if len (exchanges ) > 0 :
@@ -500,7 +525,9 @@ def __create_subscription_request(self, exchanges=[], symbols=[], service="REAL_
500
525
t ) for t in subscription_type ],
501
526
snapshotIntervalSeconds = snapshot_interval_seconds ,
502
527
instrumentType = [openfeed_instrument_pb2 .InstrumentDefinition .InstrumentType .Value (
503
- t ) for t in instrument_type ]
528
+ t ) for t in instrument_type ],
529
+ bulkSubscriptionFilter = [openfeed_api_pb2 .BulkSubscriptionFilter (
530
+ symbolType = f .symbolType , symbolPattern = f .symbolPattern ) for f in bulk_subscription_filters ]
504
531
))
505
532
506
533
if len (symbols ) > 0 :
@@ -568,8 +595,11 @@ def __create_instrument(self, symbol):
568
595
)
569
596
570
597
def __create_login_request (self ):
598
+ client_version = "sdk_python_version={}:python_version={}:sys_version:{}" .format (
599
+ VERSION , platform .python_version (), sys .version )
571
600
return openfeed_api_pb2 .OpenfeedGatewayRequest (
572
601
loginRequest = openfeed_api_pb2 .LoginRequest (
602
+ protocolVersion = 1 , clientVersion = client_version ,
573
603
username = self .username , password = self .password ))
574
604
575
605
def __callback (self , callback , * args ):
@@ -581,14 +611,15 @@ def __callback(self, callback, *args):
581
611
582
612
583
613
class Listener (object ):
584
- def __init__ (self , symbol = "" , exchange = "" , callback = None , service = "REAL_TIME" , subscription_type = ["QUOTE" ], instrument_type = [], snapshot_interval_seconds = 60 ):
614
+ def __init__ (self , symbol = "" , exchange = "" , callback = None , service = "REAL_TIME" , subscription_type = ["QUOTE" ], instrument_type = [], snapshot_interval_seconds = 60 , bulk_subscription_filters = [] ):
585
615
self .symbol = symbol
586
616
self .exchange = exchange
587
617
self .callback = callback
588
618
self .service = service
589
619
self .subscription_type = subscription_type
590
620
self .instrument_type = instrument_type
591
621
self .snapshot_interval_seconds = snapshot_interval_seconds
622
+ self .bulk_subscription_filters = bulk_subscription_filters
592
623
593
624
def key (self ):
594
625
if len (self .exchange ) > 0 :
@@ -638,6 +669,12 @@ def is_type(self, request_type):
638
669
return request_type == self .request .WhichOneof ("data" )
639
670
640
671
672
+ class BulkSubscriptionFilter (object ):
673
+ def __init__ (self , symbolType , symbolPattern ):
674
+ self .symbolType = symbolType
675
+ self .symbolPattern = symbolPattern
676
+
677
+
641
678
if __name__ == "__main__" :
642
679
643
680
def handle_message (msg ):
0 commit comments