Skip to content

Conversation

cretz
Copy link
Member

@cretz cretz commented Nov 1, 2023

What was changed

Create a new Telemetry option temporal_sdk_core_api::telemetry::Logger::Push which accepts a filter like the others, but also accepts an implementation of the CoreLogConsumer trait which is a simple single-function trait invoked for each log.

Also exposed a temporal_sdk_core::telemetry::CoreLogBufferedConsumer struct which, upon new, will return itself (an implementation of CoreLogConsumer) and a temporal_sdk_core::telemetry::CoreLogBuffer upon which drain can be invoked. Also altered log forwarding to leverage the new abstraction which means the foward.

Also expose a temporal_sdk_core::telemetry::CoreLogStreamConsumer that is backed by a bounded mpsc channel.

For langs where they need to buffer/throttle on a max-frequency, they are expected to use CoreLogStreamConsumer::new and for the stream response (second part of tuple), they can call tokio_stream::StreamExt::chunks_timeout on it.

Checklist

  1. Closes [Feature Request] Support custom trait for log capture #618

@cretz cretz requested a review from a team as a code owner November 1, 2023 15:58
logs_out: CoreLogsOut,
}

impl CoreLogBuffer {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could expose fuzzy len(), but no use case yet

Copy link
Member

@Sushisource Sushisource left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works for me, just one thing to maybe change

Copy link
Member

@bergundy bergundy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me.
Wondering if we want to also implement throttling and batching here, I would want this for TS and I would recommend using it in Python too.

@cretz
Copy link
Member Author

cretz commented Nov 1, 2023

Wondering if we want to also implement throttling and batching here, I would want this for TS and I would recommend using it in Python too.

I don't think you can do this properly here because it's dependent on how each lang will schedule a timer for draining. In Python for instance, I think we'll spawn a future call to drain if one is not already waiting. There are some race conditions. I will try out a potential Python implementation before I merge this. Maybe I can have some common tokio code shared by langs.

@bergundy
Copy link
Member

bergundy commented Nov 1, 2023

Hmm... can't the timer be scheduled on the tokio runtime? Is that not accessible here?

@cretz
Copy link
Member Author

cretz commented Nov 1, 2023

After some discussion, I am going to try a stream-based log buffer without ringbuf

EDIT: Pushed stream-based consumer, lang can call tokio_util::StreamExt::chunks_timeout to limit it.


impl CoreLogStreamConsumer {
/// Create a stream consumer and stream of logs.
pub fn new(buffer: usize) -> (Self, Receiver<CoreLog>) {
Copy link
Member Author

@cretz cretz Nov 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is clearer here to return Receiver than some kind of Box<dyn Stream<Item = CoreLog>> because it's not sized which is required by some StreamExt calls. Also, this is why I use future mpsc instead of Tokio mpsc - the former implements Stream already.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just return impl Stream<Item = CoreLog>>, but this is fine too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I wasn't able to cleanly mark the trait as sized which was biting me when using some of the StreamExt calls (though can uncleanly do it if I must). Yeah, I usually wouldn't want to leak the specific trait impl I am using to the caller, but that it's sized and this is not some user-facing API settled it for me.

@cretz
Copy link
Member Author

cretz commented Nov 3, 2023

Confirmed this works well with Python. Merging to incorporate into Python.

@cretz cretz merged commit 7b0b170 into temporalio:master Nov 3, 2023
@cretz cretz deleted the log-push branch November 3, 2023 12:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature Request] Support custom trait for log capture

3 participants