-
Notifications
You must be signed in to change notification settings - Fork 192
feat(controlplane): run federated graph composition in worker pool to unblock main event loop #2326
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat(controlplane): run federated graph composition in worker pool to unblock main event loop #2326
Conversation
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughAdds a Tinypool-based compose worker pool and new worker module, introduces composeWorkerPool across router options, services, and repositories, updates composition flows to use worker IPC, and initializes/destroys the pool in the build server and test harness. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Potential hotspots to inspect:
Possibly related PRs
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Router-nonroot image scan passed✅ No security vulnerabilities found in image: |
…ck main event loop
28a2c45 to
cb7ec48
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (6)
controlplane/src/core/repositories/SubgraphRepository.ts (1)
16-16: SubgraphRepository correctly propagates the worker pool, with minor ergonomics considerationsThe
composeWorkerPoolparameter is consistently added toupdateandmoveand passed through tofedGraphRepo.composeAndDeployGraphs, so subgraph-driven recomposition now uses the shared worker pool as intended. This aligns with the broader wiring in the PR and doesn’t alter existing control flow.Two optional refinements you might consider:
- Make
composeWorkerPoolTinypool | undefined(and handle that insidecomposeAndDeployGraphs) to keepSubgraphRepositorymethods easier to call from non-HTTP contexts or tests that may not want to spin up a pool.- If
Tinypoolis only needed as a type here, switch toimport type { Tinypool } from 'tinypool';to avoid a value import in this module.These are nice-to-haves and not blockers.
Also applies to: 224-248, 464-477, 492-509, 551-562
controlplane/src/workers/compose.ts (1)
28-37: Worker pool creation and filename resolution could be more robustRight now
getWorkerPoolassumesprocess.cwd()points at the controlplane package root and instantiates a newTinypooleach time it’s called. In a monorepo or when running via different tooling,process.cwd()may not match the package root, which would break thedist/workers/compose.jspath, and repeated calls could create multiple pools unnecessarily.Consider:
- Deriving the JS worker path relative to this module instead of
process.cwd()(e.g., viafileURLToPath(import.meta.url)+path.resolveintodist), so it’s independent of the working directory.- Caching a single
Tinypoolinstance at module level (or in the caller) and reusing it, unless you explicitly need separate pools per caller.controlplane/src/core/util.ts (1)
14-14: Async worker delegation is fine; watch for callsite and naming ergonomicsThe new async
getFederationResultWithPotentialContractscorrectly forwards the full payload intocomposeWorkerPool.run(..., { name: 'composeFederatedGraph' })and returns the union result as expected. Two follow-ups to keep this maintainable:
- Now that this function always uses a worker, all call sites must both pass a non‑undefined
Tinypoolandawaitthe Promise. It’s worth double‑checking that there are no remaining synchronous usages or tests assuming a sync API.- The string
'composeFederatedGraph'is now a coupling point between this helper, the worker export, and the pool configuration. If you ever rename the worker function or poolname, this will silently break. Consider centralizing this task name in a shared constant or at least adding a small comment near both usages to keep them in sync.Also applies to: 535-547
controlplane/src/core/repositories/FederatedGraphRepository.ts (3)
197-223: update(): composeWorkerPool threading looks correct; ensure all callers pass a shared poolAdding
composeWorkerPool: Tinypoolto theupdatepayload and forwarding it intocomposeAndDeployGraphswires the worker pool into the recomposition path cleanly. There’s no behavioral change when label matchers don’t change.The main thing to verify is that all service-layer callers of
FederatedGraphRepository.updatenow supply a long-livedcomposeWorkerPoolinstance rather than constructing a new pool per call.Also applies to: 332-343
361-377: move(): worker pool parameter is propagated consistentlyThe new
composeWorkerPool: Tinypoolparameter is correctly passed intocomposeAndDeployGraphsfor both the contract-specific path and the base federated graph path, so both code paths now use the worker pool for composition.Similar to
update, it’s worth confirming that the service layer passes the same shared pool instance intomove, rather than creating ad-hoc pools, to avoid unnecessary thread pools and overhead.Also applies to: 420-458
1507-1526: composeAndDeployGraphs(): good transactional change; worker call looks soundWithin
composeAndDeployGraphs:
- Passing
composeWorkerPool: Tinypoolinto the method signature and then intogetFederationResultWithPotentialContractscleanly offloads the actual composition work to the worker pool.- Switching
Composerto usetxinstead ofthis.dbshould make all composition-related writes participate in the same ambient transaction started at the top ofcomposeAndDeployGraphs, which improves consistency.- The payload passed to
getFederationResultWithPotentialContracts(federatedGraph,subgraphsToCompose,tagOptionsByContractName,compositionOptions) matches the worker’sInputsshape, so the worker-sidecomposeFederatedGraphhas everything it needs.Two things to double‑check:
- All current and future callers of
composeAndDeployGraphsprovide a valid, sharedcomposeWorkerPool(noundefined, no per-call instantiation).- There are tests that exercise both the “base graph + contracts” case and the “contract graph only” case to confirm the worker-based composition continues to yield the same
FederationResult/FederationResultWithContractsshapes used by the rest of this method.Also applies to: 1527-1541, 1587-1595
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yamlis excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (24)
controlplane/package.json(1 hunks)controlplane/src/core/bufservices/contract/createContract.ts(1 hunks)controlplane/src/core/bufservices/contract/updateContract.ts(2 hunks)controlplane/src/core/bufservices/feature-flag/createFeatureFlag.ts(1 hunks)controlplane/src/core/bufservices/feature-flag/deleteFeatureFlag.ts(1 hunks)controlplane/src/core/bufservices/feature-flag/enableFeatureFlag.ts(1 hunks)controlplane/src/core/bufservices/feature-flag/updateFeatureFlag.ts(1 hunks)controlplane/src/core/bufservices/federated-graph/createFederatedGraph.ts(1 hunks)controlplane/src/core/bufservices/federated-graph/migrateFromApollo.ts(1 hunks)controlplane/src/core/bufservices/federated-graph/moveFederatedGraph.ts(2 hunks)controlplane/src/core/bufservices/federated-graph/updateFederatedGraph.ts(1 hunks)controlplane/src/core/bufservices/graph/setGraphRouterCompatibilityVersion.ts(1 hunks)controlplane/src/core/bufservices/monograph/publishMonograph.ts(1 hunks)controlplane/src/core/bufservices/monograph/updateMonograph.ts(2 hunks)controlplane/src/core/bufservices/subgraph/deleteFederatedSubgraph.ts(1 hunks)controlplane/src/core/bufservices/subgraph/moveSubgraph.ts(1 hunks)controlplane/src/core/bufservices/subgraph/publishFederatedSubgraph.ts(1 hunks)controlplane/src/core/bufservices/subgraph/updateSubgraph.ts(1 hunks)controlplane/src/core/build-server.ts(6 hunks)controlplane/src/core/repositories/FederatedGraphRepository.ts(10 hunks)controlplane/src/core/repositories/SubgraphRepository.ts(5 hunks)controlplane/src/core/routes.ts(2 hunks)controlplane/src/core/util.ts(2 hunks)controlplane/src/workers/compose.ts(1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-29T10:28:04.846Z
Learnt from: JivusAyrus
Repo: wundergraph/cosmo PR: 2156
File: controlplane/src/core/repositories/SubgraphRepository.ts:1749-1751
Timestamp: 2025-08-29T10:28:04.846Z
Learning: In the controlplane codebase, authentication and authorization checks (including organization scoping) are handled at the service layer in files like unlinkSubgraph.ts before calling repository methods. Repository methods like unlinkSubgraph() in SubgraphRepository.ts can focus purely on data operations without redundant security checks.
Applied to files:
controlplane/src/core/bufservices/subgraph/publishFederatedSubgraph.tscontrolplane/src/core/bufservices/subgraph/updateSubgraph.tscontrolplane/src/core/bufservices/subgraph/moveSubgraph.tscontrolplane/src/core/bufservices/subgraph/deleteFederatedSubgraph.tscontrolplane/src/core/repositories/FederatedGraphRepository.tscontrolplane/src/core/repositories/SubgraphRepository.ts
📚 Learning: 2025-09-17T20:55:39.456Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2172
File: router/core/graph_server.go:0-0
Timestamp: 2025-09-17T20:55:39.456Z
Learning: The Initialize method in router/internal/retrytransport/manager.go has been updated to properly handle feature-flag-only subgraphs by collecting subgraphs from both routerConfig.GetSubgraphs() and routerConfig.FeatureFlagConfigs.ConfigByFeatureFlagName, ensuring all subgraphs receive retry configuration.
Applied to files:
controlplane/src/core/bufservices/feature-flag/updateFeatureFlag.tscontrolplane/src/core/bufservices/feature-flag/createFeatureFlag.ts
🧬 Code graph analysis (5)
controlplane/src/core/util.ts (3)
controlplane/src/types/index.ts (2)
FederatedGraphDTO(76-99)CompositionOptions(796-798)controlplane/src/core/repositories/FeatureFlagRepository.ts (1)
SubgraphsToCompose(42-48)composition/src/federation/types.ts (3)
ContractTagOptions(57-60)FederationResult(33-33)FederationResultWithContracts(55-55)
controlplane/src/workers/compose.ts (5)
controlplane/src/types/index.ts (2)
FederatedGraphDTO(76-99)CompositionOptions(796-798)controlplane/src/core/repositories/FeatureFlagRepository.ts (1)
SubgraphsToCompose(42-48)composition/src/federation/types.ts (3)
ContractTagOptions(57-60)FederationResult(33-33)FederationResultWithContracts(55-55)controlplane/src/core/composition/composition.ts (2)
composeFederatedContract(35-47)composeFederatedGraphWithPotentialContracts(18-30)composition/src/v1/federation/utils.ts (1)
newContractTagOptionsFromArrays(242-250)
controlplane/src/core/repositories/FederatedGraphRepository.ts (1)
controlplane/src/core/util.ts (1)
getFederationResultWithPotentialContracts(535-548)
controlplane/src/core/build-server.ts (1)
controlplane/src/workers/compose.ts (1)
getWorkerPool(28-37)
controlplane/src/core/bufservices/graph/setGraphRouterCompatibilityVersion.ts (1)
controlplane/src/core/routes.ts (1)
opts(64-69)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: build_push_image
- GitHub Check: build_push_image
- GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
- GitHub Check: Analyze (go)
- GitHub Check: build_push_image (nonroot)
- GitHub Check: integration_test (./events)
- GitHub Check: build_test
- GitHub Check: build_push_image
- GitHub Check: build_test
🔇 Additional comments (21)
controlplane/src/core/bufservices/federated-graph/updateFederatedGraph.ts (1)
122-122: LGTM!The worker pool parameter is correctly threaded through to the repository layer for async composition.
controlplane/src/core/bufservices/feature-flag/deleteFeatureFlag.ts (1)
138-138: LGTM!Worker pool parameter is correctly passed to the composition and deployment flow.
controlplane/src/core/bufservices/subgraph/publishFederatedSubgraph.ts (1)
577-577: LGTM!Worker pool parameter is correctly threaded through to the subgraph update flow.
controlplane/src/core/bufservices/subgraph/updateSubgraph.ts (1)
187-187: LGTM!Worker pool parameter is correctly passed to the subgraph update operation.
controlplane/src/core/bufservices/contract/createContract.ts (1)
209-209: LGTM!Worker pool parameter is correctly passed to the compose and deploy operation during contract creation.
controlplane/src/core/bufservices/federated-graph/migrateFromApollo.ts (1)
159-159: LGTM!Worker pool parameter is correctly threaded through to the Apollo migration composition flow.
controlplane/src/core/routes.ts (1)
8-8: Worker pool lifecycle management verified and correctly implemented.All concerns from the review have been confirmed:
- Initialization order: Worker pool is created at line 477 before route initialization at line 512 ✓
- Graceful shutdown: Pool is properly destroyed at line 535 within
fastify.gracefulShutdown()callback with logging ✓The worker pool is actively used throughout the codebase (util.ts, repositories, bufservices) and cleanup is handled correctly alongside other worker cleanup (bullWorkers).
controlplane/package.json (1)
93-93: tinypool@^2.0.0 is appropriately specified with no security concerns.The latest version of tinypool on npm is 2.0.0, which matches the PR specification. The version specification uses
^2.0.0, allowing for compatible updates within the 2.x range. No security vulnerabilities were found for tinypool in the GitHub vulnerability database.controlplane/src/core/bufservices/subgraph/deleteFederatedSubgraph.ts (1)
189-189: LGTM! Worker pool parameter correctly propagated.The addition of
composeWorkerPool: opts.composeWorkerPoolcorrectly threads the worker pool through to the composition and deployment flow, enabling async composition without blocking the main event loop.controlplane/src/core/bufservices/federated-graph/moveFederatedGraph.ts (1)
128-128: LGTM! Worker pool parameter correctly propagated to both move operations.The worker pool is correctly passed to both the main federated graph move (line 128) and contract graph moves (line 167), ensuring consistent async composition behavior across all affected graphs.
Also applies to: 167-167
controlplane/src/core/bufservices/feature-flag/createFeatureFlag.ts (1)
208-208: LGTM! Worker pool parameter correctly added.The worker pool parameter is correctly propagated to
composeAndDeployGraphs, enabling async composition for federated graphs affected by the new feature flag.controlplane/src/core/bufservices/feature-flag/updateFeatureFlag.ts (1)
177-177: LGTM! Worker pool parameter correctly added.The worker pool parameter is correctly propagated to
composeAndDeployGraphsfor handling composition of all affected federated graphs after feature flag update.controlplane/src/core/bufservices/federated-graph/createFederatedGraph.ts (1)
235-235: LGTM! Worker pool parameter correctly added.The worker pool parameter is correctly propagated to
composeAndDeployGraphs, enabling async composition for the newly created federated graph.controlplane/src/core/bufservices/feature-flag/enableFeatureFlag.ts (1)
122-122: LGTM! Worker pool parameter correctly added.The worker pool parameter is correctly propagated to
composeAndDeployGraphs, enabling async composition for federated graphs affected by the feature flag state change.controlplane/src/core/bufservices/subgraph/moveSubgraph.ts (1)
120-120: LGTM! Worker pool parameter correctly added.The worker pool parameter is correctly propagated to
subgraphRepo.move, enabling async composition for all federated graphs affected by the subgraph move.controlplane/src/core/bufservices/monograph/updateMonograph.ts (1)
153-153: LGTM! Worker pool parameter correctly propagated to both update operations.The worker pool is correctly passed to both the federated graph update (line 153) and subgraph update (line 177), ensuring async composition for the monograph update flow.
Also applies to: 177-177
controlplane/src/core/bufservices/graph/setGraphRouterCompatibilityVersion.ts (1)
135-146: Wiring composeWorkerPool into composition is consistent and correctPassing
opts.composeWorkerPoolthrough tocomposeAndDeployGraphskeeps this flow aligned with the rest of the composition/deploy paths and doesn’t alter existing control flow or error handling. No issues from this change.controlplane/src/core/bufservices/monograph/publishMonograph.ts (1)
138-156: Monograph update correctly threads the compose worker poolAdding
opts.composeWorkerPoolto thesubgraphRepo.updatecall matches the updated repository signature and ensures monograph recomposition also benefits from the worker pool. No behavior change beyond using the pool.controlplane/src/core/bufservices/contract/updateContract.ts (1)
126-145: Contract update is correctly wired to use the compose worker poolBoth the
fedGraphRepo.updateandfedGraphRepo.composeAndDeployGraphscalls now passcomposeWorkerPool, which is consistent with the updated repository API and the rest of the PR’s wiring. The argument order is correct and there’s no change to the surrounding error-handling logic.Also applies to: 151-174
controlplane/src/core/build-server.ts (1)
141-144: Worker pool creation and routing wiring look correctThe new
composeWorkersconfig, instantiation ofcomposeWorkerPoolviagetWorkerPool, and passing it intoroutes(...)cleanly expose the shared worker pool throughRouterOptionsto all buf services. This matches the rest of the PR’s composition wiring and keeps the pool scoped to the server lifecycle.Also applies to: 477-513
controlplane/src/workers/compose.ts (1)
39-60: Composition branching logic in the worker looks correctThe contract vs non-contract branching mirrors the existing composition flow: contracts go through
composeFederatedContractwith tag options derived fromfederatedGraph.contract, while non-contract graphs go throughcomposeFederatedGraphWithPotentialContractswithtagOptionsByContractName. The version andcompositionOptionsare threaded consistently.This looks like a faithful extraction of the previous inline logic into the worker context.
| import { Tinypool } from 'tinypool'; | ||
| import routes from './routes.js'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Harden composeWorkerPool initialization and shutdown handling
Two small robustness points around the worker-pool lifecycle:
getWorkerPool(opts.composeWorkers?.maxCount)!assumes it never returnsundefined. Ifbuild()were ever called off the main thread, this would throw at startup. Guarding the result and either failing fast with a clearer error or falling back to inline composition would be safer.- During graceful shutdown,
await composeWorkerPool.destroy()is unguarded. A try/catch with a warning log would avoid aborting the shutdown sequence if the pool destruction fails.
One possible shape (adjust as needed with corresponding type changes in RouterOptions/call sites):
- const composeWorkerPool: Tinypool = getWorkerPool(opts.composeWorkers?.maxCount)!;
+ const composeWorkerPool = getWorkerPool(opts.composeWorkers?.maxCount);
+ if (!composeWorkerPool) {
+ logger.warn('compose worker pool could not be initialized; falling back to in-process composition');
+ }
@@
- composeWorkerPool: composeWorkerPool,
+ composeWorkerPool,
@@
- fastify.gracefulShutdown(async () => {
- fastify.log.debug('Shutting down compose worker pool');
-
- await composeWorkerPool.destroy();
-
- fastify.log.debug('Compose worker pool shut down');
+ fastify.gracefulShutdown(async () => {
+ if (composeWorkerPool) {
+ fastify.log.debug('Shutting down compose worker pool');
+ try {
+ await composeWorkerPool.destroy();
+ fastify.log.debug('Compose worker pool shut down');
+ } catch (err) {
+ fastify.log.warn({ err }, 'Error while shutting down compose worker pool');
+ }
+ }You could also change import { Tinypool } from 'tinypool'; to an import type if it’s only needed for typing.
Also applies to: 57-58, 477-477, 533-538
I'll verify the review comment by examining the codebase to understand the current implementation and whether the concerns raised are valid.
---
🏁 Script executed:
```shell
#!/bin/bash
# Find and examine the build-server.ts file
find . -name "build-server.ts" -type f 2>/dev/null | head -5
Length of output: 43
---
🏁 Script executed:
```shell
#!/bin/bash
# Examine the build-server.ts file, focusing on key lines
cat ./git/controlplane/src/core/build-server.ts
Length of output: 75
---
🏁 Script executed:
```shell
#!/bin/bash
# Examine the build-server.ts file
cat controlplane/src/core/build-server.ts | head -100
Length of output: 4297
---
🏁 Script executed:
```shell
#!/bin/bash
# Get more context around lines 57-58 and the initialization
sed -n '50,150p' controlplane/src/core/build-server.ts
Length of output: 2429
---
🏁 Script executed:
```shell
#!/bin/bash
# Search for build function and worker pool initialization
sed -n '400,550p' controlplane/src/core/build-server.ts
Length of output: 4234
---
🏁 Script executed:
```shell
#!/bin/bash
# Find and examine the getWorkerPool function
cat controlplane/src/workers/compose.js
Length of output: 67
---
🏁 Script executed:
```shell
#!/bin/bash
# Search for the compose file
find controlplane/src/workers -name "compose*" -type f
Length of output: 35
---
🏁 Script executed:
```shell
#!/bin/bash
# Examine the getWorkerPool function
cat controlplane/src/workers/compose.ts
Length of output: 2135
Harden composeWorkerPool initialization and shutdown handling
The function getWorkerPool() at line 27 in controlplane/src/workers/compose.ts has the signature export function getWorkerPool(maxCount?: number): Tinypool | undefined. It returns undefined when not on the main thread (line 32).
In controlplane/src/core/build-server.ts line 477, the code unsafely assumes a non-null result:
const composeWorkerPool: Tinypool = getWorkerPool(opts.composeWorkers?.maxCount)!;
If build() were ever called off the main thread, this assertion would fail at runtime. Guard the result and either fail fast with a clearer error or fall back to inline composition.
Additionally, the graceful shutdown at lines 533-538 calls await composeWorkerPool.destroy() without a try/catch. Wrap this in error handling to prevent shutdown sequence abortion if pool destruction fails.
One possible shape (adjust type annotations in RouterOptions/call sites as needed):
- const composeWorkerPool: Tinypool = getWorkerPool(opts.composeWorkers?.maxCount)!;
+ const composeWorkerPool = getWorkerPool(opts.composeWorkers?.maxCount);
+ if (!composeWorkerPool) {
+ logger.warn('compose worker pool could not be initialized; falling back to in-process composition');
+ }
@@
- fastify.gracefulShutdown(async () => {
- fastify.log.debug('Shutting down compose worker pool');
-
- await composeWorkerPool.destroy();
-
- fastify.log.debug('Compose worker pool shut down');
+ fastify.gracefulShutdown(async () => {
+ if (composeWorkerPool) {
+ fastify.log.debug('Shutting down compose worker pool');
+ try {
+ await composeWorkerPool.destroy();
+ fastify.log.debug('Compose worker pool shut down');
+ } catch (err) {
+ fastify.log.warn({ err }, 'Error while shutting down compose worker pool');
+ }
+ }Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
controlplane/src/core/build-server.ts (1)
477-478: Guard worker-pool initialization and shutdown to avoid brittle failures
getWorkerPool()returnsTinypool | undefined, so the non-null assertion and unguarded destroy are brittle ifbuild()is ever invoked off the main thread or the pool fails to initialize/destroy (e.g., env issues, worker crash). A small robustness tweak:
- Fail fast with a clear error if
getWorkerPoolreturnsundefined, instead of relying on!.- Wrap
destroy()in a try/catch so graceful shutdown still proceeds even if pool teardown fails.For example:
- const composeWorkerPool: Tinypool = getWorkerPool(opts.composeWorkers?.maxCount)!; + const composeWorkerPool = getWorkerPool(opts.composeWorkers?.maxCount); + if (!composeWorkerPool) { + logger.error('compose worker pool could not be initialized; build() must be called on the main thread'); + throw new Error('compose worker pool initialization failed'); + } @@ - fastify.gracefulShutdown(async () => { - fastify.log.debug('Shutting down compose worker pool'); - - await composeWorkerPool.destroy(); - - fastify.log.debug('Compose worker pool shut down'); + fastify.gracefulShutdown(async () => { + fastify.log.debug('Shutting down compose worker pool'); + try { + await composeWorkerPool.destroy(); + fastify.log.debug('Compose worker pool shut down'); + } catch (err) { + fastify.log.warn({ err }, 'Error while shutting down compose worker pool'); + }This keeps the “always use worker pool” invariant, but makes failure modes clearer and shutdown more resilient.
Also applies to: 533-537
🧹 Nitpick comments (3)
controlplane/src/core/build-server.ts (1)
11-12: Worker-pool wiring and config surface look good; minor type/import nitThe addition of
composeWorkerstoBuildConfig, creation ofcomposeWorkerPool, and threading it intoroutes({ …, composeWorkerPool })is coherent and keeps the concern localized to the build server.If
Tinypoolis only used for typing here, consider switching to animport type { Tinypool } from 'tinypool';(and relying on inference forcomposeWorkerPool) to avoid a runtime import where not needed, depending on your TS/module settings. This is purely optional.Also applies to: 141-143, 477-478, 512-512
controlplane/src/core/util.ts (1)
14-14: Async worker-based composition helper looks correct; consider tightening typingThe new
getFederationResultWithPotentialContractsthat delegates tocomposeWorkerPool.runmatches the worker-pool design and keeps composition logic centralized.Two small type-level nits you might consider:
- Use Tinypool’s generic to strongly type the result instead of relying on
anyinference.- Since
runalready returns aPromise, you can return it directly without an extraawait.For example:
-export async function getFederationResultWithPotentialContracts( +export async function getFederationResultWithPotentialContracts( composeWorkerPool: Tinypool, @@ -): Promise<FederationResult | FederationResultWithContracts> { - return await composeWorkerPool.run( +): Promise<FederationResult | FederationResultWithContracts> { + return composeWorkerPool.run<FederationResult | FederationResultWithContracts>( { federatedGraph, subgraphsToCompose, tagOptionsByContractName, compositionOptions, }, { name: 'composeFederatedGraph' }, ); }This makes downstream usage safer while preserving the current behavior.
Also applies to: 535-550
controlplane/src/core/repositories/FederatedGraphRepository.ts (1)
1514-1541: Worker-based composition in composeAndDeployGraphs is sound; consider keeping DB access consistently on the transactionThe updated
composeAndDeployGraphs:
- Constructs
Composerwithtx(instead ofthis.db), which is a nice improvement for transactional consistency.- Uses the shared
composeWorkerPoolviagetFederationResultWithPotentialContractsinside the transaction to offload heavy composition work, which aligns with the PR’s goal.One small consistency tweak you might consider: when you later fetch DTOs inside the same transaction, you still call
this.byId(which usesthis.db) rather than the transaction-backed repo you already instantiated:const federatedGraphDTO = await this.byId(federatedGraph.id); … const contractDTO = await this.byId(contractId);For tighter transactional semantics and to avoid mixing
txandthis.dbin the same flow, you could instead use thefedGraphRepoconstructed withtx:- const federatedGraphDTO = await this.byId(federatedGraph.id); + const federatedGraphDTO = await fedGraphRepo.byId(federatedGraph.id); @@ - const contractDTO = await this.byId(contractId); + const contractDTO = await fedGraphRepo.byId(contractId);Functionally this should be equivalent today, but it keeps all DB access in this path on the same transaction handle and makes the intent clearer.
Also applies to: 1588-1595, 1751-1758, 1793-1801
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
controlplane/src/core/build-server.ts(5 hunks)controlplane/src/core/repositories/FederatedGraphRepository.ts(10 hunks)controlplane/src/core/util.ts(2 hunks)controlplane/src/workers/compose.ts(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- controlplane/src/workers/compose.ts
🧰 Additional context used
🧬 Code graph analysis (3)
controlplane/src/core/build-server.ts (1)
controlplane/src/workers/compose.ts (1)
getWorkerPool(28-35)
controlplane/src/core/repositories/FederatedGraphRepository.ts (2)
composition/src/federation/types.ts (2)
FederationResult(33-33)FederationResultWithContracts(55-55)controlplane/src/core/util.ts (1)
getFederationResultWithPotentialContracts(535-551)
controlplane/src/core/util.ts (3)
controlplane/src/types/index.ts (2)
FederatedGraphDTO(76-99)CompositionOptions(796-798)controlplane/src/core/repositories/FeatureFlagRepository.ts (1)
SubgraphsToCompose(42-48)composition/src/federation/types.ts (3)
ContractTagOptions(57-60)FederationResult(33-33)FederationResultWithContracts(55-55)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: build_push_image (nonroot)
- GitHub Check: build_push_image
- GitHub Check: image_scan
- GitHub Check: integration_test (./events)
- GitHub Check: build_test
- GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
- GitHub Check: image_scan (nonroot)
- GitHub Check: build_test
- GitHub Check: build_push_image
- GitHub Check: build_test
- GitHub Check: build_test
- GitHub Check: build_push_image
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: Analyze (go)
- GitHub Check: build_test
🔇 Additional comments (1)
controlplane/src/core/repositories/FederatedGraphRepository.ts (1)
32-32: Verified: composeWorkerPool threading is correctly wired throughoutThe new
composeWorkerPoolrequirement is consistently implemented across the codebase. Verification confirms:
update(),move(), andcomposeAndDeployGraphs()all correctly require and propagate thecomposeWorkerPool: Tinypoolparameter- All sampled external call sites (deleteFederatedSubgraph, createFederatedGraph, updateContract, createContract, moveSubgraph, moveFederatedGraph) properly pass
opts.composeWorkerPoolto these methods- Internal routing through transaction-scoped repository instances is correct
No issues found.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
controlplane/src/workers/compose.ts(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
controlplane/src/workers/compose.ts (5)
controlplane/src/types/index.ts (2)
FederatedGraphDTO(76-99)CompositionOptions(796-798)controlplane/src/core/repositories/FeatureFlagRepository.ts (1)
SubgraphsToCompose(42-48)composition/src/federation/types.ts (1)
ContractTagOptions(57-60)controlplane/src/core/composition/composition.ts (2)
composeFederatedContract(35-47)composeFederatedGraphWithPotentialContracts(18-30)composition/src/v1/federation/utils.ts (1)
newContractTagOptionsFromArrays(242-250)
🪛 GitHub Actions: Controlplane CI
controlplane/src/workers/compose.ts
[error] 27-34: Git diff detected changes in controlplane/src/workers/compose.ts; exit code 1. The diff shows a formatting update to Tinypool config (multi-line object). Run the formatter and commit if intended.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Analyze (go)
- GitHub Check: build_test
🔇 Additional comments (1)
controlplane/src/workers/compose.ts (1)
36-57: LGTM! Composition dispatch logic is correct.The function correctly routes to contract-specific or general composition based on the presence of
federatedGraph.contract. The parameter mapping tocomposeFederatedContractandcomposeFederatedGraphWithPotentialContractsaligns with the signatures defined in the composition module, and the use ofnewContractTagOptionsFromArraysproperly constructs the requiredContractTagOptionsfrom the contract's tag arrays.
controlplane/src/workers/compose.ts
Outdated
| export function getWorkerPool(maxCount?: number): Tinypool | undefined { | ||
| if (isMainThread) { | ||
| return new Tinypool({ filename: join(process.cwd(), 'dist/workers/compose.js'), name: 'composeFederatedGraph', maxThreads: maxCount }); | ||
| } else { | ||
| return undefined; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix formatting to pass CI and reconsider hardcoded path.
Two issues identified:
-
CI Failure: The pipeline indicates a formatting issue with the Tinypool configuration. The object should be formatted across multiple lines.
-
Hardcoded Path: The path
'dist/workers/compose.js'is hardcoded. Past review comments suggest this was discussed and "fixed," but the hardcoded approach remains. While this may work in production whenprocess.cwd()points to the deployment root, it could be fragile in development, testing, or non-standard deployment scenarios where the working directory differs.
Apply this diff to fix the formatting issue:
- return new Tinypool({ filename: join(process.cwd(), 'dist/workers/compose.js'), name: 'composeFederatedGraph', maxThreads: maxCount });
+ return new Tinypool({
+ filename: join(process.cwd(), 'dist/workers/compose.js'),
+ name: 'composeFederatedGraph',
+ maxThreads: maxCount,
+ });Consider whether a more robust path resolution would be appropriate:
// Alternative: derive the path relative to this module's location
const workerPath = import.meta.url.endsWith('.ts')
? join(process.cwd(), 'dist/workers/compose.js')
: import.meta.url.replace(/\/workers\/compose\.(js|ts)$/, '/workers/compose.js');🧰 Tools
🪛 GitHub Actions: Controlplane CI
[error] 27-34: Git diff detected changes in controlplane/src/workers/compose.ts; exit code 1. The diff shows a formatting update to Tinypool config (multi-line object). Run the formatter and commit if intended.
🤖 Prompt for AI Agents
In controlplane/src/workers/compose.ts around lines 28 to 34, fix the formatting
and hardcoded path: reformat the Tinypool constructor options to be spread
across multiple lines (each option on its own line) to satisfy CI formatting
rules, and replace the hardcoded join(process.cwd(), 'dist/workers/compose.js')
with a robust workerPath computed from import.meta.url (use a conditional: if
import.meta.url endsWith '.ts' derive the dist path from process.cwd(),
otherwise derive the .js path by replacing the module URL suffix to point to
workers/compose.js), then pass workerPath to Tinypool. Ensure the function still
returns undefined off the main thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (2)
controlplane/src/workers/compose.ts (2)
5-6: Remove unused imports.Both
isMainThreadandjoinare imported but never used in this file. Additionally,Tinypoolon line 13 is imported but not used here.Apply this diff to remove the unused imports:
-import { isMainThread } from 'node:worker_threads'; -import { join } from 'node:path'; import { ContractTagOptions, FederationResult, FederationResultWithContracts, newContractTagOptionsFromArrays, } from '@wundergraph/composition'; -import { Tinypool } from 'tinypool';
28-49: Consider adding error handling for worker robustness.The worker function has no try-catch blocks. If either
composeFederatedContractorcomposeFederatedGraphWithPotentialContractsthrows an exception, the worker thread will terminate ungracefully. While Tinypool handles worker crashes, explicit error handling would provide better observability and allow for structured error responses.Consider wrapping the logic in a try-catch:
export default function ({ federatedGraph, subgraphsToCompose, tagOptionsByContractName, compositionOptions, }: Inputs): FederationResult | FederationResultWithContracts { try { if (federatedGraph.contract) { return composeFederatedContract( subgraphsToCompose.compositionSubgraphs, newContractTagOptionsFromArrays(federatedGraph.contract.excludeTags, federatedGraph.contract.includeTags), federatedGraph.routerCompatibilityVersion, compositionOptions, ); } return composeFederatedGraphWithPotentialContracts( subgraphsToCompose.compositionSubgraphs, new Map(Object.entries(tagOptionsByContractName)), federatedGraph.routerCompatibilityVersion, compositionOptions, ); } catch (error) { // Re-throw with context or return a structured error throw new Error(`Worker composition failed: ${error instanceof Error ? error.message : String(error)}`); } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
controlplane/src/core/build-server.ts(6 hunks)controlplane/src/workers/compose.ts(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- controlplane/src/core/build-server.ts
🧰 Additional context used
🧬 Code graph analysis (1)
controlplane/src/workers/compose.ts (5)
controlplane/src/types/index.ts (2)
FederatedGraphDTO(76-99)CompositionOptions(796-798)controlplane/src/core/repositories/FeatureFlagRepository.ts (1)
SubgraphsToCompose(42-48)composition/src/federation/types.ts (3)
ContractTagOptions(57-60)FederationResult(33-33)FederationResultWithContracts(55-55)controlplane/src/core/composition/composition.ts (2)
composeFederatedContract(35-47)composeFederatedGraphWithPotentialContracts(18-30)composition/src/v1/federation/utils.ts (1)
newContractTagOptionsFromArrays(242-250)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: build_test
- GitHub Check: build_test
- GitHub Check: build_push_image
- GitHub Check: build_push_image (nonroot)
- GitHub Check: build_test
- GitHub Check: image_scan
- GitHub Check: image_scan (nonroot)
- GitHub Check: build_push_image
- GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
- GitHub Check: integration_test (./telemetry)
- GitHub Check: build_test
- GitHub Check: build_push_image
- GitHub Check: build_test
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: Analyze (go)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
controlplane/src/workers/compose.ts (1)
5-6: Remove unused imports.The imports
isMainThread,join, andTinypoolare not used anywhere in this file. Past review comments suggest the file structure has changed, leaving these imports orphaned.Apply this diff to remove the unused imports:
-import { isMainThread } from 'node:worker_threads'; -import { join } from 'node:path'; import { ContractTagOptions, FederationResult, FederationResultWithContracts, newContractTagOptionsFromArrays, } from '@wundergraph/composition'; -import { Tinypool } from 'tinypool'; import { composeFederatedContract, composeFederatedGraphWithPotentialContracts, } from '../core/composition/composition.js';Also applies to: 13-13
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
controlplane/src/core/util.ts(2 hunks)controlplane/src/workers/compose.ts(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- controlplane/src/core/util.ts
🧰 Additional context used
🧬 Code graph analysis (1)
controlplane/src/workers/compose.ts (5)
controlplane/src/types/index.ts (2)
FederatedGraphDTO(76-99)CompositionOptions(796-798)controlplane/src/core/repositories/FeatureFlagRepository.ts (1)
SubgraphsToCompose(42-48)composition/src/federation/types.ts (3)
ContractTagOptions(57-60)FederationResult(33-33)FederationResultWithContracts(55-55)controlplane/src/core/composition/composition.ts (2)
composeFederatedContract(35-47)composeFederatedGraphWithPotentialContracts(18-30)composition/src/v1/federation/utils.ts (1)
newContractTagOptionsFromArrays(242-250)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
- GitHub Check: build_push_image
- GitHub Check: build_test
- GitHub Check: build_push_image
- GitHub Check: image_scan
- GitHub Check: build_push_image (nonroot)
- GitHub Check: image_scan (nonroot)
- GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
- GitHub Check: integration_test (./telemetry)
- GitHub Check: build_test
- GitHub Check: integration_test (./events)
- GitHub Check: Analyze (go)
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: build_push_image
- GitHub Check: build_test
- GitHub Check: build_test
- GitHub Check: build_test
🔇 Additional comments (1)
controlplane/src/workers/compose.ts (1)
28-49: Code is correct; parameter usage pattern is appropriate.The worker function properly handles internal branching based on
federatedGraph.contractpresence. WhiletagOptionsByContractNameis unused when a contract exists, this is not a concern because callers unconditionally pass all parameters to the worker, leaving routing logic to the worker itself. This is a sound design pattern—callers pass complete context, and the worker determines execution flow. No changes needed.
controlplane/src/workers/compose.ts
Outdated
| interface Inputs { | ||
| federatedGraph: FederatedGraphDTO; | ||
| subgraphsToCompose: SubgraphsToCompose; | ||
| tagOptionsByContractName: Record<string, ContractTagOptions>; | ||
| compositionOptions?: CompositionOptions; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ContractTagOptions contains non-serializable Sets.
While the Map serialization issue was fixed by using Record<string, ContractTagOptions>, the ContractTagOptions type itself contains Set<string> fields (tagNamesToExclude and tagNamesToInclude), which also cannot be serialized via structured clone. This will cause a DataCloneError when Tinypool attempts to pass the data to the worker thread.
The interface should use serializable array-based tag options. Apply this diff:
interface Inputs {
federatedGraph: FederatedGraphDTO;
subgraphsToCompose: SubgraphsToCompose;
- tagOptionsByContractName: Record<string, ContractTagOptions>;
+ tagOptionsByContractName: Record<string, { tagNamesToExclude: string[]; tagNamesToInclude: string[] }>;
compositionOptions?: CompositionOptions;
}Then update the worker function to convert arrays back to Sets:
return composeFederatedGraphWithPotentialContracts(
subgraphsToCompose.compositionSubgraphs,
- new Map(Object.entries(tagOptionsByContractName)),
+ new Map(
+ Object.entries(tagOptionsByContractName).map(([name, opts]) => [
+ name,
+ {
+ tagNamesToExclude: new Set(opts.tagNamesToExclude),
+ tagNamesToInclude: new Set(opts.tagNamesToInclude),
+ },
+ ])
+ ),
federatedGraph.routerCompatibilityVersion,
compositionOptions,
);Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In controlplane/src/workers/compose.ts around lines 21 to 26, the Inputs
interface still references ContractTagOptions that uses non-serializable Sets
(tagNamesToExclude and tagNamesToInclude), causing DataCloneError when sending
to the worker; change the Inputs type so tag options are array-based (e.g.,
tagNamesToExclude?: string[] and tagNamesToInclude?: string[]) or replace
ContractTagOptions with a serializable variant, then in the worker entry
function convert those arrays back into Sets (new Set(array)) before using them
so runtime logic remains unchanged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
controlplane/test/test-util.ts (1)
405-423: Inconsistency:composeWorkerPoolmissing from return value.The AI-generated summary indicates that the pool should be exposed via
queues.composeWorkerPool, but it's not included in the returnedqueuesobject. This prevents tests from accessing the pool for cleanup or verification purposes.This is addressed by the diff in the previous comment.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
controlplane/test/test-util.ts(3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
- GitHub Check: build_test
- GitHub Check: build_push_image
- GitHub Check: build_push_image (nonroot)
- GitHub Check: build_push_image
- GitHub Check: build_test
- GitHub Check: image_scan
- GitHub Check: build_test
- GitHub Check: build_push_image
- GitHub Check: build_test
- GitHub Check: image_scan (nonroot)
- GitHub Check: integration_test (./telemetry)
- GitHub Check: integration_test (./events)
- GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
- GitHub Check: build_test
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: Analyze (go)
🔇 Additional comments (1)
controlplane/test/test-util.ts (1)
15-15: LGTM!The Tinypool import is correctly added to support the worker pool creation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
controlplane/src/core/repositories/SchemaCheckRepository.ts (2)
233-236: Avoid string-built SQL; use parameters to prevent quoting bugs and injection.incomingChanges is built via string interpolation with changeType/path. Paths may contain quotes or commas, breaking the SQL; this also risks injection. Build the array parametrically with Drizzle instead.
Apply this refactor:
- const incomingChanges = `array[${changes.map((c) => `('${c.changeType}'::schema_change_type, '${c.path}'::text)`)}]`; + const incomingChanges = sql`array[${sql.join( + changes.map((c) => sql`(${c.changeType}::schema_change_type, ${c.path}::text)`), + sql`, ` + )}]`;And keep storedChanges as-is; only the user-supplied pieces (changeType/path) must be parameterized.
831-889: proposalMatchMessage can start with "undefined" due to += on an undefined variable.Initialize or aggregate safely to avoid leaking “undefined” prefixes.
Apply one of these:
- let proposalMatchMessage: string | undefined; + let proposalMatchMessage: string | undefined; + const proposalMessages: string[] = []; ... - proposalMatchMessage += `The subgraph ${subgraphName}'s schema does not match to this subgraph's schema in any approved proposal.\n`; + proposalMessages.push( + `The subgraph ${subgraphName}'s schema does not match to this subgraph's schema in any approved proposal.` + ); ... - proposalMatchMessage, + proposalMatchMessage: proposalMessages.length ? proposalMessages.join('\n') : undefined,Or initialize to empty string and trim when returning.
controlplane/src/core/composition/composer.ts (2)
686-697: Type mismatch: errors field is string[], but catch pushes Error objects.Convert errors to strings to match the updated type and avoid runtime surprises.
Apply:
- composedGraphs.push({ + composedGraphs.push({ id: graph.id, name: graph.name, namespace: graph.namespace, namespaceId: graph.namespaceId, targetID: graph.targetId, fieldConfigurations: [], - errors: [e], + errors: [e instanceof Error ? e.message : String(e)], subgraphs: [], warnings: [], });
856-866: Same issue here: convert caught error to string.Keep the type consistent everywhere.
- composedGraphs.push({ + composedGraphs.push({ id: graph.id, name: graph.name, namespace: graph.namespace, namespaceId: graph.namespaceId, targetID: graph.targetId, fieldConfigurations: [], - errors: [e], + errors: [e instanceof Error ? e.message : String(e)], subgraphs: [], warnings: [], });
🧹 Nitpick comments (2)
controlplane/src/core/repositories/FederatedGraphRepository.ts (2)
1643-1666: Prefer worker-provided routerExecutionConfig for base; skip redundant recompute.Use outputs.routerExecutionConfig if exposed, falling back only if absent.
Apply:
- const routerExecutionConfig = buildRouterExecutionConfig( - outputs.composedGraph, - outputs.federatedSchemaVersionId, - federatedGraph.routerCompatibilityVersion, - ); + const routerExecutionConfig = + (outputs as any).routerExecutionConfig ?? + buildRouterExecutionConfig( + outputs.composedGraph, + outputs.federatedSchemaVersionId, + federatedGraph.routerCompatibilityVersion, + );
1673-1693: Do the same for contracts: consume worker-provided contract router configs if available.Avoid recomputation on the request thread when worker already built them.
- for (const contractGraph of outputs.contractGraphs) { - const contractRouterExecutionConfig = buildRouterExecutionConfig( - contractGraph.composedContract, - contractGraph.contractSchemaVersionId, - federatedGraph.routerCompatibilityVersion, - ); + for (const contractGraph of outputs.contractGraphs) { + const contractRouterExecutionConfig = + (contractGraph as any).contractRouterExecutionConfig ?? + buildRouterExecutionConfig( + contractGraph.composedContract, + contractGraph.contractSchemaVersionId, + federatedGraph.routerCompatibilityVersion, + );
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
controlplane/src/core/bufservices/subgraph/fixSubgraphSchema.ts(1 hunks)controlplane/src/core/composition/composer.ts(2 hunks)controlplane/src/core/repositories/FederatedGraphRepository.ts(18 hunks)controlplane/src/core/repositories/SchemaCheckRepository.ts(2 hunks)controlplane/src/core/repositories/SubgraphRepository.ts(7 hunks)controlplane/src/core/util.ts(3 hunks)controlplane/src/workers/compose.ts(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- controlplane/src/core/util.ts
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-09-08T20:57:07.946Z
Learnt from: JivusAyrus
Repo: wundergraph/cosmo PR: 2156
File: controlplane/src/core/repositories/SubgraphRepository.ts:1746-1763
Timestamp: 2025-09-08T20:57:07.946Z
Learning: The checkSubgraphSchema.ts file already correctly implements linked subgraph functionality, using byName(linkedSubgraph.name, linkedSubgraph.namespace) to fetch target subgraphs and properly handles parse(newSchemaSDL) for schema building. The implementation doesn't need fixes for byId usage or schema parsing as it's already correct.
Applied to files:
controlplane/src/core/bufservices/subgraph/fixSubgraphSchema.ts
📚 Learning: 2025-08-29T10:28:04.846Z
Learnt from: JivusAyrus
Repo: wundergraph/cosmo PR: 2156
File: controlplane/src/core/repositories/SubgraphRepository.ts:1749-1751
Timestamp: 2025-08-29T10:28:04.846Z
Learning: In the controlplane codebase, authentication and authorization checks (including organization scoping) are handled at the service layer in files like unlinkSubgraph.ts before calling repository methods. Repository methods like unlinkSubgraph() in SubgraphRepository.ts can focus purely on data operations without redundant security checks.
Applied to files:
controlplane/src/core/repositories/SubgraphRepository.ts
📚 Learning: 2025-09-10T11:15:52.157Z
Learnt from: JivusAyrus
Repo: wundergraph/cosmo PR: 2156
File: controlplane/src/core/repositories/ProposalRepository.ts:562-572
Timestamp: 2025-09-10T11:15:52.157Z
Learning: The getLatestCheckForProposal function in controlplane/src/core/repositories/ProposalRepository.ts is only called during proposal creation or updates, so proposal match error checking (hasProposalMatchError) is not needed since the proposal is being modified itself rather than being matched against.
Applied to files:
controlplane/src/core/repositories/SchemaCheckRepository.ts
🧬 Code graph analysis (2)
controlplane/src/workers/compose.ts (6)
controlplane/src/types/index.ts (2)
FederatedGraphDTO(76-99)CompositionOptions(796-798)controlplane/src/core/repositories/FeatureFlagRepository.ts (1)
SubgraphsToCompose(42-48)composition/src/federation/types.ts (3)
ContractTagOptions(57-60)FederationResult(33-33)FederationResultWithContracts(55-55)controlplane/src/core/composition/composer.ts (4)
ComposedFederatedGraph(232-245)CompositionDeployResult(247-249)mapResultToComposedGraph(211-230)buildRouterExecutionConfig(94-111)controlplane/src/core/util.ts (1)
getFederationResultWithPotentialContracts(537-559)controlplane/src/core/errors/errors.ts (1)
unsuccessfulBaseCompositionError(41-52)
controlplane/src/core/repositories/FederatedGraphRepository.ts (2)
controlplane/src/core/composition/composer.ts (1)
buildRouterExecutionConfig(94-111)controlplane/src/types/index.ts (1)
FederatedGraphDTO(76-99)
🔇 Additional comments (8)
controlplane/src/core/repositories/SchemaCheckRepository.ts (2)
1018-1024: LGTM: storing stringified composition errors.Using the string messages for CompositionError aligns with the new ComposedFederatedGraph.errors: string[]. Looks good.
1026-1033: LGTM: storing stringified composition warnings.Same as above for warnings; consistent with the updated types.
controlplane/src/core/composition/composer.ts (1)
225-229: LGTM: mapping errors/warnings to strings.mapResultToComposedGraph now returns string[] for errors/warnings; consistent with downstream storage.
controlplane/src/core/bufservices/subgraph/fixSubgraphSchema.ts (1)
180-189: LGTM: store string messages for composition errors.The change to message: error fits the upstream types.
controlplane/src/core/repositories/SubgraphRepository.ts (3)
469-477: LGTM: composeWorkerPool propagated and used in composeAndDeployGraphs.Threading the pool through update() is correct and aligns with the new composition path.
2020-2027: LGTM: string messages for composition errors.Matches the updated types.
2030-2036: LGTM: string messages for composition warnings.Matches the updated types.
controlplane/src/workers/compose.ts (1)
29-32: Remove this review comment — the serialization concern does not apply to this project.Node.js worker_threads uses the HTML structured clone algorithm for workerData and postMessage, which supports Map and Set (including nested Maps/Sets inside plain objects), and Tinypool uses node:worker_threads and inherits the same cloning behavior. The project's Node.js requirement (≥22.11.0) far exceeds any version where this support might be missing.
ContractTagOptionscontains onlySet<string>fields, which are fully supported by structured cloning.The current code is correct. The proposed refactoring to
Record<string, { tagNamesToExclude: string[]; tagNamesToInclude: string[] }>would reduce type safety and clarity without addressing any real problem.Likely an incorrect or invalid review comment.
| portMain.on('message', async (resultJSON: string) => { | ||
| const result: ComposeBaseMessage = JSON.parse(resultJSON); | ||
|
|
||
| const routerExecutionConfig = buildRouterExecutionConfig( | ||
| result.composedGraph, | ||
| result.federatedSchemaVersionId, | ||
| federatedGraph.routerCompatibilityVersion, | ||
| ); | ||
| } | ||
|
|
||
| // Collect all composition warnings | ||
| allCompositionWarnings.push( | ||
| ...result.warnings.map((w) => ({ | ||
| federatedGraphName: federatedGraph.name, | ||
| namespace: federatedGraph.namespace, | ||
| message: w.message, | ||
| featureFlag: subgraphsToCompose.featureFlagName || '', | ||
| })), | ||
| ); | ||
| const baseComposition = await composer.saveComposition({ | ||
| composedGraph: result.composedGraph, | ||
| composedById: actorId, | ||
| isFeatureFlagComposition: subgraphsToCompose.isFeatureFlagComposition, | ||
| federatedSchemaVersionId: result.federatedSchemaVersionId, | ||
| routerExecutionConfig, | ||
| featureFlagId: subgraphsToCompose.featureFlagId, | ||
| }); | ||
|
|
||
| if (!subgraphsToCompose.isFeatureFlagComposition && !result.success && !federatedGraph.contract) { | ||
| allCompositionErrors.push(unsuccessfulBaseCompositionError(federatedGraph.name, federatedGraph.namespace)); | ||
| } | ||
| const contractGraphs: Map<string, FederatedGraphDTO> = new Map(); | ||
| for (const contractName of result.contractNames) { | ||
| const contractGraph = await fedGraphRepo.byName(contractName, federatedGraph.namespace); | ||
| if (!contractGraph) { | ||
| throw new Error(`The contract graph "${contractName}" was not found.`); | ||
| } | ||
| contractGraphs.set(contractName, contractGraph); | ||
| } | ||
|
|
||
| const baseResult: ComposeBaseResult = { | ||
| compositionDeployResult: baseComposition, | ||
| contractGraphs, | ||
| }; | ||
|
|
||
| const composedGraph = mapResultToComposedGraph(federatedGraph, subgraphsToCompose.subgraphs, result); | ||
| portMain.postMessage(baseResult); | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stop parsing JSON and use RouterConfig from worker; avoid recomputing.
After switching the worker to post a structured BaseMessage with routerExecutionConfig, consume it directly and skip rebuild. Also protect the handler with try/catch to ensure a reply.
Apply:
- portMain.on('message', async (resultJSON: string) => {
- const result: ComposeBaseMessage = JSON.parse(resultJSON);
-
- const routerExecutionConfig = buildRouterExecutionConfig(
- result.composedGraph,
- result.federatedSchemaVersionId,
- federatedGraph.routerCompatibilityVersion,
- );
+ portMain.on('message', async (result: ComposeBaseMessage) => {
+ const routerExecutionConfig =
+ result.routerExecutionConfig ??
+ buildRouterExecutionConfig(
+ result.composedGraph,
+ result.federatedSchemaVersionId,
+ federatedGraph.routerCompatibilityVersion,
+ );
const baseComposition = await composer.saveComposition({
composedGraph: result.composedGraph,
composedById: actorId,
isFeatureFlagComposition: subgraphsToCompose.isFeatureFlagComposition,
federatedSchemaVersionId: result.federatedSchemaVersionId,
routerExecutionConfig,
featureFlagId: subgraphsToCompose.featureFlagId,
});This aligns main/worker and removes the JSON stringify dependency.
🤖 Prompt for AI Agents
In controlplane/src/core/repositories/FederatedGraphRepository.ts around lines
1604 to 1637, the handler currently parses a JSON string and rebuilds
routerExecutionConfig; change it to accept the structured ComposeBaseMessage
sent by the worker (don’t JSON.parse), use result.routerExecutionConfig
(RouterConfig) directly instead of recomputing via buildRouterExecutionConfig,
and wrap the entire handler body in try/catch so any error is caught and an
error response is posted back through portMain.postMessage (ensuring the worker
always receives a reply).
controlplane/src/workers/compose.ts
Outdated
| const message: BaseMessage = { | ||
| composedGraph, | ||
| federatedSchemaVersionId, | ||
| // routerExecutionConfig, | ||
| contractNames, | ||
| }; | ||
| const messageJSON = JSON.stringify(message); | ||
|
|
||
| port.postMessage(messageJSON); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don’t JSON.stringify composedGraph; it likely contains non-serializable/cyclic GraphQLSchema. Send RouterConfig/object instead.
mapResultToComposedGraph subgraphs include GraphQLSchema; JSON.stringify will throw. Use structured messaging with a lean payload and include routerExecutionConfig computed in the worker; the main thread can avoid recomputing it.
Minimal change:
export interface BaseMessage {
- composedGraph: ComposedFederatedGraph;
+ composedGraph: ComposedFederatedGraph; // stripped of non-serializable fields
contractNames: string[];
federatedSchemaVersionId: `${string}-${string}-${string}-${string}-${string}`;
- // routerExecutionConfig?: RouterConfig;
+ routerExecutionConfig?: RouterConfig;
}
@@
- const message: BaseMessage = {
- composedGraph,
- federatedSchemaVersionId,
- // routerExecutionConfig,
- contractNames,
- };
- const messageJSON = JSON.stringify(message);
- port.postMessage(messageJSON);
+ // Remove non-serializable fields before posting
+ const leanComposedGraph: ComposedFederatedGraph = {
+ ...composedGraph,
+ subgraphs: composedGraph.subgraphs.map((s: any) => {
+ const { schema, configurationDataByTypeName, protoSchema, mapping, imageReference, ...rest } = s;
+ return rest;
+ }),
+ };
+ const message: BaseMessage = {
+ composedGraph: leanComposedGraph,
+ federatedSchemaVersionId,
+ routerExecutionConfig,
+ contractNames,
+ };
+ port.postMessage(message);Then in the main thread, use routerExecutionConfig from the message (see linked change below).
🤖 Prompt for AI Agents
In controlplane/src/workers/compose.ts around lines 125-133, the code
JSON.stringifys composedGraph which contains a non-serializable/cyclic
GraphQLSchema and will throw; instead remove composedGraph from the message,
compute (or include) a serializable routerExecutionConfig/routerConfig in the
worker, and send a lean, structured payload (e.g., { routerExecutionConfig,
federatedSchemaVersionId, contractNames }) via port.postMessage without
JSON.stringify so the main thread can use the provided routerExecutionConfig and
avoid recomputation; ensure all fields in the message are plain serializable
objects.
| port.on('message', (baseResult: BaseResult) => { | ||
| if (!result.success || !baseResult.compositionDeployResult.schemaVersionId || !routerExecutionConfig) { | ||
| const outputs: Outputs = { | ||
| resultSuccess: result.success, | ||
| composedGraph, | ||
| federatedSchemaVersionId, | ||
| schemaVersionId: baseResult.compositionDeployResult.schemaVersionId, | ||
| // routerExecutionConfig, | ||
| contractGraphs: [], | ||
| allCompositionErrors, | ||
| allCompositionWarnings, | ||
| }; | ||
| resolve(outputs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guard message handler with try/catch to avoid deadlock if saveComposition fails.
If composer.saveComposition throws, the worker will await forever. Always reply to the worker.
Apply:
- port.on('message', (baseResult: BaseResult) => {
+ port.on('message', (baseResult: BaseResult) => {
if (!result.success || !baseResult.compositionDeployResult.schemaVersionId || !routerExecutionConfig) {
const outputs: Outputs = {
resultSuccess: result.success,
composedGraph,
federatedSchemaVersionId,
schemaVersionId: baseResult.compositionDeployResult.schemaVersionId,
// routerExecutionConfig,
contractGraphs: [],
allCompositionErrors,
allCompositionWarnings,
};
resolve(outputs);
}
+ // else branch continues below
});Wrap the inner body in try/catch and on error resolve with resultSuccess=false and existing aggregates.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In controlplane/src/workers/compose.ts around lines 135 to 147, the
port.on('message') handler body must be wrapped in a try/catch so a thrown error
(e.g., from composer.saveComposition) does not leave the worker waiting; enclose
the current inner logic in try { ... } catch (err) { resolve({ resultSuccess:
false, composedGraph, federatedSchemaVersionId, schemaVersionId: undefined,
contractGraphs: [], allCompositionErrors, allCompositionWarnings }); } (use the
existing aggregate variables) to ensure the worker is always resolved on error.
Summary by CodeRabbit
New Features
Chores
Tests
Checklist