diff --git a/packages/node-http-handler/src/get-transformed-headers.ts b/packages/node-http-handler/src/get-transformed-headers.ts new file mode 100644 index 0000000000000..7542a5505b767 --- /dev/null +++ b/packages/node-http-handler/src/get-transformed-headers.ts @@ -0,0 +1,17 @@ +import { HeaderBag } from "@aws-sdk/types"; +import { IncomingHttpHeaders } from "http2"; + +const getTransformedHeaders = (headers: IncomingHttpHeaders) => { + const transformedHeaders: HeaderBag = {}; + + for (let name of Object.keys(headers)) { + let headerValues = headers[name]; + transformedHeaders[name] = Array.isArray(headerValues) + ? headerValues.join(",") + : headerValues; + } + + return transformedHeaders; +}; + +export { getTransformedHeaders }; diff --git a/packages/node-http-handler/src/index.ts b/packages/node-http-handler/src/index.ts index 4cbad43bcf08a..d3f0430877a94 100644 --- a/packages/node-http-handler/src/index.ts +++ b/packages/node-http-handler/src/index.ts @@ -1 +1,2 @@ export * from "./node-http-handler"; +export * from "./node-http2-handler"; diff --git a/packages/node-http-handler/src/node-http-handler.ts b/packages/node-http-handler/src/node-http-handler.ts index c24256a82b4f0..fdb1fae830752 100644 --- a/packages/node-http-handler/src/node-http-handler.ts +++ b/packages/node-http-handler/src/node-http-handler.ts @@ -3,7 +3,6 @@ import * as http from "http"; import { Readable } from "stream"; import { buildQueryString } from "@aws-sdk/querystring-builder"; import { - HeaderBag, HttpHandler, HttpHandlerOptions, HttpRequest, @@ -13,6 +12,7 @@ import { import { setConnectionTimeout } from "./set-connection-timeout"; import { setSocketTimeout } from "./set-socket-timeout"; import { writeRequestBody } from "./write-request-body"; +import { getTransformedHeaders } from "./get-transformed-headers"; export class NodeHttpHandler implements HttpHandler { private readonly httpAgent: http.Agent; @@ -31,33 +31,9 @@ export class NodeHttpHandler implements HttpHandler { handle( request: HttpRequest, - options: HttpHandlerOptions + { abortSignal }: HttpHandlerOptions ): Promise> { - // determine which http(s) client to use - const isSSL = request.protocol === "https:"; - const httpClient = isSSL ? https : http; - - let path = request.path; - if (request.query) { - const queryString = buildQueryString(request.query); - if (queryString) { - path += `?${queryString}`; - } - } - - const nodeHttpsOptions: https.RequestOptions = { - headers: request.headers, - host: request.hostname, - method: request.method, - path: path, - port: request.port, - agent: isSSL ? this.httpsAgent : this.httpAgent - }; - return new Promise((resolve, reject) => { - const abortSignal = options && options.abortSignal; - const { connectionTimeout, socketTimeout } = this.httpOptions; - // if the request was already aborted, prevent doing extra work if (abortSignal && abortSignal.aborted) { const abortError = new Error("Request aborted"); @@ -66,21 +42,23 @@ export class NodeHttpHandler implements HttpHandler { return; } - // create the http request - const req = (httpClient as typeof http).request(nodeHttpsOptions, res => { - const httpHeaders = res.headers; - const transformedHeaders: HeaderBag = {}; - - for (let name of Object.keys(httpHeaders)) { - let headerValues = httpHeaders[name]; - transformedHeaders[name] = Array.isArray(headerValues) - ? headerValues.join(",") - : headerValues; - } + // determine which http(s) client to use + const isSSL = request.protocol === "https:"; + const queryString = buildQueryString(request.query || {}); + const nodeHttpsOptions: https.RequestOptions = { + headers: request.headers, + host: request.hostname, + method: request.method, + path: queryString ? `${request.path}?${queryString}` : request.path, + port: request.port, + agent: isSSL ? this.httpsAgent : this.httpAgent + }; + // create the http request + const req = (isSSL ? https : http).request(nodeHttpsOptions, res => { const httpResponse: HttpResponse = { statusCode: res.statusCode || -1, - headers: transformedHeaders, + headers: getTransformedHeaders(res.headers), body: res }; resolve(httpResponse); @@ -89,8 +67,8 @@ export class NodeHttpHandler implements HttpHandler { req.on("error", reject); // wire-up any timeout logic - setConnectionTimeout(req, reject, connectionTimeout); - setSocketTimeout(req, reject, socketTimeout); + setConnectionTimeout(req, reject, this.httpOptions.connectionTimeout); + setSocketTimeout(req, reject, this.httpOptions.socketTimeout); // wire-up abort logic if (abortSignal) { diff --git a/packages/node-http-handler/src/node-http2-handler.spec.ts b/packages/node-http-handler/src/node-http2-handler.spec.ts new file mode 100644 index 0000000000000..3e7a61906cfc7 --- /dev/null +++ b/packages/node-http-handler/src/node-http2-handler.spec.ts @@ -0,0 +1,205 @@ +import { NodeHttp2Handler } from "./node-http2-handler"; +import { createMockHttp2Server, createResponseFunction } from "./server.mock"; +import { AbortController } from "@aws-sdk/abort-controller"; + +describe("NodeHttp2Handler", () => { + let nodeH2Handler: NodeHttp2Handler; + + const protocol = "http:"; + const hostname = "localhost"; + const port = 45321; + const mockH2Server = createMockHttp2Server().listen(port); + const getMockRequest = () => ({ + protocol, + hostname, + port, + method: "GET", + path: "/", + headers: {} + }); + + const mockResponse = { + statusCode: 200, + headers: {}, + body: "test" + }; + + beforeEach(() => { + nodeH2Handler = new NodeHttp2Handler(); + mockH2Server.on("request", createResponseFunction(mockResponse)); + }); + + afterEach(() => { + mockH2Server.removeAllListeners("request"); + // @ts-ignore: access private property + const connectionPool = nodeH2Handler.connectionPool; + for (const [, session] of connectionPool) { + session.destroy(); + } + connectionPool.clear(); + }); + + afterAll(() => { + mockH2Server.close(); + }); + + describe("connectionPool", () => { + it("is empty on initialization", () => { + // @ts-ignore: access private property + expect(nodeH2Handler.connectionPool.size).toBe(0); + }); + + it("creates and stores session when request is made", async () => { + await nodeH2Handler.handle(getMockRequest(), {}); + + // @ts-ignore: access private property + expect(nodeH2Handler.connectionPool.size).toBe(1); + expect( + // @ts-ignore: access private property + nodeH2Handler.connectionPool.get(`${protocol}//${hostname}:${port}`) + ).toBeDefined(); + }); + + it("reuses existing session if request is made on same authority again", async () => { + await nodeH2Handler.handle(getMockRequest(), {}); + // @ts-ignore: access private property + expect(nodeH2Handler.connectionPool.size).toBe(1); + + // @ts-ignore: access private property + const session: ClientHttp2Session = nodeH2Handler.connectionPool.get( + `${protocol}//${hostname}:${port}` + ); + const requestSpy = jest.spyOn(session, "request"); + + await nodeH2Handler.handle(getMockRequest(), {}); + // @ts-ignore: access private property + expect(nodeH2Handler.connectionPool.size).toBe(1); + expect(requestSpy.mock.calls.length).toBe(1); + }); + + it("creates new session if request is made on new authority", async () => { + await nodeH2Handler.handle(getMockRequest(), {}); + // @ts-ignore: access private property + expect(nodeH2Handler.connectionPool.size).toBe(1); + + const port2 = port + 1; + const mockH2Server2 = createMockHttp2Server().listen(port2); + mockH2Server2.on("request", createResponseFunction(mockResponse)); + + await nodeH2Handler.handle({ ...getMockRequest(), port: port2 }, {}); + // @ts-ignore: access private property + expect(nodeH2Handler.connectionPool.size).toBe(2); + expect( + // @ts-ignore: access private property + nodeH2Handler.connectionPool.get(`${protocol}//${hostname}:${port2}`) + ).toBeDefined(); + + mockH2Server2.close(); + }); + + it("closes and removes session on sessionTimeout", async done => { + const sessionTimeout = 500; + nodeH2Handler = new NodeHttp2Handler({ sessionTimeout }); + await nodeH2Handler.handle(getMockRequest(), {}); + + const authority = `${protocol}//${hostname}:${port}`; + // @ts-ignore: access private property + const session: ClientHttp2Session = nodeH2Handler.connectionPool.get( + authority + ); + expect(session.closed).toBe(false); + setTimeout(() => { + expect(session.closed).toBe(true); + // @ts-ignore: access private property + expect(nodeH2Handler.connectionPool.get(authority)).not.toBeDefined(); + done(); + }, sessionTimeout + 100); + }); + }); + + describe("destroy", () => { + it("destroys sessions and clears connectionPool", async () => { + await nodeH2Handler.handle(getMockRequest(), {}); + + // @ts-ignore: access private property + const session: ClientHttp2Session = nodeH2Handler.connectionPool.get( + `${protocol}//${hostname}:${port}` + ); + + // @ts-ignore: access private property + expect(nodeH2Handler.connectionPool.size).toBe(1); + expect(session.destroyed).toBe(false); + nodeH2Handler.destroy(); + // @ts-ignore: access private property + expect(nodeH2Handler.connectionPool.size).toBe(0); + expect(session.destroyed).toBe(true); + }); + }); + + describe("abortSignal", () => { + it("will not create session if request already aborted", async () => { + // @ts-ignore: access private property + expect(nodeH2Handler.connectionPool.size).toBe(0); + await expect( + nodeH2Handler.handle(getMockRequest(), { + abortSignal: { + aborted: true + } + }) + ).rejects.toHaveProperty("name", "AbortError"); + // @ts-ignore: access private property + expect(nodeH2Handler.connectionPool.size).toBe(0); + }); + + it("will not create request on session if request already aborted", async () => { + await nodeH2Handler.handle(getMockRequest(), {}); + + // @ts-ignore: access private property + const session: ClientHttp2Session = nodeH2Handler.connectionPool.get( + `${protocol}//${hostname}:${port}` + ); + const requestSpy = jest.spyOn(session, "request"); + + await expect( + nodeH2Handler.handle(getMockRequest(), { + abortSignal: { + aborted: true + } + }) + ).rejects.toHaveProperty("name", "AbortError"); + expect(requestSpy.mock.calls.length).toBe(0); + }); + + it("will close request on session when aborted", async () => { + await nodeH2Handler.handle(getMockRequest(), {}); + + // @ts-ignore: access private property + const session: ClientHttp2Session = nodeH2Handler.connectionPool.get( + `${protocol}//${hostname}:${port}` + ); + const requestSpy = jest.spyOn(session, "request"); + + const abortController = new AbortController(); + // Delay response so that onabort is called earlier + setTimeout(() => { + abortController.abort(); + }, 0); + mockH2Server.on( + "request", + async () => + new Promise(resolve => { + setTimeout(() => { + resolve(createResponseFunction(mockResponse)); + }, 1000); + }) + ); + + await expect( + nodeH2Handler.handle(getMockRequest(), { + abortSignal: abortController.signal + }) + ).rejects.toHaveProperty("name", "AbortError"); + expect(requestSpy.mock.calls.length).toBe(1); + }); + }); +}); diff --git a/packages/node-http-handler/src/node-http2-handler.ts b/packages/node-http-handler/src/node-http2-handler.ts new file mode 100644 index 0000000000000..763a43b8c8ec1 --- /dev/null +++ b/packages/node-http-handler/src/node-http2-handler.ts @@ -0,0 +1,113 @@ +import { connect, constants, ClientHttp2Session } from "http2"; + +import { Readable } from "stream"; +import { buildQueryString } from "@aws-sdk/querystring-builder"; +import { + HttpHandler, + HttpHandlerOptions, + HttpRequest, + HttpResponse, + NodeHttp2Options +} from "@aws-sdk/types"; + +import { writeRequestBody } from "./write-request-body"; +import { getTransformedHeaders } from "./get-transformed-headers"; + +export class NodeHttp2Handler + implements HttpHandler { + private readonly connectionPool: Map; + + constructor(private readonly http2Options: NodeHttp2Options = {}) { + this.connectionPool = new Map(); + } + + destroy(): void { + for (const [_, http2Session] of this.connectionPool) { + http2Session.destroy(); + } + this.connectionPool.clear(); + } + + handle( + request: HttpRequest, + { abortSignal }: HttpHandlerOptions + ): Promise> { + return new Promise((resolve, reject) => { + // if the request was already aborted, prevent doing extra work + if (abortSignal && abortSignal.aborted) { + const abortError = new Error("Request aborted"); + abortError.name = "AbortError"; + reject(abortError); + return; + } + + const { hostname, method, port, protocol, path, query } = request; + const queryString = buildQueryString(query || {}); + + // create the http2 request + const req = this.getSession( + `${protocol}//${hostname}${port ? `:${port}` : ""}` + ).request({ + ...request.headers, + [constants.HTTP2_HEADER_PATH]: queryString + ? `${path}?${queryString}` + : path, + [constants.HTTP2_HEADER_METHOD]: method + }); + + req.on("response", headers => { + const httpResponse: HttpResponse = { + statusCode: headers[":status"] || -1, + headers: getTransformedHeaders(headers), + body: req + }; + resolve(httpResponse); + }); + + req.on("error", reject); + req.on("frameError", reject); + req.on("aborted", reject); + + const { requestTimeout } = this.http2Options; + if (requestTimeout) { + req.setTimeout(requestTimeout, () => { + req.close(); + const timeoutError = new Error( + `Stream timed out because of no activity for ${requestTimeout} ms` + ); + timeoutError.name = "TimeoutError"; + reject(timeoutError); + }); + } + + if (abortSignal) { + abortSignal.onabort = () => { + req.close(); + const abortError = new Error("Request aborted"); + abortError.name = "AbortError"; + reject(abortError); + }; + } + + writeRequestBody(req, request); + }); + } + + private getSession(authority: string): ClientHttp2Session { + const connectionPool = this.connectionPool; + const existingSession = connectionPool.get(authority); + if (existingSession) return existingSession; + + const newSession = connect(authority); + connectionPool.set(authority, newSession); + + const { sessionTimeout } = this.http2Options; + if (sessionTimeout) { + newSession.setTimeout(sessionTimeout, () => { + newSession.close(); + connectionPool.delete(authority); + }); + } + return newSession; + } +} diff --git a/packages/node-http-handler/src/server.mock.ts b/packages/node-http-handler/src/server.mock.ts index 88c76caf75224..637ff913edd92 100644 --- a/packages/node-http-handler/src/server.mock.ts +++ b/packages/node-http-handler/src/server.mock.ts @@ -8,6 +8,7 @@ import { createServer as createHttpsServer, Server as HttpsServer } from "https"; +import { createServer as createHttp2Server, Http2Server } from "http2"; import { readFileSync } from "fs"; import { join } from "path"; import { Readable } from "stream"; @@ -54,3 +55,8 @@ export function createMockHttpServer(): HttpServer { const server = createHttpServer(); return server; } + +export function createMockHttp2Server(): Http2Server { + const server = createHttp2Server(); + return server; +} diff --git a/packages/node-http-handler/src/write-request-body.ts b/packages/node-http-handler/src/write-request-body.ts index 7d73d0f170ef1..428d076a3df0e 100644 --- a/packages/node-http-handler/src/write-request-body.ts +++ b/packages/node-http-handler/src/write-request-body.ts @@ -1,9 +1,10 @@ import { ClientRequest } from "http"; +import { ClientHttp2Stream } from "http2"; import { Readable } from "stream"; import { HttpRequest } from "@aws-sdk/types"; export function writeRequestBody( - httpRequest: ClientRequest, + httpRequest: ClientRequest | ClientHttp2Stream, request: HttpRequest ) { const expect = request.headers["Expect"] || request.headers["expect"]; @@ -17,7 +18,7 @@ export function writeRequestBody( } function writeBody( - httpRequest: ClientRequest, + httpRequest: ClientRequest | ClientHttp2Stream, body?: string | ArrayBuffer | ArrayBufferView | Readable ) { if (body instanceof Readable) { diff --git a/packages/types/src/http.ts b/packages/types/src/http.ts index d2f5f690a1911..b873bef07f453 100644 --- a/packages/types/src/http.ts +++ b/packages/types/src/http.ts @@ -175,3 +175,21 @@ export interface NodeHttpOptions extends HttpOptions { */ socketTimeout?: number; } + +/** + * Represents the http2 options that can be passed to a node http2 client. + */ +export interface NodeHttp2Options extends HttpOptions { + /** + * The maximum time in milliseconds that a stream may remain idle before it + * is closed. + */ + requestTimeout?: number; + + /** + * The maximum time in milliseconds that a session or socket may remain idle + * before it is closed. + * https://nodejs.org/docs/latest-v12.x/api/http2.html#http2_http2session_and_sockets + */ + sessionTimeout?: number; +}