Skip to content

Commit 63c5418

Browse files
committed
refactor: update code for easier testing, simplify in a couple of places
1 parent 136b5cf commit 63c5418

16 files changed

+568
-393
lines changed

DOCUMENTATION.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ enum SubscriptionType {
147147
DEPTH_PRICE = 3,
148148
DEPTH_ORDER = 4,
149149
TRADES = 5,
150-
CUMLATIVE_VOLUME = 6,
150+
CUMULATIVE_VOLUME = 6,
151151
OHLC = 7,
152152
OHLC_NON_REGULAR = 8,
153153
}
@@ -158,12 +158,12 @@ enum SubscriptionType {
158158
Now that we know how to use the listener and client objects, let's put together a little demo that subscribes to all MSFT messages and prints what's happening to the console:
159159

160160
```ts
161-
import { OpenFeedClient } from "./connection/connection";
161+
import { OpenFeedClient } from "./connection/client";
162162

163163
import { Service } from "../generated/openfeed";
164164
import { SubscriptionType } from "../generated/openfeed_api";
165165
import { OpenFeedListeners } from "./connection/listeners";
166-
import { IOpenFeedLogger } from "./connection/connection_interfaces";
166+
import { IOpenFeedLogger } from "./connection/interfaces";
167167

168168
const connect = async () => {
169169
const logger: IOpenFeedLogger = console;

package.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@
1111
"license": "MIT",
1212
"scripts": {
1313
"prepare": "husky install",
14+
"upgrade:proto": "yarn upgrade proto",
1415
"generate:proto": "cd node_modules\\proto && ..\\..\\protoc --plugin=..\\..\\node_modules\\.bin\\protoc-gen-ts_proto --ts_proto_opt=outputJsonMethods=false --ts_proto_opt=outputPartialMethods=false --ts_proto_opt=exportCommonSymbols=false --ts_proto_opt=esModuleInterop=true --ts_proto_opt=forceLong=long --ts_proto_opt=useExactTypes=false --ts_proto_out=../../generated *.proto",
1516
"generate:version": "genversion --es6 -s -d ./generated/version.ts",
1617
"generate:process": "tsx ./scripts/process.ts",
17-
"generate": "yarn generate:proto && yarn generate:process",
18+
"generate": "yarn upgrade:proto && yarn generate:proto && yarn generate:process",
1819
"build:clear": "rimraf dist",
1920
"build:node": "esbuild --bundle --outfile=dist/node.js --platform=node --target=node16 --format=esm --banner:js=\"import { createRequire } from 'module';const require = createRequire(import.meta.url);\" src/index.ts",
2021
"build:ts": "yarn generate:version && vite build",
@@ -49,7 +50,8 @@
4950
"ts-proto": "^1.156.7",
5051
"tsx": "^3.12.7",
5152
"typescript": "^5.2.2",
52-
"vite": "^4.4.9"
53+
"vite": "^4.4.9",
54+
"vite-plugin-checker": "^0.8.0"
5355
},
5456
"packageManager": "[email protected]",
5557
"dependencies": {

src/connection/client.ts

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
import WebSocket from "isomorphic-ws";
2+
import Long from "long";
3+
4+
import { OptionalUndefined } from "@src/utilities/messages";
5+
import type { SubscriptionType, OpenfeedGatewayRequest } from "@gen/openfeed_api";
6+
import { Result } from "@gen/openfeed_api";
7+
import type { Service } from "@gen/openfeed";
8+
import { ResolutionSource } from "@src/utilities/async";
9+
import { CorrelationId } from "@src/utilities/correlation_id";
10+
import { OpenFeedConnection } from "@src/connection/connection";
11+
import { receive, send } from "@src/utilities/communication";
12+
import { getClientVersion } from "@src/utilities/client_version";
13+
import { TIME } from "@src/utilities/constants";
14+
import { IOpenFeedClient, IOpenFeedConnection, IOpenFeedLogger } from "./interfaces";
15+
import { OpenFeedListeners } from "./listeners";
16+
import { ConnectionDisposedError, DuplicateLoginError, InvalidCredentialsError } from "./errors";
17+
18+
export class OpenFeedClient implements IOpenFeedClient {
19+
private socket: WebSocket | null = null;
20+
private _connection: OpenFeedConnection | null = null;
21+
22+
// We are going to throw errors internally, outside needs to know only when disposed
23+
private whenConnectedInternalSource = new ResolutionSource<OpenFeedConnection>();
24+
private whenConnectedSource = new ResolutionSource<IOpenFeedConnection>();
25+
private loopResetSource = new ResolutionSource<void>();
26+
private subscribeResetSource = new ResolutionSource<void>();
27+
28+
private readonly subscriptions: Map<string, ResolutionSource<void>> = new Map<string, ResolutionSource<void>>();
29+
30+
constructor(
31+
private readonly url: string,
32+
private readonly username: string,
33+
private readonly password: string,
34+
private readonly listeners: OpenFeedListeners,
35+
private readonly logger?: IOpenFeedLogger,
36+
private readonly clientId?: string
37+
) {
38+
this.runConnectLoop();
39+
}
40+
41+
private onOpen = async () => {
42+
if (!this.socket) return;
43+
44+
const clientVersion = await getClientVersion(this.clientId);
45+
const loginRequest: OptionalUndefined<OpenfeedGatewayRequest> = {
46+
loginRequest: {
47+
correlationId: CorrelationId.create(),
48+
username: this.username,
49+
password: this.password,
50+
clientVersion,
51+
protocolVersion: 1,
52+
jwt: "",
53+
},
54+
};
55+
send(this.socket, loginRequest);
56+
};
57+
58+
private onMessage = async (event: WebSocket.MessageEvent) => {
59+
const [message] = receive(event);
60+
if (message.loginResponse?.token && this.socket) {
61+
this._connection = new OpenFeedConnection(message.loginResponse?.token, this.socket, this.listeners, this.logger);
62+
this.whenConnectedInternalSource.resolve(this._connection);
63+
this.whenConnectedSource.resolve(this._connection);
64+
65+
await this.listeners.onConnected(this._connection);
66+
// this can't be caught in subscriptions,
67+
// we need to be able to catch disconnects even if there are no subscribers
68+
try {
69+
await this._connection.whenDisconnected();
70+
} catch (e) {
71+
if (e instanceof DuplicateLoginError) {
72+
this.logger?.warn("Received duplicate login message, disconnecting...");
73+
this.loopResetSource.reject(e);
74+
}
75+
if (e instanceof ConnectionDisposedError) {
76+
this.logger?.warn("Disposing...");
77+
this.loopResetSource.reject(e);
78+
}
79+
} finally {
80+
await this.listeners.onDisconnected();
81+
this.loopResetSource.resolve();
82+
}
83+
} else if (
84+
[Result.INSUFFICIENT_PRIVILEGES, Result.INVALID_CREDENTIALS, Result.AUTHENTICATION_REQUIRED].includes(
85+
message.loginResponse?.status?.result ?? Result.SUCCESS
86+
)
87+
) {
88+
this.logger?.warn("Received authentication error, disconnecting...");
89+
this.whenConnectedInternalSource.reject(
90+
new InvalidCredentialsError("Invalid credentials provided. Please update credentials and try again.")
91+
);
92+
}
93+
};
94+
95+
private onError = (error: WebSocket.ErrorEvent) => {
96+
this.logger?.log(`Socket error: ${error.message}`);
97+
if (!this.whenConnectedInternalSource.completed) {
98+
this.whenConnectedInternalSource.reject(new Error(`Error when connecting to socket: ${error.message}`));
99+
}
100+
};
101+
102+
private onClose = (event: WebSocket.CloseEvent) => {
103+
this.logger?.log(`Socket closed: ${event.reason}`);
104+
if (!this.whenConnectedInternalSource.completed) {
105+
this.whenConnectedInternalSource.reject(new Error(`Socket closed: ${event.reason}`));
106+
}
107+
};
108+
109+
private runConnectLoop = async () => {
110+
for (;;) {
111+
if (this.socket) {
112+
// It's expected to have a closing state when bad connection happens
113+
if (this.socket.readyState !== WebSocket.CLOSED && this.socket.readyState !== WebSocket.CLOSING) {
114+
this.socket.close(1000, "Closed from socket loop");
115+
}
116+
this.socket = null;
117+
}
118+
119+
try {
120+
this.socket = new WebSocket(this.url);
121+
this.socket.binaryType = "arraybuffer";
122+
123+
this.socket.onopen = this.onOpen;
124+
// We will override the 3 below in the connection and it will be the new listener
125+
this.socket.onmessage = this.onMessage;
126+
this.socket.onerror = this.onError;
127+
this.socket.onclose = this.onClose;
128+
129+
// eslint-disable-next-line no-await-in-loop
130+
await this.whenConnectedInternalSource.whenCompleted;
131+
132+
// eslint-disable-next-line no-await-in-loop
133+
await this.loopResetSource.whenCompleted;
134+
} catch (e) {
135+
const socket = this.socket!;
136+
// these will fire, even though the connection error means we should be already disconnected
137+
socket.onerror = () => {};
138+
socket.onclose = () => {};
139+
socket.onopen = () => {};
140+
141+
if (socket.readyState !== WebSocket.CLOSED && socket.readyState !== WebSocket.CLOSING) {
142+
socket.close(1000, "Socket closed");
143+
}
144+
145+
if (e instanceof DuplicateLoginError || e instanceof InvalidCredentialsError) {
146+
this.logger?.warn("Stopping the client because of unrecoverable error");
147+
// eslint-disable-next-line no-await-in-loop
148+
await this.listeners.onCredentialsRejected();
149+
this.cleanUp();
150+
break;
151+
}
152+
153+
if (e instanceof ConnectionDisposedError) {
154+
this.logger?.warn("Stopping the client because of disposal");
155+
// eslint-disable-next-line no-await-in-loop
156+
await this.listeners.onDisconnected();
157+
this.cleanUp();
158+
break;
159+
}
160+
}
161+
162+
this._connection = null;
163+
if (this.whenConnectedInternalSource.completed) {
164+
this.whenConnectedInternalSource = new ResolutionSource<OpenFeedConnection>();
165+
}
166+
if (this.whenConnectedSource.completed) {
167+
this.whenConnectedSource = new ResolutionSource<IOpenFeedConnection>();
168+
}
169+
this.loopResetSource = new ResolutionSource<void>();
170+
this.subscribeResetSource.resolve();
171+
this.subscribeResetSource = new ResolutionSource<void>();
172+
173+
// eslint-disable-next-line no-await-in-loop
174+
await new Promise((resolve) => {
175+
setTimeout(resolve, TIME.RECONNECT);
176+
});
177+
}
178+
};
179+
180+
private cleanUp = () => {
181+
for (const [, sub] of this.subscriptions) {
182+
sub.resolve();
183+
}
184+
this.subscriptions.clear();
185+
this.whenConnectedSource.reject(new Error("Connection disposed"));
186+
};
187+
188+
private runSubscribeLoop = async (
189+
service: Service,
190+
subscriptionType: SubscriptionType,
191+
snapshotIntervalSeconds: number,
192+
symbols: string[] | null,
193+
marketIds: Long[] | null,
194+
exchanges: string[] | null,
195+
channels: number[] | null,
196+
cancelSource: ResolutionSource<void>
197+
) => {
198+
for (;;) {
199+
try {
200+
// race will trigger even if rejected
201+
// eslint-disable-next-line no-await-in-loop
202+
const connection = await Promise.race([this.connection, cancelSource.whenCompleted]);
203+
if (cancelSource.completed || /* can't actually happen */ !connection) {
204+
return;
205+
}
206+
207+
const subscriptionId = connection.subscribe(
208+
service,
209+
subscriptionType,
210+
snapshotIntervalSeconds,
211+
symbols,
212+
marketIds,
213+
exchanges,
214+
channels
215+
);
216+
217+
// eslint-disable-next-line no-await-in-loop
218+
await Promise.race([this.subscribeResetSource.whenCompleted, cancelSource.whenCompleted]);
219+
220+
if (cancelSource.completed) {
221+
try {
222+
connection.unsubscribe(subscriptionId);
223+
} catch (e) {
224+
// unsubscribe throws if the connection is not available, this is expected
225+
}
226+
return;
227+
}
228+
} catch (error) {
229+
this.logger?.warn("Subscription error:", error);
230+
// eslint-disable-next-line no-await-in-loop
231+
await new Promise((resolve) => {
232+
setTimeout(resolve, TIME.SUBSCRIPTION_RETRY);
233+
});
234+
}
235+
}
236+
};
237+
238+
public subscribe = (
239+
service: Service,
240+
subscriptionType: SubscriptionType,
241+
snapshotIntervalSeconds: number,
242+
symbols: string[] | null = null,
243+
marketIds: Long[] | null = null,
244+
exchanges: string[] | null = null,
245+
channels: number[] | null = null
246+
) => {
247+
const id = CorrelationId.create();
248+
249+
const cancelSource = new ResolutionSource<void>();
250+
this.subscriptions.set(id.toString(), cancelSource);
251+
252+
this.runSubscribeLoop(service, subscriptionType, snapshotIntervalSeconds, symbols, marketIds, exchanges, channels, cancelSource);
253+
254+
return id;
255+
};
256+
257+
public unsubscribe = (subscriptionId: Long) => {
258+
const cancelSource = this.subscriptions.get(subscriptionId.toString());
259+
if (!cancelSource) {
260+
throw new Error(`Subscription ID ${subscriptionId} does not exist.`);
261+
}
262+
263+
this.subscriptions.delete(subscriptionId.toString());
264+
cancelSource.resolve();
265+
};
266+
267+
public get connection() {
268+
if (this._connection) return Promise.resolve(this._connection);
269+
if (this.whenConnectedSource.completed) {
270+
throw new ConnectionDisposedError("Connection disposed");
271+
}
272+
return this.whenConnectedSource.whenCompleted;
273+
}
274+
275+
public dispose = () => {
276+
if (this._connection) {
277+
this._connection.dispose();
278+
} else {
279+
this.whenConnectedInternalSource.reject(new ConnectionDisposedError("Connection disposed"));
280+
}
281+
};
282+
}

0 commit comments

Comments
 (0)