From 78999ac16d93f20b16d60cc989fd3c48ecd07bfb Mon Sep 17 00:00:00 2001
From: Michael Lumish
Date: Thu, 24 Jul 2025 15:08:53 -0700
Subject: [PATCH] grpc-js: Implement ORCA server-side OOB metrics

---
Review notes (free text in this position is ignored by `git am`):

* duration.ts: adds the DurationMessage shape (`seconds` as a string, `nanos`
  as a number) and durationMessageToDuration(), which converts it to the
  existing numeric Duration by parsing `seconds` as a base-10 integer.
* orca.ts: renames PerRequestMetricsRecorder to PerRequestMetricRecorder and
  adds ServerMetricRecorder. The recorder keeps one OrcaLoadReport object that
  the set*/put*/delete* methods mutate. addToServer() registers the
  OpenRcaService handlers on a Server. The StreamCoreMetrics handler writes
  the current report on a setInterval timer and clears that timer when the
  call emits 'cancelled'.
* server-call.ts / server-interceptors.ts: mechanical rename only.
* test-orca.ts: adds one set test and one delete test per metric, each
  requesting a 10 ms report interval.

Changes made during review (all line-count neutral, so the hunk headers and
the diffstat below are unchanged):

* StreamCoreMetrics: a requested report_interval that is zero, negative, or
  not parseable now falls back to DEFAULT_REPORT_INTERVAL_MS instead of being
  passed to setInterval as-is.
* durationMessageToDuration: Number.parseInt now receives an explicit radix.
* test-orca.ts: the last test was titled 'Should delete QPS metric' although
  it exercises the EPS metric; it is now 'Should delete EPS metric'.

NOTE(review): this copy was rebuilt from a whitespace-collapsed paste. Line
breaks, indentation and blank context lines are reconstructed from the hunk
counts, and three generic type arguments that had been stripped
(`Partial<StatusObject>`, `ServerUnaryCall<RequestType, ResponseType>`,
`ClientReadableStream<OrcaLoadReport__Output>`) were restored by inference.
TODO: confirm against the tree before applying. The `index` hashes were not
recomputed.

 packages/grpc-js/src/duration.ts            |  12 ++
 packages/grpc-js/src/orca.ts                |  77 ++++++++-
 packages/grpc-js/src/server-call.ts         |  12 +-
 packages/grpc-js/src/server-interceptors.ts |  10 +-
 packages/grpc-js/test/test-orca.ts          | 166 +++++++++++++++++++-
 5 files changed, 263 insertions(+), 14 deletions(-)

diff --git a/packages/grpc-js/src/duration.ts b/packages/grpc-js/src/duration.ts
index b0aefde8e..390f29f55 100644
--- a/packages/grpc-js/src/duration.ts
+++ b/packages/grpc-js/src/duration.ts
@@ -20,6 +20,18 @@ export interface Duration {
   nanos: number;
 }
 
