Skip to content

Commit d2bb28b

Browse files
committed
Honor all non-terminal commands
temporalio/features#481
1 parent 5e3b274 commit d2bb28b

File tree

4 files changed

+249
-27
lines changed

4 files changed

+249
-27
lines changed

core/src/internal_flags.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,15 @@ pub(crate) enum CoreInternalFlags {
2929
UpsertSearchAttributeOnPatch = 2,
3030
/// We received a value higher than this code can understand.
3131
TooHigh = u32::MAX,
32+
/// Prior to this flag, we truncated commands received from lang at the
33+
/// first terminal (i.e. workflow-terminating) command. With this flag, we
34+
/// reorder commands such that all non-terminal commands come first,
35+
/// followed by the first terminal command, if any (it's possible that
36+
/// multiple workflow coroutines generated a terminal command). This has the
37+
/// consequence that all non-terminal commands are sent to the server, even
38+
/// if in the sequence delivered by lang they came after a terminal command.
39+
/// See https://github.com/temporalio/features/issues/481.
40+
ReorderCommands = 3,
3241
}
3342

3443
#[derive(Debug, Clone, PartialEq, Eq)]

core/src/worker/workflow/machines/workflow_machines.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ pub(crate) struct WorkflowMachines {
117117
current_wf_time: Option<SystemTime>,
118118
/// The internal flags which have been seen so far during this run's execution and thus are
119119
/// usable during replay.
120-
observed_internal_flags: InternalFlagsRef,
120+
pub(crate) observed_internal_flags: InternalFlagsRef,
121121
/// Set on each WFT started event, the most recent size of history in bytes
122122
history_size_bytes: u64,
123123
/// Set on each WFT started event

core/src/worker/workflow/managed_run.rs

Lines changed: 238 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::{
22
abstractions::dbg_panic,
3+
internal_flags::CoreInternalFlags,
34
protosext::{protocol_messages::IncomingProtocolMessage, WorkflowActivationExt},
45
telemetry::metrics,
56
worker::{
@@ -366,7 +367,7 @@ impl ManagedRun {
366367
pub(super) fn successful_completion(
367368
&mut self,
368369
mut commands: Vec<WFCommand>,
369-
used_flags: Vec<u32>,
370+
mut used_flags: Vec<u32>,
370371
resp_chan: Option<oneshot::Sender<ActivationCompleteResult>>,
371372
) -> Result<RunUpdateAct, NextPageReq> {
372373
let activation_was_only_eviction = self.activation_is_eviction();
@@ -422,20 +423,8 @@ impl ManagedRun {
422423
);
423424
Ok(None)
424425
} else {
425-
// First strip out query responses from other commands that actually affect machines
426-
// Would be prettier with `drain_filter`
427-
let mut query_responses = vec![];
428-
commands = std::mem::take(&mut commands)
429-
.into_iter()
430-
.filter_map(|x| {
431-
if let WFCommand::QueryResponse(qr) = x {
432-
query_responses.push(qr);
433-
None
434-
} else {
435-
Some(x)
436-
}
437-
})
438-
.collect();
426+
let (commands, query_responses) =
427+
self.preprocess_command_sequence(commands, &mut used_flags);
439428

440429
if activation_was_only_eviction && !commands.is_empty() {
441430
dbg_panic!("Reply to an eviction included commands");
@@ -479,6 +468,47 @@ impl ManagedRun {
479468
}
480469
}
481470

471+
/// Core has received from lang a sequence containing all commands generated
472+
/// by all workflow coroutines. Return a command sequence containing all
473+
/// non-terminal (i.e. non-workflow-terminating) commands, followed by a
474+
/// single terminal command if there is one. Also strip out and return query
475+
/// results (these don't affect machines and are handled separately
476+
/// downstream)
477+
///
478+
/// The reordering is done in order that all non-terminal commands generated
479+
/// by workflow coroutines are given a chance for the server to honor them.
480+
/// For example, in order to deliver an update result to a client as the
481+
/// workflow completes. See
482+
/// https://github.com/temporalio/features/issues/481. Behavior here has
483+
/// changed backwards-incompatibly, so a flag is set if the outcome differs
484+
/// from what the outcome would have been previously. Note that multiple
485+
/// coroutines may have generated a terminal (i.e. workflow-terminating)
486+
/// command; if so, the first is used.
487+
fn preprocess_command_sequence(
488+
&mut self,
489+
commands: Vec<WFCommand>,
490+
used_flags: &mut Vec<u32>,
491+
) -> (Vec<WFCommand>, Vec<QueryResult>) {
492+
let reorder_commands_flag_in_effect = self
493+
.wfm
494+
.machines
495+
.observed_internal_flags
496+
.borrow_mut()
497+
.try_use(CoreInternalFlags::ReorderCommands, false);
498+
499+
if self.wfm.machines.replaying && !reorder_commands_flag_in_effect {
500+
preprocess_command_sequence_old_behavior(commands)
501+
} else {
502+
let (commands, query_results, any_reordered) = preprocess_command_sequence(commands);
503+
if any_reordered {
504+
// See comment on CoreInternalFlags::ReorderCommands.
505+
used_flags.append(&mut vec![CoreInternalFlags::ReorderCommands as u32]);
506+
};
507+
508+
(commands, query_results)
509+
}
510+
}
511+
482512
/// Called after the higher-up machinery has fetched more pages of event history needed to apply
483513
/// the next workflow task. The history update and paginator used to perform the fetch are
484514
/// passed in, with the update being used to apply the task, and the paginator stored to be
@@ -1169,6 +1199,65 @@ impl ManagedRun {
11691199
}
11701200
}
11711201

1202+
// Remove query responses and terminal commands; append first terminal command
1203+
// (if any) to end of sequence. Return resulting command sequence, query
1204+
// commands, and a boolean recording whether the result differs from the old
1205+
// behavior (i.e. whether there were any non-terminal, non-query commands after
1206+
// the first terminal).
1207+
fn preprocess_command_sequence(
1208+
mut commands: Vec<WFCommand>,
1209+
) -> (Vec<WFCommand>, Vec<QueryResult>, bool) {
1210+
let mut query_results = vec![];
1211+
let mut terminals = vec![];
1212+
let (mut seen_terminal, mut any_moved) = (false, false);
1213+
1214+
commands = std::mem::take(&mut commands)
1215+
.into_iter()
1216+
.filter_map(|c| {
1217+
if let WFCommand::QueryResponse(qr) = c {
1218+
query_results.push(qr);
1219+
None
1220+
} else if c.is_terminal() {
1221+
terminals.push(c);
1222+
seen_terminal = true;
1223+
None
1224+
} else {
1225+
any_moved |= seen_terminal;
1226+
Some(c)
1227+
}
1228+
})
1229+
.collect();
1230+
if let Some(first_terminal) = terminals.into_iter().nth(0) {
1231+
commands.push(first_terminal);
1232+
}
1233+
(commands, query_results, any_moved)
1234+
}
1235+
1236+
fn preprocess_command_sequence_old_behavior(
1237+
mut commands: Vec<WFCommand>,
1238+
) -> (Vec<WFCommand>, Vec<QueryResult>) {
1239+
let mut query_results = vec![];
1240+
let mut seen_terminal = false;
1241+
1242+
commands = std::mem::take(&mut commands)
1243+
.into_iter()
1244+
.filter_map(|c| {
1245+
if let WFCommand::QueryResponse(qr) = c {
1246+
query_results.push(qr);
1247+
None
1248+
} else if seen_terminal {
1249+
None
1250+
} else {
1251+
if c.is_terminal() {
1252+
seen_terminal = true;
1253+
}
1254+
Some(c)
1255+
}
1256+
})
1257+
.collect();
1258+
(commands, query_results)
1259+
}
1260+
11721261
/// Drains pending queries from the workflow task and appends them to the activation's jobs
11731262
fn put_queries_in_act(act: &mut WorkflowActivation, wft: &mut OutstandingTask) {
11741263
// Nothing to do if there are no pending queries
@@ -1397,3 +1486,137 @@ impl From<WFMachinesError> for RunUpdateErr {
13971486
}
13981487
}
13991488
}
1489+
1490+
#[cfg(test)]
1491+
mod tests {
1492+
use crate::worker::workflow::WFCommand;
1493+
use std::mem::{discriminant, Discriminant};
1494+
1495+
use command_utils::*;
1496+
1497+
#[rstest::rstest]
1498+
#[case::empty(
1499+
vec![],
1500+
vec![],
1501+
false)]
1502+
#[case::non_terminal_is_retained(
1503+
vec![update_response()],
1504+
vec![update_response()],
1505+
false)]
1506+
#[case::terminal_is_retained(
1507+
vec![complete()],
1508+
vec![complete()],
1509+
false)]
1510+
#[case::post_terminal_is_retained(
1511+
vec![complete(), update_response()],
1512+
vec![update_response(), complete()],
1513+
true)]
1514+
#[case::second_terminal_is_discarded(
1515+
vec![cancel(), complete()],
1516+
vec![cancel()],
1517+
false)]
1518+
#[case::move_terminals_to_end_and_retain_first(
1519+
vec![update_response(), complete(), update_response(), cancel(), update_response()],
1520+
vec![update_response(), update_response(), update_response(), complete()],
1521+
true)]
1522+
#[test]
1523+
fn preprocess_command_sequence(
1524+
#[case] commands_in: Vec<WFCommand>,
1525+
#[case] expected_commands: Vec<WFCommand>,
1526+
#[case] expected_any_reordered: bool,
1527+
) {
1528+
let (commands, _, any_reordered) = super::preprocess_command_sequence(commands_in);
1529+
assert_eq!(command_types(&commands), command_types(&expected_commands));
1530+
assert_eq!(any_reordered, expected_any_reordered);
1531+
}
1532+
1533+
#[rstest::rstest]
1534+
#[case::query_responses_extracted(
1535+
vec![query_response(), update_response(), query_response(), complete(), query_response()],
1536+
3,
1537+
)]
1538+
#[test]
1539+
fn preprocess_command_sequence_extracts_queries(
1540+
#[case] commands_in: Vec<WFCommand>,
1541+
#[case] expected_queries_out: usize,
1542+
) {
1543+
let (_, query_responses_out, _) = super::preprocess_command_sequence(commands_in);
1544+
assert_eq!(query_responses_out.len(), expected_queries_out);
1545+
}
1546+
1547+
#[rstest::rstest]
1548+
#[case::empty(
1549+
vec![],
1550+
vec![])]
1551+
#[case::non_terminal_is_retained(
1552+
vec![update_response()],
1553+
vec![update_response()])]
1554+
#[case::terminal_is_retained(
1555+
vec![complete()],
1556+
vec![complete()])]
1557+
#[case::post_terminal_is_discarded(
1558+
vec![complete(), update_response()],
1559+
vec![complete()])]
1560+
#[case::second_terminal_is_discarded(
1561+
vec![cancel(), complete()],
1562+
vec![cancel()])]
1563+
#[case::truncate_at_first_complete(
1564+
vec![update_response(), complete(), update_response(), cancel()],
1565+
vec![update_response(), complete()])]
1566+
#[test]
1567+
fn preprocess_command_sequence_old_behavior(
1568+
#[case] commands_in: Vec<WFCommand>,
1569+
#[case] expected_out: Vec<WFCommand>,
1570+
) {
1571+
let (commands_out, _) = super::preprocess_command_sequence_old_behavior(commands_in);
1572+
assert_eq!(command_types(&commands_out), command_types(&expected_out));
1573+
}
1574+
1575+
#[rstest::rstest]
1576+
#[case::query_responses_extracted(
1577+
vec![query_response(), update_response(), query_response(), complete(), query_response()],
1578+
3,
1579+
)]
1580+
#[test]
1581+
fn preprocess_command_sequence_old_behavior_extracts_queries(
1582+
#[case] commands_in: Vec<WFCommand>,
1583+
#[case] expected_queries_out: usize,
1584+
) {
1585+
let (_, query_responses_out) = super::preprocess_command_sequence_old_behavior(commands_in);
1586+
assert_eq!(query_responses_out.len(), expected_queries_out);
1587+
}
1588+
1589+
mod command_utils {
1590+
use temporal_sdk_core_protos::coresdk::workflow_commands::{
1591+
CancelWorkflowExecution, CompleteWorkflowExecution, QueryResult, UpdateResponse,
1592+
};
1593+
1594+
use super::*;
1595+
1596+
pub(crate) fn complete() -> WFCommand {
1597+
WFCommand::CompleteWorkflow(CompleteWorkflowExecution { result: None })
1598+
}
1599+
1600+
pub(crate) fn cancel() -> WFCommand {
1601+
WFCommand::CancelWorkflow(CancelWorkflowExecution {})
1602+
}
1603+
1604+
pub(crate) fn query_response() -> WFCommand {
1605+
WFCommand::QueryResponse(QueryResult {
1606+
query_id: "".into(),
1607+
variant: None,
1608+
})
1609+
}
1610+
1611+
pub(crate) fn update_response() -> WFCommand {
1612+
WFCommand::UpdateResponse(UpdateResponse {
1613+
protocol_instance_id: "".into(),
1614+
response: None,
1615+
})
1616+
}
1617+
1618+
pub(crate) fn command_types(commands: &Vec<WFCommand>) -> Vec<Discriminant<WFCommand>> {
1619+
commands.iter().map(discriminant).collect()
1620+
}
1621+
}
1622+
}

core/src/worker/workflow/mod.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,7 +1043,7 @@ fn validate_completion(
10431043
match completion.status {
10441044
Some(workflow_activation_completion::Status::Successful(success)) => {
10451045
// Convert to wf commands
1046-
let mut commands = success
1046+
let commands = success
10471047
.commands
10481048
.into_iter()
10491049
.map(|c| c.try_into())
@@ -1070,16 +1070,6 @@ fn validate_completion(
10701070
});
10711071
}
10721072

1073-
// Any non-query-response commands after a terminal command should be ignored
1074-
if let Some(term_cmd_pos) = commands.iter().position(|c| c.is_terminal()) {
1075-
// Query responses are just fine, so keep them.
1076-
let queries = commands
1077-
.split_off(term_cmd_pos + 1)
1078-
.into_iter()
1079-
.filter(|c| matches!(c, WFCommand::QueryResponse(_)));
1080-
commands.extend(queries);
1081-
}
1082-
10831073
Ok(ValidatedCompletion::Success {
10841074
run_id: completion.run_id,
10851075
commands,

0 commit comments

Comments
 (0)