diff --git a/docs/features/batch.md b/docs/features/batch.md index f1dd9a5da8..ab0d81b568 100644 --- a/docs/features/batch.md +++ b/docs/features/batch.md @@ -108,7 +108,7 @@ Processing batches from SQS works in three stages: 3. Use **`processPartialResponse`** to kick off processing !!! note - By default, the batch processor will process messages in parallel, which does not guarantee the order of processing. If you need to process messages in order, set the [`processInParallel` option to `false`](#sequential-async-processing), or use [`SqsFifoPartialProcessor` for SQS FIFO queues](#fifo-queues). + By default, the batch processor will process messages in parallel, which does not guarantee the order of processing. If you need to process messages in order, set the [`processInParallel` option to `false`](#sequential-processing), or use [`SqsFifoPartialProcessor` for SQS FIFO queues](#fifo-queues). === "index.ts" @@ -235,7 +235,7 @@ By default, we catch any exception raised by your record handler function. This --8<-- ``` - 1. Any exception works here. See [extending BatchProcessorSync section, if you want to override this behavior.](#extending-batchprocessor) + 1. Any exception works here. See [extending `BatchProcessor` section, if you want to override this behavior.](#extending-batchprocessor) 2. Exceptions raised in `recordHandler` will propagate to `process_partial_response`.

We catch them and include each failed batch item identifier in the response dictionary (see `Sample response` tab). @@ -411,7 +411,7 @@ Use the `BatchProcessor` directly in your function to access a list of all retur Within your `recordHandler` function, you might need access to the Lambda context to determine how much time you have left before your function times out. -We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you register it when using `BatchProcessorSync` or the `processPartialResponseSync` function. +We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you pass it to the `processPartialResponse` function. ```typescript hl_lines="12 27" --8<-- "examples/snippets/batch/accessLambdaContext.ts" @@ -444,7 +444,7 @@ Let's suppose you'd like to add a metric named `BatchRecordFailures` for each ba --8<-- "examples/snippets/batch/extendingFailure.ts" ``` -### Sequential async processing +### Sequential processing By default, the `BatchProcessor` processes records in parallel using `Promise.all()`. However, if you need to preserve the order of records, you can set the `processInParallel` option to `false` to process records sequentially. @@ -452,7 +452,7 @@ By default, the `BatchProcessor` processes records in parallel using `Promise.al When processing records from SQS FIFO queues, we recommend using the [`SqsFifoPartialProcessor`](#fifo-queues) class, which guarantees ordering of records and implements a short-circuit mechanism to skip processing records from a different message group ID. -```typescript hl_lines="8 17" title="Sequential async processing" +```typescript hl_lines="8 17" title="Sequential processing" --8<-- "examples/snippets/batch/sequentialAsyncProcessing.ts" ``` @@ -487,7 +487,7 @@ classDiagram * **`processRecord()`** – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic * **`processRecordSync()`** – handles all processing logic for each individual message of a batch, including calling the `recordHandler` (`this.handler`) -You can then use this class as a context manager, or pass it to `processPartialResponseSync` to process the records in your Lambda handler function. +You can then pass this class to `processPartialResponse` to process the records in your Lambda handler function. ```typescript hl_lines="21 35 55 60 72 85" title="Creating a custom batch processor" --8<-- "examples/snippets/batch/customPartialProcessor.ts" @@ -507,7 +507,7 @@ You can use Tracer to create subsegments for each batch record processed. To do ## Testing your code -As there is no external calls, you can unit test your code with `BatchProcessorSync` quite easily. +As there is no external calls, you can unit test your code with `BatchProcessor` quite easily. **Example**: diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts index 68bcac467d..1640cc4355 100644 --- a/packages/batch/src/BasePartialProcessor.ts +++ b/packages/batch/src/BasePartialProcessor.ts @@ -106,20 +106,6 @@ abstract class BasePartialProcessor { * Before and after processing, the processor will call the prepare and clean methods respectively. */ public async process(): Promise<(SuccessResponse | FailureResponse)[]> { - /** - * If this is a sync processor, user should have called processSync instead, - * so we call the method early to throw the error early thus failing fast. - * - * The type casting is necessary to ensure that we have test coverage for the - * block of code that throws the error, without having to change the return type - * of the method. This is because this call will always throw an error. - */ - if (this.constructor.name === 'BatchProcessorSync') { - return (await this.processRecord(this.records[0])) as ( - | SuccessResponse - | FailureResponse - )[]; - } this.prepare(); // Default to `true` if `processInParallel` is not specified. diff --git a/packages/batch/src/BatchProcessorSync.ts b/packages/batch/src/BatchProcessorSync.ts index 8feb8f14fa..a3613a3d51 100644 --- a/packages/batch/src/BatchProcessorSync.ts +++ b/packages/batch/src/BatchProcessorSync.ts @@ -82,7 +82,7 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js'; * @param eventType The type of event to process (SQS, Kinesis, DynamoDB) * @deprecated Use {@link BasePartialBatchProcessor} instead, this class is deprecated and will be removed in the next major version. */ -class BatchProcessorSync extends BasePartialBatchProcessor { +/* v8 ignore start */ class BatchProcessorSync extends BasePartialBatchProcessor { /** * @throws {BatchProcessingError} This method is not implemented for asynchronous processing. * @@ -120,6 +120,6 @@ class BatchProcessorSync extends BasePartialBatchProcessor { return this.failureHandler(record, error as Error); } } -} +} /* v8 ignore stop */ export { BatchProcessorSync }; diff --git a/packages/batch/src/processPartialResponseSync.ts b/packages/batch/src/processPartialResponseSync.ts index 39e1dce7c1..d5c22d9975 100644 --- a/packages/batch/src/processPartialResponseSync.ts +++ b/packages/batch/src/processPartialResponseSync.ts @@ -99,7 +99,9 @@ import type { * @param processor Batch processor instance to handle the batch processing * @param options Batch processing options, which can vary with chosen batch processor implementation */ -const processPartialResponseSync = ( +/* v8 ignore start */ const processPartialResponseSync = < + T extends BasePartialBatchProcessor, +>( event: { Records: BaseRecord[] }, recordHandler: CallableFunction, processor: T, @@ -114,6 +116,6 @@ const processPartialResponseSync = ( processor.processSync(); return processor.response(); -}; +}; /* v8 ignore stop */ export { processPartialResponseSync }; diff --git a/packages/batch/tests/unit/BatchProcessorSync.test.ts b/packages/batch/tests/unit/BatchProcessorSync.test.ts deleted file mode 100644 index 91c9e7bfe0..0000000000 --- a/packages/batch/tests/unit/BatchProcessorSync.test.ts +++ /dev/null @@ -1,270 +0,0 @@ -import context from '@aws-lambda-powertools/testing-utils/context'; -import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; -import { - BatchProcessingError, - BatchProcessorSync, - EventType, - FullBatchFailureError, -} from '../../src/index.js'; -import type { BatchProcessingOptions } from '../../src/types.js'; -import { - dynamodbRecordFactory, - kinesisRecordFactory, - sqsRecordFactory, -} from '../helpers/factories.js'; -import { - dynamodbRecordHandler, - handlerWithContext, - kinesisRecordHandler, - sqsRecordHandler, -} from '../helpers/handlers.js'; - -describe('Class: BatchProcessor', () => { - const ENVIRONMENT_VARIABLES = process.env; - const options: BatchProcessingOptions = { - context, - }; - - beforeEach(() => { - vi.clearAllMocks(); - process.env = { ...ENVIRONMENT_VARIABLES }; - }); - - afterAll(() => { - process.env = ENVIRONMENT_VARIABLES; - }); - - describe('Synchronously processing SQS Records', () => { - it('completes processing with no failures', () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessorSync(EventType.SQS); - - // Act - processor.register(records, sqsRecordHandler); - const processedMessages = processor.processSync(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.body, firstRecord], - ['success', secondRecord.body, secondRecord], - ]); - }); - - it('completes processing with some failures', () => { - // Prepare - const firstRecord = sqsRecordFactory('failure'); - const secondRecord = sqsRecordFactory('success'); - const thirdRecord = sqsRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessorSync(EventType.SQS); - - // Act - processor.register(records, sqsRecordHandler); - const processedMessages = processor.processSync(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.body, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.messageId }, - { itemIdentifier: thirdRecord.messageId }, - ], - }); - }); - - it('completes processing with all failures', () => { - // Prepare - const firstRecord = sqsRecordFactory('failure'); - const secondRecord = sqsRecordFactory('failure'); - const thirdRecord = sqsRecordFactory('fail'); - - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessorSync(EventType.SQS); - - // Act & Assess - processor.register(records, sqsRecordHandler); - expect(() => processor.processSync()).toThrowError(FullBatchFailureError); - }); - }); - - describe('Synchronously processing Kinesis Records', () => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('success'); - const secondRecord = kinesisRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessorSync(EventType.KinesisDataStreams); - - // Act - processor.register(records, kinesisRecordHandler); - const processedMessages = processor.processSync(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.kinesis.data, firstRecord], - ['success', secondRecord.kinesis.data, secondRecord], - ]); - }); - - it('completes processing with some failures', async () => { - // Prepare - const firstRecord = kinesisRecordFactory('failure'); - const secondRecord = kinesisRecordFactory('success'); - const thirdRecord = kinesisRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessorSync(EventType.KinesisDataStreams); - - // Act - processor.register(records, kinesisRecordHandler); - const processedMessages = processor.processSync(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.kinesis.data, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.kinesis.sequenceNumber }, - { itemIdentifier: thirdRecord.kinesis.sequenceNumber }, - ], - }); - }); - - it('completes processing with all failures', async () => { - const firstRecord = kinesisRecordFactory('failure'); - const secondRecord = kinesisRecordFactory('failure'); - const thirdRecord = kinesisRecordFactory('fail'); - - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessorSync(EventType.KinesisDataStreams); - - // Act - processor.register(records, kinesisRecordHandler); - - // Assess - expect(() => processor.processSync()).toThrowError(FullBatchFailureError); - }); - }); - - describe('Synchronously processing DynamoDB Records', () => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('success'); - const secondRecord = dynamodbRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessorSync(EventType.DynamoDBStreams); - - // Act - processor.register(records, dynamodbRecordHandler); - const processedMessages = processor.processSync(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], - ['success', secondRecord.dynamodb?.NewImage?.Message, secondRecord], - ]); - }); - - it('completes processing with some failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('failure'); - const secondRecord = dynamodbRecordFactory('success'); - const thirdRecord = dynamodbRecordFactory('fail'); - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessorSync(EventType.DynamoDBStreams); - - // Act - processor.register(records, dynamodbRecordHandler); - const processedMessages = processor.processSync(); - - // Assess - expect(processedMessages[1]).toStrictEqual([ - 'success', - secondRecord.dynamodb?.NewImage?.Message, - secondRecord, - ]); - expect(processor.failureMessages.length).toBe(2); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, - { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, - ], - }); - }); - - it('completes processing with all failures', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory('failure'); - const secondRecord = dynamodbRecordFactory('failure'); - const thirdRecord = dynamodbRecordFactory('fail'); - - const records = [firstRecord, secondRecord, thirdRecord]; - const processor = new BatchProcessorSync(EventType.DynamoDBStreams); - - // Act - processor.register(records, dynamodbRecordHandler); - - // Assess - expect(() => processor.processSync()).toThrowError(FullBatchFailureError); - }); - }); - - describe('Batch processing with Lambda context', () => { - it('passes the context to the record handler', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessorSync(EventType.SQS); - - // Act - processor.register(records, handlerWithContext, options); - const processedMessages = processor.processSync(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.body, firstRecord], - ['success', secondRecord.body, secondRecord], - ]); - }); - - it('throws an error when passing an invalid context object', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessorSync(EventType.SQS); - - // Act - processor.register(records, sqsRecordHandler, options); - const processedMessages = processor.processSync(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', firstRecord.body, firstRecord], - ['success', secondRecord.body, secondRecord], - ]); - }); - }); - - it('throws an error when the async process method is called', async () => { - // Prepare - const processor = new BatchProcessorSync(EventType.SQS); - - // Act & Assess - await expect(() => processor.process()).rejects.toThrowError( - BatchProcessingError - ); - }); -}); diff --git a/packages/batch/tests/unit/processPartialResponseSync.test.ts b/packages/batch/tests/unit/processPartialResponseSync.test.ts deleted file mode 100644 index ccd773aefd..0000000000 --- a/packages/batch/tests/unit/processPartialResponseSync.test.ts +++ /dev/null @@ -1,341 +0,0 @@ -import context from '@aws-lambda-powertools/testing-utils/context'; -import type { - Context, - DynamoDBStreamEvent, - KinesisStreamEvent, - SQSEvent, -} from 'aws-lambda'; -import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; -import { - BatchProcessorSync, - EventType, - FullBatchFailureError, - processPartialResponseSync, - UnexpectedBatchTypeError, -} from '../../src/index.js'; -import type { - BatchProcessingOptions, - PartialItemFailureResponse, -} from '../../src/types.js'; -import { - dynamodbRecordFactory, - kinesisRecordFactory, - sqsRecordFactory, -} from '../helpers/factories.js'; -import { - dynamodbRecordHandler, - handlerWithContext, - kinesisRecordHandler, - sqsRecordHandler, -} from '../helpers/handlers.js'; - -describe('Function: processPartialResponse()', () => { - const ENVIRONMENT_VARIABLES = process.env; - const options: BatchProcessingOptions = { - context, - }; - - beforeEach(() => { - vi.clearAllMocks(); - process.env = { ...ENVIRONMENT_VARIABLES }; - }); - - afterAll(() => { - process.env = ENVIRONMENT_VARIABLES; - }); - - describe('Process partial response function call tests', () => { - it('Process partial response function call with synchronous handler', () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const batch = { Records: records }; - const processor = new BatchProcessorSync(EventType.SQS); - - // Act - const ret = processPartialResponseSync( - batch, - sqsRecordHandler, - processor - ); - - // Assess - expect(ret).toStrictEqual({ batchItemFailures: [] }); - }); - - it('Process partial response function call with context provided', () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const batch = { Records: records }; - const processor = new BatchProcessorSync(EventType.SQS); - - // Act - const ret = processPartialResponseSync( - batch, - handlerWithContext, - processor, - options - ); - - // Assess - expect(ret).toStrictEqual({ batchItemFailures: [] }); - }); - - it('Process partial response function call with synchronous handler for full batch failure', () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const batch = { Records: records }; - const processor = new BatchProcessorSync(EventType.SQS); - - // Act & Assess - expect(() => - processPartialResponseSync(batch, sqsRecordHandler, processor) - ).toThrow(FullBatchFailureError); - }); - - it('Process partial response function call with synchronous handler for full batch failure when `throwOnFullBatchFailure` is `true`', () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const batch = { Records: records }; - const processor = new BatchProcessorSync(EventType.SQS); - - // Act & Assess - expect(() => - processPartialResponseSync(batch, sqsRecordHandler, processor, { - ...options, - throwOnFullBatchFailure: true, - }) - ).toThrow(FullBatchFailureError); - }); - - it('Process partial response function call with synchronous handler for full batch failure when `throwOnFullBatchFailure` is `false`', () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const batch = { Records: records }; - const processor = new BatchProcessorSync(EventType.SQS); - - // Act - const response = processPartialResponseSync( - batch, - sqsRecordHandler, - processor, - { - ...options, - throwOnFullBatchFailure: false, - } - ); - - // Assess - expect(response).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: records[0].messageId }, - { itemIdentifier: records[1].messageId }, - ], - }); - }); - }); - - describe('Process partial response function call through handler', () => { - it('Process partial response through handler with SQS event', () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const processor = new BatchProcessorSync(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = ( - event: SQSEvent, - _context: Context - ): PartialItemFailureResponse => { - return processPartialResponseSync(event, sqsRecordHandler, processor); - }; - - // Act - const result = handler(event, context); - - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); - - it('Process partial response through handler with Kinesis event', () => { - // Prepare - const records = [ - kinesisRecordFactory('success'), - kinesisRecordFactory('success'), - ]; - const processor = new BatchProcessorSync(EventType.KinesisDataStreams); - const event: KinesisStreamEvent = { Records: records }; - - const handler = ( - event: KinesisStreamEvent, - _context: Context - ): PartialItemFailureResponse => { - return processPartialResponseSync( - event, - kinesisRecordHandler, - processor - ); - }; - - // Act - const result = handler(event, context); - - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); - - it('Process partial response through handler with DynamoDB event', () => { - // Prepare - const records = [ - dynamodbRecordFactory('success'), - dynamodbRecordFactory('success'), - ]; - const processor = new BatchProcessorSync(EventType.DynamoDBStreams); - const event: DynamoDBStreamEvent = { Records: records }; - - const handler = ( - event: DynamoDBStreamEvent, - _context: Context - ): PartialItemFailureResponse => { - return processPartialResponseSync( - event, - dynamodbRecordHandler, - processor - ); - }; - - // Act - const result = handler(event, context); - - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); - - it('Process partial response through handler for SQS records with incorrect event type', () => { - // Prepare - const processor = new BatchProcessorSync(EventType.SQS); - - const handler = ( - event: SQSEvent, - _context: Context - ): PartialItemFailureResponse => { - return processPartialResponseSync(event, sqsRecordHandler, processor); - }; - - try { - // Act - handler({} as unknown as SQSEvent, context); - } catch (error) { - // Assess - expect(error).toBeInstanceOf(UnexpectedBatchTypeError); - expect((error as Error).message).toBe( - `Unexpected batch type. Possible values are: ${Object.keys( - EventType - ).join(', ')}` - ); - } - }); - - it('Process partial response through handler with context provided', () => { - // Prepare - const records = [ - sqsRecordFactory('success'), - sqsRecordFactory('success'), - ]; - const processor = new BatchProcessorSync(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = ( - event: SQSEvent, - context: Context - ): PartialItemFailureResponse => { - const options: BatchProcessingOptions = { context: context }; - - return processPartialResponseSync( - event, - handlerWithContext, - processor, - options - ); - }; - - // Act - const result = handler(event, context); - - // Assess - expect(result).toStrictEqual({ batchItemFailures: [] }); - }); - - it('Process partial response through handler for full batch failure', () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const processor = new BatchProcessorSync(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = ( - event: SQSEvent, - _context: Context - ): PartialItemFailureResponse => { - return processPartialResponseSync(event, sqsRecordHandler, processor); - }; - - // Act & Assess - expect(() => handler(event, context)).toThrow(FullBatchFailureError); - }); - - it('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `true`', () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const processor = new BatchProcessorSync(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = ( - event: SQSEvent, - _context: Context - ): PartialItemFailureResponse => { - return processPartialResponseSync(event, sqsRecordHandler, processor, { - ...options, - throwOnFullBatchFailure: true, - }); - }; - - // Act & Assess - expect(() => handler(event, context)).toThrow(FullBatchFailureError); - }); - - it('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `false`', () => { - // Prepare - const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')]; - const processor = new BatchProcessorSync(EventType.SQS); - const event: SQSEvent = { Records: records }; - - const handler = ( - event: SQSEvent, - _context: Context - ): PartialItemFailureResponse => { - return processPartialResponseSync(event, sqsRecordHandler, processor, { - ...options, - throwOnFullBatchFailure: false, - }); - }; - - // Act - const response = handler(event, context); - - // Assess - expect(response).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: records[0].messageId }, - { itemIdentifier: records[1].messageId }, - ], - }); - }); - }); -});