From 61c76244b24f692a8db6cd36ab685070a3bedfe2 Mon Sep 17 00:00:00 2001 From: Justin Wang <35274216+Claimundefine@users.noreply.github.com> Date: Wed, 25 Sep 2024 21:46:05 -0400 Subject: [PATCH] Add avro integration tests (#56) --- .../schemaregistry-avro.spec.ts | 615 ++++++++++++++++++ .../schemaregistry-json.spec.ts | 62 +- schemaregistry/package.json | 1 + 3 files changed, 643 insertions(+), 35 deletions(-) create mode 100644 e2e/schemaregistry/schemaregistry-avro.spec.ts diff --git a/e2e/schemaregistry/schemaregistry-avro.spec.ts b/e2e/schemaregistry/schemaregistry-avro.spec.ts new file mode 100644 index 00000000..de5bb240 --- /dev/null +++ b/e2e/schemaregistry/schemaregistry-avro.spec.ts @@ -0,0 +1,615 @@ +import { KafkaJS } from '@confluentinc/kafka-javascript'; +import { + Metadata, + SchemaRegistryClient, + SchemaInfo +} from '../../schemaregistry/schemaregistry-client'; +import { beforeEach, afterEach, describe, expect, it } from '@jest/globals'; +import { clientConfig } from '../../test/schemaregistry/test-constants'; +import { AvroDeserializer, AvroSerializer, AvroSerializerConfig } from '../../schemaregistry/serde/avro'; +import { SerdeType } from "../../schemaregistry/serde/serde"; +import stringify from 'json-stringify-deterministic'; +import { v4 } from 'uuid'; + +let schemaRegistryClient: SchemaRegistryClient; +let producer: any; + +const kafkaBrokerList = 'localhost:9092'; +const kafka = new KafkaJS.Kafka({ + kafkaJS: { + brokers: [kafkaBrokerList], + }, +}); + + +const userSchemaString: string = stringify({ + type: 'record', + name: 'User', + fields: [ + { name: 'name', type: 'string' }, + { name: 'age', type: 'int' }, + ], +}); + +const messageValue = { + "name": "Bob Jones", + "age": 25 +}; + +const metadata: Metadata = { + properties: { + owner: 'Bob Jones', + email: 'bob@acme.com', + }, +}; + +const schemaInfo: SchemaInfo = { + schema: userSchemaString, + metadata: metadata +}; + +let serializerConfig: AvroSerializerConfig; +let serializer: AvroSerializer; +let deserializer: AvroDeserializer; +let consumer: KafkaJS.Consumer; + +describe('Schema Registry Avro Integration Test', () => { + + beforeEach(async () => { + schemaRegistryClient = new SchemaRegistryClient(clientConfig); + + producer = kafka.producer({ + kafkaJS: { + allowAutoTopicCreation: true, + acks: 1, + compression: KafkaJS.CompressionTypes.GZIP, + } + }); + await producer.connect(); + + consumer = kafka.consumer({ + kafkaJS: { + groupId: 'test-group', + fromBeginning: true, + partitionAssigners: [KafkaJS.PartitionAssigners.roundRobin], + }, + }); + }); + + afterEach(async () => { + await producer.disconnect(); + producer = null; + }); + + it("Should serialize and deserialize Avro", async () => { + const testTopic = 'test-topic-' + v4(); + + await schemaRegistryClient.register(testTopic, schemaInfo); + + serializerConfig = { autoRegisterSchemas: true }; + serializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig); + deserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + const outgoingMessage = { + key: 'key', + value: await serializer.serialize(testTopic, messageValue) + }; + + await producer.send({ + topic: testTopic, + messages: [outgoingMessage] + }); + + await consumer.connect(); + await consumer.subscribe({ topic: testTopic }); + let messageRcvd = false; + await consumer.run({ + eachMessage: async ({ message }) => { + const decodedMessage = { + ...message, + value: await deserializer.deserialize(testTopic, message.value as Buffer) + }; + messageRcvd = true; + + expect(decodedMessage.value).toMatchObject(messageValue); + }, + }); + + // Wait around until we get a message, and then disconnect. + while (!messageRcvd) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + await consumer.disconnect(); + }, 30000); + + it("Should serialize with UseLatestVersion enabled", async () => { + const testTopic = v4(); + await schemaRegistryClient.register(testTopic, schemaInfo); + + serializerConfig = { autoRegisterSchemas: true, useLatestVersion: true }; + serializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig); + + const outgoingMessage = { + key: 'key', + value: await serializer.serialize(testTopic, messageValue) + }; + + await producer.send({ + topic: testTopic, + messages: [outgoingMessage] + }); + + }, 30000); + + it('Should fail to serialize with UseLatestVersion enabled and autoRegisterSchemas disabled', async () => { + const testTopic = v4(); + await schemaRegistryClient.register(testTopic, schemaInfo); + + serializerConfig = { autoRegisterSchemas: false, useLatestVersion: true }; + serializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig); + + const messageValue = { "name": "Bob Jones", "age": 25 }; + + await expect(serializer.serialize(testTopic, messageValue)).rejects.toThrowError(); + }); + + it('Should serialize with schemas registered, UseLatestVersion enabled and autoRegisterSchemas disabled', async () => { + const testTopic = v4(); + await schemaRegistryClient.register(testTopic, schemaInfo); + await schemaRegistryClient.register(testTopic+'-value', schemaInfo); + + serializerConfig = { autoRegisterSchemas: false, useLatestVersion: true }; + serializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig); + + const messageValue = { "name": "Bob Jones", "age": 25 }; + + await serializer.serialize(testTopic, messageValue); + }); + //TODO: Add test for Incompatible Types. The current Kafka Client runs console.error instead of throwing error + //Should use a spy, Jest wasn't playing nice with the spy + + it('Should produce generic message to multiple topics', async () => { + const topic1 = v4(); + const topic2 = v4(); + + await schemaRegistryClient.register(topic1, schemaInfo); + await schemaRegistryClient.register(topic2, schemaInfo); + + serializerConfig = { autoRegisterSchemas: true }; + serializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig); + deserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + + const outgoingMessage1 = { + key: 'key', + value: await serializer.serialize(topic1, messageValue) + }; + + const outgoingMessage2 = { + key: 'key', + value: await serializer.serialize(topic2, messageValue) + }; + + await producer.send( + { topic: topic1, messages: [outgoingMessage1] }, + ); + + await producer.send( + { topic: topic2, messages: [outgoingMessage2] }, + ); + + let consumer2 = kafka.consumer({ + kafkaJS: { + groupId: 'test-group', + fromBeginning: true, + partitionAssigners: [KafkaJS.PartitionAssigners.roundRobin], + }, + }); + + await consumer.connect(); + await consumer.subscribe({ topic: topic1 }); + await consumer2.connect(); + await consumer2.subscribe({ topic: topic2 }); + + let messageRcvd = false; + let messageRcvd2 = false; + + await consumer.run({ + eachMessage: async ({ message }) => { + const decodedMessage = { + ...message, + value: await deserializer.deserialize(topic1, message.value as Buffer) + }; + messageRcvd = true; + expect(decodedMessage.value).toMatchObject(messageValue); + }, + }); + + await consumer2.run({ + eachMessage: async ({ message }) => { + const decodedMessage = { + ...message, + value: await deserializer.deserialize(topic2, message.value as Buffer) + }; + messageRcvd2 = true; + expect(decodedMessage.value).toMatchObject(messageValue); + }, + }); + + while (!messageRcvd || !messageRcvd2) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + await consumer.disconnect(); + await consumer2.disconnect(); + }, 30000); +}); + +describe('Schema Registry Avro Integration Test - Primitives', () => { + beforeEach(async () => { + schemaRegistryClient = new SchemaRegistryClient(clientConfig); + + producer = kafka.producer({ + kafkaJS: { + allowAutoTopicCreation: true, + acks: 1, + compression: KafkaJS.CompressionTypes.GZIP, + } + }); + await producer.connect(); + serializerConfig = { autoRegisterSchemas: true }; + + serializer = new AvroSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig); + deserializer = new AvroDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); + consumer = kafka.consumer({ + kafkaJS: { + groupId: 'test-group', + fromBeginning: true, + partitionAssigners: [KafkaJS.PartitionAssigners.roundRobin], + }, + }); + }); + + afterEach(async () => { + await producer.disconnect(); + producer = null; + }); + + it('Should serialize and deserialize string', async () => { + const stringTopic = v4(); + + const stringSchemaString = stringify({ + type: 'string', + }); + + const stringSchemaInfo: SchemaInfo = { + schema: stringSchemaString, + metadata: metadata + }; + + await schemaRegistryClient.register(stringTopic, stringSchemaInfo); + + const stringMessageValue = "Hello, World!"; + const outgoingStringMessage = { + key: 'key', + value: await serializer.serialize(stringTopic, stringMessageValue) + }; + + await producer.send({ + topic: stringTopic, + messages: [outgoingStringMessage] + }); + + await consumer.connect(); + + await consumer.subscribe({ topic: stringTopic }); + + let messageRcvd = false; + await consumer.run({ + eachMessage: async ({ message }) => { + const decodedMessage = { + ...message, + value: await deserializer.deserialize(stringTopic, message.value as Buffer) + }; + messageRcvd = true; + expect(decodedMessage.value).toBe(stringMessageValue); + }, + }); + + while (!messageRcvd) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + await consumer.disconnect(); + }, 30000); + + it('Should serialize and deserialize bytes', async () => { + const topic = v4(); + + const schemaString = stringify({ + type: 'bytes', + }); + + const stringSchemaInfo: SchemaInfo = { + schema: schemaString, + metadata: metadata + }; + + await schemaRegistryClient.register(topic, stringSchemaInfo); + + const messageValue = Buffer.from("Hello, World!"); + const outgoingMessage = { + key: 'key', + value: await serializer.serialize(topic, messageValue) + }; + + await producer.send({ + topic: topic, + messages: [outgoingMessage] + }); + + await consumer.connect(); + + await consumer.subscribe({ topic }); + + let messageRcvd = false; + await consumer.run({ + eachMessage: async ({ message }) => { + const decodedMessage = { + ...message, + value: await deserializer.deserialize(topic, message.value as Buffer) + }; + messageRcvd = true; + expect(decodedMessage.value).toBe(messageValue); + }, + }); + + while (!messageRcvd) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + await consumer.disconnect(); + }, 30000); + + it('Should serialize and deserialize int', async () => { + const topic = v4(); + + const schemaString = stringify({ + type: 'int', + }); + + const stringSchemaInfo: SchemaInfo = { + schema: schemaString, + metadata: metadata + }; + + await schemaRegistryClient.register(topic, stringSchemaInfo); + + const messageValue = 25; + const outgoingMessage = { + key: 'key', + value: await serializer.serialize(topic, messageValue) + }; + + await producer.send({ + topic: topic, + messages: [outgoingMessage] + }); + + await consumer.connect(); + + await consumer.subscribe({ topic }); + + let messageRcvd = false; + await consumer.run({ + eachMessage: async ({ message }) => { + const decodedMessage = { + ...message, + value: await deserializer.deserialize(topic, message.value as Buffer) + }; + messageRcvd = true; + expect(decodedMessage.value).toBe(messageValue); + }, + }); + + while (!messageRcvd) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + await consumer.disconnect(); + }, 30000); + + it('Should serialize and deserialize long', async () => { + const topic = v4(); + + const schemaString = stringify({ + type: 'long', + }); + + const stringSchemaInfo: SchemaInfo = { + schema: schemaString, + metadata: metadata + }; + + await schemaRegistryClient.register(topic, stringSchemaInfo); + + const messageValue = 25; + const outgoingMessage = { + key: 'key', + value: await serializer.serialize(topic, messageValue) + }; + + await producer.send({ + topic: topic, + messages: [outgoingMessage] + }); + + await consumer.connect(); + + await consumer.subscribe({ topic }); + + let messageRcvd = false; + await consumer.run({ + eachMessage: async ({ message }) => { + const decodedMessage = { + ...message, + value: await deserializer.deserialize(topic, message.value as Buffer) + }; + messageRcvd = true; + expect(decodedMessage.value).toBe(messageValue); + }, + }); + + while (!messageRcvd) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + await consumer.disconnect(); + }, 30000); + + it('Should serialize and deserialize boolean', async () => { + const topic = v4(); + + const schemaString = stringify({ + type: 'boolean', + }); + + const stringSchemaInfo: SchemaInfo = { + schema: schemaString, + metadata: metadata + }; + + await schemaRegistryClient.register(topic, stringSchemaInfo); + + const messageValue = true; + const outgoingMessage = { + key: 'key', + value: await serializer.serialize(topic, messageValue) + }; + + await producer.send({ + topic: topic, + messages: [outgoingMessage] + }); + + await consumer.connect(); + + await consumer.subscribe({ topic }); + + let messageRcvd = false; + await consumer.run({ + eachMessage: async ({ message }) => { + const decodedMessage = { + ...message, + value: await deserializer.deserialize(topic, message.value as Buffer) + }; + messageRcvd = true; + expect(decodedMessage.value).toBe(messageValue); + }, + }); + + while (!messageRcvd) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + await consumer.disconnect(); + }, 30000); + + it('Should serialize and deserialize float', async () => { + const topic = v4(); + + const schemaString = stringify({ + type: 'float', + }); + + const stringSchemaInfo: SchemaInfo = { + schema: schemaString, + metadata: metadata + }; + + await schemaRegistryClient.register(topic, stringSchemaInfo); + + const messageValue = 1.354; + const outgoingMessage = { + key: 'key', + value: await serializer.serialize(topic, messageValue) + }; + + await producer.send({ + topic: topic, + messages: [outgoingMessage] + }); + + await consumer.connect(); + + await consumer.subscribe({ topic }); + + let messageRcvd = false; + await consumer.run({ + eachMessage: async ({ message }) => { + const decodedMessage = { + ...message, + value: await deserializer.deserialize(topic, message.value as Buffer) + }; + messageRcvd = true; + expect(decodedMessage.value).toBe(messageValue); + }, + }); + + while (!messageRcvd) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + await consumer.disconnect(); + }, 30000); + + it('Should serialize and deserialize double', async () => { + const topic = v4(); + + const schemaString = stringify({ + type: 'double', + }); + + const stringSchemaInfo: SchemaInfo = { + schema: schemaString, + metadata: metadata + }; + + await schemaRegistryClient.register(topic, stringSchemaInfo); + + const messageValue = 1.354; + const outgoingMessage = { + key: 'key', + value: await serializer.serialize(topic, messageValue) + }; + + await producer.send({ + topic: topic, + messages: [outgoingMessage] + }); + + await consumer.connect(); + + await consumer.subscribe({ topic }); + + let messageRcvd = false; + await consumer.run({ + eachMessage: async ({ message }) => { + const decodedMessage = { + ...message, + value: await deserializer.deserialize(topic, message.value as Buffer) + }; + messageRcvd = true; + expect(decodedMessage.value).toBe(messageValue); + }, + }); + + while (!messageRcvd) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + await consumer.disconnect(); + }, 30000); + + //Waiting on the null case +}); \ No newline at end of file diff --git a/e2e/schemaregistry/schemaregistry-json.spec.ts b/e2e/schemaregistry/schemaregistry-json.spec.ts index b4751d64..4ab46584 100644 --- a/e2e/schemaregistry/schemaregistry-json.spec.ts +++ b/e2e/schemaregistry/schemaregistry-json.spec.ts @@ -10,20 +10,17 @@ import { clientConfig } from '../../test/schemaregistry/test-constants'; import { JsonSerializer, JsonSerializerConfig, JsonDeserializer } from '../../schemaregistry/serde/json'; import { SerdeType } from "../../schemaregistry/serde/serde"; import stringify from 'json-stringify-deterministic'; +import { v4 } from 'uuid'; let schemaRegistryClient: SchemaRegistryClient; let producer: any; -const testServerConfigSubject = 'integ-test-server-config-subject'; - const kafkaBrokerList = 'localhost:9092'; const kafka = new KafkaJS.Kafka({ kafkaJS: { brokers: [kafkaBrokerList], }, }); -const testTopic = `test-topic`; -const testTopicValue = testTopic + '-value'; //Inspired by dotnet client const schemaString: string = stringify({ @@ -144,21 +141,21 @@ const orderDetailsSchema: SchemaInfo = { "description": "Order Details", "type": "object", "properties": { - "id": { - "description": "Order Id", - "type": "integer" - }, - "customer": { - "description": "Customer", - "$ref": "http://example.com/customer.schema.json" - }, - "payment_id": { - "description": "Payment Id", - "type": "string" - } + "id": { + "description": "Order Id", + "type": "integer" + }, + "customer": { + "description": "Customer", + "$ref": "http://example.com/customer.schema.json" + }, + "payment_id": { + "description": "Payment Id", + "type": "string" + } }, - "required": [ "id", "customer"] -}), + "required": ["id", "customer"] + }), schemaType: 'JSON', }; @@ -255,25 +252,13 @@ const customerSubject = 'Customer'; const orderSubject = 'Order'; const orderDetailsSubject = 'OrderDetails'; -const subjectList = [testTopic, testTopicValue, testServerConfigSubject, orderSubject, orderDetailsSubject, customerSubject]; +const subjectList = [orderSubject, orderDetailsSubject, customerSubject]; describe('SchemaRegistryClient json Integration Test', () => { beforeEach(async () => { schemaRegistryClient = new SchemaRegistryClient(clientConfig); - const admin = kafka.admin(); - await admin.connect(); - try { - await admin.deleteTopics({ - topics: [testTopic], - timeout: 5000, - }); - } catch (error) { - // Topic may not exist; ignore error - } - await admin.disconnect(); - producer = kafka.producer({ kafkaJS: { allowAutoTopicCreation: true, @@ -288,6 +273,12 @@ describe('SchemaRegistryClient json Integration Test', () => { if (subjects && subjects.includes(subject)) { await schemaRegistryClient.deleteSubject(subject); await schemaRegistryClient.deleteSubject(subject, true); + + const subjectValue = subject + '-value'; + if (subjects && subjects.includes(subjectValue)) { + await schemaRegistryClient.deleteSubject(subjectValue); + await schemaRegistryClient.deleteSubject(subjectValue, true); + } } } }); @@ -298,6 +289,7 @@ describe('SchemaRegistryClient json Integration Test', () => { }); it("Should serialize and deserialize json", async () => { + const testTopic = v4(); await schemaRegistryClient.register(testTopic, schemaInfo); @@ -344,10 +336,10 @@ describe('SchemaRegistryClient json Integration Test', () => { } await consumer.disconnect(); - expect(1).toEqual(1); }, 30000); it("Should serialize with UseLatestVersion enabled", async () => { + const testTopic = v4(); await schemaRegistryClient.register(testTopic, schemaInfo); const serializerConfig: JsonSerializerConfig = { autoRegisterSchemas: true, useLatestVersion: true }; @@ -366,6 +358,7 @@ describe('SchemaRegistryClient json Integration Test', () => { }); it('Should fail to serialize with UseLatestVersion enabled and autoRegisterSchemas disabled', async () => { + const testTopic = v4(); await schemaRegistryClient.register(testTopic, schemaInfo); const serializerConfig: JsonSerializerConfig = { autoRegisterSchemas: false, useLatestVersion: true }; @@ -377,13 +370,14 @@ describe('SchemaRegistryClient json Integration Test', () => { }); it("Should serialize referenced schemas", async () => { + const testTopic = v4(); const serializerConfig: JsonSerializerConfig = { autoRegisterSchemas: true }; const serializer = new JsonSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig); const deserializer = new JsonDeserializer(schemaRegistryClient, SerdeType.VALUE, {}); await schemaRegistryClient.register(customerSubject, customerSchema); const customerIdVersion: number = (await schemaRegistryClient.getLatestSchemaMetadata(customerSubject)).version!; - + const customerReference: Reference = { name: "http://example.com/customer.schema.json", subject: customerSubject, @@ -401,9 +395,7 @@ describe('SchemaRegistryClient json Integration Test', () => { }; orderSchema.references = [orderDetailsReference]; - const orderId = await schemaRegistryClient.register(orderSubject, orderSchema); await schemaRegistryClient.register(orderSubject, orderSchema); - console.log(`Order schema id: ${orderId}`); const order = { order_details: { diff --git a/schemaregistry/package.json b/schemaregistry/package.json index fb3705a7..db687244 100644 --- a/schemaregistry/package.json +++ b/schemaregistry/package.json @@ -46,6 +46,7 @@ "lru-cache": "^11.0.0", "node-vault": "^0.10.2", "simple-oauth2": "^5.1.0", + "uuid": "^10.0.0", "validator": "^13.12.0" }, "scripts": {