
Commit 384b277

fix partitioned table reading for json
1 parent 2a490e4 commit 384b277

6 files changed: +160 -4 lines

datafusion/core/src/datasource/physical_plan/file_scan_config.rs

Lines changed: 76 additions & 0 deletions
@@ -157,6 +157,22 @@ impl FileScanConfig {
         })
     }
 
+    /// Projects only file schema, ignoring partition columns
+    pub(crate) fn projected_file_schema(&self) -> SchemaRef {
+        let fields = self.projection.as_ref().map(|proj| {
+            proj.iter()
+                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
+                .map(|col_idx| self.file_schema.field(*col_idx))
+                .cloned()
+                .collect::<Vec<_>>()
+        });
+
+        fields.map_or_else(
+            || Arc::clone(&self.file_schema),
+            |f| Arc::new(Schema::new(f).with_metadata(self.file_schema.metadata.clone())),
+        )
+    }
+
     pub(crate) fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
         self.projection.as_ref().map(|p| {
             p.iter()

@@ -686,6 +702,66 @@ mod tests {
         crate::assert_batches_eq!(expected, &[projected_batch]);
     }
 
+    #[test]
+    fn test_projected_file_schema_with_partition_col() {
+        let schema = aggr_test_schema();
+        let partition_cols = vec![
+            (
+                "part1".to_owned(),
+                wrap_partition_type_in_dict(DataType::Utf8),
+            ),
+            (
+                "part2".to_owned(),
+                wrap_partition_type_in_dict(DataType::Utf8),
+            ),
+        ];
+
+        // Projected file schema for config with projection including partition column
+        let projection = config_for_projection(
+            schema.clone(),
+            Some(vec![0, 3, 5, schema.fields().len()]),
+            Statistics::new_unknown(&schema),
+            to_partition_cols(partition_cols.clone()),
+        )
+        .projected_file_schema();
+
+        // Assert partition column filtered out in projected file schema
+        let expected_columns = vec!["c1", "c4", "c6"];
+        let actual_columns = projection
+            .fields()
+            .iter()
+            .map(|f| f.name().clone())
+            .collect::<Vec<_>>();
+        assert_eq!(expected_columns, actual_columns);
+    }
+
+    #[test]
+    fn test_projected_file_schema_without_projection() {
+        let schema = aggr_test_schema();
+        let partition_cols = vec![
+            (
+                "part1".to_owned(),
+                wrap_partition_type_in_dict(DataType::Utf8),
+            ),
+            (
+                "part2".to_owned(),
+                wrap_partition_type_in_dict(DataType::Utf8),
+            ),
+        ];
+
+        // Projected file schema for config without projection
+        let projection = config_for_projection(
+            schema.clone(),
+            None,
+            Statistics::new_unknown(&schema),
+            to_partition_cols(partition_cols.clone()),
+        )
+        .projected_file_schema();
+
+        // Assert projected file schema is equal to file schema
+        assert_eq!(projection.fields(), schema.fields());
+    }
+
     // sets default for configs that play no role in projections
     fn config_for_projection(
         file_schema: SchemaRef,
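
For reference, here is a minimal standalone sketch of the filtering that projected_file_schema performs, written against arrow_schema directly. The helper name and the example schema are illustrative only, not part of the commit: projection indices that point past the end of the file schema refer to partition columns, which the scan appends after the file is decoded, so they are dropped before the schema is handed to a file reader.

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};

// Keep only the projection indices that fall inside the file schema; larger
// indices refer to partition columns appended by the scan after decoding.
fn file_only_schema(file_schema: &SchemaRef, projection: Option<&[usize]>) -> SchemaRef {
    match projection {
        None => Arc::clone(file_schema),
        Some(proj) => {
            let fields: Vec<Field> = proj
                .iter()
                .filter(|idx| **idx < file_schema.fields().len())
                .map(|idx| file_schema.field(*idx).clone())
                .collect();
            Arc::new(Schema::new(fields).with_metadata(file_schema.metadata().clone()))
        }
    }
}

fn main() {
    // Three file columns; projection index 3 would point at a partition column.
    let file_schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Utf8, true),
        Field::new("c2", DataType::Int64, true),
        Field::new("c3", DataType::Int64, true),
    ]));
    let projection = [0usize, 2, 3];
    let projected = file_only_schema(&file_schema, Some(&projection));
    let names: Vec<&str> = projected.fields().iter().map(|f| f.name().as_str()).collect();
    assert_eq!(names, ["c1", "c3"]); // the out-of-range index (partition column) is dropped
}

With no projection at all, the full file schema is returned unchanged, which is what test_projected_file_schema_without_projection above asserts.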

datafusion/core/src/datasource/physical_plan/json.rs

Lines changed: 1 addition & 2 deletions
@@ -174,14 +174,13 @@ impl ExecutionPlan for NdJsonExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let batch_size = context.session_config().batch_size();
-        let (projected_schema, ..) = self.base_config.project();
 
         let object_store = context
            .runtime_env()
            .object_store(&self.base_config.object_store_url)?;
         let opener = JsonOpener {
             batch_size,
-            projected_schema,
+            projected_schema: self.base_config.projected_file_schema(),
             file_compression_type: self.file_compression_type.to_owned(),
             object_store,
         };
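
Why this matters: FileScanConfig::project() yields the schema of the scan's output, which has the partition columns appended after the file columns, while the JSON files on disk contain only the file columns. Handing that wider schema to JsonOpener asked the JSON decoder to produce a column that never appears in the file data, which surfaced as the error previously expected in insert_to_external.slt below. With projected_file_schema() the opener decodes only the file columns, and the partition values are filled in from the directory path afterwards. A rough standalone sketch of decoding with the file-only schema, using the arrow_json crate directly (this is not the DataFusion code path, only an illustration under that assumption):

use std::sync::Arc;

use arrow_json::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Schema of what is actually inside a partition's data.json: only "id" and
    // "value". The "part" column exists only in the directory name, so it must
    // not be part of the schema given to the JSON decoder.
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, true),
        Field::new("value", DataType::Utf8, true),
    ]));

    let data = "{\"id\": 1, \"value\": \"foo\"}\n{\"id\": 2, \"value\": \"bar\"}\n";
    let mut reader = ReaderBuilder::new(file_schema).build(data.as_bytes())?;
    let batch = reader.next().expect("one batch")?;
    assert_eq!(batch.num_rows(), 2);
    assert_eq!(batch.num_columns(), 2); // id and value only; no partition column
    Ok(())
}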
datafusion/core/tests/data/partitioned_table_json/part=1/data.json

Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+{"id": 1, "value": "foo"}
+{"id": 2, "value": "bar"}

datafusion/core/tests/data/partitioned_table_json/part=2/data.json

Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+{"id": 3, "value": "baz"}
+{"id": 4, "value": "qux"}

datafusion/sqllogictest/test_files/insert_to_external.slt

Lines changed: 8 additions & 2 deletions
@@ -195,9 +195,15 @@ INSERT INTO partitioned_insert_test_json values (1, 2), (3, 4), (5, 6), (1, 2),
 ----
 6
 
-# Issue open for this error: https://github.com/apache/arrow-datafusion/issues/7816
-query error DataFusion error: Arrow error: Json error: Encountered unmasked nulls in non\-nullable StructArray child: Field \{ name: "a", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}
+query TT
 select * from partitioned_insert_test_json order by a,b
+----
+1 2
+1 2
+3 4
+3 4
+5 6
+5 6
 
 statement ok
 CREATE EXTERNAL TABLE

datafusion/sqllogictest/test_files/json.slt

Lines changed: 71 additions & 0 deletions
@@ -68,3 +68,74 @@ DROP TABLE json_test
 
 statement ok
 DROP TABLE single_nan
+
+# JSON partitioned table
+statement ok
+CREATE EXTERNAL TABLE json_partitioned_test (
+    part Int,
+    id Int,
+    value String,
+)
+STORED AS JSON
+LOCATION '../core/tests/data/partitioned_table_json'
+PARTITIONED BY (part);
+
+# select wildcard always returns partition columns as the last ones
+query ITI
+SELECT * FROM json_partitioned_test ORDER BY id
+----
+1 foo 1
+2 bar 1
+3 baz 2
+4 qux 2
+
+
+# select all fields
+query IIT
+SELECT part, id, value FROM json_partitioned_test ORDER BY id
+----
+1 1 foo
+1 2 bar
+2 3 baz
+2 4 qux
+
+# select without partition column
+query I
+SELECT id FROM json_partitioned_test ORDER BY id
+----
+1
+2
+3
+4
+
+# select only partition column
+query I
+SELECT part FROM json_partitioned_test ORDER BY part
+----
+1
+1
+2
+2
+
+# select without any table-related columns in projection
+query T
+SELECT 'x' FROM json_partitioned_test
+----
+x
+x
+x
+x
+
+# select with partition filter
+query I
+SELECT id FROM json_partitioned_test WHERE part = 1 ORDER BY id
+----
+1
+2
+
+# select with partition filter should scan only one directory
+query TT
+EXPLAIN SELECT id FROM json_partitioned_test WHERE part = 2
+----
+logical_plan TableScan: json_partitioned_test projection=[id], full_filters=[json_partitioned_test.part = Int32(2)]
+physical_plan JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_json/part=2/data.json]]}, projection=[id]