Skip to content

Commit 2a2cced

Browse files
committed
Address PR Comments
1 parent 11a6f57 commit 2a2cced

File tree

1 file changed

+74
-92
lines changed

1 file changed

+74
-92
lines changed

arrow-avro/src/reader/mod.rs

Lines changed: 74 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,16 @@ impl Decoder {
177177
Ok(Some(batch))
178178
}
179179
}
180+
181+
/// Returns the number of rows that can still be decoded into the current batch before it reaches `batch_size` (zero once the batch is full).
182+
pub fn capacity(&self) -> usize {
183+
self.batch_size.saturating_sub(self.decoded_rows)
184+
}
185+
186+
/// Returns `true` if the current batch is full, i.e. `capacity()` is zero.
187+
pub fn batch_is_full(&self) -> bool {
188+
self.capacity() == 0
189+
}
180190
}
181191

182192
/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
@@ -210,6 +220,32 @@ impl ReaderBuilder {
210220
Self::default()
211221
}
212222

223+
fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
224+
let root_field = AvroField::try_from(schema)?;
225+
RecordDecoder::try_new_with_options(
226+
root_field.data_type(),
227+
self.utf8_view,
228+
self.strict_mode,
229+
)
230+
}
231+
232+
fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
233+
let header = read_header(reader)?;
234+
let record_decoder = if let Some(schema) = &self.schema {
235+
self.make_record_decoder(schema)?
236+
} else {
237+
let avro_schema: Option<AvroSchema<'_>> = header
238+
.schema()
239+
.map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
240+
let avro_schema = avro_schema.ok_or_else(|| {
241+
ArrowError::ParseError("No Avro schema present in file header".to_string())
242+
})?;
243+
self.make_record_decoder(&avro_schema)?
244+
};
245+
let decoder = Decoder::new(record_decoder, self.batch_size);
246+
Ok((header, decoder))
247+
}
248+
213249
/// Sets the row-based batch size
214250
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
215251
self.batch_size = batch_size;
@@ -246,32 +282,14 @@ impl ReaderBuilder {
246282

247283
/// Create a [`Reader`] from this builder and a `BufRead`
248284
pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
249-
let header = read_header(&mut reader)?;
250-
let compression = header.compression()?;
251-
let root_field = if let Some(schema) = &self.schema {
252-
AvroField::try_from(schema)?
253-
} else {
254-
let avro_schema: Option<AvroSchema<'_>> = header
255-
.schema()
256-
.map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
257-
let avro_schema = avro_schema.ok_or_else(|| {
258-
ArrowError::ParseError("No Avro schema present in file header".to_string())
259-
})?;
260-
AvroField::try_from(&avro_schema)?
261-
};
262-
let record_decoder = RecordDecoder::try_new_with_options(
263-
root_field.data_type(),
264-
self.utf8_view,
265-
self.strict_mode,
266-
)?;
267-
let decoder = Decoder::new(record_decoder, self.batch_size);
285+
let (header, decoder) = self.build_impl(&mut reader)?;
268286
Ok(Reader {
269287
reader,
270288
header,
271-
compression,
272289
decoder,
273290
block_decoder: BlockDecoder::default(),
274291
block_data: Vec::new(),
292+
block_cursor: 0,
275293
finished: false,
276294
})
277295
}
@@ -280,46 +298,33 @@ impl ReaderBuilder {
280298
/// reading and parsing the Avro file's header. This will
281299
/// not create a full [`Reader`].
282300
pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, ArrowError> {
283-
let record_decoder = if let Some(schema) = self.schema {
284-
let root_field = AvroField::try_from(&schema)?;
285-
RecordDecoder::try_new_with_options(
286-
root_field.data_type(),
287-
self.utf8_view,
288-
self.strict_mode,
289-
)?
290-
} else {
291-
let header = read_header(&mut reader)?;
292-
let avro_schema = header
293-
.schema()
294-
.map_err(|e| ArrowError::ExternalError(Box::new(e)))?
295-
.ok_or_else(|| {
296-
ArrowError::ParseError("No Avro schema present in file header".to_string())
297-
})?;
298-
let root_field = AvroField::try_from(&avro_schema)?;
299-
RecordDecoder::try_new_with_options(
300-
root_field.data_type(),
301-
self.utf8_view,
302-
self.strict_mode,
303-
)?
304-
};
305-
Ok(Decoder::new(record_decoder, self.batch_size))
301+
match self.schema {
302+
Some(ref schema) => {
303+
let record_decoder = self.make_record_decoder(schema)?;
304+
Ok(Decoder::new(record_decoder, self.batch_size))
305+
}
306+
None => {
307+
let (_, decoder) = self.build_impl(&mut reader)?;
308+
Ok(decoder)
309+
}
310+
}
306311
}
307312
}
308313

