Skip to content

Commit d73496f

Browse files
Add clientConfig, baseUrl retry, RestError, encodeURIComponent (#12) (#68)
* Add clientConfig, baseUrl retry, RestError * refactor such that RestService takes in necessary dependencies
1 parent 4aee89f commit d73496f

File tree

8 files changed

+315
-219
lines changed

8 files changed

+315
-219
lines changed

dekregistry/dekregistry-client.ts

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,17 @@
11
import { LRUCache } from 'lru-cache';
22
import { Mutex } from 'async-mutex';
3-
import { RestService } from '../schemaregistry/rest-service';
3+
import { ClientConfig, RestService } from '../schemaregistry/rest-service';
44
import stringify from 'json-stringify-deterministic';
55

6+
/*
7+
* Confluent-Schema-Registry-TypeScript - Node.js wrapper for Confluent Schema Registry
8+
*
9+
* Copyright (c) 2024 Confluent, Inc.
10+
*
11+
* This software may be modified and distributed under the terms
12+
* of the MIT license. See the LICENSE.txt file for details.
13+
*/
14+
615
interface Kek {
716
name?: string;
817
kmsType?: string;
@@ -51,13 +60,14 @@ class DekRegistryClient implements Client {
5160
private kekMutex: Mutex;
5261
private dekMutex: Mutex;
5362

54-
constructor(restService: RestService, cacheSize: number = 512, cacheTTL?: number) {
63+
constructor(config: ClientConfig) {
5564
const cacheOptions = {
56-
max: cacheSize,
57-
...(cacheTTL !== undefined && { maxAge: cacheTTL })
65+
max: config.cacheCapacity,
66+
...(config.cacheLatestTtlSecs !== undefined && { maxAge: config.cacheLatestTtlSecs * 1000 }),
5867
};
5968

60-
this.restService = restService;
69+
70+
this.restService = new RestService(config.createAxiosDefaults, config.baseURLs, config.isForward);
6171
this.kekCache = new LRUCache<string, Kek>(cacheOptions);
6272
this.dekCache = new LRUCache<string, Dek>(cacheOptions);
6373
this.kekMutex = new Mutex();
@@ -124,7 +134,7 @@ class DekRegistryClient implements Client {
124134
shared,
125135
};
126136

127-
const response = await this.restService.sendHttpRequest<Kek>(
137+
const response = await this.restService.handleRequest<Kek>(
128138
'/dek-registry/v1/keks',
129139
'POST',
130140
request);
@@ -143,7 +153,7 @@ class DekRegistryClient implements Client {
143153
}
144154
name = encodeURIComponent(name);
145155

146-
const response = await this.restService.sendHttpRequest<Kek>(
156+
const response = await this.restService.handleRequest<Kek>(
147157
`/dek-registry/v1/keks/${name}?deleted=${deleted}`,
148158
'GET');
149159
this.kekCache.set(cacheKey, response.data);
@@ -169,7 +179,7 @@ class DekRegistryClient implements Client {
169179
};
170180
kekName = encodeURIComponent(kekName);
171181

172-
const response = await this.restService.sendHttpRequest<Dek>(
182+
const response = await this.restService.handleRequest<Dek>(
173183
`/dek-registry/v1/keks/${kekName}/deks`,
174184
'POST',
175185
request);
@@ -194,7 +204,7 @@ class DekRegistryClient implements Client {
194204
kekName = encodeURIComponent(kekName);
195205
subject = encodeURIComponent(subject);
196206

197-
const response = await this.restService.sendHttpRequest<Dek>(
207+
const response = await this.restService.handleRequest<Dek>(
198208
`/dek-registry/v1/keks/${kekName}/deks/${subject}/versions/${version}?deleted=${deleted}`,
199209
'GET');
200210
this.dekCache.set(cacheKey, response.data);

e2e/schemaregistry/schemaregistry-client.spec.ts

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { RestService } from '../../schemaregistry/rest-service';
21
import {
32
Compatibility,
43
SchemaRegistryClient,
@@ -8,19 +7,10 @@ import {
87
Metadata
98
} from '../../schemaregistry/schemaregistry-client';
109
import { beforeEach, describe, expect, it } from '@jest/globals';
10+
import { clientConfig } from '../../test/schemaregistry/test-constants';
1111

1212
/* eslint-disable @typescript-eslint/no-non-null-asserted-optional-chain */
1313

14-
const baseUrls = ['http://localhost:8081'];
15-
const headers = { 'Content-Type': 'application/vnd.schemaregistry.v1+json' };
16-
const restService = new RestService(baseUrls, false);
17-
restService.setHeaders(headers);
18-
19-
const basicAuth = Buffer.from('RBACAllowedUser-lsrc1:nohash').toString('base64');
20-
restService.setAuth(basicAuth);
21-
22-
restService.setTimeout(10000);
23-
2414
let schemaRegistryClient: SchemaRegistryClient;
2515
const testSubject = 'integ-test-subject';
2616
const testServerConfigSubject = 'integ-test-server-config-subject';
@@ -72,7 +62,7 @@ const backwardCompatibleSchemaInfo: SchemaInfo = {
7262
describe('SchemaRegistryClient Integration Test', () => {
7363

7464
beforeEach(async () => {
75-
schemaRegistryClient = new SchemaRegistryClient(restService);
65+
schemaRegistryClient = new SchemaRegistryClient(clientConfig);
7666
const subjects: string[] = await schemaRegistryClient.getAllSubjects();
7767

7868
if (subjects && subjects.includes(testSubject)) {
@@ -86,7 +76,11 @@ describe('SchemaRegistryClient Integration Test', () => {
8676
}
8777
});
8878

89-
it('should register, retrieve, and delete a schema', async () => {
79+
it("Should return RestError when retrieving non-existent schema", async () => {
80+
await expect(schemaRegistryClient.getBySubjectAndId(testSubject, 1)).rejects.toThrow();
81+
});
82+
83+
it('Should register, retrieve, and delete a schema', async () => {
9084
// Register a schema
9185
const registerResponse: SchemaMetadata = await schemaRegistryClient.registerFullResponse(testSubject, schemaInfo);
9286
expect(registerResponse).toBeDefined();

schemaregistry/rest-error.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
export class RestError extends Error {
2+
status: number;
3+
errorCode: number;
4+
5+
constructor(message: string, status: number, errorCode: number) {
6+
super(message + "; Error code: " + errorCode);
7+
this.status = status;
8+
this.errorCode = errorCode;
9+
}
10+
}

schemaregistry/rest-service.ts

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import axios, { AxiosInstance, AxiosRequestConfig, AxiosResponse } from 'axios';
1+
import axios, { AxiosInstance, AxiosRequestConfig, AxiosResponse, CreateAxiosDefaults } from 'axios';
2+
import { RestError } from './rest-error';
23

34
/*
45
* Confluent-Schema-Registry-TypeScript - Node.js wrapper for Confluent Schema Registry
@@ -9,43 +10,60 @@ import axios, { AxiosInstance, AxiosRequestConfig, AxiosResponse } from 'axios';
910
* of the MIT license. See the LICENSE.txt file for details.
1011
*/
1112

13+
export type ClientConfig = {
14+
createAxiosDefaults: CreateAxiosDefaults,
15+
baseURLs: string[],
16+
cacheCapacity: number,
17+
cacheLatestTtlSecs?: number,
18+
isForward?: boolean
19+
}
20+
1221
export class RestService {
13-
private client: AxiosInstance
22+
private client: AxiosInstance;
23+
private baseURLs: string[];
1424

15-
constructor(baseUrls: string[], isForward = false) {
16-
this.client = axios.create({
17-
baseURL: baseUrls[0], // Use the first base URL as the default
18-
timeout: 5000, // Default timeout
19-
headers: { 'Content-Type': 'application/vnd.schemaregistry.v1+json' },
20-
})
25+
constructor(axiosDefaults: CreateAxiosDefaults, baseURLs: string[], isForward?: boolean) {
26+
this.client = axios.create(axiosDefaults);
27+
this.baseURLs = baseURLs;
2128

2229
if (isForward) {
2330
this.client.defaults.headers.common['X-Forward'] = 'true'
2431
}
2532
}
2633

27-
public async sendHttpRequest<T>(
34+
public async handleRequest<T>(
2835
url: string,
2936
method: 'GET' | 'POST' | 'PUT' | 'DELETE',
3037
data?: any, // eslint-disable-line @typescript-eslint/no-explicit-any
3138
config?: AxiosRequestConfig,
3239
): Promise<AxiosResponse<T>> {
33-
try {
34-
const response = await this.client.request<T>({
35-
url,
36-
method,
37-
data,
38-
...config,
39-
})
40-
return response
41-
} catch (error) {
42-
if (axios.isAxiosError(error) && error.response) {
43-
throw new Error(`HTTP error: ${error.response.status} - ${error.response.data}`)
44-
} else {
45-
const err = error as Error;
46-
throw new Error(`Unknown error: ${err.message}`)
40+
41+
for (let i = 0; i < this.baseURLs.length; i++) {
42+
try {
43+
this.setBaseURL(this.baseURLs[i]);
44+
const response = await this.client.request<T>({
45+
url,
46+
method,
47+
data,
48+
...config,
49+
})
50+
return response;
51+
} catch (error) {
52+
if (axios.isAxiosError(error) && error.response && (error.response.status < 200 || error.response.status > 299)) {
53+
const data = error.response.data;
54+
if (data.error_code && data.message) {
55+
error = new RestError(data.message, error.response.status, data.error_code);
56+
} else {
57+
error = new Error(`Unknown error: ${error.message}`)
58+
}
59+
}
60+
if (i === this.baseURLs.length - 1) {
61+
throw error;
62+
}
4763
}
4864
}
65+
66+
throw new Error('Internal HTTP retry error'); // Should never reach here
4967
}
5068

5169
public setHeaders(headers: Record<string, string>): void {

0 commit comments

Comments
 (0)