Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
30e4be4
fix(controlplane): introducing abstract classes to improve worker & q…
fahimfaisaal Jul 6, 2025
7752503
chore: fix lint issues
fahimfaisaal Jul 6, 2025
1f7ffbe
fix: parent logger overright
fahimfaisaal Jul 6, 2025
cd269dc
fix: used async await with handler for proper sync
fahimfaisaal Jul 7, 2025
085738e
fix: change access modifier of handle func to protected
fahimfaisaal Jul 7, 2025
a859b2f
fix: linted al changes
fahimfaisaal Jul 7, 2025
f1e5eb8
fix: removed Worker & Queue suffix from file names for better naming …
fahimfaisaal Jul 7, 2025
25f113d
fix: throw error after failed audit log deletion for better error han…
fahimfaisaal Jul 7, 2025
8584730
fix: remove redundant async await from worker handler
fahimfaisaal Jul 7, 2025
fd8310f
fix: replace PostgresJsDatabase with DB type in worker constructors f…
fahimfaisaal Jul 8, 2025
7eb4dd5
fix: used outer path
fahimfaisaal Jul 8, 2025
d3a1e00
Merge branch 'main' into fahim/used-abstract-classes-instead-interfaces
fahimfaisaal Jul 14, 2025
c702aa2
Merge branch 'main' into fahim/used-abstract-classes-instead-interfaces
fahimfaisaal Jul 28, 2025
ec4c58d
Merge branch 'main' into fahim/used-abstract-classes-instead-interfaces
fahimfaisaal Aug 7, 2025
a6a2947
Merge branch 'main' into fahim/used-abstract-classes-instead-interfaces
fahimfaisaal Aug 10, 2025
d080371
Merge branch 'main' into fahim/used-abstract-classes-instead-interfaces
fahimfaisaal Aug 22, 2025
2b586ec
Merge branch 'main' into fahim/used-abstract-classes-instead-interfaces
fahimfaisaal Aug 25, 2025
768eae7
Merge branch 'main' into fahim/used-abstract-classes-instead-interfaces
fahimfaisaal Aug 31, 2025
247f8f8
Merge branch 'main' into fahim/used-abstract-classes-instead-interfaces
fahimfaisaal Sep 7, 2025
aac56ca
Merge branch 'main' into fahim/used-abstract-classes-instead-interfaces
fahimfaisaal Sep 13, 2025
7692d4c
Merge branch 'main' into fahim/used-abstract-classes-instead-interfaces
fahimfaisaal Sep 21, 2025
2615837
Merge branch 'main' into fahim/used-abstract-classes-instead-interfaces
fahimfaisaal Oct 7, 2025
b162b35
Merge branch 'main' into fahim/used-abstract-classes-instead-interfaces
fahimfaisaal Oct 25, 2025
aef9eb4
Merge branch 'main' into fahim/used-abstract-classes-instead-interfaces
fahimfaisaal Nov 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controlplane/src/bin/deactivate-org.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import process from 'node:process';
import { pino } from 'pino';
import { DeactivateOrganizationQueue } from '../core/workers/DeactivateOrganizationWorker.js';
import { DeactivateOrganizationQueue } from '../core/workers/DeactivateOrganization.js';
import { createRedisConnections } from '../core/plugins/redis.js';
import { getConfig } from './get-config.js';

Expand Down
2 changes: 1 addition & 1 deletion controlplane/src/bin/delete-user.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import process from 'node:process';
import { pino } from 'pino';
import { createRedisConnections } from '../core/plugins/redis.js';
import { DeleteUserQueue } from '../core/workers/DeleteUserQueue.js';
import { DeleteUserQueue } from '../core/workers/DeleteUser.js';
import { getConfig } from './get-config.js';

const { redis } = getConfig();
Expand Down
2 changes: 1 addition & 1 deletion controlplane/src/bin/reactivate-org.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import process from 'node:process';
import { pino } from 'pino';
import { createRedisConnections } from '../core/plugins/redis.js';
import { ReactivateOrganizationQueue } from '../core/workers/ReactivateOrganizationWorker.js';
import { ReactivateOrganizationQueue } from '../core/workers/ReactivateOrganization.js';
import { getConfig } from './get-config.js';

