@@ -28,9 +28,7 @@ use parquet::arrow::arrow_reader::{
2828 ArrowReaderMetadata , ArrowReaderOptions , ParquetRecordBatchReaderBuilder , RowSelection ,
2929 RowSelector ,
3030} ;
31- use parquet:: arrow:: arrow_writer:: {
32- compute_leaves, ArrowColumnChunk , ArrowLeafColumn , ArrowRowGroupWriterFactory ,
33- } ;
31+ use parquet:: arrow:: arrow_writer:: { compute_leaves, ArrowLeafColumn , ArrowRowGroupWriterFactory } ;
3432use parquet:: arrow:: { ArrowSchemaConverter , ArrowWriter } ;
3533use parquet:: data_type:: { ByteArray , ByteArrayType } ;
3634use parquet:: encryption:: decrypt:: FileDecryptionProperties ;
@@ -1165,41 +1163,48 @@ async fn test_multi_threaded_encrypted_writing() {
11651163
11661164 // Get column writers with encryptor from ArrowRowGroupWriter
11671165 let col_writers = arrow_row_group_writer. writers ;
1166+ let num_columns = col_writers. len ( ) ;
1167+
1168+ // Create a channel for each column writer to send ArrowLeafColumn data to
1169+ let mut col_writer_tasks = Vec :: with_capacity ( num_columns) ;
1170+ let mut col_array_channels = Vec :: with_capacity ( num_columns) ;
1171+ for mut writer in col_writers. into_iter ( ) {
1172+ let ( send_array, mut receive_array) = tokio:: sync:: mpsc:: channel :: < ArrowLeafColumn > ( 100 ) ;
1173+ col_array_channels. push ( send_array) ;
1174+ let handle = tokio:: spawn ( async move {
1175+ while let Some ( col) = receive_array. recv ( ) . await {
1176+ let _ = writer. write ( & col) ;
1177+ }
1178+ writer. close ( )
1179+ } ) ;
1180+ col_writer_tasks. push ( handle) ;
1181+ }
11681182
1169- let mut workers: Vec < _ > = col_writers
1170- . into_iter ( )
1171- . map ( |mut col_writer| {
1172- let ( send, recv) = std:: sync:: mpsc:: channel :: < ArrowLeafColumn > ( ) ;
1173- let handle = std:: thread:: spawn ( move || {
1174- // receive Arrays to encode via the channel
1175- for col in recv {
1176- col_writer. write ( & col) ?;
1177- }
1178- // once the input is complete, close the writer
1179- // to return the newly created ArrowColumnChunk
1180- col_writer. close ( )
1181- } ) ;
1182- ( handle, send)
1183- } )
1184- . collect ( ) ;
1185-
1186- let mut worker_iter = workers. iter_mut ( ) ;
1187- for ( arr, field) in to_write. iter ( ) . zip ( & schema. fields ) {
1188- for leaves in compute_leaves ( field, arr) . unwrap ( ) {
1189- worker_iter. next ( ) . unwrap ( ) . 1 . send ( leaves) . unwrap ( ) ;
1183+ // Send the ArrowLeafColumn data to the respective column writer channels
1184+ for ( channel_idx, ( array, field) ) in to_write. iter ( ) . zip ( schema. fields ( ) ) . enumerate ( ) {
1185+ for c in compute_leaves ( field, array) . iter ( ) . flat_map ( |x| x) {
1186+ let _ = col_array_channels[ channel_idx] . send ( c. clone ( ) ) . await ;
11901187 }
11911188 }
1189+ drop ( col_array_channels) ;
11921190
1193- // Wait for the workers to complete encoding, and append
1191+ // Wait for all column writers to finish writing
1192+ let mut finalized_rg = Vec :: with_capacity ( num_columns) ;
1193+ for task in col_writer_tasks. into_iter ( ) {
1194+ finalized_rg. push ( task. await ) ;
1195+ }
1196+
1197+ // Wait for the workers to complete writing then append
11941198 // the resulting column chunks to the row group (and the file)
11951199 let mut row_group_writer = file_writer. next_row_group ( ) . unwrap ( ) ;
1196-
1197- for ( handle , send ) in workers {
1198- drop ( send ) ; // Drop send side to signal termination
1199- // wait for the worker to send the completed chunk
1200- let chunk : ArrowColumnChunk = handle . join ( ) . unwrap ( ) . unwrap ( ) ;
1201- chunk . append_to_row_group ( & mut row_group_writer ) . unwrap ( ) ;
1200+ for chunk in finalized_rg {
1201+ chunk
1202+ . unwrap ( )
1203+ . unwrap ( )
1204+ . append_to_row_group ( & mut row_group_writer )
1205+ . unwrap ( ) ;
12021206 }
1207+
12031208 // Close the row group which writes to the underlying file
12041209 row_group_writer. close ( ) . unwrap ( ) ;
12051210
0 commit comments