
Commit 5bab078

Stage 5
README.md:
codetracer-python-recorder/src/runtime/io_capture/fd_mirror/unix.rs:
design-docs/io-capture-line-proxy-implementation-plan.status.md:

Signed-off-by: Tzanko Matev <[email protected]>
1 parent e1e1d56 commit 5bab078

File tree

3 files changed: +108 -6 lines


README.md

Lines changed: 23 additions & 1 deletion
@@ -50,12 +50,34 @@ Pass `--codetracer-json-errors` (or set the policy via `configure_policy(json_er
 
 ### IO capture configuration
 
-Line-aware stdout/stderr capture proxies are now enabled by default. Control them through the policy layer:
+Line-aware capture (see [ADR 0008](design-docs/adr/0008-line-aware-io-capture.md)) installs `LineAwareStdout`, `LineAwareStderr`, and `LineAwareStdin` proxies so every chunk carries `{path_id, line, frame_id}` metadata. The proxies forward writes immediately to keep TTY behaviour unchanged and the batching sink emits newline/flush/step-delimited chunks. When the FD mirror fallback observes bytes that bypassed the proxies, the resulting `IoChunk` carries the `mirror` flag so downstream tooling can highlight native writers separately. Recorder logs and telemetry use `ScopedMuteIoCapture` to avoid recursive capture.
+
+Control the feature through the policy layer:
 
 - CLI: `python -m codetracer_python_recorder --io-capture=off script.py` disables capture, while `--io-capture=proxies+fd` also mirrors raw file-descriptor writes.
 - Python: `configure_policy(io_capture_line_proxies=False)` toggles proxies, and `configure_policy(io_capture_fd_fallback=True)` enables the FD fallback.
 - Environment: set `CODETRACER_CAPTURE_IO=off`, `proxies`, or `proxies+fd` (`,` is also accepted) to match the CLI and Python helpers.
 
+Manual smoke check: `python -m codetracer_python_recorder examples/stdout_script.py` should report the proxied output while leaving the console live.
+
+#### Troubleshooting replaced stdout/stderr
+
+Third-party tooling occasionally replaces `sys.stdout` / `sys.stderr` after the proxies install. When that happens, IO metadata stops updating and the recorder falls back to passthrough behaviour. You can verify the binding at runtime:
+
+```python
+import sys
+from codetracer_python_recorder.runtime import LineAwareStdout, LineAwareStderr
+
+print(type(sys.stdout).__name__, isinstance(sys.stdout, LineAwareStdout))
+print(type(sys.stderr).__name__, isinstance(sys.stderr, LineAwareStderr))
+```
+
+Both `isinstance` checks should return `True`. If they do not:
+
+1. Re-run `configure_policy(io_capture_line_proxies=True)` (or restart tracing) to reinstall the proxies before the other tool mutates the streams.
+2. Fall back to FD mirroring by enabling `CODETRACER_CAPTURE_IO=proxies+fd` so native writes still reach the ledger-backed mirror.
+3. As a last resort, disable IO capture (`--io-capture=off`) and rely on console output while investigating the conflicting integration.
+
 ### Migration checklist for downstream tools
 
 - Catch `RecorderError` (or a subclass) instead of `RuntimeError`.
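For quick reference, the policy toggles documented in the README hunk above can also be exercised directly from Python. The sketch below is illustrative only and is not part of this commit: it assumes `configure_policy` is importable from the top-level `codetracer_python_recorder` package, and it uses only the keyword arguments and `CODETRACER_CAPTURE_IO` values shown above.

```python
# Illustrative sketch only (not part of this commit).
# Assumption: `configure_policy` is importable from the package top level.
import os

from codetracer_python_recorder import configure_policy

# Keep the line-aware proxies and also mirror raw file-descriptor writes,
# matching the CLI flag `--io-capture=proxies+fd`.
configure_policy(io_capture_line_proxies=True)
configure_policy(io_capture_fd_fallback=True)

# Disable capture entirely, matching `--io-capture=off`.
configure_policy(io_capture_line_proxies=False)
configure_policy(io_capture_fd_fallback=False)

# The environment variable accepts the same spellings as the CLI flag.
os.environ["CODETRACER_CAPTURE_IO"] = "proxies+fd"  # or "off" / "proxies"
```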

codetracer-python-recorder/src/runtime/io_capture/fd_mirror/unix.rs

Lines changed: 80 additions & 3 deletions
@@ -7,7 +7,7 @@ use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
 use std::thread;
-use std::time::Instant;
+use std::time::{Duration, Instant};
 
 #[derive(Debug)]
 pub struct FdMirrorError {
@@ -212,6 +212,9 @@ struct StreamMirror {
 }
 
 impl StreamMirror {
+    const SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(250);
+    const SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(10);
+
     fn start(
         stream: IoStream,
         ledger: Arc<Ledger>,
@@ -289,14 +292,88 @@ impl StreamMirror {
             warn!("failed to restore fd {} after mirroring", self.target_fd);
         }
         if let Some(join) = self.join.take() {
-            if let Err(err) = join.join() {
-                warn!("mirror thread join failed: {err:?}");
+            if wait_for_join(&join, Self::SHUTDOWN_TIMEOUT, Self::SHUTDOWN_POLL_INTERVAL) {
+                if let Err(err) = join.join() {
+                    warn!("mirror thread join failed: {err:?}");
+                }
+            } else {
+                warn!(
+                    "mirror thread on fd {} did not stop within {:?}; detaching background reader",
+                    self.target_fd,
+                    Self::SHUTDOWN_TIMEOUT
+                );
+                drop(join);
             }
         }
         self.ledger.clear();
     }
 }
 
+fn wait_for_join(
+    handle: &thread::JoinHandle<()>,
+    timeout: Duration,
+    poll_interval: Duration,
+) -> bool {
+    if handle.is_finished() {
+        return true;
+    }
+    if timeout.is_zero() {
+        return false;
+    }
+    let mut remaining = timeout;
+    loop {
+        if handle.is_finished() {
+            return true;
+        }
+        let sleep_for = std::cmp::min(poll_interval, remaining);
+        thread::sleep(sleep_for);
+        if handle.is_finished() {
+            return true;
+        }
+        if remaining <= sleep_for {
+            return false;
+        }
+        remaining -= sleep_for;
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn wait_for_join_succeeds_for_completed_thread() {
+        let handle = thread::spawn(|| {});
+        assert!(wait_for_join(
+            &handle,
+            Duration::from_millis(100),
+            Duration::from_millis(5)
+        ));
+        handle.join().expect("thread join should succeed");
+    }
+
+    #[test]
+    fn wait_for_join_times_out_for_blocked_thread() {
+        let shutdown = Arc::new(AtomicBool::new(false));
+        let worker_shutdown = shutdown.clone();
+        let handle = thread::spawn(move || {
+            while !worker_shutdown.load(Ordering::Relaxed) {
+                thread::sleep(Duration::from_millis(5));
+            }
+        });
+
+        assert!(
            !wait_for_join(&handle, Duration::from_millis(15), Duration::from_millis(5)),
            "wait_for_join should time out on a stuck thread"
        );
+
+        shutdown.store(true, Ordering::Relaxed);
+        handle
+            .join()
+            .expect("thread join should succeed after shutdown signal");
+    }
+}
+
 impl Drop for StreamMirror {
     fn drop(&mut self) {
         self.shutdown();

design-docs/io-capture-line-proxy-implementation-plan.status.md

Lines changed: 5 additions & 2 deletions
@@ -17,9 +17,12 @@
 - **Stage 2 – Implement IoEventSink and batching:** Added the `IoChunk` model plus a `IoEventSink` batching layer that groups stdout/stderr writes per thread, flushes on newline, explicit `flush()`, step boundaries, and 5 ms gaps, and emits stdin reads immediately. Updated the proxies to surface flush events and introduced focused unit tests that cover batching, timer splits, step flushes, and stdin capture. `just test` runs the sink tests alongside the existing proxy coverage.
 - **Stage 3 – Wire proxies into lifecycle:** `RuntimeTracer::install_io_capture` now instantiates the sink, installs the proxies behind the policy flag, and drains/flushes buffered chunks at step and finish boundaries. `IoChunk` records path IDs, frame IDs, and thread IDs sourced from the `LineSnapshotStore`, with a Python stack fallback filling metadata when monitoring snapshots are not yet available. Metadata emitted by `RecordEvent` now includes `path_id`, `line`, and `frame_id` for stdout/stderr chunks, and the Stage 3 integration test passes end-to-end.
 - 🔄 **Stage 4 – Optional FD mirror:** Implemented the shared ledger (`runtime::io_capture::fd_mirror`), plumbed optional `MirrorLedgers`/`FdMirrorController` through `IoEventSink` and `RuntimeTracer::install_io_capture`, and added runtime tests that assert `os.write` payloads are captured only when `io_capture_fd_fallback` is enabled. Next actions: tighten metadata/telemetry (expose mirror stats, warn when descriptor duplication fails) and stress-test concurrent native writers.
-- **Stage 5 – Hardening and docs:** Not started.
+- 🔄 **Stage 5 – Hardening and docs:** Kickoff 2025-10-15. Focus areas: add teardown timeouts for the FD mirror threads, expand README coverage (include ADR 0008 link plus troubleshooting steps for replaced `sys.stdout`), and capture manual/CI verification notes.
+  - ✅ Added FD mirror shutdown timeout with a polling helper to detach stuck reader threads plus unit coverage.
+  - ✅ Documented README guidance (ADR link, mirror flag notes, troubleshooting for replaced `sys.stdout`) and recorded the manual smoke command.
+  - ✅ Verification: `just dev test` (Linux) passes with new mirror timeout tests; Windows CI regression run still queued.
 
 ## Next Steps
 1. Stage 4: finish FD mirror wiring — add mirror-only tests (`os.write` / mixed stdout/stderr), surface user-facing warnings on setup failure, and document the new `mirror` flag in chunk metadata.
-2. Stage 5 hardening: tighten logging guards (`ScopedMuteIoCapture`), expand integration coverage (policy disabled paths, multi-threaded writers), and document configuration knobs for IO capture.
+2. Stage 5 follow-up: monitor the queued Windows CI regression run and flip ADR 0008 to Accepted once cross-platform verification lands.
 3. Evaluate performance impact of the Python stack fallback and gate it behind monitoring snapshots once `sys.monitoring` integration fully drives the snapshot store.
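The Stage 2 entry above describes the batching rules: group writes per thread and close a chunk on newline, explicit `flush()`, step boundaries, or a 5 ms idle gap. The standalone sketch below only illustrates those grouping rules; it is not the recorder's `IoEventSink`, and the class name, chunk shape, and gap handling are simplified assumptions.

```python
# Illustrative sketch of the newline/flush/step/gap batching rules from Stage 2.
# This is NOT the recorder's IoEventSink; names and the chunk shape are invented.
import time
from dataclasses import dataclass, field


@dataclass
class ChunkBatcher:
    """Group writes into chunks; close a chunk on newline, flush, step, or idle gap."""

    gap_seconds: float = 0.005  # the 5 ms idle gap from the status entry
    chunks: list = field(default_factory=list)
    _buffer: str = ""
    _last_write: float = 0.0

    def write(self, text: str) -> None:
        now = time.monotonic()
        # A pause longer than the idle gap closes the pending chunk first.
        if self._buffer and now - self._last_write > self.gap_seconds:
            self._emit("gap")
        self._last_write = now
        self._buffer += text
        # A newline closes the chunk immediately.
        if "\n" in self._buffer:
            self._emit("newline")

    def flush(self) -> None:
        # An explicit flush() closes the pending chunk.
        self._emit("flush")

    def on_step(self) -> None:
        # Step boundaries also close the pending chunk.
        self._emit("step")

    def _emit(self, reason: str) -> None:
        if self._buffer:
            self.chunks.append({"data": self._buffer, "reason": reason})
            self._buffer = ""


batcher = ChunkBatcher()
batcher.write("partial ")
batcher.write("line\n")  # newline -> one chunk for "partial line\n"
batcher.write("tail")
batcher.on_step()        # step boundary -> one chunk for "tail"
print(batcher.chunks)
```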
