From 60e3468c06f14ad304da38f4c287a6fe000b9260 Mon Sep 17 00:00:00 2001 From: Micha Reiser Date: Wed, 29 Oct 2025 12:40:05 -0400 Subject: [PATCH 1/6] Track cycle function dependenciees as part of the cyclic query --- src/active_query.rs | 4 +++ src/cycle.rs | 4 +++ src/function/execute.rs | 62 ++++++++++++++++++++++++++++------------- src/zalsa_local.rs | 12 ++++++++ 4 files changed, 63 insertions(+), 19 deletions(-) diff --git a/src/active_query.rs b/src/active_query.rs index bb5987fcd..c80cded3b 100644 --- a/src/active_query.rs +++ b/src/active_query.rs @@ -91,6 +91,10 @@ impl ActiveQuery { .mark_all_active(active_tracked_ids.iter().copied()); } + pub(super) fn take_cycle_heads(&mut self) -> CycleHeads { + std::mem::take(&mut self.cycle_heads) + } + pub(super) fn add_read( &mut self, input: DatabaseKeyIndex, diff --git a/src/cycle.rs b/src/cycle.rs index fcbadf891..3f6f70aa0 100644 --- a/src/cycle.rs +++ b/src/cycle.rs @@ -490,4 +490,8 @@ impl<'db> ProvisionalStatus<'db> { _ => empty_cycle_heads(), } } + + pub(crate) const fn is_provisional(&self) -> bool { + matches!(self, ProvisionalStatus::Provisional { .. }) + } } diff --git a/src/function/execute.rs b/src/function/execute.rs index 9d6758730..d4ddb69cd 100644 --- a/src/function/execute.rs +++ b/src/function/execute.rs @@ -56,20 +56,25 @@ where }); let (new_value, mut completed_query) = match C::CYCLE_STRATEGY { - CycleRecoveryStrategy::Panic => Self::execute_query( - db, - zalsa, - zalsa_local.push_query(database_key_index, IterationCount::initial()), - opt_old_memo, - ), + CycleRecoveryStrategy::Panic => { + let (new_value, active_query) = Self::execute_query( + db, + zalsa, + zalsa_local.push_query(database_key_index, IterationCount::initial()), + opt_old_memo, + ); + (new_value, active_query.pop()) + } CycleRecoveryStrategy::FallbackImmediate => { - let (mut new_value, mut completed_query) = Self::execute_query( + let (mut new_value, active_query) = Self::execute_query( db, zalsa, zalsa_local.push_query(database_key_index, IterationCount::initial()), opt_old_memo, ); + let mut completed_query = active_query.pop(); + if let Some(cycle_heads) = completed_query.revisions.cycle_heads_mut() { // Did the new result we got depend on our own provisional value, in a cycle? if cycle_heads.contains(&database_key_index) { @@ -198,9 +203,10 @@ where let _poison_guard = PoisonProvisionalIfPanicking::new(self, zalsa, id, memo_ingredient_index); - let mut active_query = zalsa_local.push_query(database_key_index, iteration_count); let (new_value, completed_query) = loop { + let active_query = zalsa_local.push_query(database_key_index, iteration_count); + // Tracked struct ids that existed in the previous revision // but weren't recreated in the last iteration. It's important that we seed the next // query with these ids because the query might re-create them as part of the next iteration. @@ -209,29 +215,32 @@ where // if they aren't recreated when reaching the final iteration. active_query.seed_tracked_struct_ids(&last_stale_tracked_ids); - let (mut new_value, mut completed_query) = Self::execute_query( + let (mut new_value, mut active_query) = Self::execute_query( db, zalsa, active_query, last_provisional_memo.or(opt_old_memo), ); - // If there are no cycle heads, break out of the loop (`cycle_heads_mut` returns `None` if the cycle head list is empty) - let Some(cycle_heads) = completed_query.revisions.cycle_heads_mut() else { + // Take the cycle heads to not-fight-rust's-borrow-checker. + let mut cycle_heads = active_query.take_cycle_heads(); + + // If there are no cycle heads, break out of the loop. + if cycle_heads.is_empty() { iteration_count = iteration_count.increment().unwrap_or_else(|| { tracing::warn!("{database_key_index:?}: execute: too many cycle iterations"); panic!("{database_key_index:?}: execute: too many cycle iterations") }); + + let mut completed_query = active_query.pop(); completed_query .revisions .update_iteration_count_mut(database_key_index, iteration_count); claim_guard.set_release_mode(ReleaseMode::SelfOnly); - break (new_value, completed_query); - }; + break (new_value, active_query.pop()); + } - // Take the cycle heads to not-fight-rust's-borrow-checker. - let mut cycle_heads = std::mem::take(cycle_heads); let mut missing_heads: SmallVec<[(DatabaseKeyIndex, IterationCount); 1]> = SmallVec::new_const(); let mut max_iteration_count = iteration_count; @@ -262,6 +271,10 @@ where .provisional_status(zalsa, head.database_key_index.key_index()) .expect("cycle head memo must have been created during the execution"); + // A cycle head isn't allowed to be final because that would mean that this query + // dependent on the initial value (or last provisional) + assert!(provisional_status.is_provisional()); + for nested_head in provisional_status.cycle_heads() { let nested_as_tuple = ( nested_head.database_key_index, @@ -298,6 +311,8 @@ where claim_guard.set_release_mode(ReleaseMode::SelfOnly); } + let mut completed_query = active_query.pop(); + *completed_query.revisions.verified_final.get_mut() = false; completed_query.revisions.set_cycle_heads(cycle_heads); iteration_count = iteration_count.increment().unwrap_or_else(|| { @@ -378,8 +393,17 @@ where this_converged = C::values_equal(&new_value, last_provisional_value); } } + + let new_cycle_heads = active_query.take_cycle_heads(); + for head in new_cycle_heads { + if !cycle_heads.contains(&head.database_key_index) { + panic!("Cycle recovery function for {database_key_index:?} introduced a cycle, depending on {:?}. This is not allowed.", head.database_key_index); + } + } } + let mut completed_query = active_query.pop(); + if let Some(outer_cycle) = outer_cycle { tracing::info!( "Detected nested cycle {database_key_index:?}, iterate it as part of the outer cycle {outer_cycle:?}" @@ -390,6 +414,7 @@ where completed_query .revisions .set_cycle_converged(this_converged); + *completed_query.revisions.verified_final.get_mut() = false; // Transfer ownership of this query to the outer cycle, so that it can claim it // and other threads don't compete for the same lock. @@ -428,9 +453,9 @@ where } *completed_query.revisions.verified_final.get_mut() = true; - break (new_value, completed_query); } + *completed_query.revisions.verified_final.get_mut() = false; // The fixpoint iteration hasn't converged. Iterate again... iteration_count = iteration_count.increment().unwrap_or_else(|| { @@ -484,7 +509,6 @@ where last_provisional_memo = Some(new_memo); last_stale_tracked_ids = completed_query.stale_tracked_structs; - active_query = zalsa_local.push_query(database_key_index, iteration_count); continue; }; @@ -503,7 +527,7 @@ where zalsa: &'db Zalsa, active_query: ActiveQueryGuard<'db>, opt_old_memo: Option<&Memo<'db, C>>, - ) -> (C::Output<'db>, CompletedQuery) { + ) -> (C::Output<'db>, ActiveQueryGuard<'db>) { if let Some(old_memo) = opt_old_memo { // If we already executed this query once, then use the tracked-struct ids from the // previous execution as the starting point for the new one. @@ -528,7 +552,7 @@ where C::id_to_input(zalsa, active_query.database_key_index.key_index()), ); - (new_value, active_query.pop()) + (new_value, active_query) } } diff --git a/src/zalsa_local.rs b/src/zalsa_local.rs index f43eb78eb..bde3b6b24 100644 --- a/src/zalsa_local.rs +++ b/src/zalsa_local.rs @@ -1213,6 +1213,18 @@ impl ActiveQueryGuard<'_> { } } + pub(crate) fn take_cycle_heads(&mut self) -> CycleHeads { + // SAFETY: We do not access the query stack reentrantly. + unsafe { + self.local_state.with_query_stack_unchecked_mut(|stack| { + #[cfg(debug_assertions)] + assert_eq!(stack.len(), self.push_len); + let frame = stack.last_mut().unwrap(); + frame.take_cycle_heads() + }) + } + } + /// Invoked when the query has successfully completed execution. fn complete(self) -> CompletedQuery { // SAFETY: We do not access the query stack reentrantly. From a9e51facba6dc128d02677d4269607991466441c Mon Sep 17 00:00:00 2001 From: Micha Reiser Date: Wed, 29 Oct 2025 16:38:48 -0400 Subject: [PATCH 2/6] Add regression test --- src/function/backdate.rs | 2 +- src/function/maybe_changed_after.rs | 5 +- tests/cycle_recovery_dependencies.rs | 82 ++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 2 deletions(-) create mode 100644 tests/cycle_recovery_dependencies.rs diff --git a/src/function/backdate.rs b/src/function/backdate.rs index f735f577b..b849fadaf 100644 --- a/src/function/backdate.rs +++ b/src/function/backdate.rs @@ -34,7 +34,7 @@ where if revisions.durability >= old_memo.revisions.durability && C::values_equal(old_value, value) { - crate::tracing::debug!( + crate::tracing::info!( "{index:?} value is equal, back-dating to {:?}", old_memo.revisions.changed_at, ); diff --git a/src/function/maybe_changed_after.rs b/src/function/maybe_changed_after.rs index 20440883e..165a3fb02 100644 --- a/src/function/maybe_changed_after.rs +++ b/src/function/maybe_changed_after.rs @@ -592,7 +592,10 @@ where cycle_heads.append_heads(&mut child_cycle_heads); match input_result { - VerifyResult::Changed => return VerifyResult::changed(), + VerifyResult::Changed => { + cycle_heads.remove_head(database_key_index); + return VerifyResult::changed(); + } #[cfg(feature = "accumulator")] VerifyResult::Unchanged { accumulated } => { inputs |= accumulated; diff --git a/tests/cycle_recovery_dependencies.rs b/tests/cycle_recovery_dependencies.rs new file mode 100644 index 000000000..b26ce973b --- /dev/null +++ b/tests/cycle_recovery_dependencies.rs @@ -0,0 +1,82 @@ +#![cfg(feature = "inventory")] + +//! Queries or inputs read within the cycle recovery function +//! are tracked on the cycle function and don't "leak" into the +//! function calling the query with cycle handling. + +use expect_test::expect; +use salsa::Setter as _; + +use crate::common::LogDatabase; + +mod common; + +#[salsa::input] +struct Input { + value: u32, +} + +#[salsa::tracked] +fn entry(db: &dyn salsa::Database, input: Input) -> u32 { + query(db, input) +} + +#[salsa::tracked(cycle_fn=cycle_fn, cycle_initial=cycle_initial)] +fn query(db: &dyn salsa::Database, input: Input) -> u32 { + let val = query(db, input); + if val < 5 { + val + 1 + } else { + val + } +} + +fn cycle_initial(_db: &dyn salsa::Database, _id: salsa::Id, _input: Input) -> u32 { + 0 +} + +fn cycle_fn( + db: &dyn salsa::Database, + _id: salsa::Id, + _last_provisional_value: &u32, + _value: &u32, + _count: u32, + input: Input, +) -> salsa::CycleRecoveryAction { + let _input = input.value(db); + salsa::CycleRecoveryAction::Iterate +} + +#[test_log::test] +fn the_test() { + let mut db = common::EventLoggerDatabase::default(); + + let input = Input::new(&db, 1); + assert_eq!(entry(&db, input), 5); + + db.assert_logs_len(15); + + input.set_value(&mut db).to(2); + + assert_eq!(entry(&db, input), 5); + db.assert_logs(expect![[r#" + [ + "DidSetCancellationFlag", + "WillCheckCancellation", + "WillCheckCancellation", + "WillCheckCancellation", + "WillExecute { database_key: query(Id(0)) }", + "WillCheckCancellation", + "WillIterateCycle { database_key: query(Id(0)), iteration_count: IterationCount(1) }", + "WillCheckCancellation", + "WillIterateCycle { database_key: query(Id(0)), iteration_count: IterationCount(2) }", + "WillCheckCancellation", + "WillIterateCycle { database_key: query(Id(0)), iteration_count: IterationCount(3) }", + "WillCheckCancellation", + "WillIterateCycle { database_key: query(Id(0)), iteration_count: IterationCount(4) }", + "WillCheckCancellation", + "WillIterateCycle { database_key: query(Id(0)), iteration_count: IterationCount(5) }", + "WillCheckCancellation", + "DidValidateMemoizedValue { database_key: entry(Id(0)) }", + ]"#]]); +} From 45158fceb76cf3db7bc1af367c021b157c0db3bb Mon Sep 17 00:00:00 2001 From: Micha Reiser Date: Wed, 29 Oct 2025 21:40:46 +0100 Subject: [PATCH 3/6] Discard changes to src/function/backdate.rs --- src/function/backdate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/function/backdate.rs b/src/function/backdate.rs index b849fadaf..f735f577b 100644 --- a/src/function/backdate.rs +++ b/src/function/backdate.rs @@ -34,7 +34,7 @@ where if revisions.durability >= old_memo.revisions.durability && C::values_equal(old_value, value) { - crate::tracing::info!( + crate::tracing::debug!( "{index:?} value is equal, back-dating to {:?}", old_memo.revisions.changed_at, ); From fceeb37ff15da5752c68aae9ad9599e411bb6c08 Mon Sep 17 00:00:00 2001 From: Micha Reiser Date: Fri, 31 Oct 2025 20:56:49 -0400 Subject: [PATCH 4/6] Update comment --- src/function/execute.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/function/execute.rs b/src/function/execute.rs index d4ddb69cd..3029adb52 100644 --- a/src/function/execute.rs +++ b/src/function/execute.rs @@ -272,7 +272,8 @@ where .expect("cycle head memo must have been created during the execution"); // A cycle head isn't allowed to be final because that would mean that this query - // dependent on the initial value (or last provisional) + // dependents on the initial value of B, but B finalized without + // completing this query that also participates in the cycle. assert!(provisional_status.is_provisional()); for nested_head in provisional_status.cycle_heads() { From 686b929b6129368b03857eb9d5c78dd6e6552ad0 Mon Sep 17 00:00:00 2001 From: Micha Reiser Date: Fri, 31 Oct 2025 21:00:06 -0400 Subject: [PATCH 5/6] Fix merge error --- src/function/execute.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/function/execute.rs b/src/function/execute.rs index 3029adb52..35d5d73ca 100644 --- a/src/function/execute.rs +++ b/src/function/execute.rs @@ -238,7 +238,7 @@ where .update_iteration_count_mut(database_key_index, iteration_count); claim_guard.set_release_mode(ReleaseMode::SelfOnly); - break (new_value, active_query.pop()); + break (new_value, completed_query); } let mut missing_heads: SmallVec<[(DatabaseKeyIndex, IterationCount); 1]> = From 48fef276bd81309c904db0e3bd7d87a36a5d25a6 Mon Sep 17 00:00:00 2001 From: Micha Reiser Date: Mon, 3 Nov 2025 15:25:35 -0500 Subject: [PATCH 6/6] Refine comment --- src/function/execute.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/function/execute.rs b/src/function/execute.rs index 35d5d73ca..53bc640a2 100644 --- a/src/function/execute.rs +++ b/src/function/execute.rs @@ -271,9 +271,9 @@ where .provisional_status(zalsa, head.database_key_index.key_index()) .expect("cycle head memo must have been created during the execution"); - // A cycle head isn't allowed to be final because that would mean that this query - // dependents on the initial value of B, but B finalized without - // completing this query that also participates in the cycle. + // A query should only ever depend on other heads that are provisional. + // If this invariant is violated, it means that this query participates in a cycle, + // but it wasn't executed in the last iteration of said cycle. assert!(provisional_status.is_provisional()); for nested_head in provisional_status.cycle_heads() {