+export interface DurationMessage {
+  seconds: string;
+  nanos: number;
+}
+
+export function durationMessageToDuration(message: DurationMessage): Duration {
+  return {
+    seconds: Number.parseInt(message.seconds, 10),
+    nanos: message.nanos
+  };
+}
+
 export function msToDuration(millis: number): Duration {
   return {
     seconds: (millis / 1000) | 0,
diff --git a/packages/grpc-js/src/orca.ts b/packages/grpc-js/src/orca.ts
index 0478546e4..1ffbcd3c8 100644
--- a/packages/grpc-js/src/orca.ts
+++ b/packages/grpc-js/src/orca.ts
@@ -20,6 +20,9 @@ import { OrcaLoadReport } from "./generated/xds/data/orca/v3/OrcaLoadReport";
 import type { loadSync } from '@grpc/proto-loader';
 import { ProtoGrpcType as OrcaProtoGrpcType } from "./generated/orca";
 import { loadPackageDefinition } from "./make-client";
+import { OpenRcaServiceHandlers } from "./generated/xds/service/orca/v3/OpenRcaService";
+import { durationMessageToDuration, durationToMs } from "./duration";
+import { Server } from "./server";
 
 const loadedOrcaProto: OrcaProtoGrpcType | null = null;
 function loadOrcaProto(): OrcaProtoGrpcType {
@@ -47,7 +50,7 @@ function loadOrcaProto(): OrcaProtoGrpcType {
 /**
  * ORCA metrics recorder for a single request
  */
-export class PerRequestMetricsRecorder {
+export class PerRequestMetricRecorder {
   private message: OrcaLoadReport = {};
 
   /**
@@ -131,3 +134,75 @@ export class PerRequestMetricsRecorder {
     return orcaProto.xds.data.orca.v3.OrcaLoadReport.serialize(this.message);
   }
 }
+
+const DEFAULT_REPORT_INTERVAL_MS = 30_000;
+
+export class ServerMetricRecorder {
+  private message: OrcaLoadReport = {};
+
+  private serviceImplementation: OpenRcaServiceHandlers = {
+    StreamCoreMetrics: call => {
+      const requestedIntervalMs = call.request.report_interval ?
+        durationToMs(durationMessageToDuration(call.request.report_interval)) : 0;
+      const reportInterval = requestedIntervalMs > 0 ? requestedIntervalMs : DEFAULT_REPORT_INTERVAL_MS;
+      const reportTimer = setInterval(() => {
+        call.write(this.message);
+      }, reportInterval);
+      call.on('cancelled', () => {
+        clearInterval(reportTimer);
+      });
+    }
+  };
+
+  putUtilizationMetric(name: string, value: number) {
+    if (!this.message.utilization) {
+      this.message.utilization = {};
+    }
+    this.message.utilization[name] = value;
+  }
+
+  setAllUtilizationMetrics(metrics: {[name: string]: number}) {
+    this.message.utilization = {...metrics};
+  }
+
+  deleteUtilizationMetric(name: string) {
+    delete this.message.utilization?.[name];
+  }
+
+  setCpuUtilizationMetric(value: number) {
+    this.message.cpu_utilization = value;
+  }
+
+  deleteCpuUtilizationMetric() {
+    delete this.message.cpu_utilization;
+  }
+
+  setApplicationUtilizationMetric(value: number) {
+    this.message.application_utilization = value;
+  }
+
+  deleteApplicationUtilizationMetric() {
+    delete this.message.application_utilization;
+  }
+
+  setQpsMetric(value: number) {
+    this.message.rps_fractional = value;
+  }
+
+  deleteQpsMetric() {
+    delete this.message.rps_fractional;
+  }
+
+  setEpsMetric(value: number) {
+    this.message.eps = value;
+  }
+
+  deleteEpsMetric() {
+    delete this.message.eps;
+  }
+
+  addToServer(server: Server) {
+    const serviceDefinition = loadOrcaProto().xds.service.orca.v3.OpenRcaService.service;
+    server.addService(serviceDefinition, this.serviceImplementation);
+  }
+}
diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts
index 2120fed39..670cf6294 100644
--- a/packages/grpc-js/src/server-call.ts
+++ b/packages/grpc-js/src/server-call.ts
@@ -26,7 +26,7 @@ import type { StatusObject, PartialStatusObject } from './call-interface';
 import type { Deadline } from './deadline';
 import type { ServerInterceptingCallInterface } from './server-interceptors';
 import { AuthContext } from './auth-context';
-import { PerRequestMetricsRecorder } from './orca';
+import { PerRequestMetricRecorder } from './orca';
 
 export type ServerStatusResponse = Partial<StatusObject>;
 
@@ -41,7 +41,7 @@ export type ServerSurfaceCall = {
   getPath(): string;
   getHost(): string;
   getAuthContext(): AuthContext;
-  getMetricsRecorder(): PerRequestMetricsRecorder;
+  getMetricsRecorder(): PerRequestMetricRecorder;
 } & EventEmitter;
 
 export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
@@ -123,7 +123,7 @@ export class ServerUnaryCallImpl
     return this.call.getAuthContext();
   }
 
-  getMetricsRecorder(): PerRequestMetricsRecorder {
+  getMetricsRecorder(): PerRequestMetricRecorder {
     return this.call.getMetricsRecorder();
   }
 }
@@ -171,7 +171,7 @@ export class ServerReadableStreamImpl
     return this.call.getAuthContext();
   }
 
-  getMetricsRecorder(): PerRequestMetricsRecorder {
+  getMetricsRecorder(): PerRequestMetricRecorder {
     return this.call.getMetricsRecorder();
   }
 }
@@ -227,7 +227,7 @@ export class ServerWritableStreamImpl
     return this.call.getAuthContext();
   }
 
-  getMetricsRecorder(): PerRequestMetricsRecorder {
+  getMetricsRecorder(): PerRequestMetricRecorder {
     return this.call.getMetricsRecorder();
   }
 
@@ -308,7 +308,7 @@ export class ServerDuplexStreamImpl
     return this.call.getAuthContext();
   }
 
-  getMetricsRecorder(): PerRequestMetricsRecorder {
+  getMetricsRecorder(): PerRequestMetricRecorder {
     return this.call.getMetricsRecorder();
   }
 
diff --git a/packages/grpc-js/src/server-interceptors.ts b/packages/grpc-js/src/server-interceptors.ts
index 498342c7b..a38957f14 100644
--- a/packages/grpc-js/src/server-interceptors.ts
+++ b/packages/grpc-js/src/server-interceptors.ts
@@ -35,7 +35,7 @@ import { CallEventTracker } from './transport';
 import * as logging from './logging';
 import { AuthContext } from './auth-context';
 import { TLSSocket } from 'tls';
-import { PerRequestMetricsRecorder } from './orca';
+import { PerRequestMetricRecorder } from './orca';
 
 const TRACER_NAME = 'server_call';
 
@@ -355,7 +355,7 @@ export interface ServerInterceptingCallInterface {
    * the server was constructed with the `grpc.server_call_metric_recording`
    * option.
    */
-  getMetricsRecorder(): PerRequestMetricsRecorder;
+  getMetricsRecorder(): PerRequestMetricRecorder;
 }
 
 export class ServerInterceptingCall implements ServerInterceptingCallInterface {
@@ -470,7 +470,7 @@ export class ServerInterceptingCall implements ServerInterceptingCallInterface {
   getConnectionInfo(): ConnectionInfo {
     return this.nextCall.getConnectionInfo();
   }
-  getMetricsRecorder(): PerRequestMetricsRecorder {
+  getMetricsRecorder(): PerRequestMetricRecorder {
     return this.nextCall.getMetricsRecorder();
   }
 }
@@ -545,7 +545,7 @@ export class BaseServerInterceptingCall
   private streamEnded = false;
   private host: string;
   private connectionInfo: ConnectionInfo;
-  private metricsRecorder = new PerRequestMetricsRecorder();
+  private metricsRecorder = new PerRequestMetricRecorder();
   private shouldSendMetrics: boolean;
 
   constructor(
@@ -1036,7 +1036,7 @@ export class BaseServerInterceptingCall
   getConnectionInfo(): ConnectionInfo {
     return this.connectionInfo;
   }
-  getMetricsRecorder(): PerRequestMetricsRecorder {
+  getMetricsRecorder(): PerRequestMetricRecorder {
     return this.metricsRecorder;
   }
 }
diff --git a/packages/grpc-js/test/test-orca.ts b/packages/grpc-js/test/test-orca.ts
index ce996af24..f06c2cd56 100644
--- a/packages/grpc-js/test/test-orca.ts
+++ b/packages/grpc-js/test/test-orca.ts
@@ -22,8 +22,11 @@ import * as grpc from '../src';
 import { ServiceClient } from '../src/make-client';
 import { assert2, loadProtoFile } from './common';
 import { ProtoGrpcType as OrcaProtoGrpcType } from "../src/generated/orca";
-import { PerRequestMetricsRecorder } from '../src/orca';
+import { PerRequestMetricRecorder, ServerMetricRecorder } from '../src/orca';
 import { loadSync } from '@grpc/proto-loader';
+import { OpenRcaServiceClient } from '../src/generated/xds/service/orca/v3/OpenRcaService';
+import { OrcaLoadReport__Output } from '../src/generated/xds/data/orca/v3/OrcaLoadReport';
+import { msToDuration } from '../src/duration';
 
 const GRPC_METRICS_HEADER = 'endpoint-load-metrics-bin';
 const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
@@ -44,7 +47,7 @@ const loadedProto = loadSync('xds/service/orca/v3/orca.proto', {
 });
 const orcaProto = grpc.loadPackageDefinition(loadedProto) as unknown as OrcaProtoGrpcType;
 
-let setMetrics: (metricsRecorder: PerRequestMetricsRecorder) => void = () => {};
+let setMetrics: (metricsRecorder: PerRequestMetricRecorder) => void = () => {};
 
 const serviceImpl = {
   echo: (
@@ -298,4 +301,163 @@ describe('ORCA', () => {
       });
     });
   });
+  describe('server-side out of band metrics', () => {
+    let metricRecorder: ServerMetricRecorder;
+    let server: grpc.Server;
+    let client: OpenRcaServiceClient;
+    let call: grpc.ClientReadableStream<OrcaLoadReport__Output> | null = null;
+    beforeEach(done => {
+      metricRecorder = new ServerMetricRecorder();
+      server = new grpc.Server();
+      metricRecorder.addToServer(server);
+      server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
+        if (error) {
+          done(error);
+          return;
+        }
+        client = new orcaProto.xds.service.orca.v3.OpenRcaService(`localhost:${port}`, grpc.credentials.createInsecure());
+        done();
+      });
+    });
+    afterEach(done => {
+      call?.cancel();
+      call = null;
+      client.close();
+      server.tryShutdown(done);
+    });
+    it('Should send utilization metrics', done => {
+      metricRecorder.putUtilizationMetric('test', 123);
+      call = client.streamCoreMetrics({report_interval: msToDuration(10)});
+      call.on('error', () => {});
+      call.on('data', (report: OrcaLoadReport__Output) => {
+        assert.strictEqual(report.utilization.test, 123);
+        done();
+      });
+    });
+    it('Should set all utilization metrics', done => {
+      metricRecorder.setAllUtilizationMetrics({test1: 123, test2: 456});
+      call = client.streamCoreMetrics({report_interval: msToDuration(10)});
+      call.on('error', () => {});
+      call.on('data', (report: OrcaLoadReport__Output) => {
+        assert.deepStrictEqual(report.utilization, {test1: 123, test2: 456});
+        done();
+      });
+    });
+    it('Should delete utilization metrics', done => {
+      metricRecorder.putUtilizationMetric('test', 123);
+      call = client.streamCoreMetrics({report_interval: msToDuration(10)});
+      call.on('error', () => {});
+      let seenMetric = false;
+      call.on('data', (report: OrcaLoadReport__Output) => {
+        if (!seenMetric) {
+          assert.strictEqual(report.utilization.test, 123);
+          metricRecorder.deleteUtilizationMetric('test');
+          seenMetric = true;
+        } else {
+          assert.deepStrictEqual(report.utilization, {});
+          done();
+        }
+      });
+    });
+    it('Should set CPU utilization', done => {
+      metricRecorder.setCpuUtilizationMetric(123);
+      call = client.streamCoreMetrics({report_interval: msToDuration(10)});
+      call.on('error', () => {});
+      call.on('data', (report: OrcaLoadReport__Output) => {
+        assert.strictEqual(report.cpu_utilization, 123);
+        done();
+      });
+    });
+    it('Should delete CPU utilization', done => {
+      metricRecorder.setCpuUtilizationMetric(123);
+      call = client.streamCoreMetrics({report_interval: msToDuration(10)});
+      call.on('error', () => {});
+      let seenMetric = false;
+      call.on('data', (report: OrcaLoadReport__Output) => {
+        if (!seenMetric) {
+          assert.strictEqual(report.cpu_utilization, 123);
+          metricRecorder.deleteCpuUtilizationMetric();
+          seenMetric = true;
+        } else {
+          assert.strictEqual(report.cpu_utilization, 0);
+          done();
+        }
+      });
+    });
+    it('Should set application utilization', done => {
+      metricRecorder.setApplicationUtilizationMetric(123);
+      call = client.streamCoreMetrics({report_interval: msToDuration(10)});
+      call.on('error', () => {});
+      call.on('data', (report: OrcaLoadReport__Output) => {
+        assert.strictEqual(report.application_utilization, 123);
+        done();
+      });
+    });
+    it('Should delete application utilization', done => {
+      metricRecorder.setApplicationUtilizationMetric(123);
+      call = client.streamCoreMetrics({report_interval: msToDuration(10)});
+      call.on('error', () => {});
+      let seenMetric = false;
+      call.on('data', (report: OrcaLoadReport__Output) => {
+        if (!seenMetric) {
+          assert.strictEqual(report.application_utilization, 123);
+          metricRecorder.deleteApplicationUtilizationMetric();
+          seenMetric = true;
+        } else {
+          assert.strictEqual(report.application_utilization, 0);
+          done();
+        }
+      });
+    });
+    it('Should set QPS metric', done => {
+      metricRecorder.setQpsMetric(123);
+      call = client.streamCoreMetrics({report_interval: msToDuration(10)});
+      call.on('error', () => {});
+      call.on('data', (report: OrcaLoadReport__Output) => {
+        assert.strictEqual(report.rps_fractional, 123);
+        done();
+      });
+    });
+    it('Should delete QPS metric', done => {
+      metricRecorder.setQpsMetric(123);
+      call = client.streamCoreMetrics({report_interval: msToDuration(10)});
+      call.on('error', () => {});
+      let seenMetric = false;
+      call.on('data', (report: OrcaLoadReport__Output) => {
+        if (!seenMetric) {
+          assert.strictEqual(report.rps_fractional, 123);
+          metricRecorder.deleteQpsMetric();
+          seenMetric = true;
+        } else {
+          assert.strictEqual(report.rps_fractional, 0);
+          done();
+        }
+      });
+    });
+    it('Should set EPS metric', done => {
+      metricRecorder.setEpsMetric(123);
+      call = client.streamCoreMetrics({report_interval: msToDuration(10)});
+      call.on('error', () => {});
+      call.on('data', (report: OrcaLoadReport__Output) => {
+        assert.strictEqual(report.eps, 123);
+        done();
+      });
+    });
+    it('Should delete EPS metric', done => {
+      metricRecorder.setEpsMetric(123);
+      call = client.streamCoreMetrics({report_interval: msToDuration(10)});
+      call.on('error', () => {});
+      let seenMetric = false;
+      call.on('data', (report: OrcaLoadReport__Output) => {
+        if (!seenMetric) {
+          assert.strictEqual(report.eps, 123);
+          metricRecorder.deleteEpsMetric();
+          seenMetric = true;
+        } else {
+          assert.strictEqual(report.eps, 0);
+          done();
+        }
+      });
+    });
+  });
 });