Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/amqp10/decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export const FormatCode = {
Map32: 0xd1,
Null: 0x40,
ULong0: 0x44,
Ubyte: 0x50,
SmallUlong: 0x53,
ULong: 0x80,
Uint: 0x70,
Expand Down
48 changes: 40 additions & 8 deletions src/amqp10/encoder.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import { inspect } from "node:util"
import { isDate } from "node:util/types"
import { Message, MessageApplicationProperties, MessageProperties } from "../publisher"
import {
AmqpByte,
Message,
MessageAnnotations,
MessageAnnotationsValue,
MessageApplicationProperties,
MessageProperties,
} from "../publisher"
import { DataWriter } from "../requests/data_writer"

const FormatCodeType = {
Expand All @@ -23,6 +30,7 @@ const FormatCode = {
Null: 0x40,
SmallUlong: 0x53,
Uint: 0x70,
Ubyte: 0x50,
Int: 0x71,
Timestamp: 0x83,
} as const
Expand All @@ -35,14 +43,14 @@ const PropertySizeDescription =

type MessageApplicationPropertiesList = { key: string; value: string | number }[]

type MessageAnnotationsList = { key: string; value: string | number }[]
type MessageAnnotationsList = { key: string; value: MessageAnnotationsValue }[]

export function amqpEncode(
writer: DataWriter,
{ content, messageProperties, applicationProperties, messageAnnotations }: Message
): void {
writer.writeUInt32(messageSize({ content, messageProperties, applicationProperties, messageAnnotations }))
writeMessageAnnotations(writer, toList(messageAnnotations))
writeMessageAnnotations(writer, toAnnotationsList(messageAnnotations))
writeProperties(writer, messageProperties)
writeApplicationProperties(writer, toList(applicationProperties))
writeContent(writer, content)
Expand All @@ -53,7 +61,7 @@ export function messageSize({ content, messageProperties, applicationProperties,
lengthOfContent(content) +
lengthOfProperties(messageProperties) +
lengthOfApplicationProperties(toList(applicationProperties)) +
lengthOfMessageAnnotations(toList(messageAnnotations))
lengthOfMessageAnnotations(toAnnotationsList(messageAnnotations))
)
}

Expand Down Expand Up @@ -121,7 +129,11 @@ function writeMessageAnnotations(writer: DataWriter, messageAnnotationsList: Mes
.filter((elem) => elem.key)
.forEach((elem) => {
amqpWriteString(writer, elem.key)
typeof elem.value === "string" ? amqpWriteString(writer, elem.value) : amqpWriteIntNumber(writer, elem.value)
if (elem.value instanceof AmqpByte) {
amqpWriteByte(writer, elem.value)
} else {
typeof elem.value === "string" ? amqpWriteString(writer, elem.value) : amqpWriteIntNumber(writer, elem.value)
}
})
}

Expand Down Expand Up @@ -188,8 +200,12 @@ function getPropertySize(properties: MessageProperties): number {
)
}

function getListSize(list: MessageApplicationPropertiesList | MessageAnnotationsList): number {
return list.reduce((sum, elem) => sum + getSizeOf(elem.key) + getSizeOf(elem.value), 0)
function getListSize(list: MessageAnnotationsList): number {
return list.reduce(
(sum: number, elem: { key: string; value: MessageAnnotationsValue }) =>
sum + getSizeOf(elem.key) + getSizeOf(elem.value),
0
)
}

function amqpWriteString(writer: DataWriter, data?: string): void {
Expand Down Expand Up @@ -242,6 +258,11 @@ function amqpWriteIntNumber(writer: DataWriter, data?: number): void {
writer.writeInt32(data)
}

function amqpWriteByte(writer: DataWriter, data: AmqpByte): void {
writer.writeByte(FormatCode.Ubyte)
writer.writeByte(data.byteValue)
}

function amqpWriteBuffer(writer: DataWriter, data?: Buffer): void {
if (!data || !data.length) {
return amqpWriteNull(writer)
Expand Down Expand Up @@ -269,11 +290,15 @@ function amqpWriteDate(writer: DataWriter, date?: Date): void {
writer.writeUInt64(BigInt(date.getTime()))
}

function getSizeOf(value?: string | Date | number | Buffer): number {
function getSizeOf(value?: string | Date | number | Buffer | AmqpByte): number {
if (!value) {
return 1
}

if (value instanceof AmqpByte) {
return 1 + 1
}

if (typeof value === "string") {
const count = Buffer.from(value).length
return count <= 255 ? 1 + 1 + count : 1 + 4 + count
Expand All @@ -300,3 +325,10 @@ function toList(applicationProperties?: MessageApplicationProperties): MessageAp
return { key: elem[0], value: elem[1] }
})
}

function toAnnotationsList(annotations?: MessageAnnotations): MessageAnnotationsList {
if (!annotations) return []
return Object.entries(annotations).map((elem) => {
return { key: elem[0], value: elem[1] }
})
}
21 changes: 19 additions & 2 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,24 @@ import { PublishRequestV2 } from "./requests/publish_request_v2"

export type MessageApplicationProperties = Record<string, string | number>

export type MessageAnnotations = Record<string, string | number>
export type MessageAnnotations = Record<string, MessageAnnotationsValue>

export type MessageAnnotationsValue = string | number | AmqpByte

export class AmqpByte {
private value: number

constructor(value: number) {
if (value > 255 || value < 0) {
throw new Error("Invalid byte, value must be between 0 and 255")
}
this.value = value
}

public get byteValue() {
return this.value
}
}

export interface MessageProperties {
contentType?: string
Expand Down Expand Up @@ -54,7 +71,7 @@ export interface Message {
export interface MessageOptions {
messageProperties?: MessageProperties
applicationProperties?: Record<string, string | number>
messageAnnotations?: Record<string, string | number>
messageAnnotations?: Record<string, MessageAnnotationsValue>
}

export interface Publisher {
Expand Down
3 changes: 3 additions & 0 deletions src/response_decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ export function decodeFormatCode(dataResponse: DataReader, formatCode: number, s
return dataResponse.readUInt32()
case FormatCode.SmallUlong:
return dataResponse.readInt8() // Read a SmallUlong
case FormatCode.Ubyte:
dataResponse.forward(1)
return dataResponse.readUInt8()
case FormatCode.ULong:
return dataResponse.readUInt64() // Read an ULong
case FormatCode.List0:
Expand Down
55 changes: 23 additions & 32 deletions test/e2e/declare_consumer.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import { expect } from "chai"
import { randomUUID } from "crypto"
import { readFileSync } from "fs"
import path from "path"
import { Client, Publisher } from "../../src"
import {
AmqpByte,
Message,
MessageAnnotations,
MessageApplicationProperties,
MessageProperties,
MessageHeader,
MessageProperties,
} from "../../src/publisher"
import { Offset } from "../../src/requests/subscribe_request"
import { BufferDataReader } from "../../src/response_decoder"
import { getMaxSharedConnectionInstances, range } from "../../src/util"
import {
createClient,
createConsumer,
Expand All @@ -16,20 +22,14 @@ import {
createStreamName,
} from "../support/fake_data"
import { Rabbit, RabbitConnectionResponse } from "../support/rabbit"
import { getMaxSharedConnectionInstances, range } from "../../src/util"
import { BufferDataReader } from "../../src/response_decoder"
import {
decodeMessageTesting,
eventually,
expectToThrowAsync,
username,
password,
createClassicPublisher,
decodeMessageTesting,
getTestNodesFromEnv,
password,
username,
} from "../support/util"
import { readFileSync } from "fs"
import path from "path"
import { randomUUID } from "crypto"

describe("declare consumer", () => {
let streamName: string
Expand Down Expand Up @@ -241,32 +241,23 @@ describe("declare consumer", () => {
await eventually(async () => expect(messageAnnotations).eql([annotations]))
}).timeout(10000)

it("messageAnnotations are ignored by a classic driver", async () => {
it("messageAnnotations with bytes are read correctly", async () => {
const messageAnnotations: MessageAnnotations[] = []
const annotations = createAnnotations()
const classicPublisher = await createClassicPublisher()
await classicPublisher.ch.assertQueue("testQ", {
exclusive: false,
durable: true,
autoDelete: false,
arguments: {
"x-queue-type": "stream", // Mandatory to define stream queue
},
})
classicPublisher.ch.sendToQueue("testQ", Buffer.from("Hello"), {
headers: {
messageAnnotations: annotations,
},
})
const annotations = { test: new AmqpByte(123) }
await rabbit.createStream("testQ")
await client.declareConsumer(
{ stream: "testQ", offset: Offset.next(), consumerRef: "test" },
(message: Message) => {
messageAnnotations.push(message.messageAnnotations ?? {})
}
)

await client.declareConsumer({ stream: "testQ", offset: Offset.first() }, (message: Message) => {
messageAnnotations.push(message.messageAnnotations || {})
})
const testP = await client.declarePublisher({ stream: "testQ" })
await testP.send(Buffer.from("Hello"), { messageAnnotations: annotations })

await eventually(async () => {
expect(messageAnnotations).not.eql([annotations])
await classicPublisher.ch.close()
await classicPublisher.conn.close()
const [messageAnnotation] = messageAnnotations
expect(messageAnnotation).to.eql({ test: 123 })
})
}).timeout(10000)

Expand Down