Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 81 additions & 67 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import { randomUUID } from "crypto"
import { coerce, lt } from "semver"
import { inspect } from "util"
import { Compression, CompressionType, GzipCompression, NoneCompression } from "./compression"
import { Connection, ConnectionInfo, errorMessageOf } from "./connection"
import { ConnectionPool } from "./connection_pool"
import { Consumer, ConsumerFunc, StreamConsumer } from "./consumer"
import { Connection, ConnectionInfo, ConnectionParams, errorMessageOf } from "./connection"
import { ConnectionPool, ConnectionPurpose } from "./connection_pool"
import { Consumer, ConsumerFunc, StreamConsumer, computeExtendedConsumerId } from "./consumer"
import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes"
import { Logger, NullLogger } from "./logger"
import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher"
Expand Down Expand Up @@ -56,10 +56,8 @@ type PublisherMappedValue = {
}
export class Client {
public readonly id: string = randomUUID()
private publisherId = 0
private consumerId = 0
private consumers = new Map<number, ConsumerMappedValue>()
private publishers = new Map<number, PublisherMappedValue>()
private consumers = new Map<string, ConsumerMappedValue>()
private publishers = new Map<string, PublisherMappedValue>()
private compressions = new Map<CompressionType, Compression>()
private connection: Connection

Expand Down Expand Up @@ -134,8 +132,8 @@ export class Client {
}

public async declarePublisher(params: DeclarePublisherParams, filter?: FilterFunc): Promise<Publisher> {
const publisherId = this.incPublisherId()
const connection = await this.getConnection(params.stream, true, params.connectionClosedListener)
const connection = await this.getConnection(params.stream, "publisher", params.connectionClosedListener)
const publisherId = connection.getNextPublisherId()
await this.declarePublisherOnConnection(params, publisherId, connection, filter)
const streamPublisherParams = {
connection: connection,
Expand All @@ -151,32 +149,35 @@ export class Client {
connection.on("metadata_update", async (metadata) => {
if (metadata.metadataInfo.stream === publisher.streamName) {
await publisher.close(false)
this.publishers.delete(publisherId)
this.publishers.delete(publisher.extendedId)
}
})
this.publishers.set(publisherId, { publisher, connection, params, filter })
this.publishers.set(publisher.extendedId, { publisher, connection, params, filter })
this.logger.info(
`New publisher created with stream name ${params.stream}, publisher id ${publisherId} and publisher reference ${params.publisherRef}`
)
return publisher
}

public async deletePublisher(publisherId: number) {
const publisherConnection = this.publishers.get(publisherId)?.connection ?? this.connection
const res = await publisherConnection.sendAndWait<DeletePublisherResponse>(new DeletePublisherRequest(publisherId))
public async deletePublisher(extendedPublisherId: string) {
const { publisher, connection } = this.publishers.get(extendedPublisherId) ?? {
publisher: undefined,
connection: this.connection,
}
const publisherId = extractPublisherId(extendedPublisherId)
const res = await connection.sendAndWait<DeletePublisherResponse>(new DeletePublisherRequest(publisherId))
if (!res.ok) {
throw new Error(`Delete Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
await this.publishers.get(publisherId)?.publisher.close(true)
this.publishers.delete(publisherId)
await publisher?.close(true)
this.publishers.delete(extendedPublisherId)
this.logger.info(`deleted publisher with publishing id ${publisherId}`)
return res.ok
}

public async declareConsumer(params: DeclareConsumerParams, handle: ConsumerFunc): Promise<Consumer> {
const consumerId = this.incConsumerId()

const connection = await this.getConnection(params.stream, false, params.connectionClosedListener)
const connection = await this.getConnection(params.stream, "consumer", params.connectionClosedListener)
const consumerId = connection.getNextConsumerId()

if (params.filter && !connection.isFilteringEnabled) {
throw new Error(`Broker does not support message filtering.`)
Expand All @@ -192,31 +193,35 @@ export class Client {
if (params.connectionClosedListener) {
params.connectionClosedListener(false)
}
await this.closeConsumer(consumerId)
await this.closeConsumer(consumer.extendedId)
}
})
this.consumers.set(consumerId, { connection, consumer, params })
this.consumers.set(consumer.extendedId, { connection, consumer, params })
await this.declareConsumerOnConnection(params, consumerId, connection)
this.logger.info(
`New consumer created with stream name ${params.stream}, consumer id ${consumerId} and offset ${params.offset.type}`
)
return consumer
}

public async closeConsumer(consumerId: number) {
const { consumer, connection } = this.consumers.get(consumerId) ?? { consumer: undefined, connection: undefined }
public async closeConsumer(extendedConsumerId: string) {
const { consumer, connection } = this.consumers.get(extendedConsumerId) ?? {
consumer: undefined,
connection: undefined,
}
const consumerId = extractConsumerId(extendedConsumerId)

if (!consumer) {
this.logger.error("Consumer does not exist")
throw new Error(`Consumer with id: ${consumerId} does not exist`)
throw new Error(`Consumer with id: ${extendedConsumerId} does not exist`)
}
const res = await connection.sendAndWait<UnsubscribeResponse>(new UnsubscribeRequest(consumerId))
await consumer.close(true)
this.consumers.delete(consumerId)
this.consumers.delete(extendedConsumerId)
if (!res.ok) {
throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
this.logger.info(`Closed consumer with id: ${consumerId}`)
this.logger.info(`Closed consumer with id: ${extendedConsumerId}`)
return res.ok
}

Expand Down Expand Up @@ -248,12 +253,12 @@ export class Client {

private async closeAllConsumers(manuallyClose: boolean) {
await Promise.all([...this.consumers.values()].map(({ consumer }) => consumer.close(manuallyClose)))
this.consumers = new Map<number, ConsumerMappedValue>()
this.consumers = new Map<string, ConsumerMappedValue>()
}

private async closeAllPublishers(manuallyClose: boolean) {
await Promise.all([...this.publishers.values()].map((c) => c.publisher.close(manuallyClose)))
this.publishers = new Map<number, PublisherMappedValue>()
this.publishers = new Map<string, PublisherMappedValue>()
}

public consumerCounts() {
Expand Down Expand Up @@ -366,9 +371,9 @@ export class Client {
}

public async restart() {
this.logger.info(`Restarting client connection ${this.connection.id}`)
this.logger.info(`Restarting client connection ${this.connection.connectionId}`)
const uniqueConnectionIds = new Set<string>()
uniqueConnectionIds.add(this.connection.id)
uniqueConnectionIds.add(this.connection.connectionId)

await new Promise(async (res) => {
setTimeout(() => {
Expand All @@ -378,21 +383,21 @@ export class Client {
await this.connection.restart()

for (const { consumer, connection, params } of this.consumers.values()) {
if (!uniqueConnectionIds.has(connection.id)) {
this.logger.info(`Restarting consumer connection ${connection.id}`)
if (!uniqueConnectionIds.has(connection.connectionId)) {
this.logger.info(`Restarting consumer connection ${connection.connectionId}`)
await connection.restart()
}
uniqueConnectionIds.add(connection.id)
uniqueConnectionIds.add(connection.connectionId)
const consumerParams = { ...params, offset: consumer.localOffset }
await this.declareConsumerOnConnection(consumerParams, consumer.consumerId, connection)
}

for (const { publisher, connection, params, filter } of this.publishers.values()) {
if (!uniqueConnectionIds.has(connection.id)) {
this.logger.info(`Restarting publisher connection ${connection.id}`)
if (!uniqueConnectionIds.has(connection.connectionId)) {
this.logger.info(`Restarting publisher connection ${connection.connectionId}`)
await connection.restart()
}
uniqueConnectionIds.add(connection.id)
uniqueConnectionIds.add(connection.connectionId)
await this.declarePublisherOnConnection(params, publisher.publisherId, connection, filter)
}
}
Expand Down Expand Up @@ -466,7 +471,7 @@ export class Client {
)

if (!res.ok) {
this.consumers.delete(consumerId)
this.consumers.delete(computeExtendedConsumerId(consumerId, connection.connectionId))
throw new Error(`Declare Consumer command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
}
Expand All @@ -475,21 +480,11 @@ export class Client {
return connection.send(new CreditRequest({ ...params }))
}

private incPublisherId() {
const publisherId = this.publisherId
this.publisherId++
return publisherId
}

private incConsumerId() {
const consumerId = this.consumerId
this.consumerId++
return consumerId
}

private getDeliverV1Callback() {
private getDeliverV1Callback(connectionId: string) {
return async (response: DeliverResponse) => {
const { consumer, connection } = this.consumers.get(response.subscriptionId) ?? {
const { consumer, connection } = this.consumers.get(
computeExtendedConsumerId(response.subscriptionId, connectionId)
) ?? {
consumer: undefined,
connection: undefined,
}
Expand All @@ -504,9 +499,11 @@ export class Client {
}
}

private getDeliverV2Callback() {
private getDeliverV2Callback(connectionId: string) {
return async (response: DeliverResponseV2) => {
const { consumer, connection } = this.consumers.get(response.subscriptionId) ?? {
const { consumer, connection } = this.consumers.get(
computeExtendedConsumerId(response.subscriptionId, connectionId)
) ?? {
consumer: undefined,
connection: undefined,
}
Expand All @@ -525,9 +522,11 @@ export class Client {
}
}

private getConsumerUpdateCallback() {
private getConsumerUpdateCallback(connectionId: string) {
return async (response: ConsumerUpdateQuery) => {
const { consumer, connection } = this.consumers.get(response.subscriptionId) ?? {
const { consumer, connection } = this.consumers.get(
computeExtendedConsumerId(response.subscriptionId, connectionId)
) ?? {
consumer: undefined,
connection: undefined,
}
Expand All @@ -549,26 +548,26 @@ export class Client {

private async getConnection(
streamName: string,
leader: boolean,
purpose: ConnectionPurpose,
connectionClosedListener?: ConnectionClosedListener
): Promise<Connection> {
const [metadata] = await this.queryMetadata({ streams: [streamName] })
const chosenNode = chooseNode(metadata, leader)
const chosenNode = chooseNode(metadata, purpose === "publisher")
if (!chosenNode) {
throw new Error(`Stream was not found on any node`)
}
const cachedConnection = ConnectionPool.getUsableCachedConnection(leader, streamName, chosenNode.host)
const cachedConnection = ConnectionPool.getUsableCachedConnection(purpose, streamName, chosenNode.host)
if (cachedConnection) return cachedConnection

const newConnection = await this.getConnectionOnChosenNode(
leader,
purpose,
streamName,
chosenNode,
metadata,
connectionClosedListener
)

ConnectionPool.cacheConnection(leader, streamName, newConnection.hostname, newConnection)
ConnectionPool.cacheConnection(purpose, streamName, newConnection.hostname, newConnection)
return newConnection
}

Expand All @@ -593,25 +592,32 @@ export class Client {
leader: boolean,
streamName: string,
connectionClosedListener?: ConnectionClosedListener
) {
): ConnectionParams {
const connectionId = randomUUID()
const connectionListeners = {
...this.params.listeners,
connection_closed: connectionClosedListener,
deliverV1: this.getDeliverV1Callback(),
deliverV2: this.getDeliverV2Callback(),
consumer_update_query: this.getConsumerUpdateCallback(),
deliverV1: this.getDeliverV1Callback(connectionId),
deliverV2: this.getDeliverV2Callback(connectionId),
consumer_update_query: this.getConsumerUpdateCallback(connectionId),
}
return {
...this.params,
listeners: connectionListeners,
leader: leader,
streamName: streamName,
connectionId,
}
return { ...this.params, listeners: connectionListeners, leader: leader, streamName: streamName }
}

private async getConnectionOnChosenNode(
leader: boolean,
purpose: ConnectionPurpose,
streamName: string,
chosenNode: { host: string; port: number },
metadata: StreamMetadata,
connectionClosedListener?: ConnectionClosedListener
): Promise<Connection> {
const connectionParams = this.buildConnectionParams(leader, streamName, connectionClosedListener)
const connectionParams = this.buildConnectionParams(purpose === "publisher", streamName, connectionClosedListener)
if (this.params.addressResolver && this.params.addressResolver.enabled) {
const maxAttempts = computeMaxAttempts(metadata)
const resolver = this.params.addressResolver
Expand Down Expand Up @@ -756,3 +762,11 @@ const chooseNode = (metadata: { leader?: Broker; replicas?: Broker[] }, leader:
const computeMaxAttempts = (metadata: { leader?: Broker; replicas?: Broker[] }): number => {
return Math.pow(2 + (metadata.leader ? 1 : 0) + (metadata.replicas?.length ?? 0), 2)
}

const extractConsumerId = (extendedConsumerId: string) => {
return parseInt(extendedConsumerId.split("@").shift() ?? "0")
}

const extractPublisherId = (extendedPublisherId: string) => {
return parseInt(extendedPublisherId.split("@").shift() ?? "0")
}
Loading