Skip to content

Conversation

@pepol
Copy link
Member

@pepol pepol commented Nov 12, 2025

  • feat(controlplane): run federated composition in worker pool to unblock main event loop

Summary by CodeRabbit

  • New Features

    • Composition now runs on a dedicated worker pool, improving concurrency and reliability across graph, subgraph, contract, monograph, and feature-flag operations.
  • Chores

    • Added configurable composeWorkers (maxCount) and integrated worker-pool lifecycle into server startup and graceful shutdown.
    • Composition error/warning payloads now use string/raw-object messages (format change).
  • Tests

    • Test setup exposes the new compose worker pool for integration testing.

Checklist

@coderabbitai
Copy link

coderabbitai bot commented Nov 12, 2025

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

Adds a Tinypool-based compose worker pool and new worker module, introduces composeWorkerPool across router options, services, and repositories, updates composition flows to use worker IPC, and initializes/destroys the pool in the build server and test harness.

Changes

Cohort / File(s) Summary
Package
controlplane/package.json
Adds tinypool dependency.
Build server & lifecycle
controlplane/src/core/build-server.ts
Adds BuildConfig.composeWorkers, creates a Tinypool composeWorkerPool at startup, injects it into router/plugin options, and destroys the pool during graceful shutdown.
Router types & wiring
controlplane/src/core/routes.ts
Imports Tinypool and extends RouterOptions with composeWorkerPool: Tinypool.
Worker module
controlplane/src/workers/compose.ts
New worker entry implementing composition via a MessagePort-based IPC contract (exports typed Inputs/Outputs/BaseMessage/BaseResult and default worker function).
Repository changes (federated graph)
controlplane/src/core/repositories/FederatedGraphRepository.ts
Imports Tinypool and MessageChannel types; updates public method signatures (update, move, composeAndDeployGraphs) to accept/forward composeWorkerPool; switches Composer instantiation from db to tx; adapts composition result handling to use worker outputs and stringified errors/warnings.
Repository changes (subgraph & schema checks)
controlplane/src/core/repositories/SubgraphRepository.ts, controlplane/src/core/repositories/SchemaCheckRepository.ts
Adds Tinypool param composeWorkerPool to update, move, and performSchemaCheck; forwards pool to fed-graph composition; changes composition error/warning entries to carry raw error/warning values (message field now contains the object).
Composition core
controlplane/src/core/composition/composer.ts
Changes composed graph error/warning types from `Error
Worker wiring util
controlplane/src/core/util.ts
Reorganizes imports and keeps composition helper delegation; composition invocation flows adjusted to use worker-based composition helper.
Service-layer call sites
controlplane/src/core/bufservices/...
contract/*, feature-flag/*, federated-graph/*, graph/*, monograph/*, subgraph/*
Thread composeWorkerPool through service calls by passing opts.composeWorkerPool into repository methods and into composeAndDeployGraphs across create/update/delete/move/publish/enable flows.
Subgraph-specific flows
controlplane/src/core/bufservices/subgraph/*.ts
Passes opts.composeWorkerPool into subgraphRepo.update, subgraphRepo.move, and fedGraphRepo.composeAndDeployGraphs call sites.
Monograph & federated graph flows
controlplane/src/core/bufservices/monograph/*.ts, controlplane/src/core/bufservices/federated-graph/*.ts
Forward opts.composeWorkerPool into repository update/move/composeAndDeployGraphs calls.
Feature flag & graph flows
controlplane/src/core/bufservices/feature-flag/*.ts, controlplane/src/core/bufservices/graph/setGraphRouterCompatibilityVersion.ts
Add composeWorkerPool: opts.composeWorkerPool to composition/deploy call argument objects.
Contract flows
controlplane/src/core/bufservices/contract/*.ts
Propagates composeWorkerPool into compose/deploy calls during contract create/update operations.
Tests / test harness
controlplane/test/test-util.ts
Creates a test composeWorkerPool (Tinypool running dist/workers/compose.js) and exposes it on returned queues.composeWorkerPool.
Minor error mapping change
controlplane/src/core/bufservices/subgraph/fixSubgraphSchema.ts
Aggregated composition errors now store the raw error value in message instead of error.message.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Potential hotspots to inspect:

  • Worker IPC contract: verify types/serialization between main thread and controlplane/src/workers/compose.ts (BaseMessage/BaseResult/Outputs).
  • Lifetime and shutdown ordering for composeWorkerPool in build-server.ts and tests to avoid races.
  • Signature propagation consistency across repositories/services/routes and any callers not included in this diff.
  • Conversion of errors/warnings to raw objects or strings (ensure consumers/DB schemas tolerate changed shapes).

Possibly related PRs

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title clearly and specifically describes the main objective: implementing federated graph composition in a worker pool to prevent blocking the main event loop, which is the central focus of all changes across the codebase.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions
Copy link

github-actions bot commented Nov 12, 2025

Router-nonroot image scan passed

✅ No security vulnerabilities found in image:

ghcr.io/wundergraph/cosmo/router:sha-1775acd8ad81b91e299e6b0c0e7a0a0fc2e8bf07-nonroot

@pepol pepol force-pushed the peter/eng-8238-mitigate-event-loop-block-in-cosmo-controlplane branch from 28a2c45 to cb7ec48 Compare November 14, 2025 09:39
@pepol pepol marked this pull request as ready for review November 14, 2025 09:40
@pepol pepol requested a review from asoorm November 14, 2025 09:40
@pepol pepol changed the title feat: run federated composition in worker pool to unblock main event loop' feat(controlplane): run federated composition in worker pool to unblock main event loop' Nov 14, 2025
@pepol pepol changed the title feat(controlplane): run federated composition in worker pool to unblock main event loop' feat(controlplane): run federated graph composition in worker pool to unblock main event loop Nov 14, 2025
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (6)
controlplane/src/core/repositories/SubgraphRepository.ts (1)

16-16: SubgraphRepository correctly propagates the worker pool, with minor ergonomics considerations

The composeWorkerPool parameter is consistently added to update and move and passed through to fedGraphRepo.composeAndDeployGraphs, so subgraph-driven recomposition now uses the shared worker pool as intended. This aligns with the broader wiring in the PR and doesn’t alter existing control flow.

Two optional refinements you might consider:

  • Make composeWorkerPool Tinypool | undefined (and handle that inside composeAndDeployGraphs) to keep SubgraphRepository methods easier to call from non-HTTP contexts or tests that may not want to spin up a pool.
  • If Tinypool is only needed as a type here, switch to import type { Tinypool } from 'tinypool'; to avoid a value import in this module.

These are nice-to-haves and not blockers.

Also applies to: 224-248, 464-477, 492-509, 551-562

controlplane/src/workers/compose.ts (1)

28-37: Worker pool creation and filename resolution could be more robust

Right now getWorkerPool assumes process.cwd() points at the controlplane package root and instantiates a new Tinypool each time it’s called. In a monorepo or when running via different tooling, process.cwd() may not match the package root, which would break the dist/workers/compose.js path, and repeated calls could create multiple pools unnecessarily.

Consider:

  • Deriving the JS worker path relative to this module instead of process.cwd() (e.g., via fileURLToPath(import.meta.url) + path.resolve into dist), so it’s independent of the working directory.
  • Caching a single Tinypool instance at module level (or in the caller) and reusing it, unless you explicitly need separate pools per caller.
controlplane/src/core/util.ts (1)

14-14: Async worker delegation is fine; watch for callsite and naming ergonomics

The new async getFederationResultWithPotentialContracts correctly forwards the full payload into composeWorkerPool.run(..., { name: 'composeFederatedGraph' }) and returns the union result as expected. Two follow-ups to keep this maintainable:

  • Now that this function always uses a worker, all call sites must both pass a non‑undefined Tinypool and await the Promise. It’s worth double‑checking that there are no remaining synchronous usages or tests assuming a sync API.
  • The string 'composeFederatedGraph' is now a coupling point between this helper, the worker export, and the pool configuration. If you ever rename the worker function or pool name, this will silently break. Consider centralizing this task name in a shared constant or at least adding a small comment near both usages to keep them in sync.

Also applies to: 535-547

controlplane/src/core/repositories/FederatedGraphRepository.ts (3)

197-223: update(): composeWorkerPool threading looks correct; ensure all callers pass a shared pool

Adding composeWorkerPool: Tinypool to the update payload and forwarding it into composeAndDeployGraphs wires the worker pool into the recomposition path cleanly. There’s no behavioral change when label matchers don’t change.

The main thing to verify is that all service-layer callers of FederatedGraphRepository.update now supply a long-lived composeWorkerPool instance rather than constructing a new pool per call.

Also applies to: 332-343


361-377: move(): worker pool parameter is propagated consistently

The new composeWorkerPool: Tinypool parameter is correctly passed into composeAndDeployGraphs for both the contract-specific path and the base federated graph path, so both code paths now use the worker pool for composition.

Similar to update, it’s worth confirming that the service layer passes the same shared pool instance into move, rather than creating ad-hoc pools, to avoid unnecessary thread pools and overhead.

Also applies to: 420-458


1507-1526: composeAndDeployGraphs(): good transactional change; worker call looks sound

Within composeAndDeployGraphs:

  • Passing composeWorkerPool: Tinypool into the method signature and then into getFederationResultWithPotentialContracts cleanly offloads the actual composition work to the worker pool.
  • Switching Composer to use tx instead of this.db should make all composition-related writes participate in the same ambient transaction started at the top of composeAndDeployGraphs, which improves consistency.
  • The payload passed to getFederationResultWithPotentialContracts (federatedGraph, subgraphsToCompose, tagOptionsByContractName, compositionOptions) matches the worker’s Inputs shape, so the worker-side composeFederatedGraph has everything it needs.

Two things to double‑check:

  • All current and future callers of composeAndDeployGraphs provide a valid, shared composeWorkerPool (no undefined, no per-call instantiation).
  • There are tests that exercise both the “base graph + contracts” case and the “contract graph only” case to confirm the worker-based composition continues to yield the same FederationResult / FederationResultWithContracts shapes used by the rest of this method.

Also applies to: 1527-1541, 1587-1595

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ac085ab and cb7ec48.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (24)
  • controlplane/package.json (1 hunks)
  • controlplane/src/core/bufservices/contract/createContract.ts (1 hunks)
  • controlplane/src/core/bufservices/contract/updateContract.ts (2 hunks)
  • controlplane/src/core/bufservices/feature-flag/createFeatureFlag.ts (1 hunks)
  • controlplane/src/core/bufservices/feature-flag/deleteFeatureFlag.ts (1 hunks)
  • controlplane/src/core/bufservices/feature-flag/enableFeatureFlag.ts (1 hunks)
  • controlplane/src/core/bufservices/feature-flag/updateFeatureFlag.ts (1 hunks)
  • controlplane/src/core/bufservices/federated-graph/createFederatedGraph.ts (1 hunks)
  • controlplane/src/core/bufservices/federated-graph/migrateFromApollo.ts (1 hunks)
  • controlplane/src/core/bufservices/federated-graph/moveFederatedGraph.ts (2 hunks)
  • controlplane/src/core/bufservices/federated-graph/updateFederatedGraph.ts (1 hunks)
  • controlplane/src/core/bufservices/graph/setGraphRouterCompatibilityVersion.ts (1 hunks)
  • controlplane/src/core/bufservices/monograph/publishMonograph.ts (1 hunks)
  • controlplane/src/core/bufservices/monograph/updateMonograph.ts (2 hunks)
  • controlplane/src/core/bufservices/subgraph/deleteFederatedSubgraph.ts (1 hunks)
  • controlplane/src/core/bufservices/subgraph/moveSubgraph.ts (1 hunks)
  • controlplane/src/core/bufservices/subgraph/publishFederatedSubgraph.ts (1 hunks)
  • controlplane/src/core/bufservices/subgraph/updateSubgraph.ts (1 hunks)
  • controlplane/src/core/build-server.ts (6 hunks)
  • controlplane/src/core/repositories/FederatedGraphRepository.ts (10 hunks)
  • controlplane/src/core/repositories/SubgraphRepository.ts (5 hunks)
  • controlplane/src/core/routes.ts (2 hunks)
  • controlplane/src/core/util.ts (2 hunks)
  • controlplane/src/workers/compose.ts (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-29T10:28:04.846Z
Learnt from: JivusAyrus
Repo: wundergraph/cosmo PR: 2156
File: controlplane/src/core/repositories/SubgraphRepository.ts:1749-1751
Timestamp: 2025-08-29T10:28:04.846Z
Learning: In the controlplane codebase, authentication and authorization checks (including organization scoping) are handled at the service layer in files like unlinkSubgraph.ts before calling repository methods. Repository methods like unlinkSubgraph() in SubgraphRepository.ts can focus purely on data operations without redundant security checks.

Applied to files:

  • controlplane/src/core/bufservices/subgraph/publishFederatedSubgraph.ts
  • controlplane/src/core/bufservices/subgraph/updateSubgraph.ts
  • controlplane/src/core/bufservices/subgraph/moveSubgraph.ts
  • controlplane/src/core/bufservices/subgraph/deleteFederatedSubgraph.ts
  • controlplane/src/core/repositories/FederatedGraphRepository.ts
  • controlplane/src/core/repositories/SubgraphRepository.ts
📚 Learning: 2025-09-17T20:55:39.456Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2172
File: router/core/graph_server.go:0-0
Timestamp: 2025-09-17T20:55:39.456Z
Learning: The Initialize method in router/internal/retrytransport/manager.go has been updated to properly handle feature-flag-only subgraphs by collecting subgraphs from both routerConfig.GetSubgraphs() and routerConfig.FeatureFlagConfigs.ConfigByFeatureFlagName, ensuring all subgraphs receive retry configuration.

Applied to files:

  • controlplane/src/core/bufservices/feature-flag/updateFeatureFlag.ts
  • controlplane/src/core/bufservices/feature-flag/createFeatureFlag.ts
🧬 Code graph analysis (5)
controlplane/src/core/util.ts (3)
controlplane/src/types/index.ts (2)
  • FederatedGraphDTO (76-99)
  • CompositionOptions (796-798)
controlplane/src/core/repositories/FeatureFlagRepository.ts (1)
  • SubgraphsToCompose (42-48)
composition/src/federation/types.ts (3)
  • ContractTagOptions (57-60)
  • FederationResult (33-33)
  • FederationResultWithContracts (55-55)
controlplane/src/workers/compose.ts (5)
controlplane/src/types/index.ts (2)
  • FederatedGraphDTO (76-99)
  • CompositionOptions (796-798)
controlplane/src/core/repositories/FeatureFlagRepository.ts (1)
  • SubgraphsToCompose (42-48)
composition/src/federation/types.ts (3)
  • ContractTagOptions (57-60)
  • FederationResult (33-33)
  • FederationResultWithContracts (55-55)
controlplane/src/core/composition/composition.ts (2)
  • composeFederatedContract (35-47)
  • composeFederatedGraphWithPotentialContracts (18-30)
composition/src/v1/federation/utils.ts (1)
  • newContractTagOptionsFromArrays (242-250)
controlplane/src/core/repositories/FederatedGraphRepository.ts (1)
controlplane/src/core/util.ts (1)
  • getFederationResultWithPotentialContracts (535-548)
controlplane/src/core/build-server.ts (1)
controlplane/src/workers/compose.ts (1)
  • getWorkerPool (28-37)
controlplane/src/core/bufservices/graph/setGraphRouterCompatibilityVersion.ts (1)
controlplane/src/core/routes.ts (1)
  • opts (64-69)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: build_push_image
  • GitHub Check: build_push_image
  • GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
  • GitHub Check: Analyze (go)
  • GitHub Check: build_push_image (nonroot)
  • GitHub Check: integration_test (./events)
  • GitHub Check: build_test
  • GitHub Check: build_push_image
  • GitHub Check: build_test
🔇 Additional comments (21)
controlplane/src/core/bufservices/federated-graph/updateFederatedGraph.ts (1)

122-122: LGTM!

The worker pool parameter is correctly threaded through to the repository layer for async composition.

controlplane/src/core/bufservices/feature-flag/deleteFeatureFlag.ts (1)

138-138: LGTM!

Worker pool parameter is correctly passed to the composition and deployment flow.

controlplane/src/core/bufservices/subgraph/publishFederatedSubgraph.ts (1)

577-577: LGTM!

Worker pool parameter is correctly threaded through to the subgraph update flow.

controlplane/src/core/bufservices/subgraph/updateSubgraph.ts (1)

187-187: LGTM!

Worker pool parameter is correctly passed to the subgraph update operation.

controlplane/src/core/bufservices/contract/createContract.ts (1)

209-209: LGTM!

Worker pool parameter is correctly passed to the compose and deploy operation during contract creation.

controlplane/src/core/bufservices/federated-graph/migrateFromApollo.ts (1)

159-159: LGTM!

Worker pool parameter is correctly threaded through to the Apollo migration composition flow.

controlplane/src/core/routes.ts (1)

8-8: Worker pool lifecycle management verified and correctly implemented.

All concerns from the review have been confirmed:

  1. Initialization order: Worker pool is created at line 477 before route initialization at line 512 ✓
  2. Graceful shutdown: Pool is properly destroyed at line 535 within fastify.gracefulShutdown() callback with logging ✓

The worker pool is actively used throughout the codebase (util.ts, repositories, bufservices) and cleanup is handled correctly alongside other worker cleanup (bullWorkers).

controlplane/package.json (1)

93-93: tinypool@^2.0.0 is appropriately specified with no security concerns.

The latest version of tinypool on npm is 2.0.0, which matches the PR specification. The version specification uses ^2.0.0, allowing for compatible updates within the 2.x range. No security vulnerabilities were found for tinypool in the GitHub vulnerability database.

controlplane/src/core/bufservices/subgraph/deleteFederatedSubgraph.ts (1)

189-189: LGTM! Worker pool parameter correctly propagated.

The addition of composeWorkerPool: opts.composeWorkerPool correctly threads the worker pool through to the composition and deployment flow, enabling async composition without blocking the main event loop.

controlplane/src/core/bufservices/federated-graph/moveFederatedGraph.ts (1)

128-128: LGTM! Worker pool parameter correctly propagated to both move operations.

The worker pool is correctly passed to both the main federated graph move (line 128) and contract graph moves (line 167), ensuring consistent async composition behavior across all affected graphs.

Also applies to: 167-167

controlplane/src/core/bufservices/feature-flag/createFeatureFlag.ts (1)

208-208: LGTM! Worker pool parameter correctly added.

The worker pool parameter is correctly propagated to composeAndDeployGraphs, enabling async composition for federated graphs affected by the new feature flag.

controlplane/src/core/bufservices/feature-flag/updateFeatureFlag.ts (1)

177-177: LGTM! Worker pool parameter correctly added.

The worker pool parameter is correctly propagated to composeAndDeployGraphs for handling composition of all affected federated graphs after feature flag update.

controlplane/src/core/bufservices/federated-graph/createFederatedGraph.ts (1)

235-235: LGTM! Worker pool parameter correctly added.

The worker pool parameter is correctly propagated to composeAndDeployGraphs, enabling async composition for the newly created federated graph.

controlplane/src/core/bufservices/feature-flag/enableFeatureFlag.ts (1)

122-122: LGTM! Worker pool parameter correctly added.

The worker pool parameter is correctly propagated to composeAndDeployGraphs, enabling async composition for federated graphs affected by the feature flag state change.

controlplane/src/core/bufservices/subgraph/moveSubgraph.ts (1)

120-120: LGTM! Worker pool parameter correctly added.

The worker pool parameter is correctly propagated to subgraphRepo.move, enabling async composition for all federated graphs affected by the subgraph move.

controlplane/src/core/bufservices/monograph/updateMonograph.ts (1)

153-153: LGTM! Worker pool parameter correctly propagated to both update operations.

The worker pool is correctly passed to both the federated graph update (line 153) and subgraph update (line 177), ensuring async composition for the monograph update flow.

Also applies to: 177-177

controlplane/src/core/bufservices/graph/setGraphRouterCompatibilityVersion.ts (1)

135-146: Wiring composeWorkerPool into composition is consistent and correct

Passing opts.composeWorkerPool through to composeAndDeployGraphs keeps this flow aligned with the rest of the composition/deploy paths and doesn’t alter existing control flow or error handling. No issues from this change.

controlplane/src/core/bufservices/monograph/publishMonograph.ts (1)

138-156: Monograph update correctly threads the compose worker pool

Adding opts.composeWorkerPool to the subgraphRepo.update call matches the updated repository signature and ensures monograph recomposition also benefits from the worker pool. No behavior change beyond using the pool.

controlplane/src/core/bufservices/contract/updateContract.ts (1)

126-145: Contract update is correctly wired to use the compose worker pool

Both the fedGraphRepo.update and fedGraphRepo.composeAndDeployGraphs calls now pass composeWorkerPool, which is consistent with the updated repository API and the rest of the PR’s wiring. The argument order is correct and there’s no change to the surrounding error-handling logic.

Also applies to: 151-174

controlplane/src/core/build-server.ts (1)

141-144: Worker pool creation and routing wiring look correct

The new composeWorkers config, instantiation of composeWorkerPool via getWorkerPool, and passing it into routes(...) cleanly expose the shared worker pool through RouterOptions to all buf services. This matches the rest of the PR’s composition wiring and keeps the pool scoped to the server lifecycle.

Also applies to: 477-513

controlplane/src/workers/compose.ts (1)

39-60: Composition branching logic in the worker looks correct

The contract vs non-contract branching mirrors the existing composition flow: contracts go through composeFederatedContract with tag options derived from federatedGraph.contract, while non-contract graphs go through composeFederatedGraphWithPotentialContracts with tagOptionsByContractName. The version and compositionOptions are threaded consistently.

This looks like a faithful extraction of the previous inline logic into the worker context.

Comment on lines +11 to 12
import { Tinypool } from 'tinypool';
import routes from './routes.js';
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

Harden composeWorkerPool initialization and shutdown handling

Two small robustness points around the worker-pool lifecycle:

  • getWorkerPool(opts.composeWorkers?.maxCount)! assumes it never returns undefined. If build() were ever called off the main thread, this would throw at startup. Guarding the result and either failing fast with a clearer error or falling back to inline composition would be safer.
  • During graceful shutdown, await composeWorkerPool.destroy() is unguarded. A try/catch with a warning log would avoid aborting the shutdown sequence if the pool destruction fails.

One possible shape (adjust as needed with corresponding type changes in RouterOptions/call sites):

-  const composeWorkerPool: Tinypool = getWorkerPool(opts.composeWorkers?.maxCount)!;
+  const composeWorkerPool = getWorkerPool(opts.composeWorkers?.maxCount);
+  if (!composeWorkerPool) {
+    logger.warn('compose worker pool could not be initialized; falling back to in-process composition');
+  }
@@
-      composeWorkerPool: composeWorkerPool,
+      composeWorkerPool,
@@
-  fastify.gracefulShutdown(async () => {
-    fastify.log.debug('Shutting down compose worker pool');
-
-    await composeWorkerPool.destroy();
-
-    fastify.log.debug('Compose worker pool shut down');
+  fastify.gracefulShutdown(async () => {
+    if (composeWorkerPool) {
+      fastify.log.debug('Shutting down compose worker pool');
+      try {
+        await composeWorkerPool.destroy();
+        fastify.log.debug('Compose worker pool shut down');
+      } catch (err) {
+        fastify.log.warn({ err }, 'Error while shutting down compose worker pool');
+      }
+    }

You could also change import { Tinypool } from 'tinypool'; to an import type if it’s only needed for typing.

Also applies to: 57-58, 477-477, 533-538


I'll verify the review comment by examining the codebase to understand the current implementation and whether the concerns raised are valid.


---

🏁 Script executed:

```shell
#!/bin/bash
# Find and examine the build-server.ts file
find . -name "build-server.ts" -type f 2>/dev/null | head -5

