Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/utilities/parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Parser comes with the following built-in schemas:
| **KafkaSelfManagedEventSchema** | Lambda Event Source payload for self managed Kafka payload |
| **KinesisDataStreamSchema** | Lambda Event Source payload for Amazon Kinesis Data Streams |
| **KinesisFirehoseSchema** | Lambda Event Source payload for Amazon Kinesis Firehose |
| **KinesisDynamoDBStreamSchema** | Lambda Event Source payload for a DynamoDB Stream record wrapped in a Kinesis Data Stream |
| **KinesisFirehoseSqsSchema** | Lambda Event Source payload for SQS messages wrapped in Kinesis Firehose records |
| **LambdaFunctionUrlSchema** | Lambda Event Source payload for Lambda Function URL payload |
| **S3EventNotificationEventBridgeSchema** | Lambda Event Source payload for Amazon S3 Event Notification to EventBridge. |
Expand Down
14 changes: 14 additions & 0 deletions packages/parser/src/schemas/dynamodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ const DynamoDBStreamRecord = z.object({
userIdentity: UserIdentity.optional(),
});

/**
 * Zod schema for a DynamoDB Stream change record as it appears when the
 * stream is delivered through Kinesis Data Streams.
 *
 * Compared to a native DynamoDB Stream record:
 * - `eventVersion` and `eventSourceARN` are not present, so they are omitted;
 * - `recordFormat` and `tableName` are added by the Kinesis delivery;
 * - `userIdentity` may be an explicit `null` (not just absent), hence `nullish()`;
 * - the `dynamodb` payload carries no `SequenceNumber` or `StreamViewType`.
 */
const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.omit({
  eventVersion: true,
  eventSourceARN: true,
}).extend({
  // Kinesis delivery always serializes the change record as JSON
  recordFormat: z.literal('application/json'),
  tableName: z.string(),
  // NOTE(review): explicitly null in observed payloads — nullish covers both
  userIdentity: UserIdentity.nullish(),
  dynamodb: DynamoDBStreamChangeRecord.omit({
    SequenceNumber: true,
    StreamViewType: true,
  }),
});

/**
* Zod schema for Amazon DynamoDB Stream event.
*
Expand Down Expand Up @@ -111,6 +124,7 @@ const DynamoDBStreamSchema = z.object({
});

export {
DynamoDBStreamToKinesisRecord,
DynamoDBStreamSchema,
DynamoDBStreamRecord,
DynamoDBStreamChangeRecord,
Expand Down
6 changes: 5 additions & 1 deletion packages/parser/src/schemas/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ export {
KafkaSelfManagedEventSchema,
KafkaRecordSchema,
} from './kafka.js';
export { KinesisDataStreamSchema, KinesisDataStreamRecord } from './kinesis.js';
export {
KinesisDataStreamSchema,
KinesisDynamoDBStreamSchema,
KinesisDataStreamRecord,
} from './kinesis.js';
export {
KinesisFirehoseSchema,
KinesisFirehoseSqsSchema,
Expand Down
23 changes: 20 additions & 3 deletions packages/parser/src/schemas/kinesis.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { gunzipSync } from 'node:zlib';
import { z } from 'zod';
import { DynamoDBStreamToKinesisRecord } from './dynamodb.js';

const KinesisDataStreamRecordPayload = z.object({
kinesisSchemaVersion: z.string(),
partitionKey: z.string(),
sequenceNumber: z.string(),
approximateArrivalTimestamp: z.number(),
data: z.string().transform((data) => {
const decompresed = decompress(data);
const decompressed = decompress(data);
const decoded = Buffer.from(data, 'base64').toString('utf-8');
try {
// If data was not compressed, try to parse it as JSON otherwise it must be string
return decompresed === data ? JSON.parse(decoded) : decompresed;
return decompressed === data ? JSON.parse(decoded) : decompressed;
} catch (e) {
return decoded;
}
Expand All @@ -37,6 +38,21 @@ const KinesisDataStreamRecord = z.object({
kinesis: KinesisDataStreamRecordPayload,
});

/**
 * Zod schema for a Kinesis Data Stream event whose records carry DynamoDB
 * Stream change records.
 *
 * Each record's `kinesis.data` field is a base64-encoded JSON document; it is
 * decoded, parsed, and then validated against `DynamoDBStreamToKinesisRecord`.
 */
