Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions schemaregistry/rest-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export interface BearerAuthCredentials {
//TODO: Consider retry policy, may need additional libraries on top of Axios
export interface ClientConfig {
baseURLs: string[],
cacheCapacity: number,
cacheCapacity?: number,
cacheLatestTtlSecs?: number,
isForward?: boolean,
createAxiosDefaults?: CreateAxiosDefaults,
Expand All @@ -37,7 +37,7 @@ export class RestService {
private OAuthClient?: OAuthClient;
private bearerAuth: boolean = false;

constructor(baseURLs: string[], isForward?: boolean, axiosDefaults?: CreateAxiosDefaults,
constructor(baseURLs: string[], isForward?: boolean, axiosDefaults?: CreateAxiosDefaults,
bearerAuthCredentials?: BearerAuthCredentials) {
this.client = axios.create(axiosDefaults);
this.baseURLs = baseURLs;
Expand All @@ -53,7 +53,7 @@ export class RestService {
'Confluent-Identity-Pool-Id': bearerAuthCredentials.identityPool,
'target-sr-cluster': bearerAuthCredentials.schemaRegistryLogicalCluster
});
this.OAuthClient = new OAuthClient(bearerAuthCredentials.clientId, bearerAuthCredentials.clientSecret,
this.OAuthClient = new OAuthClient(bearerAuthCredentials.clientId, bearerAuthCredentials.clientSecret,
bearerAuthCredentials.tokenHost, bearerAuthCredentials.tokenPath, bearerAuthCredentials.scope);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class DekRegistryClient implements Client {

constructor(config: ClientConfig) {
const cacheOptions = {
max: config.cacheCapacity,
max: config.cacheCapacity !== undefined ? config.cacheCapacity : 1000,
...(config.cacheLatestTtlSecs !== undefined && { maxAge: config.cacheLatestTtlSecs * 1000 }),
};

Expand Down
4 changes: 2 additions & 2 deletions schemaregistry/schemaregistry-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,11 @@ export class SchemaRegistryClient implements Client {
constructor(config: ClientConfig) {
this.clientConfig = config
const cacheOptions = {
max: config.cacheCapacity,
max: config.cacheCapacity !== undefined ? config.cacheCapacity : 1000,
...(config.cacheLatestTtlSecs !== undefined && { maxAge: config.cacheLatestTtlSecs * 1000 })
};

this.restService = new RestService(config.baseURLs, config.isForward, config.createAxiosDefaults,
this.restService = new RestService(config.baseURLs, config.isForward, config.createAxiosDefaults,
config.bearerAuthCredentials);

this.schemaToIdCache = new LRUCache(cacheOptions);
Expand Down
4 changes: 2 additions & 2 deletions schemaregistry/serde/avro.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ async function transform(ctx: RuleContext, schema: Type, msg: any, fieldTransfor
return await Promise.all(array.map(item => transform(ctx, arraySchema.itemsType, item, fieldTransform)))
case 'map':
const mapSchema = schema as MapType
const map = msg as Map<string, any>
const map = msg as { [key: string]: any }
for (const key of Object.keys(map)) {
map.set(key, await transform(ctx, mapSchema.valuesType, map.get(key), fieldTransform))
map[key] = await transform(ctx, mapSchema.valuesType, map[key], fieldTransform)
}
return map
case 'record':
Expand Down
6 changes: 4 additions & 2 deletions schemaregistry/serde/json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ async function toType(
const json = JSON.parse(info.schema)
const spec = json.$schema
let schema
if (spec === 'http://json-schema.org/draft/2020-12/schema') {
if (spec === 'http://json-schema.org/draft/2020-12/schema'
|| spec === 'https://json-schema.org/draft/2020-12/schema') {
schema = await dereferenceJSONSchemaDraft2020_12(json, { retrieve })
} else {
schema = await dereferenceJSONSchemaDraft07(json, { retrieve })
Expand Down Expand Up @@ -302,6 +303,7 @@ async function transform(ctx: RuleContext, schema: DereferencedJSONSchema, path:
for (let i = 0; i < msg.length; i++) {
msg[i] = await transform(ctx, schema.items, path, msg[i], fieldTransform)
}
return msg
}
}
if (schema.$ref != null) {
Expand Down Expand Up @@ -355,7 +357,7 @@ async function transformField(ctx: RuleContext, path: string, propName: string,
function validateSubschemas(subschemas: DereferencedJSONSchema[], msg: any): DereferencedJSONSchema | null {
for (let subschema of subschemas) {
try {
validateJSON(subschema, msg)
validateJSON(msg, subschema)
return subschema
} catch (error) {
// ignore
Expand Down
204 changes: 204 additions & 0 deletions test/schemaregistry/serde/avro.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,60 @@ const demoSchemaWithUnion = `
]
}
`
const schemaEvolution1 = `
{
"name": "SchemaEvolution",
"type": "record",
"fields": [
{
"name": "fieldToDelete",
"type": "string"
}
]
}
`
const schemaEvolution2 = `
{
"name": "SchemaEvolution",
"type": "record",
"fields": [
{
"name": "newOptionalField",
"type": ["string", "null"],
"default": "optional"
}
]
}
`
const complexSchema = `
{
"name": "ComplexSchema",
"type": "record",
"fields": [
{
"name": "arrayField",
"type": {
"type": "array",
"items": "string"
},
"confluent:tags": [ "PII" ]
},
{
"name": "mapField",
"type": {
"type": "map",
"values": "string"
},
"confluent:tags": [ "PII" ]
},
{
"name": "unionField",
"type": ["null", "string"],
"confluent:tags": [ "PII" ]
}
]
}
`

class FakeClock extends Clock {
fixedNow: number = 0
Expand Down Expand Up @@ -320,6 +374,38 @@ describe('AvroSerializer', () => {
expect(obj2.otherField.boolField).toEqual(nested.boolField);
expect(obj2.otherField.bytesField).toEqual(nested.bytesField);
})
it('schema evolution', async () => {
let conf: ClientConfig = {
baseURLs: [baseURL],
cacheCapacity: 1000
}
let client = SchemaRegistryClient.newClient(conf)
let ser = new AvroSerializer(client, SerdeType.VALUE, {useLatestVersion: true})

let obj = {
fieldToDelete: "bye",
}
let info: SchemaInfo = {
schemaType: 'AVRO',
schema: schemaEvolution1,
}

await client.register(subject, info, false)

let bytes = await ser.serialize(topic, obj)

info = {
schemaType: 'AVRO',
schema: schemaEvolution2,
}

await client.register(subject, info, false)

let deser = new AvroDeserializer(client, SerdeType.VALUE, {useLatestVersion: true})
let obj2 = await deser.deserialize(topic, bytes)
expect(obj2.fieldToDelete).toEqual(undefined);
expect(obj2.newOptionalField).toEqual("optional");
})
it('basic encryption', async () => {
let conf: ClientConfig = {
baseURLs: [baseURL],
Expand Down Expand Up @@ -876,6 +962,124 @@ describe('AvroSerializer', () => {
expect(obj2.boolField).toEqual(obj.boolField);
expect(obj2.bytesField).toEqual(obj.bytesField);
})
it('complex encryption', async () => {
let conf: ClientConfig = {
baseURLs: [baseURL],
cacheCapacity: 1000
}
let client = SchemaRegistryClient.newClient(conf)
let serConfig: AvroSerializerConfig = {
useLatestVersion: true,
ruleConfig: {
secret: 'mysecret'
}
}
let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)
let dekClient = fieldEncryptionExecutor.client!

let encRule: Rule = {
name: 'test-encrypt',
kind: 'TRANSFORM',
mode: RuleMode.WRITEREAD,
type: 'ENCRYPT',
tags: ['PII'],
params: {
'encrypt.kek.name': 'kek1',
'encrypt.kms.type': 'local-kms',
'encrypt.kms.key.id': 'mykey',
},
onFailure: 'ERROR,NONE'
}
let ruleSet: RuleSet = {
domainRules: [encRule]
}

let info: SchemaInfo = {
schemaType: 'AVRO',
schema: complexSchema,
ruleSet
}

await client.register(subject, info, false)

let obj = {
arrayField: [ 'hello' ],
mapField: { 'key': 'world' },
unionField: 'bye',
}
let bytes = await ser.serialize(topic, obj)

let deserConfig: AvroDeserializerConfig = {
ruleConfig: {
secret: 'mysecret'
}
}
let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
fieldEncryptionExecutor.client = dekClient
let obj2 = await deser.deserialize(topic, bytes)
expect(obj2.arrayField).toEqual([ 'hello' ]);
expect(obj2.mapField).toEqual({ 'key': 'world' });
expect(obj2.unionField).toEqual('bye');
})
it('complex encryption with null', async () => {
let conf: ClientConfig = {
baseURLs: [baseURL],
cacheCapacity: 1000
}
let client = SchemaRegistryClient.newClient(conf)
let serConfig: AvroSerializerConfig = {
useLatestVersion: true,
ruleConfig: {
secret: 'mysecret'
}
}
let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)
let dekClient = fieldEncryptionExecutor.client!

let encRule: Rule = {
name: 'test-encrypt',
kind: 'TRANSFORM',
mode: RuleMode.WRITEREAD,
type: 'ENCRYPT',
tags: ['PII'],
params: {
'encrypt.kek.name': 'kek1',
'encrypt.kms.type': 'local-kms',
'encrypt.kms.key.id': 'mykey',
},
onFailure: 'ERROR,NONE'
}
let ruleSet: RuleSet = {
domainRules: [encRule]
}

let info: SchemaInfo = {
schemaType: 'AVRO',
schema: complexSchema,
ruleSet
}

await client.register(subject, info, false)

let obj = {
arrayField: [ 'hello' ],
mapField: { 'key': 'world' },
unionField: null
}
let bytes = await ser.serialize(topic, obj)

let deserConfig: AvroDeserializerConfig = {
ruleConfig: {
secret: 'mysecret'
}
}
let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
fieldEncryptionExecutor.client = dekClient
let obj2 = await deser.deserialize(topic, bytes)
expect(obj2.arrayField).toEqual([ 'hello' ]);
expect(obj2.mapField).toEqual({ 'key': 'world' });
expect(obj2.unionField).toEqual(null);
})
it('jsonata fully compatible', async () => {
let rule1To2 = "$merge([$sift($, function($v, $k) {$k != 'size'}), {'height': $.'size'}])"
let rule2To1 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'size': $.'height'}])"
Expand Down
Loading