diff --git a/controlplane/src/bin/deactivate-org.ts b/controlplane/src/bin/deactivate-org.ts index 229ac3d1df..08bceafe65 100644 --- a/controlplane/src/bin/deactivate-org.ts +++ b/controlplane/src/bin/deactivate-org.ts @@ -1,6 +1,6 @@ import process from 'node:process'; import { pino } from 'pino'; -import { DeactivateOrganizationQueue } from '../core/workers/DeactivateOrganizationWorker.js'; +import { DeactivateOrganizationQueue } from '../core/workers/DeactivateOrganization.js'; import { createRedisConnections } from '../core/plugins/redis.js'; import { getConfig } from './get-config.js'; diff --git a/controlplane/src/bin/delete-user.ts b/controlplane/src/bin/delete-user.ts index 292a51772e..178c848d8e 100644 --- a/controlplane/src/bin/delete-user.ts +++ b/controlplane/src/bin/delete-user.ts @@ -1,7 +1,7 @@ import process from 'node:process'; import { pino } from 'pino'; import { createRedisConnections } from '../core/plugins/redis.js'; -import { DeleteUserQueue } from '../core/workers/DeleteUserQueue.js'; +import { DeleteUserQueue } from '../core/workers/DeleteUser.js'; import { getConfig } from './get-config.js'; const { redis } = getConfig(); diff --git a/controlplane/src/bin/reactivate-org.ts b/controlplane/src/bin/reactivate-org.ts index 787354fc31..cc03ebc567 100644 --- a/controlplane/src/bin/reactivate-org.ts +++ b/controlplane/src/bin/reactivate-org.ts @@ -1,7 +1,7 @@ import process from 'node:process'; import { pino } from 'pino'; import { createRedisConnections } from '../core/plugins/redis.js'; -import { ReactivateOrganizationQueue } from '../core/workers/ReactivateOrganizationWorker.js'; +import { ReactivateOrganizationQueue } from '../core/workers/ReactivateOrganization.js'; import { getConfig } from './get-config.js'; const { organizationSlug, redis } = getConfig(); diff --git a/controlplane/src/core/build-server.ts b/controlplane/src/core/build-server.ts index 4ecb4e8173..53bdc014fe 100644 --- a/controlplane/src/core/build-server.ts +++ b/controlplane/src/core/build-server.ts @@ -10,7 +10,7 @@ import { App } from 'octokit'; import { Worker } from 'bullmq'; import routes from './routes.js'; import fastifyHealth from './plugins/health.js'; -import fastifyMetrics, { MetricsPluginOptions } from './plugins/metrics.js'; +import fastifyMetrics from './plugins/metrics.js'; import fastifyDatabase from './plugins/database.js'; import fastifyClickHouse from './plugins/clickhouse.js'; import fastifyRedis from './plugins/redis.js'; @@ -36,23 +36,18 @@ import { Authorization } from './services/Authorization.js'; import { BillingRepository } from './repositories/BillingRepository.js'; import { BillingService } from './services/BillingService.js'; import { UserRepository } from './repositories/UserRepository.js'; -import { AIGraphReadmeQueue, createAIGraphReadmeWorker } from './workers/AIGraphReadmeWorker.js'; +import { AIGraphReadmeQueue, AIGraphReadmeWorker } from './workers/AIGraphReadme.js'; import { fastifyLoggerId, createS3ClientConfig, extractS3BucketName, isGoogleCloudStorageUrl } from './util.js'; + import { ApiKeyRepository } from './repositories/ApiKeyRepository.js'; -import { createDeleteOrganizationWorker, DeleteOrganizationQueue } from './workers/DeleteOrganizationWorker.js'; +import { DeleteOrganizationWorker, DeleteOrganizationQueue } from './workers/DeleteOrganization.js'; import { - createDeleteOrganizationAuditLogsWorker, + DeleteOrganizationAuditLogsWorker, DeleteOrganizationAuditLogsQueue, -} from './workers/DeleteOrganizationAuditLogsWorker.js'; -import { - createDeactivateOrganizationWorker, - DeactivateOrganizationQueue, -} from './workers/DeactivateOrganizationWorker.js'; -import { createDeleteUserWorker, DeleteUserQueue } from './workers/DeleteUserQueue.js'; -import { - createReactivateOrganizationWorker, - ReactivateOrganizationQueue, -} from './workers/ReactivateOrganizationWorker.js'; +} from './workers/DeleteOrganizationAuditLogs.js'; +import { DeactivateOrganizationWorker, DeactivateOrganizationQueue } from './workers/DeactivateOrganization.js'; +import { DeleteUserWorker, DeleteUserQueue } from './workers/DeleteUser.js'; +import { ReactivateOrganizationWorker, ReactivateOrganizationQueue } from './workers/ReactivateOrganization.js'; export interface BuildConfig { logger: LoggerOptions; @@ -326,73 +321,68 @@ export default async function build(opts: BuildConfig) { const readmeQueue = new AIGraphReadmeQueue(logger, fastify.redisForQueue); if (opts.openaiAPIKey) { - bullWorkers.push( - createAIGraphReadmeWorker({ - redisConnection: fastify.redisForWorker, - db: fastify.db, - logger, - openAiApiKey: opts.openaiAPIKey, - }), - ); - } - - const deleteOrganizationAuditLogsQueue = new DeleteOrganizationAuditLogsQueue(logger, fastify.redisForQueue); - bullWorkers.push( - createDeleteOrganizationAuditLogsWorker({ + const readmeWorker = new AIGraphReadmeWorker({ redisConnection: fastify.redisForWorker, db: fastify.db, logger, - }), - ); + openAiApiKey: opts.openaiAPIKey, + }); + + bullWorkers.push(readmeWorker.create()); + } + + const deleteOrganizationAuditLogsQueue = new DeleteOrganizationAuditLogsQueue(logger, fastify.redisForQueue); + const deleteOrganizationAuditLogsWorker = new DeleteOrganizationAuditLogsWorker({ + redisConnection: fastify.redisForWorker, + db: fastify.db, + logger, + }); + bullWorkers.push(deleteOrganizationAuditLogsWorker.create()); const deleteOrganizationQueue = new DeleteOrganizationQueue(logger, fastify.redisForQueue); - bullWorkers.push( - createDeleteOrganizationWorker({ - redisConnection: fastify.redisForWorker, - db: fastify.db, - logger, - keycloakClient, - keycloakRealm: opts.keycloak.realm, - blobStorage, - deleteOrganizationAuditLogsQueue, - }), - ); + const deleteOrganizationWorker = new DeleteOrganizationWorker({ + redisConnection: fastify.redisForWorker, + db: fastify.db, + logger, + keycloakClient, + keycloakRealm: opts.keycloak.realm, + blobStorage, + deleteOrganizationAuditLogsQueue, + }); + bullWorkers.push(deleteOrganizationWorker.create()); const deactivateOrganizationQueue = new DeactivateOrganizationQueue(logger, fastify.redisForQueue); - bullWorkers.push( - createDeactivateOrganizationWorker({ - redisConnection: fastify.redisForWorker, - db: fastify.db, - logger, - keycloakClient, - keycloakRealm: opts.keycloak.realm, - deleteOrganizationQueue, - }), - ); + const deactivateOrganizationWorker = new DeactivateOrganizationWorker({ + redisConnection: fastify.redisForWorker, + db: fastify.db, + logger, + keycloakClient, + keycloakRealm: opts.keycloak.realm, + deleteOrganizationQueue, + }); + bullWorkers.push(deactivateOrganizationWorker.create()); const reactivateOrganizationQueue = new ReactivateOrganizationQueue(logger, fastify.redisForQueue); - bullWorkers.push( - createReactivateOrganizationWorker({ - redisConnection: fastify.redisForWorker, - db: fastify.db, - logger, - deleteOrganizationQueue, - }), - ); + const reactivateOrganizationWorker = new ReactivateOrganizationWorker({ + redisConnection: fastify.redisForWorker, + db: fastify.db, + logger, + deleteOrganizationQueue, + }); + bullWorkers.push(reactivateOrganizationWorker.create()); const deleteUserQueue = new DeleteUserQueue(logger, fastify.redisForQueue); - bullWorkers.push( - createDeleteUserWorker({ - redisConnection: fastify.redisForWorker, - db: fastify.db, - logger, - keycloakClient, - keycloakRealm: opts.keycloak.realm, - blobStorage, - platformWebhooks, - deleteOrganizationAuditLogsQueue, - }), - ); + const deleteUserWorker = new DeleteUserWorker({ + redisConnection: fastify.redisForWorker, + db: fastify.db, + logger, + keycloakClient, + keycloakRealm: opts.keycloak.realm, + blobStorage, + platformWebhooks, + deleteOrganizationAuditLogsQueue, + }); + bullWorkers.push(deleteUserWorker.create()); // required to verify webhook payloads await fastify.register(import('fastify-raw-body'), { diff --git a/controlplane/src/core/repositories/OrganizationRepository.ts b/controlplane/src/core/repositories/OrganizationRepository.ts index 0327f194c3..ba2ab4b541 100644 --- a/controlplane/src/core/repositories/OrganizationRepository.ts +++ b/controlplane/src/core/repositories/OrganizationRepository.ts @@ -34,10 +34,10 @@ import { WebhooksConfigDTO, } from '../../types/index.js'; import Keycloak from '../services/Keycloak.js'; -import { DeleteOrganizationQueue } from '../workers/DeleteOrganizationWorker.js'; +import { DeleteOrganizationQueue } from '../workers/DeleteOrganization.js'; import { BlobStorage } from '../blobstorage/index.js'; import { delayForManualOrgDeletionInDays, delayForOrgAuditLogsDeletionInDays } from '../constants.js'; -import { DeleteOrganizationAuditLogsQueue } from '../workers/DeleteOrganizationAuditLogsWorker.js'; +import { DeleteOrganizationAuditLogsQueue } from '../workers/DeleteOrganizationAuditLogs.js'; import { RBACEvaluator } from '../services/RBACEvaluator.js'; import { BillingRepository } from './BillingRepository.js'; import { FederatedGraphRepository } from './FederatedGraphRepository.js'; diff --git a/controlplane/src/core/repositories/UserRepository.ts b/controlplane/src/core/repositories/UserRepository.ts index 1c8bbae8ea..277d1fb80b 100644 --- a/controlplane/src/core/repositories/UserRepository.ts +++ b/controlplane/src/core/repositories/UserRepository.ts @@ -7,7 +7,7 @@ import { UserDTO } from '../../types/index.js'; import { BlobStorage } from '../blobstorage/index.js'; import Keycloak from '../services/Keycloak.js'; import OidcProvider from '../services/OidcProvider.js'; -import { DeleteOrganizationAuditLogsQueue } from '../workers/DeleteOrganizationAuditLogsWorker.js'; +import { DeleteOrganizationAuditLogsQueue } from '../workers/DeleteOrganizationAuditLogs.js'; import { BillingRepository } from './BillingRepository.js'; import { OidcRepository } from './OidcRepository.js'; import { OrganizationRepository } from './OrganizationRepository.js'; diff --git a/controlplane/src/core/routes.ts b/controlplane/src/core/routes.ts index d08fe04f42..f244a64a57 100644 --- a/controlplane/src/core/routes.ts +++ b/controlplane/src/core/routes.ts @@ -15,12 +15,12 @@ import { IPlatformWebhookService } from './webhooks/PlatformWebhookService.js'; import { BlobStorage } from './blobstorage/index.js'; import Mailer from './services/Mailer.js'; import { Authorization } from './services/Authorization.js'; -import { AIGraphReadmeQueue } from './workers/AIGraphReadmeWorker.js'; -import { DeleteOrganizationQueue } from './workers/DeleteOrganizationWorker.js'; -import { DeactivateOrganizationQueue } from './workers/DeactivateOrganizationWorker.js'; -import { DeleteUserQueue } from './workers/DeleteUserQueue.js'; -import { ReactivateOrganizationQueue } from './workers/ReactivateOrganizationWorker.js'; -import { DeleteOrganizationAuditLogsQueue } from './workers/DeleteOrganizationAuditLogsWorker.js'; +import { AIGraphReadmeQueue } from './workers/AIGraphReadme.js'; +import { DeleteOrganizationQueue } from './workers/DeleteOrganization.js'; +import { DeactivateOrganizationQueue } from './workers/DeactivateOrganization.js'; +import { DeleteUserQueue } from './workers/DeleteUser.js'; +import { ReactivateOrganizationQueue } from './workers/ReactivateOrganization.js'; +import { DeleteOrganizationAuditLogsQueue } from './workers/DeleteOrganizationAuditLogs.js'; export interface RouterOptions { db: PostgresJsDatabase; diff --git a/controlplane/src/core/workers/AIGraphReadmeWorker.ts b/controlplane/src/core/workers/AIGraphReadme.ts similarity index 70% rename from controlplane/src/core/workers/AIGraphReadmeWorker.ts rename to controlplane/src/core/workers/AIGraphReadme.ts index 05cc28c93c..7b4a85a910 100644 --- a/controlplane/src/core/workers/AIGraphReadmeWorker.ts +++ b/controlplane/src/core/workers/AIGraphReadme.ts @@ -1,11 +1,10 @@ -import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; -import { ConnectionOptions, Job, Worker, Queue } from 'bullmq'; +import { ConnectionOptions, Job } from 'bullmq'; import pino from 'pino'; -import * as schema from '../../db/schema.js'; import { OpenAIGraphql } from '../openai-graphql/index.js'; import { SubgraphRepository } from '../repositories/SubgraphRepository.js'; import { FederatedGraphRepository } from '../repositories/FederatedGraphRepository.js'; -import { IQueue, IWorker } from './Worker.js'; +import { DB } from '../../db/index.js'; +import { BaseQueue, BaseWorker } from './base/index.js'; const QueueName = 'ai.graph-readme-generator'; const WorkerName = 'AIGraphReadmeWorker'; @@ -16,15 +15,13 @@ export interface CreateReadmeInputEvent { type: 'subgraph' | 'federated_graph'; } -export class AIGraphReadmeQueue implements IQueue { - private readonly queue: Queue; - private readonly logger: pino.Logger; - +export class AIGraphReadmeQueue extends BaseQueue { constructor(log: pino.Logger, conn: ConnectionOptions) { - this.logger = log.child({ queue: QueueName }); - this.queue = new Queue(QueueName, { - connection: conn, - defaultJobOptions: { + super({ + name: QueueName, + conn, + log, + jobsOptions: { removeOnComplete: true, removeOnFail: true, attempts: 3, @@ -34,10 +31,6 @@ export class AIGraphReadmeQueue implements IQueue { }, }, }); - - this.queue.on('error', (err) => { - this.logger.error(err, 'Queue error'); - }); } public addJob(job: CreateReadmeInputEvent) { @@ -61,21 +54,22 @@ export class AIGraphReadmeQueue implements IQueue { } } -class AIGraphReadmeWorker implements IWorker { +export class AIGraphReadmeWorker extends BaseWorker { private readonly openaiGraphql: OpenAIGraphql; constructor( private input: { redisConnection: ConnectionOptions; - db: PostgresJsDatabase; + db: DB; logger: pino.Logger; openAiApiKey: string; }, ) { + super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger); + this.openaiGraphql = new OpenAIGraphql({ openAiApiKey: input.openAiApiKey, }); - this.input.logger = input.logger.child({ worker: WorkerName }); } private async generateSubgraphReadme(job: Job) { @@ -122,7 +116,7 @@ class AIGraphReadmeWorker implements IWorker { }); } - public async handler(job: Job) { + protected async handler(job: Job) { try { if (job.data.type === 'subgraph') { await this.generateSubgraphReadme(job); @@ -140,23 +134,3 @@ class AIGraphReadmeWorker implements IWorker { } } } - -export const createAIGraphReadmeWorker = (input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; - openAiApiKey: string; -}) => { - const log = input.logger.child({ worker: WorkerName }); - const worker = new Worker(QueueName, (job) => new AIGraphReadmeWorker(input).handler(job), { - connection: input.redisConnection, - concurrency: 10, - }); - worker.on('stalled', (job) => { - log.warn(`Job ${job} stalled`); - }); - worker.on('error', (err) => { - log.error(err, 'Worker error'); - }); - return worker; -}; diff --git a/controlplane/src/core/workers/CacheWarmerWorker.ts b/controlplane/src/core/workers/CacheWarmer.ts similarity index 61% rename from controlplane/src/core/workers/CacheWarmerWorker.ts rename to controlplane/src/core/workers/CacheWarmer.ts index cb170f77e0..9dc1bab1ae 100644 --- a/controlplane/src/core/workers/CacheWarmerWorker.ts +++ b/controlplane/src/core/workers/CacheWarmer.ts @@ -1,12 +1,11 @@ -import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; -import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; +import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; import pino from 'pino'; -import * as schema from '../../db/schema.js'; import { BlobStorage } from '../blobstorage/index.js'; import { ClickHouseClient } from '../clickhouse/index.js'; import { S3RouterConfigMetadata } from '../composition/composer.js'; import { CacheWarmerRepository } from '../repositories/CacheWarmerRepository.js'; -import { IQueue, IWorker } from './Worker.js'; +import { DB } from '../../db/index.js'; +import { BaseQueue, BaseWorker } from './base/index.js'; const QueueName = 'cache.warmer'; const WorkerName = 'CacheWarmerWorker'; @@ -17,31 +16,12 @@ export interface CacheWarmerInput { rangeInHours: number; } -export class CacheWarmerQueue implements IQueue { - private readonly queue: Queue; - private readonly logger: pino.Logger; - +export class CacheWarmerQueue extends BaseQueue { constructor(log: pino.Logger, conn: ConnectionOptions) { - this.logger = log.child({ queue: QueueName }); - this.queue = new Queue(QueueName, { - connection: conn, - defaultJobOptions: { - removeOnComplete: { - age: 90 * 86_400, - }, - removeOnFail: { - age: 90 * 86_400, - }, - attempts: 6, - backoff: { - type: 'exponential', - delay: 112_000, - }, - }, - }); - - this.queue.on('error', (err) => { - this.logger.error(err, 'Queue error'); + super({ + name: QueueName, + conn, + log, }); } @@ -65,20 +45,21 @@ export class CacheWarmerQueue implements IQueue { } } -class CacheWarmerWorker implements IWorker { +export class CacheWarmerWorker extends BaseWorker { constructor( private input: { - db: PostgresJsDatabase; + redisConnection: ConnectionOptions; + db: DB; + logger: pino.Logger; chClient: ClickHouseClient | undefined; blobStorage: BlobStorage; - logger: pino.Logger; cacheWarmerQueue: CacheWarmerQueue; }, ) { - this.input.logger = input.logger.child({ worker: WorkerName }); + super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger); } - public async handler(job: Job) { + protected async handler(job: Job) { const organizationId = job.data.organizationId; const federatedGraphId = job.data.federatedGraphId; const rangeInHours = job.data.rangeInHours; @@ -116,25 +97,3 @@ class CacheWarmerWorker implements IWorker { } } } - -export const createCacheWarmerWorker = (input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; - chClient: ClickHouseClient | undefined; - blobStorage: BlobStorage; - cacheWarmerQueue: CacheWarmerQueue; -}) => { - const log = input.logger.child({ worker: WorkerName }); - const worker = new Worker(QueueName, (job) => new CacheWarmerWorker(input).handler(job), { - connection: input.redisConnection, - concurrency: 10, - }); - worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); - }); - worker.on('error', (err) => { - log.error(err, 'Worker error'); - }); - return worker; -}; diff --git a/controlplane/src/core/workers/DeactivateOrganizationWorker.ts b/controlplane/src/core/workers/DeactivateOrganization.ts similarity index 52% rename from controlplane/src/core/workers/DeactivateOrganizationWorker.ts rename to controlplane/src/core/workers/DeactivateOrganization.ts index d1ee93cb3e..442f29cf8a 100644 --- a/controlplane/src/core/workers/DeactivateOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeactivateOrganization.ts @@ -1,11 +1,10 @@ -import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; -import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; +import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; import pino from 'pino'; -import * as schema from '../../db/schema.js'; import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; import Keycloak from '../services/Keycloak.js'; -import { DeleteOrganizationQueue } from './DeleteOrganizationWorker.js'; -import { IQueue, IWorker } from './Worker.js'; +import { DB } from '../../db/index.js'; +import { DeleteOrganizationQueue } from './DeleteOrganization.js'; +import { BaseQueue, BaseWorker } from './base/index.js'; const QueueName = 'organization.deactivate'; const WorkerName = 'DeactivateOrganizationWorker'; @@ -16,32 +15,9 @@ export interface DeactivateOrganizationInput { deactivationReason?: string; } -export class DeactivateOrganizationQueue implements IQueue { - private readonly queue: Queue; - private readonly logger: pino.Logger; - +export class DeactivateOrganizationQueue extends BaseQueue { constructor(log: pino.Logger, conn: ConnectionOptions) { - this.logger = log.child({ queue: QueueName }); - this.queue = new Queue(QueueName, { - connection: conn, - defaultJobOptions: { - removeOnComplete: { - age: 90 * 86_400, - }, - removeOnFail: { - age: 90 * 86_400, - }, - attempts: 6, - backoff: { - type: 'exponential', - delay: 112_000, - }, - }, - }); - - this.queue.on('error', (err) => { - this.logger.error(err, 'Queue error'); - }); + super({ name: QueueName, conn, log }); } public addJob(job: DeactivateOrganizationInput, opts?: Omit) { @@ -61,20 +37,21 @@ export class DeactivateOrganizationQueue implements IQueue { constructor( private input: { - db: PostgresJsDatabase; + redisConnection: ConnectionOptions; + db: DB; logger: pino.Logger; keycloakClient: Keycloak; keycloakRealm: string; deleteOrganizationQueue: DeleteOrganizationQueue; }, ) { - this.input.logger = input.logger.child({ worker: WorkerName }); + super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger); } - public async handler(job: Job) { + protected async handler(job: Job) { try { this.input.logger.info('Processing deactivate job'); const orgRepo = new OrganizationRepository(this.input.logger, this.input.db); @@ -106,29 +83,3 @@ class DeactivateOrganizationWorker implements IWorker { } } } - -export const createDeactivateOrganizationWorker = (input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; - keycloakClient: Keycloak; - keycloakRealm: string; - deleteOrganizationQueue: DeleteOrganizationQueue; -}) => { - const log = input.logger.child({ worker: WorkerName }); - const worker = new Worker( - QueueName, - (job) => new DeactivateOrganizationWorker(input).handler(job), - { - connection: input.redisConnection, - concurrency: 10, - }, - ); - worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); - }); - worker.on('error', (err) => { - log.error(err, 'Worker error'); - }); - return worker; -}; diff --git a/controlplane/src/core/workers/DeleteOrganizationWorker.ts b/controlplane/src/core/workers/DeleteOrganization.ts similarity index 61% rename from controlplane/src/core/workers/DeleteOrganizationWorker.ts rename to controlplane/src/core/workers/DeleteOrganization.ts index 2846b1a0d0..85aba824d2 100644 --- a/controlplane/src/core/workers/DeleteOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeleteOrganization.ts @@ -1,14 +1,13 @@ -import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; -import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; +import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; import pino from 'pino'; -import * as schema from '../../db/schema.js'; import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; import Keycloak from '../services/Keycloak.js'; import { OidcRepository } from '../repositories/OidcRepository.js'; import OidcProvider from '../services/OidcProvider.js'; import { BlobStorage } from '../blobstorage/index.js'; -import { IQueue, IWorker } from './Worker.js'; -import { DeleteOrganizationAuditLogsQueue } from './DeleteOrganizationAuditLogsWorker.js'; +import { DB } from '../../db/index.js'; +import { DeleteOrganizationAuditLogsQueue } from './DeleteOrganizationAuditLogs.js'; +import { BaseQueue, BaseWorker } from './base/index.js'; const QueueName = 'organization.delete'; const WorkerName = 'DeleteOrganizationWorker'; @@ -17,32 +16,9 @@ export interface DeleteOrganizationInput { organizationId: string; } -export class DeleteOrganizationQueue implements IQueue { - private readonly queue: Queue; - private readonly logger: pino.Logger; - +export class DeleteOrganizationQueue extends BaseQueue { constructor(log: pino.Logger, conn: ConnectionOptions) { - this.logger = log.child({ queue: QueueName }); - this.queue = new Queue(QueueName, { - connection: conn, - defaultJobOptions: { - removeOnComplete: { - age: 90 * 86_400, - }, - removeOnFail: { - age: 90 * 86_400, - }, - attempts: 6, - backoff: { - type: 'exponential', - delay: 112_000, - }, - }, - }); - - this.queue.on('error', (err) => { - this.logger.error(err, 'Queue error'); - }); + super({ name: QueueName, log, conn }); } public addJob(job: DeleteOrganizationInput, opts?: Omit) { @@ -61,11 +37,11 @@ export class DeleteOrganizationQueue implements IQueue } } -class DeleteOrganizationWorker implements IWorker { +export class DeleteOrganizationWorker extends BaseWorker { constructor( private input: { redisConnection: ConnectionOptions; - db: PostgresJsDatabase; + db: DB; logger: pino.Logger; keycloakClient: Keycloak; keycloakRealm: string; @@ -73,10 +49,10 @@ class DeleteOrganizationWorker implements IWorker { deleteOrganizationAuditLogsQueue: DeleteOrganizationAuditLogsQueue; }, ) { - this.input.logger = input.logger.child({ worker: WorkerName }); + super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger); } - public async handler(job: Job) { + protected async handler(job: Job) { try { const orgRepo = new OrganizationRepository(this.input.logger, this.input.db); const oidcRepo = new OidcRepository(this.input.db); @@ -131,30 +107,3 @@ class DeleteOrganizationWorker implements IWorker { } } } - -export const createDeleteOrganizationWorker = (input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; - keycloakClient: Keycloak; - keycloakRealm: string; - blobStorage: BlobStorage; - deleteOrganizationAuditLogsQueue: DeleteOrganizationAuditLogsQueue; -}) => { - const log = input.logger.child({ worker: WorkerName }); - const worker = new Worker( - QueueName, - (job) => new DeleteOrganizationWorker(input).handler(job), - { - connection: input.redisConnection, - concurrency: 10, - }, - ); - worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); - }); - worker.on('error', (err) => { - log.error(err, 'Worker error'); - }); - return worker; -}; diff --git a/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts b/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts new file mode 100644 index 0000000000..44869a08a3 --- /dev/null +++ b/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts @@ -0,0 +1,61 @@ +import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; +import pino from 'pino'; +import { AuditLogRepository } from '../repositories/AuditLogRepository.js'; +import { DB } from '../../db/index.js'; +import { BaseQueue, BaseWorker } from './base/index.js'; + +const QueueName = 'organization.delete_audit_logs'; +const WorkerName = 'DeleteOrganizationAuditLogsWorker'; + +export interface DeleteOrganizationAuditLogsInput { + organizationId: string; +} + +export class DeleteOrganizationAuditLogsQueue extends BaseQueue { + constructor(log: pino.Logger, conn: ConnectionOptions) { + super({ name: QueueName, log, conn }); + } + + public addJob(job: DeleteOrganizationAuditLogsInput, opts?: Omit) { + return this.queue.add(job.organizationId, job, { + ...opts, + jobId: job.organizationId, + }); + } + + public removeJob(job: DeleteOrganizationAuditLogsInput) { + return this.queue.remove(job.organizationId); + } + + public getJob(job: DeleteOrganizationAuditLogsInput) { + return this.queue.getJob(job.organizationId); + } +} + +export class DeleteOrganizationAuditLogsWorker extends BaseWorker { + constructor( + private input: { + redisConnection: ConnectionOptions; + db: DB; + logger: pino.Logger; + }, + ) { + super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger); + } + + protected async handler(job: Job) { + try { + const auditLogRepo = new AuditLogRepository(this.input.db); + + await auditLogRepo.deleteOrganizationLogs({ + organizationId: job.data.organizationId, + }); + } catch (err) { + this.input.logger.error( + { jobId: job.id, organizationId: job.data.organizationId, err }, + 'Failed to delete audit logs for organization', + ); + throw err; + } + } +} diff --git a/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts b/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts deleted file mode 100644 index 77465438c4..0000000000 --- a/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts +++ /dev/null @@ -1,109 +0,0 @@ -import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; -import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; -import pino from 'pino'; -import * as schema from '../../db/schema.js'; -import { AuditLogRepository } from '../repositories/AuditLogRepository.js'; -import { IQueue, IWorker } from './Worker.js'; - -const QueueName = 'organization.delete_audit_logs'; -const WorkerName = 'DeleteOrganizationAuditLogsWorker'; - -export interface DeleteOrganizationAuditLogsInput { - organizationId: string; -} - -export class DeleteOrganizationAuditLogsQueue implements IQueue { - private readonly queue: Queue; - private readonly logger: pino.Logger; - - constructor(log: pino.Logger, conn: ConnectionOptions) { - this.logger = log.child({ queue: QueueName }); - this.queue = new Queue(QueueName, { - connection: conn, - defaultJobOptions: { - removeOnComplete: { - age: 90 * 86_400, - }, - removeOnFail: { - age: 90 * 86_400, - }, - attempts: 6, - backoff: { - type: 'exponential', - delay: 112_000, - }, - }, - }); - - this.queue.on('error', (err) => { - this.logger.error(err, 'Queue error'); - }); - } - - public addJob(job: DeleteOrganizationAuditLogsInput, opts?: Omit) { - return this.queue.add(job.organizationId, job, { - ...opts, - jobId: job.organizationId, - }); - } - - public removeJob(job: DeleteOrganizationAuditLogsInput) { - return this.queue.remove(job.organizationId); - } - - public getJob(job: DeleteOrganizationAuditLogsInput) { - return this.queue.getJob(job.organizationId); - } -} - -class DeleteOrganizationAuditLogsWorker implements IWorker { - constructor( - private input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; - }, - ) { - this.input.logger = input.logger.child({ worker: WorkerName }); - } - - public async handler(job: Job) { - try { - const auditLogRepo = new AuditLogRepository(this.input.db); - - await auditLogRepo.deleteOrganizationLogs({ - organizationId: job.data.organizationId, - }); - } catch (err) { - this.input.logger.error( - { jobId: job.id, organizationId: job.data.organizationId, err }, - 'Failed to delete audit logs for organization', - ); - } - } -} - -export const createDeleteOrganizationAuditLogsWorker = (input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; -}) => { - const log = input.logger.child({ worker: WorkerName }); - const worker = new Worker( - QueueName, - (job) => new DeleteOrganizationAuditLogsWorker(input).handler(job), - { - connection: input.redisConnection, - concurrency: 10, - }, - ); - - worker.on('stalled', (job) => { - log.warn({ joinId: job }, 'Job stalled'); - }); - worker.on('error', (err) => { - log.error(err, 'Worker error'); - }); - - return worker; -}; diff --git a/controlplane/src/core/workers/DeleteUserQueue.ts b/controlplane/src/core/workers/DeleteUser.ts similarity index 59% rename from controlplane/src/core/workers/DeleteUserQueue.ts rename to controlplane/src/core/workers/DeleteUser.ts index e03e29a35b..8d9ec37165 100644 --- a/controlplane/src/core/workers/DeleteUserQueue.ts +++ b/controlplane/src/core/workers/DeleteUser.ts @@ -1,15 +1,14 @@ import { PlatformEventName } from '@wundergraph/cosmo-connect/dist/notifications/events_pb'; -import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; -import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; +import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; import pino from 'pino'; -import * as schema from '../../db/schema.js'; import { BlobStorage } from '../blobstorage/index.js'; import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; import { UserRepository } from '../repositories/UserRepository.js'; import Keycloak from '../services/Keycloak.js'; import { PlatformWebhookService } from '../webhooks/PlatformWebhookService.js'; -import { IQueue, IWorker } from './Worker.js'; -import { DeleteOrganizationAuditLogsQueue } from './DeleteOrganizationAuditLogsWorker.js'; +import { DB } from '../../db/index.js'; +import { BaseQueue, BaseWorker } from './base/index.js'; +import { DeleteOrganizationAuditLogsQueue } from './DeleteOrganizationAuditLogs.js'; const QueueName = 'user.delete'; const WorkerName = 'DeleteUserWorker'; @@ -19,32 +18,9 @@ export interface DeleteUserInput { userEmail: string; } -export class DeleteUserQueue implements IQueue { - private readonly queue: Queue; - private readonly logger: pino.Logger; - +export class DeleteUserQueue extends BaseQueue { constructor(log: pino.Logger, conn: ConnectionOptions) { - this.logger = log.child({ queue: QueueName }); - this.queue = new Queue(QueueName, { - connection: conn, - defaultJobOptions: { - removeOnComplete: { - age: 90 * 86_400, - }, - removeOnFail: { - age: 90 * 86_400, - }, - attempts: 6, - backoff: { - type: 'exponential', - delay: 112_000, - }, - }, - }); - - this.queue.on('error', (err) => { - this.logger.error(err, 'Queue error'); - }); + super({ name: QueueName, log, conn }); } public addJob(job: DeleteUserInput, opts?: Omit) { @@ -63,11 +39,11 @@ export class DeleteUserQueue implements IQueue { } } -class DeleteUserWorker implements IWorker { +export class DeleteUserWorker extends BaseWorker { constructor( private input: { + db: DB; redisConnection: ConnectionOptions; - db: PostgresJsDatabase; logger: pino.Logger; keycloakClient: Keycloak; keycloakRealm: string; @@ -76,10 +52,10 @@ class DeleteUserWorker implements IWorker { deleteOrganizationAuditLogsQueue: DeleteOrganizationAuditLogsQueue; }, ) { - this.input.logger = input.logger.child({ worker: WorkerName }); + super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger); } - public async handler(job: Job) { + protected async handler(job: Job) { try { const userRepo = new UserRepository(this.input.logger, this.input.db); const orgRepo = new OrganizationRepository(this.input.logger, this.input.db); @@ -123,27 +99,3 @@ class DeleteUserWorker implements IWorker { } } } - -export const createDeleteUserWorker = (input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; - keycloakClient: Keycloak; - keycloakRealm: string; - blobStorage: BlobStorage; - platformWebhooks: PlatformWebhookService; - deleteOrganizationAuditLogsQueue: DeleteOrganizationAuditLogsQueue; -}) => { - const log = input.logger.child({ worker: WorkerName }); - const worker = new Worker(QueueName, (job) => new DeleteUserWorker(input).handler(job), { - connection: input.redisConnection, - concurrency: 10, - }); - worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); - }); - worker.on('error', (err) => { - log.error(err, 'Worker error'); - }); - return worker; -}; diff --git a/controlplane/src/core/workers/ReactivateOrganization.ts b/controlplane/src/core/workers/ReactivateOrganization.ts new file mode 100644 index 0000000000..853c416310 --- /dev/null +++ b/controlplane/src/core/workers/ReactivateOrganization.ts @@ -0,0 +1,74 @@ +import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; +import pino from 'pino'; +import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; +import { DB } from '../../db/index.js'; +import { DeleteOrganizationQueue } from './DeleteOrganization.js'; +import { BaseQueue, BaseWorker } from './base/index.js'; + +const QueueName = 'organization.reactivate'; +const WorkerName = 'ReactivateOrganizationWorker'; + +export interface ReactivateOrganizationInput { + organizationId: string; + organizationSlug: string; +} + +export class ReactivateOrganizationQueue extends BaseQueue { + constructor(log: pino.Logger, conn: ConnectionOptions) { + super({ name: QueueName, log, conn }); + } + + public addJob(job: ReactivateOrganizationInput, opts?: Omit) { + return this.queue.add(job.organizationId, job, { + ...opts, + jobId: job.organizationId, + }); + } + + public removeJob(job: ReactivateOrganizationInput) { + return this.queue.remove(job.organizationId); + } + + public getJob(job: ReactivateOrganizationInput) { + return this.queue.getJob(job.organizationId); + } +} + +export class ReactivateOrganizationWorker extends BaseWorker { + constructor( + private input: { + redisConnection: ConnectionOptions; + db: DB; + logger: pino.Logger; + deleteOrganizationQueue: DeleteOrganizationQueue; + }, + ) { + super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger); + } + + protected async handler(job: Job) { + try { + const orgRepo = new OrganizationRepository(this.input.logger, this.input.db); + + const org = await orgRepo.bySlug(job.data.organizationSlug); + if (!org) { + throw new Error('Organization not found'); + } + + if (org.id !== job.data.organizationId) { + throw new Error('Id and slug mismatch'); + } + + await orgRepo.reactivateOrganization({ + organizationId: job.data.organizationId, + deleteOrganizationQueue: this.input.deleteOrganizationQueue, + }); + } catch (err) { + this.input.logger.error( + { jobId: job.id, organizationId: job.data.organizationId, err }, + `Failed to reactivate organization`, + ); + throw err; + } + } +} diff --git a/controlplane/src/core/workers/ReactivateOrganizationWorker.ts b/controlplane/src/core/workers/ReactivateOrganizationWorker.ts deleted file mode 100644 index 24c20fdf1a..0000000000 --- a/controlplane/src/core/workers/ReactivateOrganizationWorker.ts +++ /dev/null @@ -1,122 +0,0 @@ -import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; -import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; -import pino from 'pino'; -import * as schema from '../../db/schema.js'; -import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; -import Keycloak from '../services/Keycloak.js'; -import { DeleteOrganizationQueue } from './DeleteOrganizationWorker.js'; -import { IQueue, IWorker } from './Worker.js'; - -const QueueName = 'organization.reactivate'; -const WorkerName = 'ReactivateOrganizationWorker'; - -export interface ReactivateOrganizationInput { - organizationId: string; - organizationSlug: string; -} - -export class ReactivateOrganizationQueue implements IQueue { - private readonly queue: Queue; - private readonly logger: pino.Logger; - - constructor(log: pino.Logger, conn: ConnectionOptions) { - this.logger = log.child({ queue: QueueName }); - this.queue = new Queue(QueueName, { - connection: conn, - defaultJobOptions: { - removeOnComplete: { - age: 90 * 86_400, - }, - removeOnFail: { - age: 90 * 86_400, - }, - attempts: 6, - backoff: { - type: 'exponential', - delay: 112_000, - }, - }, - }); - - this.queue.on('error', (err) => { - this.logger.error(err, 'Queue error'); - }); - } - - public addJob(job: ReactivateOrganizationInput, opts?: Omit) { - return this.queue.add(job.organizationId, job, { - ...opts, - jobId: job.organizationId, - }); - } - - public removeJob(job: ReactivateOrganizationInput) { - return this.queue.remove(job.organizationId); - } - - public getJob(job: ReactivateOrganizationInput) { - return this.queue.getJob(job.organizationId); - } -} - -class ReactivateOrganizationWorker implements IWorker { - constructor( - private input: { - db: PostgresJsDatabase; - logger: pino.Logger; - deleteOrganizationQueue: DeleteOrganizationQueue; - }, - ) { - this.input.logger = input.logger.child({ worker: WorkerName }); - } - - public async handler(job: Job) { - try { - const orgRepo = new OrganizationRepository(this.input.logger, this.input.db); - - const org = await orgRepo.bySlug(job.data.organizationSlug); - if (!org) { - throw new Error('Organization not found'); - } - - if (org.id !== job.data.organizationId) { - throw new Error('Id and slug mismatch'); - } - - await orgRepo.reactivateOrganization({ - organizationId: job.data.organizationId, - deleteOrganizationQueue: this.input.deleteOrganizationQueue, - }); - } catch (err) { - this.input.logger.error( - { jobId: job.id, organizationId: job.data.organizationId, err }, - `Failed to reactivate organization`, - ); - throw err; - } - } -} - -export const createReactivateOrganizationWorker = (input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; - deleteOrganizationQueue: DeleteOrganizationQueue; -}) => { - const log = input.logger.child({ worker: WorkerName }); - const worker = new Worker( - QueueName, - (job) => new ReactivateOrganizationWorker(input).handler(job), - { - connection: input.redisConnection, - concurrency: 10, - }, - ); - worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); - }); - worker.on('error', (err) => { - log.error(err, 'Worker error'); - }); - return worker; -}; diff --git a/controlplane/src/core/workers/Worker.ts b/controlplane/src/core/workers/Worker.ts deleted file mode 100644 index b268460a33..0000000000 --- a/controlplane/src/core/workers/Worker.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { Job, JobsOptions } from 'bullmq'; - -export interface IQueue { - addJob(job: T, opts?: Omit): Promise | undefined>; - removeJob(job: T): Promise; - getJob(job: T): Promise | undefined>; -} - -export interface IWorker { - handler(job: Job): Promise; -} diff --git a/controlplane/src/core/workers/base/Queue.ts b/controlplane/src/core/workers/base/Queue.ts new file mode 100644 index 0000000000..7776249831 --- /dev/null +++ b/controlplane/src/core/workers/base/Queue.ts @@ -0,0 +1,46 @@ +import { ConnectionOptions, Job, JobsOptions, Queue } from 'bullmq'; +import pino from 'pino'; + +const defaultJobOptions: JobsOptions = { + removeOnComplete: { + age: 90 * 86_400, + }, + removeOnFail: { + age: 90 * 86_400, + }, + attempts: 6, + backoff: { + type: 'exponential', + delay: 112_000, + }, +}; + +export type BaseQueueParams = { + name: string; + conn: ConnectionOptions; + log: pino.Logger; + jobsOptions?: JobsOptions; +}; + +export abstract class BaseQueue { + protected readonly queue: Queue; + protected readonly logger: pino.Logger; + + constructor({ name, conn, log, jobsOptions = defaultJobOptions }: BaseQueueParams) { + this.logger = log.child({ queue: name }); + this.queue = new Queue(name, { + connection: conn, + defaultJobOptions: jobsOptions, + }); + + this.queue.on('error', (err) => { + this.logger.error(err, `error [Queue: ${name}]`); + }); + } + + public abstract addJob(job: T, opts?: Omit): Promise | undefined>; + + public abstract removeJob(job: T): Promise; + + public abstract getJob(job: T): Promise | undefined>; +} diff --git a/controlplane/src/core/workers/base/Worker.ts b/controlplane/src/core/workers/base/Worker.ts new file mode 100644 index 0000000000..5dc0f15deb --- /dev/null +++ b/controlplane/src/core/workers/base/Worker.ts @@ -0,0 +1,31 @@ +import { WorkerOptions, Job, Worker } from 'bullmq'; +import pino from 'pino'; + +export abstract class BaseWorker { + protected readonly logger: pino.Logger; + + constructor( + protected readonly name: string, + protected readonly queueName: string, + protected options: WorkerOptions, + logger: pino.Logger, + ) { + this.logger = logger.child({ worker: name }); + } + + public create(): Worker { + const worker = new Worker(this.queueName, (job) => this.handler(job), this.options); + + worker.on('stalled', (jobId) => { + this.logger.warn({ jobId }, `Job stalled [Worker: ${this.name}]`); + }); + + worker.on('error', (err) => { + this.logger.error(err, `error [Worker: ${this.name}]`); + }); + + return worker; + } + + protected abstract handler(job: Job): Promise; +} diff --git a/controlplane/src/core/workers/base/index.ts b/controlplane/src/core/workers/base/index.ts new file mode 100644 index 0000000000..1112c69e6a --- /dev/null +++ b/controlplane/src/core/workers/base/index.ts @@ -0,0 +1,2 @@ +export * from './Queue.js'; +export * from './Worker.js';