Skip to content

Commit 72c7e20

Browse files
Add SchemaRegistryClient, RestService, and testing (#1)
* Add SchemaRegistryClient, RestService, and testing * Add new Makefile for schema registry * Merging * Revert to throwing exceptions
1 parent 1d9533b commit 72c7e20

File tree

11 files changed

+1589
-5
lines changed

11 files changed

+1589
-5
lines changed

.eslintrc.js

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
module.exports = {
2+
"env": {
3+
"browser": true,
4+
"commonjs": true,
5+
"es2021": true
6+
},
7+
"extends": "eslint:recommended",
8+
"overrides": [
9+
{
10+
"env": {
11+
"node": true
12+
},
13+
"files": [
14+
".eslintrc.{js,cjs}"
15+
],
16+
"parserOptions": {
17+
"sourceType": "script"
18+
}
19+
},
20+
{
21+
"files": ["*.ts"],
22+
"parser": "@typescript-eslint/parser",
23+
"parserOptions": {
24+
"ecmaVersion": 2020,
25+
"sourceType": "module"
26+
},
27+
"extends": [
28+
"plugin:@typescript-eslint/recommended",
29+
]
30+
}
31+
],
32+
"parserOptions": {
33+
"ecmaVersion": "latest"
34+
},
35+
"rules": {
36+
}
37+
}

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,4 @@ release-patch:
8888

8989
clean: node_modules/.dirstamp
9090
@rm -f deps/librdkafka/config.h
91-
@$(NODE-GYP) clean
91+
@$(NODE-GYP) clean

Makefile.schemaregistry

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Makefile.schemaregistry
2+
3+
# Variables
4+
NODE ?= node
5+
ESLINT ?= ./node_modules/.bin/eslint
6+
JEST ?= ./node_modules/.bin/jest
7+
TS_NODE ?= ./node_modules/.bin/ts-node
8+
9+
# Paths
10+
SRC_DIR = schemaregistry
11+
TEST_DIR = test/schemaregistry
12+
INTEG_DIR = e2e/schemaregistry
13+
14+
# Tasks
15+
.PHONY: all lint test integtest
16+
17+
all: lint test
18+
19+
lint:
20+
$(ESLINT) $(SRC_DIR) $(TEST_DIR) $(INTEG_DIR)
21+
22+
test:
23+
$(JEST) $(TEST_DIR)
24+
25+
integtest:
26+
$(JEST) $(INTEG_DIR)
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
import { RestService } from '../../schemaregistry/rest-service';
2+
import {
3+
Compatibility,
4+
SchemaRegistryClient,
5+
ServerConfig,
6+
SchemaInfo,
7+
SchemaMetadata,
8+
Metadata
9+
} from '../../schemaregistry/schemaregistry-client';
10+
import { beforeEach, describe, expect, it } from '@jest/globals';
11+
12+
/* eslint-disable @typescript-eslint/no-non-null-asserted-optional-chain */
13+
14+
const baseUrls = ['http://localhost:8081'];
15+
const headers = { 'Content-Type': 'application/vnd.schemaregistry.v1+json' };
16+
const restService = new RestService(baseUrls, false);
17+
restService.setHeaders(headers);
18+
19+
const basicAuth = Buffer.from('RBACAllowedUser-lsrc1:nohash').toString('base64');
20+
restService.setAuth(basicAuth);
21+
22+
restService.setTimeout(10000);
23+
24+
let schemaRegistryClient: SchemaRegistryClient;
25+
const testSubject = 'integ-test-subject';
26+
const testServerConfigSubject = 'integ-test-server-config-subject';
27+
28+
const schemaString: string = JSON.stringify({
29+
type: 'record',
30+
name: 'User',
31+
fields: [
32+
{ name: 'name', type: 'string' },
33+
{ name: 'age', type: 'int' },
34+
],
35+
});
36+
37+
const metadata: Metadata = {
38+
properties: {
39+
owner: 'Bob Jones',
40+
41+
},
42+
};
43+
44+
const schemaInfo: SchemaInfo = {
45+
schema: schemaString,
46+
metadata: metadata,
47+
};
48+
49+
const backwardCompatibleSchemaString: string = JSON.stringify({
50+
type: 'record',
51+
name: 'User',
52+
fields: [
53+
{ name: 'name', type: 'string' },
54+
{ name: 'age', type: 'int' },
55+
{ name: 'email', type: 'string', default: "" },
56+
],
57+
});
58+
59+
const backwardCompatibleMetadata: Metadata = {
60+
properties: {
61+
owner: 'Bob Jones2',
62+
63+
},
64+
};
65+
66+
const backwardCompatibleSchemaInfo: SchemaInfo = {
67+
schema: backwardCompatibleSchemaString,
68+
schemaType: 'AVRO',
69+
metadata: backwardCompatibleMetadata,
70+
};
71+
72+
describe('SchemaRegistryClient Integration Test', () => {
73+
74+
beforeEach(async () => {
75+
schemaRegistryClient = new SchemaRegistryClient(restService);
76+
const subjects: string[] = await schemaRegistryClient.getAllSubjects();
77+
78+
if (subjects && subjects.includes(testSubject)) {
79+
await schemaRegistryClient.deleteSubject(testSubject);
80+
await schemaRegistryClient.deleteSubject(testSubject, true);
81+
}
82+
83+
if (subjects && subjects.includes(testServerConfigSubject)) {
84+
await schemaRegistryClient.deleteSubject(testServerConfigSubject);
85+
await schemaRegistryClient.deleteSubject(testServerConfigSubject, true);
86+
}
87+
});
88+
89+
it('should register, retrieve, and delete a schema', async () => {
90+
// Register a schema
91+
const registerResponse: SchemaMetadata = await schemaRegistryClient.registerFullResponse(testSubject, schemaInfo);
92+
expect(registerResponse).toBeDefined();
93+
94+
const schemaId = registerResponse?.id!;
95+
const version = registerResponse?.version!;
96+
97+
const getSchemaResponse: SchemaInfo = await schemaRegistryClient.getBySubjectAndId(testSubject, schemaId);
98+
expect(getSchemaResponse).toEqual(schemaInfo);
99+
100+
const getIdResponse: number = await schemaRegistryClient.getId(testSubject, schemaInfo);
101+
expect(getIdResponse).toEqual(schemaId);
102+
103+
// Delete the schema
104+
const deleteSubjectResponse: number = await schemaRegistryClient.deleteSubjectVersion(testSubject, version);
105+
expect(deleteSubjectResponse).toEqual(version);
106+
107+
const permanentDeleteSubjectResponse: number = await schemaRegistryClient.deleteSubjectVersion(testSubject, version, true);
108+
expect(permanentDeleteSubjectResponse).toEqual(version);
109+
});
110+
111+
it('Should get all versions and a specific version of a schema', async () => {
112+
// Register a schema
113+
const registerResponse: SchemaMetadata = await schemaRegistryClient.registerFullResponse(testSubject, schemaInfo);
114+
expect(registerResponse).toBeDefined();
115+
116+
const version = registerResponse?.version!;
117+
118+
const getVersionResponse: number = await schemaRegistryClient.getVersion(testSubject, schemaInfo);
119+
expect(getVersionResponse).toEqual(version);
120+
121+
const allVersionsResponse: number[] = await schemaRegistryClient.getAllVersions(testSubject);
122+
expect(allVersionsResponse).toEqual([version]);
123+
});
124+
125+
it('Should get schema metadata', async () => {
126+
// Register a schema
127+
const registerResponse: SchemaMetadata = await schemaRegistryClient.registerFullResponse(testSubject, schemaInfo);
128+
expect(registerResponse).toBeDefined();
129+
130+
const schemaVersion: number = registerResponse?.version!;
131+
132+
const registerResponse2: SchemaMetadata = await schemaRegistryClient.registerFullResponse(testSubject, backwardCompatibleSchemaInfo);
133+
expect(registerResponse2).toBeDefined();
134+
135+
const schemaMetadata: SchemaMetadata = {
136+
id: registerResponse?.id!,
137+
version: schemaVersion,
138+
schema: schemaInfo.schema,
139+
subject: testSubject,
140+
metadata: metadata,
141+
};
142+
143+
const schemaMetadata2: SchemaMetadata = {
144+
id: registerResponse2?.id!,
145+
version: registerResponse2?.version!,
146+
schema: backwardCompatibleSchemaInfo.schema,
147+
subject: testSubject,
148+
metadata: backwardCompatibleMetadata,
149+
};
150+
151+
const getLatestMetadataResponse: SchemaMetadata = await schemaRegistryClient.getLatestSchemaMetadata(testSubject);
152+
expect(schemaMetadata2).toEqual(getLatestMetadataResponse);
153+
154+
const getMetadataResponse: SchemaMetadata = await schemaRegistryClient.getSchemaMetadata(testSubject, schemaVersion);
155+
expect(schemaMetadata).toEqual(getMetadataResponse);
156+
});
157+
158+
it('Should test compatibility for a version and subject, getting and updating', async () => {
159+
const registerResponse: SchemaMetadata = await schemaRegistryClient.registerFullResponse(testSubject, schemaInfo);
160+
expect(registerResponse).toBeDefined();
161+
162+
const version = registerResponse?.version!;
163+
164+
const updateCompatibilityResponse: Compatibility = await schemaRegistryClient.updateCompatibility(testSubject, Compatibility.BackwardTransitive);
165+
expect(updateCompatibilityResponse).toEqual(Compatibility.BackwardTransitive);
166+
167+
const getCompatibilityResponse: Compatibility = await schemaRegistryClient.getCompatibility(testSubject);
168+
expect(getCompatibilityResponse).toEqual(Compatibility.BackwardTransitive);
169+
170+
const testSubjectCompatibilityResponse: boolean = await schemaRegistryClient.testSubjectCompatibility(testSubject, backwardCompatibleSchemaInfo);
171+
expect(testSubjectCompatibilityResponse).toEqual(true);
172+
173+
const testCompatibilityResponse: boolean = await schemaRegistryClient.testCompatibility(testSubject, version, backwardCompatibleSchemaInfo);
174+
expect(testCompatibilityResponse).toEqual(true);
175+
});
176+
177+
it('Should update and get default compatibility', async () => {
178+
const updateDefaultCompatibilityResponse: Compatibility = await schemaRegistryClient.updateDefaultCompatibility(Compatibility.Full);
179+
expect(updateDefaultCompatibilityResponse).toEqual(Compatibility.Full);
180+
181+
const getDefaultCompatibilityResponse: Compatibility = await schemaRegistryClient.getDefaultCompatibility();
182+
expect(getDefaultCompatibilityResponse).toEqual(Compatibility.Full);
183+
});
184+
185+
it('Should update and get subject Config', async () => {
186+
const subjectConfigRequest: ServerConfig = {
187+
compatibility: Compatibility.Full,
188+
normalize: true
189+
};
190+
191+
const subjectConfigResponse: ServerConfig = {
192+
compatibilityLevel: Compatibility.Full,
193+
normalize: true
194+
};
195+
196+
const registerResponse: SchemaMetadata = await schemaRegistryClient.registerFullResponse(testServerConfigSubject, schemaInfo);
197+
expect(registerResponse).toBeDefined();
198+
199+
const updateConfigResponse: ServerConfig = await schemaRegistryClient.updateConfig(testServerConfigSubject, subjectConfigRequest);
200+
expect(updateConfigResponse).toBeDefined();
201+
202+
const getConfigResponse: ServerConfig = await schemaRegistryClient.getConfig(testServerConfigSubject);
203+
expect(getConfigResponse).toEqual(subjectConfigResponse);
204+
});
205+
206+
it('Should get and set default Config', async () => {
207+
const serverConfigRequest: ServerConfig = {
208+
compatibility: Compatibility.Full,
209+
normalize: false
210+
};
211+
212+
const serverConfigResponse: ServerConfig = {
213+
compatibilityLevel: Compatibility.Full,
214+
normalize: false
215+
};
216+
217+
const updateDefaultConfigResponse: ServerConfig = await schemaRegistryClient.updateDefaultConfig(serverConfigRequest);
218+
expect(updateDefaultConfigResponse).toBeDefined();
219+
220+
const getDefaultConfigResponse: ServerConfig = await schemaRegistryClient.getDefaultConfig();
221+
expect(getDefaultConfigResponse).toEqual(serverConfigResponse);
222+
});
223+
224+
});

jest.config.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
module.exports = {
2+
preset: 'ts-jest',
3+
testEnvironment: 'node',
4+
testMatch: ['**/test/**/*.ts', '**/e2e/**/*.ts'],
5+
transform: {
6+
'^.+\\.tsx?$': 'ts-jest',
7+
},
8+
};

package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
"test": "make test",
1313
"install": "node-pre-gyp install --fallback-to-build",
1414
"prepack": "node ./ci/prepublish.js",
15-
"test:types": "tsc -p ."
15+
"test:types": "tsc -p .",
16+
"test:schemaregistry": "make -f Makefile.schemaregistry test"
1617
},
1718
"binary": {
1819
"module_name": "confluent-kafka-javascript",

schemaregistry/rest-service.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import axios, { AxiosInstance, AxiosRequestConfig, AxiosResponse } from 'axios';
2+
3+
/*
4+
* Confluent-Schema-Registry-TypeScript - Node.js wrapper for Confluent Schema Registry
5+
*
6+
* Copyright (c) 2024 Confluent, Inc.
7+
*
8+
* This software may be modified and distributed under the terms
9+
* of the MIT license. See the LICENSE.txt file for details.
10+
*/
11+
12+
export class RestService {
13+
private client: AxiosInstance
14+
15+
constructor(baseUrls: string[], isForward = false) {
16+
this.client = axios.create({
17+
baseURL: baseUrls[0], // Use the first base URL as the default
18+
timeout: 5000, // Default timeout
19+
headers: { 'Content-Type': 'application/vnd.schemaregistry.v1+json' },
20+
})
21+
22+
if (isForward) {
23+
this.client.defaults.headers.common['X-Forward'] = 'true'
24+
}
25+
}
26+
27+
public async sendHttpRequest<T>(
28+
url: string,
29+
method: 'GET' | 'POST' | 'PUT' | 'DELETE',
30+
data?: any, // eslint-disable-line @typescript-eslint/no-explicit-any
31+
config?: AxiosRequestConfig,
32+
): Promise<AxiosResponse<T>> {
33+
try {
34+
const response = await this.client.request<T>({
35+
url,
36+
method,
37+
data,
38+
...config,
39+
})
40+
return response
41+
} catch (error) {
42+
if (axios.isAxiosError(error) && error.response) {
43+
throw new Error(`HTTP error: ${error.response.status} - ${error.response.data}`)
44+
} else {
45+
const err = error as Error;
46+
throw new Error(`Unknown error: ${err.message}`)
47+
}
48+
}
49+
}
50+
51+
public setHeaders(headers: Record<string, string>): void {
52+
this.client.defaults.headers.common = { ...this.client.defaults.headers.common, ...headers }
53+
}
54+
55+
public setAuth(basicAuth?: string, bearerToken?: string): void {
56+
if (basicAuth) {
57+
this.client.defaults.headers.common['Authorization'] = `Basic ${basicAuth}`
58+
}
59+
60+
if (bearerToken) {
61+
this.client.defaults.headers.common['Authorization'] = `Bearer ${bearerToken}`
62+
}
63+
}
64+
65+
public setTimeout(timeout: number): void {
66+
this.client.defaults.timeout = timeout
67+
}
68+
69+
public setBaseURL(baseUrl: string): void {
70+
this.client.defaults.baseURL = baseUrl
71+
}
72+
}

0 commit comments

Comments
 (0)