diff --git a/packages/client/src/async-completion-client.ts b/packages/client/src/async-completion-client.ts index d632fcee1..51af79f67 100644 --- a/packages/client/src/async-completion-client.ts +++ b/packages/client/src/async-completion-client.ts @@ -1,5 +1,4 @@ import os from 'os'; -import { ServerErrorResponse } from '@grpc/grpc-js'; import { Status } from '@grpc/grpc-js/build/src/constants'; import { DataConverter, @@ -9,6 +8,7 @@ import { filterNullAndUndefined, } from '@temporalio/common'; import { Connection, WorkflowService } from './connection'; +import { isServerErrorResponse } from './errors'; /** * Thrown by {@link AsyncCompletionClient} when trying to complete or heartbeat @@ -34,14 +34,6 @@ export class ActivityCancelledError extends Error { public readonly name = 'ActivityCancelledError'; } -/** - * Type assertion helper, assertion is mostly empty because any additional - * properties are optional. - */ -function isServerErrorResponse(err: unknown): err is ServerErrorResponse { - return err instanceof Error; -} - /** * Options used to configure {@link AsyncCompletionClient} */ diff --git a/packages/client/src/errors.ts b/packages/client/src/errors.ts index dc4b59c39..a1a3220a4 100644 --- a/packages/client/src/errors.ts +++ b/packages/client/src/errors.ts @@ -1,4 +1,19 @@ +import { ServerErrorResponse } from '@grpc/grpc-js'; import { RetryState, TemporalFailure } from '@temporalio/common'; +export { WorkflowExecutionAlreadyStartedError } from '@temporalio/common'; + +/** + * Generic Error class for errors coming from the service + */ +export class ServiceError extends Error { + public readonly name: string = 'ServiceError'; + public readonly cause?: Error; + + constructor(message: string, opts?: { cause: Error }) { + super(message); + this.cause = opts?.cause; + } +} /** * Thrown by the client while waiting on Workflow execution result if execution @@ -32,3 +47,11 @@ export class WorkflowContinuedAsNewError extends Error { super(message); } } + +/** + * Type assertion helper, assertion is mostly empty because any additional + * properties are optional. + */ +export function isServerErrorResponse(err: unknown): err is ServerErrorResponse { + return err instanceof Error; +} diff --git a/packages/client/src/interceptors.ts b/packages/client/src/interceptors.ts index ce34d3067..0e0d9d5f7 100644 --- a/packages/client/src/interceptors.ts +++ b/packages/client/src/interceptors.ts @@ -48,11 +48,13 @@ export interface WorkflowTerminateInput { readonly workflowExecution: WorkflowExecution; readonly reason?: string; readonly details?: unknown[]; + readonly firstExecutionRunId?: string; } /** Input for WorkflowClientCallsInterceptor.cancel */ export interface WorkflowCancelInput { readonly workflowExecution: WorkflowExecution; + readonly firstExecutionRunId?: string; } /** diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index 7a0d1ef79..cc8c4e31c 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -1,6 +1,9 @@ import { temporal } from '@temporalio/proto'; -export type WorkflowExecution = temporal.api.common.v1.IWorkflowExecution; +export interface WorkflowExecution { + workflowId: string; + runId?: string; +} export type StartWorkflowExecutionRequest = temporal.api.workflowservice.v1.IStartWorkflowExecutionRequest; export type GetWorkflowExecutionHistoryRequest = temporal.api.workflowservice.v1.IGetWorkflowExecutionHistoryRequest; export type DescribeWorkflowExecutionResponse = temporal.api.workflowservice.v1.IDescribeWorkflowExecutionResponse; diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 8ac0e4050..29946917a 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -1,4 +1,5 @@ import os from 'os'; +import { status as grpcStatus } from '@grpc/grpc-js'; import { temporal } from '@temporalio/proto'; import { WorkflowClientInterceptors } from './interceptors'; import { v4 as uuid4 } from 'uuid'; @@ -21,6 +22,7 @@ import { TimeoutFailure, TimeoutType, compileRetryPolicy, + WorkflowNotFoundError, } from '@temporalio/common'; import { WorkflowOptions, compileWorkflowOptions, WorkflowSignalWithStartOptions } from './workflow-options'; import { @@ -38,8 +40,15 @@ import { StartWorkflowExecutionRequest, TerminateWorkflowExecutionResponse, RequestCancelWorkflowExecutionResponse, + WorkflowExecution, } from './types'; -import { WorkflowFailedError, WorkflowContinuedAsNewError } from './errors'; +import { + WorkflowFailedError, + WorkflowContinuedAsNewError, + WorkflowExecutionAlreadyStartedError, + ServiceError, + isServerErrorResponse, +} from './errors'; import { Connection, WorkflowService } from './connection'; /** @@ -190,7 +199,32 @@ export interface WorkflowResultOptions { * * @default true */ - followRuns: boolean; + followRuns?: boolean; +} + +export interface GetWorkflowHandleOptions extends WorkflowResultOptions { + /** + * ID of the first execution in the Workflow execution chain. + * + * When getting a handle with no `runId`, pass this option to ensure some + * {@link WorkflowHandle} methods (e.g. `terminate` and `cancel`) don't + * affect executions from another chain. + */ + firstExecutionRunId?: string; +} + +interface WorkflowHandleOptions extends GetWorkflowHandleOptions { + workflowId: string; + runId?: string; + interceptors: WorkflowClientCallsInterceptor[]; + /** + * A runId to use for getting the workflow's result. + * + * - When creating a handle using `getHandle`, uses the provided runId or firstExecutionRunId + * - When creating a handle using `start`, uses the returned runId (first in the chain) + * - When creating a handle using `signalWithStart`, uses the the returned runId + */ + runIdForResult?: string; } /** @@ -275,7 +309,14 @@ export class WorkflowClient { // Cast is needed because it's impossible to deduce the type in this situation const interceptors = (this.options.interceptors.calls ?? []).map((ctor) => ctor({ workflowId })); const runId = await this._start(workflowTypeOrFunc, { ...options, workflowId }, interceptors); - const handle = this._createWorkflowHandle(workflowId, runId, interceptors, { + // runId is not used in handles created with `start*` calls because these + // handles should allow interacting with the workflow if it continues as new. + const handle = this._createWorkflowHandle({ + workflowId, + runId: undefined, + firstExecutionRunId: runId, + runIdForResult: runId, + interceptors, followRuns: options.followRuns ?? true, }) as WorkflowHandleWithRunId; // Cast is safe because we know we add the originalRunId below (handle as any) /* readonly */.originalRunId = runId; @@ -295,7 +336,14 @@ export class WorkflowClient { const { workflowId } = options; const interceptors = (this.options.interceptors.calls ?? []).map((ctor) => ctor({ workflowId })); const runId = await this._signalWithStart(workflowTypeOrFunc, options, interceptors); - const handle = this._createWorkflowHandle(workflowId, runId, interceptors, { + // runId is not used in handles created with `start*` calls because these + // handles should allow interacting with the workflow if it continues as new. + const handle = this._createWorkflowHandle({ + workflowId, + runId: undefined, + firstExecutionRunId: undefined, // We don't know if this runId is first in the chain or not + runIdForResult: runId, + interceptors, followRuns: options.followRuns ?? true, }) as WorkflowHandleWithRunId; // Cast is safe because we know we add the originalRunId below (handle as any) /* readonly */.originalRunId = runId; @@ -341,7 +389,12 @@ export class WorkflowClient { let ev: temporal.api.history.v1.IHistoryEvent; for (;;) { - const res = await this.service.getWorkflowExecutionHistory(req); + let res: temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse; + try { + res = await this.service.getWorkflowExecutionHistory(req); + } catch (err) { + this.rethrowGrpcError(err, { workflowId, runId }, 'Failed to get Workflow execution history'); + } if (!res.history) { throw new Error('No history returned by service'); } @@ -377,11 +430,11 @@ export class WorkflowClient { req.nextPageToken = undefined; continue; } - const { failure } = ev.workflowExecutionFailedEventAttributes; + const { failure, retryState } = ev.workflowExecutionFailedEventAttributes; throw new WorkflowFailedError( 'Workflow execution failed', await optionalFailureToOptionalError(failure, this.options.dataConverter), - RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE + retryState ?? RetryState.RETRY_STATE_UNSPECIFIED ); } else if (ev.workflowExecutionCanceledEventAttributes) { const failure = new CancelledFailure( @@ -408,7 +461,7 @@ export class WorkflowClient { RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE ); } else if (ev.workflowExecutionTimedOutEventAttributes) { - if (ev.workflowExecutionTimedOutEventAttributes.newExecutionRunId) { + if (followRuns && ev.workflowExecutionTimedOutEventAttributes.newExecutionRunId) { execution.runId = ev.workflowExecutionTimedOutEventAttributes.newExecutionRunId; req.nextPageToken = undefined; continue; @@ -439,21 +492,40 @@ export class WorkflowClient { } } + protected rethrowGrpcError(err: unknown, workflowExecution: WorkflowExecution, fallbackMessage: string): never { + if (isServerErrorResponse(err)) { + if (err.code === grpcStatus.NOT_FOUND) { + throw new WorkflowNotFoundError( + err.details ?? 'Workflow not found', + workflowExecution.workflowId, + workflowExecution.runId + ); + } + throw new ServiceError(fallbackMessage, { cause: err }); + } + throw new ServiceError('Unexpeced error while making gRPC request'); + } + /** * Uses given input to make a queryWorkflow call to the service * * Used as the final function of the query interceptor chain */ protected async _queryWorkflowHandler(input: WorkflowQueryInput): Promise { - const response = await this.service.queryWorkflow({ - queryRejectCondition: input.queryRejectCondition, - namespace: this.options.namespace, - execution: input.workflowExecution, - query: { - queryType: input.queryType, - queryArgs: { payloads: await this.options.dataConverter.toPayloads(...input.args) }, - }, - }); + let response: temporal.api.workflowservice.v1.QueryWorkflowResponse; + try { + response = await this.service.queryWorkflow({ + queryRejectCondition: input.queryRejectCondition, + namespace: this.options.namespace, + execution: input.workflowExecution, + query: { + queryType: input.queryType, + queryArgs: { payloads: await this.options.dataConverter.toPayloads(...input.args) }, + }, + }); + } catch (err) { + this.rethrowGrpcError(err, input.workflowExecution, 'Failed to query Workflow'); + } if (response.queryRejected) { if (response.queryRejected.status === undefined || response.queryRejected.status === null) { throw new TypeError('Received queryRejected from server with no status'); @@ -473,15 +545,19 @@ export class WorkflowClient { * Used as the final function of the signal interceptor chain */ protected async _signalWorkflowHandler(input: WorkflowSignalInput): Promise { - await this.service.signalWorkflowExecution({ - identity: this.options.identity, - namespace: this.options.namespace, - workflowExecution: input.workflowExecution, - requestId: uuid4(), - // control is unused, - signalName: input.signalName, - input: { payloads: await this.options.dataConverter.toPayloads(...input.args) }, - }); + try { + await this.service.signalWorkflowExecution({ + identity: this.options.identity, + namespace: this.options.namespace, + workflowExecution: input.workflowExecution, + requestId: uuid4(), + // control is unused, + signalName: input.signalName, + input: { payloads: await this.options.dataConverter.toPayloads(...input.args) }, + }); + } catch (err) { + this.rethrowGrpcError(err, input.workflowExecution, 'Failed to signal Workflow'); + } } /** @@ -492,34 +568,38 @@ export class WorkflowClient { protected async _signalWithStartWorkflowHandler(input: WorkflowSignalWithStartInput): Promise { const { identity, dataConverter } = this.options; const { options, workflowType, signalName, signalArgs, headers } = input; - const { runId } = await this.service.signalWithStartWorkflowExecution({ - namespace: this.options.namespace, - identity, - requestId: uuid4(), - workflowId: options.workflowId, - workflowIdReusePolicy: options.workflowIdReusePolicy, - workflowType: { name: workflowType }, - input: { payloads: await dataConverter.toPayloads(...options.args) }, - signalName, - signalInput: { payloads: await dataConverter.toPayloads(...signalArgs) }, - taskQueue: { - kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_UNSPECIFIED, - name: options.taskQueue, - }, - workflowExecutionTimeout: options.workflowExecutionTimeout, - workflowRunTimeout: options.workflowRunTimeout, - workflowTaskTimeout: options.workflowTaskTimeout, - retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined, - memo: options.memo ? { fields: await mapToPayloads(dataConverter, options.memo) } : undefined, - searchAttributes: options.searchAttributes - ? { - indexedFields: await mapToPayloads(dataConverter, options.searchAttributes), - } - : undefined, - cronSchedule: options.cronSchedule, - header: { fields: headers }, - }); - return runId; + try { + const { runId } = await this.service.signalWithStartWorkflowExecution({ + namespace: this.options.namespace, + identity, + requestId: uuid4(), + workflowId: options.workflowId, + workflowIdReusePolicy: options.workflowIdReusePolicy, + workflowType: { name: workflowType }, + input: { payloads: await dataConverter.toPayloads(...options.args) }, + signalName, + signalInput: { payloads: await dataConverter.toPayloads(...signalArgs) }, + taskQueue: { + kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_UNSPECIFIED, + name: options.taskQueue, + }, + workflowExecutionTimeout: options.workflowExecutionTimeout, + workflowRunTimeout: options.workflowRunTimeout, + workflowTaskTimeout: options.workflowTaskTimeout, + retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined, + memo: options.memo ? { fields: await mapToPayloads(dataConverter, options.memo) } : undefined, + searchAttributes: options.searchAttributes + ? { + indexedFields: await mapToPayloads(dataConverter, options.searchAttributes), + } + : undefined, + cronSchedule: options.cronSchedule, + header: { fields: headers }, + }); + return runId; + } catch (err) { + this.rethrowGrpcError(err, { workflowId: options.workflowId }, 'Failed to signalWithStart Workflow'); + } } /** @@ -528,7 +608,7 @@ export class WorkflowClient { * Used as the final function of the start interceptor chain */ protected async _startWorkflowHandler(input: WorkflowStartInput): Promise { - const { options: opts, workflowType: name, headers } = input; + const { options: opts, workflowType, headers } = input; const { identity, dataConverter } = this.options; const req: StartWorkflowExecutionRequest = { namespace: this.options.namespace, @@ -536,7 +616,7 @@ export class WorkflowClient { requestId: uuid4(), workflowId: opts.workflowId, workflowIdReusePolicy: opts.workflowIdReusePolicy, - workflowType: { name }, + workflowType: { name: workflowType }, input: { payloads: await dataConverter.toPayloads(...opts.args) }, taskQueue: { kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_UNSPECIFIED, @@ -555,8 +635,19 @@ export class WorkflowClient { cronSchedule: opts.cronSchedule, header: { fields: headers }, }; - const res = await this.service.startWorkflowExecution(req); - return res.runId; + try { + const res = await this.service.startWorkflowExecution(req); + return res.runId; + } catch (err: any) { + if (err.code === grpcStatus.ALREADY_EXISTS) { + throw new WorkflowExecutionAlreadyStartedError( + 'Workflow execution already started', + opts.workflowId, + workflowType + ); + } + this.rethrowGrpcError(err, { workflowId: opts.workflowId }, 'Failed to start Workflow'); + } } /** @@ -567,12 +658,17 @@ export class WorkflowClient { protected async _terminateWorkflowHandler( input: WorkflowTerminateInput ): Promise { - return await this.service.terminateWorkflowExecution({ - namespace: this.options.namespace, - identity: this.options.identity, - ...input, - details: { payloads: await this.options.dataConverter.toPayloads(input.details) }, - }); + try { + return await this.service.terminateWorkflowExecution({ + namespace: this.options.namespace, + identity: this.options.identity, + ...input, + details: { payloads: await this.options.dataConverter.toPayloads(input.details) }, + firstExecutionRunId: input.firstExecutionRunId, + }); + } catch (err) { + this.rethrowGrpcError(err, input.workflowExecution, 'Failed to terminate Workflow'); + } } /** @@ -581,30 +677,37 @@ export class WorkflowClient { * Used as the final function of the cancel interceptor chain */ protected async _cancelWorkflowHandler(input: WorkflowCancelInput): Promise { - return await this.service.requestCancelWorkflowExecution({ - namespace: this.options.namespace, - identity: this.options.identity, - requestId: uuid4(), - workflowExecution: input.workflowExecution, - }); + try { + return await this.service.requestCancelWorkflowExecution({ + namespace: this.options.namespace, + identity: this.options.identity, + requestId: uuid4(), + workflowExecution: input.workflowExecution, + firstExecutionRunId: input.firstExecutionRunId, + }); + } catch (err) { + this.rethrowGrpcError(err, input.workflowExecution, 'Failed to cancel workflow'); + } } /** * Create a new workflow handle for new or existing Workflow execution */ - protected _createWorkflowHandle( - workflowId: string, - runId: string | undefined, - interceptors: WorkflowClientCallsInterceptor[], - resultOptions: WorkflowResultOptions - ): WorkflowHandle { + protected _createWorkflowHandle({ + workflowId, + runId, + firstExecutionRunId, + interceptors, + runIdForResult, + ...resultOptions + }: WorkflowHandleOptions): WorkflowHandle { const namespace = this.options.namespace; return { client: this, workflowId, async result(): Promise> { - return this.client.result(workflowId, runId, resultOptions); + return this.client.result(workflowId, runIdForResult, resultOptions); }, async terminate(reason?: string) { const next = this.client._terminateWorkflowHandler.bind(this.client); @@ -612,6 +715,7 @@ export class WorkflowClient { return await fn({ workflowExecution: { workflowId, runId }, reason, + firstExecutionRunId, }); }, async cancel() { @@ -619,6 +723,7 @@ export class WorkflowClient { const fn = interceptors.length ? composeInterceptors(interceptors, 'cancel', next) : next; return await fn({ workflowExecution: { workflowId, runId }, + firstExecutionRunId, }); }, async describe() { @@ -661,11 +766,16 @@ export class WorkflowClient { public getHandle( workflowId: string, runId?: string, - options?: WorkflowResultOptions + options?: GetWorkflowHandleOptions ): WorkflowHandle { const interceptors = (this.options.interceptors.calls ?? []).map((ctor) => ctor({ workflowId, runId })); - return this._createWorkflowHandle(workflowId, runId, interceptors, { + return this._createWorkflowHandle({ + workflowId, + runId, + firstExecutionRunId: options?.firstExecutionRunId, + runIdForResult: runId ?? options?.firstExecutionRunId, + interceptors, followRuns: options?.followRuns ?? true, }); } diff --git a/packages/common/src/errors.ts b/packages/common/src/errors.ts index cebe70750..5a1c26b9f 100644 --- a/packages/common/src/errors.ts +++ b/packages/common/src/errors.ts @@ -13,6 +13,39 @@ export class IllegalStateError extends Error { public readonly name: string = 'IllegalStateError'; } +/** + * This exception is thrown in the following cases: + * - Workflow with the same WorkflowId is currently running + * - There is a closed workflow with the same ID and the {@link WorkflowOptions.workflowIdReusePolicy} + * is `WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE` + * - There is successfully closed workflow with the same ID and the {@link WorkflowOptions.workflowIdReusePolicy} + * is `WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY` + * - {@link Workflow.execute} is called *more than once* on a handle created through {@link createChildWorkflowHandle} and the + * {@link WorkflowOptions.workflowIdReusePolicy} is `WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE` + */ +export class WorkflowExecutionAlreadyStartedError extends Error { + public readonly name: string = 'WorkflowExecutionAlreadyStartedError'; + + constructor(message: string, public readonly workflowId: string, public readonly workflowType: string) { + super(message); + } +} + +/** + * Thrown when workflow with the given id is not known to the Temporal service. + * It could be because: + * - ID passed is incorrect + * - Workflow execution is complete (for some calls e.g. terminate), + * - workflow was purged from the service after reaching its retention limit. + */ +export class WorkflowNotFoundError extends Error { + public readonly name: string = 'WorkflowNotFoundError'; + + constructor(message: string, public readonly workflowId: string, public readonly runId: string | undefined) { + super(message); + } +} + /** * Get error message from an Error or string or return undefined */ diff --git a/packages/test/src/test-integration.ts b/packages/test/src/test-integration.ts index 6db9728ab..b813492fe 100644 --- a/packages/test/src/test-integration.ts +++ b/packages/test/src/test-integration.ts @@ -13,6 +13,8 @@ import { TimeoutType, tsToMs, WorkflowExecution, + WorkflowExecutionAlreadyStartedError, + WorkflowNotFoundError, } from '@temporalio/common'; import { Worker, DefaultLogger, Core } from '@temporalio/worker'; import * as iface from '@temporalio/proto'; @@ -773,4 +775,120 @@ if (RUN_INTEGRATION_TESTS) { iface.temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED ); }); + + test('WorkflowClient.start fails with WorkflowExecutionAlreadyStartedError', async (t) => { + const { client } = t.context; + const workflowId = uuid4(); + const handle = await client.start(workflows.sleeper, { taskQueue: 'test', workflowId, args: [10000000] }); + try { + await t.throwsAsync( + client.start(workflows.sleeper, { + taskQueue: 'test', + workflowId, + }), + { instanceOf: WorkflowExecutionAlreadyStartedError, message: 'Workflow execution already started' } + ); + } finally { + await handle.terminate(); + } + }); + + test('Handle from WorkflowClient.start follows only own execution chain', async (t) => { + const { client } = t.context; + const workflowId = uuid4(); + const handleFromThrowerStart = await client.start(workflows.throwAsync, { taskQueue: 'test', workflowId }); + const handleFromGet = client.getHandle(workflowId); + await t.throwsAsync(handleFromGet.result(), { message: /.*/ }); + const handleFromSleeperStart = await client.start(workflows.sleeper, { + taskQueue: 'test', + workflowId, + args: [1_000_000], + }); + try { + await t.throwsAsync(handleFromThrowerStart.result(), { message: 'Workflow execution failed' }); + } finally { + await handleFromSleeperStart.terminate(); + } + }); + + test('Handle from WorkflowClient.signalWithStart follows only own execution chain', async (t) => { + const { client } = t.context; + const workflowId = uuid4(); + const handleFromThrowerStart = await client.signalWithStart(workflows.throwAsync, { + taskQueue: 'test', + workflowId, + signal: 'unblock', + signalArgs: [], + }); + const handleFromGet = client.getHandle(workflowId); + await t.throwsAsync(handleFromGet.result(), { message: /.*/ }); + const handleFromSleeperStart = await client.start(workflows.sleeper, { + taskQueue: 'test', + workflowId, + args: [1_000_000], + }); + try { + await t.throwsAsync(handleFromThrowerStart.result(), { message: 'Workflow execution failed' }); + } finally { + await handleFromSleeperStart.terminate(); + } + }); + + test('Handle from WorkflowClient.getHandle follows only own execution chain', async (t) => { + const { client } = t.context; + const workflowId = uuid4(); + const handleFromThrowerStart = await client.start(workflows.throwAsync, { + taskQueue: 'test', + workflowId, + }); + const handleFromGet = client.getHandle(workflowId, undefined, { + firstExecutionRunId: handleFromThrowerStart.originalRunId, + }); + await t.throwsAsync(handleFromThrowerStart.result(), { message: /.*/ }); + const handleFromSleeperStart = await client.start(workflows.sleeper, { + taskQueue: 'test', + workflowId, + args: [1_000_000], + }); + try { + await t.throwsAsync(handleFromGet.result(), { message: 'Workflow execution failed' }); + } finally { + await handleFromSleeperStart.terminate(); + } + }); + + test('Handle from WorkflowClient.start terminates run after continue as new', async (t) => { + const { client } = t.context; + const workflowId = uuid4(); + const handleFromStart = await client.start(workflows.continueAsNewToDifferentWorkflow, { + taskQueue: 'test', + workflowId, + args: [1_000_000], + }); + const handleFromGet = client.getHandle(workflowId, handleFromStart.originalRunId, { followRuns: false }); + await t.throwsAsync(handleFromGet.result(), { instanceOf: WorkflowContinuedAsNewError }); + await handleFromStart.terminate(); + await t.throwsAsync(handleFromStart.result(), { message: 'Workflow execution terminated' }); + }); + + test('Handle from WorkflowClient.getHandle does not terminate run after continue as new if given runId', async (t) => { + const { client } = t.context; + const workflowId = uuid4(); + const handleFromStart = await client.start(workflows.continueAsNewToDifferentWorkflow, { + taskQueue: 'test', + workflowId, + args: [1_000_000], + followRuns: false, + }); + const handleFromGet = client.getHandle(workflowId, handleFromStart.originalRunId); + await t.throwsAsync(handleFromStart.result(), { instanceOf: WorkflowContinuedAsNewError }); + try { + await t.throwsAsync(handleFromGet.terminate(), { + instanceOf: WorkflowNotFoundError, + message: 'workflow execution already completed', + }); + } finally { + await client.getHandle(workflowId).terminate(); + } + }); } diff --git a/packages/test/src/workflows/continue-as-new-to-different-workflow.ts b/packages/test/src/workflows/continue-as-new-to-different-workflow.ts index 018db5a9a..66700d1b8 100644 --- a/packages/test/src/workflows/continue-as-new-to-different-workflow.ts +++ b/packages/test/src/workflows/continue-as-new-to-different-workflow.ts @@ -5,7 +5,7 @@ import { makeContinueAsNewFunc } from '@temporalio/workflow'; import { sleeper } from './sleep'; -export async function continueAsNewToDifferentWorkflow(): Promise { +export async function continueAsNewToDifferentWorkflow(ms = 1): Promise { const continueAsNew = makeContinueAsNewFunc({ workflowType: 'sleeper' }); - await continueAsNew(1); + await continueAsNew(ms); } diff --git a/packages/workflow/src/errors.ts b/packages/workflow/src/errors.ts index 782570d96..a70e332f7 100644 --- a/packages/workflow/src/errors.ts +++ b/packages/workflow/src/errors.ts @@ -1,4 +1,5 @@ import { CancelledFailure, ActivityFailure, ChildWorkflowFailure } from '@temporalio/common'; +export { WorkflowExecutionAlreadyStartedError } from '@temporalio/common'; /** * Base class for all workflow errors @@ -14,24 +15,6 @@ export class DeterminismViolationError extends WorkflowError { public readonly name: string = 'DeterminismViolationError'; } -/** - * This exception is thrown in the following cases: - * - Workflow with the same WorkflowId is currently running - * - There is a closed workflow with the same ID and the {@link WorkflowOptions.workflowIdReusePolicy} - * is `WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE` - * - There is successfully closed workflow with the same ID and the {@link WorkflowOptions.workflowIdReusePolicy} - * is `WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY` - * - {@link Workflow.execute} is called *more than once* on a handle created through {@link createChildWorkflowHandle} and the - * {@link WorkflowOptions.workflowIdReusePolicy} is `WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE` - */ -export class WorkflowExecutionAlreadyStartedError extends WorkflowError { - public readonly name: string = 'ChildWorkflowExecutionAlreadyStartedError'; - - constructor(message: string, public readonly workflowId: string, public readonly workflowType: string) { - super(message); - } -} - /** * Returns whether provided `err` is caused by cancellation */