Skip to content

Commit f0c41db

Browse files
authored
Add execution order divergence analysis library module (#140)
What this PR does: adds a new `execution_order` library module that parses graph execution sequences from PyTorch logs and detects divergences across ranks, using the existing collective-schedule and cache-analysis infrastructure.

Key components:
- `parse_graph_execution_order()`: extracts compile_id sequences from graph_execution JSON artifacts.
- `analyze_execution_order()`: aligns ranks by execution index and flags `ScheduleMismatch` / `CacheMismatch` issues.
- Structured reporting: an in-memory `ExecOrderReport` with per-index divergence details.

Integration: reuses the existing `read_collective_schedules()` and cache analysis data.

Next steps: wire this analysis into the `--all-ranks-html` landing page so that it displays an "Execution-Order Diagnostics" section when the artifacts are present.
1 parent e7cf667 commit f0c41db

File tree

2 files changed

+204
-0
lines changed

2 files changed

+204
-0
lines changed

src/lib.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ pub use types::{
2929
RankMetaData, RuntimeAnalysis, RuntimeRankDetail,
3030
};
3131

32+
pub use execution_order::{
33+
analyze_execution_order, parse_graph_execution_order, ExecOrderIndexRow, ExecOrderIssue,
34+
ExecOrderReport,
35+
};
36+
3237
#[derive(Debug)]
3338
enum ParserResult {
3439
NoPayload,
@@ -1795,3 +1800,131 @@ fn convert_node_mappings_to_line_numbers(
17951800
"postToCppCode": hashmap_to_json_map(line_post_to_cpp_code)
17961801
})
17971802
}
1803+
1804+
pub mod execution_order {
1805+
use fxhash::FxHashMap;
1806+
1807+
/// Issue types detected at a given execution index across ranks
1808+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1809+
pub enum ExecOrderIssue {
1810+
ScheduleMismatch,
1811+
CacheMismatch,
1812+
}
1813+
1814+
/// One row in the execution-order report
1815+
#[derive(Debug, Clone)]
1816+
pub struct ExecOrderIndexRow {
1817+
/// Zero-based index into per-rank execution orders
1818+
pub index: usize,
1819+
/// Mapping: rank -> compile_id (compile directory name)
1820+
pub by_rank: FxHashMap<u32, String>,
1821+
/// Issues found for this index
1822+
pub issues: Vec<ExecOrderIssue>,
1823+
}
1824+
1825+
/// Final report for execution-order diagnostics
1826+
#[derive(Debug, Clone, Default)]
1827+
pub struct ExecOrderReport {
1828+
pub by_index: Vec<ExecOrderIndexRow>,
1829+
}
1830+
1831+
/// Analyze per-rank execution orders, aligning entries by index and flagging issues
1832+
/// using provided per-graph properties.
1833+
pub fn analyze_execution_order(
1834+
exec_orders: &FxHashMap<u32, Vec<String>>,
1835+
collective_schedule_by_graph: &FxHashMap<(u32, String), Vec<String>>,
1836+
cache_status: &FxHashMap<(u32, String), String>,
1837+
) -> ExecOrderReport {
1838+
// Determine max length across ranks (N)
1839+
let max_len = exec_orders.values().map(|v| v.len()).max().unwrap_or(0);
1840+
1841+
if max_len == 0 || exec_orders.is_empty() {
1842+
return ExecOrderReport::default();
1843+
}
1844+
1845+
// Memoize property lookups per (rank, compile_id)
1846+
let mut sched_memo: FxHashMap<(u32, String), Option<Vec<String>>> = FxHashMap::default();
1847+
let mut cache_memo: FxHashMap<(u32, String), Option<String>> = FxHashMap::default();
1848+
1849+
let mut rows: Vec<ExecOrderIndexRow> = Vec::with_capacity(max_len);
1850+
1851+
for idx in 0..max_len {
1852+
// Gather present ranks and their compile_ids at this index
1853+
let by_rank: FxHashMap<u32, String> = exec_orders
1854+
.iter()
1855+
.filter_map(|(&rank, order)| order.get(idx).cloned().map(|cid| (rank, cid)))
1856+
.collect();
1857+
1858+
if by_rank.is_empty() {
1859+
continue;
1860+
}
1861+
1862+
// Evaluate issues among present ranks
1863+
let mut issues: Vec<ExecOrderIssue> = Vec::new();
1864+
1865+
// Schedule mismatch: compare collective op sequences
1866+
{
1867+
let schedules: Vec<Vec<String>> = by_rank
1868+
.iter()
1869+
.filter_map(|(&rank, cid)| {
1870+
let key = (rank, cid.clone());
1871+
let entry = sched_memo
1872+
.entry((rank, cid.clone()))
1873+
.or_insert_with(|| collective_schedule_by_graph.get(&key).cloned());
1874+
entry.clone()
1875+
})
1876+
.collect();
1877+
if schedules.len() >= 2 && schedules[1..].iter().any(|s| s != &schedules[0]) {
1878+
issues.push(ExecOrderIssue::ScheduleMismatch);
1879+
}
1880+
}
1881+
1882+
// Cache skew: compare cache status markers
1883+
{
1884+
let statuses: Vec<String> = by_rank
1885+
.iter()
1886+
.filter_map(|(&rank, cid)| {
1887+
let key = (rank, cid.clone());
1888+
let entry = cache_memo
1889+
.entry((rank, cid.clone()))
1890+
.or_insert_with(|| cache_status.get(&key).cloned());
1891+
entry.clone().filter(|s| !s.is_empty())
1892+
})
1893+
.collect();
1894+
if statuses.len() >= 2 && statuses[1..].iter().any(|s| s != &statuses[0]) {
1895+
issues.push(ExecOrderIssue::CacheMismatch);
1896+
}
1897+
}
1898+
1899+
rows.push(ExecOrderIndexRow {
1900+
index: idx,
1901+
by_rank,
1902+
issues,
1903+
});
1904+
}
1905+
1906+
ExecOrderReport { by_index: rows }
1907+
}
1908+
1909+
pub fn parse_graph_execution_order(payload: &str) -> anyhow::Result<Vec<String>> {
1910+
let value: serde_json::Value = serde_json::from_str(payload)?;
1911+
let arr = value
1912+
.get("graph_execution_order")
1913+
.and_then(|v| v.as_array())
1914+
.ok_or_else(|| anyhow::anyhow!("missing graph_execution_order array"))?;
1915+
1916+
let mut out = Vec::with_capacity(arr.len());
1917+
for item in arr {
1918+
match item {
1919+
serde_json::Value::String(s) => out.push(s.clone()),
1920+
serde_json::Value::Object(map) => {
1921+
if let Some(s) = map.get("compile_id").and_then(|v| v.as_str()) {
1922+
out.push(s.to_string());
1923+
}
1924+
}
1925+
_ => {}
1926+
}
1927+
}
1928+
Ok(out)
1929+
}
1930+
}

