Skip to content
1 change: 1 addition & 0 deletions controlplane/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
"stream-json": "^1.8.0",
"stripe": "^14.19.0",
"tiny-lru": "^11.2.11",
"tinypool": "^2.0.0",
"uid": "^2.0.2",
"uuid": "^10.0.0",
"zod": "^3.22.4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ export function createContract(
chClient: opts.chClient!,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: [{ ...contractGraph, contract }],
composeWorkerPool: opts.composeWorkerPool,
});

compositionErrors.push(...composition.compositionErrors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ export function updateContract(
},
labelMatchers: [],
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
});

Expand All @@ -155,6 +156,7 @@ export function updateContract(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ export function createFeatureFlag(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ export function deleteFeatureFlag(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export function enableFeatureFlag(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ export function updateFeatureFlag(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: allFederatedGraphsToCompose,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ export function createFederatedGraph(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: [federatedGraph],
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ export function migrateFromApollo(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: {
disableResolvabilityValidation: true,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export function moveFederatedGraph(
jwtSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
);

const allDeploymentErrors: PlainMessage<DeploymentError>[] = [];
Expand Down Expand Up @@ -163,6 +164,7 @@ export function moveFederatedGraph(
jwtSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
);

allCompositionErrors.push(...contractErrors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export function updateFederatedGraph(
admissionWebhookURL: req.admissionWebhookURL,
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
labelMatchers: req.labelMatchers,
namespaceId: federatedGraph.namespaceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ export function setGraphRouterCompatibilityVersion(
},
blobStorage: opts.blobStorage,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: [federatedGraph],
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ export function publishMonograph(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
);

for (const graph of updatedFederatedGraphs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ export function updateMonograph(
admissionWebhookURL: req.admissionWebhookURL,
admissionWebhookSecret: req.admissionWebhookSecret,
chClient: opts.chClient!,
composeWorkerPool: opts.composeWorkerPool,
});

await subgraphRepo.update(
Expand All @@ -173,6 +174,7 @@ export function updateMonograph(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
);

await auditLogRepo.addAuditLog({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ export function deleteFederatedSubgraph(
chClient: opts.chClient!,
compositionOptions: newCompositionOptions(req.disableResolvabilityValidation),
federatedGraphs: affectedFederatedGraphs,
composeWorkerPool: opts.composeWorkerPool,
});

return { affectedFederatedGraphs, compositionErrors, deploymentErrors, compositionWarnings };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ export function fixSubgraphSchema(
if (composition.errors.length > 0) {
for (const error of composition.errors) {
compositionErrors.push({
message: error.message,
message: error,
federatedGraphName: composition.name,
namespace: composition.namespace,
featureFlag: '',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export function moveSubgraph(
jwtSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
newCompositionOptions(req.disableResolvabilityValidation),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ export function publishFederatedSubgraph(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
newCompositionOptions(req.disableResolvabilityValidation),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ export function updateSubgraph(
webhookJWTSecret: opts.admissionWebhookJWTSecret,
},
opts.chClient!,
opts.composeWorkerPool,
newCompositionOptions(req.disableResolvabilityValidation),
);

Expand Down
17 changes: 17 additions & 0 deletions controlplane/src/core/build-server.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { join } from 'node:path';
import Fastify, { FastifyBaseLogger } from 'fastify';
import { S3Client } from '@aws-sdk/client-s3';
import { fastifyConnectPlugin } from '@connectrpc/connect-fastify';
Expand All @@ -8,6 +9,7 @@ import { compressionBrotli, compressionGzip } from '@connectrpc/connect-node';
import fastifyGracefulShutdown from 'fastify-graceful-shutdown';
import { App } from 'octokit';
import { Worker } from 'bullmq';
import { Tinypool } from 'tinypool';
import routes from './routes.js';
import fastifyHealth from './plugins/health.js';
import fastifyMetrics, { MetricsPluginOptions } from './plugins/metrics.js';
Expand Down Expand Up @@ -136,6 +138,9 @@ export interface BuildConfig {
key?: string; // e.g. string or '/path/to/my/client-key.pem'
};
};
composeWorkers?: {
maxCount: number;
};
}

export interface MetricsOptions {
Expand Down Expand Up @@ -469,6 +474,11 @@ export default async function build(opts: BuildConfig) {
keycloakRealm: opts.keycloak.realm,
});

const composeWorkerPool = new Tinypool({
filename: join(process.cwd(), 'dist/workers/compose.js'),
maxThreads: opts.composeWorkers?.maxCount,
});

// Must be registered after custom fastify routes
// Because it registers an all-catch route for connect handlers

Expand Down Expand Up @@ -502,6 +512,7 @@ export default async function build(opts: BuildConfig) {
stripeSecretKey: opts.stripe?.secret,
admissionWebhookJWTSecret: opts.admissionWebhook.secret,
cdnBaseUrl: opts.cdnBaseUrl,
composeWorkerPool,
}),
contextValues(req) {
return createContextValues().set<FastifyBaseLogger>({ id: fastifyLoggerId, defaultValue: req.log }, req.log);
Expand All @@ -522,6 +533,12 @@ export default async function build(opts: BuildConfig) {
});

fastify.gracefulShutdown(async () => {
fastify.log.debug('Shutting down compose worker pool');

await composeWorkerPool.destroy();

fastify.log.debug('Compose worker pool shut down');

fastify.log.debug('Shutting down bull workers');

await Promise.all(bullWorkers.map((worker) => worker.close()));
Expand Down
8 changes: 4 additions & 4 deletions controlplane/src/core/composition/composer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,10 @@ export function mapResultToComposedGraph(
composedSchema: result.success ? printSchemaWithDirectives(result.federatedGraphSchema) : undefined,
federatedClientSchema: result.success ? printSchemaWithDirectives(result.federatedGraphClientSchema) : undefined,
shouldIncludeClientSchema: result.success ? result.shouldIncludeClientSchema : false,
errors: result.success ? [] : result.errors,
errors: result.success ? [] : result.errors.map((e) => e.message),
subgraphs: subgraphDTOsToComposedSubgraphs(federatedGraph.organizationId, subgraphs, result),
fieldConfigurations: result.success ? result.fieldConfigurations : [],
warnings: result.warnings,
warnings: result.warnings.map((e) => e.message),
};
}

Expand All @@ -236,12 +236,12 @@ export interface ComposedFederatedGraph {
namespace: string;
namespaceId: string;
composedSchema?: string;
errors: Error[];
errors: string[];
subgraphs: ComposedSubgraph[];
fieldConfigurations: FieldConfiguration[];
federatedClientSchema?: string;
shouldIncludeClientSchema?: boolean;
warnings: Warning[];
warnings: string[];
}

export interface CompositionDeployResult {
Expand Down
Loading
Loading