Commit c9808de

Revert "[autorevert] use 'tests.all_test_runs' instead of 'default.test_run_s3' for test signals (#7439)"
This reverts commit bc5f540.
1 parent 548cf22 commit c9808de

File tree

5 files changed: +24 -39 lines changed


aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md

Lines changed: 13 additions & 13 deletions
@@ -43,33 +43,33 @@ Notes
 - This preserves all runs (original + restarts) and per‑run attempts (`run_attempt`).
 - Job retries typically show up as separate job rows; names may include `Attempt #2` and have later `started_at`.
 
-## Phase B — Test Details Fetch (batched, from `tests.all_test_runs`)
+## Phase B — Test Details Fetch (batched, from `default.test_run_s3`)
 
-Decide in Python which jobs belong to the test‑track (e.g., `rule IN ('pytest failure','Python unittest failure')`). For those (job_id, run_id[, run_attempt]) triples, fetch per‑test rows directly from `tests.all_test_runs` — this table contains one row per testcase and is populated earlier while jobs may still be running.
+Decide in Python which jobs belong to the test‑track (e.g., `rule IN ('pytest failure','Python unittest failure')`). For those (job_id, run_id[, run_attempt]) triples, fetch per‑test rows directly from `default.test_run_s3` — this table contains one row per testcase, including successful ones (failure_count=0, error_count=0).
 
-Why `tests.all_test_runs`?
-- We need per‑test identities to build per‑test Signals; `tests.all_test_runs` has them and is populated earlier than the final summary tables. Summary is optional and redundant for this layer.
+Why `test_run_s3` only?
+- We need per‑test identities to build per‑test Signals; `default.test_run_s3` has them. Summary is optional and redundant for this layer.
 - Performance remains good by filtering on `job_id IN (...)` (first PK column) and grouping; limit to the time window implicitly via the selected job set from Phase A.
 
 Job selection for test track:
 - Step 1: find normalized job base names that exhibited a test‑related classification in any commit within the window.
 - Step 2: include ALL jobs across ALL commits whose normalized base is in that set (original runs, restarts; any run_id/attempt) so we can observe successes or pendings for the same test on other commits.
 
-Optimized batched all_test_runs query (for N job_ids):
+Optimized batched test_run_s3 query (for N job_ids):
 
 ```
 SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name,
        max(failure_count > 0) AS failing,
        max(error_count > 0) AS errored,
        max(rerun_count > 0) AS rerun_seen,
        count() AS rows
-FROM tests.all_test_runs
+FROM default.test_run_s3
 WHERE job_id IN {job_ids:Array(Int64)}
 GROUP BY job_id, workflow_id, workflow_run_attempt, file, classname, name
 ```
 
 Notes
-- Use `job_id IN (...)` to leverage the table’s primary key prefix on `job_id`.
+- Use `job_id IN (...)` to leverage the PK prefix `(job_id, name, classname, invoking_file, file)`.
 - We keep `workflow_run_attempt` to distinguish attempts within the same workflow run.
 
 ## Mapping to Signals
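For orientation, the batched query above is typically executed in chunks of job ids with server-side parameter binding. The sketch below is illustrative only and is not part of this commit; the clickhouse-connect client, the placeholder connection details, and the 500-id chunk size are assumptions.

```python
# Illustrative sketch (not part of this commit): chunked execution of the batched
# test_run_s3 query above. Connection details and CHUNK are placeholder assumptions.
from typing import Iterable, List

import clickhouse_connect  # assumed client library; the repo may wrap its own client

CHUNK = 500  # assumed chunk size to keep the IN-list and result sets small

QUERY = """
SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name,
       max(failure_count > 0) AS failing,
       max(error_count > 0) AS errored,
       max(rerun_count > 0) AS rerun_seen,
       count() AS rows
FROM default.test_run_s3
WHERE job_id IN {job_ids:Array(Int64)}
GROUP BY job_id, workflow_id, workflow_run_attempt, file, classname, name
"""


def fetch_test_verdicts(client, job_ids: List[int]) -> Iterable[tuple]:
    """Yield aggregated per-test rows for the given job ids, one chunk at a time."""
    for start in range(0, len(job_ids), CHUNK):
        chunk = job_ids[start : start + CHUNK]
        # {job_ids:Array(Int64)} is bound server-side from the parameters dict.
        result = client.query(QUERY, parameters={"job_ids": chunk})
        yield from result.result_rows


if __name__ == "__main__":
    client = clickhouse_connect.get_client(host="clickhouse.example")  # placeholder host
    for row in fetch_test_verdicts(client, [12345, 67890]):  # placeholder job ids
        print(row)
```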
@@ -83,7 +83,7 @@ Notes
 Ordering: dicts in Python 3.7+ preserve insertion order. Phase A inserts commit keys in push‑timestamp DESC order, so iterating the mapping yields newest→older commits without extra sorting.
 
 ### Test‑track semantics
-- Source of truth for SUCCESS/FAILURE is `tests.all_test_runs` per test id.
+- Source of truth for SUCCESS/FAILURE is `default.test_run_s3` per test id.
 - When a test row exists for an attempt:
   - Emit at most one FAILURE if any failed runs exist; at most one SUCCESS if any successful runs exist.
 - When no test rows exist for an attempt and any grouped job for that attempt is pending → emit PENDING.
@@ -106,9 +106,9 @@ Event naming (for debuggability):
 
 ### Test‑track mapping
 - Build a per‑commit map `test_id -> list[SignalEvent]` by combining all relevant jobs and shards:
-  - For each (wf_run_id, run_attempt, job_base_name) group in the commit, consult `tests.all_test_runs` rows (if any) for each candidate `test_id`:
-    - If rows exist for this `test_id` → status should reflect the found test verdict.
-    - If no rows exist and the group is still running (some jobs pending) → status = PENDING.
+  - For each (wf_run_id, run_attempt, job_base_name) group in the commit, consult `test_run_s3` rows (if any) for each candidate `test_id`:
+    - If `test_run_s3` rows exist for this `test_id` → status should reflect the found test verdict.
+    - If no `test_run_s3` rows exist and the group is still running (some jobs pending) → status = PENDING.
     - Else (no rows and group completed) → missing/unknown (no event emitted).
 - Event boundaries (naturally arise from grouping):
   - Separate events for distinct workflow runs (different `wf_run_id`) on the same commit (regardless of how they were triggered).
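To make the mapping and semantics above concrete, here is a small illustrative sketch (not code from this commit) of how one attempt's aggregated rows could be folded into events; `Status` and `Verdict` are simplified stand-ins for the repository's signal types.

```python
# Illustrative sketch: fold aggregated test_run_s3 verdicts for one
# (wf_run_id, run_attempt, job_base_name) group into per-test events.
# Status/Verdict are simplified stand-ins, not the repository's classes.
from dataclasses import dataclass
from enum import Enum, auto
from typing import List, Optional


class Status(Enum):
    FAILURE = auto()
    SUCCESS = auto()
    PENDING = auto()


@dataclass(frozen=True)
class Verdict:
    failure_runs: int  # countIf(failure_count > 0 OR error_count > 0)
    success_runs: int  # countIf(failure_count = 0 AND error_count = 0 AND skipped_count = 0)


def events_for_test(verdict: Optional[Verdict], group_pending: bool) -> List[Status]:
    """Return the events emitted for one test id within one run attempt."""
    if verdict is not None:
        events: List[Status] = []
        if verdict.failure_runs > 0:
            events.append(Status.FAILURE)  # at most one FAILURE per attempt
        if verdict.success_runs > 0:
            events.append(Status.SUCCESS)  # at most one SUCCESS per attempt
        return events
    if group_pending:
        return [Status.PENDING]            # no rows yet, jobs still running
    return []                              # completed with no rows -> missing/unknown
```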
@@ -174,7 +174,7 @@ Notes
 3) Implement selectors for test‑track pairs (Python filter on `rule`).
 4) Implement batched Phase B queries:
    - Use `(workflow_id, job_id) IN array(tuple(...))` to leverage PK prefixes.
-   - call `tests.all_test_runs` to enumerate failing tests
+   - call `test_run_s3` to enumerate failing tests
 5) Implement mapping to Signals for both tracks, emitting multiple events per commit as specified.
 6) Add unit tests:
    - Test‑track: a) failure on one commit; b) success on another; c) unknown/gap.
@@ -185,7 +185,7 @@ Notes
 
 - Keep the window small (16–32h) and deduplicate commits via push timestamps.
 - Limit the batched pairs size; chunk when necessary.
-- Align filters with primary keys: `job_id` for `tests.all_test_runs`.
+- Align filters with primary keys: `job_id` for `test_run_s3`.
 - Avoid scanning all of `workflow_job` by joining to recent pushes and filtering repo/branches.
 
 ## Open Questions

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py

Lines changed: 5 additions & 7 deletions
@@ -82,9 +82,7 @@ def extract(self) -> List[Signal]:
         # Select jobs to participate in test-track details fetch
         test_track_job_ids, failed_job_ids = self._select_test_track_job_ids(jobs)
         test_rows = self._datasource.fetch_tests_for_job_ids(
-            test_track_job_ids,
-            failed_job_ids=failed_job_ids,
-            lookback_hours=self.lookback_hours,
+            test_track_job_ids, failed_job_ids=failed_job_ids
         )
 
         test_signals = self._build_test_signals(jobs, test_rows, commits)
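Taken together with the datasource change later in this commit, the reverted call above implies the narrowed method shape below. The `TestDatasource` Protocol is an illustrative sketch of that interface, not a class defined in the repository, and the type aliases are simplified stand-ins.

```python
# Illustrative sketch of the datasource interface implied by the reverted call site.
# The Protocol name and the JobId/TestRowLike stand-ins are assumptions, not repo types.
from typing import List, Protocol

JobId = int  # simplified alias; the repo uses its own JobId type


class TestRowLike:  # stand-in for pytorch_auto_revert.signal_extraction_types.TestRow
    ...


class TestDatasource(Protocol):
    def fetch_tests_for_job_ids(
        self,
        job_ids: List[JobId],
        *,
        failed_job_ids: List[JobId],
    ) -> List[TestRowLike]:
        """Fetch per-test verdict rows for the given job ids (no lookback filter after the revert)."""
        ...
```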
@@ -220,7 +218,7 @@ def _inject_pending_workflow_events(
         return out
 
     # -----------------------------
-    # Phase B — Tests (tests.all_test_runs only)
+    # Phase B — Tests (test_run_s3 only)
     # -----------------------------
     def _select_test_track_job_ids(
         self, jobs: List[JobRow]
@@ -261,11 +259,11 @@ def _build_test_signals(
     ) -> List[Signal]:
         """Build per-test Signals across commits, scoped to job base.
 
-        We index `tests.all_test_runs` rows per (wf_run_id, run_attempt, job_base) and collect
+        We index `default.test_run_s3` rows per (wf_run_id, run_attempt, job_base) and collect
         which base(s) (by normalized job name) a test appears in. For each commit and (workflow, base),
         we compute attempt metadata (pending/completed, start time). Then, for tests that failed at least once in
         that base, we emit events per commit/attempt:
-        - If tests.all_test_runs rows exist → emit at most one FAILURE event if any failed runs exist,
+        - If test_run_s3 rows exist → emit at most one FAILURE event if any failed runs exist,
           and at most one SUCCESS event if any successful runs exist (both may be present).
         - Else if group pending → PENDING
         - Else → no event (missing)
@@ -297,7 +295,7 @@ def _build_test_signals(
             value_fn=lambda j: (j.wf_run_id, j.run_attempt),
         )
 
-        # Index tests.all_test_runs rows per (commit, job_base, wf_run, attempt, test_id)
+        # Index test_run_s3 rows per (commit, job_base, wf_run, attempt, test_id)
         # Store aggregated failure/success counts
         tests_by_group_attempt: Dict[
             Tuple[Sha, WorkflowName, JobBaseName, WfRunId, RunAttempt, TestId],
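As a rough illustration of the index described in this hunk, the sketch below aggregates row-level verdicts into a dict keyed by (sha, workflow, job base, wf_run_id, attempt, test_id). The dict-shaped rows and the `job_meta` lookup are assumptions for the example, not the repository's actual structures.

```python
# Illustrative sketch of the index described above: aggregate per-test verdict rows
# into a dict keyed by (sha, workflow, job_base, wf_run_id, run_attempt, test_id).
# Field and helper names are simplified stand-ins for the repository's types.
from collections import defaultdict
from typing import Dict, List, Tuple

Key = Tuple[str, str, str, int, int, str]  # (sha, workflow, job_base, wf_run_id, attempt, test_id)


def index_test_rows(rows: List[dict], job_meta: Dict[int, dict]) -> Dict[Key, Tuple[int, int]]:
    """Map each group/attempt/test to (failure_runs, success_runs) totals."""
    out: Dict[Key, Tuple[int, int]] = defaultdict(lambda: (0, 0))
    for r in rows:
        meta = job_meta[r["job_id"]]  # sha / workflow / normalized job base for this job
        key: Key = (
            meta["sha"],
            meta["workflow"],
            meta["job_base"],
            r["workflow_id"],
            r["workflow_run_attempt"],
            f'{r["file"]}|{r["classname"]}|{r["name"]}',
        )
        failures, successes = out[key]
        out[key] = (failures + r["failure_runs"], successes + r["success_runs"])
    return out
```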

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_datasource.py

Lines changed: 4 additions & 13 deletions
@@ -189,15 +189,12 @@ def fetch_tests_for_job_ids(
         job_ids: List[JobId],
         *,
         failed_job_ids: List[JobId],
-        lookback_hours: int,
     ) -> List[TestRow]:
-        """Batch fetch test verdict rows from tests.all_test_runs for given job ids.
+        """Batch fetch test verdict rows from default.test_run_s3 for given job ids.
 
         If failed_job_ids is provided, first compute the set of failed test identifiers
         (file+classname+name) from those jobs, and only fetch tests for job_ids that
         match that set. This reduces the result size significantly.
-        Additionally, constrain by the table's partition (toDate(time_inserted))
-        using NOW() and the lookback window with a 1-day margin to avoid timezone issues.
         """
         log = logging.getLogger(__name__)
         if not job_ids:
@@ -227,31 +224,25 @@ def fetch_tests_for_job_ids(
             )
             # One query with a CTE that enumerates failed test ids from failed_job_ids,
             # then filters the main selection by those ids for the current chunk.
-            # Partition pruning: restrict toDate(time_inserted) to the lookback window
-            # with a 1 day margin using NOW() to avoid timezone handling.
+            # Note: success_runs explicitly excludes skipped rows via skipped_count = 0.
             query = """
                 WITH failed_test_names AS (
                     SELECT DISTINCT concat(file, '|', classname, '|', name) AS test_id
-                    FROM tests.all_test_runs
+                    FROM default.test_run_s3
                     WHERE job_id IN {failed_job_ids:Array(Int64)}
                     AND (failure_count > 0 OR error_count > 0)
-                    AND toDate(time_inserted) >=
-                        toDate(NOW() - toIntervalHour({lookback_hours:Int32}) - toIntervalDay(1))
                 )
                 SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name,
                     countIf(failure_count > 0 OR error_count > 0) AS failure_runs,
                     countIf(failure_count = 0 AND error_count = 0 AND skipped_count = 0) AS success_runs
-                FROM tests.all_test_runs
+                FROM default.test_run_s3
                 WHERE job_id IN {job_ids:Array(Int64)}
                 AND concat(file, '|', classname, '|', name) IN failed_test_names
-                AND toDate(time_inserted) >=
-                    toDate(NOW() - toIntervalHour({lookback_hours:Int32}) - toIntervalDay(1))
                 GROUP BY job_id, workflow_id, workflow_run_attempt, file, classname, name
             """
             params = {
                 "job_ids": [int(j) for j in chunk],
                 "failed_job_ids": [int(j) for j in failed_job_ids],
-                "lookback_hours": int(lookback_hours),
             }
 
             for attempt in RetryWithBackoff():
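The context line above shows the query being issued inside `for attempt in RetryWithBackoff():`. The repository's actual helper is not shown in this diff; the sketch below is one common way such an iterator-style backoff can be written, included purely for illustration.

```python
# Illustrative only: an iterator-style retry helper in the spirit of the loop above.
# This is NOT the repository's RetryWithBackoff implementation.
import time
from typing import Iterator


class RetryWithBackoff:
    """Yield once per attempt; sleep with exponential backoff between attempts."""

    def __init__(self, retries: int = 3, base_delay: float = 1.0) -> None:
        self.retries = retries
        self.base_delay = base_delay

    def __iter__(self) -> Iterator[int]:
        for attempt in range(self.retries):
            yield attempt
            # Resuming here means the caller did not `break` (the attempt failed and
            # another pass was requested), so back off before the next try.
            if attempt < self.retries - 1:
                time.sleep(self.base_delay * (2 ** attempt))
```

In typical usage the loop body breaks on success, so the backoff sleep only runs when another attempt is actually needed.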

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_types.py

Lines changed: 1 addition & 1 deletion
@@ -121,7 +121,7 @@ def is_test_failure(self) -> bool:
         return self.is_failure and (self.rule in DEFAULT_TEST_RULES)
 
 
-# Represents a test verdict row from the tests.all_test_runs table in ClickHouse
+# Represents a test verdict row from the test_run_s3 table in ClickHouse
 @dataclass(frozen=True)
 class TestRow:
     job_id: JobId

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py

Lines changed: 1 addition & 5 deletions
@@ -52,11 +52,7 @@ def fetch_jobs_for_workflows(
         return list(self._jobs)
 
     def fetch_tests_for_job_ids(
-        self,
-        job_ids: List[JobId],
-        *,
-        failed_job_ids: List[JobId],
-        lookback_hours: int = 24,
+        self, job_ids: List[JobId], *, failed_job_ids: List[JobId]
     ) -> List[TestRow]:
         ids = {int(j) for j in job_ids}
         return [r for r in self._tests if int(r.job_id) in ids]
