Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/fifty-kids-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@envelop/core': minor
'@envelop/types': minor
---

allow hooking into published subscribe values
2 changes: 1 addition & 1 deletion packages/core/src/graphql-typings.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
declare module 'graphql/jsutils/isAsyncIterable' {
function isAsyncIterable(input: unknown): input is AsyncIterable<any>;
function isAsyncIterable(input: unknown): input is AsyncIterableIterator<any>;
export default isAsyncIterable;
}
63 changes: 35 additions & 28 deletions packages/core/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ import {
SubscribeResultHook,
TypedSubscriptionArgs,
TypedExecutionArgs,
SubscribeFunction,
OnSubscribeResultResultOnNextHook,
OnSubscribeResultResultOnEndHook,
} from '@envelop/types';
import isAsyncIterable from 'graphql/jsutils/isAsyncIterable';
import {
DocumentNode,
execute,
Expand All @@ -29,12 +33,12 @@ import {
parse,
specifiedRules,
subscribe,
SubscriptionArgs,
validate,
ValidationRule,
} from 'graphql';
import { Maybe } from 'graphql/jsutils/Maybe';
import { prepareTracedSchema, resolversHooksSymbol } from './traced-schema';
import { finalAsyncIterator, makeSubscribe, mapAsyncIterator } from './utils';

export type EnvelopOrchestrator<
InitialContext extends ArbitraryObject = ArbitraryObject,
Expand Down Expand Up @@ -272,32 +276,9 @@ export function createEnvelopOrchestrator<PluginsContext = any>(plugins: Plugin[
}
: initialContext => orchestratorCtx => orchestratorCtx ? { ...initialContext, ...orchestratorCtx } : initialContext;

const customSubscribe = async (
argsOrSchema: SubscriptionArgs | GraphQLSchema,
document?: DocumentNode,
rootValue?: any,
contextValue?: any,
variableValues?: Maybe<{ [key: string]: any }>,
operationName?: Maybe<string>,
fieldResolver?: Maybe<GraphQLFieldResolver<any, any>>,
subscribeFieldResolver?: Maybe<GraphQLFieldResolver<any, any>>
) => {
const args: SubscriptionArgs =
argsOrSchema instanceof GraphQLSchema
? {
schema: argsOrSchema,
document: document!,
rootValue,
contextValue,
variableValues,
operationName,
fieldResolver,
subscribeFieldResolver,
}
: argsOrSchema;

const customSubscribe = makeSubscribe(async args => {
const onResolversHandlers: OnResolverCalledHook[] = [];
let subscribeFn: typeof subscribe = subscribe;
let subscribeFn = subscribe as SubscribeFunction;

const afterCalls: SubscribeResultHook[] = [];
let context = args.contextValue || {};
Expand Down Expand Up @@ -333,17 +314,43 @@ export function createEnvelopOrchestrator<PluginsContext = any>(plugins: Plugin[
contextValue: context,
});

const onNextHandler: OnSubscribeResultResultOnNextHook[] = [];
const onEndHandler: OnSubscribeResultResultOnEndHook[] = [];

for (const afterCb of afterCalls) {
afterCb({
const hookResult = afterCb({
result,
setResult: newResult => {
result = newResult;
},
});
if (hookResult) {
if (hookResult.onNext) {
onNextHandler.push(hookResult.onNext);
}
if (hookResult.onEnd) {
onEndHandler.push(hookResult.onEnd);
}
}
}

if (onNextHandler.length && isAsyncIterable(result)) {
result = mapAsyncIterator(result, async result => {
for (const onNext of onNextHandler) {
await onNext({ result, setResult: newResult => (result = newResult) });
}
return result;
});
}
if (onEndHandler.length && isAsyncIterable(result)) {
result = finalAsyncIterator(result, () => {
for (const onEnd of onEndHandler) {
onEnd();
}
});
}
return result;
};
});

const customExecute = beforeCallbacks.execute.length
? async (
Expand Down
57 changes: 56 additions & 1 deletion packages/core/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
import { ASTNode, DocumentNode, Kind, OperationDefinitionNode, visit, BREAK, Source } from 'graphql';
import {
ASTNode,
DocumentNode,
Kind,
OperationDefinitionNode,
visit,
BREAK,
Source,
ExecutionResult,
SubscriptionArgs,
} from 'graphql';
import { PolymorphicSubscribeArguments } from '@envelop/types';
import { PromiseOrValue } from 'graphql/jsutils/PromiseOrValue';

export const envelopIsIntrospectionSymbol = Symbol('ENVELOP_IS_INTROSPECTION');

Expand Down Expand Up @@ -34,3 +46,46 @@ export function isIntrospectionDocument(document: DocumentNode): boolean {
export function isIntrospectionOperationString(operation: string | Source): boolean {
return (typeof operation === 'string' ? operation : operation.body).indexOf('__schema') !== -1;
}

export function getSubscribeArgs(args: PolymorphicSubscribeArguments): SubscriptionArgs {
return args.length === 1
? args[0]
: {
schema: args[0],
document: args[1],
rootValue: args[2],
contextValue: args[3],
variableValues: args[4],
operationName: args[5],
fieldResolver: args[6],
subscribeFieldResolver: args[7],
};
}

/**
* Utility function for making a subscribe function that handles polymorphic arguments.
*/
export const makeSubscribe =
(subscribeFn: (args: SubscriptionArgs) => PromiseOrValue<AsyncIterableIterator<ExecutionResult> | ExecutionResult>) =>
(...polyArgs: PolymorphicSubscribeArguments): PromiseOrValue<AsyncIterableIterator<ExecutionResult> | ExecutionResult> =>
subscribeFn(getSubscribeArgs(polyArgs));

export async function* mapAsyncIterator<TInput, TOutput = TInput>(
asyncIterable: AsyncIterableIterator<TInput>,
map: (input: TInput) => Promise<TOutput> | TOutput
): AsyncIterableIterator<TOutput> {
for await (const value of asyncIterable) {
yield map(value);
}
}

export async function* finalAsyncIterator<TInput>(
asyncIterable: AsyncIterableIterator<TInput>,
onFinal: () => void
): AsyncIterableIterator<TInput> {
try {
yield* asyncIterable;
} finally {
onFinal();
}
}
4 changes: 4 additions & 0 deletions packages/core/test/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ export const schema = makeExecutableSchema({
id: ID!
name: String!
}

type Subscription {
alphabet: String!
}
`,
resolvers: {
Query: {
Expand Down
90 changes: 90 additions & 0 deletions packages/core/test/subscribe.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { assertStreamExecutionValue, collectAsyncIteratorValues, createTestkit } from '@envelop/testing';
import { ExecutionResult } from 'graphql';
import { schema } from './common';

describe('subscribe', () => {
it('Should be able to manipulate streams', async () => {
const streamExecuteFn = async function* () {
for (const value of ['a', 'b', 'c', 'd']) {
yield { data: { alphabet: value } };
}
};

const teskit = createTestkit(
[
{
onSubscribe({ setSubscribeFn }) {
setSubscribeFn(streamExecuteFn);

return {
onSubscribeResult: () => {
return {
onNext: ({ setResult }) => {
setResult({ data: { alphabet: 'x' } });
},
};
},
};
},
},
],
schema
);

const result = await teskit.execute(/* GraphQL */ `
subscription {
alphabet
}
`);
assertStreamExecutionValue(result);
const values = await collectAsyncIteratorValues(result);
expect(values).toEqual([
{ data: { alphabet: 'x' } },
{ data: { alphabet: 'x' } },
{ data: { alphabet: 'x' } },
{ data: { alphabet: 'x' } },
]);
});

it('Should be able to invoke something after the stream has ended.', async () => {
expect.assertions(1);
const streamExecuteFn = async function* () {
for (const value of ['a', 'b', 'c', 'd']) {
yield { data: { alphabet: value } };
}
};

const teskit = createTestkit(
[
{
onSubscribe({ setSubscribeFn }) {
setSubscribeFn(streamExecuteFn);

return {
onSubscribeResult: () => {
let latestResult: ExecutionResult;
return {
onNext: ({ result }) => {
latestResult = result;
},
onEnd: () => {
expect(latestResult).toEqual({ data: { alphabet: 'd' } });
},
};
},
};
},
},
],
schema
);

const result = await teskit.execute(/* GraphQL */ `
subscription {
alphabet
}
`);
assertStreamExecutionValue(result);
await collectAsyncIteratorValues(result);
});
});
8 changes: 8 additions & 0 deletions packages/testing/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,11 @@ export function assertStreamExecutionValue(input: ExecutionReturn): asserts inpu
throw new Error('Received single result but expected stream.');
}
}

export const collectAsyncIteratorValues = async <TType>(asyncIterable: AsyncIterableIterator<TType>): Promise<Array<TType>> => {
const values: Array<TType> = [];
for await (const value of asyncIterable) {
values.push(value);
}
return values;
};
19 changes: 19 additions & 0 deletions packages/types/src/graphql.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { DocumentNode, GraphQLFieldResolver, GraphQLSchema, SubscriptionArgs, ExecutionResult } from 'graphql';
import type { Maybe, PromiseOrValue, AsyncIterableIteratorOrValue } from './utils';

export type PolymorphicSubscribeArguments =
| [SubscriptionArgs]
| [
GraphQLSchema,
DocumentNode,
any?,
any?,
Maybe<{ [key: string]: any }>?,
Maybe<string>?,
Maybe<GraphQLFieldResolver<any, any>>?,
Maybe<GraphQLFieldResolver<any, any>>?
];

export type SubscribeFunction = (
...args: PolymorphicSubscribeArguments
) => PromiseOrValue<AsyncIterableIteratorOrValue<ExecutionResult>>;
18 changes: 14 additions & 4 deletions packages/types/src/hooks.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {
import type {
DocumentNode,
execute,
ExecutionArgs,
Expand All @@ -9,7 +9,6 @@ import {
parse,
ParseOptions,
Source,
subscribe,
SubscriptionArgs,
TypeInfo,
validate,
Expand All @@ -18,6 +17,7 @@ import {
import { Maybe } from 'graphql/jsutils/Maybe';
import { PromiseOrValue } from 'graphql/jsutils/PromiseOrValue';
import { DefaultContext } from 'packages/core/src';
import { SubscribeFunction } from './graphql';
import { Plugin } from './plugin';

export type DefaultArgs = Record<string, unknown>;
Expand Down Expand Up @@ -158,7 +158,7 @@ export type OnExecuteHook<ContextType> = (
/** onSubscribe */
export type TypedSubscriptionArgs<ContextType> = Omit<SubscriptionArgs, 'contextValue'> & { contextValue: ContextType };

export type OriginalSubscribeFn = typeof subscribe;
export type OriginalSubscribeFn = SubscribeFunction;
export type OnSubscribeEventPayload<ContextType> = {
subscribeFn: OriginalSubscribeFn;
args: TypedSubscriptionArgs<ContextType>;
Expand All @@ -169,7 +169,17 @@ export type OnSubscribeResultEventPayload = {
result: AsyncIterableIterator<ExecutionResult> | ExecutionResult;
setResult: (newResult: AsyncIterableIterator<ExecutionResult> | ExecutionResult) => void;
};
export type SubscribeResultHook = (options: OnSubscribeResultEventPayload) => void;
export type OnSubscribeResultResultOnNextHookPayload = {
result: ExecutionResult;
setResult: (newResult: ExecutionResult) => void;
};
export type OnSubscribeResultResultOnNextHook = (payload: OnSubscribeResultResultOnNextHookPayload) => void | Promise<void>;
export type OnSubscribeResultResultOnEndHook = () => void;
export type OnSubscribeResultResult = {
onNext?: OnSubscribeResultResultOnNextHook;
onEnd?: OnSubscribeResultResultOnEndHook;
};
export type SubscribeResultHook = (options: OnSubscribeResultEventPayload) => void | OnSubscribeResultResult;
export type OnSubscribeHookResult<ContextType> = {
onSubscribeResult?: SubscribeResultHook;
onResolverCalled?: OnResolverCalledHook<ContextType>;
Expand Down
1 change: 1 addition & 0 deletions packages/types/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ export * from './context-types';
export * from './hooks';
export * from './plugin';
export * from './get-enveloped';
export * from './graphql';
export * from './utils';
1 change: 1 addition & 0 deletions packages/types/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ export type Unarray<T> = T extends Array<infer U> ? U : T;

export type ArbitraryObject = Record<string | number | symbol, any>;
export type PromiseOrValue<T> = T | Promise<T>;
export type AsyncIterableIteratorOrValue<T> = T | AsyncIterableIterator<T>;
export type Maybe<T> = T | null | undefined;