Skip to content
Draft

trial #144

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 187 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use fxhash::{FxHashMap, FxHashSet};
use tlparse::{
analyze_graph_runtime_deltas, generate_multi_rank_html, parse_path,
read_chromium_events_with_pid, ArtifactFlags, Diagnostics, DivergenceFlags, DivergenceGroup,
ParseConfig, RankMetaData,
ExecOrderSummary, ParseConfig, RankMetaData,
};
use tlparse::{analyze_execution_order, parse_graph_execution_order, ExecOrderIssue};

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -562,6 +563,191 @@ fn handle_all_ranks(
cache_groups: cache_divergence_groups.clone(),
collective_groups: collective_divergence_groups.clone(),
tensor_meta_groups: tensor_meta_divergence_groups.clone(),
exec_order: {
// Build execution-order diagnostics if we have at least 2 ranks
// 1) Collect latest "graph_execution" artifact per rank
use fxhash::FxHashMap;
let mut exec_orders: FxHashMap<u32, Vec<String>> = FxHashMap::default();

for &rank in &rank_nums {
let rank_dir = out_path.join(format!("rank_{rank}"));
let cd_path = rank_dir.join("compile_directory.json");
let Ok(content) = std::fs::read_to_string(&cd_path) else { continue };
let Ok(serde_json::Value::Object(map)) = serde_json::from_str::<serde_json::Value>(&content) else { continue };

// Find artifact with name == "graph_execution.json" and max number
let mut best: Option<(u64, String)> = None; // (number, url)
for (_cid, entry) in map.iter() {
let Some(arts) = entry.get("artifacts").and_then(|x| x.as_array()) else { continue };
for a in arts {
let name = a.get("name").and_then(|x| x.as_str()).unwrap_or("");
if name == "graph_execution.json" {
if let (Some(num), Some(url)) = (a.get("number").and_then(|x| x.as_u64()), a.get("url").and_then(|x| x.as_str())) {
match best {
Some((bn, _)) if bn >= num => {}
_ => best = Some((num, url.to_string())),
}
}
}
}
}

if let Some((_n, url)) = best {
let path = rank_dir.join(url);
if let Ok(payload) = std::fs::read_to_string(path) {
if let Ok(order) = parse_graph_execution_order(&payload) {
exec_orders.insert(rank, order);
}
}
}
}

if exec_orders.len() < 2 {
None
} else {
// 2) Build mapping: (rank, compile_id) -> collective ops for comparison
let mut dir_to_compile_id_per_rank: FxHashMap<u32, FxHashMap<String, String>> = FxHashMap::default();
for &rank in &rank_nums {
let rank_dir = out_path.join(format!("rank_{rank}"));
let cd_path = rank_dir.join("compile_directory.json");
if let Ok(s) = std::fs::read_to_string(cd_path) {
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&s) {
if let Some(obj) = v.as_object() {
let mut m = FxHashMap::default();
for (cid, entry) in obj.iter() {
if let Some(arts) = entry.get("artifacts").and_then(|x| x.as_array()) {
for a in arts {
if let Some(url) = a.get("url").and_then(|x| x.as_str()) {
if let Some((prefix, _)) = url.split_once('/') {
m.entry(prefix.to_string()).or_insert_with(|| cid.to_string());
}
}
}
}
}
dir_to_compile_id_per_rank.insert(rank, m);
}
}
}
}

let mut collective_by_graph: FxHashMap<(u32, String), Vec<String>> = FxHashMap::default();
for cs in &collective_schedules {
if let Some(m) = dir_to_compile_id_per_rank.get(&cs.rank) {
let compile_id = m.get(&cs.graph).cloned().unwrap_or(cs.graph.clone());
collective_by_graph.insert((cs.rank, compile_id), cs.ops.clone());
}
}

// 3) Build mapping: (rank, compile_id) -> cache status (hit/miss/bypass)
let mut cache_status: FxHashMap<(u32, String), String> = FxHashMap::default();
for &rank in &rank_nums {
let rank_dir = out_path.join(format!("rank_{rank}"));
let cd_path = rank_dir.join("compile_directory.json");
let Ok(s) = std::fs::read_to_string(cd_path) else { continue };
let Ok(v) = serde_json::from_str::<serde_json::Value>(&s) else { continue };
let Some(obj) = v.as_object() else { continue };
let Some(dir2cid) = dir_to_compile_id_per_rank.get(&rank) else { continue };
for (_cid, entry) in obj.iter() {
let Some(arts) = entry.get("artifacts").and_then(|x| x.as_array()) else { continue };
// group by compile dir prefix to derive status per graph
use std::collections::HashMap as StdHashMap;
let mut status_by_dir: StdHashMap<String, String> = StdHashMap::new();
for a in arts {
if let Some(url) = a.get("url").and_then(|x| x.as_str()) {
if let Some((prefix, _)) = url.split_once('/') {
let name = a.get("name").and_then(|x| x.as_str()).unwrap_or("");
let status = if name.contains("cache_miss") {
"miss"
} else if name.contains("cache_hit") {
"hit"
} else if name.contains("cache_bypass") {
"bypass"
} else {
""
};
if !status.is_empty() {
// Prefer miss over hit, hit over bypass
let e = status_by_dir.entry(prefix.to_string()).or_insert_with(String::new);
if e.is_empty() || *e == "bypass" || (*e == "hit" && status == "miss") {
*e = status.to_string();
}
}
}
}
}
for (dir, st) in status_by_dir {
if let Some(cid) = dir2cid.get(&dir) {
cache_status.insert((rank, cid.clone()), st);
}
}
}
}

// 4) Evaluate
let report = analyze_execution_order(&exec_orders, &collective_by_graph, &cache_status);

// Summaries
use std::collections::HashSet;
let order_differs = report
.by_index
.iter()
.any(|row| {
let uniq: HashSet<&str> = row
.by_rank
.values()
.map(|s| s.as_str())
.collect();
uniq.len() > 1
});

let mut sched_ranks: HashSet<u32> = HashSet::new();
let mut cache_ranks: HashSet<u32> = HashSet::new();
for row in &report.by_index {
if row.issues.contains(&ExecOrderIssue::ScheduleMismatch) {
for &rk in row.by_rank.keys() {
sched_ranks.insert(rk);
}
}
if row.issues.contains(&ExecOrderIssue::CacheMismatch) {
for &rk in row.by_rank.keys() {
cache_ranks.insert(rk);
}
}
}
let mut ranks_schedule: Vec<u32> = sched_ranks.into_iter().collect();
ranks_schedule.sort_unstable();
let mut ranks_cache: Vec<u32> = cache_ranks.into_iter().collect();
ranks_cache.sort_unstable();

let schedule_str = if ranks_schedule.is_empty() {
String::new()
} else {
ranks_schedule
.iter()
.map(|r| format!("r{}", r))
.collect::<Vec<_>>()
.join(", ")
};
let cache_str = if ranks_cache.is_empty() {
String::new()
} else {
ranks_cache
.iter()
.map(|r| format!("r{}", r))
.collect::<Vec<_>>()
.join(", ")
};

Some(ExecOrderSummary {
order_differs,
ranks_schedule,
ranks_cache,
ranks_schedule_str: schedule_str,
ranks_cache_str: cache_str,
})
}
},
};