const { organizationSlug, redis } = getConfig();
Expand Down
130 changes: 60 additions & 70 deletions controlplane/src/core/build-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { App } from 'octokit';
import { Worker } from 'bullmq';
import routes from './routes.js';
import fastifyHealth from './plugins/health.js';
import fastifyMetrics, { MetricsPluginOptions } from './plugins/metrics.js';
import fastifyMetrics from './plugins/metrics.js';
import fastifyDatabase from './plugins/database.js';
import fastifyClickHouse from './plugins/clickhouse.js';
import fastifyRedis from './plugins/redis.js';
Expand All @@ -36,23 +36,18 @@ import { Authorization } from './services/Authorization.js';
import { BillingRepository } from './repositories/BillingRepository.js';
import { BillingService } from './services/BillingService.js';
import { UserRepository } from './repositories/UserRepository.js';
import { AIGraphReadmeQueue, createAIGraphReadmeWorker } from './workers/AIGraphReadmeWorker.js';
import { AIGraphReadmeQueue, AIGraphReadmeWorker } from './workers/AIGraphReadme.js';
import { fastifyLoggerId, createS3ClientConfig, extractS3BucketName, isGoogleCloudStorageUrl } from './util.js';

import { ApiKeyRepository } from './repositories/ApiKeyRepository.js';
import { createDeleteOrganizationWorker, DeleteOrganizationQueue } from './workers/DeleteOrganizationWorker.js';
import { DeleteOrganizationWorker, DeleteOrganizationQueue } from './workers/DeleteOrganization.js';
import {
createDeleteOrganizationAuditLogsWorker,
DeleteOrganizationAuditLogsWorker,
DeleteOrganizationAuditLogsQueue,
} from './workers/DeleteOrganizationAuditLogsWorker.js';
import {
createDeactivateOrganizationWorker,
DeactivateOrganizationQueue,
} from './workers/DeactivateOrganizationWorker.js';
import { createDeleteUserWorker, DeleteUserQueue } from './workers/DeleteUserQueue.js';
import {
createReactivateOrganizationWorker,
ReactivateOrganizationQueue,
} from './workers/ReactivateOrganizationWorker.js';
} from './workers/DeleteOrganizationAuditLogs.js';
import { DeactivateOrganizationWorker, DeactivateOrganizationQueue } from './workers/DeactivateOrganization.js';
import { DeleteUserWorker, DeleteUserQueue } from './workers/DeleteUser.js';
import { ReactivateOrganizationWorker, ReactivateOrganizationQueue } from './workers/ReactivateOrganization.js';

