Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Makefile.schemaregistry
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ TS_NODE ?= ./node_modules/.bin/ts-node

# Paths
SRC_DIR = schemaregistry
TEST_DIR = test/schemaregistry
SR_TEST_DIR = test/schemaregistry
DEK_TEST_DIR = test/dekregistry
INTEG_DIR = e2e/schemaregistry

# Tasks
Expand All @@ -17,10 +18,10 @@ INTEG_DIR = e2e/schemaregistry
all: lint test

lint:
$(ESLINT) $(SRC_DIR) $(TEST_DIR) $(INTEG_DIR)
$(ESLINT) $(SRC_DIR) $(TEST_DIR) $(INTEG_DIR) $(DEK_TEST_DIR)

test:
$(JEST) $(TEST_DIR)
$(JEST) $(SR_TEST_DIR) $(DEK_TEST_DIR)

integtest:
$(JEST) $(INTEG_DIR)
5 changes: 5 additions & 0 deletions dekregistry/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
const MOCK_TS = 11112223334;

export {
MOCK_TS
};
218 changes: 218 additions & 0 deletions dekregistry/dekregistry-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
import { LRUCache } from 'lru-cache';
import { Mutex } from 'async-mutex';
import { RestService } from '../schemaregistry/rest-service';
import stringify from 'json-stringify-deterministic';

interface Kek {
name?: string;
kmsType?: string;
kmsKeyId?: string;
kmsProps?: { [key: string]: string };
doc?: string;
shared?: boolean;
ts?: number;
deleted?: boolean;
}

interface CreateKekRequest {
name?: string;
kmsType?: string;
kmsKeyId?: string;
kmsProps?: { [key: string]: string };
doc?: string;
shared?: boolean;
}

interface Dek {
kekName?: string;
subject?: string;
version?: number;
algorithm?: string;
encryptedKeyMaterial?: string;
encryptedKeyMaterialBytes?: Buffer;
keyMaterial?: string;
keyMaterialBytes?: Buffer;
ts?: number;
deleted?: boolean;
}

interface Client {
registerKek(name: string, kmsType: string, kmsKeyId: string, kmsProps: { [key: string]: string }, doc: string, shared: boolean): Promise<Kek>;
getKek(name: string, deleted: boolean): Promise<Kek>;
registerDek(kekName: string, subject: string, algorithm: string, encryptedKeyMaterial: string, version: number): Promise<Dek>;
getDek(kekName: string, subject: string, algorithm: string, version: number, deleted: boolean): Promise<Dek>;
close(): Promise<void>;
}

class DekRegistryClient implements Client {
private restService: RestService;
private kekCache: LRUCache<string, Kek>;
private dekCache: LRUCache<string, Dek>;
private kekMutex: Mutex;
private dekMutex: Mutex;

constructor(restService: RestService, cacheSize: number = 512, cacheTTL?: number) {
const cacheOptions = {
max: cacheSize,
...(cacheTTL !== undefined && { maxAge: cacheTTL })
};

this.restService = restService;
this.kekCache = new LRUCache<string, Kek>(cacheOptions);
this.dekCache = new LRUCache<string, Dek>(cacheOptions);
this.kekMutex = new Mutex();
this.dekMutex = new Mutex();
}

public static getEncryptedKeyMaterialBytes(dek: Dek): Buffer | null {
if (!dek.encryptedKeyMaterial) {
return null;
}

if (!dek.encryptedKeyMaterialBytes) {
try {
const bytes = Buffer.from(dek.encryptedKeyMaterial, 'base64');
dek.encryptedKeyMaterialBytes = bytes;
} catch (err) {
throw new Error(`Failed to decode base64 string: ${err.message}`);
}
}

return dek.encryptedKeyMaterialBytes;
}

public static getKeyMaterialBytes(dek: Dek): Buffer | null {
if (!dek.keyMaterial) {
return null;
}

if (!dek.keyMaterialBytes) {
try {
const bytes = Buffer.from(dek.keyMaterial, 'base64');
dek.keyMaterialBytes = bytes;
} catch (err) {
throw new Error(`Failed to decode base64 string: ${err.message}`);
}
}

return dek.keyMaterialBytes;
}

public static setKeyMaterial(dek: Dek, keyMaterialBytes: Buffer): void {
if (keyMaterialBytes) {
const str = keyMaterialBytes.toString('base64');
dek.keyMaterial = str;
}
}

public async registerKek(name: string, kmsType: string, kmsKeyId: string,
kmsProps: { [key: string]: string }, doc: string, shared: boolean): Promise<Kek> {
const cacheKey = stringify({ name, deleted: false });

return await this.kekMutex.runExclusive(async () => {
const kek = this.kekCache.get(cacheKey);
if (kek) {
return kek;
}

const request: CreateKekRequest = {
name,
kmsType,
kmsKeyId,
kmsProps,
doc,
shared,
};

const response = await this.restService.sendHttpRequest<Kek>(
'/dek-registry/v1/keks',
'POST',
request);
this.kekCache.set(cacheKey, response.data);
return response.data;
});
}

public async getKek(name: string, deleted: boolean = false): Promise<Kek> {
const cacheKey = stringify({ name, deleted });

return await this.kekMutex.runExclusive(async () => {
const kek = this.kekCache.get(cacheKey);
if (kek) {
return kek;
}
name = encodeURIComponent(name);

const response = await this.restService.sendHttpRequest<Kek>(
`/dek-registry/v1/keks/${name}?deleted=${deleted}`,
'GET');
this.kekCache.set(cacheKey, response.data);
return response.data;
});
}

public async registerDek(kekName: string, subject: string,
algorithm: string, encryptedKeyMaterial: string, version: number = 1): Promise<Dek> {
const cacheKey = stringify({ kekName, subject, version, algorithm, deleted: false });

return await this.dekMutex.runExclusive(async () => {
const dek = this.dekCache.get(cacheKey);
if (dek) {
return dek;
}

const request: Dek = {
subject,
version,
algorithm,
encryptedKeyMaterial,
};
kekName = encodeURIComponent(kekName);

const response = await this.restService.sendHttpRequest<Dek>(
`/dek-registry/v1/keks/${kekName}/deks`,
'POST',
request);
this.dekCache.set(cacheKey, response.data);

this.dekCache.delete(stringify({ kekName, subject, version: -1, algorithm, deleted: false }));
this.dekCache.delete(stringify({ kekName, subject, version: -1, algorithm, deleted: true }));

return response.data;
});
}

public async getDek(kekName: string, subject: string,
algorithm: string, version: number = 1, deleted: boolean = false): Promise<Dek> {
const cacheKey = stringify({ kekName, subject, version, algorithm, deleted });

return await this.dekMutex.runExclusive(async () => {
const dek = this.dekCache.get(cacheKey);
if (dek) {
return dek;
}
kekName = encodeURIComponent(kekName);
subject = encodeURIComponent(subject);

const response = await this.restService.sendHttpRequest<Dek>(
`/dek-registry/v1/keks/${kekName}/deks/${subject}/versions/${version}?deleted=${deleted}`,
'GET');
this.dekCache.set(cacheKey, response.data);
return response.data;
});
}

public async close(): Promise<void> {
return;
}

//Cache methods for testing
public async checkLatestDekInCache(kekName: string, subject: string, algorithm: string): Promise<boolean> {
const cacheKey = stringify({ kekName, subject, version: -1, algorithm, deleted: false });
const cachedDek = this.dekCache.get(cacheKey);
return cachedDek !== undefined;
}
}