tests/integration_test.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2365,3 +2365,74 @@ fn test_tensor_meta_divergence_groups() -> Result<(), Box<dyn std::error::Error>
23652365

23662366
Ok(())
23672367
}
2368+
2369+
#[test]
fn test_execution_order_multi_rank_divergence() -> anyhow::Result<()> {
    use tlparse::execution_order::{analyze_execution_order, ExecOrderIssue};

    // Small fixture helpers to keep the tables below readable.
    fn owned(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }
    fn graph(rank: u32, compile_id: &str) -> (u32, String) {
        (rank, compile_id.to_string())
    }

    // Two ranks that run the same three graphs, with the first two swapped on rank 1.
    let mut exec_orders: fxhash::FxHashMap<u32, Vec<String>> = fxhash::FxHashMap::default();
    exec_orders.insert(0, owned(&["0/0", "0/1", "0/2"]));
    exec_orders.insert(1, owned(&["0/1", "0/0", "0/2"]));

    // Collective schedules per (rank, graph). Graph "0/2" has no entry on either
    // rank; at index 0 the ranks compare all_reduce against broadcast.
    let mut collective_schedule_by_graph: fxhash::FxHashMap<(u32, String), Vec<String>> =
        fxhash::FxHashMap::default();
    collective_schedule_by_graph.insert(graph(0, "0/0"), owned(&["all_reduce"]));
    collective_schedule_by_graph.insert(graph(0, "0/1"), owned(&["all_gather"]));
    collective_schedule_by_graph.insert(graph(1, "0/0"), owned(&["all_reduce"]));
    collective_schedule_by_graph.insert(graph(1, "0/1"), owned(&["broadcast"]));

    // Cache status per (rank, graph). At index 1 rank 0 runs "0/1" (❌) while
    // rank 1 runs "0/0" (✅).
    let mut cache_status: fxhash::FxHashMap<(u32, String), String> = fxhash::FxHashMap::default();
    cache_status.insert(graph(0, "0/0"), "✅".to_string());
    cache_status.insert(graph(0, "0/1"), "❌".to_string());
    cache_status.insert(graph(1, "0/0"), "✅".to_string());
    cache_status.insert(graph(1, "0/1"), "✅".to_string());

    let report = analyze_execution_order(&exec_orders, &collective_schedule_by_graph, &cache_status);
    let rows = &report.by_index;

    assert_eq!(rows.len(), 3);

    // Every row must echo back the compile id each rank ran at that position.
    let expected_ids = [("0/0", "0/1"), ("0/1", "0/0"), ("0/2", "0/2")];
    for (row, (rank0_id, rank1_id)) in rows.iter().zip(expected_ids.iter()) {
        assert_eq!(row.by_rank.get(&0).map(String::as_str), Some(*rank0_id));
        assert_eq!(row.by_rank.get(&1).map(String::as_str), Some(*rank1_id));
    }

    // Index 0: the two ranks run graphs with different collective schedules.
    assert!(rows[0].issues.contains(&ExecOrderIssue::ScheduleMismatch));

    // Index 1: the two ranks run graphs with different cache status markers.
    assert!(rows[1].issues.contains(&ExecOrderIssue::CacheMismatch));

    // Index 2: both ranks run "0/2", which has no recorded properties.
    assert!(rows[2].issues.is_empty());

    Ok(())
}

0 commit comments

Comments
 (0)