export interface BuildConfig {
logger: LoggerOptions;
Expand Down Expand Up @@ -326,73 +321,68 @@ export default async function build(opts: BuildConfig) {
const readmeQueue = new AIGraphReadmeQueue(logger, fastify.redisForQueue);

if (opts.openaiAPIKey) {
bullWorkers.push(
createAIGraphReadmeWorker({
redisConnection: fastify.redisForWorker,
db: fastify.db,
logger,
openAiApiKey: opts.openaiAPIKey,
}),
);
}

const deleteOrganizationAuditLogsQueue = new DeleteOrganizationAuditLogsQueue(logger, fastify.redisForQueue);
bullWorkers.push(
createDeleteOrganizationAuditLogsWorker({
const readmeWorker = new AIGraphReadmeWorker({
redisConnection: fastify.redisForWorker,
db: fastify.db,
logger,
}),
);
openAiApiKey: opts.openaiAPIKey,
});

bullWorkers.push(readmeWorker.create());
}

const deleteOrganizationAuditLogsQueue = new DeleteOrganizationAuditLogsQueue(logger, fastify.redisForQueue);
const deleteOrganizationAuditLogsWorker = new DeleteOrganizationAuditLogsWorker({
redisConnection: fastify.redisForWorker,
db: fastify.db,
logger,
});
bullWorkers.push(deleteOrganizationAuditLogsWorker.create());

const deleteOrganizationQueue = new DeleteOrganizationQueue(logger, fastify.redisForQueue);
bullWorkers.push(
createDeleteOrganizationWorker({
redisConnection: fastify.redisForWorker,
db: fastify.db,
logger,
keycloakClient,
keycloakRealm: opts.keycloak.realm,
blobStorage,
deleteOrganizationAuditLogsQueue,
}),
);
const deleteOrganizationWorker = new DeleteOrganizationWorker({
redisConnection: fastify.redisForWorker,
db: fastify.db,
logger,
keycloakClient,
keycloakRealm: opts.keycloak.realm,
blobStorage,
deleteOrganizationAuditLogsQueue,
});
bullWorkers.push(deleteOrganizationWorker.create());

const deactivateOrganizationQueue = new DeactivateOrganizationQueue(logger, fastify.redisForQueue);
bullWorkers.push(
createDeactivateOrganizationWorker({
redisConnection: fastify.redisForWorker,
db: fastify.db,
logger,
keycloakClient,
keycloakRealm: opts.keycloak.realm,
deleteOrganizationQueue,
}),
);
const deactivateOrganizationWorker = new DeactivateOrganizationWorker({
redisConnection: fastify.redisForWorker,
db: fastify.db,
logger,
keycloakClient,
keycloakRealm: opts.keycloak.realm,
deleteOrganizationQueue,
});
bullWorkers.push(deactivateOrganizationWorker.create());

const reactivateOrganizationQueue = new ReactivateOrganizationQueue(logger, fastify.redisForQueue);
bullWorkers.push(
createReactivateOrganizationWorker({
redisConnection: fastify.redisForWorker,
db: fastify.db,
logger,
deleteOrganizationQueue,
}),
);
const reactivateOrganizationWorker = new ReactivateOrganizationWorker({
redisConnection: fastify.redisForWorker,
db: fastify.db,
logger,
deleteOrganizationQueue,
});
bullWorkers.push(reactivateOrganizationWorker.create());

const deleteUserQueue = new DeleteUserQueue(logger, fastify.redisForQueue);
bullWorkers.push(
createDeleteUserWorker({
redisConnection: fastify.redisForWorker,
db: fastify.db,
logger,
keycloakClient,
keycloakRealm: opts.keycloak.realm,
blobStorage,
platformWebhooks,
deleteOrganizationAuditLogsQueue,
}),
);
const deleteUserWorker = new DeleteUserWorker({
redisConnection: fastify.redisForWorker,
db: fastify.db,
logger,
keycloakClient,
keycloakRealm: opts.keycloak.realm,
blobStorage,
platformWebhooks,
deleteOrganizationAuditLogsQueue,
});
bullWorkers.push(deleteUserWorker.create());

// required to verify webhook payloads
await fastify.register(import('fastify-raw-body'), {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import {
WebhooksConfigDTO,
} from '../../types/index.js';
import Keycloak from '../services/Keycloak.js';
import { DeleteOrganizationQueue } from '../workers/DeleteOrganizationWorker.js';
import { DeleteOrganizationQueue } from '../workers/DeleteOrganization.js';
import { BlobStorage } from '../blobstorage/index.js';
import { delayForManualOrgDeletionInDays, delayForOrgAuditLogsDeletionInDays } from '../constants.js';
import { DeleteOrganizationAuditLogsQueue } from '../workers/DeleteOrganizationAuditLogsWorker.js';
import { DeleteOrganizationAuditLogsQueue } from '../workers/DeleteOrganizationAuditLogs.js';
import { RBACEvaluator } from '../services/RBACEvaluator.js';
import { BillingRepository } from './BillingRepository.js';
import { FederatedGraphRepository } from './FederatedGraphRepository.js';
Expand Down
2 changes: 1 addition & 1 deletion controlplane/src/core/repositories/UserRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { UserDTO } from '../../types/index.js';
import { BlobStorage } from '../blobstorage/index.js';
import Keycloak from '../services/Keycloak.js';
import OidcProvider from '../services/OidcProvider.js';
import { DeleteOrganizationAuditLogsQueue } from '../workers/DeleteOrganizationAuditLogsWorker.js';
import { DeleteOrganizationAuditLogsQueue } from '../workers/DeleteOrganizationAuditLogs.js';
import { BillingRepository } from './BillingRepository.js';
import { OidcRepository } from './OidcRepository.js';
import { OrganizationRepository } from './OrganizationRepository.js';
Expand Down
12 changes: 6 additions & 6 deletions controlplane/src/core/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import { IPlatformWebhookService } from './webhooks/PlatformWebhookService.js';
import { BlobStorage } from './blobstorage/index.js';
import Mailer from './services/Mailer.js';
import { Authorization } from './services/Authorization.js';
import { AIGraphReadmeQueue } from './workers/AIGraphReadmeWorker.js';
import { DeleteOrganizationQueue } from './workers/DeleteOrganizationWorker.js';
import { DeactivateOrganizationQueue } from './workers/DeactivateOrganizationWorker.js';
import { DeleteUserQueue } from './workers/DeleteUserQueue.js';
import { ReactivateOrganizationQueue } from './workers/ReactivateOrganizationWorker.js';
import { DeleteOrganizationAuditLogsQueue } from './workers/DeleteOrganizationAuditLogsWorker.js';
import { AIGraphReadmeQueue } from './workers/AIGraphReadme.js';
import { DeleteOrganizationQueue } from './workers/DeleteOrganization.js';
import { DeactivateOrganizationQueue } from './workers/DeactivateOrganization.js';
import { DeleteUserQueue } from './workers/DeleteUser.js';
import { ReactivateOrganizationQueue } from './workers/ReactivateOrganization.js';
import { DeleteOrganizationAuditLogsQueue } from './workers/DeleteOrganizationAuditLogs.js';

export interface RouterOptions {
db: PostgresJsDatabase<typeof schema>;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import { ConnectionOptions, Job, Worker, Queue } from 'bullmq';
import { ConnectionOptions, Job } from 'bullmq';
import pino from 'pino';
import * as schema from '../../db/schema.js';
import { OpenAIGraphql } from '../openai-graphql/index.js';
import { SubgraphRepository } from '../repositories/SubgraphRepository.js';
import { FederatedGraphRepository } from '../repositories/FederatedGraphRepository.js';
import { IQueue, IWorker } from './Worker.js';
import { DB } from '../../db/index.js';
import { BaseQueue, BaseWorker } from './base/index.js';

const QueueName = 'ai.graph-readme-generator';
const WorkerName = 'AIGraphReadmeWorker';
Expand All @@ -16,15 +15,13 @@ export interface CreateReadmeInputEvent {
type: 'subgraph' | 'federated_graph';
}

export class AIGraphReadmeQueue implements IQueue<CreateReadmeInputEvent> {
private readonly queue: Queue<CreateReadmeInputEvent>;
private readonly logger: pino.Logger;

export class AIGraphReadmeQueue extends BaseQueue<CreateReadmeInputEvent> {
constructor(log: pino.Logger, conn: ConnectionOptions) {
this.logger = log.child({ queue: QueueName });
this.queue = new Queue<CreateReadmeInputEvent>(QueueName, {
connection: conn,
defaultJobOptions: {
super({
name: QueueName,
conn,
log,
jobsOptions: {
removeOnComplete: true,
removeOnFail: true,
attempts: 3,
Expand All @@ -34,10 +31,6 @@ export class AIGraphReadmeQueue implements IQueue<CreateReadmeInputEvent> {
},
},
});

this.queue.on('error', (err) => {
this.logger.error(err, 'Queue error');
});
}

public addJob(job: CreateReadmeInputEvent) {
Expand All @@ -61,21 +54,22 @@ export class AIGraphReadmeQueue implements IQueue<CreateReadmeInputEvent> {
}
}

class AIGraphReadmeWorker implements IWorker {
export class AIGraphReadmeWorker extends BaseWorker<CreateReadmeInputEvent> {
private readonly openaiGraphql: OpenAIGraphql;

constructor(
private input: {
redisConnection: ConnectionOptions;
db: PostgresJsDatabase<typeof schema>;
db: DB;
logger: pino.Logger;
openAiApiKey: string;
},
) {
super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger);

this.openaiGraphql = new OpenAIGraphql({
openAiApiKey: input.openAiApiKey,
});
this.input.logger = input.logger.child({ worker: WorkerName });
}

private async generateSubgraphReadme(job: Job<CreateReadmeInputEvent>) {
Expand Down Expand Up @@ -122,7 +116,7 @@ class AIGraphReadmeWorker implements IWorker {
});
}

public async handler(job: Job<CreateReadmeInputEvent>) {
protected async handler(job: Job<CreateReadmeInputEvent>) {
try {
if (job.data.type === 'subgraph') {
await this.generateSubgraphReadme(job);
Expand All @@ -140,23 +134,3 @@ class AIGraphReadmeWorker implements IWorker {
}
}
}

export const createAIGraphReadmeWorker = (input: {
redisConnection: ConnectionOptions;
db: PostgresJsDatabase<typeof schema>;
logger: pino.Logger;
openAiApiKey: string;
}) => {
const log = input.logger.child({ worker: WorkerName });
const worker = new Worker<CreateReadmeInputEvent>(QueueName, (job) => new AIGraphReadmeWorker(input).handler(job), {
connection: input.redisConnection,
concurrency: 10,
});
worker.on('stalled', (job) => {
log.warn(`Job ${job} stalled`);
});
worker.on('error', (err) => {
log.error(err, 'Worker error');
});
return worker;
};
Loading