66# Third party modules
77# Local modules
88# Program
9-
10- VAR_ENDIANNESS :str = "little"
11- VAR_VERSION :bytes = b"\x01 "
12- OP_NONE :bytes = b"\xEF "
13- OP_BOOL_FALSE :bytes = b"\xF0 "
14- OP_BOOL_TRUE :bytes = b"\xF1 "
15- OP_INTERGER :bytes = b"\xF2 "
16- OP_NEG_INTERGER :bytes = b"\xF3 "
17- OP_ZERO_INTERGER :bytes = b"\xF4 "
18- OP_FLOAT :bytes = b"\xF5 "
19- OP_NEG_FLOAT :bytes = b"\xF6 "
20- OP_ZERO_FLOAT :bytes = b"\xF7 "
21- OP_STRING :bytes = b"\xF8 "
22- OP_ZERO_STRING :bytes = b"\xF9 "
23- OP_BYTES :bytes = b"\xFA "
24- OP_ZERO_BYTES :bytes = b"\xFB "
25- OP_LIST :bytes = b"\xFC "
26- OP_ZERO_LIST :bytes = b"\xFD "
27- OP_DICT :bytes = b"\xFE "
28- OP_ZERO_DICT :bytes = b"\xFF "
9+ VAR_ENDIANNESS = "little"
10+ VAR_VERSION = b"\x01 "
11+ OP_NONE = b"\xED "
12+ OP_BOOL_FALSE = b"\xEE "
13+ OP_BOOL_TRUE = b"\xEF "
14+ OP_INTERGER = b"\xF0 "
15+ OP_NEG_INTERGER = b"\xF1 "
16+ OP_ZERO_INTERGER = b"\xF2 "
17+ OP_FLOAT = b"\xF3 "
18+ OP_NEG_FLOAT = b"\xF4 "
19+ OP_ZERO_FLOAT = b"\xF5 "
20+ OP_STRING = b"\xF6 "
21+ OP_ZERO_STRING = b"\xF7 "
22+ OP_BYTES = b"\xF8 "
23+ OP_ZERO_BYTES = b"\xF9 "
24+ OP_LIST = b"\xFA "
25+ OP_ZERO_LIST = b"\xFB "
26+ OP_DICT = b"\xFC "
27+ OP_ZERO_DICT = b"\xFD "
28+ OP_SET = b"\xFE "
29+ OP_ZERO_SET = b"\xFF "
2930
3031class FSPackerError (Exception ): pass
3132
@@ -42,7 +43,7 @@ class MaxOPProtection(FSPackerError): pass
4243class OutOfData (FSPackerError ): pass
4344
4445def packMessage (message :bytes ) -> bytes :
45- d : int = len (message )
46+ d = len (message )
4647 if d < 0xFD :
4748 return d .to_bytes (1 , VAR_ENDIANNESS ) + message
4849 elif d <= 0xFFFF :
@@ -54,7 +55,7 @@ def packMessage(message:bytes) -> bytes:
5455 raise RuntimeError ("Too big data to pack" )
5556
5657def unpackMessage (buffer :bytes ) -> Tuple [int , int ]:
57- d : int = buffer [0 ]
58+ d = buffer [0 ]
5859 if d < 0xFD :
5960 return 1 , d
6061 elif d == 0xFD :
@@ -66,28 +67,34 @@ def unpackMessage(buffer:bytes) -> Tuple[int, int]:
6667 return 0 , 0
6768
6869def _create_vint (d :int ) -> bytes :
69- if d < 0xEC :
70+ if d < 0xEA :
7071 return d .to_bytes (1 , VAR_ENDIANNESS )
7172 elif d <= 0xFFFF :
72- return b"\xEC " + d .to_bytes (2 , VAR_ENDIANNESS )
73+ return b"\xEA " + d .to_bytes (2 , VAR_ENDIANNESS )
74+ elif d <= 0xFFFFFF :
75+ return b"\xEB " + d .to_bytes (3 , VAR_ENDIANNESS )
7376 elif d <= 0xFFFFFFFF :
74- return b"\xED " + d .to_bytes (4 , VAR_ENDIANNESS )
77+ return b"\xEC " + d .to_bytes (4 , VAR_ENDIANNESS )
7578 else :
76- return b" \xEE " + d . to_bytes ( 8 , VAR_ENDIANNESS )
79+ raise FSPackerError ( "Too big number" )
7780
7881class FSPacker :
82+ dictCounter :int
83+ dictByKey :Dict [Any , int ]
84+ dictBuffer :BytesIO
85+ opBuffer :BytesIO
7986 def __init__ (self ) -> None :
80- self .dictCounter : int = 0
81- self .dictByKey : Dict [ Any , int ] = {}
82- self .dictBuffer : BytesIO = BytesIO ()
83- self .opBuffer : BytesIO = BytesIO ()
87+ self .dictCounter = 0
88+ self .dictByKey = {}
89+ self .dictBuffer = BytesIO ()
90+ self .opBuffer = BytesIO ()
8491 @classmethod
85- def dump (self , data :Any , fp :IO [bytes ]) -> None :
86- fp .write ( self ()._parse (data ) )
92+ def dump (cls , data :Any , fp :IO [bytes ]) -> None :
93+ fp .write ( cls ()._parse (data ) )
8794 return
8895 @classmethod
89- def dumps (self , data :Any ) -> bytes :
90- return self ()._parse (data )
96+ def dumps (cls , data :Any ) -> bytes :
97+ return cls ()._parse (data )
9198 def _parse (self , data :Any ) -> bytes :
9299 self ._dump (data )
93100 return VAR_VERSION + _create_vint (len (self .dictByKey )) + self .dictBuffer .getbuffer () + self .opBuffer .getbuffer ()
@@ -143,13 +150,21 @@ def _dump(self, d:Any) -> None:
143150 else :
144151 self .opBuffer .write (OP_ZERO_DICT )
145152 return
153+ if dt is set :
154+ if len (d ):
155+ self .opBuffer .write (OP_SET + _create_vint (len (d )))
156+ for sd in d :
157+ self ._dump (sd )
158+ else :
159+ self .opBuffer .write (OP_ZERO_SET )
160+ return
146161 raise UnsupportedType (dt )
147162 def _register (self , k :Any ) -> int :
148163 if k not in self .dictByKey :
149- kt : type = type (k )
164+ kt = type (k )
150165 s :bytes
151166 if kt is int :
152- nl : int = ceil (k .bit_length () / 8 )
167+ nl = ceil (k .bit_length () / 8 )
153168 self .dictBuffer .write (
154169 (OP_INTERGER if k > 0 else OP_NEG_INTERGER ) + \
155170 _create_vint (nl ) + \
@@ -174,17 +189,22 @@ def _register(self, k:Any) -> int:
174189 return self .dictByKey [k ]
175190
176191class FSUnpacker :
192+ dict :List [Any ]
193+ buffer :BytesIO
194+ maxDictSize :int
195+ maxOPSize :int
196+ OPs :Dict [bytes , Any ]
177197 @classmethod
178- def load (self , data :IO [bytes ], maxDictSize :int = 0 , maxOPSize :int = 0 ) -> Any :
179- return self (BytesIO (data .read ()), maxDictSize , maxOPSize )._parse ()
198+ def load (cls , data :IO [bytes ], maxDictSize :int = 0 , maxOPSize :int = 0 ) -> Any :
199+ return cls (BytesIO (data .read ()), maxDictSize , maxOPSize )._parse ()
180200 @classmethod
181- def loads (self , data :bytes , maxDictSize :int = 0 , maxOPSize :int = 0 ) -> Any :
182- return self (BytesIO (data ), maxDictSize , maxOPSize )._parse ()
201+ def loads (cls , data :bytes , maxDictSize :int = 0 , maxOPSize :int = 0 ) -> Any :
202+ return cls (BytesIO (data ), maxDictSize , maxOPSize )._parse ()
183203 def __init__ (self , buffer :BytesIO , maxDictSize :int = 0 , maxOPSize :int = 0 ):
184- self .dict : List [ Any ] = []
185- self .buffer : BytesIO = buffer
186- self .maxDictSize : int = maxDictSize
187- self .maxOPSize : int = maxOPSize
204+ self .dict = []
205+ self .buffer = buffer
206+ self .maxDictSize = maxDictSize
207+ self .maxOPSize = maxOPSize
188208 self .OPs = {
189209 OP_NONE : None ,
190210 OP_BOOL_FALSE : False ,
@@ -195,16 +215,13 @@ def __init__(self, buffer:BytesIO, maxDictSize:int=0, maxOPSize:int=0):
195215 OP_ZERO_BYTES : b"" ,
196216 }
197217 def _parse (self ) -> Any :
198- bver : bytes = self .buffer .read (1 )
218+ bver = self .buffer .read (1 )
199219 if bver != VAR_VERSION :
200220 raise UnsupportedVersion (bver [0 ])
201221 self ._parse_dicts ()
202222 return self ._parse_ops ()
203223 def _parse_dicts (self ) -> None :
204- i :int
205- t :bytes
206- dl :int
207- dictLen :int = self ._read_vint ()
224+ dictLen = self ._read_vint ()
208225 if self .maxDictSize > 0 and dictLen > self .maxDictSize :
209226 raise MaxDictProtection ()
210227 for i in range (dictLen ):
@@ -236,36 +253,40 @@ def _parse_ops(self) -> Any:
236253 raise MaxOPProtection ()
237254 return self ._loads ()
238255 def _read_vint (self ) -> int :
239- d : bytes = self .buffer .read (1 )
240- if d [0 ] < 0xEC :
256+ d = self .buffer .read (1 )
257+ if d [0 ] < 0xEA :
241258 return d [0 ]
242- if d [0 ] == 0xEC :
259+ if d [0 ] == 0xEA :
243260 return int .from_bytes (self .buffer .read (2 ), VAR_ENDIANNESS )
244- if d [0 ] == 0xED :
245- return int .from_bytes (self .buffer .read (4 ), VAR_ENDIANNESS )
246- return int .from_bytes (self .buffer .read (8 ), VAR_ENDIANNESS )
261+ if d [0 ] == 0xEB :
262+ return int .from_bytes (self .buffer .read (3 ), VAR_ENDIANNESS )
263+ return int .from_bytes (self .buffer .read (4 ), VAR_ENDIANNESS )
247264 def _loads (self ) -> Any :
248- op : bytes = self .buffer .read (1 )
265+ op = self .buffer .read (1 )
249266 if op == b"" :
250267 raise OutOfData ()
251- if op [0 ] < 0xEF :
252- if op [0 ] < 0xEC :
268+ if op [0 ] < 0xED :
269+ if op [0 ] < 0xEA :
253270 return self .dict [ op [0 ] ]
254- if op [0 ] == 0xEC :
271+ if op [0 ] == 0xEA :
255272 return self .dict [ int .from_bytes (self .buffer .read (2 ), VAR_ENDIANNESS ) ]
256- if op [0 ] == 0xED :
257- return self .dict [ int .from_bytes (self .buffer .read (4 ), VAR_ENDIANNESS ) ]
258- return self .dict [ int .from_bytes (self .buffer .read (8 ), VAR_ENDIANNESS ) ]
273+ if op [0 ] == 0xEB :
274+ return self .dict [ int .from_bytes (self .buffer .read (3 ), VAR_ENDIANNESS ) ]
275+ return self .dict [ int .from_bytes (self .buffer .read (4 ), VAR_ENDIANNESS ) ]
259276 if op in self .OPs :
260277 return self .OPs [op ]
261278 if op == OP_ZERO_LIST :
262279 return tuple ()
263280 if op == OP_ZERO_DICT :
264281 return dict ()
282+ if op == OP_ZERO_SET :
283+ return set ()
265284 if op == OP_LIST :
266285 return tuple (( self ._loads () for i in range (self ._read_vint ()) ))
267286 if op == OP_DICT :
268287 return dict (( (self ._loads (), self ._loads ()) for i in range (self ._read_vint ()) ))
288+ if op == OP_SET :
289+ return set (( self ._loads () for i in range (self ._read_vint ()) ))
269290 raise InvalidOP (op )
270291
271292dump = FSPacker .dump
0 commit comments