let (landing_page_path, landing_html) = generate_multi_rank_html(
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod types;

pub use types::{
ArtifactFlags, CollectivesParityReport, Diagnostics, DivergenceFlags, DivergenceGroup,
GraphAnalysis, GraphCollectivesParity, GraphRuntime, RankMetaData, RuntimeAnalysis,
ExecOrderSummary, GraphAnalysis, GraphCollectivesParity, GraphRuntime, RankMetaData, RuntimeAnalysis,
RuntimeRankDetail,
};

Expand Down
16 changes: 16 additions & 0 deletions src/templates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,22 @@ Each rank appears as a separate process (PID) in the trace; within each process,
You can download and view this trace in <a href='https://ui.perfetto.dev/'>Perfetto</a> to visualize performance differences across ranks.
</p>
{{ endif }}
{{ if diagnostics.exec_order }}
<h3>Graph Execution-Order Diagnostics</h3>
{{ if diagnostics.exec_order.order_differs }}
<p><strong>Warning:</strong> Graph execution order differs across ranks.</p>
{{ endif }}
{{ if diagnostics.exec_order.ranks_schedule.len() > 0 }}
<p>Schedule mismatch across ranks: {diagnostics.exec_order.ranks_schedule_str}</p>
{{ else }}
<p>Schedule: consistent across ranks.</p>
{{ endif }}
{{ if diagnostics.exec_order.ranks_cache.len() > 0 }}
<p>Cache hit/miss mismatch across ranks: {diagnostics.exec_order.ranks_cache_str}</p>
{{ else }}
<p>Cache hit/miss: consistent across ranks.</p>
{{ endif }}
{{ endif }}
<p>
Individual rank reports:
</p>
Expand Down
17 changes: 17 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,21 @@ pub struct ArtifactFlags {
pub runtime_trace: bool,
}

/// Summary of graph execution-order diagnostics across ranks
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct ExecOrderSummary {
/// True if any index has differing compile_ids across ranks
pub order_differs: bool,
/// Ranks involved in any schedule mismatches (sorted numerically)
pub ranks_schedule: Vec<u32>,
/// Ranks involved in any cache hit/miss mismatches (sorted numerically)
pub ranks_cache: Vec<u32>,
/// Pretty-printed ranks for template rendering (e.g., "r0, r1, r2")
pub ranks_schedule_str: String,
/// Pretty-printed ranks for template rendering (e.g., "r0, r2")
pub ranks_cache_str: String,
}

#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct Diagnostics {
pub divergence: DivergenceFlags,
Expand All @@ -937,6 +952,8 @@ pub struct Diagnostics {
pub cache_groups: Vec<DivergenceGroup>,
pub collective_groups: Vec<DivergenceGroup>,
pub tensor_meta_groups: Vec<DivergenceGroup>,
/// Optional graph execution-order summary (present when >=2 ranks have artifacts)
pub exec_order: Option<ExecOrderSummary>,
}

#[derive(Serialize)]
Expand Down
Loading