const KinesisDynamoDBStreamSchema = z.object({
  Records: z.array(
    KinesisDataStreamRecord.extend({
      kinesis: KinesisDataStreamRecordPayload.extend({
        data: z
          .string()
          .transform((encoded) => {
            // Decode the base64 payload to UTF-8 text, then parse the JSON body
            const decoded = Buffer.from(encoded, 'base64').toString('utf8');
            return JSON.parse(decoded);
          })
          // Validate the parsed object as a DynamoDB-over-Kinesis record
          .pipe(DynamoDBStreamToKinesisRecord),
      }),
    })
  ),
});

/**
* Zod schema for Kinesis Data Stream event
*
Expand Down Expand Up @@ -88,7 +104,8 @@ const KinesisDataStreamSchema = z.object({
});

export {
KinesisDataStreamSchema,
KinesisDataStreamRecord,
KinesisDataStreamRecordPayload,
KinesisDataStreamSchema,
KinesisDynamoDBStreamSchema,
};
4 changes: 4 additions & 0 deletions packages/parser/src/types/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type {
KafkaRecordSchema,
KafkaSelfManagedEventSchema,
KinesisDataStreamSchema,
KinesisDynamoDBStreamSchema,
KinesisFirehoseRecordSchema,
KinesisFirehoseSchema,
KinesisFirehoseSqsRecordSchema,
Expand Down Expand Up @@ -79,6 +80,8 @@ type KafkaMskEvent = z.infer<typeof KafkaMskEventSchema>;

type KinesisDataStreamEvent = z.infer<typeof KinesisDataStreamSchema>;

type KinesisDynamoDBStreamEvent = z.infer<typeof KinesisDynamoDBStreamSchema>;

type KinesisFireHoseEvent = z.infer<typeof KinesisFirehoseSchema>;

type KinesisFirehoseRecord = z.infer<typeof KinesisFirehoseRecordSchema>;
Expand Down Expand Up @@ -136,6 +139,7 @@ export type {
KafkaMskEvent,
KafkaRecord,
KinesisDataStreamEvent,
KinesisDynamoDBStreamEvent,
KinesisFireHoseEvent,
KinesisFirehoseRecord,
KinesisFireHoseSqsEvent,
Expand Down
36 changes: 36 additions & 0 deletions packages/parser/tests/events/kinesis/dynamodb-stream.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "859F7C064A4818874FA67ABEC9BF2AF1",
"sequenceNumber": "49657828409187701520019995242508390119953358325192589314",
"data": "eyJhd3NSZWdpb24iOiJldS13ZXN0LTEiLCJldmVudElEIjoiZDk0MjgwMjktMGY2My00MDU2LTg2ZGEtY2UxMGQ1NDViMWI5IiwiZXZlbnROYW1lIjoiSU5TRVJUIiwidXNlcklkZW50aXR5IjpudWxsLCJyZWNvcmRGb3JtYXQiOiJhcHBsaWNhdGlvbi9qc29uIiwidGFibGVOYW1lIjoiUG93ZXJ0b29sc0V2ZW50c1N0YWNrLUR5bmFtb0RCVGFibGU1OTc4NEZDMC04TktBTVRFUlRBWFkiLCJkeW5hbW9kYiI6eyJBcHByb3hpbWF0ZUNyZWF0aW9uRGF0ZVRpbWUiOjE3MzE5MjQ1NTUzNzAsIktleXMiOnsiaWQiOnsiUyI6InJlY29yZC0xcWl0Mnk4MTlnaSJ9fSwiTmV3SW1hZ2UiOnsiaWQiOnsiUyI6InJlY29yZC0xcWl0Mnk4MTlnaSJ9LCJkYXRhIjp7IlMiOiJkYXRhLXg2YXE3Y2tkcGdrIn19LCJTaXplQnl0ZXMiOjYwfSwiZXZlbnRTb3VyY2UiOiJhd3M6ZHluYW1vZGIifQ==",
"approximateArrivalTimestamp": 1731924555.932
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49657828409187701520019995242508390119953358325192589314",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::1234567789012:role/PowertoolsEventsStack-KinesisConsumerFunctionServic-JG17OEKZaDq6",
"awsRegion": "eu-west-1",
"eventSourceARN": "arn:aws:kinesis:eu-west-1:1234567789012:stream/PowertoolsEventsStack-KinesisStream46752A3E-u0C9B3ZKjgG0"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "6037E47B707479B67E577C989D96E9F8",
"sequenceNumber": "49657828409187701520019995242509599045772972954367295490",
"data": "eyJhd3NSZWdpb24iOiJldS13ZXN0LTEiLCJldmVudElEIjoiYWE1NmNhZDQtMzExYS00NmM4LWFiNWYtYzdhMTNhN2E2Mjk4IiwiZXZlbnROYW1lIjoiSU5TRVJUIiwidXNlcklkZW50aXR5IjpudWxsLCJyZWNvcmRGb3JtYXQiOiJhcHBsaWNhdGlvbi9qc29uIiwidGFibGVOYW1lIjoiUG93ZXJ0b29sc0V2ZW50c1N0YWNrLUR5bmFtb0RCVGFibGU1OTc4NEZDMC04TktBTVRFUlRBWFkiLCJkeW5hbW9kYiI6eyJBcHByb3hpbWF0ZUNyZWF0aW9uRGF0ZVRpbWUiOjE3MzE5MjQ1NTUzNzAsIktleXMiOnsiaWQiOnsiUyI6InJlY29yZC1mdnhuM3E0cTVqdyJ9fSwiTmV3SW1hZ2UiOnsiaWQiOnsiUyI6InJlY29yZC1mdnhuM3E0cTVqdyJ9LCJkYXRhIjp7IlMiOiJkYXRhLTRlb21wanM4OW41In19LCJTaXplQnl0ZXMiOjYwfSwiZXZlbnRTb3VyY2UiOiJhd3M6ZHluYW1vZGIifQ==",
"approximateArrivalTimestamp": 1731924555.935
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49657828409187701520019995242509599045772972954367295490",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::1234567789012:role/PowertoolsEventsStack-KinesisConsumerFunctionServic-JG17OEKZaDq6",
"awsRegion": "eu-west-1",
"eventSourceARN": "arn:aws:kinesis:eu-west-1:1234567789012:stream/PowertoolsEventsStack-KinesisStream46752A3E-u0C9B3ZKjgG0"
}
]
}
70 changes: 42 additions & 28 deletions packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@ import { ZodError, type z } from 'zod';
import { ParseError } from '../../../src';
import { KinesisFirehoseEnvelope } from '../../../src/envelopes/index.js';
import type { KinesisFirehoseSchema } from '../../../src/schemas/';
import { TestEvents, TestSchema } from '../schema/utils.js';
import type {
KinesisFireHoseEvent,
KinesisFireHoseSqsEvent,
} from '../../../src/types';
import { TestEvents, TestSchema, getTestEvent } from '../schema/utils.js';

describe('Kinesis Firehose Envelope', () => {
const eventsPath = 'kinesis';
describe('parse', () => {
it('should parse records for PutEvent', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose-put',
});

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -24,9 +30,10 @@ describe('Kinesis Firehose Envelope', () => {

it('should parse a single record for SQS event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehoseSQSEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseSqsEvent>({
eventsPath,
filename: 'firehose-sqs',
});

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -38,9 +45,10 @@ describe('Kinesis Firehose Envelope', () => {

it('should parse records for kinesis event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehoseKinesisEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose',
});

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -50,9 +58,10 @@ describe('Kinesis Firehose Envelope', () => {
expect(resp).toEqual([mock, mock]);
});
it('should throw if record is not base64 encoded', () => {
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose-put',
});

testEvent.records.map((record) => {
record.data = 'not base64 encoded';
Expand All @@ -68,9 +77,10 @@ describe('Kinesis Firehose Envelope', () => {
}).toThrow();
});
it('should throw when schema does not match record', () => {
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose-put',
});

testEvent.records.map((record) => {
record.data = Buffer.from('not a valid json').toString('base64');
Expand All @@ -84,9 +94,10 @@ describe('Kinesis Firehose Envelope', () => {
describe('safeParse', () => {
it('should parse records for PutEvent', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose-put',
});

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -98,9 +109,10 @@ describe('Kinesis Firehose Envelope', () => {

it('should parse a single record for SQS event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehoseSQSEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseSqsEvent>({
eventsPath,
filename: 'firehose-sqs',
});

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -112,9 +124,10 @@ describe('Kinesis Firehose Envelope', () => {

it('should parse records for kinesis event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisFirehoseKinesisEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose',
});

testEvent.records.map((record) => {
record.data = Buffer.from(JSON.stringify(mock)).toString('base64');
Expand All @@ -139,9 +152,10 @@ describe('Kinesis Firehose Envelope', () => {
}
});
it('should return original event if record is not base64 encoded', () => {
const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer<
typeof KinesisFirehoseSchema
>;
const testEvent = getTestEvent<KinesisFireHoseEvent>({
eventsPath,
filename: 'firehose-put',
});

testEvent.records.map((record) => {
record.data = 'not base64 encoded';
Expand Down
24 changes: 19 additions & 5 deletions packages/parser/tests/unit/envelopes/kinesis.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@ import type { KinesisStreamEvent } from 'aws-lambda';
import { describe, expect, it } from 'vitest';
import { KinesisEnvelope } from '../../../src/envelopes/index.js';
import { ParseError } from '../../../src/errors.js';
import { TestEvents, TestSchema } from '../schema/utils.js';
import type { KinesisDataStreamEvent } from '../../../src/types/schema.js';
import { TestEvents, TestSchema, getTestEvent } from '../schema/utils.js';

describe('KinesisEnvelope', () => {
const eventsPath = 'kinesis';
describe('parse', () => {
it('should parse Kinesis Stream event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
const testEvent = getTestEvent<KinesisDataStreamEvent>({
eventsPath,
filename: 'stream',
});

testEvent.Records.map((record) => {
record.kinesis.data = Buffer.from(JSON.stringify(mock)).toString(
Expand All @@ -24,7 +29,10 @@ describe('KinesisEnvelope', () => {
expect(() => KinesisEnvelope.parse({ foo: 'bar' }, TestSchema)).toThrow();
});
it('should throw if record is invalid', () => {
const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
const testEvent = getTestEvent<KinesisDataStreamEvent>({
eventsPath,
filename: 'stream',
});
testEvent.Records[0].kinesis.data = 'invalid';
expect(() => KinesisEnvelope.parse(testEvent, TestSchema)).toThrow();
});
Expand All @@ -33,7 +41,10 @@ describe('KinesisEnvelope', () => {
describe('safeParse', () => {
it('should parse Kinesis Stream event', () => {
const mock = generateMock(TestSchema);
const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
const testEvent = getTestEvent<KinesisDataStreamEvent>({
eventsPath,
filename: 'stream',
});

testEvent.Records.map((record) => {
record.kinesis.data = Buffer.from(JSON.stringify(mock)).toString(
Expand All @@ -54,7 +65,10 @@ describe('KinesisEnvelope', () => {
});
});
it('should return original event if record is invalid', () => {
const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent;
const testEvent = getTestEvent<KinesisDataStreamEvent>({
eventsPath,
filename: 'stream',
});
testEvent.Records[0].kinesis.data = 'invalid';
const parseResult = KinesisEnvelope.safeParse(testEvent, TestSchema);
expect(parseResult).toEqual({
Expand Down
Loading