@@ -1104,12 +1104,23 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
11041104                    rep_levels_byte_len + def_levels_byte_len + values_data. buf . len ( ) ; 
11051105
11061106                // Data Page v2 compresses values only. 
1107-                 match  self . compressor  { 
1107+                 let  is_compressed =  match  self . compressor  { 
11081108                    Some ( ref  mut  cmpr)  => { 
1109+                         let  buffer_len = buffer. len ( ) ; 
11091110                        cmpr. compress ( & values_data. buf ,  & mut  buffer) ?; 
1111+                         if  uncompressed_size <= buffer. len ( )  - buffer_len { 
1112+                             buffer. truncate ( buffer_len) ; 
1113+                             buffer. extend_from_slice ( & values_data. buf ) ; 
1114+                             false 
1115+                         }  else  { 
1116+                             true 
1117+                         } 
11101118                    } 
1111-                     None  => buffer. extend_from_slice ( & values_data. buf ) , 
1112-                 } 
1119+                     None  => { 
1120+                         buffer. extend_from_slice ( & values_data. buf ) ; 
1121+                         false 
1122+                     } 
1123+                 } ; 
11131124
11141125                let  data_page = Page :: DataPageV2  { 
11151126                    buf :  buffer. into ( ) , 
@@ -1119,7 +1130,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
11191130                    num_rows :  self . page_metrics . num_buffered_rows , 
11201131                    def_levels_byte_len :  def_levels_byte_len as  u32 , 
11211132                    rep_levels_byte_len :  rep_levels_byte_len as  u32 , 
1122-                     is_compressed :   self . compressor . is_some ( ) , 
1133+                     is_compressed, 
11231134                    statistics :  page_statistics, 
11241135                } ; 
11251136
@@ -4236,4 +4247,33 @@ mod tests {
42364247            . unwrap ( ) ; 
42374248        ColumnDescriptor :: new ( Arc :: new ( tpe) ,  max_def_level,  max_rep_level,  path) 
42384249    } 
4250+ 
4251+     #[ test]  
4252+     fn  test_page_v2_snappy_compression_fallback ( )  { 
4253+         // Test that PageV2 sets is_compressed to false when Snappy compression increases data size 
4254+         let  page_writer = TestPageWriter  { } ; 
4255+ 
4256+         // Create WriterProperties with PageV2 and Snappy compression 
4257+         let  props = WriterProperties :: builder ( ) 
4258+             . set_writer_version ( WriterVersion :: PARQUET_2_0 ) 
4259+             // Disable dictionary to ensure data is written directly 
4260+             . set_dictionary_enabled ( false ) 
4261+             . set_compression ( Compression :: SNAPPY ) 
4262+             . build ( ) ; 
4263+ 
4264+         let  mut  column_writer =
4265+             get_test_column_writer :: < ByteArrayType > ( Box :: new ( page_writer) ,  0 ,  0 ,  Arc :: new ( props) ) ; 
4266+ 
4267+         // Create small, simple data that Snappy compression will likely increase in size 
4268+         // due to compression overhead for very small data 
4269+         let  values = vec ! [ ByteArray :: from( "a" ) ] ; 
4270+ 
4271+         column_writer. write_batch ( & values,  None ,  None ) . unwrap ( ) ; 
4272+ 
4273+         let  result = column_writer. close ( ) . unwrap ( ) ; 
4274+         assert_eq ! ( 
4275+             result. metadata. uncompressed_size( ) , 
4276+             result. metadata. compressed_size( ) 
4277+         ) ; 
4278+     } 
42394279} 
0 commit comments