Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions docs/features/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ Processing batches from SQS works in three stages:
3. Use **`processPartialResponse`** to kick off processing

!!! note
By default, the batch processor will process messages in parallel, which does not guarantee the order of processing. If you need to process messages in order, set the [`processInParallel` option to `false`](#sequential-async-processing), or use [`SqsFifoPartialProcessor` for SQS FIFO queues](#fifo-queues).
By default, the batch processor will process messages in parallel, which does not guarantee the order of processing. If you need to process messages in order, set the [`processInParallel` option to `false`](#sequential-processing), or use [`SqsFifoPartialProcessor` for SQS FIFO queues](#fifo-queues).

=== "index.ts"

Expand Down Expand Up @@ -235,7 +235,7 @@ By default, we catch any exception raised by your record handler function. This
--8<--
```

1. Any exception works here. See [extending BatchProcessorSync section, if you want to override this behavior.](#extending-batchprocessor)
1. Any exception works here. See [extending `BatchProcessor` section, if you want to override this behavior.](#extending-batchprocessor)

2. Exceptions raised in `recordHandler` will propagate to `process_partial_response`. <br/><br/> We catch them and include each failed batch item identifier in the response dictionary (see `Sample response` tab).

Expand Down Expand Up @@ -411,7 +411,7 @@ Use the `BatchProcessor` directly in your function to access a list of all retur

Within your `recordHandler` function, you might need access to the Lambda context to determine how much time you have left before your function times out.

We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you register it when using `BatchProcessorSync` or the `processPartialResponseSync` function.
We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you pass it to the `processPartialResponse` function.

```typescript hl_lines="12 27"
--8<-- "examples/snippets/batch/accessLambdaContext.ts"
Expand Down Expand Up @@ -444,15 +444,15 @@ Let's suppose you'd like to add a metric named `BatchRecordFailures` for each ba
--8<-- "examples/snippets/batch/extendingFailure.ts"
```

### Sequential async processing
### Sequential processing

By default, the `BatchProcessor` processes records in parallel using `Promise.all()`. However, if you need to preserve the order of records, you can set the `processInParallel` option to `false` to process records sequentially.

!!! important "If the `processInParallel` option is not provided, the `BatchProcessor` will process records in parallel."

When processing records from SQS FIFO queues, we recommend using the [`SqsFifoPartialProcessor`](#fifo-queues) class, which guarantees ordering of records and implements a short-circuit mechanism to skip processing records from a different message group ID.

```typescript hl_lines="8 17" title="Sequential async processing"
```typescript hl_lines="8 17" title="Sequential processing"
--8<-- "examples/snippets/batch/sequentialAsyncProcessing.ts"
```

Expand Down Expand Up @@ -487,7 +487,7 @@ classDiagram
* **`processRecord()`** – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic
* **`processRecordSync()`** – handles all processing logic for each individual message of a batch, including calling the `recordHandler` (`this.handler`)

You can then use this class as a context manager, or pass it to `processPartialResponseSync` to process the records in your Lambda handler function.
You can then pass this class to `processPartialResponse` to process the records in your Lambda handler function.

```typescript hl_lines="21 35 55 60 72 85" title="Creating a custom batch processor"
--8<-- "examples/snippets/batch/customPartialProcessor.ts"
Expand All @@ -507,7 +507,7 @@ You can use Tracer to create subsegments for each batch record processed. To do

## Testing your code

As there is no external calls, you can unit test your code with `BatchProcessorSync` quite easily.
As there is no external calls, you can unit test your code with `BatchProcessor` quite easily.

**Example**:

Expand Down
14 changes: 0 additions & 14 deletions packages/batch/src/BasePartialProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,6 @@ abstract class BasePartialProcessor {
* Before and after processing, the processor will call the prepare and clean methods respectively.
*/
public async process(): Promise<(SuccessResponse | FailureResponse)[]> {
/**
* If this is a sync processor, user should have called processSync instead,
* so we call the method early to throw the error early thus failing fast.
*
* The type casting is necessary to ensure that we have test coverage for the
* block of code that throws the error, without having to change the return type
* of the method. This is because this call will always throw an error.
*/
if (this.constructor.name === 'BatchProcessorSync') {
return (await this.processRecord(this.records[0])) as (
| SuccessResponse
| FailureResponse
)[];
}
this.prepare();

// Default to `true` if `processInParallel` is not specified.
Expand Down
4 changes: 2 additions & 2 deletions packages/batch/src/BatchProcessorSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
* @param eventType The type of event to process (SQS, Kinesis, DynamoDB)
* @deprecated Use {@link BasePartialBatchProcessor} instead, this class is deprecated and will be removed in the next major version.
*/
class BatchProcessorSync extends BasePartialBatchProcessor {
/* v8 ignore start */ class BatchProcessorSync extends BasePartialBatchProcessor {
/**
* @throws {BatchProcessingError} This method is not implemented for asynchronous processing.
*
Expand Down Expand Up @@ -120,6 +120,6 @@ class BatchProcessorSync extends BasePartialBatchProcessor {
return this.failureHandler(record, error as Error);
}
}
}
} /* v8 ignore stop */

export { BatchProcessorSync };
6 changes: 4 additions & 2 deletions packages/batch/src/processPartialResponseSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ import type {
* @param processor Batch processor instance to handle the batch processing
* @param options Batch processing options, which can vary with chosen batch processor implementation
*/
const processPartialResponseSync = <T extends BasePartialBatchProcessor>(
/* v8 ignore start */ const processPartialResponseSync = <
T extends BasePartialBatchProcessor,
>(
event: { Records: BaseRecord[] },
recordHandler: CallableFunction,
processor: T,
Expand All @@ -114,6 +116,6 @@ const processPartialResponseSync = <T extends BasePartialBatchProcessor>(
processor.processSync();

return processor.response();
};
}; /* v8 ignore stop */

export { processPartialResponseSync };
270 changes: 0 additions & 270 deletions packages/batch/tests/unit/BatchProcessorSync.test.ts

This file was deleted.

Loading