Skip to content

Commit 613caaf

Browse files
authored
Merge 0a4d560 into abe84cf
2 parents abe84cf + 0a4d560 commit 613caaf

File tree

3 files changed

+43
-45
lines changed

3 files changed

+43
-45
lines changed

datafusion/src/physical_plan/hash_aggregate.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! Defines the execution plan for the hash aggregate operation
1919
2020
use std::any::Any;
21-
use std::sync::{Arc, Mutex};
21+
use std::sync::Arc;
2222
use std::task::{Context, Poll};
2323

2424
use ahash::RandomState;
@@ -95,7 +95,7 @@ pub struct HashAggregateExec {
9595
/// to the partial aggregate
9696
input_schema: SchemaRef,
9797
/// Metric to track number of output rows
98-
output_rows: Arc<Mutex<SQLMetric>>,
98+
output_rows: Arc<SQLMetric>,
9999
}
100100

101101
fn create_schema(
@@ -144,7 +144,7 @@ impl HashAggregateExec {
144144

145145
let schema = Arc::new(schema);
146146

147-
let output_rows = SQLMetric::counter("outputRows");
147+
let output_rows = SQLMetric::counter();
148148

149149
Ok(HashAggregateExec {
150150
mode,
@@ -253,10 +253,7 @@ impl ExecutionPlan for HashAggregateExec {
253253

254254
fn metrics(&self) -> HashMap<String, SQLMetric> {
255255
let mut metrics = HashMap::new();
256-
metrics.insert(
257-
"outputRows".to_owned(),
258-
self.output_rows.lock().unwrap().clone(),
259-
);
256+
metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
260257
metrics
261258
}
262259
}
@@ -292,7 +289,7 @@ pin_project! {
292289
#[pin]
293290
output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
294291
finished: bool,
295-
output_rows: Arc<Mutex<SQLMetric>>,
292+
output_rows: Arc<SQLMetric>,
296293
}
297294
}
298295

@@ -644,7 +641,7 @@ impl GroupedHashAggregateStream {
644641
group_expr: Vec<Arc<dyn PhysicalExpr>>,
645642
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
646643
input: SendableRecordBatchStream,
647-
output_rows: Arc<Mutex<SQLMetric>>,
644+
output_rows: Arc<SQLMetric>,
648645
) -> Self {
649646
let (tx, rx) = futures::channel::oneshot::channel();
650647

@@ -702,7 +699,6 @@ impl Stream for GroupedHashAggregateStream {
702699
};
703700

704701
if let Ok(batch) = &result {
705-
let mut output_rows = output_rows.lock().unwrap();
706702
output_rows.add(batch.num_rows())
707703
}
708704

datafusion/src/physical_plan/mod.rs

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
//! Traits for physical query plan, supporting parallel execution for partitioned relations.
1919
2020
use std::fmt::{Debug, Display};
21-
use std::sync::{Arc, Mutex};
21+
use std::sync::atomic::{AtomicUsize, Ordering};
22+
use std::sync::Arc;
2223
use std::{any::Any, pin::Pin};
2324

2425
use crate::execution::context::ExecutionContextState;
@@ -58,44 +59,53 @@ pub enum MetricType {
5859

5960
/// SQL metric such as counter (number of input or output rows) or timing information about
6061
/// a physical operator.
61-
#[derive(Debug, Clone)]
62+
#[derive(Debug)]
6263
pub struct SQLMetric {
63-
/// Metric name
64-
name: String,
6564
/// Metric value
66-
value: usize,
65+
value: AtomicUsize,
6766
/// Metric type
6867
metric_type: MetricType,
6968
}
7069

70+
impl Clone for SQLMetric {
71+
fn clone(&self) -> Self {
72+
Self {
73+
value: AtomicUsize::new(self.value.load(Ordering::Relaxed)),
74+
metric_type: self.metric_type.clone(),
75+
}
76+
}
77+
}
78+
7179
impl SQLMetric {
80+
// Relaxed ordering for operations on `value` poses no correctness issues:
81+
// we only perform standalone atomic operations on the counter, with no other memory accesses that need to be ordered relative to them.
82+
7283
/// Create a new metric for tracking a counter
73-
pub fn counter(name: &str) -> Arc<Mutex<SQLMetric>> {
74-
Arc::new(Mutex::new(SQLMetric::new(name, MetricType::Counter)))
84+
pub fn counter() -> Arc<SQLMetric> {
85+
Arc::new(SQLMetric::new(MetricType::Counter))
7586
}
7687

7788
/// Create a new metric for tracking time in nanoseconds
78-
pub fn time_nanos(name: &str) -> Arc<Mutex<SQLMetric>> {
79-
Arc::new(Mutex::new(SQLMetric::new(name, MetricType::TimeNanos)))
89+
pub fn time_nanos() -> Arc<SQLMetric> {
90+
Arc::new(SQLMetric::new(MetricType::TimeNanos))
8091
}
8192

8293
/// Create a new SQLMetric
83-
pub fn new(name: &str, metric_type: MetricType) -> Self {
94+
pub fn new(metric_type: MetricType) -> Self {
8495
Self {
85-
name: name.to_owned(),
86-
value: 0,
96+
value: AtomicUsize::new(0),
8797
metric_type,
8898
}
8999
}
90100

91101
/// Add to the value
92-
pub fn add(&mut self, n: usize) {
93-
self.value += n;
102+
pub fn add(&self, n: usize) {
103+
self.value.fetch_add(n, Ordering::Relaxed);
94104
}
95105

96106
/// Get the current value
97107
pub fn value(&self) -> usize {
98-
self.value
108+
self.value.load(Ordering::Relaxed)
99109
}
100110
}
101111

datafusion/src/physical_plan/sort.rs

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
2020
use std::any::Any;
2121
use std::pin::Pin;
22-
use std::sync::{Arc, Mutex};
22+
use std::sync::Arc;
2323
use std::task::{Context, Poll};
2424
use std::time::Instant;
2525

@@ -52,9 +52,9 @@ pub struct SortExec {
5252
/// Sort expressions
5353
expr: Vec<PhysicalSortExpr>,
5454
/// Output rows
55-
output_rows: Arc<Mutex<SQLMetric>>,
55+
output_rows: Arc<SQLMetric>,
5656
/// Time to sort batches
57-
sort_time_nanos: Arc<Mutex<SQLMetric>>,
57+
sort_time_nanos: Arc<SQLMetric>,
5858
}
5959

6060
impl SortExec {
@@ -66,8 +66,8 @@ impl SortExec {
6666
Ok(Self {
6767
expr,
6868
input,
69-
output_rows: SQLMetric::counter("outputRows"),
70-
sort_time_nanos: SQLMetric::time_nanos("sortTime"),
69+
output_rows: SQLMetric::counter(),
70+
sort_time_nanos: SQLMetric::time_nanos(),
7171
})
7272
}
7373

@@ -147,14 +147,8 @@ impl ExecutionPlan for SortExec {
147147

148148
fn metrics(&self) -> HashMap<String, SQLMetric> {
149149
let mut metrics = HashMap::new();
150-
metrics.insert(
151-
"outputRows".to_owned(),
152-
self.output_rows.lock().unwrap().clone(),
153-
);
154-
metrics.insert(
155-
"sortTime".to_owned(),
156-
self.sort_time_nanos.lock().unwrap().clone(),
157-
);
150+
metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
151+
metrics.insert("sortTime".to_owned(), (*self.sort_time_nanos).clone());
158152
metrics
159153
}
160154
}
@@ -224,16 +218,16 @@ pin_project! {
224218
output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
225219
finished: bool,
226220
schema: SchemaRef,
227-
output_rows: Arc<Mutex<SQLMetric>>,
221+
output_rows: Arc<SQLMetric>,
228222
}
229223
}
230224

231225
impl SortStream {
232226
fn new(
233227
input: SendableRecordBatchStream,
234228
expr: Vec<PhysicalSortExpr>,
235-
output_rows: Arc<Mutex<SQLMetric>>,
236-
sort_time: Arc<Mutex<SQLMetric>>,
229+
output_rows: Arc<SQLMetric>,
230+
sort_time: Arc<SQLMetric>,
237231
) -> Self {
238232
let (tx, rx) = futures::channel::oneshot::channel();
239233

@@ -246,7 +240,6 @@ impl SortStream {
246240
.and_then(move |batches| {
247241
let now = Instant::now();
248242
let result = sort_batches(&batches, &schema, &expr);
249-
let mut sort_time = sort_time.lock().unwrap();
250243
sort_time.add(now.elapsed().as_nanos() as usize);
251244
result
252245
});
@@ -288,7 +281,6 @@ impl Stream for SortStream {
288281
};
289282

290283
if let Some(Ok(batch)) = &result {
291-
let mut output_rows = output_rows.lock().unwrap();
292284
output_rows.add(batch.num_rows());
293285
}
294286

@@ -431,8 +423,8 @@ mod tests {
431423
assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
432424

433425
let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
434-
assert!(sort_exec.metrics().get("sortTime").unwrap().value > 0);
435-
assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value, 8);
426+
assert!(sort_exec.metrics().get("sortTime").unwrap().value() > 0);
427+
assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value(), 8);
436428
assert_eq!(result.len(), 1);
437429

438430
let columns = result[0].columns();

0 commit comments

Comments
 (0)