@@ -46,8 +46,8 @@ use crate::event::commit_schema;
4646use crate :: metrics:: QUERY_EXECUTE_TIME ;
4747use crate :: parseable:: { StreamNotFound , PARSEABLE } ;
4848use crate :: query:: error:: ExecuteError ;
49- use crate :: query:: { resolve_stream_names, QUERY_SESSION } ;
5049use crate :: query:: { execute, CountsRequest , Query as LogicalQuery } ;
50+ use crate :: query:: { resolve_stream_names, QUERY_SESSION } ;
5151use crate :: rbac:: Users ;
5252use crate :: response:: QueryResponse ;
5353use crate :: storage:: ObjectStorageError ;
@@ -95,8 +95,8 @@ pub async fn get_records_and_fields(
9595 . first ( )
9696 . ok_or_else ( || QueryError :: MalformedQuery ( "No table name found in query" ) ) ?;
9797 user_auth_for_datasets ( & permissions, & tables) . await ?;
98-
99- let ( records, fields) = execute ( query, & table_name, false ) . await ?;
98+ update_schema_when_distributed ( & tables ) . await ? ;
99+ let ( records, fields) = execute ( query, table_name, false ) . await ?;
100100
101101 let records = match records {
102102 Either :: Left ( vec_rb) => vec_rb,
@@ -122,25 +122,25 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
122122 . first ( )
123123 . ok_or_else ( || QueryError :: MalformedQuery ( "No table name found in query" ) ) ?;
124124 user_auth_for_datasets ( & permissions, & tables) . await ?;
125-
125+ update_schema_when_distributed ( & tables ) . await ? ;
126126 let time = Instant :: now ( ) ;
127127
128128 // if the query is `select count(*) from <dataset>`
129129 // we use the `get_bin_density` method to get the count of records in the dataset
130130 // instead of executing the query using datafusion
131131 if let Some ( column_name) = query. is_logical_plan_count_without_filters ( ) {
132- return handle_count_query ( & query_request, & table_name, column_name, time) . await ;
132+ return handle_count_query ( & query_request, table_name, column_name, time) . await ;
133133 }
134134
135135 // if the query request has streaming = false (default)
136136 // we use datafusion's `execute` method to get the records
137137 if !query_request. streaming {
138- return handle_non_streaming_query ( query, & table_name, & query_request, time) . await ;
138+ return handle_non_streaming_query ( query, table_name, & query_request, time) . await ;
139139 }
140140
141141 // if the query request has streaming = true
142142 // we use datafusion's `execute_stream` method to get the records
143- handle_streaming_query ( query, & table_name, & query_request, time) . await
143+ handle_streaming_query ( query, table_name, & query_request, time) . await
144144}
145145
146146/// Handles count queries (e.g., `SELECT COUNT(*) FROM <dataset-name>`)
0 commit comments