export { DekRegistryClient, Client, Kek, Dek };

97 changes: 97 additions & 0 deletions dekregistry/mock-dekregistry-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { Client, Dek, Kek } from "./dekregistry-client";
import { MOCK_TS } from "./constants";
import stringify from "json-stringify-deterministic";

class MockDekRegistryClient implements Client {
private kekCache: Map<string, Kek>;
private dekCache: Map<string, Dek>;

constructor() {
this.kekCache = new Map<string, Kek>();
this.dekCache = new Map<string, Dek>();
}

public async registerKek(name: string, kmsType: string, kmsKeyId: string,
kmsProps: { [key: string]: string }, doc: string, shared: boolean): Promise<Kek> {
const cacheKey = stringify({ name, deleted: false });
const cachedKek = this.kekCache.get(cacheKey);
if (cachedKek) {
return cachedKek;
}

const kek: Kek = {
name,
kmsType,
kmsKeyId,
kmsProps,
doc,
shared
};

this.kekCache.set(cacheKey, kek);
return kek;
}

public async getKek(name: string, deleted: boolean = false): Promise<Kek> {
const cacheKey = stringify({ name, deleted });
const cachedKek = this.kekCache.get(cacheKey);
if (cachedKek && (!cachedKek.deleted || deleted)) {
return cachedKek;
}

throw new Error(`Kek not found: ${name}`);
}

public async registerDek(kekName: string, subject: string,
algorithm: string, encryptedKeyMaterial: string, version: number): Promise<Dek> {
const cacheKey = stringify({ kekName, subject, version, algorithm, deleted: false });
const cachedDek = this.dekCache.get(cacheKey);
if (cachedDek) {
return cachedDek;
}

const dek: Dek = {
kekName,
subject,
algorithm,
encryptedKeyMaterial,
version,
ts: MOCK_TS
};

this.dekCache.set(cacheKey, dek);
return dek;
}

public async getDek(kekName: string, subject: string,
algorithm: string, version: number = 1, deleted: boolean = false): Promise<Dek> {
if (version === -1) {
let latestVersion = 0;
for (let key of this.dekCache.keys()) {
const parsedKey = JSON.parse(key);
if (parsedKey.kekName === kekName && parsedKey.subject === subject
&& parsedKey.algorithm === algorithm && parsedKey.deleted === deleted) {
latestVersion = Math.max(latestVersion, parsedKey.version);
}
}
if (latestVersion === 0) {
throw new Error(`Dek not found: ${subject}`);
}
version = latestVersion;
}

const cacheKey = stringify({ kekName, subject, version, algorithm, deleted });
const cachedDek = this.dekCache.get(cacheKey);
if (cachedDek) {
return cachedDek;
}

throw new Error(`Dek not found: ${subject}`);
}

public async close() {
return;
}
}

export { MockDekRegistryClient };
8 changes: 8 additions & 0 deletions e2e/schemaregistry/schemaregistry-client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ describe('SchemaRegistryClient Integration Test', () => {

const getMetadataResponse: SchemaMetadata = await schemaRegistryClient.getSchemaMetadata(testSubject, schemaVersion);
expect(schemaMetadata).toEqual(getMetadataResponse);

const keyValueMetadata: { [key: string]: string } = {
'owner': 'Bob Jones',
'email': '[email protected]'
}

const getLatestWithMetadataResponse: SchemaMetadata = await schemaRegistryClient.getLatestWithMetadata(testSubject, keyValueMetadata);
expect(schemaMetadata).toEqual(getLatestWithMetadataResponse);
});

it('Should test compatibility for a version and subject, getting and updating', async () => {
Expand Down
Loading