Skip to content

Commit 4b000cd

Browse files
committed
Merge df-6.0-arrow2-0.8 and make it compile
1 parent 0843c77 commit 4b000cd

File tree

13 files changed

+55
-63
lines changed

13 files changed

+55
-63
lines changed

datafusion/Cargo.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,7 @@ path = "src/lib.rs"
4040
[features]
4141
default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
4242
simd = ["arrow/simd"]
43-
<<<<<<< HEAD
4443
crypto_expressions = ["md-5", "sha2", "blake2", "blake3"]
45-
=======
46-
crypto_expressions = ["md-5", "sha2"]
47-
>>>>>>> ExternalSortExec v1
4844
regex_expressions = ["regex"]
4945
unicode_expressions = ["unicode-segmentation"]
5046
# FIXME: add pyarrow support to arrow2 pyarrow = ["pyo3", "arrow/pyarrow"]

datafusion/src/error.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ pub enum DataFusionError {
6363
Execution(String),
6464
/// This error is thrown when a consumer cannot acquire memory from the Memory Manager
6565
/// we can just cancel the execution of the partition.
66-
OutOfMemory(String),
66+
ResourcesExhausted(String),
6767
}
6868

6969
impl DataFusionError {
@@ -132,8 +132,8 @@ impl Display for DataFusionError {
132132
DataFusionError::Execution(ref desc) => {
133133
write!(f, "Execution error: {}", desc)
134134
}
135-
DataFusionError::OutOfMemory(ref desc) => {
136-
write!(f, "Out of memory error: {}", desc)
135+
DataFusionError::ResourcesExhausted(ref desc) => {
136+
write!(f, "Resources exhausted: {}", desc)
137137
}
138138
}
139139
}

datafusion/src/execution/context.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ use crate::{
3131
},
3232
MemTable,
3333
},
34-
execution::disk_manager::DiskManager,
35-
execution::memory_management::MemoryManager,
3634
logical_plan::{PlanType, ToStringifiedPlan},
3735
optimizer::eliminate_limit::EliminateLimit,
3836
physical_optimizer::{

datafusion/src/execution/disk_manager.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub struct DiskManager {
3434

3535
impl DiskManager {
3636
/// Create local dirs inside user provided dirs through conf
37-
pub fn new(conf_dirs: &Vec<String>) -> Result<Self> {
37+
pub fn new(conf_dirs: &[String]) -> Result<Self> {
3838
Ok(Self {
3939
local_dirs: create_local_dirs(conf_dirs)?,
4040
})
@@ -55,9 +55,9 @@ impl DiskManager {
5555
}
5656

5757
/// Setup local dirs by creating one new dir in each of the given dirs
58-
fn create_local_dirs(local_dir: &Vec<String>) -> Result<Vec<String>> {
58+
fn create_local_dirs(local_dir: &[String]) -> Result<Vec<String>> {
5959
local_dir
60-
.into_iter()
60+
.iter()
6161
.map(|root| create_directory(root, "datafusion"))
6262
.collect()
6363
}
@@ -82,7 +82,7 @@ fn create_directory(root: &str, prefix: &str) -> Result<String> {
8282
)))
8383
}
8484

85-
fn get_file(file_name: &str, local_dirs: &Vec<String>) -> String {
85+
fn get_file(file_name: &str, local_dirs: &[String]) -> String {
8686
let mut hasher = DefaultHasher::new();
8787
file_name.hash(&mut hasher);
8888
let hash = hasher.finish();
@@ -93,7 +93,7 @@ fn get_file(file_name: &str, local_dirs: &Vec<String>) -> String {
9393
path.to_str().unwrap().to_string()
9494
}
9595

96-
fn create_tmp_file(local_dirs: &Vec<String>) -> Result<String> {
96+
fn create_tmp_file(local_dirs: &[String]) -> Result<String> {
9797
let name = Uuid::new_v4().to_string();
9898
let mut path = get_file(&*name, local_dirs);
9999
while Path::new(path.as_str()).exists() {

datafusion/src/execution/memory_management/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
2020
pub mod allocation_strategist;
2121

22-
use crate::error::DataFusionError::OutOfMemory;
22+
use std::cmp::Reverse;
23+
use crate::error::DataFusionError::ResourcesExhausted;
2324
use crate::error::{DataFusionError, Result};
2425
use crate::execution::memory_management::allocation_strategist::{
2526
DummyAllocationStrategist, FairStrategist, MemoryAllocationStrategist,
@@ -169,7 +170,7 @@ impl PartitionMemoryManager {
169170
}
170171

171172
/// Try to acquire `required` of execution memory for the consumer and return the number of bytes
172-
/// obtained, or return OutOfMemoryError if no enough memory avaiable even after possible spills.
173+
/// obtained, or return ResourcesExhausted if not enough memory is available even after possible spills.
173174
pub async fn acquire_exec_memory(
174175
&self,
175176
required: usize,
@@ -192,7 +193,7 @@ impl PartitionMemoryManager {
192193
for c in consumers.iter() {
193194
all_consumers.push(c.1.clone());
194195
}
195-
all_consumers.sort_by(|a, b| b.get_used().cmp(&a.get_used()));
196+
all_consumers.sort_by_key(|b| Reverse(b.get_used()));
196197

197198
for c in all_consumers.iter_mut() {
198199
if c.id() == consumer_id {
@@ -235,7 +236,7 @@ impl PartitionMemoryManager {
235236
}
236237

237238
if got < required {
238-
return Err(OutOfMemory(format!(
239+
return Err(ResourcesExhausted(format!(
239240
"Unable to acquire {} bytes of memory, got {}",
240241
required, got
241242
)));

datafusion/src/execution/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
2020
pub mod context;
2121
pub mod dataframe_impl;
22-
pub mod options;
2322
pub mod disk_manager;
2423
pub mod memory_management;
24+
pub mod options;
2525
pub mod runtime_env;

datafusion/src/execution/runtime_env.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use crate::error::Result;
2222
use crate::execution::disk_manager::DiskManager;
2323
use crate::execution::memory_management::{MemoryConsumer, MemoryManager};
24+
use lazy_static::lazy_static;
2425
use std::sync::Arc;
2526

2627
lazy_static! {
@@ -100,7 +101,7 @@ impl RuntimeConfig {
100101

101102
/// Customize exec size
102103
pub fn with_local_dirs(mut self, local_dirs: Vec<String>) -> Self {
103-
assert!(local_dirs.len() > 0);
104+
assert!(!local_dirs.is_empty());
104105
self.local_dirs = local_dirs;
105106
self
106107
}

datafusion/src/physical_plan/common.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ pub struct IPCWriterWrapper {
294294
impl IPCWriterWrapper {
295295
/// Create new writer
296296
pub fn new(path: &str, schema: &Schema) -> Result<Self> {
297-
let file = File::create(path).map_err(|e| DataFusionError::IoError(e))?;
297+
let file = File::create(path).map_err(DataFusionError::IoError)?;
298298
let buffer_writer = std::io::BufWriter::new(file);
299299
Ok(Self {
300300
num_batches: 0,

datafusion/src/physical_plan/mod.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -624,11 +624,7 @@ pub mod distinct_expressions;
624624
pub mod empty;
625625
pub mod explain;
626626
pub mod expressions;
627-
<<<<<<< HEAD
628627
pub mod file_format;
629-
pub mod external_sort;
630-
=======
631-
>>>>>>> move sorts together into submodule
632628
pub mod filter;
633629
pub mod functions;
634630
pub mod hash_aggregate;

datafusion/src/physical_plan/sorts/external_sort.rs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -307,12 +307,16 @@ async fn read_spill_as_stream(
307307
TKReceiver<ArrowResult<RecordBatch>>,
308308
) = tokio::sync::mpsc::channel(2);
309309
let path_clone = path.clone();
310-
task::spawn_blocking(move || {
310+
let join_handle = task::spawn_blocking(move || {
311311
if let Err(e) = read_spill(sender, path_clone) {
312312
error!("Failure while reading spill file: {}. Error: {}", path, e);
313313
}
314314
});
315-
Ok(RecordBatchReceiverStream::create(&schema, receiver))
315+
Ok(RecordBatchReceiverStream::create(
316+
&schema,
317+
receiver,
318+
join_handle,
319+
))
316320
}
317321

318322
pub(crate) async fn convert_stream_disk_based(
@@ -521,30 +525,41 @@ pub async fn external_sort(
521525
#[cfg(test)]
522526
mod tests {
523527
use super::*;
528+
use crate::datasource::object_store::local::LocalFileSystem;
524529
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
525530
use crate::physical_plan::expressions::col;
526531
use crate::physical_plan::memory::MemoryExec;
527-
use crate::physical_plan::sorts::SortOptions;
528532
use crate::physical_plan::{
529533
collect,
530-
csv::{CsvExec, CsvReadOptions},
534+
file_format::{CsvExec, PhysicalPlanConfig},
531535
};
532536
use crate::test;
537+
use crate::test_util;
533538
use arrow::array::*;
539+
use arrow::compute::sort::SortOptions;
534540
use arrow::datatypes::*;
535541

536542
#[tokio::test]
537543
async fn test_sort() -> Result<()> {
538-
let schema = test::aggr_test_schema();
544+
let schema = test_util::aggr_test_schema();
539545
let partitions = 4;
540-
let path = test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
541-
let csv = CsvExec::try_new(
542-
&path,
543-
CsvReadOptions::new().schema(&schema),
544-
None,
545-
1024,
546-
None,
547-
)?;
546+
let (_, files) =
547+
test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
548+
549+
let csv = CsvExec::new(
550+
PhysicalPlanConfig {
551+
object_store: Arc::new(LocalFileSystem {}),
552+
file_schema: Arc::clone(&schema),
553+
file_groups: files,
554+
statistics: Statistics::default(),
555+
projection: None,
556+
batch_size: 1024,
557+
limit: None,
558+
table_partition_cols: vec![],
559+
},
560+
true,
561+
b',',
562+
);
548563

549564
let sort_exec = Arc::new(ExternalSortExec::try_new(
550565
vec![

0 commit comments

Comments (0)