Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl LogProcessor for NoopProcessor {
Ok(())
}

fn shutdown(&mut self) -> LogResult<()> {
fn shutdown(&self) -> LogResult<()> {
Ok(())
}

Expand Down
5 changes: 5 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
`ProcessResourceDetector` resource detectors, use the
[`opentelemetry-resource-detector`](https://crates.io/crates/opentelemetry-resource-detectors) instead.
- Baggage propagation error will be reported to global error handler [#1640](https://github.com/open-telemetry/opentelemetry-rust/pull/1640)
- Improves `shutdown` behavior of `LoggerProvider` and `LogProcessor` [#1643](https://github.com/open-telemetry/opentelemetry-rust/pull/1643).
- `shutdown` can be called by any clone of the `LoggerProvider` without the need of waiting on all `Logger` drops. Thus, `try_shutdown` has been removed.
- `shutdown` methods in `LoggerProvider` and `LogProcessor` now takes a immutable reference
- After `shutdown`, `LoggerProvider` will return noop `Logger`
- After `shutdown`, `LogProcessor` will not process any new logs

## v0.22.1

Expand Down
1 change: 1 addition & 0 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ url = { workspace = true, optional = true }
tokio = { workspace = true, features = ["rt", "time"], optional = true }
tokio-stream = { workspace = true, optional = true }
http = { workspace = true, optional = true }
lazy_static = "1.4.0"

[package.metadata.docs.rs]
all-features = true
Expand Down
128 changes: 109 additions & 19 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,25 @@ use opentelemetry::{
#[cfg(feature = "logs_level_enabled")]
use opentelemetry::logs::Severity;

use std::sync::atomic::AtomicBool;
use std::{borrow::Cow, sync::Arc};

use once_cell::sync::Lazy;

// a no nop logger provider used as placeholder when the provider is shutdown
static NOOP_LOGGER_PROVIDER: Lazy<LoggerProvider> = Lazy::new(|| LoggerProvider {
inner: Arc::new(LoggerProviderInner {
processors: Vec::new(),
config: Config::default(),
}),
is_shutdown: Arc::new(AtomicBool::new(true)),
});

#[derive(Debug, Clone)]
/// Creator for `Logger` instances.
pub struct LoggerProvider {
inner: Arc<LoggerProviderInner>,
is_shutdown: Arc<AtomicBool>,
}

/// Default logger name if empty string is provided.
Expand Down Expand Up @@ -59,6 +72,10 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider {
}

fn library_logger(&self, library: Arc<InstrumentationLibrary>) -> Self::Logger {
// If the provider is shutdown, new logger will refer a no-op logger provider.
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
return Logger::new(library, NOOP_LOGGER_PROVIDER.clone());
}
Logger::new(library, self.clone())
}
}
Expand Down Expand Up @@ -87,22 +104,18 @@ impl LoggerProvider {
.collect()
}

/// Shuts down this `LoggerProvider`, panicking on failure.
pub fn shutdown(&mut self) -> Vec<LogResult<()>> {
self.try_shutdown()
.expect("cannot shutdown LoggerProvider when child Loggers are still active")
}

/// Attempts to shutdown this `LoggerProvider`, succeeding only when
/// all cloned `LoggerProvider` values have been dropped.
pub fn try_shutdown(&mut self) -> Option<Vec<LogResult<()>>> {
Arc::get_mut(&mut self.inner).map(|inner| {
inner
.processors
.iter_mut()
.map(|processor| processor.shutdown())
.collect()
})
/// Shuts down this `LoggerProvider`
pub fn shutdown(&self) -> Vec<LogResult<()>> {
// mark itself as already shutdown
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
// propagate the shutdown signal to processors
// it's up to the processor to properly block new logs after shutdown
self.inner
.processors
.iter()
.map(|processor| processor.shutdown())
.collect()
}
}

Expand Down Expand Up @@ -168,6 +181,7 @@ impl Builder {
processors: self.processors,
config: self.config,
}),
is_shutdown: Arc::new(AtomicBool::new(false)),
}
}
}
Expand Down Expand Up @@ -253,11 +267,63 @@ mod tests {

use super::*;
use opentelemetry::global::{logger, set_logger_provider, shutdown_logger_provider};
use opentelemetry::logs::Logger;
use opentelemetry::logs::{Logger, LoggerProvider as _};
use opentelemetry::{Key, KeyValue, Value};
use std::sync::Mutex;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use std::thread;

struct ShutdownTestLogProcessor {
is_shutdown: Arc<Mutex<bool>>,
counter: Arc<AtomicU64>,
}

impl Debug for ShutdownTestLogProcessor {
fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
todo!()
}
}

impl ShutdownTestLogProcessor {
pub(crate) fn new(counter: Arc<AtomicU64>) -> Self {
ShutdownTestLogProcessor {
is_shutdown: Arc::new(Mutex::new(false)),
counter,
}
}
}

