Skip to content
Merged
9 changes: 5 additions & 4 deletions docs/utilities/idempotency.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ If you're not [changing the default configuration for the DynamoDB persistence l
Larger items cannot be written to DynamoDB and will cause exceptions.

???+ info "Info: DynamoDB"
Each function invocation will generally make 2 requests to DynamoDB. If the
result returned by your Lambda is less than 1kb, you can expect 2 WCUs per invocation. For retried invocations, you will
see 1WCU and 1RCU. Review the [DynamoDB pricing documentation](https://aws.amazon.com/dynamodb/pricing/){target="_blank"} to
estimate the cost.
Each function invocation will make only 1 request to DynamoDB by using DynamoDB's [conditional expressions](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Expressions.ConditionExpressions.html){target="_blank"} to ensure that we don't overwrite existing records,
and [ReturnValuesOnConditionCheckFailure](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html#DDB-PutItem-request-ReturnValuesOnConditionCheckFailure){target="_blank"} to return the record if it exists.
See [AWS Blog post on handling conditional write errors](https://aws.amazon.com/blogs/database/handle-conditional-write-errors-in-high-concurrency-scenarios-with-amazon-dynamodb/) for more details.
For retried invocations, you will see 1WCU and 1RCU.
Review the [DynamoDB pricing documentation](https://aws.amazon.com/dynamodb/pricing/){target="_blank"} to estimate the cost.

### MakeIdempotent function wrapper

Expand Down
5 changes: 3 additions & 2 deletions packages/idempotency/src/IdempotencyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,10 @@ export class IdempotencyHandler<Func extends AnyFunction> {
} catch (e) {
if (e instanceof IdempotencyItemAlreadyExistsError) {
const idempotencyRecord: IdempotencyRecord =
await this.#persistenceStore.getRecord(
e.existingRecord ||
(await this.#persistenceStore.getRecord(
this.#functionPayloadToBeHashed
);
));

return IdempotencyHandler.determineResultFromIdempotencyRecord(
idempotencyRecord
Expand Down
11 changes: 10 additions & 1 deletion packages/idempotency/src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import type { IdempotencyRecord } from './persistence';

/**
* Item attempting to be inserted into persistence store already exists and is not expired
*/
class IdempotencyItemAlreadyExistsError extends Error {}
class IdempotencyItemAlreadyExistsError extends Error {
public existingRecord?: IdempotencyRecord;

public constructor(message?: string, existingRecord?: IdempotencyRecord) {
super(message);
this.existingRecord = existingRecord;
}
}

/**
* Item does not exist in persistence store
Expand Down
5 changes: 4 additions & 1 deletion packages/idempotency/src/persistence/BasePersistenceLayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
}

if (this.getFromCache(idempotencyRecord.idempotencyKey)) {
throw new IdempotencyItemAlreadyExistsError();
throw new IdempotencyItemAlreadyExistsError(
`Failed to put record for already existing idempotency key: ${idempotencyRecord.idempotencyKey}`,
idempotencyRecord
);
}

await this._putRecord(idempotencyRecord);
Expand Down
25 changes: 18 additions & 7 deletions packages/idempotency/src/persistence/DynamoDBPersistenceLayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import { IdempotencyRecordStatus } from '../constants';
import type { DynamoDBPersistenceOptions } from '../types';
import {
AttributeValue,
ConditionalCheckFailedException,
DeleteItemCommand,
DynamoDBClient,
DynamoDBClientConfig,
DynamoDBServiceException,
GetItemCommand,
PutItemCommand,
UpdateItemCommand,
Expand Down Expand Up @@ -198,15 +198,26 @@ class DynamoDBPersistenceLayer extends BasePersistenceLayer {
':inprogress': IdempotencyRecordStatus.INPROGRESS,
}),
ConditionExpression: conditionExpression,
ReturnValuesOnConditionCheckFailure: 'ALL_OLD',
})
);
} catch (error) {
if (error instanceof DynamoDBServiceException) {
if (error.name === 'ConditionalCheckFailedException') {
throw new IdempotencyItemAlreadyExistsError(
`Failed to put record for already existing idempotency key: ${record.idempotencyKey}`
);
}
if (error instanceof ConditionalCheckFailedException) {
const item = error.Item && unmarshall(error.Item);
const idempotencyRecord =
item &&
new IdempotencyRecord({
idempotencyKey: item[this.keyAttr],
status: item[this.statusAttr],
expiryTimestamp: item[this.expiryAttr],
inProgressExpiryTimestamp: item[this.inProgressExpiryAttr],
responseData: item[this.dataAttr],
payloadHash: item[this.validationKeyAttr],
});
throw new IdempotencyItemAlreadyExistsError(
`Failed to put record for already existing idempotency key: ${record.idempotencyKey}`,
idempotencyRecord
);
}

throw error;
Expand Down
22 changes: 22 additions & 0 deletions packages/idempotency/tests/unit/IdempotencyHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,28 @@ describe('Class IdempotencyHandler', () => {
expect(saveInProgressSpy).toHaveBeenCalledTimes(1);
});

test('when IdempotencyAlreadyInProgressError is thrown and it contains the existing item, it returns it directly', async () => {
// Prepare
const saveInProgressSpy = jest
.spyOn(persistenceStore, 'saveInProgress')
.mockRejectedValueOnce(
new IdempotencyItemAlreadyExistsError(
'Failed to put record for already existing idempotency key: idempotence-key',
new IdempotencyRecord({
idempotencyKey: 'key',
status: IdempotencyRecordStatus.COMPLETED,
responseData: 'Hi',
})
)
);
const getRecordSpy = jest.spyOn(persistenceStore, 'getRecord');

// Act & Assess
await expect(idempotentHandler.handle()).resolves.toEqual('Hi');
expect(saveInProgressSpy).toHaveBeenCalledTimes(1);
expect(getRecordSpy).toHaveBeenCalledTimes(0);
});

test('when IdempotencyInconsistentStateError is thrown, it retries until max retries are exhausted', async () => {
// Prepare
const mockProcessIdempotency = jest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ describe('Class: BasePersistenceLayer', () => {
// Act & Assess
await expect(
persistenceLayer.saveInProgress({ foo: 'bar' })
).rejects.toThrow(new IdempotencyItemAlreadyExistsError());
).rejects.toThrow(IdempotencyItemAlreadyExistsError);
expect(putRecordSpy).toHaveBeenCalledTimes(0);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import { IdempotencyRecord } from '../../../src/persistence';
import type { DynamoDBPersistenceOptions } from '../../../src/types';
import { IdempotencyRecordStatus } from '../../../src';
import {
ConditionalCheckFailedException,
DynamoDBClient,
DynamoDBServiceException,
PutItemCommand,
GetItemCommand,
UpdateItemCommand,
Expand Down Expand Up @@ -395,19 +395,30 @@ describe('Class: DynamoDBPersistenceLayer', () => {
expiryTimestamp: 0,
});
client.on(PutItemCommand).rejects(
new DynamoDBServiceException({
$fault: 'client',
new ConditionalCheckFailedException({
$metadata: {
httpStatusCode: 400,
requestId: 'someRequestId',
},
name: 'ConditionalCheckFailedException',
message: 'Conditional check failed',
Item: {
id: { S: 'test-key' },
status: { S: 'INPROGRESS' },
expiration: { N: Date.now().toString() },
},
})
);

// Act & Assess
await expect(persistenceLayer._putRecord(record)).rejects.toThrowError(
IdempotencyItemAlreadyExistsError
new IdempotencyItemAlreadyExistsError(
`Failed to put record for already existing idempotency key: ${record.idempotencyKey}`,
new IdempotencyRecord({
idempotencyKey: record.idempotencyKey,
status: IdempotencyRecordStatus.EXPIRED,
expiryTimestamp: Date.now() / 1000 - 1,
})
)
);
});

Expand Down Expand Up @@ -676,4 +687,26 @@ describe('Class: DynamoDBPersistenceLayer', () => {
});
});
});

test('_putRecord throws Error when Item is undefined', async () => {
// Prepare
const persistenceLayer = new TestDynamoDBPersistenceLayer({
tableName: dummyTableName,
});
const mockRecord = new IdempotencyRecord({
idempotencyKey: 'test-key',
status: 'INPROGRESS',
expiryTimestamp: Date.now(),
});

DynamoDBClient.prototype.send = jest.fn().mockRejectedValueOnce(
new ConditionalCheckFailedException({
message: 'Conditional check failed',
$metadata: {},
})
);
await expect(
persistenceLayer._putRecord(mockRecord)
).rejects.toThrowError();
});
});