diff --git a/.github/actions/create_workflow_report/create_workflow_report.py b/.github/actions/create_workflow_report/create_workflow_report.py
index a6a9a3b3a5b7..9eb106309434 100755
--- a/.github/actions/create_workflow_report/create_workflow_report.py
+++ b/.github/actions/create_workflow_report/create_workflow_report.py
@@ -303,9 +303,10 @@ def get_new_fails_this_pr(
 
     # Combine both types of fails and select only desired columns
     desired_columns = ["job_name", "test_name", "test_status", "results_link"]
-    all_pr_fails = pd.concat([checks_fails, regression_fails], ignore_index=True)[
-        desired_columns
-    ]
+    all_pr_fails = pd.concat(
+        [df for df in [checks_fails, regression_fails] if len(df) > 0],
+        ignore_index=True,
+    )[desired_columns]
 
     if len(all_pr_fails) == 0:
         return pd.DataFrame()
@@ -353,7 +354,9 @@
         base_regression = base_regression.drop(columns=["arch", "status"])
 
     # Combine base results
-    base_results = pd.concat([base_checks, base_regression], ignore_index=True)
+    base_results = pd.concat(
+        [df for df in [base_checks, base_regression] if len(df) > 0], ignore_index=True
+    )
 
     # Find tests that failed in PR but passed in base
     pr_failed_tests = set(zip(all_pr_fails["job_name"], all_pr_fails["test_name"]))
@@ -370,6 +373,105 @@
     return new_fails_df
 
 
+def get_test_instability_scores(client: Client, branch_name: str):
+    """
+    Calculate per-test instability scores: the frequency of pass -> fail flips on a branch over the last 4 weeks.
+    """
+    query = f"""WITH test_results AS (
+        SELECT check_start_time, head_ref, base_ref, test_name, test_status,
+            replaceRegexpOne(check_name, ', [1-9/]*\\)$', '') as job_name_base -- remove group number
+        FROM `gh-data`.checks
+        WHERE (head_ref = '{branch_name}' OR base_ref = '{branch_name}')
+            AND test_status IN ('OK', 'FAIL', 'BROKEN')
+            AND check_start_time > now() - INTERVAL 4 WEEK
+        ORDER BY check_start_time
+    ),
+    test_sequences AS (
+        SELECT job_name_base, test_name, head_ref, base_ref, groupArray(if(test_status = 'OK', 'pass', 'fail')) AS status_array
+        FROM test_results
+        GROUP BY head_ref, base_ref, job_name_base, test_name
+    ),
+    flip_results AS (
+        SELECT job_name_base, test_name, base_ref, head_ref,
+            length(status_array) AS total_runs,
+            arraySum(
+                arrayMap(
+                    i -> status_array[i - 1] = 'pass' AND status_array[i] = 'fail' ? 1 : 0,
+                    arraySlice(arrayEnumerate(status_array), 2)
+                )
+            ) AS num_pass_to_fail
+        FROM test_sequences
+        WHERE total_runs >= 3 -- ensure decent sample size for this check-test-branch combination
+    )
+    SELECT
+        job_name_base,
+        test_name,
+        if(base_ref = '', head_ref, base_ref) as version,
+        sum(total_runs) as runs,
+        sum(num_pass_to_fail) as sudden_fails,
+        round(2 * sudden_fails / runs, 2) as instability
+    FROM flip_results
+    GROUP BY job_name_base, test_name, version
+    ORDER BY instability DESC
+    """
+
+    try:
+        df = client.query_dataframe(query)[
+            ["job_name_base", "test_name", "instability"]
+        ]
+
+    except Exception as e:
+        print(f"Error getting test instability scores: {e}")
+        return pd.DataFrame()
+
+    # Set the index to make it compatible with pandas join operations
+    if len(df) > 0:
+        df = df.set_index(["job_name_base", "test_name"])
+
+    return df
+
+
+def join_instability_scores(
+    df: pd.DataFrame, instability_scores: pd.DataFrame
+) -> pd.DataFrame:
+    """
+    Join instability scores to a DataFrame based on job_name_base and test_name.
+    Matches on job_name with the group-number suffix stripped.
+
+    Args:
+        df: DataFrame with job_name and test_name columns to attach scores to
+        instability_scores: DataFrame with instability scores (must have MultiIndex on job_name_base, test_name)
+
+    Returns:
+        DataFrame with instability scores joined
+    """
+    if len(df) == 0 or len(instability_scores) == 0:
+        return df
+
+    # Create a copy to avoid modifying the original
+    df_copy = df.copy()
+
+    # Add a base job name column for joining (strip group numbers)
+    df_copy["job_name_base"] = df_copy["job_name"].str.replace(
+        r", [1-9/]*\)$", "", regex=True
+    )
+
+    # Join on the base job name and test name
+    try:
+        result = df_copy.join(
+            instability_scores,
+            on=["job_name_base", "test_name"],
+            how="left",
+        )
+    except Exception as e:
+        print(f"Error joining instability scores: {e}")
+        return df
+
+    # Remove the temporary column and fill missing values
+    result = result.drop(columns=["job_name_base"]).fillna({"instability": 0})
+    return result
+
+
 @lru_cache
 def get_workflow_config() -> dict:
     workflow_config_files = glob("./ci/tmp/workflow_config*.json")
@@ -711,9 +813,11 @@ def create_workflow_report(
     if pr_number == 0:
         pr_info_html = f"Release ({branch_name})"
+        base_branch = branch_name
     else:
         try:
            pr_info = get_pr_info_from_number(pr_number)
+            base_branch = pr_info.get("base", {}).get("ref")
            pr_info_html = f"""
            #{pr_info.get("number")} ({pr_info.get("base", {}).get('ref')} <- {pr_info.get("head", {}).get('ref')})
            {pr_info.get("title")}
            """
@@ -730,6 +834,21 @@
         fail_results["job_statuses"], pr_number, branch_name, commit_sha
     )
 
+    checks_instability_scores = get_test_instability_scores(db_client, base_branch)
+    if len(checks_instability_scores) > 0:
+        for table in [
+            "checks_fails",
+            "checks_errors",
+            "regression_fails",
+            "checks_known_fails",
+            "pr_new_fails",
+        ]:
+            if table not in fail_results:
+                continue
+            fail_results[table] = join_instability_scores(
+                fail_results[table], checks_instability_scores
+            )
+
     high_cve_count = 0
     if not cves_not_checked and len(fail_results["docker_images_cves"]) > 0:
         high_cve_count = (
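Note on the metric (not part of the patch): the score that `get_test_instability_scores` computes server-side counts consecutive pass -> fail transitions in a test's run history and scales them by 2 / total runs, skipping combinations with fewer than 3 runs. Below is a minimal Python sketch of that flip counting for readers tracing the `arraySum`/`arrayMap` expression; the function name and the sample status sequences are hypothetical, purely for illustration.

```python
# Illustrative sketch only -- mirrors the query's pass -> fail flip counting in plain Python.
from typing import List


def instability_score(statuses: List[str], min_runs: int = 3) -> float:
    """Instability = 2 * (pass -> fail flips) / total runs, rounded to 2 places."""
    runs = len(statuses)
    if runs < min_runs:
        # The ClickHouse query simply drops combinations with fewer than 3 runs;
        # 0.0 stands in for "not scored" here.
        return 0.0
    # Count positions where the previous run passed and the current run failed,
    # matching arraySum(arrayMap(i -> ..., arraySlice(arrayEnumerate(status_array), 2))).
    sudden_fails = sum(
        1
        for prev, curr in zip(statuses, statuses[1:])
        if prev == "pass" and curr == "fail"
    )
    return round(2 * sudden_fails / runs, 2)


if __name__ == "__main__":
    print(instability_score(["pass", "fail", "pass", "fail"]))  # 1.0 - flips every other run
    print(instability_score(["pass", "fail", "fail", "fail"]))  # 0.5 - a single sudden fail
```

When `join_instability_scores` later attaches these scores to the failure tables, rows without a match fall back to 0 via `fillna({"instability": 0})`, so a score of 0 reads as "no recent flakiness observed".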