Skip to content

Commit f9eb223

Browse files
Devdutt Shenoinikhilsinhaparseable
authored andcommitted
refactor: flatten nesting
1 parent 75d7820 commit f9eb223

File tree

1 file changed

+26
-32
lines changed

1 file changed

+26
-32
lines changed

src/event/format/json.rs

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -72,38 +72,32 @@ impl EventFormat for Event {
7272
let mut is_first = false;
7373
let schema = match derive_arrow_schema(stream_schema, fields) {
7474
Ok(schema) => schema,
75-
Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) {
76-
Ok(mut infer_schema) => {
77-
let new_infer_schema = super::update_field_type_in_schema(
78-
Arc::new(infer_schema),
79-
Some(stream_schema),
80-
time_partition,
81-
Some(&value_arr),
82-
schema_version,
83-
);
84-
infer_schema = Schema::new(new_infer_schema.fields().clone());
85-
if let Err(err) = Schema::try_merge(vec![
86-
Schema::new(stream_schema.values().cloned().collect::<Fields>()),
87-
infer_schema.clone(),
88-
]) {
89-
return Err(anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err));
90-
}
91-
is_first = true;
92-
infer_schema
93-
.fields
94-
.iter()
95-
.filter(|field| !field.data_type().is_null())
96-
.cloned()
97-
.sorted_by(|a, b| a.name().cmp(b.name()))
98-
.collect()
99-
}
100-
Err(err) => {
101-
return Err(anyhow!(
102-
"Could not infer schema for this event due to err {:?}",
103-
err
104-
))
105-
}
106-
},
75+
Err(_) => {
76+
let mut infer_schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok))
77+
.map_err(|err| {
78+
anyhow!("Could not infer schema for this event due to err {:?}", err)
79+
})?;
80+
let new_infer_schema = super::update_field_type_in_schema(
81+
Arc::new(infer_schema),
82+
Some(stream_schema),
83+
time_partition,
84+
Some(&value_arr),
85+
schema_version,
86+
);
87+
infer_schema = Schema::new(new_infer_schema.fields().clone());
88+
Schema::try_merge(vec![
89+
Schema::new(stream_schema.values().cloned().collect::<Fields>()),
90+
infer_schema.clone(),
91+
]).map_err(|err| anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err))?;
92+
is_first = true;
93+
infer_schema
94+
.fields
95+
.iter()
96+
.filter(|field| !field.data_type().is_null())
97+
.cloned()
98+
.sorted_by(|a, b| a.name().cmp(b.name()))
99+
.collect()
100+
}
107101
};
108102

109103
if static_schema_flag.is_none()

0 commit comments

Comments
 (0)