
 from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable

+from collections import deque
+
 from google.cloud.bigtable_v2.types import ReadRowsRequest as ReadRowsRequestPB
 from google.cloud.bigtable_v2.types import ReadRowsResponse as ReadRowsResponsePB
 from google.cloud.bigtable_v2.types import RowSet as RowSetPB
     from google.cloud.bigtable.data._async.client import TableAsync


-class _ResetRow(Exception):
-    def __init__(self, chunk):
-        self.chunk = chunk
-
-
 class _ReadRowsOperationAsync:
     """
     ReadRowsOperation handles the logic of merging chunks from a ReadRowsResponse stream
@@ -155,6 +152,7 @@ async def chunk_stream(
         """
         process chunks out of raw read_rows stream
         """
+        q = deque()
         async for resp in await stream:
             # extract proto from proto-plus wrapper
             resp = resp._pb
@@ -182,14 +180,25 @@ async def chunk_stream(
                 ):
                     raise InvalidChunk("row keys should be strictly increasing")

-                yield c
-
                 if c.reset_row:
+                    if c.row_key or c.HasField("family_name") or c.HasField("qualifier") or c.timestamp_micros or c.labels or c.value:
+                        raise InvalidChunk("reset row has extra fields")
+                    if not q:
+                        raise InvalidChunk("reset row with no prior chunks")
                     current_key = None
-                elif c.commit_row:
+                    q.clear()
+                    continue
+
+                q.append(c)
+
+                if c.commit_row:
                     # update row state after each commit
+                    while q:
+                        yield q.popleft()
                     self._last_yielded_row_key = current_key
                     current_key = None
+        if q:
+            raise InvalidChunk("finished with incomplete row")

     @staticmethod
     async def merge_rows(
@@ -222,8 +231,6 @@ async def merge_rows(
             try:
                 # for each cell
                 while True:
-                    if c.reset_row:
-                        raise _ResetRow(c)
                     k = c.row_key
                     f = c.family_name.value
                     q = c.qualifier.value if c.HasField("qualifier") else None
@@ -271,8 +278,6 @@ async def merge_rows(
                             if k and k != row_key:
                                 raise InvalidChunk("row key changed mid cell")

-                            if c.reset_row:
-                                raise _ResetRow(c)
                             buffer.append(c.value)
                         value = b"".join(buffer)
                     if family is None:
@@ -286,18 +291,6 @@ async def merge_rows(
                         yield Row(row_key, cells)
                         break
                     c = await it.__anext__()
-            except _ResetRow as e:
-                c = e.chunk
-                if (
-                    c.row_key
-                    or c.HasField("family_name")
-                    or c.HasField("qualifier")
-                    or c.timestamp_micros
-                    or c.labels
-                    or c.value
-                ):
-                    raise InvalidChunk("reset row with data")
-                continue
             except StopAsyncIteration:
                 raise InvalidChunk("premature end of stream")

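For readers skimming the diff: the change replaces the _ResetRow exception-based control flow with an explicit buffer. chunk_stream now holds each in-progress row's chunks in a deque and only yields them once a commit_row chunk arrives, so a reset_row can be validated and the partial row discarded before merge_rows ever sees it; merge_rows therefore no longer needs to unwind via _ResetRow. Below is a minimal, runnable sketch of that buffering idea in isolation. It is an illustration, not the library's implementation: Chunk is a hypothetical dataclass standing in for the real ReadRowsResponse.CellChunk proto, buffer_until_commit is an invented name, and InvalidChunk is redefined locally rather than imported from the library.

import asyncio
from collections import deque
from dataclasses import dataclass
from typing import AsyncGenerator, AsyncIterable


class InvalidChunk(Exception):
    """Local stand-in for the library's InvalidChunk exception."""


@dataclass
class Chunk:
    # hypothetical, simplified stand-in for ReadRowsResponse.CellChunk
    row_key: bytes = b""
    value: bytes = b""
    reset_row: bool = False
    commit_row: bool = False


async def buffer_until_commit(
    chunks: AsyncIterable[Chunk],
) -> AsyncGenerator[Chunk, None]:
    # Buffer chunks per row; only release them downstream on commit_row,
    # so a reset_row can discard the partial row without the consumer
    # ever observing it.
    q: deque[Chunk] = deque()
    async for c in chunks:
        if c.reset_row:
            # a reset chunk must carry no data of its own
            if c.row_key or c.value:
                raise InvalidChunk("reset row has extra fields")
            if not q:
                raise InvalidChunk("reset row with no prior chunks")
            q.clear()  # drop the partial row entirely
            continue
        q.append(c)
        if c.commit_row:
            # the row is final: release its buffered chunks downstream
            while q:
                yield q.popleft()
    if q:
        raise InvalidChunk("finished with incomplete row")


async def _demo() -> None:
    async def source() -> AsyncGenerator[Chunk, None]:
        yield Chunk(row_key=b"r1", value=b"stale")
        yield Chunk(reset_row=True)  # server retracts the partial row
        yield Chunk(row_key=b"r1", value=b"fresh", commit_row=True)

    async for c in buffer_until_commit(source()):
        print(c.row_key, c.value)  # only the committed chunk is seen


asyncio.run(_demo())

The upshot of the design, as the diff shows, is that all reset_row validation moves into chunk_stream, where the buffer lives, and the merge step downstream can assume every chunk it receives belongs to a row that will be committed.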