From 7c37afaef57bf4ab3eecf771bcef5f87154e9b3c Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 18 Jul 2024 11:15:47 +0530 Subject: [PATCH 1/2] fix for otel logs flattening --- server/src/handlers/http/otel.rs | 76 ++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 28 deletions(-) diff --git a/server/src/handlers/http/otel.rs b/server/src/handlers/http/otel.rs index 5e2961095..90897461a 100644 --- a/server/src/handlers/http/otel.rs +++ b/server/src/handlers/http/otel.rs @@ -131,24 +131,27 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { let mut vec_otel_json: Vec> = Vec::new(); let body_str = std::str::from_utf8(body).unwrap(); let message: LogsData = serde_json::from_str(body_str).unwrap(); - for records in message.resource_logs.iter() { + + if let Some(records) = message.resource_logs.as_ref() { + let mut vec_resource_logs_json: Vec> = Vec::new(); for record in records.iter() { - let mut otel_json: BTreeMap = BTreeMap::new(); - for resource in record.resource.iter() { - let attributes = &resource.attributes; - for attributes in attributes.iter() { + let mut resource_log_json: BTreeMap = BTreeMap::new(); + + if let Some(resource) = record.resource.as_ref() { + if let Some(attributes) = resource.attributes.as_ref() { for attribute in attributes { let key = &attribute.key; let value = &attribute.value; let value_json = collect_json_from_values(value, &format!("resource_{}", key)); for key in value_json.keys() { - otel_json.insert(key.to_owned(), value_json[key].to_owned()); + resource_log_json.insert(key.to_owned(), value_json[key].to_owned()); } } } + if resource.dropped_attributes_count.is_some() { - otel_json.insert( + resource_log_json.insert( "resource_dropped_attributes_count".to_string(), Value::Number(serde_json::Number::from( resource.dropped_attributes_count.unwrap(), @@ -157,11 +160,14 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { } } - for scope_logs in record.scope_logs.iter() { + if let Some(scope_logs) = record.scope_logs.as_ref() { + let mut vec_scope_log_json: Vec> = Vec::new(); for scope_log in scope_logs.iter() { - for instrumentation_scope in scope_log.scope.iter() { + let mut scope_log_json: BTreeMap = BTreeMap::new(); + if scope_log.scope.is_some() { + let instrumentation_scope = scope_log.scope.as_ref().unwrap(); if instrumentation_scope.name.is_some() { - otel_json.insert( + scope_log_json.insert( "instrumentation_scope_name".to_string(), Value::String( instrumentation_scope.name.as_ref().unwrap().to_string(), @@ -169,16 +175,16 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { ); } if instrumentation_scope.version.is_some() { - otel_json.insert( + scope_log_json.insert( "instrumentation_scope_version".to_string(), Value::String( instrumentation_scope.version.as_ref().unwrap().to_string(), ), ); } - let attributes = &instrumentation_scope.attributes; - for attributes in attributes.iter() { - for attribute in attributes { + + if let Some(attributes) = instrumentation_scope.attributes.as_ref() { + for attribute in attributes.iter() { let key = &attribute.key; let value = &attribute.value; let value_json = collect_json_from_values( @@ -186,12 +192,14 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { &format!("instrumentation_scope_{}", key), ); for key in value_json.keys() { - otel_json.insert(key.to_owned(), value_json[key].to_owned()); + scope_log_json + .insert(key.to_owned(), value_json[key].to_owned()); } } } + if instrumentation_scope.dropped_attributes_count.is_some() { - otel_json.insert( + scope_log_json.insert( "instrumentation_scope_dropped_attributes_count".to_string(), Value::Number(serde_json::Number::from( instrumentation_scope.dropped_attributes_count.unwrap(), @@ -199,6 +207,12 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { ); } } + if scope_log.schema_url.is_some() { + scope_log_json.insert( + "scope_log_schema_url".to_string(), + Value::String(scope_log.schema_url.as_ref().unwrap().to_string()), + ); + } for log_record in scope_log.log_records.iter() { let mut log_record_json: BTreeMap = BTreeMap::new(); @@ -254,7 +268,7 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { } } - for attributes in log_record.attributes.iter() { + if let Some(attributes) = log_record.attributes.as_ref() { for attribute in attributes { let key = &attribute.key; let value = &attribute.value; @@ -302,25 +316,31 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { ); } for key in log_record_json.keys() { - otel_json.insert(key.to_owned(), log_record_json[key].to_owned()); + scope_log_json.insert(key.to_owned(), log_record_json[key].to_owned()); } - vec_otel_json.push(otel_json.clone()); - } - - if scope_log.schema_url.is_some() { - otel_json.insert( - "scope_log_schema_url".to_string(), - Value::String(scope_log.schema_url.as_ref().unwrap().to_string()), - ); + vec_scope_log_json.push(scope_log_json.clone()); } } + for scope_log_json in vec_scope_log_json.iter() { + vec_resource_logs_json.push(scope_log_json.clone()); + } } if record.schema_url.is_some() { - otel_json.insert( - "resource_schema_url".to_string(), + resource_log_json.insert( + "schema_url".to_string(), Value::String(record.schema_url.as_ref().unwrap().to_string()), ); } + + for resource_logs_json in vec_resource_logs_json.iter_mut() { + for key in resource_log_json.keys() { + resource_logs_json.insert(key.to_owned(), resource_log_json[key].to_owned()); + } + } + } + + for resource_logs_json in vec_resource_logs_json.iter() { + vec_otel_json.push(resource_logs_json.clone()); } } vec_otel_json From 02deb30bab7c4446da2a957098613c3a3d5708bd Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 18 Jul 2024 12:36:23 +0530 Subject: [PATCH 2/2] fix: otel log ingestion with all available fields issue: log ingested did not have all the data few fields missed in flattening the multi-level hierarchical structure of the OTEL log flattening issue has been fixed with this PR tested and verified that all the data in the log event has been ingested successfully change: fix the flattening --- server/src/handlers/http/otel.rs | 239 ++++++++++++++++--------------- 1 file changed, 121 insertions(+), 118 deletions(-) diff --git a/server/src/handlers/http/otel.rs b/server/src/handlers/http/otel.rs index 90897461a..09b3eaba1 100644 --- a/server/src/handlers/http/otel.rs +++ b/server/src/handlers/http/otel.rs @@ -17,6 +17,8 @@ */ use bytes::Bytes; +use proto::common::v1::KeyValue; +use proto::logs::v1::LogRecord; use serde_json::Value; mod proto; use crate::handlers::http::otel::proto::logs::v1::LogRecordFlags; @@ -127,6 +129,116 @@ fn value_to_string(value: serde_json::Value) -> String { } } +pub fn flatten_attributes( + attributes: &Vec, + attribute_source_key: String, +) -> BTreeMap { + let mut attributes_json: BTreeMap = BTreeMap::new(); + for attribute in attributes { + let key = &attribute.key; + let value = &attribute.value; + let value_json = + collect_json_from_values(value, &format!("{}_{}", attribute_source_key, key)); + for key in value_json.keys() { + attributes_json.insert(key.to_owned(), value_json[key].to_owned()); + } + } + attributes_json +} + +pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap { + let mut log_record_json: BTreeMap = BTreeMap::new(); + if log_record.time_unix_nano.is_some() { + log_record_json.insert( + "time_unix_nano".to_string(), + Value::String(log_record.time_unix_nano.as_ref().unwrap().to_string()), + ); + } + if log_record.observed_time_unix_nano.is_some() { + log_record_json.insert( + "observed_time_unix_nano".to_string(), + Value::String( + log_record + .observed_time_unix_nano + .as_ref() + .unwrap() + .to_string(), + ), + ); + } + if log_record.severity_number.is_some() { + let severity_number: i32 = log_record.severity_number.unwrap(); + log_record_json.insert( + "severity_number".to_string(), + Value::Number(serde_json::Number::from(severity_number)), + ); + if log_record.severity_text.is_none() { + log_record_json.insert( + "severity_text".to_string(), + Value::String(SeverityNumber::as_str_name(severity_number).to_string()), + ); + } + } + if log_record.severity_text.is_some() { + log_record_json.insert( + "severity_text".to_string(), + Value::String(log_record.severity_text.as_ref().unwrap().to_string()), + ); + } + + if log_record.body.is_some() { + let body = &log_record.body; + let body_json = collect_json_from_values(body, &"body".to_string()); + for key in body_json.keys() { + log_record_json.insert(key.to_owned(), body_json[key].to_owned()); + } + } + + if let Some(attributes) = log_record.attributes.as_ref() { + let attributes_json = flatten_attributes(attributes, "log_record".to_string()); + for key in attributes_json.keys() { + log_record_json.insert(key.to_owned(), attributes_json[key].to_owned()); + } + } + + if log_record.dropped_attributes_count.is_some() { + log_record_json.insert( + "log_record_dropped_attributes_count".to_string(), + Value::Number(serde_json::Number::from( + log_record.dropped_attributes_count.unwrap(), + )), + ); + } + + if log_record.flags.is_some() { + let flags: u32 = log_record.flags.unwrap(); + log_record_json.insert( + "flags_number".to_string(), + Value::Number(serde_json::Number::from(flags)), + ); + log_record_json.insert( + "flags_string".to_string(), + Value::String(LogRecordFlags::as_str_name(flags).to_string()), + ); + } + + if log_record.span_id.is_some() { + log_record_json.insert( + "span_id".to_string(), + Value::String(log_record.span_id.as_ref().unwrap().to_string()), + ); + } + + if log_record.trace_id.is_some() { + log_record_json.insert( + "trace_id".to_string(), + Value::String(log_record.trace_id.as_ref().unwrap().to_string()), + ); + } + + log_record_json +} + pub fn flatten_otel_logs(body: &Bytes) -> Vec> { let mut vec_otel_json: Vec> = Vec::new(); let body_str = std::str::from_utf8(body).unwrap(); @@ -139,14 +251,9 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { if let Some(resource) = record.resource.as_ref() { if let Some(attributes) = resource.attributes.as_ref() { - for attribute in attributes { - let key = &attribute.key; - let value = &attribute.value; - let value_json = - collect_json_from_values(value, &format!("resource_{}", key)); - for key in value_json.keys() { - resource_log_json.insert(key.to_owned(), value_json[key].to_owned()); - } + let attributes_json = flatten_attributes(attributes, "resource".to_string()); + for key in attributes_json.keys() { + resource_log_json.insert(key.to_owned(), attributes_json[key].to_owned()); } } @@ -184,17 +291,11 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { } if let Some(attributes) = instrumentation_scope.attributes.as_ref() { - for attribute in attributes.iter() { - let key = &attribute.key; - let value = &attribute.value; - let value_json = collect_json_from_values( - value, - &format!("instrumentation_scope_{}", key), - ); - for key in value_json.keys() { - scope_log_json - .insert(key.to_owned(), value_json[key].to_owned()); - } + let attributes_json = + flatten_attributes(attributes, "instrumentation_scope".to_string()); + for key in attributes_json.keys() { + scope_log_json + .insert(key.to_owned(), attributes_json[key].to_owned()); } } @@ -215,106 +316,8 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec> { } for log_record in scope_log.log_records.iter() { - let mut log_record_json: BTreeMap = BTreeMap::new(); - if log_record.time_unix_nano.is_some() { - log_record_json.insert( - "time_unix_nano".to_string(), - Value::String( - log_record.time_unix_nano.as_ref().unwrap().to_string(), - ), - ); - } - if log_record.observed_time_unix_nano.is_some() { - log_record_json.insert( - "observed_time_unix_nano".to_string(), - Value::String( - log_record - .observed_time_unix_nano - .as_ref() - .unwrap() - .to_string(), - ), - ); - } - if log_record.severity_number.is_some() { - let severity_number: i32 = log_record.severity_number.unwrap(); - log_record_json.insert( - "severity_number".to_string(), - Value::Number(serde_json::Number::from(severity_number)), - ); - if log_record.severity_text.is_none() { - log_record_json.insert( - "severity_text".to_string(), - Value::String( - SeverityNumber::as_str_name(severity_number).to_string(), - ), - ); - } - } - if log_record.severity_text.is_some() { - log_record_json.insert( - "severity_text".to_string(), - Value::String( - log_record.severity_text.as_ref().unwrap().to_string(), - ), - ); - } - - if log_record.body.is_some() { - let body = &log_record.body; - let body_json = collect_json_from_values(body, &"body".to_string()); - for key in body_json.keys() { - log_record_json.insert(key.to_owned(), body_json[key].to_owned()); - } - } - - if let Some(attributes) = log_record.attributes.as_ref() { - for attribute in attributes { - let key = &attribute.key; - let value = &attribute.value; - let value_json = - collect_json_from_values(value, &format!("log_record_{}", key)); - for key in value_json.keys() { - log_record_json - .insert(key.to_owned(), value_json[key].to_owned()); - } - } - } - - if log_record.dropped_attributes_count.is_some() { - log_record_json.insert( - "log_record_dropped_attributes_count".to_string(), - Value::Number(serde_json::Number::from( - log_record.dropped_attributes_count.unwrap(), - )), - ); - } + let log_record_json = flatten_log_record(log_record); - if log_record.flags.is_some() { - let flags: u32 = log_record.flags.unwrap(); - log_record_json.insert( - "flags_number".to_string(), - Value::Number(serde_json::Number::from(flags)), - ); - log_record_json.insert( - "flags_string".to_string(), - Value::String(LogRecordFlags::as_str_name(flags).to_string()), - ); - } - - if log_record.span_id.is_some() { - log_record_json.insert( - "span_id".to_string(), - Value::String(log_record.span_id.as_ref().unwrap().to_string()), - ); - } - - if log_record.trace_id.is_some() { - log_record_json.insert( - "trace_id".to_string(), - Value::String(log_record.trace_id.as_ref().unwrap().to_string()), - ); - } for key in log_record_json.keys() { scope_log_json.insert(key.to_owned(), log_record_json[key].to_owned()); }