impl LogProcessor for ShutdownTestLogProcessor {
fn emit(&self, _data: LogData) {
self.is_shutdown
.lock()
.map(|is_shutdown| {
if !*is_shutdown {
self.counter
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
})
.expect("lock poisoned");
}

fn force_flush(&self) -> LogResult<()> {
Ok(())
}

fn shutdown(&self) -> LogResult<()> {
self.is_shutdown
.lock()
.map(|mut is_shutdown| *is_shutdown = true)
.expect("lock poisoned");
Ok(())
}

#[cfg(feature = "logs_level_enabled")]
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
true
}
}
#[test]
fn test_logger_provider_default_resource() {
let assert_resource = |provider: &super::LoggerProvider,
Expand Down Expand Up @@ -386,6 +452,30 @@ mod tests {

#[test]
fn shutdown_test() {
let counter = Arc::new(AtomicU64::new(0));
let logger_provider = LoggerProvider::builder()
.with_log_processor(ShutdownTestLogProcessor::new(counter.clone()))
.build();

let logger1 = logger_provider.logger("test-logger1");
let logger2 = logger_provider.logger("test-logger2");
logger1.emit(LogRecord::default());
logger2.emit(LogRecord::default());

let logger3 = logger_provider.logger("test-logger3");
let handle = thread::spawn(move || {
logger3.emit(LogRecord::default());
});
handle.join().expect("thread panicked");

let _ = logger_provider.shutdown();
logger1.emit(LogRecord::default());

assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 3);
}

#[test]
fn global_shutdown_test() {
// cargo test shutdown_test --features=logs

// Arrange
Expand Down Expand Up @@ -493,7 +583,7 @@ mod tests {
Ok(())
}

fn shutdown(&mut self) -> LogResult<()> {
fn shutdown(&self) -> LogResult<()> {
*self.shutdown_called.lock().unwrap() = true;
Ok(())
}
Expand Down
78 changes: 75 additions & 3 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use opentelemetry::{
global,
logs::{LogError, LogResult},
};
use std::sync::atomic::AtomicBool;
use std::{cmp::min, env, sync::Mutex};
use std::{
fmt::{self, Debug, Formatter},
Expand Down Expand Up @@ -46,7 +47,9 @@ pub trait LogProcessor: Send + Sync + Debug {
/// Force the logs lying in the cache to be exported.
fn force_flush(&self) -> LogResult<()>;
/// Shuts down the processor.
fn shutdown(&mut self) -> LogResult<()>;
/// After shutdown returns the log processor should stop processing any logs.
/// It's up to the implementation on when to drop the LogProcessor.
fn shutdown(&self) -> LogResult<()>;
#[cfg(feature = "logs_level_enabled")]
/// Check if logging is enabled
fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool;
Expand All @@ -59,18 +62,25 @@ pub trait LogProcessor: Send + Sync + Debug {
#[derive(Debug)]
pub struct SimpleLogProcessor {
exporter: Mutex<Box<dyn LogExporter>>,
is_shutdown: AtomicBool,
}

impl SimpleLogProcessor {
pub(crate) fn new(exporter: Box<dyn LogExporter>) -> Self {
SimpleLogProcessor {
exporter: Mutex::new(exporter),
is_shutdown: AtomicBool::new(false),
}
}
}

impl LogProcessor for SimpleLogProcessor {
fn emit(&self, data: LogData) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
return;
}

let result = self
.exporter
.lock()
Expand All @@ -85,7 +95,9 @@ impl LogProcessor for SimpleLogProcessor {
Ok(())
}

fn shutdown(&mut self) -> LogResult<()> {
fn shutdown(&self) -> LogResult<()> {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
Expand Down Expand Up @@ -141,7 +153,7 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
.and_then(std::convert::identity)
}

fn shutdown(&mut self) -> LogResult<()> {
fn shutdown(&self) -> LogResult<()> {
let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Shutdown(res_sender))
Expand Down Expand Up @@ -458,6 +470,9 @@ mod tests {
BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY,
};
use crate::export::logs::LogData;
use crate::logs::{LogProcessor, SimpleLogProcessor};
use crate::testing::logs::InMemoryLogsExporterBuilder;
use crate::{
logs::{
log_processor::{
Expand Down Expand Up @@ -620,4 +635,61 @@ mod tests {
assert_eq!(actual.max_export_timeout, Duration::from_millis(3));
assert_eq!(actual.max_queue_size, 4);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_batch_shutdown() {
// assert we will receive an error
// setup
let exporter = InMemoryLogsExporterBuilder::default()
.keep_records_on_shutdown()
.build();
let processor = BatchLogProcessor::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);
processor.emit(LogData {
record: Default::default(),
resource: Default::default(),
instrumentation: Default::default(),
});
processor.force_flush().unwrap();
processor.shutdown().unwrap();
// todo: expect to see errors here. How should we assert this?
processor.emit(LogData {
record: Default::default(),
resource: Default::default(),
instrumentation: Default::default(),
});
assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
}

#[test]
fn test_simple_shutdown() {
let exporter = InMemoryLogsExporterBuilder::default()
.keep_records_on_shutdown()
.build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));

processor.emit(LogData {
record: Default::default(),
resource: Default::default(),
instrumentation: Default::default(),
});

processor.shutdown().unwrap();

let is_shutdown = processor
.is_shutdown
.load(std::sync::atomic::Ordering::Relaxed);
assert!(is_shutdown);

processor.emit(LogData {
record: Default::default(),
resource: Default::default(),
instrumentation: Default::default(),
});

assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
}
}
Loading