From 8216a1a6bdd7c578c18148b7d7df67ca3c7b3944 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 17 Jul 2024 13:22:59 -0400 Subject: [PATCH 1/5] Honor all non-terminal commands https://github.com/temporalio/features/issues/481 --- core/src/core_tests/determinism.rs | 2 +- core/src/core_tests/workflow_tasks.rs | 73 +++++- core/src/internal_flags.rs | 24 +- core/src/test_help/mod.rs | 6 +- .../workflow/machines/workflow_machines.rs | 8 +- core/src/worker/workflow/managed_run.rs | 238 ++++++++++++++++-- core/src/worker/workflow/mod.rs | 12 +- sdk-core-protos/src/history_builder.rs | 8 + 8 files changed, 325 insertions(+), 46 deletions(-) diff --git a/core/src/core_tests/determinism.rs b/core/src/core_tests/determinism.rs index 3a7c35ee1..d7b2f3a94 100644 --- a/core/src/core_tests/determinism.rs +++ b/core/src/core_tests/determinism.rs @@ -125,7 +125,7 @@ async fn activity_id_or_type_change_is_nondeterministic( ) { let wf_id = "fakeid"; let wf_type = DEFAULT_WORKFLOW_TYPE; - let mut t = if local_act { + let mut t: TestHistoryBuilder = if local_act { canned_histories::single_local_activity("1") } else { canned_histories::single_activity("1") diff --git a/core/src/core_tests/workflow_tasks.rs b/core/src/core_tests/workflow_tasks.rs index 0c79d9e0b..341c53275 100644 --- a/core/src/core_tests/workflow_tasks.rs +++ b/core/src/core_tests/workflow_tasks.rs @@ -45,7 +45,7 @@ use temporal_sdk_core_protos::{ StartWorkflow, UpdateRandomSeed, WorkflowActivationJob, }, workflow_commands::{ - update_response::Response, ActivityCancellationType, CancelTimer, + update_response::Response, workflow_command, ActivityCancellationType, CancelTimer, CompleteWorkflowExecution, ContinueAsNewWorkflowExecution, FailWorkflowExecution, RequestCancelActivity, ScheduleActivity, SetPatchMarker, StartChildWorkflowExecution, UpdateResponse, @@ -56,7 +56,7 @@ use temporal_sdk_core_protos::{ temporal::api::{ command::v1::command::Attributes, common::v1::{Payload, RetryPolicy, WorkerVersionStamp}, - enums::v1::{EventType, WorkflowTaskFailedCause}, + enums::v1::{CommandType, EventType, WorkflowTaskFailedCause}, failure::v1::Failure, history::v1::{ history_event, TimerFiredEventAttributes, @@ -2503,11 +2503,73 @@ async fn core_internal_flags() { core.shutdown().await; } +// A post-terminal command is retained, and placed before the terminal command. #[tokio::test] -async fn post_terminal_commands_are_discarded() { +async fn post_terminal_commands_are_retained_when_not_replaying() { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_full_wf_task(); + t.add_timer_started("1".to_string()); + t.add_workflow_execution_completed(); + + let commands_sent_by_lang = vec![ + CompleteWorkflowExecution { result: None }.into(), + start_timer_cmd(1, Duration::from_secs(1)), + ]; + let expected_command_types_emitted = vec![ + CommandType::StartTimer, + CommandType::CompleteWorkflowExecution, + ]; + _simulate_completion_from_lang_and_assert_commands_emitted_by_core( + commands_sent_by_lang, + expected_command_types_emitted, + t, + ) + .await +} + +async fn _simulate_completion_from_lang_and_assert_commands_emitted_by_core( + commands_sent_by_lang: Vec, + expected_command_types: Vec, + t: TestHistoryBuilder, +) { + let mut mh = MockPollCfg::from_resp_batches( + "fake_wf_id", + t, + [ResponseType::ToTaskNum(1), ResponseType::AllHistory], + mock_workflow_client(), + ); + mh.completion_mock_fn = Some(Box::new(move |c| { + let command_types: Vec<_> = c.commands.iter().map(|c| c.command_type()).collect(); + assert_eq!(command_types, expected_command_types); + Ok(Default::default()) + })); + let mut mock = build_mock_pollers(mh); + mock.worker_cfg(|wc| wc.max_cached_workflows = 1); + let core = mock_worker(mock); + + let act = core.poll_workflow_activation().await.unwrap(); + + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( + act.run_id, + commands_sent_by_lang, + )) + .await + .unwrap(); + + // This just ensures applying the complete history w/ the completion command works, though + // there's no activation. + let act = core.poll_workflow_activation().await; + assert_matches!(act.unwrap_err(), PollWfError::ShutDown); + core.shutdown().await; +} + +#[tokio::test] +async fn move_terminal_commands_flag_test_1() { + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + t.add_timer_started("1".to_string()); t.add_workflow_execution_completed(); let mut mh = MockPollCfg::from_resp_batches( @@ -2517,8 +2579,9 @@ async fn post_terminal_commands_are_discarded() { mock_workflow_client(), ); mh.completion_mock_fn = Some(Box::new(|c| { - // Only the complete execution command should actually be sent - assert_eq!(c.commands.len(), 1); + // The start timer and complete execution commands should be sent, in + // that order. + assert_eq!(c.commands.len(), 2); Ok(Default::default()) })); let mut mock = build_mock_pollers(mh); diff --git a/core/src/internal_flags.rs b/core/src/internal_flags.rs index 67bea9ecf..7497ff0f4 100644 --- a/core/src/internal_flags.rs +++ b/core/src/internal_flags.rs @@ -14,7 +14,7 @@ use temporal_sdk_core_protos::temporal::api::{ /// This enumeration contains internal flags that may result in incompatible history changes with /// older workflows, or other breaking changes. /// -/// When a flag has existed long enough the version it was introduced in is no longer supported, it +/// When a flag has existed long enough that the version it was introduced in is no longer supported, it /// may be removed from the enum. *Importantly*, all variants must be given explicit values, such /// that removing older variants does not create any change in existing values. Removed flag /// variants must be reserved forever (a-la protobuf), and should be called out in a comment. @@ -27,6 +27,15 @@ pub(crate) enum CoreInternalFlags { /// Introduced automatically upserting search attributes for each patched call, and /// nondeterminism checks for upserts. UpsertSearchAttributeOnPatch = 2, + /// Prior to this flag, we truncated commands received from lang at the + /// first terminal (i.e. workflow-terminating) command. With this flag, we + /// reorder commands such that all non-terminal commands come first, + /// followed by the first terminal command, if any (it's possible that + /// multiple workflow coroutines generated a terminal command). This has the + /// consequence that all non-terminal commands are sent to the server, even + /// if in the sequence delivered by lang they came after a terminal command. + /// See https://github.com/temporalio/features/issues/481. + MoveTerminalCommands = 3, /// We received a value higher than this code can understand. TooHigh = u32::MAX, } @@ -82,7 +91,7 @@ impl InternalFlags { /// Returns true if this flag may currently be used. If `should_record` is true, always returns /// true and records the flag as being used, for taking later via /// [Self::gather_for_wft_complete]. - pub(crate) fn try_use(&mut self, core_patch: CoreInternalFlags, should_record: bool) -> bool { + pub(crate) fn try_use(&mut self, flag: CoreInternalFlags, should_record: bool) -> bool { match self { Self::Enabled { core, @@ -90,10 +99,10 @@ impl InternalFlags { .. } => { if should_record { - core_since_last_complete.insert(core_patch); + core_since_last_complete.insert(flag); true } else { - core.contains(&core_patch) + core.contains(&flag) } } // If the server does not support the metadata field, we must assume we can never use @@ -114,9 +123,9 @@ impl InternalFlags { } } - /// Wipes the recorded flags used during the current WFT and returns a partially filled - /// sdk metadata message that can be combined with any existing data before sending the WFT - /// complete + /// Return a partially filled sdk metadata message containing core and lang flags added since + /// the last WFT complete. The returned value can be combined with other data before sending the + /// WFT complete. pub(crate) fn gather_for_wft_complete(&mut self) -> WorkflowTaskCompletedMetadata { match self { Self::Enabled { @@ -161,6 +170,7 @@ impl CoreInternalFlags { match v { 1 => Self::IdAndTypeDeterminismChecks, 2 => Self::UpsertSearchAttributeOnPatch, + 3 => Self::MoveTerminalCommands, _ => Self::TooHigh, } } diff --git a/core/src/test_help/mod.rs b/core/src/test_help/mod.rs index 40286713a..d0629eed2 100644 --- a/core/src/test_help/mod.rs +++ b/core/src/test_help/mod.rs @@ -382,7 +382,7 @@ pub(crate) fn single_hist_mock_sg( build_mock_pollers(mh) } -type WFTCompeltionMockFn = dyn FnMut(&WorkflowTaskCompletion) -> Result +type WFTCompletionMockFn = dyn FnMut(&WorkflowTaskCompletion) -> Result + Send; #[allow(clippy::type_complexity)] @@ -395,7 +395,7 @@ pub(crate) struct MockPollCfg { /// All calls to fail WFTs must match this predicate pub(crate) expect_fail_wft_matcher: Box) -> bool + Send>, - pub(crate) completion_mock_fn: Option>, + pub(crate) completion_mock_fn: Option>, pub(crate) num_expected_completions: Option, /// If being used with the Rust SDK, this is set true. It ensures pollers will not error out /// early with no work, since we cannot know the exact number of times polling will happen. @@ -476,7 +476,7 @@ impl MockPollCfg { #[allow(clippy::type_complexity)] pub(crate) struct CompletionAssertsBuilder<'a> { - dest: &'a mut Option>, + dest: &'a mut Option>, assertions: VecDeque>, } diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index f5825aeb6..0c283c76e 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -14,7 +14,7 @@ use super::{ }; use crate::{ abstractions::dbg_panic, - internal_flags::InternalFlags, + internal_flags::{CoreInternalFlags, InternalFlags}, protosext::{ protocol_messages::{IncomingProtocolMessage, IncomingProtocolMessageBody}, CompleteLocalActivityData, HistoryEventExt, ValidScheduleLA, @@ -470,6 +470,12 @@ impl WorkflowMachines { .add_lang_used(flags); } + pub(crate) fn try_use_flag(&self, flag: CoreInternalFlags, should_record: bool) -> bool { + self.observed_internal_flags + .borrow_mut() + .try_use(flag, should_record) + } + /// Undo a speculative workflow task by resetting to a certain WFT Started ID. This can happen /// when an update request is rejected. pub(crate) fn reset_last_started_id(&mut self, id: i64) { diff --git a/core/src/worker/workflow/managed_run.rs b/core/src/worker/workflow/managed_run.rs index 4dcf8daf6..272223184 100644 --- a/core/src/worker/workflow/managed_run.rs +++ b/core/src/worker/workflow/managed_run.rs @@ -1,5 +1,6 @@ use crate::{ abstractions::dbg_panic, + internal_flags::CoreInternalFlags, protosext::{protocol_messages::IncomingProtocolMessage, WorkflowActivationExt}, telemetry::metrics, worker::{ @@ -366,7 +367,7 @@ impl ManagedRun { pub(super) fn successful_completion( &mut self, mut commands: Vec, - used_flags: Vec, + lang_used_flags: Vec, resp_chan: Option>, ) -> Result { let activation_was_only_eviction = self.activation_is_eviction(); @@ -422,20 +423,7 @@ impl ManagedRun { ); Ok(None) } else { - // First strip out query responses from other commands that actually affect machines - // Would be prettier with `drain_filter` - let mut query_responses = vec![]; - commands = std::mem::take(&mut commands) - .into_iter() - .filter_map(|x| { - if let WFCommand::QueryResponse(qr) = x { - query_responses.push(qr); - None - } else { - Some(x) - } - }) - .collect(); + let (commands, query_responses) = self.preprocess_command_sequence(commands); if activation_was_only_eviction && !commands.is_empty() { dbg_panic!("Reply to an eviction included commands"); @@ -448,7 +436,7 @@ impl ManagedRun { activation_was_eviction: self.activation_is_eviction(), has_pending_query, query_responses, - used_flags, + lang_used_flags, resp_chan, }; @@ -479,6 +467,38 @@ impl ManagedRun { } } + /// Core has received from lang a sequence containing all commands generated + /// by all workflow coroutines. Return a command sequence containing all + /// non-terminal (i.e. non-workflow-terminating) commands, followed by the + /// first terminal command if there are any. Also strip out and return query + /// results (these don't affect machines and are handled separately + /// downstream) + /// + /// The reordering is done in order that all non-terminal commands generated + /// by workflow coroutines are given a chance for the server to honor them. + /// For example, in order to deliver an update result to a client as the + /// workflow completes. + /// + /// Behavior here has changed backwards-incompatibly, so a flag is set if + /// the outcome differs from what the outcome would have been previously. + /// See also CoreInternalFlags::MoveTerminalCommands docstring and + /// https://github.com/temporalio/features/issues/481. + fn preprocess_command_sequence( + &mut self, + commands: Vec, + ) -> (Vec, Vec) { + if self.wfm.machines.replaying + && !self + .wfm + .machines + .try_use_flag(CoreInternalFlags::MoveTerminalCommands, false) + { + preprocess_command_sequence_old_behavior(commands) + } else { + preprocess_command_sequence(commands) + } + } + /// Called after the higher-up machinery has fetched more pages of event history needed to apply /// the next workflow task. The history update and paginator used to perform the fetch are /// passed in, with the update being used to apply the task, and the paginator stored to be @@ -654,7 +674,9 @@ impl ManagedRun { activation_was_eviction: completion.activation_was_eviction, }; - self.wfm.machines.add_lang_used_flags(completion.used_flags); + self.wfm + .machines + .add_lang_used_flags(completion.lang_used_flags); // If this is just bookkeeping after a reply to an eviction activation, we can bypass // everything, since there is no reason to continue trying to update machines. @@ -1169,6 +1191,60 @@ impl ManagedRun { } } +// Construct a new command sequence with query responses removed, and any +// terminal responses removed, except for the first terminal response, which is +// placed at the end. Return new command sequence and query commands. Note that +// multiple coroutines may have generated a terminal command, leading to +// multiple terminal commands in the input to this function. +fn preprocess_command_sequence(mut commands: Vec) -> (Vec, Vec) { + let mut query_results = vec![]; + let mut terminals = vec![]; + + commands = std::mem::take(&mut commands) + .into_iter() + .filter_map(|c| { + if let WFCommand::QueryResponse(qr) = c { + query_results.push(qr); + None + } else if c.is_terminal() { + terminals.push(c); + None + } else { + Some(c) + } + }) + .collect(); + if let Some(first_terminal) = terminals.into_iter().next() { + commands.push(first_terminal); + } + (commands, query_results) +} + +fn preprocess_command_sequence_old_behavior( + mut commands: Vec, +) -> (Vec, Vec) { + let mut query_results = vec![]; + let mut seen_terminal = false; + + commands = std::mem::take(&mut commands) + .into_iter() + .filter_map(|c| { + if let WFCommand::QueryResponse(qr) = c { + query_results.push(qr); + None + } else if seen_terminal { + None + } else { + if c.is_terminal() { + seen_terminal = true; + } + Some(c) + } + }) + .collect(); + (commands, query_results) +} + /// Drains pending queries from the workflow task and appends them to the activation's jobs fn put_queries_in_act(act: &mut WorkflowActivation, wft: &mut OutstandingTask) { // Nothing to do if there are no pending queries @@ -1371,7 +1447,7 @@ struct RunActivationCompletion { activation_was_eviction: bool, has_pending_query: bool, query_responses: Vec, - used_flags: Vec, + lang_used_flags: Vec, /// Used to notify the worker when the completion is done processing and the completion can /// unblock. Must always be `Some` when initialized. resp_chan: Option>, @@ -1397,3 +1473,129 @@ impl From for RunUpdateErr { } } } + +#[cfg(test)] +mod tests { + use crate::worker::workflow::WFCommand; + use std::mem::{discriminant, Discriminant}; + + use command_utils::*; + + #[rstest::rstest] + #[case::empty( + vec![], + vec![])] + #[case::non_terminal_is_retained( + vec![update_response()], + vec![update_response()])] + #[case::terminal_is_retained( + vec![complete()], + vec![complete()])] + #[case::post_terminal_is_retained( + vec![complete(), update_response()], + vec![update_response(), complete()])] + #[case::second_terminal_is_discarded( + vec![cancel(), complete()], + vec![cancel()])] + #[case::move_terminals_to_end_and_retain_first( + vec![update_response(), complete(), update_response(), cancel(), update_response()], + vec![update_response(), update_response(), update_response(), complete()])] + #[test] + fn preprocess_command_sequence( + #[case] commands_in: Vec, + #[case] expected_commands: Vec, + ) { + let (commands, _) = super::preprocess_command_sequence(commands_in); + assert_eq!(command_types(&commands), command_types(&expected_commands)); + } + + #[rstest::rstest] + #[case::query_responses_extracted( + vec![query_response(), update_response(), query_response(), complete(), query_response()], + 3, + )] + #[test] + fn preprocess_command_sequence_extracts_queries( + #[case] commands_in: Vec, + #[case] expected_queries_out: usize, + ) { + let (_, query_responses_out) = super::preprocess_command_sequence(commands_in); + assert_eq!(query_responses_out.len(), expected_queries_out); + } + + #[rstest::rstest] + #[case::empty( + vec![], + vec![])] + #[case::non_terminal_is_retained( + vec![update_response()], + vec![update_response()])] + #[case::terminal_is_retained( + vec![complete()], + vec![complete()])] + #[case::post_terminal_is_discarded( + vec![complete(), update_response()], + vec![complete()])] + #[case::second_terminal_is_discarded( + vec![cancel(), complete()], + vec![cancel()])] + #[case::truncate_at_first_complete( + vec![update_response(), complete(), update_response(), cancel()], + vec![update_response(), complete()])] + #[test] + fn preprocess_command_sequence_old_behavior( + #[case] commands_in: Vec, + #[case] expected_out: Vec, + ) { + let (commands_out, _) = super::preprocess_command_sequence_old_behavior(commands_in); + assert_eq!(command_types(&commands_out), command_types(&expected_out)); + } + + #[rstest::rstest] + #[case::query_responses_extracted( + vec![query_response(), update_response(), query_response(), complete(), query_response()], + 3, + )] + #[test] + fn preprocess_command_sequence_old_behavior_extracts_queries( + #[case] commands_in: Vec, + #[case] expected_queries_out: usize, + ) { + let (_, query_responses_out) = super::preprocess_command_sequence_old_behavior(commands_in); + assert_eq!(query_responses_out.len(), expected_queries_out); + } + + mod command_utils { + use temporal_sdk_core_protos::coresdk::workflow_commands::{ + CancelWorkflowExecution, CompleteWorkflowExecution, QueryResult, UpdateResponse, + }; + + use super::*; + + pub(crate) fn complete() -> WFCommand { + WFCommand::CompleteWorkflow(CompleteWorkflowExecution { result: None }) + } + + pub(crate) fn cancel() -> WFCommand { + WFCommand::CancelWorkflow(CancelWorkflowExecution {}) + } + + pub(crate) fn query_response() -> WFCommand { + WFCommand::QueryResponse(QueryResult { + query_id: "".into(), + variant: None, + }) + } + + pub(crate) fn update_response() -> WFCommand { + WFCommand::UpdateResponse(UpdateResponse { + protocol_instance_id: "".into(), + response: None, + }) + } + + pub(crate) fn command_types(commands: &[WFCommand]) -> Vec> { + commands.iter().map(discriminant).collect() + } + } +} diff --git a/core/src/worker/workflow/mod.rs b/core/src/worker/workflow/mod.rs index 787cfc4dd..eecded491 100644 --- a/core/src/worker/workflow/mod.rs +++ b/core/src/worker/workflow/mod.rs @@ -1043,7 +1043,7 @@ fn validate_completion( match completion.status { Some(workflow_activation_completion::Status::Successful(success)) => { // Convert to wf commands - let mut commands = success + let commands = success .commands .into_iter() .map(|c| c.try_into()) @@ -1070,16 +1070,6 @@ fn validate_completion( }); } - // Any non-query-response commands after a terminal command should be ignored - if let Some(term_cmd_pos) = commands.iter().position(|c| c.is_terminal()) { - // Query responses are just fine, so keep them. - let queries = commands - .split_off(term_cmd_pos + 1) - .into_iter() - .filter(|c| matches!(c, WFCommand::QueryResponse(_))); - commands.extend(queries); - } - Ok(ValidatedCompletion::Success { run_id: completion.run_id, commands, diff --git a/sdk-core-protos/src/history_builder.rs b/sdk-core-protos/src/history_builder.rs index 1488653c4..1ae80f7ae 100644 --- a/sdk-core-protos/src/history_builder.rs +++ b/sdk-core-protos/src/history_builder.rs @@ -243,6 +243,14 @@ impl TestHistoryBuilder { self.build_and_push_event(EventType::WorkflowTaskFailed, attrs.into()); } + pub fn add_timer_started(&mut self, timer_id: String) { + self.add(TimerStartedEventAttributes { + timer_id, + workflow_task_completed_event_id: self.previous_task_completed_id, + ..Default::default() + }); + } + pub fn add_timer_fired(&mut self, timer_started_evt_id: i64, timer_id: String) { self.add(TimerFiredEventAttributes { started_event_id: timer_started_evt_id, From 26b6093470dce086bc0b454547b8aeb42e44be18 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 17 Jul 2024 18:43:15 -0400 Subject: [PATCH 2/5] Test consequences of replay and flag --- core/src/core_tests/workflow_tasks.rs | 124 +++++++++++++++----------- 1 file changed, 70 insertions(+), 54 deletions(-) diff --git a/core/src/core_tests/workflow_tasks.rs b/core/src/core_tests/workflow_tasks.rs index 341c53275..77f035ee0 100644 --- a/core/src/core_tests/workflow_tasks.rs +++ b/core/src/core_tests/workflow_tasks.rs @@ -17,6 +17,7 @@ use crate::{ Worker, }; use futures::{stream, FutureExt}; +use mockall::TimesRange; use rstest::{fixture, rstest}; use std::{ collections::{HashMap, HashSet, VecDeque}, @@ -2503,9 +2504,11 @@ async fn core_internal_flags() { core.shutdown().await; } -// A post-terminal command is retained, and placed before the terminal command. #[tokio::test] async fn post_terminal_commands_are_retained_when_not_replaying() { + // History contains a non-terminal command (N) followed by the terminal + // command (T). The test establishes that, when lang completes an activation + // with [T, N], core emits commands [N, T]. let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_full_wf_task(); @@ -2516,94 +2519,107 @@ async fn post_terminal_commands_are_retained_when_not_replaying() { CompleteWorkflowExecution { result: None }.into(), start_timer_cmd(1, Duration::from_secs(1)), ]; - let expected_command_types_emitted = vec![ + let expected_command_types_emitted = Some(vec![ CommandType::StartTimer, CommandType::CompleteWorkflowExecution, - ]; - _simulate_completion_from_lang_and_assert_commands_emitted_by_core( + ]); + _do_post_terminal_commands_test( commands_sent_by_lang, + [ResponseType::ToTaskNum(1), ResponseType::AllHistory], expected_command_types_emitted, t, ) .await } -async fn _simulate_completion_from_lang_and_assert_commands_emitted_by_core( - commands_sent_by_lang: Vec, - expected_command_types: Vec, - t: TestHistoryBuilder, -) { - let mut mh = MockPollCfg::from_resp_batches( - "fake_wf_id", - t, - [ResponseType::ToTaskNum(1), ResponseType::AllHistory], - mock_workflow_client(), - ); - mh.completion_mock_fn = Some(Box::new(move |c| { - let command_types: Vec<_> = c.commands.iter().map(|c| c.command_type()).collect(); - assert_eq!(command_types, expected_command_types); - Ok(Default::default()) - })); - let mut mock = build_mock_pollers(mh); - mock.worker_cfg(|wc| wc.max_cached_workflows = 1); - let core = mock_worker(mock); +#[tokio::test] +async fn post_terminal_commands_are_retained_when_replaying_and_flag_set() { + // History contains a non-terminal command (N) followed by the terminal + // command (T), with the MoveTerminalCommands flag set in the last WFT The + // test establishes that, when core replays this history, it is consistent + // with lang completing an activation with [T, N]. + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + t.add_timer_started("1".to_string()); + t.add_workflow_execution_completed(); + t.set_flags_last_wft(&[CoreInternalFlags::MoveTerminalCommands as u32], &[]); - let act = core.poll_workflow_activation().await.unwrap(); + let commands_sent_by_lang = vec![ + CompleteWorkflowExecution { result: None }.into(), + start_timer_cmd(1, Duration::from_secs(1)), + ]; + let expected_command_types_emitted = None; - core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( - act.run_id, + _do_post_terminal_commands_test( commands_sent_by_lang, - )) + [ResponseType::AllHistory], + expected_command_types_emitted, + t, + ) .await - .unwrap(); - - // This just ensures applying the complete history w/ the completion command works, though - // there's no activation. - let act = core.poll_workflow_activation().await; - assert_matches!(act.unwrap_err(), PollWfError::ShutDown); - core.shutdown().await; } #[tokio::test] -async fn move_terminal_commands_flag_test_1() { +async fn post_terminal_commands_are_not_retained_when_replaying_and_flag_not_set() { + // History contains the terminal command (T) preceded immediately by + // WFTCompleted, i.e. without any intervening non-terminal command (N), and + // the MoveTerminalCommands flag is not set. The test establishes that when + // core replays this history, it is consistent with lang completing an + // activation with [T, N]. let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_full_wf_task(); - t.add_timer_started("1".to_string()); t.add_workflow_execution_completed(); - let mut mh = MockPollCfg::from_resp_batches( - "fake_wf_id", + let commands_sent_by_lang = vec![ + CompleteWorkflowExecution { result: None }.into(), + start_timer_cmd(1, Duration::from_secs(1)), + ]; + let expected_command_types_emitted = None; + + _do_post_terminal_commands_test( + commands_sent_by_lang, + [ResponseType::AllHistory], + expected_command_types_emitted, t, - [ResponseType::ToTaskNum(1), ResponseType::AllHistory], - mock_workflow_client(), - ); - mh.completion_mock_fn = Some(Box::new(|c| { - // The start timer and complete execution commands should be sent, in - // that order. - assert_eq!(c.commands.len(), 2); - Ok(Default::default()) - })); + ) + .await +} + +async fn _do_post_terminal_commands_test( + commands_sent_by_lang: Vec, + response_types: impl IntoIterator>, + expected_command_types: Option>, + t: TestHistoryBuilder, +) { + let mut mh = + MockPollCfg::from_resp_batches("fake_wf_id", t, response_types, mock_workflow_client()); + if let Some(expected_command_types) = expected_command_types { + mh.num_expected_completions = Some(TimesRange::from(1)); + mh.completion_mock_fn = Some(Box::new(move |c| { + let command_types: Vec<_> = c.commands.iter().map(|c| c.command_type()).collect(); + assert_eq!(command_types, expected_command_types); + Ok(Default::default()) + })); + } else { + mh.num_expected_completions = Some(TimesRange::from(0)); + } let mut mock = build_mock_pollers(mh); mock.worker_cfg(|wc| wc.max_cached_workflows = 1); let core = mock_worker(mock); let act = core.poll_workflow_activation().await.unwrap(); + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( act.run_id, - vec![ - CompleteWorkflowExecution { result: None }.into(), - start_timer_cmd(1, Duration::from_secs(1)), - ], + commands_sent_by_lang, )) .await .unwrap(); - // This just ensures applying the complete history w/ the completion command works, though - // there's no activation. let act = core.poll_workflow_activation().await; assert_matches!(act.unwrap_err(), PollWfError::ShutDown); - core.shutdown().await; } From 05724a4fdeaa4ad6039fe8d9245647063627b4f2 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 17 Jul 2024 19:00:26 -0400 Subject: [PATCH 3/5] Remove redundant std::mem::take --- core/src/worker/workflow/managed_run.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/worker/workflow/managed_run.rs b/core/src/worker/workflow/managed_run.rs index 272223184..7032bc298 100644 --- a/core/src/worker/workflow/managed_run.rs +++ b/core/src/worker/workflow/managed_run.rs @@ -1196,11 +1196,11 @@ impl ManagedRun { // placed at the end. Return new command sequence and query commands. Note that // multiple coroutines may have generated a terminal command, leading to // multiple terminal commands in the input to this function. -fn preprocess_command_sequence(mut commands: Vec) -> (Vec, Vec) { +fn preprocess_command_sequence(commands: Vec) -> (Vec, Vec) { let mut query_results = vec![]; let mut terminals = vec![]; - commands = std::mem::take(&mut commands) + let mut commands: Vec<_> = commands .into_iter() .filter_map(|c| { if let WFCommand::QueryResponse(qr) = c { @@ -1221,12 +1221,12 @@ fn preprocess_command_sequence(mut commands: Vec) -> (Vec, } fn preprocess_command_sequence_old_behavior( - mut commands: Vec, + commands: Vec, ) -> (Vec, Vec) { let mut query_results = vec![]; let mut seen_terminal = false; - commands = std::mem::take(&mut commands) + let commands: Vec<_> = commands .into_iter() .filter_map(|c| { if let WFCommand::QueryResponse(qr) = c { From a18d7b8b7672f33c7ec63edb0f562baf756370b6 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 17 Jul 2024 19:08:52 -0400 Subject: [PATCH 4/5] Revert field rename --- core/src/worker/workflow/managed_run.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/worker/workflow/managed_run.rs b/core/src/worker/workflow/managed_run.rs index 7032bc298..0893616b2 100644 --- a/core/src/worker/workflow/managed_run.rs +++ b/core/src/worker/workflow/managed_run.rs @@ -367,7 +367,7 @@ impl ManagedRun { pub(super) fn successful_completion( &mut self, mut commands: Vec, - lang_used_flags: Vec, + used_flags: Vec, resp_chan: Option>, ) -> Result { let activation_was_only_eviction = self.activation_is_eviction(); @@ -436,7 +436,7 @@ impl ManagedRun { activation_was_eviction: self.activation_is_eviction(), has_pending_query, query_responses, - lang_used_flags, + used_flags, resp_chan, }; @@ -674,9 +674,7 @@ impl ManagedRun { activation_was_eviction: completion.activation_was_eviction, }; - self.wfm - .machines - .add_lang_used_flags(completion.lang_used_flags); + self.wfm.machines.add_lang_used_flags(completion.used_flags); // If this is just bookkeeping after a reply to an eviction activation, we can bypass // everything, since there is no reason to continue trying to update machines. @@ -1447,7 +1445,7 @@ struct RunActivationCompletion { activation_was_eviction: bool, has_pending_query: bool, query_responses: Vec, - lang_used_flags: Vec, + used_flags: Vec, /// Used to notify the worker when the completion is done processing and the completion can /// unblock. Must always be `Some` when initialized. resp_chan: Option>, From 24d23a87f82d9b533691b356c6706976634791f4 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Sun, 21 Jul 2024 06:36:28 -0400 Subject: [PATCH 5/5] Clean up --- core/src/core_tests/workflow_tasks.rs | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/core/src/core_tests/workflow_tasks.rs b/core/src/core_tests/workflow_tasks.rs index 77f035ee0..a7f0b706b 100644 --- a/core/src/core_tests/workflow_tasks.rs +++ b/core/src/core_tests/workflow_tasks.rs @@ -2549,15 +2549,9 @@ async fn post_terminal_commands_are_retained_when_replaying_and_flag_set() { CompleteWorkflowExecution { result: None }.into(), start_timer_cmd(1, Duration::from_secs(1)), ]; - let expected_command_types_emitted = None; - _do_post_terminal_commands_test( - commands_sent_by_lang, - [ResponseType::AllHistory], - expected_command_types_emitted, - t, - ) - .await + _do_post_terminal_commands_test(commands_sent_by_lang, [ResponseType::AllHistory], None, t) + .await } #[tokio::test] @@ -2576,15 +2570,9 @@ async fn post_terminal_commands_are_not_retained_when_replaying_and_flag_not_set CompleteWorkflowExecution { result: None }.into(), start_timer_cmd(1, Duration::from_secs(1)), ]; - let expected_command_types_emitted = None; - _do_post_terminal_commands_test( - commands_sent_by_lang, - [ResponseType::AllHistory], - expected_command_types_emitted, - t, - ) - .await + _do_post_terminal_commands_test(commands_sent_by_lang, [ResponseType::AllHistory], None, t) + .await } async fn _do_post_terminal_commands_test(