@@ -20,14 +20,14 @@ use super::logstream::error::{CreateStreamError, StreamError};
20
20
use super :: modal:: utils:: ingest_utils:: { flatten_and_push_logs, push_logs} ;
21
21
use super :: users:: dashboards:: DashboardError ;
22
22
use super :: users:: filters:: FiltersError ;
23
- use super :: { otel_logs, otel_metrics} ;
23
+ use super :: { otel_logs, otel_metrics, otel_traces } ;
24
24
use crate :: event:: {
25
25
self ,
26
26
error:: EventError ,
27
27
format:: { self , EventFormat } ,
28
28
} ;
29
29
use crate :: handlers:: http:: modal:: utils:: logstream_utils:: create_stream_and_schema_from_storage;
30
- use crate :: handlers:: { LOG_SOURCE_KEY , LOG_SOURCE_OTEL , STREAM_NAME_HEADER_KEY } ;
30
+ use crate :: handlers:: STREAM_NAME_HEADER_KEY ;
31
31
use crate :: metadata:: error:: stream_info:: MetadataError ;
32
32
use crate :: metadata:: STREAM_INFO ;
33
33
use crate :: option:: { Mode , CONFIG } ;
@@ -118,23 +118,11 @@ pub async fn handle_otel_logs_ingestion(
118
118
let stream_name = stream_name. to_str ( ) . unwrap ( ) . to_owned ( ) ;
119
119
create_stream_if_not_exists ( & stream_name, & StreamType :: UserDefined . to_string ( ) ) . await ?;
120
120
121
- //flatten logs
122
- if let Some ( ( _, log_source) ) = req. headers ( ) . iter ( ) . find ( |& ( key, _) | key == LOG_SOURCE_KEY )
123
- {
124
- let log_source: String = log_source. to_str ( ) . unwrap ( ) . to_owned ( ) ;
125
- if log_source == LOG_SOURCE_OTEL {
126
- let mut json = otel_logs:: flatten_otel_logs ( & body) ;
127
- for record in json. iter_mut ( ) {
128
- let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
129
- push_logs ( stream_name. to_string ( ) , req. clone ( ) , body) . await ?;
130
- }
131
- } else {
132
- return Err ( PostError :: CustomError ( "Unknown log source" . to_string ( ) ) ) ;
133
- }
134
- } else {
135
- return Err ( PostError :: CustomError (
136
- "log source key header is missing" . to_string ( ) ,
137
- ) ) ;
121
+ //custom flattening required for otel logs
122
+ let mut json = otel_logs:: flatten_otel_logs ( & body) ;
123
+ for record in json. iter_mut ( ) {
124
+ let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
125
+ push_logs ( stream_name. to_string ( ) , req. clone ( ) , body) . await ?;
138
126
}
139
127
} else {
140
128
return Err ( PostError :: Header ( ParseHeaderError :: MissingStreamName ) ) ;
@@ -157,23 +145,11 @@ pub async fn handle_otel_metrics_ingestion(
157
145
let stream_name = stream_name. to_str ( ) . unwrap ( ) . to_owned ( ) ;
158
146
create_stream_if_not_exists ( & stream_name, & StreamType :: UserDefined . to_string ( ) ) . await ?;
159
147
160
- //flatten logs
161
- if let Some ( ( _, log_source) ) = req. headers ( ) . iter ( ) . find ( |& ( key, _) | key == LOG_SOURCE_KEY )
162
- {
163
- let log_source: String = log_source. to_str ( ) . unwrap ( ) . to_owned ( ) ;
164
- if log_source == LOG_SOURCE_OTEL {
165
- let mut json = otel_metrics:: flatten_otel_metrics ( & body) ;
166
- for record in json. iter_mut ( ) {
167
- let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
168
- push_logs ( stream_name. to_string ( ) , req. clone ( ) , body) . await ?;
169
- }
170
- } else {
171
- return Err ( PostError :: CustomError ( "Unknown log source" . to_string ( ) ) ) ;
172
- }
173
- } else {
174
- return Err ( PostError :: CustomError (
175
- "log source key header is missing" . to_string ( ) ,
176
- ) ) ;
148
+ //custom flattening required for otel metrics
149
+ let mut json = otel_metrics:: flatten_otel_metrics ( & body) ;
150
+ for record in json. iter_mut ( ) {
151
+ let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
152
+ push_logs ( stream_name. to_string ( ) , req. clone ( ) , body) . await ?;
177
153
}
178
154
} else {
179
155
return Err ( PostError :: Header ( ParseHeaderError :: MissingStreamName ) ) ;
@@ -195,7 +171,13 @@ pub async fn handle_otel_traces_ingestion(
195
171
{
196
172
let stream_name = stream_name. to_str ( ) . unwrap ( ) . to_owned ( ) ;
197
173
create_stream_if_not_exists ( & stream_name, & StreamType :: UserDefined . to_string ( ) ) . await ?;
198
- push_logs ( stream_name. to_string ( ) , req. clone ( ) , body) . await ?;
174
+
175
+ //custom flattening required for otel traces
176
+ let mut json = otel_traces:: flatten_otel_traces ( & body) ;
177
+ for record in json. iter_mut ( ) {
178
+ let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
179
+ push_logs ( stream_name. to_string ( ) , req. clone ( ) , body) . await ?;
180
+ }
199
181
} else {
200
182
return Err ( PostError :: Header ( ParseHeaderError :: MissingStreamName ) ) ;
201
183
}
0 commit comments