Skip to content

Commit a843a5f

Browse files
fix: flatten before detect
1 parent 04078cf commit a843a5f

File tree

1 file changed

+35
-12
lines changed

1 file changed

+35
-12
lines changed

src/handlers/http/logstream.rs

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
3030
use crate::storage::retention::Retention;
3131
use crate::storage::{StreamInfo, StreamType};
3232
use crate::utils::actix::extract_session_key_from_req;
33+
use crate::utils::json::flatten::{
34+
self, convert_to_array, generic_flattening, has_more_than_max_allowed_levels,
35+
};
3336
use crate::{stats, validator, LOCK_EXPECT};
3437

3538
use actix_web::http::StatusCode;
@@ -102,22 +105,42 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
102105
}
103106

104107
pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, StreamError> {
105-
let log_records: Vec<Value> = match json {
106-
Value::Array(arr) => arr,
107-
value @ Value::Object(_) => vec![value],
108-
_ => {
108+
// flatten before infer
109+
if !has_more_than_max_allowed_levels(&json, 1) {
110+
//perform generic flattening, return error if failed to flatten
111+
let mut flattened_json = match generic_flattening(&json) {
112+
Ok(flattened) => convert_to_array(flattened).unwrap(),
113+
Err(e) => {
114+
return Err(StreamError::Custom {
115+
msg: e.to_string(),
116+
status: StatusCode::BAD_REQUEST,
117+
})
118+
}
119+
};
120+
if let Err(err) = flatten::flatten(&mut flattened_json, "_", None, None, None, false) {
109121
return Err(StreamError::Custom {
110-
msg: "please send json events as part of the request".to_string(),
122+
msg: err.to_string(),
111123
status: StatusCode::BAD_REQUEST,
112-
})
124+
});
113125
}
114-
};
115-
116-
let mut schema = Arc::new(infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap());
117-
for log_record in log_records {
118-
schema = override_data_type(schema, log_record, SchemaVersion::V1);
126+
let flattened_json_arr = match flattened_json {
127+
Value::Array(arr) => arr,
128+
value @ Value::Object(_) => vec![value],
129+
_ => unreachable!("flatten would have failed beforehand"),
130+
};
131+
let mut schema =
132+
Arc::new(infer_json_schema_from_iterator(flattened_json_arr.iter().map(Ok)).unwrap());
133+
for flattened_json in flattened_json_arr {
134+
schema = override_data_type(schema, flattened_json, SchemaVersion::V1);
135+
}
136+
Ok((web::Json(schema), StatusCode::OK))
137+
} else {
138+
// error out if the JSON is heavily nested
139+
Err(StreamError::Custom {
140+
msg: "heavily nested, cannot flatten this JSON".to_string(),
141+
status: StatusCode::BAD_REQUEST,
142+
})
119143
}
120-
Ok((web::Json(schema), StatusCode::OK))
121144
}
122145

123146
pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {

0 commit comments

Comments
 (0)