diff --git a/.changeset/fifty-kids-boil.md b/.changeset/fifty-kids-boil.md new file mode 100644 index 0000000000..00392ced0a --- /dev/null +++ b/.changeset/fifty-kids-boil.md @@ -0,0 +1,6 @@ +--- +'@envelop/core': minor +'@envelop/types': minor +--- + +allow hooking into published subscribe values diff --git a/packages/core/src/graphql-typings.d.ts b/packages/core/src/graphql-typings.d.ts index 474528ad13..2dafe88d31 100644 --- a/packages/core/src/graphql-typings.d.ts +++ b/packages/core/src/graphql-typings.d.ts @@ -1,4 +1,4 @@ declare module 'graphql/jsutils/isAsyncIterable' { - function isAsyncIterable(input: unknown): input is AsyncIterable; + function isAsyncIterable(input: unknown): input is AsyncIterableIterator; export default isAsyncIterable; } diff --git a/packages/core/src/orchestrator.ts b/packages/core/src/orchestrator.ts index e02b75453b..23862fe8b2 100644 --- a/packages/core/src/orchestrator.ts +++ b/packages/core/src/orchestrator.ts @@ -16,7 +16,11 @@ import { SubscribeResultHook, TypedSubscriptionArgs, TypedExecutionArgs, + SubscribeFunction, + OnSubscribeResultResultOnNextHook, + OnSubscribeResultResultOnEndHook, } from '@envelop/types'; +import isAsyncIterable from 'graphql/jsutils/isAsyncIterable'; import { DocumentNode, execute, @@ -29,12 +33,12 @@ import { parse, specifiedRules, subscribe, - SubscriptionArgs, validate, ValidationRule, } from 'graphql'; import { Maybe } from 'graphql/jsutils/Maybe'; import { prepareTracedSchema, resolversHooksSymbol } from './traced-schema'; +import { finalAsyncIterator, makeSubscribe, mapAsyncIterator } from './utils'; export type EnvelopOrchestrator< InitialContext extends ArbitraryObject = ArbitraryObject, @@ -272,32 +276,9 @@ export function createEnvelopOrchestrator(plugins: Plugin[ } : initialContext => orchestratorCtx => orchestratorCtx ? { ...initialContext, ...orchestratorCtx } : initialContext; - const customSubscribe = async ( - argsOrSchema: SubscriptionArgs | GraphQLSchema, - document?: DocumentNode, - rootValue?: any, - contextValue?: any, - variableValues?: Maybe<{ [key: string]: any }>, - operationName?: Maybe, - fieldResolver?: Maybe>, - subscribeFieldResolver?: Maybe> - ) => { - const args: SubscriptionArgs = - argsOrSchema instanceof GraphQLSchema - ? { - schema: argsOrSchema, - document: document!, - rootValue, - contextValue, - variableValues, - operationName, - fieldResolver, - subscribeFieldResolver, - } - : argsOrSchema; - + const customSubscribe = makeSubscribe(async args => { const onResolversHandlers: OnResolverCalledHook[] = []; - let subscribeFn: typeof subscribe = subscribe; + let subscribeFn = subscribe as SubscribeFunction; const afterCalls: SubscribeResultHook[] = []; let context = args.contextValue || {}; @@ -333,17 +314,43 @@ export function createEnvelopOrchestrator(plugins: Plugin[ contextValue: context, }); + const onNextHandler: OnSubscribeResultResultOnNextHook[] = []; + const onEndHandler: OnSubscribeResultResultOnEndHook[] = []; + for (const afterCb of afterCalls) { - afterCb({ + const hookResult = afterCb({ result, setResult: newResult => { result = newResult; }, }); + if (hookResult) { + if (hookResult.onNext) { + onNextHandler.push(hookResult.onNext); + } + if (hookResult.onEnd) { + onEndHandler.push(hookResult.onEnd); + } + } } + if (onNextHandler.length && isAsyncIterable(result)) { + result = mapAsyncIterator(result, async result => { + for (const onNext of onNextHandler) { + await onNext({ result, setResult: newResult => (result = newResult) }); + } + return result; + }); + } + if (onEndHandler.length && isAsyncIterable(result)) { + result = finalAsyncIterator(result, () => { + for (const onEnd of onEndHandler) { + onEnd(); + } + }); + } return result; - }; + }); const customExecute = beforeCallbacks.execute.length ? async ( diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index f4f32be7a1..65fe29a07f 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -1,4 +1,16 @@ -import { ASTNode, DocumentNode, Kind, OperationDefinitionNode, visit, BREAK, Source } from 'graphql'; +import { + ASTNode, + DocumentNode, + Kind, + OperationDefinitionNode, + visit, + BREAK, + Source, + ExecutionResult, + SubscriptionArgs, +} from 'graphql'; +import { PolymorphicSubscribeArguments } from '@envelop/types'; +import { PromiseOrValue } from 'graphql/jsutils/PromiseOrValue'; export const envelopIsIntrospectionSymbol = Symbol('ENVELOP_IS_INTROSPECTION'); @@ -34,3 +46,46 @@ export function isIntrospectionDocument(document: DocumentNode): boolean { export function isIntrospectionOperationString(operation: string | Source): boolean { return (typeof operation === 'string' ? operation : operation.body).indexOf('__schema') !== -1; } + +export function getSubscribeArgs(args: PolymorphicSubscribeArguments): SubscriptionArgs { + return args.length === 1 + ? args[0] + : { + schema: args[0], + document: args[1], + rootValue: args[2], + contextValue: args[3], + variableValues: args[4], + operationName: args[5], + fieldResolver: args[6], + subscribeFieldResolver: args[7], + }; +} + +/** + * Utility function for making a subscribe function that handles polymorphic arguments. + */ +export const makeSubscribe = + (subscribeFn: (args: SubscriptionArgs) => PromiseOrValue | ExecutionResult>) => + (...polyArgs: PolymorphicSubscribeArguments): PromiseOrValue | ExecutionResult> => + subscribeFn(getSubscribeArgs(polyArgs)); + +export async function* mapAsyncIterator( + asyncIterable: AsyncIterableIterator, + map: (input: TInput) => Promise | TOutput +): AsyncIterableIterator { + for await (const value of asyncIterable) { + yield map(value); + } +} + +export async function* finalAsyncIterator( + asyncIterable: AsyncIterableIterator, + onFinal: () => void +): AsyncIterableIterator { + try { + yield* asyncIterable; + } finally { + onFinal(); + } +} diff --git a/packages/core/test/common.ts b/packages/core/test/common.ts index 261a7cc736..4224b526fa 100644 --- a/packages/core/test/common.ts +++ b/packages/core/test/common.ts @@ -9,6 +9,10 @@ export const schema = makeExecutableSchema({ id: ID! name: String! } + + type Subscription { + alphabet: String! + } `, resolvers: { Query: { diff --git a/packages/core/test/subscribe.spec.ts b/packages/core/test/subscribe.spec.ts new file mode 100644 index 0000000000..53749c187a --- /dev/null +++ b/packages/core/test/subscribe.spec.ts @@ -0,0 +1,90 @@ +import { assertStreamExecutionValue, collectAsyncIteratorValues, createTestkit } from '@envelop/testing'; +import { ExecutionResult } from 'graphql'; +import { schema } from './common'; + +describe('subscribe', () => { + it('Should be able to manipulate streams', async () => { + const streamExecuteFn = async function* () { + for (const value of ['a', 'b', 'c', 'd']) { + yield { data: { alphabet: value } }; + } + }; + + const teskit = createTestkit( + [ + { + onSubscribe({ setSubscribeFn }) { + setSubscribeFn(streamExecuteFn); + + return { + onSubscribeResult: () => { + return { + onNext: ({ setResult }) => { + setResult({ data: { alphabet: 'x' } }); + }, + }; + }, + }; + }, + }, + ], + schema + ); + + const result = await teskit.execute(/* GraphQL */ ` + subscription { + alphabet + } + `); + assertStreamExecutionValue(result); + const values = await collectAsyncIteratorValues(result); + expect(values).toEqual([ + { data: { alphabet: 'x' } }, + { data: { alphabet: 'x' } }, + { data: { alphabet: 'x' } }, + { data: { alphabet: 'x' } }, + ]); + }); + + it('Should be able to invoke something after the stream has ended.', async () => { + expect.assertions(1); + const streamExecuteFn = async function* () { + for (const value of ['a', 'b', 'c', 'd']) { + yield { data: { alphabet: value } }; + } + }; + + const teskit = createTestkit( + [ + { + onSubscribe({ setSubscribeFn }) { + setSubscribeFn(streamExecuteFn); + + return { + onSubscribeResult: () => { + let latestResult: ExecutionResult; + return { + onNext: ({ result }) => { + latestResult = result; + }, + onEnd: () => { + expect(latestResult).toEqual({ data: { alphabet: 'd' } }); + }, + }; + }, + }; + }, + }, + ], + schema + ); + + const result = await teskit.execute(/* GraphQL */ ` + subscription { + alphabet + } + `); + assertStreamExecutionValue(result); + await collectAsyncIteratorValues(result); + }); +}); diff --git a/packages/testing/src/index.ts b/packages/testing/src/index.ts index e49b97c8ac..4e51be3be0 100644 --- a/packages/testing/src/index.ts +++ b/packages/testing/src/index.ts @@ -177,3 +177,11 @@ export function assertStreamExecutionValue(input: ExecutionReturn): asserts inpu throw new Error('Received single result but expected stream.'); } } + +export const collectAsyncIteratorValues = async (asyncIterable: AsyncIterableIterator): Promise> => { + const values: Array = []; + for await (const value of asyncIterable) { + values.push(value); + } + return values; +}; diff --git a/packages/types/src/graphql.ts b/packages/types/src/graphql.ts new file mode 100644 index 0000000000..b99e1a9ed3 --- /dev/null +++ b/packages/types/src/graphql.ts @@ -0,0 +1,19 @@ +import type { DocumentNode, GraphQLFieldResolver, GraphQLSchema, SubscriptionArgs, ExecutionResult } from 'graphql'; +import type { Maybe, PromiseOrValue, AsyncIterableIteratorOrValue } from './utils'; + +export type PolymorphicSubscribeArguments = + | [SubscriptionArgs] + | [ + GraphQLSchema, + DocumentNode, + any?, + any?, + Maybe<{ [key: string]: any }>?, + Maybe?, + Maybe>?, + Maybe>? + ]; + +export type SubscribeFunction = ( + ...args: PolymorphicSubscribeArguments +) => PromiseOrValue>; diff --git a/packages/types/src/hooks.ts b/packages/types/src/hooks.ts index 142a992b41..3abc4f9aa0 100644 --- a/packages/types/src/hooks.ts +++ b/packages/types/src/hooks.ts @@ -1,4 +1,4 @@ -import { +import type { DocumentNode, execute, ExecutionArgs, @@ -9,7 +9,6 @@ import { parse, ParseOptions, Source, - subscribe, SubscriptionArgs, TypeInfo, validate, @@ -18,6 +17,7 @@ import { import { Maybe } from 'graphql/jsutils/Maybe'; import { PromiseOrValue } from 'graphql/jsutils/PromiseOrValue'; import { DefaultContext } from 'packages/core/src'; +import { SubscribeFunction } from './graphql'; import { Plugin } from './plugin'; export type DefaultArgs = Record; @@ -158,7 +158,7 @@ export type OnExecuteHook = ( /** onSubscribe */ export type TypedSubscriptionArgs = Omit & { contextValue: ContextType }; -export type OriginalSubscribeFn = typeof subscribe; +export type OriginalSubscribeFn = SubscribeFunction; export type OnSubscribeEventPayload = { subscribeFn: OriginalSubscribeFn; args: TypedSubscriptionArgs; @@ -169,7 +169,17 @@ export type OnSubscribeResultEventPayload = { result: AsyncIterableIterator | ExecutionResult; setResult: (newResult: AsyncIterableIterator | ExecutionResult) => void; }; -export type SubscribeResultHook = (options: OnSubscribeResultEventPayload) => void; +export type OnSubscribeResultResultOnNextHookPayload = { + result: ExecutionResult; + setResult: (newResult: ExecutionResult) => void; +}; +export type OnSubscribeResultResultOnNextHook = (payload: OnSubscribeResultResultOnNextHookPayload) => void | Promise; +export type OnSubscribeResultResultOnEndHook = () => void; +export type OnSubscribeResultResult = { + onNext?: OnSubscribeResultResultOnNextHook; + onEnd?: OnSubscribeResultResultOnEndHook; +}; +export type SubscribeResultHook = (options: OnSubscribeResultEventPayload) => void | OnSubscribeResultResult; export type OnSubscribeHookResult = { onSubscribeResult?: SubscribeResultHook; onResolverCalled?: OnResolverCalledHook; diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index aa199b7e4d..f84d34be86 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -2,4 +2,5 @@ export * from './context-types'; export * from './hooks'; export * from './plugin'; export * from './get-enveloped'; +export * from './graphql'; export * from './utils'; diff --git a/packages/types/src/utils.ts b/packages/types/src/utils.ts index bd82a7b611..57920b749e 100644 --- a/packages/types/src/utils.ts +++ b/packages/types/src/utils.ts @@ -27,4 +27,5 @@ export type Unarray = T extends Array ? U : T; export type ArbitraryObject = Record; export type PromiseOrValue = T | Promise; +export type AsyncIterableIteratorOrValue = T | AsyncIterableIterator; export type Maybe = T | null | undefined;