309314
/// A high-level Avro `Reader` that reads container-file blocks
310315
/// and feeds them into a row-level [`Decoder`].
311316
#[derive(Debug)]
312-
pub struct Reader<R> {
317+
pub struct Reader<R: BufRead> {
313318
reader: R,
314319
header: Header,
315-
compression: Option<crate::compression::CompressionCodec>,
316320
decoder: Decoder,
317321
block_decoder: BlockDecoder,
318322
block_data: Vec<u8>,
323+
block_cursor: usize,
319324
finished: bool,
320325
}
321326

322-
impl<R> Reader<R> {
327+
impl<R: BufRead> Reader<R> {
323328
/// Return the Arrow schema discovered from the Avro file header
324329
pub fn schema(&self) -> SchemaRef {
325330
self.decoder.schema()
@@ -329,64 +334,41 @@ impl<R> Reader<R> {
329334
pub fn avro_header(&self) -> &Header {
330335
&self.header
331336
}
332-
}
333337

334-
impl<R: BufRead> Reader<R> {
335338
/// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
336339
fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
337-
if self.finished {
338-
return Ok(None);
339-
}
340-
loop {
341-
if !self.block_data.is_empty() {
342-
let consumed = self.decoder.decode(&self.block_data)?;
343-
if consumed > 0 {
344-
self.block_data.drain(..consumed);
345-
}
346-
match self.decoder.flush()? {
347-
None => {
348-
if !self.block_data.is_empty() {
349-
break;
350-
}
351-
}
352-
Some(batch) => {
353-
return Ok(Some(batch));
354-
}
355-
}
356-
}
357-
let maybe_block = {
340+
'outer: while !self.finished && !self.decoder.batch_is_full() {
341+
while self.block_cursor == self.block_data.len() {
358342
let buf = self.reader.fill_buf()?;
359343
if buf.is_empty() {
360-
None
361-
} else {
362-
let read_len = buf.len();
363-
let consumed_len = self.block_decoder.decode(buf)?;
364-
self.reader.consume(consumed_len);
365-
if consumed_len == 0 && read_len != 0 {
366-
return Err(ArrowError::ParseError(
367-
"Could not decode next Avro block from partial data".to_string(),
368-
));
369-
}
370-
self.block_decoder.flush()
344+
self.finished = true;
345+
break 'outer;
371346
}
372-
};
373-
match maybe_block {
374-
Some(block) => {
375-
let block_data = if let Some(ref codec) = self.compression {
347+
// Try to decode another block from the buffered reader.
348+
let consumed = self.block_decoder.decode(buf)?;
349+
self.reader.consume(consumed);
350+
if let Some(block) = self.block_decoder.flush() {
351+
// Successfully decoded a block.
352+
let block_data = if let Some(ref codec) = self.header.compression()? {
376353
codec.decompress(&block.data)?
377354
} else {
378355
block.data
379356
};
380357
self.block_data = block_data;
358+
self.block_cursor = 0;
359+
} else if consumed == 0 {
360+
// The block decoder made no progress on a non-empty buffer.
361+
return Err(ArrowError::ParseError(
362+
"Could not decode next Avro block from partial data".to_string(),
363+
));
381364
}
382-
None => {
383-
self.finished = true;
384-
if !self.block_data.is_empty() {
385-
let consumed = self.decoder.decode(&self.block_data)?;
386-
self.block_data.drain(..consumed);
387-
}
388-
return self.decoder.flush();
389-
}
365+
}
366+
// Try to decode more rows from the current block.
367+
let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
368+
if consumed == 0 && self.block_cursor < self.block_data.len() {
369+
self.block_cursor = self.block_data.len();
370+
} else {
371+
self.block_cursor += consumed;
390372
}
391373
}
392374
self.decoder.flush()

0 commit comments

Comments
 (0)