diff --git a/codetracer-python-recorder/Cargo.lock b/codetracer-python-recorder/Cargo.lock index 2399ba2..1809808 100644 --- a/codetracer-python-recorder/Cargo.lock +++ b/codetracer-python-recorder/Cargo.lock @@ -74,10 +74,14 @@ checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" name = "codetracer-python-recorder" version = "0.1.0" dependencies = [ + "base64", "bitflags", + "crossbeam-channel", "dashmap", + "libc", "log", "once_cell", + "os_pipe", "pyo3", "recorder-errors", "runtime_tracing", @@ -85,8 +89,24 @@ dependencies = [ "serde_json", "tempfile", "uuid", + "windows-sys 0.59.0", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "dashmap" version = "5.5.3" @@ -113,7 +133,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.60.2", ] [[package]] @@ -256,6 +276,16 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "os_pipe" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db335f4760b14ead6290116f2427bf33a14d4f0617d49f78a246de10c1831224" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "parking_lot_core" version = "0.9.11" @@ -412,7 +442,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.60.2", ] [[package]] @@ -515,7 +545,7 @@ dependencies = [ "getrandom", "once_cell", "rustix", - "windows-sys", + "windows-sys 0.60.2", ] [[package]] @@ -615,6 +645,15 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.60.2" diff --git a/codetracer-python-recorder/Cargo.toml b/codetracer-python-recorder/Cargo.toml index d368a27..8bc27d8 100644 --- a/codetracer-python-recorder/Cargo.toml +++ b/codetracer-python-recorder/Cargo.toml @@ -23,6 +23,7 @@ default = ["extension-module"] [dependencies] pyo3 = { version = "0.25.1" } runtime_tracing = "0.14.0" +base64 = "0.22" bitflags = "2.4" once_cell = "1.19" dashmap = "5.5" @@ -31,7 +32,22 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" uuid = { version = "1.10", features = ["v4"] } recorder-errors = { version = "0.1.0", path = "crates/recorder-errors" } +crossbeam-channel = "0.5" +libc = "0.2" [dev-dependencies] pyo3 = { version = "0.25.1", features = ["auto-initialize"] } tempfile = "3.10" + +[target.'cfg(unix)'.dev-dependencies] +os_pipe = "1.2" + +[target.'cfg(windows)'.dependencies] +windows-sys = { version = "0.59", features = [ + "Win32_Foundation", + "Win32_System_IO", + "Win32_System_Pipes", + "Win32_System_Threading", + "Win32_System_WindowsProgramming", + "Win32_Storage_FileSystem", +] } diff --git a/codetracer-python-recorder/src/ffi.rs b/codetracer-python-recorder/src/ffi.rs index 04f51c4..9ca4783 100644 --- a/codetracer-python-recorder/src/ffi.rs +++ b/codetracer-python-recorder/src/ffi.rs @@ -145,12 +145,9 @@ mod tests { #[test] fn map_recorder_error_sets_python_attributes() { Python::with_gil(|py| { - let err = usage!( - ErrorCode::UnsupportedFormat, - "invalid trace format" - ) - .with_context("format", "yaml") - .with_source(std::io::Error::new(std::io::ErrorKind::Other, "boom")); + let err = usage!(ErrorCode::UnsupportedFormat, "invalid trace format") + .with_context("format", "yaml") + .with_source(std::io::Error::new(std::io::ErrorKind::Other, "boom")); let pyerr = map_recorder_error(err); let ty = pyerr.get_type(py); assert!(ty.is(py.get_type::())); @@ -191,9 +188,8 @@ mod tests { #[test] fn dispatch_converts_recorder_error_to_pyerr() { Python::with_gil(|py| { - let result: PyResult<()> = dispatch("dispatch_env", || { - Err(enverr!(ErrorCode::Io, "disk full")) - }); + let result: PyResult<()> = + dispatch("dispatch_env", || Err(enverr!(ErrorCode::Io, "disk full"))); let err = result.expect_err("expected PyErr"); let ty = err.get_type(py); assert!(ty.is(py.get_type::())); diff --git a/codetracer-python-recorder/src/runtime/frame_inspector.rs b/codetracer-python-recorder/src/runtime/frame_inspector.rs index 3bb06d2..f7c5874 100644 --- a/codetracer-python-recorder/src/runtime/frame_inspector.rs +++ b/codetracer-python-recorder/src/runtime/frame_inspector.rs @@ -40,6 +40,11 @@ impl<'py> FrameSnapshot<'py> { pub fn locals_is_globals(&self) -> bool { self.locals_is_globals } + + /// Return a stable identifier for the captured frame. + pub fn frame_id(&self) -> usize { + self.frame_ptr as usize + } } impl<'py> Drop for FrameSnapshot<'py> { diff --git a/codetracer-python-recorder/src/runtime/io_capture/mod.rs b/codetracer-python-recorder/src/runtime/io_capture/mod.rs new file mode 100644 index 0000000..3451b6a --- /dev/null +++ b/codetracer-python-recorder/src/runtime/io_capture/mod.rs @@ -0,0 +1,484 @@ +//! Cross-platform IO capture workers for stdout, stderr, and stdin. + +use std::thread::JoinHandle; +use std::time::Instant; + +use base64::engine::general_purpose::STANDARD as BASE64_ENGINE; +use base64::Engine; +use crossbeam_channel::{bounded, Receiver, Sender}; +use recorder_errors::{bug, enverr, ErrorCode, RecorderResult}; +use runtime_tracing::{EventLogKind, RecordEvent, TraceLowLevelEvent, TraceWriter}; +use serde::Serialize; + +use crate::errors::Result; + +use super::thread_snapshots::{SnapshotEntry, ThreadSnapshotStore}; +use super::trace_writer_host::TraceWriterHost; +use super::IoDrain; + +#[cfg(unix)] +mod unix; +#[cfg(unix)] +use unix as platform; + +#[cfg(windows)] +mod windows; +#[cfg(windows)] +use windows as platform; + +const CHANNEL_CAPACITY: usize = 1024; + +pub type IoChunkReceiver = Receiver; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum StreamKind { + Stdout, + Stderr, + Stdin, +} + +impl StreamKind { + pub fn as_str(self) -> &'static str { + match self { + StreamKind::Stdout => "stdout", + StreamKind::Stderr => "stderr", + StreamKind::Stdin => "stdin", + } + } +} + +#[derive(Debug)] +pub struct IoChunk { + pub stream: StreamKind, + pub timestamp: Instant, + pub bytes: Vec, + pub producer_thread: std::thread::ThreadId, +} + +type WorkerHandle = JoinHandle>; + +pub struct IoCapture { + receiver: Option, + workers: Vec, + platform: Option, +} + +impl IoCapture { + pub fn start() -> Result { + let (tx, rx) = bounded(CHANNEL_CAPACITY); + let (controller, workers) = platform::start(tx)?; + Ok(Self { + receiver: Some(rx), + workers, + platform: Some(controller), + }) + } + + pub fn take_receiver(&mut self) -> Option { + self.receiver.take() + } + + pub fn shutdown(&mut self) -> Result<()> { + if let Some(mut controller) = self.platform.take() { + controller.restore()?; + } + self.join_workers() + } + + fn join_workers(&mut self) -> Result<()> { + for handle in self.workers.drain(..) { + match handle.join() { + Ok(Ok(())) => {} + Ok(Err(err)) => return Err(err), + Err(panic) => { + let message = if let Some(msg) = panic.downcast_ref::<&'static str>() { + msg.to_string() + } else if let Some(msg) = panic.downcast_ref::() { + msg.clone() + } else { + "unknown panic".to_string() + }; + return Err(bug!(ErrorCode::Unknown, "IO capture worker panicked") + .with_context("details", message)); + } + } + } + Ok(()) + } +} + +impl Drop for IoCapture { + fn drop(&mut self) { + if self.platform.is_some() { + if let Err(err) = self.shutdown() { + log::error!("failed to shutdown IO capture cleanly: {}", err); + } + } + } +} + +#[derive(Clone)] +struct IoEventSink { + writer: TraceWriterHost, + snapshots: ThreadSnapshotStore, + start: Instant, +} + +impl IoEventSink { + fn new(writer: TraceWriterHost, snapshots: ThreadSnapshotStore) -> Self { + Self { + writer, + snapshots, + start: Instant::now(), + } + } + + fn pump(self, receiver: IoChunkReceiver) -> Result<()> { + let mut sink = self; + while let Ok(chunk) = receiver.recv() { + sink.record_chunk(chunk)?; + } + Ok(()) + } + + fn record_chunk(&mut self, chunk: IoChunk) -> Result<()> { + let event = TraceLowLevelEvent::Event(RecordEvent { + kind: map_stream_to_event_kind(chunk.stream), + metadata: self.serialize_metadata(&chunk)?, + content: BASE64_ENGINE.encode(&chunk.bytes), + }); + + let mut writer = self.writer.lock()?; + writer.add_event(event); + Ok(()) + } + + fn serialize_metadata(&self, chunk: &IoChunk) -> Result { + let metadata = IoChunkMetadata { + stream: chunk.stream.as_str(), + timestamp_ns: self.relative_timestamp_ns(chunk.timestamp), + thread_id: format!("{:?}", chunk.producer_thread), + byte_len: chunk.bytes.len(), + snapshot: self.snapshot_metadata(chunk.producer_thread), + }; + + serde_json::to_string(&metadata).map_err(|err| { + bug!( + ErrorCode::Unknown, + "failed to encode IO chunk metadata as JSON" + ) + .with_context("error", err.to_string()) + }) + } + + fn relative_timestamp_ns(&self, timestamp: Instant) -> u128 { + match timestamp.checked_duration_since(self.start) { + Some(duration) => duration.as_nanos(), + None => 0, + } + } + + fn snapshot_metadata(&self, thread_id: std::thread::ThreadId) -> Option { + let snapshot = self + .snapshots + .snapshot_for(thread_id) + .or_else(|| self.snapshots.latest()); + snapshot.map(IoSnapshotMetadata::from) + } +} + +#[derive(Serialize)] +struct IoChunkMetadata<'a> { + stream: &'a str, + timestamp_ns: u128, + thread_id: String, + byte_len: usize, + #[serde(skip_serializing_if = "Option::is_none")] + snapshot: Option, +} + +#[derive(Serialize)] +struct IoSnapshotMetadata { + path_id: usize, + line: i64, + frame_id: usize, +} + +impl From for IoSnapshotMetadata { + fn from(entry: SnapshotEntry) -> Self { + IoSnapshotMetadata { + path_id: entry.path_id.0, + line: entry.line.0, + frame_id: entry.frame_id, + } + } +} + +fn map_stream_to_event_kind(stream: StreamKind) -> EventLogKind { + match stream { + StreamKind::Stdout => EventLogKind::Write, + StreamKind::Stderr => EventLogKind::WriteOther, + StreamKind::Stdin => EventLogKind::Read, + } +} + +pub struct ActiveCapture { + capture: IoCapture, + sink: Option, +} + +impl ActiveCapture { + pub fn start(writer: TraceWriterHost, snapshots: ThreadSnapshotStore) -> Result { + let mut capture = IoCapture::start()?; + let receiver = capture.take_receiver().ok_or_else(|| { + bug!( + ErrorCode::Unknown, + "IO capture receiver already taken before sink start" + ) + })?; + + let sink_worker = spawn_reader_thread("sink", move || { + IoEventSink::new(writer, snapshots).pump(receiver) + })?; + + Ok(Self { + capture, + sink: Some(sink_worker), + }) + } + + fn join_sink(&mut self) -> Result<()> { + if let Some(handle) = self.sink.take() { + match handle.join() { + Ok(result) => result, + Err(panic) => { + let message = if let Some(msg) = panic.downcast_ref::<&'static str>() { + (*msg).to_string() + } else if let Some(msg) = panic.downcast_ref::() { + msg.clone() + } else { + "unknown panic".to_string() + }; + Err(bug!(ErrorCode::Unknown, "IO capture sink worker panicked") + .with_context("details", message)) + } + } + } else { + Ok(()) + } + } +} + +impl IoDrain for ActiveCapture { + fn drain(&mut self, _py: pyo3::Python<'_>) -> RecorderResult<()> { + let shutdown_result = self.capture.shutdown(); + let sink_result = self.join_sink(); + shutdown_result.and(sink_result) + } +} + +impl Drop for ActiveCapture { + fn drop(&mut self) { + if let Err(err) = self.capture.shutdown() { + log::error!("failed to shutdown IO capture cleanly: {}", err); + } + if let Err(err) = self.join_sink() { + log::error!("failed to join IO sink worker: {}", err); + } + } +} + +pub(super) fn spawn_reader_thread(name: &'static str, worker: F) -> Result +where + F: FnOnce() -> Result<()> + Send + 'static, +{ + std::thread::Builder::new() + .name(format!("io-capture-{}", name)) + .spawn(worker) + .map_err(|err| { + enverr!( + recorder_errors::ErrorCode::Io, + "failed to spawn IO capture worker thread" + ) + .with_context("thread", name) + .with_context("error", err.to_string()) + }) +} + +fn build_chunk(stream: StreamKind, bytes: Vec) -> IoChunk { + IoChunk { + stream, + timestamp: Instant::now(), + producer_thread: std::thread::current().id(), + bytes, + } +} + +fn send_chunk(sender: &Sender, chunk: IoChunk) { + let stream = chunk.stream; + if let Err(err) = sender.send(chunk) { + log::warn!( + "dropping IO chunk for {} because receiver closed: {}", + stream.as_str(), + err + ); + } +} + +pub(super) fn send_buffer(sender: &Sender, stream: StreamKind, buffer: &[u8]) { + if buffer.is_empty() { + return; + } + send_chunk(sender, build_chunk(stream, buffer.to_vec())); +} + +#[cfg(test)] +mod tests { + use super::*; + + use once_cell::sync::Lazy; + use std::io::{Read, Write}; + use std::sync::Mutex; + use std::time::Duration; + + static STDIO_GUARD: Lazy> = Lazy::new(|| Mutex::new(())); + + #[cfg(unix)] + use os_pipe::pipe; + #[cfg(unix)] + use std::os::fd::AsRawFd; + + #[cfg(unix)] + #[test] + fn captures_stdout_and_preserves_passthrough() { + let _guard = STDIO_GUARD.lock().unwrap(); + + let (mut reader, writer) = pipe().expect("pipe"); + let original_stdout = unsafe { libc::dup(libc::STDOUT_FILENO) }; + assert!(original_stdout >= 0); + unsafe { + libc::dup2(writer.as_raw_fd(), libc::STDOUT_FILENO); + } + drop(writer); + + let mut capture = IoCapture::start().expect("start capture"); + let receiver = capture.take_receiver().expect("receiver available"); + + write!(std::io::stdout(), "hello").expect("write stdout"); + std::io::stdout().flush().expect("flush stdout"); + + let chunk = receiver + .recv_timeout(Duration::from_secs(1)) + .expect("chunk arrived"); + assert_eq!(chunk.stream, StreamKind::Stdout); + assert_eq!(chunk.bytes, b"hello"); + + let mut mirror_buf = [0u8; 5]; + let read = reader.read(&mut mirror_buf).expect("read mirror output"); + assert_eq!(&mirror_buf[..read], b"hello"); + + capture.shutdown().expect("shutdown capture"); + unsafe { + libc::dup2(original_stdout, libc::STDOUT_FILENO); + libc::close(original_stdout); + } + } + + #[cfg(unix)] + #[test] + fn captures_stdin_and_forwards_bytes() { + let _guard = STDIO_GUARD.lock().unwrap(); + + let (reader, mut writer) = pipe().expect("pipe"); + let original_stdin = unsafe { libc::dup(libc::STDIN_FILENO) }; + assert!(original_stdin >= 0); + unsafe { + libc::dup2(reader.as_raw_fd(), libc::STDIN_FILENO); + } + drop(reader); + + let mut capture = IoCapture::start().expect("start capture"); + let receiver = capture.take_receiver().expect("receiver available"); + + let read_thread = std::thread::spawn(|| { + let mut buf = [0u8; 5]; + std::io::stdin() + .read_exact(&mut buf) + .expect("read from stdin through capture"); + buf + }); + + writer + .write_all(b"world") + .expect("write to synthetic stdin"); + writer.flush().expect("flush synthetic stdin"); + drop(writer); + + let forwarded = read_thread.join().expect("join reader"); + assert_eq!(&forwarded, b"world"); + + let chunk = receiver + .recv_timeout(Duration::from_secs(1)) + .expect("chunk arrived"); + assert_eq!(chunk.stream, StreamKind::Stdin); + assert_eq!(chunk.bytes, b"world"); + + capture.shutdown().expect("shutdown capture"); + unsafe { + libc::dup2(original_stdin, libc::STDIN_FILENO); + libc::close(original_stdin); + } + } + + #[cfg(windows)] + #[test] + fn captures_stdout_on_windows() { + use std::fs::OpenOptions; + use std::os::windows::io::IntoRawHandle; + use tempfile::NamedTempFile; + + let _guard = STDIO_GUARD.lock().unwrap(); + + let tmp = NamedTempFile::new().expect("temp file"); + let path = tmp.path().to_path_buf(); + let file = tmp.into_file(); + let handle = file.into_raw_handle(); + let fd = unsafe { libc::_open_osfhandle(handle as isize, libc::_O_BINARY | libc::_O_RDWR) }; + assert!(fd >= 0); + + let original_stdout = unsafe { libc::_dup(1) }; + assert!(original_stdout >= 0); + unsafe { + libc::_dup2(fd, 1); + } + + let mut capture = IoCapture::start().expect("start capture"); + let receiver = capture.take_receiver().expect("receiver available"); + + write!(std::io::stdout(), "hiwin").expect("write stdout"); + std::io::stdout().flush().expect("flush stdout"); + + let chunk = receiver + .recv_timeout(Duration::from_secs(1)) + .expect("chunk arrived"); + assert_eq!(chunk.stream, StreamKind::Stdout); + assert_eq!(chunk.bytes, b"hiwin"); + + capture.shutdown().expect("shutdown capture"); + unsafe { + libc::_dup2(original_stdout, 1); + libc::_close(original_stdout); + libc::_close(fd); + } + + let mut stored = Vec::new(); + let mut reopened = OpenOptions::new() + .read(true) + .open(path) + .expect("re-open temp file"); + reopened + .read_to_end(&mut stored) + .expect("read captured file"); + assert_eq!(stored, b"hiwin"); + } +} diff --git a/codetracer-python-recorder/src/runtime/io_capture/unix.rs b/codetracer-python-recorder/src/runtime/io_capture/unix.rs new file mode 100644 index 0000000..0c11689 --- /dev/null +++ b/codetracer-python-recorder/src/runtime/io_capture/unix.rs @@ -0,0 +1,357 @@ +use std::fs::File; +use std::io::{Read, Write}; +use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; + +use crossbeam_channel::Sender; + +use recorder_errors::{enverr, ErrorCode}; + +use crate::errors::Result; + +use super::{send_buffer, spawn_reader_thread, IoChunk, StreamKind, WorkerHandle}; + +const READ_BUFFER_SIZE: usize = 8192; + +pub(super) struct Controller { + outputs: Vec, + input: Option, +} + +impl Controller { + pub fn restore(&mut self) -> Result<()> { + for guard in self.outputs.drain(..) { + restore_descriptor(guard.saved_fd, guard.stream)?; + } + + if let Some(input) = self.input.take() { + input.restore()?; + } + + Ok(()) + } + + fn rollback(&mut self) { + if let Err(err) = self.restore() { + log::error!("IO capture rollback failed: {}", err); + } + } +} + +struct OutputGuard { + stream: StreamKind, + saved_fd: OwnedFd, +} + +struct InputGuard { + saved_fd: OwnedFd, + shutdown_writer: OwnedFd, +} + +impl InputGuard { + fn restore(self) -> Result<()> { + let shutdown_writer = self.shutdown_writer; + restore_descriptor(self.saved_fd, StreamKind::Stdin)?; + signal_shutdown_writer(shutdown_writer) + } +} + +fn signal_shutdown_writer(writer: OwnedFd) -> Result<()> { + let fd_ref = writer.as_raw_fd(); + let byte = [1u8; 1]; + let result = unsafe { libc::write(fd_ref, byte.as_ptr() as *const libc::c_void, byte.len()) }; + let fd = writer.into_raw_fd(); + let _ = unsafe { libc::close(fd) }; + if result < 0 { + let errno = std::io::Error::last_os_error(); + return Err( + enverr!(ErrorCode::Io, "failed to signal stdin capture shutdown") + .with_context("error", errno.to_string()), + ); + } + Ok(()) +} + +pub(super) fn start(sender: Sender) -> Result<(Controller, Vec)> { + let mut controller = Controller { + outputs: Vec::new(), + input: None, + }; + + let mut workers: Vec = Vec::new(); + + for stream in [StreamKind::Stdout, StreamKind::Stderr] { + match start_output_stream(stream, &sender) { + Ok((guard, worker)) => { + controller.outputs.push(guard); + workers.push(worker); + } + Err(err) => { + controller.rollback(); + return Err(err); + } + } + } + + match start_input_stream(&sender) { + Ok((guard, worker)) => { + controller.input = Some(guard); + workers.push(worker); + } + Err(err) => { + controller.rollback(); + return Err(err); + } + } + + Ok((controller, workers)) +} + +fn start_output_stream( + stream: StreamKind, + sender: &Sender, +) -> Result<(OutputGuard, WorkerHandle)> { + let fd = std_fd(stream); + let saved_fd = dup_fd(fd, stream)?; + let mirror_fd = dup_fd(fd, stream)?; + let (reader, writer) = new_pipe()?; + + replace_fd(writer.as_raw_fd(), fd, stream)?; + + // Drop the extra writer descriptor; stdout/stderr already point to it via dup2. + drop(writer); + + let reader_fd = reader.into_raw_fd(); + let mirror_fd_raw = mirror_fd.into_raw_fd(); + let tx = sender.clone(); + + let worker = match spawn_reader_thread(stream.as_str(), move || { + run_output_reader(stream, reader_fd, mirror_fd_raw, tx) + }) { + Ok(worker) => worker, + Err(err) => { + close_fd(reader_fd); + close_fd(mirror_fd_raw); + let target_fd = std_fd(stream); + if unsafe { libc::dup2(saved_fd.as_raw_fd(), target_fd) } < 0 { + log::error!( + "failed to restore {} after spawn failure: {}", + stream.as_str(), + std::io::Error::last_os_error() + ); + } + return Err(err); + } + }; + + Ok((OutputGuard { stream, saved_fd }, worker)) +} + +fn start_input_stream(sender: &Sender) -> Result<(InputGuard, WorkerHandle)> { + let fd = std_fd(StreamKind::Stdin); + let saved_fd = dup_fd(fd, StreamKind::Stdin)?; + let source_fd = dup_fd(fd, StreamKind::Stdin)?; + let (pipe_reader, pipe_writer) = new_pipe()?; + let (shutdown_reader, shutdown_writer) = new_pipe()?; + + replace_fd(pipe_reader.as_raw_fd(), fd, StreamKind::Stdin)?; + drop(pipe_reader); + + let writer_fd = pipe_writer.into_raw_fd(); + let source_fd_raw = source_fd.into_raw_fd(); + let shutdown_reader_fd = shutdown_reader.into_raw_fd(); + let tx = sender.clone(); + + let worker = match spawn_reader_thread("stdin", move || { + run_input_reader(source_fd_raw, writer_fd, shutdown_reader_fd, tx) + }) { + Ok(worker) => worker, + Err(err) => { + close_fd(writer_fd); + close_fd(source_fd_raw); + close_fd(shutdown_reader_fd); + let target_fd = std_fd(StreamKind::Stdin); + if unsafe { libc::dup2(saved_fd.as_raw_fd(), target_fd) } < 0 { + log::error!( + "failed to restore stdin after spawn failure: {}", + std::io::Error::last_os_error() + ); + } + return Err(err); + } + }; + + Ok(( + InputGuard { + saved_fd, + shutdown_writer, + }, + worker, + )) +} + +fn run_output_reader( + stream: StreamKind, + reader_fd: RawFd, + mirror_fd: RawFd, + sender: Sender, +) -> Result<()> { + let mut reader = unsafe { File::from_raw_fd(reader_fd) }; + let mut mirror = unsafe { File::from_raw_fd(mirror_fd) }; + let mut buffer = [0u8; READ_BUFFER_SIZE]; + + loop { + match reader.read(&mut buffer) { + Ok(0) => break, + Ok(count) => { + send_buffer(&sender, stream, &buffer[..count]); + if let Err(err) = mirror.write_all(&buffer[..count]) { + return Err(enverr!(ErrorCode::Io, "failed to mirror captured output") + .with_context("stream", stream.as_str()) + .with_context("error", err.to_string())); + } + } + Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue, + Err(err) => { + return Err(enverr!(ErrorCode::Io, "failed to read captured output") + .with_context("stream", stream.as_str()) + .with_context("error", err.to_string())); + } + } + } + + Ok(()) +} + +fn run_input_reader( + source_fd: RawFd, + writer_fd: RawFd, + shutdown_fd: RawFd, + sender: Sender, +) -> Result<()> { + let mut source = unsafe { File::from_raw_fd(source_fd) }; + let mut writer = unsafe { File::from_raw_fd(writer_fd) }; + let mut buffer = [0u8; READ_BUFFER_SIZE]; + + loop { + let mut fds = [ + libc::pollfd { + fd: source_fd, + events: libc::POLLIN, + revents: 0, + }, + libc::pollfd { + fd: shutdown_fd, + events: libc::POLLIN, + revents: 0, + }, + ]; + + let rc = unsafe { libc::poll(fds.as_mut_ptr(), fds.len() as libc::nfds_t, -1) }; + if rc < 0 { + let err = std::io::Error::last_os_error(); + if err.kind() == std::io::ErrorKind::Interrupted { + continue; + } + return Err(enverr!(ErrorCode::Io, "stdin capture poll failed") + .with_context("error", err.to_string())); + } + + if fds[1].revents & libc::POLLIN != 0 { + break; + } + + if fds[0].revents & libc::POLLIN != 0 { + match source.read(&mut buffer) { + Ok(0) => break, + Ok(count) => { + send_buffer(&sender, StreamKind::Stdin, &buffer[..count]); + if let Err(err) = writer.write_all(&buffer[..count]) { + return Err(enverr!(ErrorCode::Io, "failed to forward stdin chunk") + .with_context("error", err.to_string())); + } + } + Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue, + Err(err) => { + return Err(enverr!(ErrorCode::Io, "failed to read stdin for capture") + .with_context("error", err.to_string())); + } + } + } + } + + // Drain shutdown notification byte if present and close control fd. + let mut temp = [0u8; 1]; + let _ = unsafe { libc::read(shutdown_fd, temp.as_mut_ptr() as *mut _, 1) }; + let _ = unsafe { libc::close(shutdown_fd) }; + let _ = writer.flush(); + + Ok(()) +} + +fn close_fd(fd: RawFd) { + if fd >= 0 { + let _ = unsafe { libc::close(fd) }; + } +} + +fn dup_fd(fd: RawFd, stream: StreamKind) -> Result { + let result = unsafe { libc::dup(fd) }; + if result < 0 { + let err = std::io::Error::last_os_error(); + return Err( + enverr!(ErrorCode::Io, "failed to duplicate file descriptor") + .with_context("stream", stream.as_str()) + .with_context("error", err.to_string()), + ); + } + // SAFETY: `result` is a freshly duplicated descriptor. + Ok(unsafe { OwnedFd::from_raw_fd(result) }) +} + +fn new_pipe() -> Result<(OwnedFd, OwnedFd)> { + let mut fds = [0; 2]; + if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 { + let err = std::io::Error::last_os_error(); + return Err( + enverr!(ErrorCode::Io, "failed to create pipe for IO capture") + .with_context("error", err.to_string()), + ); + } + // SAFETY: pipe returns two valid descriptors on success. + let reader = unsafe { OwnedFd::from_raw_fd(fds[0]) }; + let writer = unsafe { OwnedFd::from_raw_fd(fds[1]) }; + Ok((reader, writer)) +} + +fn replace_fd(source_fd: RawFd, target_fd: RawFd, stream: StreamKind) -> Result<()> { + if unsafe { libc::dup2(source_fd, target_fd) } < 0 { + let err = std::io::Error::last_os_error(); + return Err(enverr!(ErrorCode::Io, "failed to redirect stream to pipe") + .with_context("stream", stream.as_str()) + .with_context("error", err.to_string())); + } + Ok(()) +} + +fn restore_descriptor(saved: OwnedFd, stream: StreamKind) -> Result<()> { + let target_fd = std_fd(stream); + if unsafe { libc::dup2(saved.as_raw_fd(), target_fd) } < 0 { + let err = std::io::Error::last_os_error(); + return Err( + enverr!(ErrorCode::Io, "failed to restore descriptor after capture") + .with_context("stream", stream.as_str()) + .with_context("error", err.to_string()), + ); + } + // Drop saved descriptor to avoid leaks. + drop(saved); + Ok(()) +} + +fn std_fd(stream: StreamKind) -> RawFd { + match stream { + StreamKind::Stdout => libc::STDOUT_FILENO, + StreamKind::Stderr => libc::STDERR_FILENO, + StreamKind::Stdin => libc::STDIN_FILENO, + } +} diff --git a/codetracer-python-recorder/src/runtime/io_capture/windows.rs b/codetracer-python-recorder/src/runtime/io_capture/windows.rs new file mode 100644 index 0000000..381ca1b --- /dev/null +++ b/codetracer-python-recorder/src/runtime/io_capture/windows.rs @@ -0,0 +1,322 @@ +use std::io::ErrorKind; + +use crossbeam_channel::Sender; + +use recorder_errors::{enverr, ErrorCode}; + +use crate::errors::Result; + +use super::{send_buffer, spawn_reader_thread, IoChunk, StreamKind, WorkerHandle}; + +const READ_BUFFER_SIZE: usize = 8192; +const PIPE_BUFFER_SIZE: u32 = 8192; + +pub(super) struct Controller { + outputs: Vec, + input: Option, +} + +impl Controller { + pub fn restore(&mut self) -> Result<()> { + for guard in self.outputs.drain(..) { + restore_descriptor(guard.saved_fd, guard.stream)?; + } + + if let Some(input) = self.input.take() { + restore_descriptor(input.saved_fd, StreamKind::Stdin)?; + close_fd(input.source_fd); + } + + Ok(()) + } + + fn rollback(&mut self) { + if let Err(err) = self.restore() { + log::error!("IO capture rollback failed: {}", err); + } + } +} + +struct OutputGuard { + stream: StreamKind, + saved_fd: i32, +} + +struct InputGuard { + saved_fd: i32, + source_fd: i32, +} + +pub(super) fn start(sender: Sender) -> Result<(Controller, Vec)> { + let mut controller = Controller { + outputs: Vec::new(), + input: None, + }; + + let mut workers: Vec = Vec::new(); + + for stream in [StreamKind::Stdout, StreamKind::Stderr] { + match start_output_stream(stream, &sender) { + Ok((guard, worker)) => { + controller.outputs.push(guard); + workers.push(worker); + } + Err(err) => { + controller.rollback(); + return Err(err); + } + } + } + + match start_input_stream(&sender) { + Ok((guard, worker)) => { + controller.input = Some(guard); + workers.push(worker); + } + Err(err) => { + controller.rollback(); + return Err(err); + } + } + + Ok((controller, workers)) +} + +fn start_output_stream( + stream: StreamKind, + sender: &Sender, +) -> Result<(OutputGuard, WorkerHandle)> { + let fd = std_fd(stream); + let saved_fd = dup_fd(fd, stream)?; + let mirror_fd = dup_fd(fd, stream)?; + let (reader_fd, writer_fd) = new_pipe(stream)?; + + replace_fd(writer_fd, fd, stream)?; + close_fd(writer_fd); + + let tx = sender.clone(); + let worker = match spawn_reader_thread(stream.as_str(), move || { + run_output_reader(stream, reader_fd, mirror_fd, tx) + }) { + Ok(worker) => worker, + Err(err) => { + close_fd(reader_fd); + close_fd(mirror_fd); + if unsafe { libc::_dup2(saved_fd, fd) } < 0 { + log::error!( + "failed to restore {} after spawn failure: {}", + stream.as_str(), + std::io::Error::last_os_error() + ); + } + return Err(err); + } + }; + + Ok((OutputGuard { stream, saved_fd }, worker)) +} + +fn start_input_stream(sender: &Sender) -> Result<(InputGuard, WorkerHandle)> { + let fd = std_fd(StreamKind::Stdin); + let saved_fd = dup_fd(fd, StreamKind::Stdin)?; + let source_fd = dup_fd(fd, StreamKind::Stdin)?; + let (pipe_reader_fd, pipe_writer_fd) = new_pipe(StreamKind::Stdin)?; + + replace_fd(pipe_reader_fd, fd, StreamKind::Stdin)?; + close_fd(pipe_reader_fd); + + let tx = sender.clone(); + let worker = match spawn_reader_thread("stdin", move || { + run_input_reader(source_fd, pipe_writer_fd, tx) + }) { + Ok(worker) => worker, + Err(err) => { + close_fd(pipe_writer_fd); + close_fd(source_fd); + if unsafe { libc::_dup2(saved_fd, fd) } < 0 { + log::error!( + "failed to restore stdin after spawn failure: {}", + std::io::Error::last_os_error() + ); + } + return Err(err); + } + }; + + Ok(( + InputGuard { + saved_fd, + source_fd, + }, + worker, + )) +} + +fn run_output_reader( + stream: StreamKind, + reader_fd: i32, + mirror_fd: i32, + sender: Sender, +) -> Result<()> { + let mut buffer = [0u8; READ_BUFFER_SIZE]; + + loop { + let read = unsafe { + libc::_read( + reader_fd, + buffer.as_mut_ptr() as *mut libc::c_void, + READ_BUFFER_SIZE as i32, + ) + }; + if read == 0 { + break; + } + if read < 0 { + let err = std::io::Error::last_os_error(); + if err.kind() == ErrorKind::Interrupted { + continue; + } + close_fd(reader_fd); + close_fd(mirror_fd); + return Err(enverr!(ErrorCode::Io, "failed to read captured output") + .with_context("stream", stream.as_str()) + .with_context("error", err.to_string())); + } + + let count = read as usize; + send_buffer(&sender, stream, &buffer[..count]); + let written = + unsafe { libc::_write(mirror_fd, buffer.as_ptr() as *const libc::c_void, read) }; + if written < 0 { + let err = std::io::Error::last_os_error(); + close_fd(reader_fd); + close_fd(mirror_fd); + return Err(enverr!(ErrorCode::Io, "failed to mirror captured output") + .with_context("stream", stream.as_str()) + .with_context("error", err.to_string())); + } + } + + close_fd(reader_fd); + close_fd(mirror_fd); + Ok(()) +} + +fn run_input_reader(source_fd: i32, writer_fd: i32, sender: Sender) -> Result<()> { + let mut buffer = [0u8; READ_BUFFER_SIZE]; + + loop { + let read = unsafe { + libc::_read( + source_fd, + buffer.as_mut_ptr() as *mut libc::c_void, + READ_BUFFER_SIZE as i32, + ) + }; + + if read == 0 { + break; + } + + if read < 0 { + let err = std::io::Error::last_os_error(); + if err.kind() == ErrorKind::Interrupted { + continue; + } + // When the manager closes source_fd to request shutdown, errno becomes EBADF. + if err.raw_os_error() == Some(libc::EBADF) { + break; + } + close_fd(writer_fd); + return Err(enverr!(ErrorCode::Io, "failed to read stdin for capture") + .with_context("error", err.to_string())); + } + + let count = read as usize; + send_buffer(&sender, StreamKind::Stdin, &buffer[..count]); + let written = + unsafe { libc::_write(writer_fd, buffer.as_ptr() as *const libc::c_void, read) }; + if written < 0 { + let err = std::io::Error::last_os_error(); + close_fd(writer_fd); + return Err(enverr!(ErrorCode::Io, "failed to forward stdin chunk") + .with_context("error", err.to_string())); + } + } + + close_fd(writer_fd); + Ok(()) +} + +fn dup_fd(fd: i32, stream: StreamKind) -> Result { + let result = unsafe { libc::_dup(fd) }; + if result < 0 { + let err = std::io::Error::last_os_error(); + return Err( + enverr!(ErrorCode::Io, "failed to duplicate file descriptor") + .with_context("stream", stream.as_str()) + .with_context("error", err.to_string()), + ); + } + Ok(result) +} + +fn new_pipe(stream: StreamKind) -> Result<(i32, i32)> { + let mut fds = [0i32; 2]; + if unsafe { + libc::_pipe( + fds.as_mut_ptr(), + PIPE_BUFFER_SIZE, + libc::_O_BINARY | libc::_O_NOINHERIT, + ) + } < 0 + { + let err = std::io::Error::last_os_error(); + return Err( + enverr!(ErrorCode::Io, "failed to allocate pipe for capture") + .with_context("stream", stream.as_str()) + .with_context("error", err.to_string()), + ); + } + Ok((fds[0], fds[1])) +} + +fn replace_fd(source_fd: i32, target_fd: i32, stream: StreamKind) -> Result<()> { + if unsafe { libc::_dup2(source_fd, target_fd) } < 0 { + let err = std::io::Error::last_os_error(); + return Err(enverr!(ErrorCode::Io, "failed to redirect stream to pipe") + .with_context("stream", stream.as_str()) + .with_context("error", err.to_string())); + } + Ok(()) +} + +fn restore_descriptor(saved_fd: i32, stream: StreamKind) -> Result<()> { + let target_fd = std_fd(stream); + if unsafe { libc::_dup2(saved_fd, target_fd) } < 0 { + let err = std::io::Error::last_os_error(); + return Err( + enverr!(ErrorCode::Io, "failed to restore descriptor after capture") + .with_context("stream", stream.as_str()) + .with_context("error", err.to_string()), + ); + } + close_fd(saved_fd); + Ok(()) +} + +fn close_fd(fd: i32) { + if fd >= 0 { + unsafe { + libc::_close(fd); + } + } +} + +fn std_fd(stream: StreamKind) -> i32 { + match stream { + StreamKind::Stdout => 1, + StreamKind::Stderr => 2, + StreamKind::Stdin => 0, + } +} diff --git a/codetracer-python-recorder/src/runtime/mod.rs b/codetracer-python-recorder/src/runtime/mod.rs index 45c8e29..8165209 100644 --- a/codetracer-python-recorder/src/runtime/mod.rs +++ b/codetracer-python-recorder/src/runtime/mod.rs @@ -2,8 +2,11 @@ mod activation; mod frame_inspector; +mod io_capture; mod logging; mod output_paths; +mod thread_snapshots; +mod trace_writer_host; mod value_capture; mod value_encoder; @@ -11,7 +14,10 @@ pub use output_paths::TraceOutputPaths; use activation::ActivationController; use frame_inspector::capture_frame; +use io_capture::ActiveCapture; use logging::log_event; +use thread_snapshots::{SnapshotEntry, ThreadSnapshotStore}; +use trace_writer_host::{register_step_with_guard, TraceWriterHost}; use value_capture::{capture_call_arguments, record_return_value, record_visible_scope}; use std::collections::{hash_map::Entry, HashMap, HashSet}; @@ -26,7 +32,6 @@ use pyo3::prelude::*; use pyo3::types::PyAny; use recorder_errors::{bug, enverr, target, usage, ErrorCode, RecorderResult}; -use runtime_tracing::NonStreamingTraceWriter; use runtime_tracing::{Line, TraceEventsFileFormat, TraceWriter}; use crate::code_object::CodeObjectWrapper; @@ -53,10 +58,24 @@ impl Drop for TraceIdResetGuard { } } +/// Hook invoked before the tracer finishes to drain auxiliary workers. +pub trait IoDrain: Send { + fn drain(&mut self, py: Python<'_>) -> RecorderResult<()>; +} + +#[derive(Default)] +struct NoopIoDrain; + +impl IoDrain for NoopIoDrain { + fn drain(&mut self, _py: Python<'_>) -> RecorderResult<()> { + Ok(()) + } +} + /// Minimal runtime tracer that maps Python sys.monitoring events to /// runtime_tracing writer operations. pub struct RuntimeTracer { - writer: NonStreamingTraceWriter, + writer: TraceWriterHost, format: TraceEventsFileFormat, activation: ActivationController, program_path: PathBuf, @@ -66,6 +85,8 @@ pub struct RuntimeTracer { events_recorded: bool, encountered_failure: bool, trace_id: String, + thread_snapshots: ThreadSnapshotStore, + io_drain: Box, } #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -138,8 +159,7 @@ fn should_inject_failure(_stage: FailureStage) -> bool { #[cfg(feature = "integration-test")] fn should_inject_target_error() -> bool { - matches!(configured_failure_mode(), Some(FailureMode::TargetArgs)) - && mark_failure_triggered() + matches!(configured_failure_mode(), Some(FailureMode::TargetArgs)) && mark_failure_triggered() } #[cfg(not(feature = "integration-test"))] @@ -213,8 +233,7 @@ impl RuntimeTracer { format: TraceEventsFileFormat, activation_path: Option<&Path>, ) -> Self { - let mut writer = NonStreamingTraceWriter::new(program, args); - writer.set_format(format); + let writer = TraceWriterHost::new(program, args, format); let activation = ActivationController::new(activation_path); let program_path = PathBuf::from(program); Self { @@ -228,6 +247,8 @@ impl RuntimeTracer { events_recorded: false, encountered_failure: false, trace_id: Uuid::new_v4().to_string(), + thread_snapshots: ThreadSnapshotStore::new(), + io_drain: Box::new(NoopIoDrain::default()), } } @@ -235,13 +256,15 @@ impl RuntimeTracer { pub fn begin(&mut self, outputs: &TraceOutputPaths, start_line: u32) -> PyResult<()> { let start_path = self.activation.start_path(&self.program_path); log::debug!("{}", start_path.display()); - outputs - .configure_writer(&mut self.writer, start_path, start_line) + self.writer + .configure_outputs(outputs, start_path, start_line) .map_err(ffi::map_recorder_error)?; self.output_paths = Some(outputs.clone()); self.events_recorded = false; self.encountered_failure = false; set_active_trace_id(Some(self.trace_id.clone())); + self.thread_snapshots.reset(); + self.configure_io_capture()?; Ok(()) } @@ -257,6 +280,31 @@ impl RuntimeTracer { self.encountered_failure = true; } + /// Replace the IO drain hook. Intended for future IO capture integration. + #[allow(dead_code)] + pub fn set_io_drain(&mut self, drain: Box) { + self.io_drain = drain; + } + + /// Share the writer host for background workers. + #[allow(dead_code)] + pub fn writer_host(&self) -> TraceWriterHost { + self.writer.clone() + } + + /// Share the snapshot store for read-only observers. + #[allow(dead_code)] + pub fn snapshot_store(&self) -> ThreadSnapshotStore { + self.thread_snapshots.clone() + } + + fn configure_io_capture(&mut self) -> PyResult<()> { + let capture = ActiveCapture::start(self.writer.clone(), self.thread_snapshots.clone()) + .map_err(ffi::map_recorder_error)?; + self.io_drain = Box::new(capture); + Ok(()) + } + fn cleanup_partial_outputs(&self) -> RecorderResult<()> { if let Some(outputs) = &self.output_paths { for path in [outputs.events(), outputs.metadata(), outputs.paths()] { @@ -283,19 +331,7 @@ impl RuntimeTracer { } fn finalise_writer(&mut self) -> RecorderResult<()> { - TraceWriter::finish_writing_trace_metadata(&mut self.writer).map_err(|err| { - enverr!(ErrorCode::Io, "failed to finalise trace metadata") - .with_context("source", err.to_string()) - })?; - TraceWriter::finish_writing_trace_paths(&mut self.writer).map_err(|err| { - enverr!(ErrorCode::Io, "failed to finalise trace paths") - .with_context("source", err.to_string()) - })?; - TraceWriter::finish_writing_trace_events(&mut self.writer).map_err(|err| { - enverr!(ErrorCode::Io, "failed to finalise trace events") - .with_context("source", err.to_string()) - })?; - Ok(()) + self.writer.finalize() } fn ensure_function_id( @@ -309,8 +345,9 @@ impl RuntimeTracer { let name = code.qualname(py)?; let filename = code.filename(py)?; let first_line = code.first_line(py)?; + let mut writer = self.writer.lock().map_err(ffi::map_recorder_error)?; let function_id = TraceWriter::ensure_function_id( - &mut self.writer, + &mut *writer, name, Path::new(filename), Line(first_line as i64), @@ -386,20 +423,23 @@ impl Tracer for RuntimeTracer { log_event(py, code, "on_py_start", None); if let Ok(fid) = self.ensure_function_id(py, code) { - match capture_call_arguments(py, &mut self.writer, code) { - Ok(args) => TraceWriter::register_call(&mut self.writer, fid, args), - Err(err) => { - let details = err.to_string(); - with_error_code(ErrorCode::FrameIntrospectionFailed, || { - log::error!("on_py_start: failed to capture args: {details}"); - }); - return Err(ffi::map_recorder_error( - enverr!( - ErrorCode::FrameIntrospectionFailed, - "failed to capture call arguments" - ) - .with_context("details", details), - )); + { + let mut writer = self.writer.lock().map_err(ffi::map_recorder_error)?; + match capture_call_arguments(py, &mut writer, code) { + Ok(args) => TraceWriter::register_call(&mut *writer, fid, args), + Err(err) => { + let details = err.to_string(); + with_error_code(ErrorCode::FrameIntrospectionFailed, || { + log::error!("on_py_start: failed to capture args: {details}"); + }); + return Err(ffi::map_recorder_error( + enverr!( + ErrorCode::FrameIntrospectionFailed, + "failed to capture call arguments" + ) + .with_context("details", details), + )); + } } } self.mark_event(); @@ -433,15 +473,22 @@ impl Tracer for RuntimeTracer { log_event(py, code, "on_line", Some(lineno)); - if let Ok(filename) = code.filename(py) { - TraceWriter::register_step(&mut self.writer, Path::new(filename), Line(lineno as i64)); - self.mark_event(); - } - let snapshot = capture_frame(py, code)?; - - let mut recorded: HashSet = HashSet::new(); - record_visible_scope(py, &mut self.writer, &snapshot, &mut recorded); + let filename = code.filename(py)?; + let line = Line(lineno as i64); + let path_id = { + let mut writer = self.writer.lock().map_err(ffi::map_recorder_error)?; + let path_id = register_step_with_guard(&mut writer, Path::new(filename), line); + let mut recorded: HashSet = HashSet::new(); + record_visible_scope(py, &mut writer, &snapshot, &mut recorded); + path_id + }; + self.mark_event(); + self.thread_snapshots.update_current(SnapshotEntry { + path_id, + line, + frame_id: snapshot.frame_id(), + }); Ok(CallbackOutcome::Continue) } @@ -466,8 +513,12 @@ impl Tracer for RuntimeTracer { log_event(py, code, "on_py_return", None); - record_return_value(py, &mut self.writer, retval); + { + let mut writer = self.writer.lock().map_err(ffi::map_recorder_error)?; + record_return_value(py, &mut writer, retval); + } self.mark_event(); + self.thread_snapshots.clear_current(); if self.activation.handle_return_event(code.id()) { log::debug!("[RuntimeTracer] deactivated on activation return"); } @@ -486,12 +537,9 @@ impl Tracer for RuntimeTracer { // For non-streaming formats we can update the events file. match self.format { TraceEventsFileFormat::Json | TraceEventsFileFormat::BinaryV0 => { - TraceWriter::finish_writing_trace_events(&mut self.writer).map_err(|err| { - ffi::map_recorder_error( - enverr!(ErrorCode::Io, "failed to finalise trace events") - .with_context("source", err.to_string()), - ) - })?; + self.writer + .finalize_events() + .map_err(ffi::map_recorder_error)?; } TraceEventsFileFormat::Binary => { // Streaming writer: no partial flush to avoid closing the stream. @@ -501,7 +549,7 @@ impl Tracer for RuntimeTracer { Ok(()) } - fn finish(&mut self, _py: Python<'_>) -> PyResult<()> { + fn finish(&mut self, py: Python<'_>) -> PyResult<()> { // Trace event entry log::debug!("[RuntimeTracer] finish"); @@ -513,6 +561,8 @@ impl Tracer for RuntimeTracer { let _reset = TraceIdResetGuard::new(); let policy = policy_snapshot(); + self.io_drain.drain(py).map_err(ffi::map_recorder_error)?; + if self.encountered_failure { if policy.keep_partial_trace { if let Err(err) = self.finalise_writer() { @@ -537,6 +587,7 @@ impl Tracer for RuntimeTracer { } self.ignored_code_ids.clear(); self.function_ids.clear(); + self.thread_snapshots.reset(); return Ok(()); } @@ -545,6 +596,7 @@ impl Tracer for RuntimeTracer { self.finalise_writer().map_err(ffi::map_recorder_error)?; self.ignored_code_ids.clear(); self.function_ids.clear(); + self.thread_snapshots.reset(); Ok(()) } } @@ -621,10 +673,12 @@ mod tests { py.run(script_c.as_c_str(), None, None) .expect("execute synthetic script"); } + let writer = tracer.writer.lock().expect("lock writer"); assert!( - tracer.writer.events.is_empty(), + writer.events.is_empty(), "expected no events for synthetic filename" ); + drop(writer); assert_eq!(last_outcome(), Some(CallbackOutcome::DisableLocation)); let compile_fn = py @@ -654,6 +708,42 @@ mod tests { assert_eq!(last_outcome(), Some(CallbackOutcome::Continue)); } + #[test] + fn updates_thread_snapshot_store_for_active_thread() { + Python::with_gil(|py| { + ensure_test_module(py); + let tmp = tempfile::tempdir().expect("create temp dir"); + let script_path = tmp.path().join("store_script.py"); + let script = format!("{PRELUDE}\n\nsnapshot()\n"); + std::fs::write(&script_path, &script).expect("write script"); + + let mut tracer = RuntimeTracer::new( + script_path.to_str().expect("utf8"), + &[], + TraceEventsFileFormat::Json, + None, + ); + { + let _guard = ScopedTracer::new(&mut tracer); + LAST_OUTCOME.with(|cell| cell.set(None)); + let run_code = format!( + "import runpy\nrunpy.run_path(r\"{}\")", + script_path.display() + ); + let run_code_c = CString::new(run_code).expect("nul byte free"); + py.run(run_code_c.as_c_str(), None, None) + .expect("execute script"); + } + + let store = tracer.snapshot_store(); + let thread_id = std::thread::current().id(); + let snapshot = store + .snapshot_for(thread_id) + .expect("snapshot for current thread"); + assert_ne!(snapshot.frame_id, 0, "expected non-zero frame id"); + }); + } + #[test] fn callbacks_do_not_import_sys_monitoring() { let body = r#" @@ -712,17 +802,19 @@ result = compute()\n" .expect("execute test script"); } - let returns: Vec = tracer - .writer - .events - .iter() - .filter_map(|event| match event { - TraceLowLevelEvent::Return(record) => { - Some(SimpleValue::from_value(&record.return_value)) - } - _ => None, - }) - .collect(); + let returns: Vec = { + let writer = tracer.writer.lock().expect("lock writer"); + writer + .events + .iter() + .filter_map(|event| match event { + TraceLowLevelEvent::Return(record) => { + Some(SimpleValue::from_value(&record.return_value)) + } + _ => None, + }) + .collect() + }; assert!( returns.contains(&SimpleValue::String("tail".to_string())), @@ -915,7 +1007,8 @@ def emit_return(value): py.run(run_code_c.as_c_str(), None, None) .expect("execute test script"); } - collect_snapshots(&tracer.writer.events) + let writer = tracer.writer.lock().expect("lock writer"); + collect_snapshots(&writer.events) }) } diff --git a/codetracer-python-recorder/src/runtime/thread_snapshots.rs b/codetracer-python-recorder/src/runtime/thread_snapshots.rs new file mode 100644 index 0000000..4c68066 --- /dev/null +++ b/codetracer-python-recorder/src/runtime/thread_snapshots.rs @@ -0,0 +1,77 @@ +//! Store the latest execution snapshot per OS thread. + +use std::collections::HashMap; +use std::sync::{Mutex, MutexGuard}; +use std::thread::{self, ThreadId}; + +use log::warn; +use runtime_tracing::{Line, PathId}; + +/// Snapshot of the last recorded step for a thread. +#[derive(Clone, Debug)] +pub struct SnapshotEntry { + pub path_id: PathId, + pub line: Line, + pub frame_id: usize, +} + +#[derive(Default, Clone, Debug)] +pub struct ThreadSnapshotStore { + inner: std::sync::Arc>>, + last_global: std::sync::Arc>>, +} + +impl ThreadSnapshotStore { + pub fn new() -> Self { + Self::default() + } + + /// Record a snapshot for the current thread. + pub fn update_current(&self, entry: SnapshotEntry) { + let thread_id = thread::current().id(); + let mut store = lock(&self.inner); + store.insert(thread_id, entry.clone()); + drop(store); + + let mut global = lock(&self.last_global); + *global = Some(entry); + } + + /// Remove any stored snapshot for the current thread. + pub fn clear_current(&self) { + let thread_id = thread::current().id(); + let mut store = lock(&self.inner); + store.remove(&thread_id); + } + + /// Return the latest snapshot for a given thread id. + pub fn snapshot_for(&self, thread_id: ThreadId) -> Option { + let store = lock(&self.inner); + store.get(&thread_id).cloned() + } + + /// Return the most recent snapshot observed across all threads. + pub fn latest(&self) -> Option { + let global = lock(&self.last_global); + global.clone() + } + + /// Clear all stored snapshots. + pub fn reset(&self) { + let mut store = lock(&self.inner); + store.clear(); + drop(store); + let mut global = lock(&self.last_global); + *global = None; + } +} + +fn lock(mutex: &std::sync::Arc>) -> MutexGuard<'_, T> { + match mutex.lock() { + Ok(guard) => guard, + Err(poisoned) => { + warn!("ThreadSnapshotStore mutex poisoned; continuing with recovered state"); + poisoned.into_inner() + } + } +} diff --git a/codetracer-python-recorder/src/runtime/trace_writer_host.rs b/codetracer-python-recorder/src/runtime/trace_writer_host.rs new file mode 100644 index 0000000..8505998 --- /dev/null +++ b/codetracer-python-recorder/src/runtime/trace_writer_host.rs @@ -0,0 +1,112 @@ +//! Thread-safe wrapper around `NonStreamingTraceWriter`. + +use std::ops::{Deref, DerefMut}; +use std::path::Path; +use std::sync::{Arc, Mutex, MutexGuard}; + +use log::warn; +use recorder_errors::{enverr, ErrorCode}; +use runtime_tracing::{Line, NonStreamingTraceWriter, TraceEventsFileFormat, TraceWriter}; + +use crate::errors::Result; + +use super::output_paths::TraceOutputPaths; + +/// Shared facade for the runtime trace writer. +#[derive(Clone)] +pub struct TraceWriterHost { + inner: Arc>, +} + +/// Guard exposing mutable access to the underlying writer. +pub struct TraceWriterGuard<'a> { + guard: MutexGuard<'a, NonStreamingTraceWriter>, +} + +impl TraceWriterHost { + /// Build a new writer host configured for the current program. + pub fn new(program: &str, args: &[String], format: TraceEventsFileFormat) -> Self { + let mut writer = NonStreamingTraceWriter::new(program, args); + writer.set_format(format); + Self { + inner: Arc::new(Mutex::new(writer)), + } + } + + /// Borrow the underlying writer. Recovers from poisoned mutexes while logging a warning. + pub fn lock(&self) -> Result> { + match self.inner.lock() { + Ok(guard) => Ok(TraceWriterGuard { guard }), + Err(poisoned) => { + warn!("TraceWriterHost mutex poisoned; continuing with recovered state"); + Ok(TraceWriterGuard { + guard: poisoned.into_inner(), + }) + } + } + } + + /// Configure output files and start trace recording. + pub fn configure_outputs( + &self, + outputs: &TraceOutputPaths, + start_path: &Path, + start_line: u32, + ) -> Result<()> { + let mut writer = self.lock()?; + outputs.configure_writer(&mut *writer, start_path, start_line) + } + + /// Finalise metadata, path, and event files. + pub fn finalize(&self) -> Result<()> { + let mut writer = self.lock()?; + TraceWriter::finish_writing_trace_metadata(&mut *writer).map_err(|err| { + enverr!(ErrorCode::Io, "failed to finalise trace metadata") + .with_context("source", err.to_string()) + })?; + TraceWriter::finish_writing_trace_paths(&mut *writer).map_err(|err| { + enverr!(ErrorCode::Io, "failed to finalise trace paths") + .with_context("source", err.to_string()) + })?; + TraceWriter::finish_writing_trace_events(&mut *writer).map_err(|err| { + enverr!(ErrorCode::Io, "failed to finalise trace events") + .with_context("source", err.to_string()) + })?; + Ok(()) + } + + /// Finalise event data only (used by flush operations). + pub fn finalize_events(&self) -> Result<()> { + let mut writer = self.lock()?; + TraceWriter::finish_writing_trace_events(&mut *writer).map_err(|err| { + enverr!(ErrorCode::Io, "failed to finalise trace events") + .with_context("source", err.to_string()) + })?; + Ok(()) + } +} + +impl<'a> Deref for TraceWriterGuard<'a> { + type Target = NonStreamingTraceWriter; + + fn deref(&self) -> &Self::Target { + &self.guard + } +} + +impl<'a> DerefMut for TraceWriterGuard<'a> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.guard + } +} + +/// Convenience helper for recording individual steps with a borrowed guard. +pub fn register_step_with_guard( + guard: &mut TraceWriterGuard<'_>, + path: &Path, + line: Line, +) -> runtime_tracing::PathId { + let path_id = TraceWriter::ensure_path_id(&mut **guard, path); + TraceWriter::register_step(&mut **guard, path, line); + path_id +} diff --git a/codetracer-python-recorder/src/session.rs b/codetracer-python-recorder/src/session.rs index 4acfc51..2d69f6d 100644 --- a/codetracer-python-recorder/src/session.rs +++ b/codetracer-python-recorder/src/session.rs @@ -4,7 +4,10 @@ mod bootstrap; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; +use log::warn; +use once_cell::sync::Lazy; use pyo3::prelude::*; use recorder_errors::{usage, ErrorCode}; @@ -17,9 +20,59 @@ use bootstrap::TraceSessionBootstrap; /// Global flag tracking whether tracing is active. static ACTIVE: AtomicBool = AtomicBool::new(false); +#[derive(Default)] +struct SessionLifecycle { + extras: Vec>, +} + +impl SessionLifecycle { + fn replace(&mut self, extras: Option>) { + self.extras.clear(); + if let Some(extra) = extras { + self.extras.push(extra); + } + } + + fn extend(&mut self, extras: Option>) { + if let Some(extra) = extras { + self.extras.push(extra); + } + } + + fn drain(&mut self, py: Python<'_>) { + for handle in self.extras.drain(..) { + if let Err(err) = invoke_stop(py, &handle) { + warn!("failed to stop lifecycle handle"); + err.print(py); + } + } + } +} + +static SESSION_LIFECYCLE: Lazy> = + Lazy::new(|| Mutex::new(SessionLifecycle::default())); + +fn invoke_stop(py: Python<'_>, handle: &Py) -> PyResult<()> { + let bound = handle.bind(py); + if bound.is_none() { + return Ok(()); + } + if bound.hasattr("stop")? { + bound.call_method0("stop")?; + } else if bound.hasattr("close")? { + bound.call_method0("close")?; + } + Ok(()) +} + /// Start tracing using sys.monitoring and runtime_tracing writer. -#[pyfunction] -pub fn start_tracing(path: &str, format: &str, activation_path: Option<&str>) -> PyResult<()> { +#[pyfunction(signature = (path, format, activation_path=None, extras=None))] +pub fn start_tracing( + path: &str, + format: &str, + activation_path: Option<&str>, + extras: Option>, +) -> PyResult<()> { ffi::wrap_pyfunction("start_tracing", || { // Ensure logging is ready before any tracer logs might be emitted. // Default our crate to warnings-only so tests stay quiet unless explicitly enabled. @@ -34,6 +87,7 @@ pub fn start_tracing(path: &str, format: &str, activation_path: Option<&str>) -> let activation_path = activation_path.map(PathBuf::from); Python::with_gil(|py| { + let mut extra_handle = extras.map(|obj| obj.unbind()); let bootstrap = TraceSessionBootstrap::prepare( py, Path::new(path), @@ -55,19 +109,27 @@ pub fn start_tracing(path: &str, format: &str, activation_path: Option<&str>) -> // Install callbacks install_tracer(py, Box::new(tracer))?; ACTIVE.store(true, Ordering::SeqCst); + if let Ok(mut lifecycle) = SESSION_LIFECYCLE.lock() { + lifecycle.replace(extra_handle.take()); + } Ok(()) }) }) } /// Stop tracing by resetting the global flag. -#[pyfunction] -pub fn stop_tracing() -> PyResult<()> { +#[pyfunction(signature = (extras=None))] +pub fn stop_tracing(extras: Option>) -> PyResult<()> { ffi::wrap_pyfunction("stop_tracing", || { Python::with_gil(|py| { // Uninstall triggers finish() on tracer implementation. uninstall_tracer(py)?; ACTIVE.store(false, Ordering::SeqCst); + let extra_handle = extras.map(|obj| obj.unbind()); + if let Ok(mut lifecycle) = SESSION_LIFECYCLE.lock() { + lifecycle.extend(extra_handle); + lifecycle.drain(py); + } Ok(()) }) }) diff --git a/codetracer-python-recorder/tests/python/test_io_capture.py b/codetracer-python-recorder/tests/python/test_io_capture.py new file mode 100644 index 0000000..db090cd --- /dev/null +++ b/codetracer-python-recorder/tests/python/test_io_capture.py @@ -0,0 +1,85 @@ +"""Integration tests for runtime IO capture.""" + +from __future__ import annotations + +import base64 +import json +import os +import subprocess +import sys +import textwrap +from collections import defaultdict +from pathlib import Path +from typing import DefaultDict, Iterable, Tuple + + +def _load_io_events(trace_file: Path) -> Iterable[Tuple[int, dict[str, object], bytes]]: + events = json.loads(trace_file.read_text()) + for entry in events: + payload = entry.get("Event") + if not payload: + continue + metadata = json.loads(payload["metadata"]) + chunk = base64.b64decode(payload["content"]) + yield payload["kind"], metadata, chunk + + +def test_io_capture_records_all_streams(tmp_path: Path) -> None: + script = textwrap.dedent( + """ + import sys + import codetracer_python_recorder as codetracer + + sys.stdout.write("hello stdout\\n") + sys.stdout.flush() + sys.stderr.write("warning\\n") + sys.stderr.flush() + data = sys.stdin.readline() + sys.stdout.write(f"input={data}") + sys.stdout.flush() + codetracer.stop() + """ + ) + + env = os.environ.copy() + env["CODETRACER_TRACE"] = str(tmp_path) + env["CODETRACER_FORMAT"] = "json" + + completed = subprocess.run( + [sys.executable, "-c", script], + input="feed\n", + text=True, + capture_output=True, + env=env, + check=True, + ) + + assert completed.stdout == "hello stdout\ninput=feed\n" + assert completed.stderr == "warning\n" + + trace_file = tmp_path / "trace.json" + assert trace_file.exists(), "expected trace artefact" + + buffers: DefaultDict[str, bytearray] = defaultdict(bytearray) + + for kind, metadata, chunk in _load_io_events(trace_file): + stream = metadata["stream"] + buffers[stream].extend(chunk) + + assert metadata["byte_len"] == len(chunk) + assert metadata["timestamp_ns"] >= 0 + assert isinstance(metadata["thread_id"], str) + snapshot = metadata.get("snapshot") + if snapshot is not None: + assert {"path_id", "line", "frame_id"}.issubset(snapshot) + + if stream == "stdout": + assert kind == 0 # EventLogKind::Write + elif stream == "stderr": + assert kind == 2 # EventLogKind::WriteOther + elif stream == "stdin": + assert kind == 3 # EventLogKind::Read + + assert buffers["stdout"].decode() == "hello stdout\ninput=feed\n" + assert buffers["stderr"].decode() == "warning\n" + assert buffers["stdin"].decode() == "feed\n" diff --git a/design-docs/adr/0005-io-capture.md b/design-docs/adr/0005-io-capture.md new file mode 100644 index 0000000..605b86c --- /dev/null +++ b/design-docs/adr/0005-io-capture.md @@ -0,0 +1,43 @@ +# ADR 0005: Input and Output Capture for Runtime Traces + +- **Status:** Proposed +- **Date:** 2025-10-03 +- **Deciders:** Runtime recorder maintainers +- **Consulted:** Python platform crew, Replay tooling crew +- **Informed:** DX crew, Release crew + +## Context +- The repo now splits session bootstrap, monitoring glue, and runtime logic into clear modules (`session`, `monitoring`, `runtime`). +- `RuntimeTracer` owns the `NonStreamingTraceWriter` and activation rules, and already writes metadata, paths, and step events. +- `recorder-errors` gives us uniform error codes and panic trapping. Every new subsystem must use it. +- We still forward stdout, stderr, and stdin directly to the host console. No bytes reach the trace. +- Replay and debugging teams need IO events beside call and line records so they can rebuild console sessions. + +## Problem +- We need lossless IO capture without breaking the in-process `sys.monitoring` design or the new error policy. +- The old pipe-based spec assumed the tracer lived inside `start()` and mutated global state freely. The refactor put lifecycle code behind `TraceSessionBootstrap`, `TraceOutputPaths`, and `RuntimeTracer::begin`. +- We also added activation gating and stricter teardown rules. Any IO hooks must respect them and always restore the original file descriptors. + +## Decision +1. Keep the Python CLI contract. `codetracer_python_recorder.start_tracing` keeps installing the tracer, but now also starts an IO capture controller right before `install_tracer` and shuts it down inside `stop_tracing`. +2. Introduce `runtime::io_capture` with a single public type, `IoCapture`. It duplicates stdin/stdout/stderr, installs platform pipes, and spawns blocking reader threads. The module hides Unix vs Windows code paths behind a small trait (`IoEndpoint`). +3. Expose an `IoEventSink` from `RuntimeTracer`. The sink wraps the writer in `Arc>` and exposes two safe methods: `record_output(chunk: IoChunk)` and `record_input(chunk: IoChunk)`. Reader threads call the sink only. All conversions to `TraceLowLevelEvent` live next to the writer so we reuse value encoders and error helpers. +4. Extend `RuntimeTracer` with a light `ThreadSnapshotStore`. `on_line` updates the current `{ path_id, line, frame_id }` per Python thread. `IoEventSink` reads the latest snapshot when it serialises a chunk. When no snapshot exists we fall back to the last global step. +5. Store stdout and stderr bytes as `EventLogKind::Write` and `WriteOther`. Store stdin bytes as `EventLogKind::Read`. Metadata includes the stream name, monotonic timestamps, thread tag, and the captured snapshot when present. Bytes stay base64 encoded by the runtime tracing crate. +6. Keep console passthrough. The reader threads mirror each chunk back into the saved file descriptors so users still see live output. +7. Wire capture teardown into existing error handling. `IoCapture::stop` drains the pipes, restores FDs, signals the threads, and logs failures through the `recorder-errors` macros. `RuntimeTracer::finish` waits for the IO channel before calling `TraceWriter::finish_*` to avoid races. +8. Hide the feature behind `RecorderPolicy`. A new flag `policy.io_capture` defaults to off today. Tests and early adopters enable it. Once stable we flip the default. + +## Consequences +- **Upsides:** We capture IO without a subprocess, reuse the refactored writer lifecycle, and keep activation gating intact. Replay tooling reads one stream for events and IO. +- **Costs:** Writer calls now cross a mutex, so we must measure contention. The new module adds platform code that needs tight tests. We must watch out for deadlocks on interpreter shutdown. + +## Rollout +- Ship behind an environment toggle `CODETRACER_CAPTURE_IO=1` wired into the policy layer. Emit a warning when the policy disables capture. +- Document the behaviour in the recorder README and the user CLI help once we land the feature. +- Graduate the ADR to **Accepted** after the implementation plan closes and the policy ship flips the default on both Unix and Windows. + +## Alternatives +- A subprocess wrapper was considered again and rejected. It would undo the refactor that keeps tracing in-process and would break existing embedding use cases. +- `sys.stdout` monkey patching remains off the table. It misses native writes and user-assigned streams. +- Writing IO into a separate JSON file is still unnecessary. The runtime tracing schema already handles IO events. diff --git a/design-docs/capture-output-implementation-plan.md b/design-docs/capture-output-implementation-plan.md new file mode 100644 index 0000000..e093e0a --- /dev/null +++ b/design-docs/capture-output-implementation-plan.md @@ -0,0 +1,68 @@ +# Capture Output Implementation Plan + +## Goal +- Ship lossless stdout, stderr, and stdin capture in the Rust recorder without breaking the current CLI flow or error policy. + +## Guiding Notes +- Follow ADR 0005. +- Keep sentences short for readers; prefer bullets. +- Run `just test` on every stage. + +## Stage 0 – Refactor for IO capture (must land first) +- Split writer ownership out of `RuntimeTracer` into a helper (`TraceWriterHost`) that exposes a thread-safe event API. +- Add a small `ThreadSnapshotStore` that records the latest `{path_id, line, frame_id}` per Python thread inside the runtime module. +- Ensure `RuntimeTracer::finish` already waits on background work hooks; add a stub `IoDrain` trait with no-op implementation so later stages can slot in real drains. +- Update `session::start_tracing` and `stop_tracing` to accept optional "extra lifecycle" handles so we can pair start/stop work without more globals. +- Tests: extend existing runtime unit tests to cover the new snapshot store and confirm start/stop paths still finalise trace files. + +## Stage 1 – Build the IO capture core +- Create `runtime::io_capture` with platform-specific back ends (`unix.rs`, `windows.rs`) hidden behind a common trait. +- Implement descriptor/handle duplication, pipe install, and reader thread startup. Use blocking reads and thread-safe queues (`crossbeam-channel` already in workspace; add if missing). +- Ensure mirror writes go back to the saved descriptors so console output stays live. +- Tests: add Rust unit tests that fake pipes (use `os_pipe` on Unix, `tempfile` handles on Windows via CI) to confirm duplication and restoration. + +## Stage 2 – Connect capture to the tracer +- Add an `IoEventSink` that wraps the `TraceWriterHost` and `ThreadSnapshotStore`, captures a start `Instant`, and exposes a `pump` loop to drain `IoChunk` messages from the channel. +- When serialising a chunk, emit `TraceLowLevelEvent::Event` records with: + - `EventLogKind::{Write, WriteOther, Read}` selected from the stream; + - Base64-encoded payload bytes stored in `content`; + - JSON metadata containing the stream label, elapsed time in nanoseconds relative to the sink start, the producing thread identifier string, the raw byte length, and an optional snapshot `{path_id, line, frame_id}` (fall back to the latest snapshot if the specific thread is missing). +- Extend the platform `IoCapture` to hand ownership to an `ActiveCapture` helper that spawns the sink worker thread, records panics or shutdown issues with `bug!`, reports IO/thread spawn failures via `enverr!`, and logs warnings if the receiver disappears before shutdown. +- Update `RuntimeTracer::begin` (via `configure_io_capture`) to instantiate `ActiveCapture` when capture is enabled, storing it as the runtime’s `IoDrain` so `finish` always drains and joins both the platform workers and the sink. +- Tests: add a Python integration test under `tests/python` that drives stdout, stderr, and stdin through the recorder, asserts the console passthrough matches, and inspects the emitted JSON trace to validate event kinds, metadata fields, and base64 payload reconstruction. Keep the Rust unit tests covering channel plumbing and metadata edge cases. + +## Stage 3 – Policy flag, CLI wiring, and guards +- Extend `RecorderPolicy` with `io_capture_enabled` plus env var `CODETRACER_CAPTURE_IO`. +- Make the Python CLI surface a `--capture-io` flag (defaults to policy). Document the flag in help text. +- Emit a single log line when capture is disabled by policy so users understand why their trace lacks IO events. +- Tests: Python integration test toggling the policy and checking presence/absence of IO records. + +## Stage 4 – Hardening and docs +- Stress test with large outputs (beyond pipe buffer) and interleaved writes from multiple threads. +- Run Windows CI to verify handle restore logic and CRLF behaviour. +- Document the feature in README + design docs. Update ADR status once accepted. +- Add metrics for dropped IO chunks using the existing logging counters. +- Tests: extend stress tests plus regression tests for start/stop loops to ensure descriptors always restore. + +## Milestones +1. Stage 0 merged and green CI. Serves as base branch for feature work. +2. Stages 1–2 merged together behind a feature flag. Feature hidden by default. +3. Stage 3 flips the flag for opted-in users. Gather feedback. +4. Stage 4 finishes docs, flips default to on, and promotes ADR 0005 to Accepted. + +## Verification Checklist +- `just test` passes after every stage. +- New unit tests cover writer host, snapshot store, and IO capture workers. +- Integration tests assert trace events and passthrough behaviour on Linux and Windows. +- Manual smoke: run `python -m codetracer_python_recorder examples/stdout_script.py` and confirm console output plus IO trace entries. + +## Risks & Mitigations +- **Deadlocks:** Keep reader threads simple, use bounded channels, and add shutdown timeouts tested in CI. +- **Performance hit:** Benchmark before and after Stage 2 with large stdout workloads; document results. +- **Platform drift:** Share the Unix/Windows API contract in a `README` inside the module and guard behaviour with tests. + +## Exit Criteria +- IO events present in trace files when the policy flag is on. +- Console output unchanged for users. +- No file descriptor leaks (checked via stress tests and `lsof` in CI scripts). +- Documentation published and linked from ADR 0005. diff --git a/design-docs/capture-output-implementation-plan.status.md b/design-docs/capture-output-implementation-plan.status.md new file mode 100644 index 0000000..398a515 --- /dev/null +++ b/design-docs/capture-output-implementation-plan.status.md @@ -0,0 +1,21 @@ +# Capture Output Implementation Plan — Status + +## Stage 0 – Refactor for IO capture +- **Status:** Completed (2025-10-03) +- **Highlights:** Introduced `TraceWriterHost`, added `ThreadSnapshotStore`, stubbed `IoDrain` hook, and taught `session::start_tracing` / `stop_tracing` to manage optional lifecycle handles. Added regression test `updates_thread_snapshot_store_for_active_thread` and ran `just test`. + +## Stage 1 – Build the IO capture core +- **Status:** Completed (2025-10-03) +- **Highlights:** Introduced the `runtime::io_capture` module with Unix and Windows backends, wiring descriptor duplication, pipe installation, and crossbeam-channel queues into the new `IoCapture` worker harness. Added shutdown-safe mirroring of stdout/stderr plus stdin pumping, and covered the code with focused unit tests (`captures_stdout_and_preserves_passthrough`, `captures_stdin_and_forwards_bytes`, and a Windows tempfile-based regression). `just test` passes. + +## Stage 2 – Connect capture to the tracer +- **Status:** Completed (2025-10-03) +- **Highlights:** Landed the `IoEventSink` + `ActiveCapture` bridge so IO chunks become runtime tracing events with base64 content and JSON metadata (including snapshot fallback), wired the sink into `RuntimeTracer` lifecycle, logged failures through `recorder-errors`, and added the end-to-end Python trace assertion. `just test` passes. + +## Stage 3 – Policy flag, CLI wiring, and guards +- **Status:** Not started +- **Notes:** Pending output from Stages 1–2. Need decisions on CLI UX before implementation. + +## Stage 4 – Hardening and docs +- **Status:** Not started +- **Notes:** Will schedule once capture experiments validate perf and stability. diff --git a/design-docs/capture-output.md b/design-docs/capture-output.md deleted file mode 100644 index efa4f03..0000000 --- a/design-docs/capture-output.md +++ /dev/null @@ -1,204 +0,0 @@ -# ADR: Non-invasive stdout/stderr/stdin capture and line-level mapping for the Python tracer (PyO3 + runtime_tracing) - -**Status**: Accepted -**Date**: 2025-10-01 -**Owners**: Tracing/Recorder team -**Scope**: Recorder runtime (Rust/PyO3) and Python instrumentation glue - ---- - -## Context - -- The user-facing CLI remains `python -m codetracer_python_recorder`. -- On startup the CLI parses recorder flags, adjusts `sys.argv`, and calls `codetracer_python_recorder.start(...)`, which delegates to the Rust extension. -- The Rust side already installs a `RuntimeTracer` implementation that subscribes to `sys.monitoring` callbacks and writes runtime_tracing artifacts (`trace.bin`/`trace.json`, `trace_metadata.json`, `trace_paths.json`). -- We still lack non-invasive capture of stdout/stderr/stdin that aligns each chunk with the trace stream for replay. -- The original draft assumed the script would be executed from Rust; in practice `runpy.run_path` stays inside the Python entrypoint, so the design must integrate with that lifecycle. -- The runtime_tracing crate (`TraceWriter`, `TraceLowLevelEvent::Event`, `EventLogKind::*`) already provides primitives for persisting arbitrary IO events; we will rely on it rather than introducing a bespoke artifact. - ---- - -## Decision - -1. **Lifecycle & CLI compatibility** - Retain the existing Python launcher. `python -m codetracer_python_recorder script.py` continues to prepare `sys.argv`, call `start(...)`, execute `runpy.run_path`, and finally `stop()`. Capture plumbing lives entirely inside `start()`/`stop()`, keeping the public API stable. - -2. **FD-level capture component** - Within `start_tracing` we instantiate an `OutputCapture` controller that duplicates the original stdin/stdout/stderr descriptors (or Windows handles), installs pipes in their place, and spawns draining threads. The Python script still runs in-process, but every write to fd 1/2 is diverted through the controller. Each chunk receives a monotonic timestamp before being queued, and the bytes are simultaneously mirrored back to the preserved descriptors so users continue to see live console output. - -3. **stdin strategy** - The controller exposes a write-end that the CLI feeds. By default we mirror the user's real stdin into the pipe (teeing so we can persist what was provided) and close it on EOF. Later scripted input can use the same interface. - -4. **runtime_tracing integration** - `RuntimeTracer` keeps sole ownership of the `NonStreamingTraceWriter`. We wrap it in `Arc>` so the FD reader threads can call `TraceWriter::add_event(TraceLowLevelEvent::Event(...))`. Each chunk becomes an `EventLogKind::Write` (stdout), `WriteOther` (stderr), or `Read` (stdin) record whose `metadata` is a JSON document (stream, thread ids when known, activation state, step reference, timestamps) and whose `content` carries the base64-encoded bytes. IO data therefore lives in the same `trace.bin`/`trace.json` file as step and call events. - -5. **Line attribution** - `RuntimeTracer` already handles `LINE` and `PY_*` monitoring callbacks. We extend it to track the most recent `Step` per Python thread and expose a `Snapshot` API returning `(path_id, line, call_key)`. When an IO chunk arrives we fetch the latest snapshot for the emitting Python thread (or fall back to the global latest step if the writer thread is unknown) and include it in the metadata so replay tooling can align output with execution. - -6. **Recorder logging isolation** - Logger initialisation continues to route Rust `log` output to the preserved original stderr handle before redirection. Capture threads never use `println!`; they rely on the logger to avoid contaminating user streams. - -7. **Buffering & flushing** - After installing the pipes we request line buffering on `sys.stdout`/`sys.stderr` when possible. On `stop()` we drain pending chunks, restore the original descriptors, and finish the runtime_tracing writer (`finish_writing_trace_*`). - ---- - -## Alternatives Considered - -- Monkey-patching `sys.stdout`/`sys.stderr`: misses native writes and conflicts with user overrides. -- Spawning a subprocess wrapper around the script: breaks the in-process monitoring story and changes CLI semantics. -- Emitting IO to a bespoke JSON artifact: unnecessary because runtime_tracing already models IO events. -- Deferring IO capture to a UI component: prevents parity with existing CodeTracer replay capabilities. - ---- - -## Consequences - -- **Pros** - - IO, monitoring events, and metadata share the runtime_tracing stream, so downstream tooling continues to work. - - Users keep invoking the CLI exactly the same way. - - FD duplication captures writes coming from both Python and native extensions; recorder logging stays isolated. - -- **Cons / Risks** - - `NonStreamingTraceWriter` must be guarded for cross-thread use; we need to validate performance and correctness when `add_event` is called from background threads. - - Mapping IO chunks to the exact Python thread is best-effort because file descriptors do not expose thread identity. - - Reader threads must stay ahead of producers to avoid filling pipe buffers; shutdown paths must handle long-running readers. - ---- - -## Detailed Design - -### A. Execution lifecycle - -1. **CLI (Python)** - - Parse recorder flags, resolve `script_path`, choose format/path, call `start(trace_dir, format, start_on_enter=script_path)`. - -2. **`start_tracing` (Rust)** - - Initialise logging and guard against nested sessions. - - Ensure the trace directory exists and choose event/meta/path filenames. - - Instantiate `RuntimeTracer` with the requested format and activation path. - - Wrap the tracer in `Arc>` exposing `emit_io_chunk(...)`. - - Construct `OutputCapture`: - - Duplicate original fds/handles and store them for restoration and logger use. - - Create pipes (Unix: `pipe2`; Windows: `CreatePipe`). - - Redirect 0/1/2 via `dup2`/`SetStdHandle`. - - Spawn reader threads that block on the pipe, timestamp (`Instant::now()`), gather OS thread id when available, and push `IoChunk { stream, bytes, ts, os_thread }` into a channel while forwarding the same bytes to the saved descriptors to maintain passthrough console behaviour. - -3. **Tracing activation** - - Install `sys.monitoring` callbacks with `install_tracer(py, Box::new(runtime_tracer_clone))`. - - Start a draining thread that consumes `IoChunk`, resolves Python thread ids via a shared `DashMap` maintained by `RuntimeTracer` on `PY_START/PY_RESUME`, and calls `emit_io_chunk`. - -4. **Python script execution** - - Back in Python, `runpy.run_path(str(script_path), run_name="__main__")` runs the target script. IO already flows through the capture pipelines. - -5. **Shutdown** -- On normal completion or exception: - - Close writer handles to signal EOF. - - Join reader/draining threads with a timeout guard. - - Restore original descriptors/handles. - - Call `flush()`/`finish()` on `RuntimeTracer` and release the monitoring tool id. - -### B. Encoding IO chunks with runtime_tracing - -- `emit_io_chunk` composes a metadata JSON document similar to: - -```json -{ - "stream": "stdout", - "encoding": "base64", - "ts_ns": 1234567890123, - "os_thread": 140355679779840, - "py_thread": 123, - "step": { "path_id": 5, "line": 42, "call_key": 12 }, - "activation": "active" -} -``` - -- The payload is `base64::encode(chunk.bytes)`. -- We invoke `TraceWriter::add_event(TraceLowLevelEvent::Event(RecordEvent { kind, metadata, content }))`, where `kind` is `EventLogKind::Write` for stdout, `WriteOther` for stderr, and `Read` for stdin. -- This keeps IO data inside `trace.bin`/`trace.json`, allowing replay tools to process it alongside steps and returns. - -### C. Mapping algorithm - -- Maintain `DashMap` updated on every LINE event. A snapshot stores `(path_id, line, call_key, perf_counter_ns)`. -- When an IO chunk arrives: - - Map OS thread id to Python thread id if possible; otherwise leave it null. - - Retrieve the latest snapshot for that Python thread. If the chunk timestamp predates the snapshot by more than a configurable threshold, fall back to the global latest snapshot. - - Embed the snapshot in the metadata JSON. -- For multi-line outputs the drainers keep chunk boundaries (newline-aware if the writer flushes per line) so replay can group contiguous chunks with identical snapshots. - ---- - -## Implementation Notes (Unix/Windows) - -- **Unix**: rely on the `nix` crate for `pipe2`, `dup`, `dup2`, and `fcntl` (set CLOEXEC). Reader threads use blocking `read` and propagate EOF by pushing a terminal message. -- **Windows**: use `CreatePipe`, `SetStdHandle`, `DuplicateHandle`, and `ReadFile` in overlapped mode if needed. Convert UTF-16 console output to UTF-8 before encoding. Ensure we re-register the original handles via `SetStdHandle` on teardown. Forwarding threads write mirrored bytes via the saved handles (`WriteFile`) so console output remains visible. -- PTY support (for interactive shells) can be layered later with `openpty`/ConPTY; initial scope sticks to pipes. - ---- - -## Telemetry & Logging - -- Keep `env_logger` initialisation on module import. Default to debug for the recorder crate so developers can inspect capture lifecycle logs. -- Emit debug logs (`install`, `chunk`, `drain_exit`, `restore`) to the preserved stderr handle. Emit failures as `EventLogKind::Error` events with diagnostic metadata when feasible. - ---- - -## Key Files - -- `codetracer-python-recorder/src/lib.rs`: orchestrates tracer startup/shutdown, trace directory provisioning, and will host creation of the `OutputCapture` component. -- `codetracer-python-recorder/src/runtime_tracer.rs`: owns the `NonStreamingTraceWriter`, handles `sys.monitoring` callbacks, and will expose APIs for emitting IO events plus step snapshots. -- `codetracer-python-recorder/src/tracer.rs`: manages registration with `sys.monitoring`; may require adjustments to share thread metadata with the capture pipeline. -- `codetracer-python-recorder/codetracer_python_recorder/__main__.py`: CLI entrypoint that invokes `start()`/`stop()`; may need updates for new flags or environment toggles. -- `codetracer-python-recorder/codetracer_python_recorder/api.py`: Python façade over the Rust extension, coordinating session lifecycle and flushing semantics. -- `codetracer-python-recorder/src/output_capture.rs` (new): encapsulates platform-specific descriptor duplication, pipe management, mirroring, and reader threads. -- `codetracer-python-recorder/tests/` (integration tests): will gain coverage asserting IO events appear in traces and that console passthrough remains functional. - ---- - -## Testing Plan - -1. **Unit / small integration** - - Scripts emitting stdout/stderr; assert generated traces contain `EventLogKind::Write`/`WriteOther` records with correct base64 payloads. - - Validate metadata JSON includes expected `path_id` and `line` for simple cases. - -2. **Concurrency** - - Multi-threaded printing to ensure no deadlocks and that chunks remain ordered. - - `asyncio` tasks writing concurrently; confirm snapshots continue to resolve. - -3. **Input** - - `input()` and `sys.stdin.read()` scenarios; ensure `EventLogKind::Read` captures bytes and EOF. - - Passthrough from the real stdin preserves exact bytes. - -4. **Large output & stress** - - Emit payloads larger than the pipe buffer to validate continuous draining. - - Rapid start/stop cycles ensure descriptors restore cleanly. - -5. **Windows** - - Mirror the above coverage on Windows CI runners, focusing on handle restoration and CRLF handling. - ---- - -## Rollout - -- Gate the feature behind an environment flag (`CODETRACER_CAPTURE_IO=1`) for early adopters, removing it after validation. -- Update CLI help text to mention stdout/stderr/stdin capture. -- Add regression tests driven by `just test` that assert on IO events in traces generated from the examples. - ---- - -## Open Questions / Future Work - -- Improve thread attribution by integrating with Python `threading` hooks if OS-level mapping proves insufficient. -- Allow configurable passthrough (e.g., disable mirroring when running headless or redirect to a file) once the default teeing behaviour is in place. -- Investigate PTY support for interactive applications that expect terminal semantics. -- Consider compressing large payloads before base64 encoding to reduce trace sizes. - ---- - -## Acceptance Criteria - -- Running `python -m codetracer_python_recorder script.py` produces runtime_tracing files containing IO events alongside existing step/call records. -- stdout, stderr, and stdin bytes are captured losslessly and attributed to the most relevant step snapshot. -- Original descriptors are restored even on exceptions or early exits. -- Reader threads terminate cleanly without leaks on Unix and Windows.