From 8ae53804d94fa31683457e4f1be405c8b42e56cf Mon Sep 17 00:00:00 2001 From: Laurin Quast Date: Fri, 16 Jul 2021 09:18:35 +0200 Subject: [PATCH 1/5] feat: hook into published subscribe values --- .changeset/fifty-kids-boil.md | 6 ++ packages/core/src/orchestrator.ts | 64 +++++++++++--------- packages/core/src/utils.ts | 57 +++++++++++++++++- packages/core/test/common.ts | 4 ++ packages/core/test/subscribe.spec.ts | 90 ++++++++++++++++++++++++++++ packages/testing/src/index.ts | 8 +++ packages/types/src/graphql.ts | 19 ++++++ packages/types/src/hooks.ts | 13 ++-- packages/types/src/index.ts | 1 + packages/types/src/utils.ts | 2 + 10 files changed, 231 insertions(+), 33 deletions(-) create mode 100644 .changeset/fifty-kids-boil.md create mode 100644 packages/core/test/subscribe.spec.ts create mode 100644 packages/types/src/graphql.ts diff --git a/.changeset/fifty-kids-boil.md b/.changeset/fifty-kids-boil.md new file mode 100644 index 0000000000..00392ced0a --- /dev/null +++ b/.changeset/fifty-kids-boil.md @@ -0,0 +1,6 @@ +--- +'@envelop/core': minor +'@envelop/types': minor +--- + +allow hooking into published subscribe values diff --git a/packages/core/src/orchestrator.ts b/packages/core/src/orchestrator.ts index e02b75453b..11ae570974 100644 --- a/packages/core/src/orchestrator.ts +++ b/packages/core/src/orchestrator.ts @@ -11,12 +11,15 @@ import { OnParseHook, OnResolverCalledHook, OnSubscribeHook, + OnSubscribeResultResult, OnValidateHook, Plugin, SubscribeResultHook, TypedSubscriptionArgs, TypedExecutionArgs, + SubscribeFunction, } from '@envelop/types'; +import { isAsyncIterable } from '@graphql-tools/utils'; import { DocumentNode, execute, @@ -29,12 +32,12 @@ import { parse, specifiedRules, subscribe, - SubscriptionArgs, validate, ValidationRule, } from 'graphql'; import { Maybe } from 'graphql/jsutils/Maybe'; import { prepareTracedSchema, resolversHooksSymbol } from './traced-schema'; +import { finalAsyncIterator, makeSubscribe, mapAsyncIterator } from './utils'; export type EnvelopOrchestrator< InitialContext extends ArbitraryObject = ArbitraryObject, @@ -272,32 +275,9 @@ export function createEnvelopOrchestrator(plugins: Plugin[ } : initialContext => orchestratorCtx => orchestratorCtx ? { ...initialContext, ...orchestratorCtx } : initialContext; - const customSubscribe = async ( - argsOrSchema: SubscriptionArgs | GraphQLSchema, - document?: DocumentNode, - rootValue?: any, - contextValue?: any, - variableValues?: Maybe<{ [key: string]: any }>, - operationName?: Maybe, - fieldResolver?: Maybe>, - subscribeFieldResolver?: Maybe> - ) => { - const args: SubscriptionArgs = - argsOrSchema instanceof GraphQLSchema - ? { - schema: argsOrSchema, - document: document!, - rootValue, - contextValue, - variableValues, - operationName, - fieldResolver, - subscribeFieldResolver, - } - : argsOrSchema; - + const customSubscribe = makeSubscribe(async args => { const onResolversHandlers: OnResolverCalledHook[] = []; - let subscribeFn: typeof subscribe = subscribe; + let subscribeFn = subscribe as SubscribeFunction; const afterCalls: SubscribeResultHook[] = []; let context = args.contextValue || {}; @@ -333,17 +313,45 @@ export function createEnvelopOrchestrator(plugins: Plugin[ contextValue: context, }); + const onNextHandler: Exclude[] = []; + const onEndHandler: Exclude[] = []; + for (const afterCb of afterCalls) { - afterCb({ + const hookResult = afterCb({ result, setResult: newResult => { result = newResult; }, }); + if (hookResult) { + if (hookResult.onNext) { + onNextHandler.push(hookResult.onNext); + } + if (hookResult.onEnd) { + onEndHandler.push(hookResult.onEnd); + } + } } + if (isAsyncIterable(result)) { + if (onNextHandler.length) { + result = mapAsyncIterator(result, async result => { + for (const onNext of onNextHandler) { + await onNext({ result, setResult: newResult => (result = newResult) }); + } + return result; + }); + } + if (onEndHandler.length) { + result = finalAsyncIterator(result, () => { + for (const onEnd of onEndHandler) { + onEnd(); + } + }); + } + } return result; - }; + }); const customExecute = beforeCallbacks.execute.length ? async ( diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index f4f32be7a1..65fe29a07f 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -1,4 +1,16 @@ -import { ASTNode, DocumentNode, Kind, OperationDefinitionNode, visit, BREAK, Source } from 'graphql'; +import { + ASTNode, + DocumentNode, + Kind, + OperationDefinitionNode, + visit, + BREAK, + Source, + ExecutionResult, + SubscriptionArgs, +} from 'graphql'; +import { PolymorphicSubscribeArguments } from '@envelop/types'; +import { PromiseOrValue } from 'graphql/jsutils/PromiseOrValue'; export const envelopIsIntrospectionSymbol = Symbol('ENVELOP_IS_INTROSPECTION'); @@ -34,3 +46,46 @@ export function isIntrospectionDocument(document: DocumentNode): boolean { export function isIntrospectionOperationString(operation: string | Source): boolean { return (typeof operation === 'string' ? operation : operation.body).indexOf('__schema') !== -1; } + +export function getSubscribeArgs(args: PolymorphicSubscribeArguments): SubscriptionArgs { + return args.length === 1 + ? args[0] + : { + schema: args[0], + document: args[1], + rootValue: args[2], + contextValue: args[3], + variableValues: args[4], + operationName: args[5], + fieldResolver: args[6], + subscribeFieldResolver: args[7], + }; +} + +/** + * Utility function for making a subscribe function that handles polymorphic arguments. + */ +export const makeSubscribe = + (subscribeFn: (args: SubscriptionArgs) => PromiseOrValue | ExecutionResult>) => + (...polyArgs: PolymorphicSubscribeArguments): PromiseOrValue | ExecutionResult> => + subscribeFn(getSubscribeArgs(polyArgs)); + +export async function* mapAsyncIterator( + asyncIterable: AsyncIterableIterator, + map: (input: TInput) => Promise | TOutput +): AsyncIterableIterator { + for await (const value of asyncIterable) { + yield map(value); + } +} + +export async function* finalAsyncIterator( + asyncIterable: AsyncIterableIterator, + onFinal: () => void +): AsyncIterableIterator { + try { + yield* asyncIterable; + } finally { + onFinal(); + } +} diff --git a/packages/core/test/common.ts b/packages/core/test/common.ts index 261a7cc736..4224b526fa 100644 --- a/packages/core/test/common.ts +++ b/packages/core/test/common.ts @@ -9,6 +9,10 @@ export const schema = makeExecutableSchema({ id: ID! name: String! } + + type Subscription { + alphabet: String! + } `, resolvers: { Query: { diff --git a/packages/core/test/subscribe.spec.ts b/packages/core/test/subscribe.spec.ts new file mode 100644 index 0000000000..53749c187a --- /dev/null +++ b/packages/core/test/subscribe.spec.ts @@ -0,0 +1,90 @@ +import { assertStreamExecutionValue, collectAsyncIteratorValues, createTestkit } from '@envelop/testing'; +import { ExecutionResult } from 'graphql'; +import { schema } from './common'; + +describe('subscribe', () => { + it('Should be able to manipulate streams', async () => { + const streamExecuteFn = async function* () { + for (const value of ['a', 'b', 'c', 'd']) { + yield { data: { alphabet: value } }; + } + }; + + const teskit = createTestkit( + [ + { + onSubscribe({ setSubscribeFn }) { + setSubscribeFn(streamExecuteFn); + + return { + onSubscribeResult: () => { + return { + onNext: ({ setResult }) => { + setResult({ data: { alphabet: 'x' } }); + }, + }; + }, + }; + }, + }, + ], + schema + ); + + const result = await teskit.execute(/* GraphQL */ ` + subscription { + alphabet + } + `); + assertStreamExecutionValue(result); + const values = await collectAsyncIteratorValues(result); + expect(values).toEqual([ + { data: { alphabet: 'x' } }, + { data: { alphabet: 'x' } }, + { data: { alphabet: 'x' } }, + { data: { alphabet: 'x' } }, + ]); + }); + + it('Should be able to invoke something after the stream has ended.', async () => { + expect.assertions(1); + const streamExecuteFn = async function* () { + for (const value of ['a', 'b', 'c', 'd']) { + yield { data: { alphabet: value } }; + } + }; + + const teskit = createTestkit( + [ + { + onSubscribe({ setSubscribeFn }) { + setSubscribeFn(streamExecuteFn); + + return { + onSubscribeResult: () => { + let latestResult: ExecutionResult; + return { + onNext: ({ result }) => { + latestResult = result; + }, + onEnd: () => { + expect(latestResult).toEqual({ data: { alphabet: 'd' } }); + }, + }; + }, + }; + }, + }, + ], + schema + ); + + const result = await teskit.execute(/* GraphQL */ ` + subscription { + alphabet + } + `); + assertStreamExecutionValue(result); + await collectAsyncIteratorValues(result); + }); +}); diff --git a/packages/testing/src/index.ts b/packages/testing/src/index.ts index e49b97c8ac..4e51be3be0 100644 --- a/packages/testing/src/index.ts +++ b/packages/testing/src/index.ts @@ -177,3 +177,11 @@ export function assertStreamExecutionValue(input: ExecutionReturn): asserts inpu throw new Error('Received single result but expected stream.'); } } + +export const collectAsyncIteratorValues = async (asyncIterable: AsyncIterableIterator): Promise> => { + const values: Array = []; + for await (const value of asyncIterable) { + values.push(value); + } + return values; +}; diff --git a/packages/types/src/graphql.ts b/packages/types/src/graphql.ts new file mode 100644 index 0000000000..b99e1a9ed3 --- /dev/null +++ b/packages/types/src/graphql.ts @@ -0,0 +1,19 @@ +import type { DocumentNode, GraphQLFieldResolver, GraphQLSchema, SubscriptionArgs, ExecutionResult } from 'graphql'; +import type { Maybe, PromiseOrValue, AsyncIterableIteratorOrValue } from './utils'; + +export type PolymorphicSubscribeArguments = + | [SubscriptionArgs] + | [ + GraphQLSchema, + DocumentNode, + any?, + any?, + Maybe<{ [key: string]: any }>?, + Maybe?, + Maybe>?, + Maybe>? + ]; + +export type SubscribeFunction = ( + ...args: PolymorphicSubscribeArguments +) => PromiseOrValue>; diff --git a/packages/types/src/hooks.ts b/packages/types/src/hooks.ts index 142a992b41..576ff540cb 100644 --- a/packages/types/src/hooks.ts +++ b/packages/types/src/hooks.ts @@ -1,4 +1,4 @@ -import { +import type { DocumentNode, execute, ExecutionArgs, @@ -9,7 +9,6 @@ import { parse, ParseOptions, Source, - subscribe, SubscriptionArgs, TypeInfo, validate, @@ -18,6 +17,7 @@ import { import { Maybe } from 'graphql/jsutils/Maybe'; import { PromiseOrValue } from 'graphql/jsutils/PromiseOrValue'; import { DefaultContext } from 'packages/core/src'; +import { SubscribeFunction } from './graphql'; import { Plugin } from './plugin'; export type DefaultArgs = Record; @@ -158,7 +158,7 @@ export type OnExecuteHook = ( /** onSubscribe */ export type TypedSubscriptionArgs = Omit & { contextValue: ContextType }; -export type OriginalSubscribeFn = typeof subscribe; +export type OriginalSubscribeFn = SubscribeFunction; export type OnSubscribeEventPayload = { subscribeFn: OriginalSubscribeFn; args: TypedSubscriptionArgs; @@ -169,7 +169,12 @@ export type OnSubscribeResultEventPayload = { result: AsyncIterableIterator | ExecutionResult; setResult: (newResult: AsyncIterableIterator | ExecutionResult) => void; }; -export type SubscribeResultHook = (options: OnSubscribeResultEventPayload) => void; +export type OnSubscribeResultResultOnNextPayload = { result: ExecutionResult; setResult: (newResult: ExecutionResult) => void }; +export type OnSubscribeResultResult = { + onNext?: (options: OnSubscribeResultResultOnNextPayload) => void | Promise; + onEnd?: () => void; +}; +export type SubscribeResultHook = (options: OnSubscribeResultEventPayload) => void | OnSubscribeResultResult; export type OnSubscribeHookResult = { onSubscribeResult?: SubscribeResultHook; onResolverCalled?: OnResolverCalledHook; diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index aa199b7e4d..f84d34be86 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -2,4 +2,5 @@ export * from './context-types'; export * from './hooks'; export * from './plugin'; export * from './get-enveloped'; +export * from './graphql'; export * from './utils'; diff --git a/packages/types/src/utils.ts b/packages/types/src/utils.ts index bd82a7b611..9128d46f87 100644 --- a/packages/types/src/utils.ts +++ b/packages/types/src/utils.ts @@ -27,4 +27,6 @@ export type Unarray = T extends Array ? U : T; export type ArbitraryObject = Record; export type PromiseOrValue = T | Promise; +export type AsyncIterableIteratorOrValue = T | AsyncIterableIterator; +export type UnwrapPromiseValue = TValue extends Promise ? TWrappedValue : TValue; export type Maybe = T | null | undefined; From 692ed33d523f1a126d725de516305e34d9cc7640 Mon Sep 17 00:00:00 2001 From: Laurin Quast Date: Fri, 16 Jul 2021 09:20:31 +0200 Subject: [PATCH 2/5] fix import --- packages/core/src/orchestrator.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/src/orchestrator.ts b/packages/core/src/orchestrator.ts index 11ae570974..6ecd9baa44 100644 --- a/packages/core/src/orchestrator.ts +++ b/packages/core/src/orchestrator.ts @@ -19,7 +19,7 @@ import { TypedExecutionArgs, SubscribeFunction, } from '@envelop/types'; -import { isAsyncIterable } from '@graphql-tools/utils'; +import { isAsyncIterable } from 'graphql/jsutils/isAsyncIterable'; import { DocumentNode, execute, From f826ccd4653cfcc2f91476e2260ac4a9b00e290f Mon Sep 17 00:00:00 2001 From: Laurin Quast Date: Fri, 16 Jul 2021 09:21:40 +0200 Subject: [PATCH 3/5] fix type definition --- packages/core/src/graphql-typings.d.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/src/graphql-typings.d.ts b/packages/core/src/graphql-typings.d.ts index 474528ad13..2dafe88d31 100644 --- a/packages/core/src/graphql-typings.d.ts +++ b/packages/core/src/graphql-typings.d.ts @@ -1,4 +1,4 @@ declare module 'graphql/jsutils/isAsyncIterable' { - function isAsyncIterable(input: unknown): input is AsyncIterable; + function isAsyncIterable(input: unknown): input is AsyncIterableIterator; export default isAsyncIterable; } From 51e6643ee6525785c7f5a3fcddf94f63aa2fbc35 Mon Sep 17 00:00:00 2001 From: Laurin Quast Date: Fri, 16 Jul 2021 09:22:26 +0200 Subject: [PATCH 4/5] fix import... --- packages/core/src/orchestrator.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/src/orchestrator.ts b/packages/core/src/orchestrator.ts index 6ecd9baa44..8e9b79a5c4 100644 --- a/packages/core/src/orchestrator.ts +++ b/packages/core/src/orchestrator.ts @@ -19,7 +19,7 @@ import { TypedExecutionArgs, SubscribeFunction, } from '@envelop/types'; -import { isAsyncIterable } from 'graphql/jsutils/isAsyncIterable'; +import isAsyncIterable from 'graphql/jsutils/isAsyncIterable'; import { DocumentNode, execute, From 7ab1bd755c9c4375bad19fefe75a396d6b644fff Mon Sep 17 00:00:00 2001 From: Laurin Quast Date: Fri, 16 Jul 2021 09:30:08 +0200 Subject: [PATCH 5/5] refactor: generate types for everything --- packages/core/src/orchestrator.ts | 37 +++++++++++++++---------------- packages/types/src/hooks.ts | 11 ++++++--- packages/types/src/utils.ts | 1 - 3 files changed, 26 insertions(+), 23 deletions(-) diff --git a/packages/core/src/orchestrator.ts b/packages/core/src/orchestrator.ts index 8e9b79a5c4..23862fe8b2 100644 --- a/packages/core/src/orchestrator.ts +++ b/packages/core/src/orchestrator.ts @@ -11,13 +11,14 @@ import { OnParseHook, OnResolverCalledHook, OnSubscribeHook, - OnSubscribeResultResult, OnValidateHook, Plugin, SubscribeResultHook, TypedSubscriptionArgs, TypedExecutionArgs, SubscribeFunction, + OnSubscribeResultResultOnNextHook, + OnSubscribeResultResultOnEndHook, } from '@envelop/types'; import isAsyncIterable from 'graphql/jsutils/isAsyncIterable'; import { @@ -313,8 +314,8 @@ export function createEnvelopOrchestrator(plugins: Plugin[ contextValue: context, }); - const onNextHandler: Exclude[] = []; - const onEndHandler: Exclude[] = []; + const onNextHandler: OnSubscribeResultResultOnNextHook[] = []; + const onEndHandler: OnSubscribeResultResultOnEndHook[] = []; for (const afterCb of afterCalls) { const hookResult = afterCb({ @@ -333,22 +334,20 @@ export function createEnvelopOrchestrator(plugins: Plugin[ } } - if (isAsyncIterable(result)) { - if (onNextHandler.length) { - result = mapAsyncIterator(result, async result => { - for (const onNext of onNextHandler) { - await onNext({ result, setResult: newResult => (result = newResult) }); - } - return result; - }); - } - if (onEndHandler.length) { - result = finalAsyncIterator(result, () => { - for (const onEnd of onEndHandler) { - onEnd(); - } - }); - } + if (onNextHandler.length && isAsyncIterable(result)) { + result = mapAsyncIterator(result, async result => { + for (const onNext of onNextHandler) { + await onNext({ result, setResult: newResult => (result = newResult) }); + } + return result; + }); + } + if (onEndHandler.length && isAsyncIterable(result)) { + result = finalAsyncIterator(result, () => { + for (const onEnd of onEndHandler) { + onEnd(); + } + }); } return result; }); diff --git a/packages/types/src/hooks.ts b/packages/types/src/hooks.ts index 576ff540cb..3abc4f9aa0 100644 --- a/packages/types/src/hooks.ts +++ b/packages/types/src/hooks.ts @@ -169,10 +169,15 @@ export type OnSubscribeResultEventPayload = { result: AsyncIterableIterator | ExecutionResult; setResult: (newResult: AsyncIterableIterator | ExecutionResult) => void; }; -export type OnSubscribeResultResultOnNextPayload = { result: ExecutionResult; setResult: (newResult: ExecutionResult) => void }; +export type OnSubscribeResultResultOnNextHookPayload = { + result: ExecutionResult; + setResult: (newResult: ExecutionResult) => void; +}; +export type OnSubscribeResultResultOnNextHook = (payload: OnSubscribeResultResultOnNextHookPayload) => void | Promise; +export type OnSubscribeResultResultOnEndHook = () => void; export type OnSubscribeResultResult = { - onNext?: (options: OnSubscribeResultResultOnNextPayload) => void | Promise; - onEnd?: () => void; + onNext?: OnSubscribeResultResultOnNextHook; + onEnd?: OnSubscribeResultResultOnEndHook; }; export type SubscribeResultHook = (options: OnSubscribeResultEventPayload) => void | OnSubscribeResultResult; export type OnSubscribeHookResult = { diff --git a/packages/types/src/utils.ts b/packages/types/src/utils.ts index 9128d46f87..57920b749e 100644 --- a/packages/types/src/utils.ts +++ b/packages/types/src/utils.ts @@ -28,5 +28,4 @@ export type Unarray = T extends Array ? U : T; export type ArbitraryObject = Record; export type PromiseOrValue = T | Promise; export type AsyncIterableIteratorOrValue = T | AsyncIterableIterator; -export type UnwrapPromiseValue = TValue extends Promise ? TWrappedValue : TValue; export type Maybe = T | null | undefined;