diff --git a/src/cli.rs b/src/cli.rs index a47f223..46eade8 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -8,8 +8,9 @@ use fxhash::{FxHashMap, FxHashSet}; use tlparse::{ analyze_graph_runtime_deltas, generate_multi_rank_html, parse_path, read_chromium_events_with_pid, ArtifactFlags, Diagnostics, DivergenceFlags, DivergenceGroup, - ParseConfig, RankMetaData, + ExecOrderSummary, ParseConfig, RankMetaData, }; +use tlparse::{analyze_execution_order, parse_graph_execution_order, ExecOrderIssue}; #[derive(Parser)] #[command(author, version, about, long_about = None)] @@ -562,6 +563,191 @@ fn handle_all_ranks( cache_groups: cache_divergence_groups.clone(), collective_groups: collective_divergence_groups.clone(), tensor_meta_groups: tensor_meta_divergence_groups.clone(), + exec_order: { + // Build execution-order diagnostics if we have at least 2 ranks + // 1) Collect latest "graph_execution" artifact per rank + use fxhash::FxHashMap; + let mut exec_orders: FxHashMap> = FxHashMap::default(); + + for &rank in &rank_nums { + let rank_dir = out_path.join(format!("rank_{rank}")); + let cd_path = rank_dir.join("compile_directory.json"); + let Ok(content) = std::fs::read_to_string(&cd_path) else { continue }; + let Ok(serde_json::Value::Object(map)) = serde_json::from_str::(&content) else { continue }; + + // Find artifact with name == "graph_execution.json" and max number + let mut best: Option<(u64, String)> = None; // (number, url) + for (_cid, entry) in map.iter() { + let Some(arts) = entry.get("artifacts").and_then(|x| x.as_array()) else { continue }; + for a in arts { + let name = a.get("name").and_then(|x| x.as_str()).unwrap_or(""); + if name == "graph_execution.json" { + if let (Some(num), Some(url)) = (a.get("number").and_then(|x| x.as_u64()), a.get("url").and_then(|x| x.as_str())) { + match best { + Some((bn, _)) if bn >= num => {} + _ => best = Some((num, url.to_string())), + } + } + } + } + } + + if let Some((_n, url)) = best { + let path = rank_dir.join(url); + if let 
Ok(payload) = std::fs::read_to_string(path) { + if let Ok(order) = parse_graph_execution_order(&payload) { + exec_orders.insert(rank, order); + } + } + } + } + + if exec_orders.len() < 2 { + None + } else { + // 2) Build mapping: (rank, compile_id) -> collective ops for comparison + let mut dir_to_compile_id_per_rank: FxHashMap> = FxHashMap::default(); + for &rank in &rank_nums { + let rank_dir = out_path.join(format!("rank_{rank}")); + let cd_path = rank_dir.join("compile_directory.json"); + if let Ok(s) = std::fs::read_to_string(cd_path) { + if let Ok(v) = serde_json::from_str::(&s) { + if let Some(obj) = v.as_object() { + let mut m = FxHashMap::default(); + for (cid, entry) in obj.iter() { + if let Some(arts) = entry.get("artifacts").and_then(|x| x.as_array()) { + for a in arts { + if let Some(url) = a.get("url").and_then(|x| x.as_str()) { + if let Some((prefix, _)) = url.split_once('/') { + m.entry(prefix.to_string()).or_insert_with(|| cid.to_string()); + } + } + } + } + } + dir_to_compile_id_per_rank.insert(rank, m); + } + } + } + } + + let mut collective_by_graph: FxHashMap<(u32, String), Vec> = FxHashMap::default(); + for cs in &collective_schedules { + if let Some(m) = dir_to_compile_id_per_rank.get(&cs.rank) { + let compile_id = m.get(&cs.graph).cloned().unwrap_or(cs.graph.clone()); + collective_by_graph.insert((cs.rank, compile_id), cs.ops.clone()); + } + } + + // 3) Build mapping: (rank, compile_id) -> cache status (hit/miss/bypass) + let mut cache_status: FxHashMap<(u32, String), String> = FxHashMap::default(); + for &rank in &rank_nums { + let rank_dir = out_path.join(format!("rank_{rank}")); + let cd_path = rank_dir.join("compile_directory.json"); + let Ok(s) = std::fs::read_to_string(cd_path) else { continue }; + let Ok(v) = serde_json::from_str::(&s) else { continue }; + let Some(obj) = v.as_object() else { continue }; + let Some(dir2cid) = dir_to_compile_id_per_rank.get(&rank) else { continue }; + for (_cid, entry) in obj.iter() { + let Some(arts) 
= entry.get("artifacts").and_then(|x| x.as_array()) else { continue }; + // group by compile dir prefix to derive status per graph + use std::collections::HashMap as StdHashMap; + let mut status_by_dir: StdHashMap = StdHashMap::new(); + for a in arts { + if let Some(url) = a.get("url").and_then(|x| x.as_str()) { + if let Some((prefix, _)) = url.split_once('/') { + let name = a.get("name").and_then(|x| x.as_str()).unwrap_or(""); + let status = if name.contains("cache_miss") { + "miss" + } else if name.contains("cache_hit") { + "hit" + } else if name.contains("cache_bypass") { + "bypass" + } else { + "" + }; + if !status.is_empty() { + // Prefer miss over hit, hit over bypass + let e = status_by_dir.entry(prefix.to_string()).or_insert_with(String::new); + if e.is_empty() || *e == "bypass" || (*e == "hit" && status == "miss") { + *e = status.to_string(); + } + } + } + } + } + for (dir, st) in status_by_dir { + if let Some(cid) = dir2cid.get(&dir) { + cache_status.insert((rank, cid.clone()), st); + } + } + } + } + + // 4) Evaluate + let report = analyze_execution_order(&exec_orders, &collective_by_graph, &cache_status); + + // Summaries + use std::collections::HashSet; + let order_differs = report + .by_index + .iter() + .any(|row| { + let uniq: HashSet<&str> = row + .by_rank + .values() + .map(|s| s.as_str()) + .collect(); + uniq.len() > 1 + }); + + let mut sched_ranks: HashSet = HashSet::new(); + let mut cache_ranks: HashSet = HashSet::new(); + for row in &report.by_index { + if row.issues.contains(&ExecOrderIssue::ScheduleMismatch) { + for &rk in row.by_rank.keys() { + sched_ranks.insert(rk); + } + } + if row.issues.contains(&ExecOrderIssue::CacheMismatch) { + for &rk in row.by_rank.keys() { + cache_ranks.insert(rk); + } + } + } + let mut ranks_schedule: Vec = sched_ranks.into_iter().collect(); + ranks_schedule.sort_unstable(); + let mut ranks_cache: Vec = cache_ranks.into_iter().collect(); + ranks_cache.sort_unstable(); + + let schedule_str = if 
ranks_schedule.is_empty() { + String::new() + } else { + ranks_schedule + .iter() + .map(|r| format!("r{}", r)) + .collect::>() + .join(", ") + }; + let cache_str = if ranks_cache.is_empty() { + String::new() + } else { + ranks_cache + .iter() + .map(|r| format!("r{}", r)) + .collect::>() + .join(", ") + }; + + Some(ExecOrderSummary { + order_differs, + ranks_schedule, + ranks_cache, + ranks_schedule_str: schedule_str, + ranks_cache_str: cache_str, + }) + } + }, }; let (landing_page_path, landing_html) = generate_multi_rank_html( diff --git a/src/lib.rs b/src/lib.rs index 0e4c4ff..137b5ee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ mod types; pub use types::{ ArtifactFlags, CollectivesParityReport, Diagnostics, DivergenceFlags, DivergenceGroup, - GraphAnalysis, GraphCollectivesParity, GraphRuntime, RankMetaData, RuntimeAnalysis, + ExecOrderSummary, GraphAnalysis, GraphCollectivesParity, GraphRuntime, RankMetaData, RuntimeAnalysis, RuntimeRankDetail, }; diff --git a/src/templates.rs b/src/templates.rs index 0250d29..4124bc7 100644 --- a/src/templates.rs +++ b/src/templates.rs @@ -595,6 +595,22 @@ Each rank appears as a separate process (PID) in the trace; within each process, You can download and view this trace in Perfetto to visualize performance differences across ranks.

{{ endif }} +{{ if diagnostics.exec_order }} +

Graph Execution-Order Diagnostics

+{{ if diagnostics.exec_order.order_differs }} +

Warning: Graph execution order differs across ranks.

+{{ endif }} +{{ if diagnostics.exec_order.ranks_schedule }} +

Schedule mismatch across ranks: {diagnostics.exec_order.ranks_schedule_str}

+{{ else }} +

Schedule: consistent across ranks.

+{{ endif }} +{{ if diagnostics.exec_order.ranks_cache }} +

Cache hit/miss mismatch across ranks: {diagnostics.exec_order.ranks_cache_str}

+{{ else }} +

Cache hit/miss: consistent across ranks.

+{{ endif }} +{{ endif }}

Individual rank reports:

diff --git a/src/types.rs b/src/types.rs index 991253a..ec5c1ef 100644 --- a/src/types.rs +++ b/src/types.rs @@ -929,6 +929,21 @@ pub struct ArtifactFlags { pub runtime_trace: bool, } +/// Summary of graph execution-order diagnostics across ranks +#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)] +pub struct ExecOrderSummary { + /// True if any index has differing compile_ids across ranks + pub order_differs: bool, + /// Ranks involved in any schedule mismatches (sorted numerically) + pub ranks_schedule: Vec, + /// Ranks involved in any cache hit/miss mismatches (sorted numerically) + pub ranks_cache: Vec, + /// Pretty-printed ranks for template rendering (e.g., "r0, r1, r2") + pub ranks_schedule_str: String, + /// Pretty-printed ranks for template rendering (e.g., "r0, r2") + pub ranks_cache_str: String, +} + #[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)] pub struct Diagnostics { pub divergence: DivergenceFlags, @@ -937,6 +952,8 @@ pub struct Diagnostics { pub cache_groups: Vec, pub collective_groups: Vec, pub tensor_meta_groups: Vec, + /// Optional graph execution-order summary (present when >=2 ranks have artifacts) + pub exec_order: Option, } #[derive(Serialize)]