Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/grpc-js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ export {
ServerInterceptor,
} from './server-interceptors';

export { ServerMetricRecorder } from './orca';

import * as experimental from './experimental';
export { experimental };

Expand Down
93 changes: 92 additions & 1 deletion packages/grpc-js/src/load-balancer-pick-first.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import {
} from './picker';
import { Endpoint, SubchannelAddress, subchannelAddressToString } from './subchannel-address';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import { LogVerbosity, Status } from './constants';
import {
SubchannelInterface,
ConnectivityStateListener,
Expand All @@ -44,6 +44,12 @@ import { isTcpSubchannelAddress } from './subchannel-address';
import { isIPv6 } from 'net';
import { ChannelOptions } from './channel-options';
import { StatusOr, statusOrFromValue } from './call-interface';
import { OrcaLoadReport__Output } from './generated/xds/data/orca/v3/OrcaLoadReport';
import { OpenRcaServiceClient } from './generated/xds/service/orca/v3/OpenRcaService';
import { ClientReadableStream, ServiceError } from './call';
import { createOrcaClient } from './orca';
import { msToDuration } from './duration';
import { BackoffTimeout } from './backoff-timeout';

const TRACER_NAME = 'pick_first';

Expand All @@ -59,6 +65,8 @@ const TYPE_NAME = 'pick_first';
*/
const CONNECTION_DELAY_INTERVAL_MS = 250;

export type MetricsListener = (loadReport: OrcaLoadReport__Output) => void;

export class PickFirstLoadBalancingConfig implements TypedLoadBalancingConfig {
constructor(private readonly shuffleAddressList: boolean) {}

Expand Down Expand Up @@ -239,6 +247,13 @@ export class PickFirstLoadBalancer implements LoadBalancer {

private latestResolutionNote: string = '';

private metricsListeners: Map<MetricsListener, number> = new Map();
private orcaClient: OpenRcaServiceClient | null = null;
private metricsCall: ClientReadableStream<OrcaLoadReport__Output> | null = null;
private currentMetricsIntervalMs: number = Infinity;
private orcaUnsupported = false;
private metricsBackoffTimer = new BackoffTimeout(() => this.updateMetricsSubscription());

/**
* Load balancer that attempts to connect to each backend in the address list
* in order, and picks the first one that connects, using it for every
Expand Down Expand Up @@ -336,6 +351,12 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.currentPick.removeHealthStateWatcher(
this.pickedSubchannelHealthListener
);
this.orcaClient?.close();
this.orcaClient = null;
this.metricsCall?.cancel();
this.metricsCall = null;
this.metricsBackoffTimer.stop();
this.metricsBackoffTimer.reset();
// Unref last, to avoid triggering listeners
this.currentPick.unref();
this.currentPick = null;
Expand Down Expand Up @@ -439,6 +460,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.currentPick = subchannel;
clearTimeout(this.connectionDelayTimeout);
this.calculateAndReportNewState();
this.updateMetricsSubscription();
}

private updateState(newState: ConnectivityState, picker: Picker, errorMessage: string | null) {
Expand Down Expand Up @@ -573,6 +595,67 @@ export class PickFirstLoadBalancer implements LoadBalancer {
getTypeName(): string {
return TYPE_NAME;
}

private getOrCreateOrcaClient(): OpenRcaServiceClient | null {
if (this.orcaClient) {
return this.orcaClient;
}
if (this.currentPick) {
const channel = this.currentPick.getChannel();
this.orcaClient = createOrcaClient(channel);
return this.orcaClient;
}
return null;
}

private updateMetricsSubscription() {
if (this.orcaUnsupported) {
return;
}
if (this.metricsListeners.size > 0) {
const newInterval = Math.min(...Array.from(this.metricsListeners.values()));
if (!this.metricsCall || newInterval !== this.currentMetricsIntervalMs) {
const orcaClient = this.getOrCreateOrcaClient();
if (!orcaClient) {
return;
}
this.metricsCall?.cancel();
this.currentMetricsIntervalMs = newInterval;
const metricsCall = orcaClient.streamCoreMetrics({report_interval: msToDuration(newInterval)});
this.metricsCall = metricsCall;
metricsCall.on('data', (report: OrcaLoadReport__Output) => {
this.metricsListeners.forEach((interval, listener) => {
listener(report);
});
});
metricsCall.on('error', (error: ServiceError) => {
this.metricsCall = null;
if (error.code === Status.UNIMPLEMENTED) {
this.orcaUnsupported = true;
return;
}
if (error.code === Status.CANCELLED) {
return;
}
this.metricsBackoffTimer.runOnce();
});
}
} else {
this.metricsCall?.cancel();
this.metricsCall = null;
this.currentMetricsIntervalMs = Infinity;
}
}

addMetricsSubscription(listener: MetricsListener, intervalMs: number): void {
this.metricsListeners.set(listener, intervalMs);
this.updateMetricsSubscription();
}

removeMetricsSubscription(listener: MetricsListener): void {
this.metricsListeners.delete(listener);
this.updateMetricsSubscription();
}
}

const LEAF_CONFIG = new PickFirstLoadBalancingConfig(false);
Expand Down Expand Up @@ -650,6 +733,14 @@ export class LeafLoadBalancer {
destroy() {
this.pickFirstBalancer.destroy();
}

addMetricsSubscription(listener: MetricsListener, intervalMs: number): void {
this.pickFirstBalancer.addMetricsSubscription(listener, intervalMs);
}

removeMetricsSubscription(listener: MetricsListener): void {
this.pickFirstBalancer.removeMetricsSubscription(listener);
}
}

export function setup(): void {
Expand Down
9 changes: 8 additions & 1 deletion packages/grpc-js/src/orca.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import { OrcaLoadReport } from "./generated/xds/data/orca/v3/OrcaLoadReport";
import type { loadSync } from '@grpc/proto-loader';
import { ProtoGrpcType as OrcaProtoGrpcType } from "./generated/orca";
import { loadPackageDefinition } from "./make-client";
import { OpenRcaServiceHandlers } from "./generated/xds/service/orca/v3/OpenRcaService";
import { OpenRcaServiceClient, OpenRcaServiceHandlers } from "./generated/xds/service/orca/v3/OpenRcaService";
import { durationMessageToDuration, durationToMs } from "./duration";
import { Server } from "./server";
import { ChannelCredentials } from "./channel-credentials";
import { Channel } from "./channel";

const loadedOrcaProto: OrcaProtoGrpcType | null = null;
function loadOrcaProto(): OrcaProtoGrpcType {
Expand Down Expand Up @@ -206,3 +208,8 @@ export class ServerMetricRecorder {
server.addService(serviceDefinition, this.serviceImplementation);
}
}

export function createOrcaClient(channel: Channel): OpenRcaServiceClient {
const ClientClass = loadOrcaProto().xds.service.orca.v3.OpenRcaService;
return new ClientClass('unused', ChannelCredentials.createInsecure(), {channelOverride: channel});
}
52 changes: 41 additions & 11 deletions packages/grpc-js/src/single-subchannel-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ import { ChannelOptions } from "./channel-options";
import { ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, unregisterChannelzRef } from "./channelz";
import { ConnectivityState } from "./connectivity-state";
import { Propagate, Status } from "./constants";
import { restrictControlPlaneStatusCode } from "./control-plane-status";
import { Deadline, getRelativeTimeout } from "./deadline";
import { Metadata } from "./metadata";
import { getDefaultAuthority } from "./resolver";
import { Subchannel } from "./subchannel";
import { SubchannelCall } from "./subchannel-call";
import { GrpcUri, uriToString } from "./uri-parser";
import { GrpcUri, splitHostPort, uriToString } from "./uri-parser";

class SubchannelCallWrapper implements Call {
private childCall: SubchannelCall | null = null;
Expand All @@ -38,7 +39,20 @@ class SubchannelCallWrapper implements Call {
private readPending = false;
private halfClosePending = false;
private pendingStatus: StatusObject | null = null;
private serviceUrl: string;
constructor(private subchannel: Subchannel, private method: string, private options: CallStreamOptions, private callNumber: number) {
const splitPath: string[] = this.method.split('/');
let serviceName = '';
/* The standard path format is "/{serviceName}/{methodName}", so if we split
* by '/', the first item should be empty and the second should be the
* service name */
if (splitPath.length >= 2) {
serviceName = splitPath[1];
}
const hostname = splitHostPort(this.options.host)?.host ?? 'localhost';
/* Currently, call credentials are only allowed on HTTPS connections, so we
* can assume that the scheme is "https" */
this.serviceUrl = `https://${hostname}/${serviceName}`;
const timeout = getRelativeTimeout(options.deadline);
if (timeout !== Infinity) {
if (timeout <= 0) {
Expand Down Expand Up @@ -79,16 +93,32 @@ class SubchannelCallWrapper implements Call {
});
return;
}
this.childCall = this.subchannel.createCall(metadata, this.options.host, this.method, listener);
if (this.readPending) {
this.childCall.startRead();
}
if (this.pendingMessage) {
this.childCall.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
}
if (this.halfClosePending) {
this.childCall.halfClose();
}
this.subchannel.getCallCredentials()
.generateMetadata({method_name: this.method, service_url: this.serviceUrl})
.then(credsMetadata => {
this.childCall = this.subchannel.createCall(credsMetadata, this.options.host, this.method, listener);
if (this.readPending) {
this.childCall.startRead();
}
if (this.pendingMessage) {
this.childCall.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
}
if (this.halfClosePending) {
this.childCall.halfClose();
}
}, (error: Error & { code: number }) => {
const { code, details } = restrictControlPlaneStatusCode(
typeof error.code === 'number' ? error.code : Status.UNKNOWN,
`Getting metadata from plugin failed with error: ${error.message}`
);
listener.onReceiveStatus(
{
code: code,
details: details,
metadata: new Metadata(),
}
);
});
}
sendMessageWithContext(context: MessageContext, message: Buffer): void {
if (this.childCall) {
Expand Down
8 changes: 8 additions & 0 deletions packages/grpc-js/src/subchannel-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

import { CallCredentials } from './call-credentials';
import { Channel } from './channel';
import type { SubchannelRef } from './channelz';
import { ConnectivityState } from './connectivity-state';
import { Subchannel } from './subchannel';
Expand Down Expand Up @@ -67,6 +68,10 @@ export interface SubchannelInterface {
* subchannel.
*/
getCallCredentials(): CallCredentials;
/**
* Get a channel that can be used to make requests with just this
*/
getChannel(): Channel;
}

export abstract class BaseSubchannelWrapper implements SubchannelInterface {
Expand Down Expand Up @@ -143,4 +148,7 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface {
getCallCredentials(): CallCredentials {
return this.child.getCallCredentials();
}
getChannel(): Channel {
return this.child.getChannel();
}
}
6 changes: 6 additions & 0 deletions packages/grpc-js/src/subchannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import { SubchannelCallInterceptingListener } from './subchannel-call';
import { SubchannelCall } from './subchannel-call';
import { CallEventTracker, SubchannelConnector, Transport } from './transport';
import { CallCredentials } from './call-credentials';
import { SingleSubchannelChannel } from './single-subchannel-channel';
import { Channel } from './channel';

const TRACER_NAME = 'subchannel';

Expand Down Expand Up @@ -519,4 +521,8 @@ export class Subchannel implements SubchannelInterface {
getCallCredentials(): CallCredentials {
return this.secureConnector.getCallCredentials();
}

getChannel(): Channel {
return new SingleSubchannelChannel(this, this.channelTarget, this.options);
}
}
3 changes: 3 additions & 0 deletions packages/grpc-js/test/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ export class MockSubchannel implements SubchannelInterface {
getCallCredentials(): grpc.CallCredentials {
return grpc.CallCredentials.createEmpty();
}
getChannel(): grpc.Channel {
throw new Error('Method not implemented');
}
}

export { assert2 };
Loading