diff --git a/server/src/handlers/http/otel.rs b/server/src/handlers/http/otel.rs index 5e2961095..09b3eaba1 100644 --- a/server/src/handlers/http/otel.rs +++ b/server/src/handlers/http/otel.rs @@ -17,6 +17,8 @@ */ use bytes::Bytes; +use proto::common::v1::KeyValue; +use proto::logs::v1::LogRecord; use serde_json::Value; mod proto; use crate::handlers::http::otel::proto::logs::v1::LogRecordFlags; @@ -127,28 +129,136 @@ fn value_to_string(value: serde_json::Value) -> String { } } +pub fn flatten_attributes( + attributes: &Vec, + attribute_source_key: String, +) -> BTreeMap { + let mut attributes_json: BTreeMap = BTreeMap::new(); + for attribute in attributes { + let key = &attribute.key; + let value = &attribute.value; + let value_json = + collect_json_from_values(value, &format!("{}_{}", attribute_source_key, key)); + for key in value_json.keys() { + attributes_json.insert(key.to_owned(), value_json[key].to_owned()); + } + } + attributes_json +} + +pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap { + let mut log_record_json: BTreeMap = BTreeMap::new(); + if log_record.time_unix_nano.is_some() { + log_record_json.insert( + "time_unix_nano".to_string(), + Value::String(log_record.time_unix_nano.as_ref().unwrap().to_string()), + ); + } + if log_record.observed_time_unix_nano.is_some() { + log_record_json.insert( + "observed_time_unix_nano".to_string(), + Value::String( + log_record + .observed_time_unix_nano + .as_ref() + .unwrap() + .to_string(), + ), + ); + } + if log_record.severity_number.is_some() { + let severity_number: i32 = log_record.severity_number.unwrap(); + log_record_json.insert( + "severity_number".to_string(), + Value::Number(serde_json::Number::from(severity_number)), + ); + if log_record.severity_text.is_none() { + log_record_json.insert( + "severity_text".to_string(), + Value::String(SeverityNumber::as_str_name(severity_number).to_string()), + ); + } + } + if log_record.severity_text.is_some() { + log_record_json.insert( + "severity_text".to_string(), + Value::String(log_record.severity_text.as_ref().unwrap().to_string()), + ); + } + + if log_record.body.is_some() { + let body = &log_record.body; + let body_json = collect_json_from_values(body, &"body".to_string()); + for key in body_json.keys() { + log_record_json.insert(key.to_owned(), body_json[key].to_owned()); + } + } + + if let Some(attributes) = log_record.attributes.as_ref() { + let attributes_json = flatten_attributes(attributes, "log_record".to_string()); + for key in attributes_json.keys() { + log_record_json.insert(key.to_owned(), attributes_json[key].to_owned()); + } + } + + if log_record.dropped_attributes_count.is_some() { + log_record_json.insert( + "log_record_dropped_attributes_count".to_string(), + Value::Number(serde_json::Number::from( + log_record.dropped_attributes_count.unwrap(), + )), + ); + } + + if log_record.flags.is_some() { + let flags: u32 = log_record.flags.unwrap(); + log_record_json.insert( + "flags_number".to_string(), + Value::Number(serde_json::Number::from(flags)), + ); + log_record_json.insert( + "flags_string".to_string(), + Value::String(LogRecordFlags::as_str_name(flags).to_string()), + ); + } + + if log_record.span_id.is_some() { + log_record_json.insert( + "span_id".to_string(), + Value::String(log_record.span_id.as_ref().unwrap().to_string()), + ); + } + + if log_record.trace_id.is_some() { + log_record_json.insert( + "trace_id".to_string(), + Value::String(log_record.trace_id.as_ref().unwrap().to_string()), + ); + } + + log_record_json +} + pub fn flatten_otel_logs(body: &Bytes) -> Vec> { let mut vec_otel_json: Vec> = Vec::new(); let body_str = std::str::from_utf8(body).unwrap(); let message: LogsData = serde_json::from_str(body_str).unwrap(); - for records in message.resource_logs.iter() { + + if let Some(records) = message.resource_logs.as_ref() { + let mut vec_resource_logs_json: Vec> = Vec::new(); for record in records.iter() { - let mut otel_json: BTreeMap = BTreeMap::new(); - for resource in record.resource.iter() { - let attributes = &resource.attributes; - for attributes in attributes.iter() { - for attribute in attributes { - let key = &attribute.key; - let value = &attribute.value; - let value_json = - collect_json_from_values(value, &format!("resource_{}", key)); - for key in value_json.keys() { - otel_json.insert(key.to_owned(), value_json[key].to_owned()); - } + let mut resource_log_json: BTreeMap = BTreeMap::new(); + + if let Some(resource) = record.resource.as_ref() { + if let Some(attributes) = resource.attributes.as_ref() { + let attributes_json = flatten_attributes(attributes, "resource".to_string()); + for key in attributes_json.keys() { + resource_log_json.insert(key.to_owned(), attributes_json[key].to_owned()); } } + if resource.dropped_attributes_count.is_some() { - otel_json.insert( + resource_log_json.insert( "resource_dropped_attributes_count".to_string(), Value::Number(serde_json::Number::from( resource.dropped_attributes_count.unwrap(), @@ -157,11 +267,14 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { } } - for scope_logs in record.scope_logs.iter() { + if let Some(scope_logs) = record.scope_logs.as_ref() { + let mut vec_scope_log_json: Vec> = Vec::new(); for scope_log in scope_logs.iter() { - for instrumentation_scope in scope_log.scope.iter() { + let mut scope_log_json: BTreeMap = BTreeMap::new(); + if scope_log.scope.is_some() { + let instrumentation_scope = scope_log.scope.as_ref().unwrap(); if instrumentation_scope.name.is_some() { - otel_json.insert( + scope_log_json.insert( "instrumentation_scope_name".to_string(), Value::String( instrumentation_scope.name.as_ref().unwrap().to_string(), @@ -169,29 +282,25 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { ); } if instrumentation_scope.version.is_some() { - otel_json.insert( + scope_log_json.insert( "instrumentation_scope_version".to_string(), Value::String( instrumentation_scope.version.as_ref().unwrap().to_string(), ), ); } - let attributes = &instrumentation_scope.attributes; - for attributes in attributes.iter() { - for attribute in attributes { - let key = &attribute.key; - let value = &attribute.value; - let value_json = collect_json_from_values( - value, - &format!("instrumentation_scope_{}", key), - ); - for key in value_json.keys() { - otel_json.insert(key.to_owned(), value_json[key].to_owned()); - } + + if let Some(attributes) = instrumentation_scope.attributes.as_ref() { + let attributes_json = + flatten_attributes(attributes, "instrumentation_scope".to_string()); + for key in attributes_json.keys() { + scope_log_json + .insert(key.to_owned(), attributes_json[key].to_owned()); } } + if instrumentation_scope.dropped_attributes_count.is_some() { - otel_json.insert( + scope_log_json.insert( "instrumentation_scope_dropped_attributes_count".to_string(), Value::Number(serde_json::Number::from( instrumentation_scope.dropped_attributes_count.unwrap(), @@ -199,128 +308,42 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { ); } } + if scope_log.schema_url.is_some() { + scope_log_json.insert( + "scope_log_schema_url".to_string(), + Value::String(scope_log.schema_url.as_ref().unwrap().to_string()), + ); + } for log_record in scope_log.log_records.iter() { - let mut log_record_json: BTreeMap = BTreeMap::new(); - if log_record.time_unix_nano.is_some() { - log_record_json.insert( - "time_unix_nano".to_string(), - Value::String( - log_record.time_unix_nano.as_ref().unwrap().to_string(), - ), - ); - } - if log_record.observed_time_unix_nano.is_some() { - log_record_json.insert( - "observed_time_unix_nano".to_string(), - Value::String( - log_record - .observed_time_unix_nano - .as_ref() - .unwrap() - .to_string(), - ), - ); - } - if log_record.severity_number.is_some() { - let severity_number: i32 = log_record.severity_number.unwrap(); - log_record_json.insert( - "severity_number".to_string(), - Value::Number(serde_json::Number::from(severity_number)), - ); - if log_record.severity_text.is_none() { - log_record_json.insert( - "severity_text".to_string(), - Value::String( - SeverityNumber::as_str_name(severity_number).to_string(), - ), - ); - } - } - if log_record.severity_text.is_some() { - log_record_json.insert( - "severity_text".to_string(), - Value::String( - log_record.severity_text.as_ref().unwrap().to_string(), - ), - ); - } - - if log_record.body.is_some() { - let body = &log_record.body; - let body_json = collect_json_from_values(body, &"body".to_string()); - for key in body_json.keys() { - log_record_json.insert(key.to_owned(), body_json[key].to_owned()); - } - } + let log_record_json = flatten_log_record(log_record); - for attributes in log_record.attributes.iter() { - for attribute in attributes { - let key = &attribute.key; - let value = &attribute.value; - let value_json = - collect_json_from_values(value, &format!("log_record_{}", key)); - for key in value_json.keys() { - log_record_json - .insert(key.to_owned(), value_json[key].to_owned()); - } - } - } - - if log_record.dropped_attributes_count.is_some() { - log_record_json.insert( - "log_record_dropped_attributes_count".to_string(), - Value::Number(serde_json::Number::from( - log_record.dropped_attributes_count.unwrap(), - )), - ); - } - - if log_record.flags.is_some() { - let flags: u32 = log_record.flags.unwrap(); - log_record_json.insert( - "flags_number".to_string(), - Value::Number(serde_json::Number::from(flags)), - ); - log_record_json.insert( - "flags_string".to_string(), - Value::String(LogRecordFlags::as_str_name(flags).to_string()), - ); - } - - if log_record.span_id.is_some() { - log_record_json.insert( - "span_id".to_string(), - Value::String(log_record.span_id.as_ref().unwrap().to_string()), - ); - } - - if log_record.trace_id.is_some() { - log_record_json.insert( - "trace_id".to_string(), - Value::String(log_record.trace_id.as_ref().unwrap().to_string()), - ); - } for key in log_record_json.keys() { - otel_json.insert(key.to_owned(), log_record_json[key].to_owned()); + scope_log_json.insert(key.to_owned(), log_record_json[key].to_owned()); } - vec_otel_json.push(otel_json.clone()); - } - - if scope_log.schema_url.is_some() { - otel_json.insert( - "scope_log_schema_url".to_string(), - Value::String(scope_log.schema_url.as_ref().unwrap().to_string()), - ); + vec_scope_log_json.push(scope_log_json.clone()); } } + for scope_log_json in vec_scope_log_json.iter() { + vec_resource_logs_json.push(scope_log_json.clone()); + } } if record.schema_url.is_some() { - otel_json.insert( - "resource_schema_url".to_string(), + resource_log_json.insert( + "schema_url".to_string(), Value::String(record.schema_url.as_ref().unwrap().to_string()), ); } + + for resource_logs_json in vec_resource_logs_json.iter_mut() { + for key in resource_log_json.keys() { + resource_logs_json.insert(key.to_owned(), resource_log_json[key].to_owned()); + } + } + } + + for resource_logs_json in vec_resource_logs_json.iter() { + vec_otel_json.push(resource_logs_json.clone()); } } vec_otel_json