-
Notifications
You must be signed in to change notification settings - Fork 100
Log pushing support #623
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Log pushing support #623
Conversation
logs_out: CoreLogsOut, | ||
} | ||
|
||
impl CoreLogBuffer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could expose fuzzy len()
, but no use case yet
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Works for me, just one thing to maybe change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to me.
Wondering if we want to also implement throttling and batching here, I would want this for TS and I would recommend using it in Python too.
I don't think you can do this properly here because it's dependent on how each lang will schedule a timer for draining. In Python for instance, I think we'll spawn a future call to drain if one is not already waiting. There are some race conditions. I will try out a potential Python implementation before I merge this. Maybe I can have some common tokio code shared by langs. |
Hmm... can't the timer be scheduled on the tokio runtime? Is that not accessible here? |
After some discussion, I am going to try a stream-based log buffer without ringbuf EDIT: Pushed stream-based consumer, lang can call |
|
||
impl CoreLogStreamConsumer { | ||
/// Create a stream consumer and stream of logs. | ||
pub fn new(buffer: usize) -> (Self, Receiver<CoreLog>) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is clearer here to return Receiver
than some kind of Box<dyn Stream<Item = CoreLog>>
because it's not sized which is required by some StreamExt
calls. Also, this is why I use future mpsc instead of Tokio mpsc - the former implements Stream
already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can just return impl Stream<Item = CoreLog>>
, but this is fine too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 I wasn't able to cleanly mark the trait as sized which was biting me when using some of the StreamExt
calls (though can uncleanly do it if I must). Yeah, I usually wouldn't want to leak the specific trait impl I am using to the caller, but that it's sized and this is not some user-facing API settled it for me.
Confirmed this works well with Python. Merging to incorporate into Python. |
What was changed
Create a new Telemetry option
temporal_sdk_core_api::telemetry::Logger::Push
which accepts a filter like the others, but also accepts an implementation of theCoreLogConsumer
trait which is a simple single-function trait invoked for each log.Also exposed a
temporal_sdk_core::telemetry::CoreLogBufferedConsumer
struct which, uponnew
, will return itself (an implementation ofCoreLogConsumer
) and atemporal_sdk_core::telemetry::CoreLogBuffer
upon whichdrain
can be invoked. Also altered log forwarding to leverage the new abstraction which means the foward.Also expose a
temporal_sdk_core::telemetry::CoreLogStreamConsumer
that is backed by a bounded mpsc channel.For langs where they need to buffer/throttle on a max-frequency, they are expected to use
CoreLogStreamConsumer::new
and for the stream response (second part of tuple), they can call tokio_stream::StreamExt::chunks_timeout on it.Checklist