Length of output: 43



---

🏁 Script executed:

```shell
#!/bin/bash
# Examine the build-server.ts file, focusing on key lines
cat ./git/controlplane/src/core/build-server.ts

Length of output: 75



---

🏁 Script executed:

```shell
#!/bin/bash
# Examine the build-server.ts file
cat controlplane/src/core/build-server.ts | head -100

Length of output: 4297



---

🏁 Script executed:

```shell
#!/bin/bash
# Get more context around lines 57-58 and the initialization
sed -n '50,150p' controlplane/src/core/build-server.ts

Length of output: 2429



---

🏁 Script executed:

```shell
#!/bin/bash
# Search for build function and worker pool initialization
sed -n '400,550p' controlplane/src/core/build-server.ts

Length of output: 4234



---

🏁 Script executed:

```shell
#!/bin/bash
# Find and examine the getWorkerPool function
cat controlplane/src/workers/compose.js

Length of output: 67



---

🏁 Script executed:

```shell
#!/bin/bash
# Search for the compose file
find controlplane/src/workers -name "compose*" -type f

Length of output: 35



---

🏁 Script executed:

```shell
#!/bin/bash
# Examine the getWorkerPool function
cat controlplane/src/workers/compose.ts

Length of output: 2135


Harden composeWorkerPool initialization and shutdown handling

The function getWorkerPool() at line 27 in controlplane/src/workers/compose.ts has the signature export function getWorkerPool(maxCount?: number): Tinypool | undefined. It returns undefined when not on the main thread (line 32).

In controlplane/src/core/build-server.ts line 477, the code unsafely assumes a non-null result:

const composeWorkerPool: Tinypool = getWorkerPool(opts.composeWorkers?.maxCount)!;

If build() were ever called off the main thread, this assertion would fail at runtime. Guard the result and either fail fast with a clearer error or fall back to inline composition.

Additionally, the graceful shutdown at lines 533-538 calls await composeWorkerPool.destroy() without a try/catch. Wrap this in error handling to prevent shutdown sequence abortion if pool destruction fails.

One possible shape (adjust type annotations in RouterOptions/call sites as needed):

-  const composeWorkerPool: Tinypool = getWorkerPool(opts.composeWorkers?.maxCount)!;
+  const composeWorkerPool = getWorkerPool(opts.composeWorkers?.maxCount);
+  if (!composeWorkerPool) {
+    logger.warn('compose worker pool could not be initialized; falling back to in-process composition');
+  }
@@
-  fastify.gracefulShutdown(async () => {
-    fastify.log.debug('Shutting down compose worker pool');
-
-    await composeWorkerPool.destroy();
-
-    fastify.log.debug('Compose worker pool shut down');
+  fastify.gracefulShutdown(async () => {
+    if (composeWorkerPool) {
+      fastify.log.debug('Shutting down compose worker pool');
+      try {
+        await composeWorkerPool.destroy();
+        fastify.log.debug('Compose worker pool shut down');
+      } catch (err) {
+        fastify.log.warn({ err }, 'Error while shutting down compose worker pool');
+      }
+    }

Committable suggestion skipped: line range outside the PR's diff.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
controlplane/src/core/build-server.ts (1)

477-478: Guard worker-pool initialization and shutdown to avoid brittle failures

getWorkerPool() returns Tinypool | undefined, so the non-null assertion and unguarded destroy are brittle if build() is ever invoked off the main thread or the pool fails to initialize/destroy (e.g., env issues, worker crash). A small robustness tweak:

  • Fail fast with a clear error if getWorkerPool returns undefined, instead of relying on !.
  • Wrap destroy() in a try/catch so graceful shutdown still proceeds even if pool teardown fails.

For example:

-  const composeWorkerPool: Tinypool = getWorkerPool(opts.composeWorkers?.maxCount)!;
+  const composeWorkerPool = getWorkerPool(opts.composeWorkers?.maxCount);
+  if (!composeWorkerPool) {
+    logger.error('compose worker pool could not be initialized; build() must be called on the main thread');
+    throw new Error('compose worker pool initialization failed');
+  }
@@
-  fastify.gracefulShutdown(async () => {
-    fastify.log.debug('Shutting down compose worker pool');
-
-    await composeWorkerPool.destroy();
-
-    fastify.log.debug('Compose worker pool shut down');
+  fastify.gracefulShutdown(async () => {
+    fastify.log.debug('Shutting down compose worker pool');
+    try {
+      await composeWorkerPool.destroy();
+      fastify.log.debug('Compose worker pool shut down');
+    } catch (err) {
+      fastify.log.warn({ err }, 'Error while shutting down compose worker pool');
+    }

This keeps the “always use worker pool” invariant, but makes failure modes clearer and shutdown more resilient.

Also applies to: 533-537

🧹 Nitpick comments (3)
controlplane/src/core/build-server.ts (1)

11-12: Worker-pool wiring and config surface look good; minor type/import nit

The addition of composeWorkers to BuildConfig, creation of composeWorkerPool, and threading it into routes({ …, composeWorkerPool }) is coherent and keeps the concern localized to the build server.

If Tinypool is only used for typing here, consider switching to an import type { Tinypool } from 'tinypool'; (and relying on inference for composeWorkerPool) to avoid a runtime import where not needed, depending on your TS/module settings. This is purely optional.

Also applies to: 141-143, 477-478, 512-512

controlplane/src/core/util.ts (1)

14-14: Async worker-based composition helper looks correct; consider tightening typing

The new getFederationResultWithPotentialContracts that delegates to composeWorkerPool.run matches the worker-pool design and keeps composition logic centralized.

Two small type-level nits you might consider:

  • Use Tinypool’s generic to strongly type the result instead of relying on any inference.
  • Since run already returns a Promise, you can return it directly without an extra await.

For example:

-export async function getFederationResultWithPotentialContracts(
+export async function getFederationResultWithPotentialContracts(
   composeWorkerPool: Tinypool,
@@
-): Promise<FederationResult | FederationResultWithContracts> {
-  return await composeWorkerPool.run(
+): Promise<FederationResult | FederationResultWithContracts> {
+  return composeWorkerPool.run<FederationResult | FederationResultWithContracts>(
     {
       federatedGraph,
       subgraphsToCompose,
       tagOptionsByContractName,
       compositionOptions,
     },
     { name: 'composeFederatedGraph' },
   );
}

This makes downstream usage safer while preserving the current behavior.

Also applies to: 535-550

controlplane/src/core/repositories/FederatedGraphRepository.ts (1)

1514-1541: Worker-based composition in composeAndDeployGraphs is sound; consider keeping DB access consistently on the transaction

The updated composeAndDeployGraphs:

  • Constructs Composer with tx (instead of this.db), which is a nice improvement for transactional consistency.
  • Uses the shared composeWorkerPool via getFederationResultWithPotentialContracts inside the transaction to offload heavy composition work, which aligns with the PR’s goal.

One small consistency tweak you might consider: when you later fetch DTOs inside the same transaction, you still call this.byId (which uses this.db) rather than the transaction-backed repo you already instantiated:

const federatedGraphDTO = await this.byId(federatedGraph.id);

const contractDTO = await this.byId(contractId);

For tighter transactional semantics and to avoid mixing tx and this.db in the same flow, you could instead use the fedGraphRepo constructed with tx:

-        const federatedGraphDTO = await this.byId(federatedGraph.id);
+        const federatedGraphDTO = await fedGraphRepo.byId(federatedGraph.id);
@@
-          const contractDTO = await this.byId(contractId);
+          const contractDTO = await fedGraphRepo.byId(contractId);

Functionally this should be equivalent today, but it keeps all DB access in this path on the same transaction handle and makes the intent clearer.

Also applies to: 1588-1595, 1751-1758, 1793-1801

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cb7ec48 and 366aeb3.

📒 Files selected for processing (4)
  • controlplane/src/core/build-server.ts (5 hunks)
  • controlplane/src/core/repositories/FederatedGraphRepository.ts (10 hunks)
  • controlplane/src/core/util.ts (2 hunks)
  • controlplane/src/workers/compose.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • controlplane/src/workers/compose.ts
🧰 Additional context used
🧬 Code graph analysis (3)
controlplane/src/core/build-server.ts (1)
controlplane/src/workers/compose.ts (1)
  • getWorkerPool (28-35)
controlplane/src/core/repositories/FederatedGraphRepository.ts (2)
composition/src/federation/types.ts (2)
  • FederationResult (33-33)
  • FederationResultWithContracts (55-55)
controlplane/src/core/util.ts (1)
  • getFederationResultWithPotentialContracts (535-551)
controlplane/src/core/util.ts (3)
controlplane/src/types/index.ts (2)
  • FederatedGraphDTO (76-99)
  • CompositionOptions (796-798)
controlplane/src/core/repositories/FeatureFlagRepository.ts (1)
  • SubgraphsToCompose (42-48)
composition/src/federation/types.ts (3)
  • ContractTagOptions (57-60)
  • FederationResult (33-33)
  • FederationResultWithContracts (55-55)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
  • GitHub Check: build_push_image (nonroot)
  • GitHub Check: build_push_image
  • GitHub Check: image_scan
  • GitHub Check: integration_test (./events)
  • GitHub Check: build_test
  • GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
  • GitHub Check: image_scan (nonroot)
  • GitHub Check: build_test
  • GitHub Check: build_push_image
  • GitHub Check: build_test
  • GitHub Check: build_test
  • GitHub Check: build_push_image
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Analyze (go)
  • GitHub Check: build_test
🔇 Additional comments (1)
controlplane/src/core/repositories/FederatedGraphRepository.ts (1)

32-32: Verified: composeWorkerPool threading is correctly wired throughout

The new composeWorkerPool requirement is consistently implemented across the codebase. Verification confirms:

  • update(), move(), and composeAndDeployGraphs() all correctly require and propagate the composeWorkerPool: Tinypool parameter
  • All sampled external call sites (deleteFederatedSubgraph, createFederatedGraph, updateContract, createContract, moveSubgraph, moveFederatedGraph) properly pass opts.composeWorkerPool to these methods
  • Internal routing through transaction-scoped repository instances is correct

No issues found.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 366aeb3 and 4e8bd05.

📒 Files selected for processing (1)
  • controlplane/src/workers/compose.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
controlplane/src/workers/compose.ts (5)
controlplane/src/types/index.ts (2)
  • FederatedGraphDTO (76-99)
  • CompositionOptions (796-798)
controlplane/src/core/repositories/FeatureFlagRepository.ts (1)
  • SubgraphsToCompose (42-48)
composition/src/federation/types.ts (1)
  • ContractTagOptions (57-60)
controlplane/src/core/composition/composition.ts (2)
  • composeFederatedContract (35-47)
  • composeFederatedGraphWithPotentialContracts (18-30)
composition/src/v1/federation/utils.ts (1)
  • newContractTagOptionsFromArrays (242-250)
🪛 GitHub Actions: Controlplane CI
controlplane/src/workers/compose.ts

[error] 27-34: Git diff detected changes in controlplane/src/workers/compose.ts; exit code 1. The diff shows a formatting update to Tinypool config (multi-line object). Run the formatter and commit if intended.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Analyze (go)
  • GitHub Check: build_test
🔇 Additional comments (1)
controlplane/src/workers/compose.ts (1)

36-57: LGTM! Composition dispatch logic is correct.

The function correctly routes to contract-specific or general composition based on the presence of federatedGraph.contract. The parameter mapping to composeFederatedContract and composeFederatedGraphWithPotentialContracts aligns with the signatures defined in the composition module, and the use of newContractTagOptionsFromArrays properly constructs the required ContractTagOptions from the contract's tag arrays.

Comment on lines 28 to 34
export function getWorkerPool(maxCount?: number): Tinypool | undefined {
if (isMainThread) {
return new Tinypool({ filename: join(process.cwd(), 'dist/workers/compose.js'), name: 'composeFederatedGraph', maxThreads: maxCount });
} else {
return undefined;
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix formatting to pass CI and reconsider hardcoded path.

Two issues identified:

  1. CI Failure: The pipeline indicates a formatting issue with the Tinypool configuration. The object should be formatted across multiple lines.

  2. Hardcoded Path: The path 'dist/workers/compose.js' is hardcoded. Past review comments suggest this was discussed and "fixed," but the hardcoded approach remains. While this may work in production when process.cwd() points to the deployment root, it could be fragile in development, testing, or non-standard deployment scenarios where the working directory differs.

Apply this diff to fix the formatting issue:

-    return new Tinypool({ filename: join(process.cwd(), 'dist/workers/compose.js'), name: 'composeFederatedGraph', maxThreads: maxCount });
+    return new Tinypool({
+      filename: join(process.cwd(), 'dist/workers/compose.js'),
+      name: 'composeFederatedGraph',
+      maxThreads: maxCount,
+    });

Consider whether a more robust path resolution would be appropriate:

// Alternative: derive the path relative to this module's location
const workerPath = import.meta.url.endsWith('.ts')
  ? join(process.cwd(), 'dist/workers/compose.js')
  : import.meta.url.replace(/\/workers\/compose\.(js|ts)$/, '/workers/compose.js');
🧰 Tools
🪛 GitHub Actions: Controlplane CI

[error] 27-34: Git diff detected changes in controlplane/src/workers/compose.ts; exit code 1. The diff shows a formatting update to Tinypool config (multi-line object). Run the formatter and commit if intended.

🤖 Prompt for AI Agents
In controlplane/src/workers/compose.ts around lines 28 to 34, fix the formatting
and hardcoded path: reformat the Tinypool constructor options to be spread
across multiple lines (each option on its own line) to satisfy CI formatting
rules, and replace the hardcoded join(process.cwd(), 'dist/workers/compose.js')
with a robust workerPath computed from import.meta.url (use a conditional: if
import.meta.url endsWith '.ts' derive the dist path from process.cwd(),
otherwise derive the .js path by replacing the module URL suffix to point to
workers/compose.js), then pass workerPath to Tinypool. Ensure the function still
returns undefined off the main thread.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
controlplane/src/workers/compose.ts (2)

5-6: Remove unused imports.

Both isMainThread and join are imported but never used in this file. Additionally, Tinypool on line 13 is imported but not used here.

Apply this diff to remove the unused imports:

-import { isMainThread } from 'node:worker_threads';
-import { join } from 'node:path';
 import {
   ContractTagOptions,
   FederationResult,
   FederationResultWithContracts,
   newContractTagOptionsFromArrays,
 } from '@wundergraph/composition';
-import { Tinypool } from 'tinypool';

28-49: Consider adding error handling for worker robustness.

The worker function has no try-catch blocks. If either composeFederatedContract or composeFederatedGraphWithPotentialContracts throws an exception, the worker thread will terminate ungracefully. While Tinypool handles worker crashes, explicit error handling would provide better observability and allow for structured error responses.

Consider wrapping the logic in a try-catch:

export default function ({
  federatedGraph,
  subgraphsToCompose,
  tagOptionsByContractName,
  compositionOptions,
}: Inputs): FederationResult | FederationResultWithContracts {
  try {
    if (federatedGraph.contract) {
      return composeFederatedContract(
        subgraphsToCompose.compositionSubgraphs,
        newContractTagOptionsFromArrays(federatedGraph.contract.excludeTags, federatedGraph.contract.includeTags),
        federatedGraph.routerCompatibilityVersion,
        compositionOptions,
      );
    }
    return composeFederatedGraphWithPotentialContracts(
      subgraphsToCompose.compositionSubgraphs,
      new Map(Object.entries(tagOptionsByContractName)),
      federatedGraph.routerCompatibilityVersion,
      compositionOptions,
    );
  } catch (error) {
    // Re-throw with context or return a structured error
    throw new Error(`Worker composition failed: ${error instanceof Error ? error.message : String(error)}`);
  }
}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4e8bd05 and ef629b3.

📒 Files selected for processing (2)
  • controlplane/src/core/build-server.ts (6 hunks)
  • controlplane/src/workers/compose.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • controlplane/src/core/build-server.ts
🧰 Additional context used
🧬 Code graph analysis (1)
controlplane/src/workers/compose.ts (5)
controlplane/src/types/index.ts (2)
  • FederatedGraphDTO (76-99)
  • CompositionOptions (796-798)
controlplane/src/core/repositories/FeatureFlagRepository.ts (1)
  • SubgraphsToCompose (42-48)
composition/src/federation/types.ts (3)
  • ContractTagOptions (57-60)
  • FederationResult (33-33)
  • FederationResultWithContracts (55-55)
controlplane/src/core/composition/composition.ts (2)
  • composeFederatedContract (35-47)
  • composeFederatedGraphWithPotentialContracts (18-30)
composition/src/v1/federation/utils.ts (1)
  • newContractTagOptionsFromArrays (242-250)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
  • GitHub Check: build_test
  • GitHub Check: build_test
  • GitHub Check: build_push_image
  • GitHub Check: build_push_image (nonroot)
  • GitHub Check: build_test
  • GitHub Check: image_scan
  • GitHub Check: image_scan (nonroot)
  • GitHub Check: build_push_image
  • GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
  • GitHub Check: integration_test (./telemetry)
  • GitHub Check: build_test
  • GitHub Check: build_push_image
  • GitHub Check: build_test
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Analyze (go)

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
controlplane/src/workers/compose.ts (1)

5-6: Remove unused imports.

The imports isMainThread, join, and Tinypool are not used anywhere in this file. Past review comments suggest the file structure has changed, leaving these imports orphaned.

Apply this diff to remove the unused imports:

-import { isMainThread } from 'node:worker_threads';
-import { join } from 'node:path';
 import {
   ContractTagOptions,
   FederationResult,
   FederationResultWithContracts,
   newContractTagOptionsFromArrays,
 } from '@wundergraph/composition';
-import { Tinypool } from 'tinypool';
 import {
   composeFederatedContract,
   composeFederatedGraphWithPotentialContracts,
 } from '../core/composition/composition.js';

Also applies to: 13-13

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ef629b3 and 3d32657.

📒 Files selected for processing (2)
  • controlplane/src/core/util.ts (2 hunks)
  • controlplane/src/workers/compose.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • controlplane/src/core/util.ts
🧰 Additional context used
🧬 Code graph analysis (1)
controlplane/src/workers/compose.ts (5)
controlplane/src/types/index.ts (2)
  • FederatedGraphDTO (76-99)
  • CompositionOptions (796-798)
controlplane/src/core/repositories/FeatureFlagRepository.ts (1)
  • SubgraphsToCompose (42-48)
composition/src/federation/types.ts (3)
  • ContractTagOptions (57-60)
  • FederationResult (33-33)
  • FederationResultWithContracts (55-55)
controlplane/src/core/composition/composition.ts (2)
  • composeFederatedContract (35-47)
  • composeFederatedGraphWithPotentialContracts (18-30)
composition/src/v1/federation/utils.ts (1)
  • newContractTagOptionsFromArrays (242-250)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
  • GitHub Check: build_push_image
  • GitHub Check: build_test
  • GitHub Check: build_push_image
  • GitHub Check: image_scan
  • GitHub Check: build_push_image (nonroot)
  • GitHub Check: image_scan (nonroot)
  • GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
  • GitHub Check: integration_test (./telemetry)
  • GitHub Check: build_test
  • GitHub Check: integration_test (./events)
  • GitHub Check: Analyze (go)
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: build_push_image
  • GitHub Check: build_test
  • GitHub Check: build_test
  • GitHub Check: build_test
🔇 Additional comments (1)
controlplane/src/workers/compose.ts (1)

28-49: Code is correct; parameter usage pattern is appropriate.

The worker function properly handles internal branching based on federatedGraph.contract presence. While tagOptionsByContractName is unused when a contract exists, this is not a concern because callers unconditionally pass all parameters to the worker, leaving routing logic to the worker itself. This is a sound design pattern—callers pass complete context, and the worker determines execution flow. No changes needed.

Comment on lines 21 to 26
interface Inputs {
federatedGraph: FederatedGraphDTO;
subgraphsToCompose: SubgraphsToCompose;
tagOptionsByContractName: Record<string, ContractTagOptions>;
compositionOptions?: CompositionOptions;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

ContractTagOptions contains non-serializable Sets.

While the Map serialization issue was fixed by using Record<string, ContractTagOptions>, the ContractTagOptions type itself contains Set<string> fields (tagNamesToExclude and tagNamesToInclude), which also cannot be serialized via structured clone. This will cause a DataCloneError when Tinypool attempts to pass the data to the worker thread.

The interface should use serializable array-based tag options. Apply this diff:

 interface Inputs {
   federatedGraph: FederatedGraphDTO;
   subgraphsToCompose: SubgraphsToCompose;
-  tagOptionsByContractName: Record<string, ContractTagOptions>;
+  tagOptionsByContractName: Record<string, { tagNamesToExclude: string[]; tagNamesToInclude: string[] }>;
   compositionOptions?: CompositionOptions;
 }

Then update the worker function to convert arrays back to Sets:

   return composeFederatedGraphWithPotentialContracts(
     subgraphsToCompose.compositionSubgraphs,
-    new Map(Object.entries(tagOptionsByContractName)),
+    new Map(
+      Object.entries(tagOptionsByContractName).map(([name, opts]) => [
+        name,
+        {
+          tagNamesToExclude: new Set(opts.tagNamesToExclude),
+          tagNamesToInclude: new Set(opts.tagNamesToInclude),
+        },
+      ])
+    ),
     federatedGraph.routerCompatibilityVersion,
     compositionOptions,
   );

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In controlplane/src/workers/compose.ts around lines 21 to 26, the Inputs
interface still references ContractTagOptions that uses non-serializable Sets
(tagNamesToExclude and tagNamesToInclude), causing DataCloneError when sending
to the worker; change the Inputs type so tag options are array-based (e.g.,
tagNamesToExclude?: string[] and tagNamesToInclude?: string[]) or replace
ContractTagOptions with a serializable variant, then in the worker entry
function convert those arrays back into Sets (new Set(array)) before using them
so runtime logic remains unchanged.

@pepol pepol marked this pull request as draft November 14, 2025 11:17
@pepol pepol marked this pull request as ready for review November 14, 2025 20:46
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
controlplane/test/test-util.ts (1)

405-423: Inconsistency: composeWorkerPool missing from return value.

The AI-generated summary indicates that the pool should be exposed via queues.composeWorkerPool, but it's not included in the returned queues object. This prevents tests from accessing the pool for cleanup or verification purposes.

This is addressed by the diff in the previous comment.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3d32657 and ad4178c.

📒 Files selected for processing (1)
  • controlplane/test/test-util.ts (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
  • GitHub Check: build_test
  • GitHub Check: build_push_image
  • GitHub Check: build_push_image (nonroot)
  • GitHub Check: build_push_image
  • GitHub Check: build_test
  • GitHub Check: image_scan
  • GitHub Check: build_test
  • GitHub Check: build_push_image
  • GitHub Check: build_test
  • GitHub Check: image_scan (nonroot)
  • GitHub Check: integration_test (./telemetry)
  • GitHub Check: integration_test (./events)
  • GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
  • GitHub Check: build_test
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Analyze (go)
🔇 Additional comments (1)
controlplane/test/test-util.ts (1)

15-15: LGTM!

The Tinypool import is correctly added to support the worker pool creation.

@pepol pepol marked this pull request as draft November 16, 2025 21:22
@pepol pepol marked this pull request as ready for review November 17, 2025 20:49
@pepol pepol marked this pull request as draft November 17, 2025 20:51
@pepol pepol marked this pull request as ready for review November 17, 2025 20:51
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
controlplane/src/core/repositories/SchemaCheckRepository.ts (2)

233-236: Avoid string-built SQL; use parameters to prevent quoting bugs and injection.

incomingChanges is built via string interpolation with changeType/path. Paths may contain quotes or commas, breaking the SQL; this also risks injection. Build the array parametrically with Drizzle instead.

Apply this refactor:

-      const incomingChanges = `array[${changes.map((c) => `('${c.changeType}'::schema_change_type, '${c.path}'::text)`)}]`;
+      const incomingChanges = sql`array[${sql.join(
+        changes.map((c) => sql`(${c.changeType}::schema_change_type, ${c.path}::text)`),
+        sql`, `
+      )}]`;

And keep storedChanges as-is; only the user-supplied pieces (changeType/path) must be parameterized.


831-889: proposalMatchMessage can start with "undefined" due to += on an undefined variable.

Initialize or aggregate safely to avoid leaking “undefined” prefixes.

Apply one of these:

-    let proposalMatchMessage: string | undefined;
+    let proposalMatchMessage: string | undefined;
+    const proposalMessages: string[] = [];
...
-              proposalMatchMessage += `The subgraph ${subgraphName}'s schema does not match to this subgraph's schema in any approved proposal.\n`;
+              proposalMessages.push(
+                `The subgraph ${subgraphName}'s schema does not match to this subgraph's schema in any approved proposal.`
+              );
...
-      proposalMatchMessage,
+      proposalMatchMessage: proposalMessages.length ? proposalMessages.join('\n') : undefined,

Or initialize to empty string and trim when returning.

controlplane/src/core/composition/composer.ts (2)

686-697: Type mismatch: errors field is string[], but catch pushes Error objects.

Convert errors to strings to match the updated type and avoid runtime surprises.

Apply:

-        composedGraphs.push({
+        composedGraphs.push({
           id: graph.id,
           name: graph.name,
           namespace: graph.namespace,
           namespaceId: graph.namespaceId,
           targetID: graph.targetId,
           fieldConfigurations: [],
-          errors: [e],
+          errors: [e instanceof Error ? e.message : String(e)],
           subgraphs: [],
           warnings: [],
         });

856-866: Same issue here: convert caught error to string.

Keep the type consistent everywhere.

-        composedGraphs.push({
+        composedGraphs.push({
           id: graph.id,
           name: graph.name,
           namespace: graph.namespace,
           namespaceId: graph.namespaceId,
           targetID: graph.targetId,
           fieldConfigurations: [],
-          errors: [e],
+          errors: [e instanceof Error ? e.message : String(e)],
           subgraphs: [],
           warnings: [],
         });
🧹 Nitpick comments (2)
controlplane/src/core/repositories/FederatedGraphRepository.ts (2)

1643-1666: Prefer worker-provided routerExecutionConfig for base; skip redundant recompute.

Use outputs.routerExecutionConfig if exposed, falling back only if absent.

Apply:

-          const routerExecutionConfig = buildRouterExecutionConfig(
-            outputs.composedGraph,
-            outputs.federatedSchemaVersionId,
-            federatedGraph.routerCompatibilityVersion,
-          );
+          const routerExecutionConfig =
+            (outputs as any).routerExecutionConfig ??
+            buildRouterExecutionConfig(
+              outputs.composedGraph,
+              outputs.federatedSchemaVersionId,
+              federatedGraph.routerCompatibilityVersion,
+            );

1673-1693: Do the same for contracts: consume worker-provided contract router configs if available.

Avoid recomputation on the request thread when worker already built them.

-          for (const contractGraph of outputs.contractGraphs) {
-            const contractRouterExecutionConfig = buildRouterExecutionConfig(
-              contractGraph.composedContract,
-              contractGraph.contractSchemaVersionId,
-              federatedGraph.routerCompatibilityVersion,
-            );
+          for (const contractGraph of outputs.contractGraphs) {
+            const contractRouterExecutionConfig =
+              (contractGraph as any).contractRouterExecutionConfig ??
+              buildRouterExecutionConfig(
+                contractGraph.composedContract,
+                contractGraph.contractSchemaVersionId,
+                federatedGraph.routerCompatibilityVersion,
+              );
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ad4178c and 9b51492.

📒 Files selected for processing (7)
  • controlplane/src/core/bufservices/subgraph/fixSubgraphSchema.ts (1 hunks)
  • controlplane/src/core/composition/composer.ts (2 hunks)
  • controlplane/src/core/repositories/FederatedGraphRepository.ts (18 hunks)
  • controlplane/src/core/repositories/SchemaCheckRepository.ts (2 hunks)
  • controlplane/src/core/repositories/SubgraphRepository.ts (7 hunks)
  • controlplane/src/core/util.ts (3 hunks)
  • controlplane/src/workers/compose.ts (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • controlplane/src/core/util.ts
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-09-08T20:57:07.946Z
Learnt from: JivusAyrus
Repo: wundergraph/cosmo PR: 2156
File: controlplane/src/core/repositories/SubgraphRepository.ts:1746-1763
Timestamp: 2025-09-08T20:57:07.946Z
Learning: The checkSubgraphSchema.ts file already correctly implements linked subgraph functionality, using byName(linkedSubgraph.name, linkedSubgraph.namespace) to fetch target subgraphs and properly handles parse(newSchemaSDL) for schema building. The implementation doesn't need fixes for byId usage or schema parsing as it's already correct.

Applied to files:

  • controlplane/src/core/bufservices/subgraph/fixSubgraphSchema.ts
📚 Learning: 2025-08-29T10:28:04.846Z
Learnt from: JivusAyrus
Repo: wundergraph/cosmo PR: 2156
File: controlplane/src/core/repositories/SubgraphRepository.ts:1749-1751
Timestamp: 2025-08-29T10:28:04.846Z
Learning: In the controlplane codebase, authentication and authorization checks (including organization scoping) are handled at the service layer in files like unlinkSubgraph.ts before calling repository methods. Repository methods like unlinkSubgraph() in SubgraphRepository.ts can focus purely on data operations without redundant security checks.

Applied to files:

  • controlplane/src/core/repositories/SubgraphRepository.ts
📚 Learning: 2025-09-10T11:15:52.157Z
Learnt from: JivusAyrus
Repo: wundergraph/cosmo PR: 2156
File: controlplane/src/core/repositories/ProposalRepository.ts:562-572
Timestamp: 2025-09-10T11:15:52.157Z
Learning: The getLatestCheckForProposal function in controlplane/src/core/repositories/ProposalRepository.ts is only called during proposal creation or updates, so proposal match error checking (hasProposalMatchError) is not needed since the proposal is being modified itself rather than being matched against.

Applied to files:

  • controlplane/src/core/repositories/SchemaCheckRepository.ts
🧬 Code graph analysis (2)
controlplane/src/workers/compose.ts (6)
controlplane/src/types/index.ts (2)
  • FederatedGraphDTO (76-99)
  • CompositionOptions (796-798)
controlplane/src/core/repositories/FeatureFlagRepository.ts (1)
  • SubgraphsToCompose (42-48)
composition/src/federation/types.ts (3)
  • ContractTagOptions (57-60)
  • FederationResult (33-33)
  • FederationResultWithContracts (55-55)
controlplane/src/core/composition/composer.ts (4)
  • ComposedFederatedGraph (232-245)
  • CompositionDeployResult (247-249)
  • mapResultToComposedGraph (211-230)
  • buildRouterExecutionConfig (94-111)
controlplane/src/core/util.ts (1)
  • getFederationResultWithPotentialContracts (537-559)
controlplane/src/core/errors/errors.ts (1)
  • unsuccessfulBaseCompositionError (41-52)
controlplane/src/core/repositories/FederatedGraphRepository.ts (2)
controlplane/src/core/composition/composer.ts (1)
  • buildRouterExecutionConfig (94-111)
controlplane/src/types/index.ts (1)
  • FederatedGraphDTO (76-99)
🔇 Additional comments (8)
controlplane/src/core/repositories/SchemaCheckRepository.ts (2)

1018-1024: LGTM: storing stringified composition errors.

Using the string messages for CompositionError aligns with the new ComposedFederatedGraph.errors: string[]. Looks good.


1026-1033: LGTM: storing stringified composition warnings.

Same as above for warnings; consistent with the updated types.

controlplane/src/core/composition/composer.ts (1)

225-229: LGTM: mapping errors/warnings to strings.

mapResultToComposedGraph now returns string[] for errors/warnings; consistent with downstream storage.

controlplane/src/core/bufservices/subgraph/fixSubgraphSchema.ts (1)

180-189: LGTM: store string messages for composition errors.

The change to message: error fits the upstream types.

controlplane/src/core/repositories/SubgraphRepository.ts (3)

469-477: LGTM: composeWorkerPool propagated and used in composeAndDeployGraphs.

Threading the pool through update() is correct and aligns with the new composition path.


2020-2027: LGTM: string messages for composition errors.

Matches the updated types.


2030-2036: LGTM: string messages for composition warnings.

Matches the updated types.

controlplane/src/workers/compose.ts (1)

29-32: Remove this review comment — the serialization concern does not apply to this project.

Node.js worker_threads uses the HTML structured clone algorithm for workerData and postMessage, which supports Map and Set (including nested Maps/Sets inside plain objects), and Tinypool uses node:worker_threads and inherits the same cloning behavior. The project's Node.js requirement (≥22.11.0) far exceeds any version where this support might be missing. ContractTagOptions contains only Set<string> fields, which are fully supported by structured cloning.

The current code is correct. The proposed refactoring to Record<string, { tagNamesToExclude: string[]; tagNamesToInclude: string[] }> would reduce type safety and clarity without addressing any real problem.

Likely an incorrect or invalid review comment.

Comment on lines 1604 to 1637
portMain.on('message', async (resultJSON: string) => {
const result: ComposeBaseMessage = JSON.parse(resultJSON);

const routerExecutionConfig = buildRouterExecutionConfig(
result.composedGraph,
result.federatedSchemaVersionId,
federatedGraph.routerCompatibilityVersion,
);
}

// Collect all composition warnings
allCompositionWarnings.push(
...result.warnings.map((w) => ({
federatedGraphName: federatedGraph.name,
namespace: federatedGraph.namespace,
message: w.message,
featureFlag: subgraphsToCompose.featureFlagName || '',
})),
);
const baseComposition = await composer.saveComposition({
composedGraph: result.composedGraph,
composedById: actorId,
isFeatureFlagComposition: subgraphsToCompose.isFeatureFlagComposition,
federatedSchemaVersionId: result.federatedSchemaVersionId,
routerExecutionConfig,
featureFlagId: subgraphsToCompose.featureFlagId,
});

if (!subgraphsToCompose.isFeatureFlagComposition && !result.success && !federatedGraph.contract) {
allCompositionErrors.push(unsuccessfulBaseCompositionError(federatedGraph.name, federatedGraph.namespace));
}
const contractGraphs: Map<string, FederatedGraphDTO> = new Map();
for (const contractName of result.contractNames) {
const contractGraph = await fedGraphRepo.byName(contractName, federatedGraph.namespace);
if (!contractGraph) {
throw new Error(`The contract graph "${contractName}" was not found.`);
}
contractGraphs.set(contractName, contractGraph);
}

const baseResult: ComposeBaseResult = {
compositionDeployResult: baseComposition,
contractGraphs,
};

const composedGraph = mapResultToComposedGraph(federatedGraph, subgraphsToCompose.subgraphs, result);
portMain.postMessage(baseResult);
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Stop parsing JSON and use RouterConfig from worker; avoid recomputing.

After switching the worker to post a structured BaseMessage with routerExecutionConfig, consume it directly and skip rebuild. Also protect the handler with try/catch to ensure a reply.

Apply:

-          portMain.on('message', async (resultJSON: string) => {
-            const result: ComposeBaseMessage = JSON.parse(resultJSON);
-
-            const routerExecutionConfig = buildRouterExecutionConfig(
-              result.composedGraph,
-              result.federatedSchemaVersionId,
-              federatedGraph.routerCompatibilityVersion,
-            );
+          portMain.on('message', async (result: ComposeBaseMessage) => {
+            const routerExecutionConfig =
+              result.routerExecutionConfig ??
+              buildRouterExecutionConfig(
+                result.composedGraph,
+                result.federatedSchemaVersionId,
+                federatedGraph.routerCompatibilityVersion,
+              );
             const baseComposition = await composer.saveComposition({
               composedGraph: result.composedGraph,
               composedById: actorId,
               isFeatureFlagComposition: subgraphsToCompose.isFeatureFlagComposition,
               federatedSchemaVersionId: result.federatedSchemaVersionId,
               routerExecutionConfig,
               featureFlagId: subgraphsToCompose.featureFlagId,
             });

This aligns main/worker and removes the JSON stringify dependency.

🤖 Prompt for AI Agents
In controlplane/src/core/repositories/FederatedGraphRepository.ts around lines
1604 to 1637, the handler currently parses a JSON string and rebuilds
routerExecutionConfig; change it to accept the structured ComposeBaseMessage
sent by the worker (don’t JSON.parse), use result.routerExecutionConfig
(RouterConfig) directly instead of recomputing via buildRouterExecutionConfig,
and wrap the entire handler body in try/catch so any error is caught and an
error response is posted back through portMain.postMessage (ensuring the worker
always receives a reply).

Comment on lines 125 to 133
const message: BaseMessage = {
composedGraph,
federatedSchemaVersionId,
// routerExecutionConfig,
contractNames,
};
const messageJSON = JSON.stringify(message);

port.postMessage(messageJSON);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don’t JSON.stringify composedGraph; it likely contains non-serializable/cyclic GraphQLSchema. Send RouterConfig/object instead.

mapResultToComposedGraph subgraphs include GraphQLSchema; JSON.stringify will throw. Use structured messaging with a lean payload and include routerExecutionConfig computed in the worker; the main thread can avoid recomputing it.

Minimal change:

 export interface BaseMessage {
-  composedGraph: ComposedFederatedGraph;
+  composedGraph: ComposedFederatedGraph; // stripped of non-serializable fields
   contractNames: string[];
   federatedSchemaVersionId: `${string}-${string}-${string}-${string}-${string}`;
-  // routerExecutionConfig?: RouterConfig;
+  routerExecutionConfig?: RouterConfig;
 }
@@
-    const message: BaseMessage = {
-      composedGraph,
-      federatedSchemaVersionId,
-      // routerExecutionConfig,
-      contractNames,
-    };
-    const messageJSON = JSON.stringify(message);
-    port.postMessage(messageJSON);
+    // Remove non-serializable fields before posting
+    const leanComposedGraph: ComposedFederatedGraph = {
+      ...composedGraph,
+      subgraphs: composedGraph.subgraphs.map((s: any) => {
+        const { schema, configurationDataByTypeName, protoSchema, mapping, imageReference, ...rest } = s;
+        return rest;
+      }),
+    };
+    const message: BaseMessage = {
+      composedGraph: leanComposedGraph,
+      federatedSchemaVersionId,
+      routerExecutionConfig,
+      contractNames,
+    };
+    port.postMessage(message);

Then in the main thread, use routerExecutionConfig from the message (see linked change below).

🤖 Prompt for AI Agents
In controlplane/src/workers/compose.ts around lines 125-133, the code
JSON.stringifys composedGraph which contains a non-serializable/cyclic
GraphQLSchema and will throw; instead remove composedGraph from the message,
compute (or include) a serializable routerExecutionConfig/routerConfig in the
worker, and send a lean, structured payload (e.g., { routerExecutionConfig,
federatedSchemaVersionId, contractNames }) via port.postMessage without
JSON.stringify so the main thread can use the provided routerExecutionConfig and
avoid recomputation; ensure all fields in the message are plain serializable
objects.

Comment on lines 135 to 147
port.on('message', (baseResult: BaseResult) => {
if (!result.success || !baseResult.compositionDeployResult.schemaVersionId || !routerExecutionConfig) {
const outputs: Outputs = {
resultSuccess: result.success,
composedGraph,
federatedSchemaVersionId,
schemaVersionId: baseResult.compositionDeployResult.schemaVersionId,
// routerExecutionConfig,
contractGraphs: [],
allCompositionErrors,
allCompositionWarnings,
};
resolve(outputs);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Guard message handler with try/catch to avoid deadlock if saveComposition fails.

If composer.saveComposition throws, the worker will await forever. Always reply to the worker.

Apply:

-    port.on('message', (baseResult: BaseResult) => {
+    port.on('message', (baseResult: BaseResult) => {
       if (!result.success || !baseResult.compositionDeployResult.schemaVersionId || !routerExecutionConfig) {
         const outputs: Outputs = {
           resultSuccess: result.success,
           composedGraph,
           federatedSchemaVersionId,
           schemaVersionId: baseResult.compositionDeployResult.schemaVersionId,
           // routerExecutionConfig,
           contractGraphs: [],
           allCompositionErrors,
           allCompositionWarnings,
         };
         resolve(outputs);
       }
+      // else branch continues below
     });

Wrap the inner body in try/catch and on error resolve with resultSuccess=false and existing aggregates.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In controlplane/src/workers/compose.ts around lines 135 to 147, the
port.on('message') handler body must be wrapped in a try/catch so a thrown error
(e.g., from composer.saveComposition) does not leave the worker waiting; enclose
the current inner logic in try { ... } catch (err) { resolve({ resultSuccess:
false, composedGraph, federatedSchemaVersionId, schemaVersionId: undefined,
contractGraphs: [], allCompositionErrors, allCompositionWarnings }); } (use the
existing aggregate variables) to ensure the worker is always resolved on error.

@pepol pepol marked this pull request as draft November 17, 2025 21:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants