Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/core_tests/determinism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ async fn activity_id_or_type_change_is_nondeterministic(
) {
let wf_id = "fakeid";
let wf_type = DEFAULT_WORKFLOW_TYPE;
let mut t = if local_act {
let mut t: TestHistoryBuilder = if local_act {
canned_histories::single_local_activity("1")
} else {
canned_histories::single_activity("1")
Expand Down
107 changes: 87 additions & 20 deletions core/src/core_tests/workflow_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
Worker,
};
use futures::{stream, FutureExt};
use mockall::TimesRange;
use rstest::{fixture, rstest};
use std::{
collections::{HashMap, HashSet, VecDeque},
Expand Down Expand Up @@ -45,7 +46,7 @@ use temporal_sdk_core_protos::{
StartWorkflow, UpdateRandomSeed, WorkflowActivationJob,
},
workflow_commands::{
update_response::Response, ActivityCancellationType, CancelTimer,
update_response::Response, workflow_command, ActivityCancellationType, CancelTimer,
CompleteWorkflowExecution, ContinueAsNewWorkflowExecution, FailWorkflowExecution,
RequestCancelActivity, ScheduleActivity, SetPatchMarker, StartChildWorkflowExecution,
UpdateResponse,
Expand All @@ -56,7 +57,7 @@ use temporal_sdk_core_protos::{
temporal::api::{
command::v1::command::Attributes,
common::v1::{Payload, RetryPolicy, WorkerVersionStamp},
enums::v1::{EventType, WorkflowTaskFailedCause},
enums::v1::{CommandType, EventType, WorkflowTaskFailedCause},
failure::v1::Failure,
history::v1::{
history_event, TimerFiredEventAttributes,
Expand Down Expand Up @@ -2504,43 +2505,109 @@ async fn core_internal_flags() {
}

#[tokio::test]
async fn post_terminal_commands_are_discarded() {
async fn post_terminal_commands_are_retained_when_not_replaying() {
// History contains a non-terminal command (N) followed by the terminal
// command (T). The test establishes that, when lang completes an activation
// with [T, N], core emits commands [N, T].
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
t.add_full_wf_task();
t.add_timer_started("1".to_string());
t.add_workflow_execution_completed();

let mut mh = MockPollCfg::from_resp_batches(
"fake_wf_id",
t,
let commands_sent_by_lang = vec![
CompleteWorkflowExecution { result: None }.into(),
start_timer_cmd(1, Duration::from_secs(1)),
];
let expected_command_types_emitted = Some(vec![
CommandType::StartTimer,
CommandType::CompleteWorkflowExecution,
]);
_do_post_terminal_commands_test(
commands_sent_by_lang,
[ResponseType::ToTaskNum(1), ResponseType::AllHistory],
mock_workflow_client(),
);
mh.completion_mock_fn = Some(Box::new(|c| {
// Only the complete execution command should actually be sent
assert_eq!(c.commands.len(), 1);
Ok(Default::default())
}));
expected_command_types_emitted,
t,
)
.await
}

#[tokio::test]
async fn post_terminal_commands_are_retained_when_replaying_and_flag_set() {
    // Replay scenario: the recorded history holds a non-terminal command (N, a
    // timer start) ahead of the terminal command (T, workflow completion), and
    // the last WFT carries the MoveTerminalCommands flag. Replaying that
    // history must be consistent with lang completing the activation with
    // [T, N].
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();
    history.add_timer_started("1".to_string());
    history.add_workflow_execution_completed();
    let core_flags = [CoreInternalFlags::MoveTerminalCommands as u32];
    history.set_flags_last_wft(&core_flags, &[]);

    // Lang deliberately reports the terminal command before the timer.
    let lang_cmds = vec![
        CompleteWorkflowExecution { result: None }.into(),
        start_timer_cmd(1, Duration::from_secs(1)),
    ];

    // No expected command types are passed: the whole history is served in one
    // response, so the helper is told not to expect a WFT completion.
    _do_post_terminal_commands_test(lang_cmds, [ResponseType::AllHistory], None, history).await
}

#[tokio::test]
async fn post_terminal_commands_are_not_retained_when_replaying_and_flag_not_set() {
    // Replay scenario: in the recorded history the terminal command (T) comes
    // directly after WFTCompleted, with no non-terminal command (N) in
    // between, and the MoveTerminalCommands flag was never set. Replaying that
    // history must still be consistent with lang completing the activation
    // with [T, N].
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_full_wf_task();
    history.add_workflow_execution_completed();

    // Lang reports the terminal command first, followed by a timer start that
    // has no counterpart in the history built above.
    let lang_cmds = vec![
        CompleteWorkflowExecution { result: None }.into(),
        start_timer_cmd(1, Duration::from_secs(1)),
    ];

    // No expected command types are passed: the whole history is served in one
    // response, so the helper is told not to expect a WFT completion.
    _do_post_terminal_commands_test(lang_cmds, [ResponseType::AllHistory], None, history).await
}

async fn _do_post_terminal_commands_test(
commands_sent_by_lang: Vec<workflow_command::Variant>,
response_types: impl IntoIterator<Item = impl Into<ResponseType>>,
expected_command_types: Option<Vec<CommandType>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Doesn't need to be an option, can just be empty vec when None

Copy link
Contributor Author

@dandavison dandavison Jul 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think option is a reasonable choice: None is being used to mean the concept of expected commands is irrelevant, since the worker is purely replaying all history and will never emit a WFT task response; that's as opposed to Some(empty_vec) which would mean SDK did a non-replay WFT but replied without any commands (e.g. all coroutines are blocked on a workflow condition; also WFT fail might be represented like that). But lmk if that doesn't make sense; happy to change if so.

t: TestHistoryBuilder,
) {
let mut mh =
MockPollCfg::from_resp_batches("fake_wf_id", t, response_types, mock_workflow_client());
if let Some(expected_command_types) = expected_command_types {
mh.num_expected_completions = Some(TimesRange::from(1));
mh.completion_mock_fn = Some(Box::new(move |c| {
let command_types: Vec<_> = c.commands.iter().map(|c| c.command_type()).collect();
assert_eq!(command_types, expected_command_types);
Ok(Default::default())
}));
} else {
mh.num_expected_completions = Some(TimesRange::from(0));
}
let mut mock = build_mock_pollers(mh);
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
let core = mock_worker(mock);

let act = core.poll_workflow_activation().await.unwrap();

core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
act.run_id,
vec![
CompleteWorkflowExecution { result: None }.into(),
start_timer_cmd(1, Duration::from_secs(1)),
],
commands_sent_by_lang,
))
.await
.unwrap();

// This just ensures applying the complete history w/ the completion command works, though
// there's no activation.
let act = core.poll_workflow_activation().await;
assert_matches!(act.unwrap_err(), PollWfError::ShutDown);

core.shutdown().await;
}

Expand Down
24 changes: 17 additions & 7 deletions core/src/internal_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use temporal_sdk_core_protos::temporal::api::{
/// This enumeration contains internal flags that may result in incompatible history changes with
/// older workflows, or other breaking changes.
///
/// When a flag has existed long enough the version it was introduced in is no longer supported, it
/// When a flag has existed long enough that the version it was introduced in is no longer supported, it
/// may be removed from the enum. *Importantly*, all variants must be given explicit values, such
/// that removing older variants does not create any change in existing values. Removed flag
/// variants must be reserved forever (a-la protobuf), and should be called out in a comment.
Expand All @@ -27,6 +27,15 @@ pub(crate) enum CoreInternalFlags {
/// Introduced automatically upserting search attributes for each patched call, and
/// nondeterminism checks for upserts.
UpsertSearchAttributeOnPatch = 2,
/// Prior to this flag, we truncated commands received from lang at the
/// first terminal (i.e. workflow-terminating) command. With this flag, we
/// reorder commands such that all non-terminal commands come first,
/// followed by the first terminal command, if any (it's possible that
/// multiple workflow coroutines generated a terminal command). This has the
/// consequence that all non-terminal commands are sent to the server, even
/// if in the sequence delivered by lang they came after a terminal command.
/// See https://github.com/temporalio/features/issues/481.
MoveTerminalCommands = 3,
/// We received a value higher than this code can understand.
TooHigh = u32::MAX,
}
Expand Down Expand Up @@ -82,18 +91,18 @@ impl InternalFlags {
/// Returns true if this flag may currently be used. If `should_record` is true, always returns
/// true and records the flag as being used, for taking later via
/// [Self::gather_for_wft_complete].
pub(crate) fn try_use(&mut self, core_patch: CoreInternalFlags, should_record: bool) -> bool {
pub(crate) fn try_use(&mut self, flag: CoreInternalFlags, should_record: bool) -> bool {
match self {
Self::Enabled {
core,
core_since_last_complete,
..
} => {
if should_record {
core_since_last_complete.insert(core_patch);
core_since_last_complete.insert(flag);
true
} else {
core.contains(&core_patch)
core.contains(&flag)
}
}
// If the server does not support the metadata field, we must assume we can never use
Expand All @@ -114,9 +123,9 @@ impl InternalFlags {
}
}

/// Wipes the recorded flags used during the current WFT and returns a partially filled
/// sdk metadata message that can be combined with any existing data before sending the WFT
/// complete
/// Return a partially filled sdk metadata message containing core and lang flags added since
/// the last WFT complete. The returned value can be combined with other data before sending the
/// WFT complete.
pub(crate) fn gather_for_wft_complete(&mut self) -> WorkflowTaskCompletedMetadata {
match self {
Self::Enabled {
Expand Down Expand Up @@ -161,6 +170,7 @@ impl CoreInternalFlags {
match v {
1 => Self::IdAndTypeDeterminismChecks,
2 => Self::UpsertSearchAttributeOnPatch,
3 => Self::MoveTerminalCommands,
_ => Self::TooHigh,
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/test_help/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ pub(crate) fn single_hist_mock_sg(
build_mock_pollers(mh)
}

type WFTCompeltionMockFn = dyn FnMut(&WorkflowTaskCompletion) -> Result<RespondWorkflowTaskCompletedResponse, tonic::Status>
type WFTCompletionMockFn = dyn FnMut(&WorkflowTaskCompletion) -> Result<RespondWorkflowTaskCompletedResponse, tonic::Status>
+ Send;

#[allow(clippy::type_complexity)]
Expand All @@ -395,7 +395,7 @@ pub(crate) struct MockPollCfg {
/// All calls to fail WFTs must match this predicate
pub(crate) expect_fail_wft_matcher:
Box<dyn Fn(&TaskToken, &WorkflowTaskFailedCause, &Option<Failure>) -> bool + Send>,
pub(crate) completion_mock_fn: Option<Box<WFTCompeltionMockFn>>,
pub(crate) completion_mock_fn: Option<Box<WFTCompletionMockFn>>,
pub(crate) num_expected_completions: Option<TimesRange>,
/// If being used with the Rust SDK, this is set true. It ensures pollers will not error out
/// early with no work, since we cannot know the exact number of times polling will happen.
Expand Down Expand Up @@ -476,7 +476,7 @@ impl MockPollCfg {

#[allow(clippy::type_complexity)]
pub(crate) struct CompletionAssertsBuilder<'a> {
dest: &'a mut Option<Box<WFTCompeltionMockFn>>,
dest: &'a mut Option<Box<WFTCompletionMockFn>>,
assertions: VecDeque<Box<dyn FnOnce(&WorkflowTaskCompletion) + Send>>,
}

Expand Down
8 changes: 7 additions & 1 deletion core/src/worker/workflow/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use super::{
};
use crate::{
abstractions::dbg_panic,
internal_flags::InternalFlags,
internal_flags::{CoreInternalFlags, InternalFlags},
protosext::{
protocol_messages::{IncomingProtocolMessage, IncomingProtocolMessageBody},
CompleteLocalActivityData, HistoryEventExt, ValidScheduleLA,
Expand Down Expand Up @@ -470,6 +470,12 @@ impl WorkflowMachines {
.add_lang_used(flags);
}

/// Forwards to `try_use` on the observed internal flags and returns its result, i.e. whether
/// `flag` may currently be used. `should_record` is passed through unchanged.
///
/// NOTE(review): the `borrow_mut` call suggests the flags live behind a `RefCell`; if so, this
/// panics when they are already borrowed elsewhere -- confirm against the field's type.
pub(crate) fn try_use_flag(&self, flag: CoreInternalFlags, should_record: bool) -> bool {
    let mut observed = self.observed_internal_flags.borrow_mut();
    observed.try_use(flag, should_record)
}

/// Undo a speculative workflow task by resetting to a certain WFT Started ID. This can happen
/// when an update request is rejected.
pub(crate) fn reset_last_started_id(&mut self, id: i64) {
Expand Down
Loading