From 5c57bb8d04936d51ca980f087ec26820efe3eda7 Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Sun, 24 Mar 2024 20:27:31 -0700 Subject: [PATCH 01/11] feat(logs): make logger shutdown &self --- opentelemetry-sdk/Cargo.toml | 1 + opentelemetry-sdk/src/logs/log_emitter.rs | 123 ++++++++++++++++++-- opentelemetry-sdk/src/logs/log_processor.rs | 31 ++++- stress/src/logs.rs | 2 +- 4 files changed, 142 insertions(+), 15 deletions(-) diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index cabcc84476..cf811acc01 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -29,6 +29,7 @@ url = { workspace = true, optional = true } tokio = { workspace = true, features = ["rt", "time"], optional = true } tokio-stream = { workspace = true, optional = true } http = { workspace = true, optional = true } +lazy_static = "1.4.0" [package.metadata.docs.rs] all-features = true diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 10e9e444fd..35f07b64e6 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -13,12 +13,27 @@ use opentelemetry::{ #[cfg(feature = "logs_level_enabled")] use opentelemetry::logs::Severity; +use std::sync::atomic::AtomicBool; use std::{borrow::Cow, sync::Arc}; +use lazy_static::lazy_static; + +lazy_static! { + // a no nop logger provider used as placeholder when the provider is shutdown + static ref NOOP_LOGGER_PROVIDER: LoggerProvider = LoggerProvider { + inner: Arc::new(LoggerProviderInner { + processors: Vec::new(), + config: Config::default(), + }), + is_shutdown: Arc::new(AtomicBool::new(true)), + }; +} + #[derive(Debug, Clone)] /// Creator for `Logger` instances. pub struct LoggerProvider { inner: Arc, + is_shutdown: Arc, } /// Default logger name if empty string is provided. @@ -59,6 +74,10 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider { } fn library_logger(&self, library: Arc) -> Self::Logger { + // If the provider is shutdown, new logger will refer a no-op logger provider. + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + return Logger::new(library, NOOP_LOGGER_PROVIDER.clone()); + } Logger::new(library, self.clone()) } } @@ -89,20 +108,23 @@ impl LoggerProvider { /// Shuts down this `LoggerProvider`, panicking on failure. pub fn shutdown(&mut self) -> Vec> { - self.try_shutdown() - .expect("cannot shutdown LoggerProvider when child Loggers are still active") + // mark itself as already shutdown + self.is_shutdown + .store(true, std::sync::atomic::Ordering::Relaxed); + // propagate the shutdown signal to processors + // it's up to the processor to properly block new logs after shutdown + self.inner + .processors + .iter() + .map(|processor| processor.shutdown()) + .collect() } /// Attempts to shutdown this `LoggerProvider`, succeeding only when /// all cloned `LoggerProvider` values have been dropped. + // todo: remove this pub fn try_shutdown(&mut self) -> Option>> { - Arc::get_mut(&mut self.inner).map(|inner| { - inner - .processors - .iter_mut() - .map(|processor| processor.shutdown()) - .collect() - }) + Some(self.shutdown()) } } @@ -168,6 +190,7 @@ impl Builder { processors: self.processors, config: self.config, }), + is_shutdown: Arc::new(AtomicBool::new(false)), } } } @@ -255,7 +278,10 @@ mod tests { use opentelemetry::global::{logger, set_logger_provider, shutdown_logger_provider}; use opentelemetry::logs::Logger; use opentelemetry::{Key, KeyValue, Value}; - use std::sync::Mutex; + use opentelemetry::logs::{Logger, LoggerProvider as _}; + use std::fmt::{Debug, Formatter}; + use std::sync::atomic::AtomicU64; + use std::sync::{Arc, Mutex}; use std::thread; #[test] @@ -384,8 +410,83 @@ mod tests { assert_eq!(no_service_name.config().resource.len(), 0); } + struct ShutdownTestLogProcessor { + is_shutdown: Arc>, + counter: Arc, + } + + impl Debug for ShutdownTestLogProcessor { + fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } + } + + impl ShutdownTestLogProcessor { + pub(crate) fn new(counter: Arc) -> Self { + ShutdownTestLogProcessor { + is_shutdown: Arc::new(Mutex::new(false)), + counter, + } + } + } + + impl LogProcessor for ShutdownTestLogProcessor { + fn emit(&self, _data: LogData) { + self.is_shutdown + .lock() + .map(|is_shutdown| { + if !*is_shutdown { + self.counter + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + }) + .expect("lock poisoned"); + } + + fn force_flush(&self) -> LogResult<()> { + Ok(()) + } + + fn shutdown(&self) -> LogResult<()> { + self.is_shutdown + .lock() + .map(|mut is_shutdown| *is_shutdown = true) + .expect("lock poisoned"); + Ok(()) + } + + #[cfg(feature = "logs_level_enabled")] + fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { + true + } + } + #[test] fn shutdown_test() { + let counter = Arc::new(AtomicU64::new(0)); + let mut logger_provider = LoggerProvider::builder() + .with_log_processor(ShutdownTestLogProcessor::new(counter.clone())) + .build(); + + let logger1 = logger_provider.logger("test-logger1"); + let logger2 = logger_provider.logger("test-logger2"); + logger1.emit(LogRecord::default()); + logger2.emit(LogRecord::default()); + + let logger3 = logger_provider.logger("test-logger3"); + let handle = thread::spawn(move || { + logger3.emit(LogRecord::default()); + }); + handle.join().expect("thread panicked"); + + let _ = logger_provider.shutdown(); + logger1.emit(LogRecord::default()); + + assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 3); + } + + #[test] + fn global_shutdown_test() { // cargo test shutdown_test --features=logs // Arrange @@ -493,7 +594,7 @@ mod tests { Ok(()) } - fn shutdown(&mut self) -> LogResult<()> { + fn shutdown(&self) -> LogResult<()> { *self.shutdown_called.lock().unwrap() = true; Ok(()) } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 57ace8dad6..734844a894 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -46,7 +46,9 @@ pub trait LogProcessor: Send + Sync + Debug { /// Force the logs lying in the cache to be exported. fn force_flush(&self) -> LogResult<()>; /// Shuts down the processor. - fn shutdown(&mut self) -> LogResult<()>; + /// After shutdown returns the log processor should stop processing any logs. + /// It's up to the implementation on when to drop the LogProcessor. + fn shutdown(&self) -> LogResult<()>; #[cfg(feature = "logs_level_enabled")] /// Check if logging is enabled fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool; @@ -85,7 +87,7 @@ impl LogProcessor for SimpleLogProcessor { Ok(()) } - fn shutdown(&mut self) -> LogResult<()> { + fn shutdown(&self) -> LogResult<()> { if let Ok(mut exporter) = self.exporter.lock() { exporter.shutdown(); Ok(()) @@ -141,7 +143,7 @@ impl LogProcessor for BatchLogProcessor { .and_then(std::convert::identity) } - fn shutdown(&mut self) -> LogResult<()> { + fn shutdown(&self) -> LogResult<()> { let (res_sender, res_receiver) = oneshot::channel(); self.message_sender .try_send(BatchMessage::Shutdown(res_sender)) @@ -458,6 +460,8 @@ mod tests { BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, }; + use crate::export::logs::LogData; + use crate::logs::LogProcessor; use crate::{ logs::{ log_processor::{ @@ -620,4 +624,25 @@ mod tests { assert_eq!(actual.max_export_timeout, Duration::from_millis(3)); assert_eq!(actual.max_queue_size, 4); } + + #[tokio::test(flavor = "multi_thread")] + async fn test_batch_shutdown() { + // assert we will receive an error + // setup + let exporter = InMemoryLogsExporter::default(); + let processor = + BatchLogProcessor::new(Box::new(exporter), BatchConfig::default(), runtime::Tokio); + processor.emit(LogData { + record: Default::default(), + resource: Default::default(), + instrumentation: Default::default(), + }); + processor.shutdown().unwrap(); + // todo: expect to see errors here. How should we assert this? + processor.emit(LogData { + record: Default::default(), + resource: Default::default(), + instrumentation: Default::default(), + }); + } } diff --git a/stress/src/logs.rs b/stress/src/logs.rs index aaa3a1d7ae..fdc08e9dae 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -15,7 +15,7 @@ impl LogProcessor for NoOpLogProcessor { Ok(()) } - fn shutdown(&mut self) -> opentelemetry::logs::LogResult<()> { + fn shutdown(&self) -> opentelemetry::logs::LogResult<()> { Ok(()) } From 25f28e804200c83581ccdb9082c15b3ca7c605ff Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Sun, 31 Mar 2024 16:25:38 -0700 Subject: [PATCH 02/11] add unit tests --- opentelemetry-sdk/src/logs/log_processor.rs | 55 +++++++++++++++++-- .../src/testing/logs/in_memory_exporter.rs | 21 ++++++- opentelemetry/src/logs/record.rs | 7 ++- 3 files changed, 74 insertions(+), 9 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 734844a894..45fc25ff5e 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -13,6 +13,7 @@ use opentelemetry::{ global, logs::{LogError, LogResult}, }; +use std::sync::atomic::AtomicBool; use std::{cmp::min, env, sync::Mutex}; use std::{ fmt::{self, Debug, Formatter}, @@ -61,18 +62,25 @@ pub trait LogProcessor: Send + Sync + Debug { #[derive(Debug)] pub struct SimpleLogProcessor { exporter: Mutex>, + is_shutdown: AtomicBool, } impl SimpleLogProcessor { pub(crate) fn new(exporter: Box) -> Self { SimpleLogProcessor { exporter: Mutex::new(exporter), + is_shutdown: AtomicBool::new(false), } } } impl LogProcessor for SimpleLogProcessor { fn emit(&self, data: LogData) { + // noop after shutdown + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + return; + } + let result = self .exporter .lock() @@ -88,6 +96,8 @@ impl LogProcessor for SimpleLogProcessor { } fn shutdown(&self) -> LogResult<()> { + self.is_shutdown + .store(true, std::sync::atomic::Ordering::Relaxed); if let Ok(mut exporter) = self.exporter.lock() { exporter.shutdown(); Ok(()) @@ -461,7 +471,8 @@ mod tests { OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, }; use crate::export::logs::LogData; - use crate::logs::LogProcessor; + use crate::logs::{LogProcessor, SimpleLogProcessor}; + use crate::testing::logs::InMemoryLogsExporterBuilder; use crate::{ logs::{ log_processor::{ @@ -629,14 +640,20 @@ mod tests { async fn test_batch_shutdown() { // assert we will receive an error // setup - let exporter = InMemoryLogsExporter::default(); - let processor = - BatchLogProcessor::new(Box::new(exporter), BatchConfig::default(), runtime::Tokio); + let exporter = InMemoryLogsExporterBuilder::default() + .keep_records_on_shutdown() + .build(); + let processor = BatchLogProcessor::new( + Box::new(exporter.clone()), + BatchConfig::default(), + runtime::Tokio, + ); processor.emit(LogData { record: Default::default(), resource: Default::default(), instrumentation: Default::default(), }); + processor.force_flush().unwrap(); processor.shutdown().unwrap(); // todo: expect to see errors here. How should we assert this? processor.emit(LogData { @@ -644,5 +661,35 @@ mod tests { resource: Default::default(), instrumentation: Default::default(), }); + assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) + } + + #[test] + fn test_simple_shutdown() { + let exporter = InMemoryLogsExporterBuilder::default() + .keep_records_on_shutdown() + .build(); + let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); + + processor.emit(LogData { + record: Default::default(), + resource: Default::default(), + instrumentation: Default::default(), + }); + + processor.shutdown().unwrap(); + + let is_shutdown = processor + .is_shutdown + .load(std::sync::atomic::Ordering::Relaxed); + assert!(is_shutdown); + + processor.emit(LogData { + record: Default::default(), + resource: Default::default(), + instrumentation: Default::default(), + }); + + assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) } } diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index aca67dc20b..2c68e00854 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -36,6 +36,7 @@ use std::sync::{Arc, Mutex}; #[derive(Clone, Debug)] pub struct InMemoryLogsExporter { logs: Arc>>, + should_reset_on_shutdown: bool, } impl Default for InMemoryLogsExporter { @@ -71,7 +72,9 @@ impl Default for InMemoryLogsExporter { /// ``` /// #[derive(Debug, Clone)] -pub struct InMemoryLogsExporterBuilder {} +pub struct InMemoryLogsExporterBuilder { + reset_on_shutdown: bool, +} impl Default for InMemoryLogsExporterBuilder { fn default() -> Self { @@ -83,7 +86,16 @@ impl InMemoryLogsExporterBuilder { /// Creates a new instance of `InMemoryLogsExporter`. /// pub fn new() -> Self { - Self {} + Self { + reset_on_shutdown: true, + } + } + + /// If set, the records will not be [`InMemoryLogsExporter::reset`] on shutdown. + pub fn keep_records_on_shutdown(self) -> Self { + Self { + reset_on_shutdown: false, + } } /// Creates a new instance of `InMemoryLogsExporter`. @@ -91,6 +103,7 @@ impl InMemoryLogsExporterBuilder { pub fn build(&self) -> InMemoryLogsExporter { InMemoryLogsExporter { logs: Arc::new(Mutex::new(Vec::new())), + should_reset_on_shutdown: self.reset_on_shutdown, } } } @@ -143,6 +156,8 @@ impl LogExporter for InMemoryLogsExporter { .map_err(LogError::from) } fn shutdown(&mut self) { - self.reset(); + if self.should_reset_on_shutdown { + self.reset(); + } } } diff --git a/opentelemetry/src/logs/record.rs b/opentelemetry/src/logs/record.rs index ca705bac58..81fb27754f 100644 --- a/opentelemetry/src/logs/record.rs +++ b/opentelemetry/src/logs/record.rs @@ -373,10 +373,13 @@ impl LogRecordBuilder { } /// Sets the `event_name` of a record. - pub fn with_name(self, name: Cow<'static, str>) -> Self { + pub fn with_name(self, name: T) -> Self + where + T: Into>, + { Self { record: LogRecord { - event_name: Some(name), + event_name: Some(name.into()), ..self.record }, } From cde300d61548b3c9cc6e77ffb96877dc8b2873c5 Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Sun, 31 Mar 2024 17:47:52 -0700 Subject: [PATCH 03/11] fix main --- opentelemetry-appender-tracing/benches/logs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 7446b6e9f4..b37f3de024 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -62,7 +62,7 @@ impl LogProcessor for NoopProcessor { Ok(()) } - fn shutdown(&mut self) -> LogResult<()> { + fn shutdown(&self) -> LogResult<()> { Ok(()) } From 185c09422bd227f0c89b309afcf7d4b657063e20 Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Sun, 7 Apr 2024 14:05:38 -0700 Subject: [PATCH 04/11] add `shutdown` in global logger provider --- opentelemetry-sdk/src/logs/log_emitter.rs | 37 +++++++++-------------- opentelemetry/src/global/logs.rs | 11 ++++++- opentelemetry/src/logs/logger.rs | 10 +++++- 3 files changed, 34 insertions(+), 24 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 35f07b64e6..b42f4f3489 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -80,6 +80,20 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider { } Logger::new(library, self.clone()) } + + /// Shuts down this `LoggerProvider` + fn shutdown(&self) -> Vec> { + // mark itself as already shutdown + self.is_shutdown + .store(true, std::sync::atomic::Ordering::Relaxed); + // propagate the shutdown signal to processors + // it's up to the processor to properly block new logs after shutdown + self.inner + .processors + .iter() + .map(|processor| processor.shutdown()) + .collect() + } } impl LoggerProvider { @@ -105,27 +119,6 @@ impl LoggerProvider { .map(|processor| processor.force_flush()) .collect() } - - /// Shuts down this `LoggerProvider`, panicking on failure. - pub fn shutdown(&mut self) -> Vec> { - // mark itself as already shutdown - self.is_shutdown - .store(true, std::sync::atomic::Ordering::Relaxed); - // propagate the shutdown signal to processors - // it's up to the processor to properly block new logs after shutdown - self.inner - .processors - .iter() - .map(|processor| processor.shutdown()) - .collect() - } - - /// Attempts to shutdown this `LoggerProvider`, succeeding only when - /// all cloned `LoggerProvider` values have been dropped. - // todo: remove this - pub fn try_shutdown(&mut self) -> Option>> { - Some(self.shutdown()) - } } #[derive(Debug)] @@ -464,7 +457,7 @@ mod tests { #[test] fn shutdown_test() { let counter = Arc::new(AtomicU64::new(0)); - let mut logger_provider = LoggerProvider::builder() + let logger_provider = LoggerProvider::builder() .with_log_processor(ShutdownTestLogProcessor::new(counter.clone())) .build(); diff --git a/opentelemetry/src/global/logs.rs b/opentelemetry/src/global/logs.rs index e2b53f43ee..d1f1b1c33a 100644 --- a/opentelemetry/src/global/logs.rs +++ b/opentelemetry/src/global/logs.rs @@ -6,6 +6,7 @@ use std::{ use once_cell::sync::Lazy; +use crate::logs::LogResult; use crate::{ logs::{Logger, LoggerProvider, NoopLoggerProvider}, InstrumentationLibrary, @@ -25,6 +26,9 @@ pub trait ObjectSafeLoggerProvider { &self, library: Arc, ) -> Box; + + /// shutdown the logger provider, logs emitted after this will not be processed. + fn shutdown(&self) -> Vec>; } impl ObjectSafeLoggerProvider for P @@ -38,6 +42,10 @@ where ) -> Box { Box::new(self.library_logger(library)) } + + fn shutdown(&self) -> Vec> { + self.shutdown() + } } pub struct BoxedLogger(Box); @@ -132,5 +140,6 @@ where /// Shut down the current global [`LoggerProvider`]. pub fn shutdown_logger_provider() { - let _ = set_logger_provider(NoopLoggerProvider::new()); + let logger_provider = set_logger_provider(NoopLoggerProvider::new()); + let _ = ObjectSafeLoggerProvider::shutdown(&logger_provider); } diff --git a/opentelemetry/src/logs/logger.rs b/opentelemetry/src/logs/logger.rs index 6d4d48f973..ccc364d1ea 100644 --- a/opentelemetry/src/logs/logger.rs +++ b/opentelemetry/src/logs/logger.rs @@ -1,6 +1,6 @@ use std::{borrow::Cow, sync::Arc}; -use crate::{logs::LogRecord, InstrumentationLibrary, InstrumentationLibraryBuilder, KeyValue}; +use crate::{logs::{LogRecord, LogResult}, InstrumentationLibrary, InstrumentationLibraryBuilder, KeyValue}; #[cfg(feature = "logs_level_enabled")] use super::Severity; @@ -145,4 +145,12 @@ impl<'a, T: LoggerProvider + ?Sized> LoggerBuilder<'a, T> { self.provider .library_logger(Arc::new(self.library_builder.build())) } + + /// Shutdown the logger provider. Noop logger will be returned after shutdown. + /// + /// Note that `shutdown` doesn't mean `Drop`(though `Drop` can call `shutdown`). + /// It simply means the provider will emit anything in the buffer(if applicable) and stop processing + fn shutdown(&self) -> Vec> { + Vec::new() // noop + } } From 1f351aa9d62963c42a9fe86e45204ab98ebcdb1c Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Sun, 7 Apr 2024 14:27:37 -0700 Subject: [PATCH 05/11] revise tests --- opentelemetry-sdk/src/logs/log_emitter.rs | 127 +++++++++------------- opentelemetry/src/global/logs.rs | 4 + opentelemetry/src/logs/logger.rs | 4 +- opentelemetry/src/logs/noop.rs | 5 + 4 files changed, 64 insertions(+), 76 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index b42f4f3489..58f836258c 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -269,14 +269,63 @@ mod tests { use super::*; use opentelemetry::global::{logger, set_logger_provider, shutdown_logger_provider}; - use opentelemetry::logs::Logger; - use opentelemetry::{Key, KeyValue, Value}; use opentelemetry::logs::{Logger, LoggerProvider as _}; + use opentelemetry::{Key, KeyValue, Value}; use std::fmt::{Debug, Formatter}; use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex}; use std::thread; + struct ShutdownTestLogProcessor { + is_shutdown: Arc>, + counter: Arc, + } + + impl Debug for ShutdownTestLogProcessor { + fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } + } + + impl ShutdownTestLogProcessor { + pub(crate) fn new(counter: Arc) -> Self { + ShutdownTestLogProcessor { + is_shutdown: Arc::new(Mutex::new(false)), + counter, + } + } + } + + impl LogProcessor for ShutdownTestLogProcessor { + fn emit(&self, _data: LogData) { + self.is_shutdown + .lock() + .map(|is_shutdown| { + if !*is_shutdown { + self.counter + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + }) + .expect("lock poisoned"); + } + + fn force_flush(&self) -> LogResult<()> { + Ok(()) + } + + fn shutdown(&self) -> LogResult<()> { + self.is_shutdown + .lock() + .map(|mut is_shutdown| *is_shutdown = true) + .expect("lock poisoned"); + Ok(()) + } + + #[cfg(feature = "logs_level_enabled")] + fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { + true + } + } #[test] fn test_logger_provider_default_resource() { let assert_resource = |provider: &super::LoggerProvider, @@ -403,57 +452,6 @@ mod tests { assert_eq!(no_service_name.config().resource.len(), 0); } - struct ShutdownTestLogProcessor { - is_shutdown: Arc>, - counter: Arc, - } - - impl Debug for ShutdownTestLogProcessor { - fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { - todo!() - } - } - - impl ShutdownTestLogProcessor { - pub(crate) fn new(counter: Arc) -> Self { - ShutdownTestLogProcessor { - is_shutdown: Arc::new(Mutex::new(false)), - counter, - } - } - } - - impl LogProcessor for ShutdownTestLogProcessor { - fn emit(&self, _data: LogData) { - self.is_shutdown - .lock() - .map(|is_shutdown| { - if !*is_shutdown { - self.counter - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); - } - }) - .expect("lock poisoned"); - } - - fn force_flush(&self) -> LogResult<()> { - Ok(()) - } - - fn shutdown(&self) -> LogResult<()> { - self.is_shutdown - .lock() - .map(|mut is_shutdown| *is_shutdown = true) - .expect("lock poisoned"); - Ok(()) - } - - #[cfg(feature = "logs_level_enabled")] - fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { - true - } - } - #[test] fn shutdown_test() { let counter = Arc::new(AtomicU64::new(0)); @@ -524,36 +522,19 @@ mod tests { thread::sleep(std::time::Duration::from_millis(10)); } - // Intentionally *not* calling shutdown/flush on the provider, but - // instead relying on shutdown_logger_provider which causes the global - // provider to be dropped, leading to the sdk logger provider's drop to - // be called, which is expected to call shutdown on processors. + // shutdown logger provider explicitly call shutdown on the logger shutdown_logger_provider(); // Assert - // shutdown_logger_provider is necessary but not sufficient, as loggers - // hold on to the the provider (via inner provider clones). - assert!(!*shutdown_called.lock().unwrap()); + // shutdown_logger_provider should be shutdown regardless if the logger is dropped. + assert!(*shutdown_called.lock().unwrap()); // flush is never called by the sdk. assert!(!*flush_called.lock().unwrap()); - // Drop one of the logger. Not enough! - drop(logger1); - assert!(!*shutdown_called.lock().unwrap()); - - // drop logger2, which is the only remaining logger in this thread. - // Still not enough! - drop(logger2); - assert!(!*shutdown_called.lock().unwrap()); - - // now signal the spawned thread to end, which causes it to drop its - // logger. Since that is the last logger, the provider (inner provider) - // is finally dropped, triggering shutdown *signal_to_end.lock().unwrap() = true; handle.join().unwrap(); - assert!(*shutdown_called.lock().unwrap()); // flush is never called by the sdk. assert!(!*flush_called.lock().unwrap()); diff --git a/opentelemetry/src/global/logs.rs b/opentelemetry/src/global/logs.rs index d1f1b1c33a..f3459c2c29 100644 --- a/opentelemetry/src/global/logs.rs +++ b/opentelemetry/src/global/logs.rs @@ -98,6 +98,10 @@ impl LoggerProvider for GlobalLoggerProvider { fn library_logger(&self, library: Arc) -> Self::Logger { BoxedLogger(self.provider.boxed_logger(library)) } + + fn shutdown(&self) -> Vec> { + self.provider.shutdown() + } } static GLOBAL_LOGGER_PROVIDER: Lazy> = diff --git a/opentelemetry/src/logs/logger.rs b/opentelemetry/src/logs/logger.rs index ccc364d1ea..4395f864de 100644 --- a/opentelemetry/src/logs/logger.rs +++ b/opentelemetry/src/logs/logger.rs @@ -150,7 +150,5 @@ impl<'a, T: LoggerProvider + ?Sized> LoggerBuilder<'a, T> { /// /// Note that `shutdown` doesn't mean `Drop`(though `Drop` can call `shutdown`). /// It simply means the provider will emit anything in the buffer(if applicable) and stop processing - fn shutdown(&self) -> Vec> { - Vec::new() // noop - } + fn shutdown(&self) -> Vec>; } diff --git a/opentelemetry/src/logs/noop.rs b/opentelemetry/src/logs/noop.rs index c99b891145..7cafd63294 100644 --- a/opentelemetry/src/logs/noop.rs +++ b/opentelemetry/src/logs/noop.rs @@ -1,5 +1,6 @@ use std::{borrow::Cow, sync::Arc}; +use crate::logs::LogResult; use crate::{ logs::{LogRecord, Logger, LoggerProvider}, InstrumentationLibrary, KeyValue, @@ -32,6 +33,10 @@ impl LoggerProvider for NoopLoggerProvider { ) -> Self::Logger { NoopLogger(()) } + + fn shutdown(&self) -> Vec> { + vec![] + } } /// A no-op implementation of a [`Logger`] From 90c5e1cf54ac6111f01ace943f21f13639d0a214 Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Mon, 22 Apr 2024 23:02:18 -0700 Subject: [PATCH 06/11] address comments --- opentelemetry-sdk/src/logs/log_emitter.rs | 22 ++++++++++------------ opentelemetry/src/logs/logger.rs | 20 +++++++++++++------- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 58f836258c..c317631d6a 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -16,18 +16,16 @@ use opentelemetry::logs::Severity; use std::sync::atomic::AtomicBool; use std::{borrow::Cow, sync::Arc}; -use lazy_static::lazy_static; - -lazy_static! { - // a no nop logger provider used as placeholder when the provider is shutdown - static ref NOOP_LOGGER_PROVIDER: LoggerProvider = LoggerProvider { - inner: Arc::new(LoggerProviderInner { - processors: Vec::new(), - config: Config::default(), - }), - is_shutdown: Arc::new(AtomicBool::new(true)), - }; -} +use once_cell::sync::Lazy; + +// a no nop logger provider used as placeholder when the provider is shutdown +static NOOP_LOGGER_PROVIDER: Lazy = Lazy::new(|| LoggerProvider { + inner: Arc::new(LoggerProviderInner { + processors: Vec::new(), + config: Config::default(), + }), + is_shutdown: Arc::new(AtomicBool::new(true)), +}); #[derive(Debug, Clone)] /// Creator for `Logger` instances. diff --git a/opentelemetry/src/logs/logger.rs b/opentelemetry/src/logs/logger.rs index 4395f864de..d24f8055fb 100644 --- a/opentelemetry/src/logs/logger.rs +++ b/opentelemetry/src/logs/logger.rs @@ -1,6 +1,9 @@ use std::{borrow::Cow, sync::Arc}; -use crate::{logs::{LogRecord, LogResult}, InstrumentationLibrary, InstrumentationLibraryBuilder, KeyValue}; +use crate::{ + logs::{LogRecord, LogResult}, + InstrumentationLibrary, InstrumentationLibraryBuilder, KeyValue, +}; #[cfg(feature = "logs_level_enabled")] use super::Severity; @@ -114,6 +117,15 @@ pub trait LoggerProvider { fn logger(&self, name: impl Into>) -> Self::Logger { self.logger_builder(name).build() } + + /// Shuts down this `LoggerProvider` + /// After shutdown the logger provider cannot be used to create new loggers. + /// The implementation should propagate the shutdown signal down to ensure no new logs are emitted. + /// Shutdown the logger provider. Noop logger will be returned after shutdown. + /// + /// Note that `shutdown` doesn't mean `Drop`(though `Drop` can call `shutdown`). + /// It simply means the provider will emit anything in the buffer(if applicable) and stop processing + fn shutdown(&self) -> Vec>; } #[derive(Debug)] @@ -145,10 +157,4 @@ impl<'a, T: LoggerProvider + ?Sized> LoggerBuilder<'a, T> { self.provider .library_logger(Arc::new(self.library_builder.build())) } - - /// Shutdown the logger provider. Noop logger will be returned after shutdown. - /// - /// Note that `shutdown` doesn't mean `Drop`(though `Drop` can call `shutdown`). - /// It simply means the provider will emit anything in the buffer(if applicable) and stop processing - fn shutdown(&self) -> Vec>; } From 4daa2af28fa26df2d9869cabb6da654a2f72ae76 Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Mon, 22 Apr 2024 23:08:57 -0700 Subject: [PATCH 07/11] CHANGELOG --- opentelemetry-sdk/CHANGELOG.md | 1 + opentelemetry/CHANGELOG.md | 1 + 2 files changed, 2 insertions(+) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index f72ea62851..b4dd5ec11b 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -13,6 +13,7 @@ `ProcessResourceDetector` resource detectors, use the [`opentelemetry-resource-detector`](https://crates.io/crates/opentelemetry-resource-detectors) instead. - Baggage propagation error will be reported to global error handler [#1640](https://github.com/open-telemetry/opentelemetry-rust/pull/1640) +- Make `LoggerProvider`'s `shutdown` method taking immutable reference [#1643](https://github.com/open-telemetry/opentelemetry-rust/pull/1643) ## v0.22.1 diff --git a/opentelemetry/CHANGELOG.md b/opentelemetry/CHANGELOG.md index e593fcbfa8..364395a955 100644 --- a/opentelemetry/CHANGELOG.md +++ b/opentelemetry/CHANGELOG.md @@ -6,6 +6,7 @@ - [#1623](https://github.com/open-telemetry/opentelemetry-rust/pull/1623) Add global::meter_provider_shutdown - [#1640](https://github.com/open-telemetry/opentelemetry-rust/pull/1640) Add `PropagationError` +- [#1643](https://github.com/open-telemetry/opentelemetry-rust/pull/1643) Add `shutdown` method to `LoggerProvider` ### Removed From 79c0b052d208ced95f769b3fd5ed056a72ade4b1 Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Thu, 25 Apr 2024 20:30:41 -0700 Subject: [PATCH 08/11] address comments --- opentelemetry-sdk/src/logs/log_emitter.rs | 28 +++++++++---------- .../src/testing/logs/in_memory_exporter.rs | 15 +++++----- opentelemetry/src/global/logs.rs | 15 +--------- opentelemetry/src/logs/logger.rs | 14 +--------- opentelemetry/src/logs/noop.rs | 5 ---- 5 files changed, 24 insertions(+), 53 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index c317631d6a..92fca48c02 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -78,20 +78,6 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider { } Logger::new(library, self.clone()) } - - /// Shuts down this `LoggerProvider` - fn shutdown(&self) -> Vec> { - // mark itself as already shutdown - self.is_shutdown - .store(true, std::sync::atomic::Ordering::Relaxed); - // propagate the shutdown signal to processors - // it's up to the processor to properly block new logs after shutdown - self.inner - .processors - .iter() - .map(|processor| processor.shutdown()) - .collect() - } } impl LoggerProvider { @@ -117,6 +103,20 @@ impl LoggerProvider { .map(|processor| processor.force_flush()) .collect() } + + /// Shuts down this `LoggerProvider` + pub fn shutdown(&self) -> Vec> { + // mark itself as already shutdown + self.is_shutdown + .store(true, std::sync::atomic::Ordering::Relaxed); + // propagate the shutdown signal to processors + // it's up to the processor to properly block new logs after shutdown + self.inner + .processors + .iter() + .map(|processor| processor.shutdown()) + .collect() + } } #[derive(Debug)] diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 2c68e00854..e987209fb7 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -91,13 +91,6 @@ impl InMemoryLogsExporterBuilder { } } - /// If set, the records will not be [`InMemoryLogsExporter::reset`] on shutdown. - pub fn keep_records_on_shutdown(self) -> Self { - Self { - reset_on_shutdown: false, - } - } - /// Creates a new instance of `InMemoryLogsExporter`. /// pub fn build(&self) -> InMemoryLogsExporter { @@ -106,6 +99,14 @@ impl InMemoryLogsExporterBuilder { should_reset_on_shutdown: self.reset_on_shutdown, } } + + /// If set, the records will not be [`InMemoryLogsExporter::reset`] on shutdown. + #[cfg(test)] + pub(crate) fn keep_records_on_shutdown(self) -> Self { + Self { + reset_on_shutdown: false, + } + } } impl InMemoryLogsExporter { diff --git a/opentelemetry/src/global/logs.rs b/opentelemetry/src/global/logs.rs index f3459c2c29..e2b53f43ee 100644 --- a/opentelemetry/src/global/logs.rs +++ b/opentelemetry/src/global/logs.rs @@ -6,7 +6,6 @@ use std::{ use once_cell::sync::Lazy; -use crate::logs::LogResult; use crate::{ logs::{Logger, LoggerProvider, NoopLoggerProvider}, InstrumentationLibrary, @@ -26,9 +25,6 @@ pub trait ObjectSafeLoggerProvider { &self, library: Arc, ) -> Box; - - /// shutdown the logger provider, logs emitted after this will not be processed. - fn shutdown(&self) -> Vec>; } impl ObjectSafeLoggerProvider for P @@ -42,10 +38,6 @@ where ) -> Box { Box::new(self.library_logger(library)) } - - fn shutdown(&self) -> Vec> { - self.shutdown() - } } pub struct BoxedLogger(Box); @@ -98,10 +90,6 @@ impl LoggerProvider for GlobalLoggerProvider { fn library_logger(&self, library: Arc) -> Self::Logger { BoxedLogger(self.provider.boxed_logger(library)) } - - fn shutdown(&self) -> Vec> { - self.provider.shutdown() - } } static GLOBAL_LOGGER_PROVIDER: Lazy> = @@ -144,6 +132,5 @@ where /// Shut down the current global [`LoggerProvider`]. pub fn shutdown_logger_provider() { - let logger_provider = set_logger_provider(NoopLoggerProvider::new()); - let _ = ObjectSafeLoggerProvider::shutdown(&logger_provider); + let _ = set_logger_provider(NoopLoggerProvider::new()); } diff --git a/opentelemetry/src/logs/logger.rs b/opentelemetry/src/logs/logger.rs index d24f8055fb..6d4d48f973 100644 --- a/opentelemetry/src/logs/logger.rs +++ b/opentelemetry/src/logs/logger.rs @@ -1,9 +1,6 @@ use std::{borrow::Cow, sync::Arc}; -use crate::{ - logs::{LogRecord, LogResult}, - InstrumentationLibrary, InstrumentationLibraryBuilder, KeyValue, -}; +use crate::{logs::LogRecord, InstrumentationLibrary, InstrumentationLibraryBuilder, KeyValue}; #[cfg(feature = "logs_level_enabled")] use super::Severity; @@ -117,15 +114,6 @@ pub trait LoggerProvider { fn logger(&self, name: impl Into>) -> Self::Logger { self.logger_builder(name).build() } - - /// Shuts down this `LoggerProvider` - /// After shutdown the logger provider cannot be used to create new loggers. - /// The implementation should propagate the shutdown signal down to ensure no new logs are emitted. - /// Shutdown the logger provider. Noop logger will be returned after shutdown. - /// - /// Note that `shutdown` doesn't mean `Drop`(though `Drop` can call `shutdown`). - /// It simply means the provider will emit anything in the buffer(if applicable) and stop processing - fn shutdown(&self) -> Vec>; } #[derive(Debug)] diff --git a/opentelemetry/src/logs/noop.rs b/opentelemetry/src/logs/noop.rs index 7cafd63294..c99b891145 100644 --- a/opentelemetry/src/logs/noop.rs +++ b/opentelemetry/src/logs/noop.rs @@ -1,6 +1,5 @@ use std::{borrow::Cow, sync::Arc}; -use crate::logs::LogResult; use crate::{ logs::{LogRecord, Logger, LoggerProvider}, InstrumentationLibrary, KeyValue, @@ -33,10 +32,6 @@ impl LoggerProvider for NoopLoggerProvider { ) -> Self::Logger { NoopLogger(()) } - - fn shutdown(&self) -> Vec> { - vec![] - } } /// A no-op implementation of a [`Logger`] From 0fcd1672d293cd363d5501949b85fc5b3fdc1fb4 Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Thu, 25 Apr 2024 20:31:59 -0700 Subject: [PATCH 09/11] CHANGELOG --- opentelemetry-sdk/CHANGELOG.md | 2 +- opentelemetry/CHANGELOG.md | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index b4dd5ec11b..b7838fad87 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -13,7 +13,7 @@ `ProcessResourceDetector` resource detectors, use the [`opentelemetry-resource-detector`](https://crates.io/crates/opentelemetry-resource-detectors) instead. - Baggage propagation error will be reported to global error handler [#1640](https://github.com/open-telemetry/opentelemetry-rust/pull/1640) -- Make `LoggerProvider`'s `shutdown` method taking immutable reference [#1643](https://github.com/open-telemetry/opentelemetry-rust/pull/1643) +- Make `shutdown` method in `LoggerProvider` and `LogProcessor` taking immutable reference [#1643](https://github.com/open-telemetry/opentelemetry-rust/pull/1643) ## v0.22.1 diff --git a/opentelemetry/CHANGELOG.md b/opentelemetry/CHANGELOG.md index 364395a955..e593fcbfa8 100644 --- a/opentelemetry/CHANGELOG.md +++ b/opentelemetry/CHANGELOG.md @@ -6,7 +6,6 @@ - [#1623](https://github.com/open-telemetry/opentelemetry-rust/pull/1623) Add global::meter_provider_shutdown - [#1640](https://github.com/open-telemetry/opentelemetry-rust/pull/1640) Add `PropagationError` -- [#1643](https://github.com/open-telemetry/opentelemetry-rust/pull/1643) Add `shutdown` method to `LoggerProvider` ### Removed From 5bd59e755dba60059a8b41be018e3c435c1694e1 Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Thu, 25 Apr 2024 20:36:04 -0700 Subject: [PATCH 10/11] unit test --- opentelemetry-sdk/src/logs/log_emitter.rs | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 92fca48c02..503fcca367 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -520,19 +520,36 @@ mod tests { thread::sleep(std::time::Duration::from_millis(10)); } - // shutdown logger provider explicitly call shutdown on the logger + // Intentionally *not* calling shutdown/flush on the provider, but + // instead relying on shutdown_logger_provider which causes the global + // provider to be dropped, leading to the sdk logger provider's drop to + // be called, which is expected to call shutdown on processors. shutdown_logger_provider(); // Assert - // shutdown_logger_provider should be shutdown regardless if the logger is dropped. - assert!(*shutdown_called.lock().unwrap()); + // shutdown_logger_provider is necessary but not sufficient, as loggers + // hold on to the the provider (via inner provider clones). + assert!(!*shutdown_called.lock().unwrap()); // flush is never called by the sdk. assert!(!*flush_called.lock().unwrap()); + // Drop one of the logger. Not enough! + drop(logger1); + assert!(!*shutdown_called.lock().unwrap()); + + // drop logger2, which is the only remaining logger in this thread. + // Still not enough! + drop(logger2); + assert!(!*shutdown_called.lock().unwrap()); + + // now signal the spawned thread to end, which causes it to drop its + // logger. Since that is the last logger, the provider (inner provider) + // is finally dropped, triggering shutdown *signal_to_end.lock().unwrap() = true; handle.join().unwrap(); + assert!(*shutdown_called.lock().unwrap()); // flush is never called by the sdk. assert!(!*flush_called.lock().unwrap()); From 5bb6471eed64ecefe13608310dbe613cccec97e6 Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Sun, 28 Apr 2024 19:55:55 -0700 Subject: [PATCH 11/11] Update CHANGELOG.md --- opentelemetry-sdk/CHANGELOG.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index b7838fad87..d71d0ca4a1 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -13,7 +13,11 @@ `ProcessResourceDetector` resource detectors, use the [`opentelemetry-resource-detector`](https://crates.io/crates/opentelemetry-resource-detectors) instead. - Baggage propagation error will be reported to global error handler [#1640](https://github.com/open-telemetry/opentelemetry-rust/pull/1640) -- Make `shutdown` method in `LoggerProvider` and `LogProcessor` taking immutable reference [#1643](https://github.com/open-telemetry/opentelemetry-rust/pull/1643) +- Improves `shutdown` behavior of `LoggerProvider` and `LogProcessor` [#1643](https://github.com/open-telemetry/opentelemetry-rust/pull/1643). + - `shutdown` can be called by any clone of the `LoggerProvider` without the need of waiting on all `Logger` drops. Thus, `try_shutdown` has been removed. + - `shutdown` methods in `LoggerProvider` and `LogProcessor` now takes a immutable reference + - After `shutdown`, `LoggerProvider` will return noop `Logger` + - After `shutdown`, `LogProcessor` will not process any new logs ## v0.22.1