Skip to content

Commit cb4473c

Browse files
committed
enable batching
tests failing, on the bright side, it means that our test coverage is meaningful :) at least some tests are failing because of a bug in current defer/stream implementation, see graphql/graphql-js#2975
1 parent 4a0d10d commit cb4473c

16 files changed

+332
-110
lines changed

packages/batch-execute/package.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@
2727
"is-promise": "4.0.0",
2828
"tslib": "~2.2.0"
2929
},
30+
"devDependencies": {
31+
"@graphql-tools/delegate": "^7.1.2",
32+
"@graphql-tools/mock": "^8.1.1",
33+
"@graphql-tools/schema": "^7.1.3",
34+
"@graphql-tools/utils": "^7.7.3"
35+
},
3036
"publishConfig": {
3137
"access": "public",
3238
"directory": "dist"

packages/batch-execute/src/createBatchingExecutor.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { getOperationAST /*, GraphQLSchema */ } from 'graphql';
1+
import { getOperationAST, GraphQLSchema } from 'graphql';
22

33
import DataLoader from 'dataloader';
44

@@ -9,12 +9,12 @@ import { splitResult } from './splitResult';
99

1010
export function createBatchingExecutor(
1111
executor: Executor,
12-
// targetSchema: GraphQLSchema,
12+
targetSchema: GraphQLSchema,
1313
dataLoaderOptions?: DataLoader.Options<any, any, any>,
1414
extensionsReducer?: (mergedExtensions: Record<string, any>, executionParams: ExecutionParams) => Record<string, any>
1515
): Executor {
1616
const loader = new DataLoader(
17-
createLoadFn(executor, /* targetSchema, */ extensionsReducer ?? defaultExtensionsReducer),
17+
createLoadFn(executor, targetSchema, extensionsReducer ?? defaultExtensionsReducer),
1818
dataLoaderOptions
1919
);
2020
return (executionParams: ExecutionParams) => loader.load(executionParams);
@@ -30,7 +30,7 @@ function createLoadFn(
3030
| ExecutionResult
3131
| AsyncIterableIterator<AsyncExecutionResult>
3232
| Promise<ExecutionResult | AsyncIterableIterator<AsyncExecutionResult>>,
33-
// targetSchema: GraphQLSchema,
33+
targetSchema: GraphQLSchema,
3434
extensionsReducer: (mergedExtensions: Record<string, any>, executionParams: ExecutionParams) => Record<string, any>
3535
) {
3636
return async (
@@ -64,7 +64,7 @@ function createLoadFn(
6464
| Promise<ExecutionResult | AsyncIterableIterator<ExecutionResult>>
6565
> = [];
6666
batchedExecutionParamSets.forEach(batchedExecutionParamSet => {
67-
const mergedExecutionParams = mergeExecutionParams(batchedExecutionParamSet, /* targetSchema, */ extensionsReducer);
67+
const mergedExecutionParams = mergeExecutionParams(batchedExecutionParamSet, targetSchema, extensionsReducer);
6868
const executionResult = executor(mergedExecutionParams);
6969
results = results.concat(splitResult(executionResult, batchedExecutionParamSet.length));
7070
});

packages/batch-execute/src/getBatchingExecutor.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@ import DataLoader from 'dataloader';
22

33
import { ExecutionParams, Executor } from '@graphql-tools/utils';
44
import { createBatchingExecutor } from './createBatchingExecutor';
5-
import { memoize2of4 } from './memoize';
6-
// import { GraphQLSchema } from 'graphql';
5+
import { memoize2of5 } from './memoize';
6+
import { GraphQLSchema } from 'graphql';
77

8-
export const getBatchingExecutor = memoize2of4(function (
8+
export const getBatchingExecutor = memoize2of5(function (
99
_context: Record<string, any> = self ?? window ?? global,
1010
executor: Executor,
11-
// targetSchema: GraphQLSchema,
11+
targetSchema: GraphQLSchema,
1212
dataLoaderOptions?: DataLoader.Options<any, any, any>,
1313
extensionsReducer?: (mergedExtensions: Record<string, any>, executionParams: ExecutionParams) => Record<string, any>
1414
): Executor {
15-
return createBatchingExecutor(executor, /* targetSchema, */ dataLoaderOptions, extensionsReducer);
15+
return createBatchingExecutor(executor, targetSchema, dataLoaderOptions, extensionsReducer);
1616
});

packages/batch-execute/src/memoize.ts

Lines changed: 0 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -39,44 +39,3 @@ export function memoize2of5<
3939

4040
return memoized;
4141
}
42-
43-
export function memoize2of4<
44-
T1 extends Record<string, any>,
45-
T2 extends Record<string, any>,
46-
T3 extends any,
47-
T4 extends any,
48-
R extends any
49-
>(fn: (A1: T1, A2: T2, A3: T3, A4: T4) => R): (A1: T1, A2: T2, A3: T3, A4: T4) => R {
50-
let cache1: WeakMap<T1, WeakMap<T2, R>>;
51-
52-
function memoized(a1: T1, a2: T2, a3: T3, a4: T4) {
53-
if (!cache1) {
54-
cache1 = new WeakMap();
55-
const cache2: WeakMap<T2, R> = new WeakMap();
56-
cache1.set(a1, cache2);
57-
const newValue = fn(a1, a2, a3, a4);
58-
cache2.set(a2, newValue);
59-
return newValue;
60-
}
61-
62-
let cache2 = cache1.get(a1);
63-
if (!cache2) {
64-
cache2 = new WeakMap();
65-
cache1.set(a1, cache2);
66-
const newValue = fn(a1, a2, a3, a4);
67-
cache2.set(a2, newValue);
68-
return newValue;
69-
}
70-
71-
const cachedValue = cache2.get(a2);
72-
if (cachedValue === undefined) {
73-
const newValue = fn(a1, a2, a3, a4);
74-
cache2.set(a2, newValue);
75-
return newValue;
76-
}
77-
78-
return cachedValue;
79-
}
80-
81-
return memoized;
82-
}

packages/batch-execute/src/mergeExecutionParams.ts

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
InlineFragmentNode,
1717
FieldNode,
1818
OperationTypeNode,
19-
// GraphQLSchema,
19+
GraphQLSchema,
2020
} from 'graphql';
2121

2222
import { ExecutionParams } from '@graphql-tools/utils';
@@ -64,7 +64,7 @@ import { createPrefix } from './prefix';
6464
*/
6565
export function mergeExecutionParams(
6666
executionParamSets: Array<ExecutionParams>,
67-
// targetSchema: GraphQLSchema,
67+
targetSchema: GraphQLSchema,
6868
extensionsReducer: (mergedExtensions: Record<string, any>, executionParams: ExecutionParams) => Record<string, any>
6969
): ExecutionParams {
7070
const mergedVariables: Record<string, any> = Object.create(null);
@@ -82,16 +82,8 @@ export function mergeExecutionParams(
8282
prefixedExecutionParams.document.definitions.forEach(def => {
8383
if (isOperationDefinition(def)) {
8484
operation = def.operation;
85-
const selections = def.selectionSet.selections;
8685

87-
// TODO:
88-
// once splitting AsyncIterableIterator<AsyncExecutionResult> is implemented within splitResult,
89-
// the below code can be uncommented to also utilize defer when batching.
90-
91-
/*
92-
let selections: Array<SelectionNode>;
93-
94-
selections = targetSchema.getDirective('defer')
86+
const selections = targetSchema.getDirective('defer')
9587
? [
9688
{
9789
kind: Kind.INLINE_FRAGMENT,
@@ -113,12 +105,11 @@ export function mergeExecutionParams(
113105
],
114106
selectionSet: {
115107
kind: Kind.SELECTION_SET,
116-
selections,
108+
selections: def.selectionSet.selections,
117109
},
118110
},
119111
]
120-
: selections;
121-
*/
112+
: def.selectionSet.selections;
122113

123114
mergedSelections.push(...selections);
124115
mergedVariableDefinitions.push(...(def.variableDefinitions ?? []));
@@ -156,7 +147,7 @@ export function mergeExecutionParams(
156147

157148
function prefixExecutionParams(prefix: string, executionParams: ExecutionParams): ExecutionParams {
158149
let document = aliasTopLevelFields(prefix, executionParams.document);
159-
const variableNames = Object.keys(executionParams.variables);
150+
const variableNames = executionParams.variables !== undefined ? Object.keys(executionParams.variables) : [];
160151

161152
if (variableNames.length === 0) {
162153
return { ...executionParams, document };

packages/batch-execute/src/split.ts

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22
// and: https://gist.github.com/jed/cc1e949419d42e2cb26d7f2e1645864d
33
// and also: https://github.com/repeaterjs/repeater/issues/48#issuecomment-569134039
44

5-
import { Push, Repeater } from '@repeaterjs/repeater';
5+
import { Push, Stop, Repeater } from '@repeaterjs/repeater';
66

7-
type Splitter<T> = (item: T) => [number, T];
7+
type Splitter<T> = (item: T) => [number | undefined, T];
88

9-
export function split<T>(asyncIterable: AsyncIterableIterator<T>, n: number, splitter: Splitter<IteratorResult<T>>) {
9+
export function split<T>(asyncIterable: AsyncIterableIterator<T>, n: number, splitter: Splitter<T>) {
1010
const iterator = asyncIterable[Symbol.asyncIterator]();
11-
const returner = iterator.return ?? undefined;
11+
const returner = iterator.return?.bind(iterator) ?? undefined;
1212

13-
const buffers: Array<Array<IteratorResult<T>>> = Array(n).fill([]);
13+
const buffers: Array<Array<IteratorResult<T>>> = Array(n);
14+
for (let i = 0; i < n; i++) {
15+
buffers[i] = [];
16+
};
1417

1518
if (returner) {
1619
const set: Set<number> = new Set();
@@ -25,22 +28,22 @@ export function split<T>(asyncIterable: AsyncIterableIterator<T>, n: number, spl
2528
}
2629
});
2730

28-
await loop(push, earlyReturn, buffer, index, buffers, iterator, splitter);
31+
await loop(push, stop, earlyReturn, buffer, buffers, iterator, splitter);
2932

3033
await earlyReturn;
3134
});
3235
});
3336
}
3437

3538
return buffers.map(
36-
(buffer, index) =>
39+
buffer =>
3740
new Repeater(async (push, stop) => {
3841
let earlyReturn: any;
3942
stop.then(() => {
4043
earlyReturn = returner ? returner() : true;
4144
});
4245

43-
await loop(push, earlyReturn, buffer, index, buffers, iterator, splitter);
46+
await loop(push, stop, earlyReturn, buffer, buffers, iterator, splitter);
4447

4548
await earlyReturn;
4649
})
@@ -49,16 +52,16 @@ export function split<T>(asyncIterable: AsyncIterableIterator<T>, n: number, spl
4952

5053
async function loop<T>(
5154
push: Push<T>,
55+
stop: Stop,
5256
earlyReturn: Promise<any> | any,
5357
buffer: Array<IteratorResult<T>>,
54-
index: number,
5558
buffers: Array<Array<IteratorResult<T>>>,
5659
iterator: AsyncIterator<T>,
57-
splitter: Splitter<IteratorResult<T>>
60+
splitter: Splitter<T>
5861
): Promise<void> {
5962
/* eslint-disable no-unmodified-loop-condition */
6063
while (!earlyReturn) {
61-
const iteration = await next(buffer, index, buffers, iterator, splitter);
64+
const iteration = await next(buffer, buffers, iterator, splitter);
6265

6366
if (iteration === undefined) {
6467
continue;
@@ -76,32 +79,38 @@ async function loop<T>(
7679

7780
async function next<T>(
7881
buffer: Array<IteratorResult<T>>,
79-
index: number,
8082
buffers: Array<Array<IteratorResult<T>>>,
8183
iterator: AsyncIterator<T>,
82-
splitter: Splitter<IteratorResult<T>>
84+
splitter: Splitter<T>
8385
): Promise<IteratorResult<T> | undefined> {
84-
let iteration: IteratorResult<T>;
85-
8686
if (0 in buffer) {
8787
return buffer.shift();
8888
}
8989

9090
const iterationCandidate = await iterator.next();
9191

92+
let tee = true;
9293
const value = iterationCandidate.value;
93-
if (value) {
94+
if (value !== undefined) {
9495
const [iterationIndex, newValue] = splitter(value);
95-
if (index === iterationIndex) {
96-
return newValue;
96+
if (iterationIndex !== undefined) {
97+
buffers[iterationIndex].push({
98+
...iterationCandidate,
99+
value: newValue,
100+
});
101+
tee = false;
97102
}
98-
99-
buffers[iterationIndex].push(iteration);
100-
return undefined;
101103
}
102104

103-
for (const buffer of buffers) {
104-
buffer.push(iteration);
105+
if (tee) {
106+
for (const b of buffers) {
107+
b.push(iterationCandidate);
108+
}
105109
}
106-
return iterationCandidate;
110+
111+
if (0 in buffer) {
112+
return buffer.shift();
113+
};
114+
115+
return undefined;
107116
}

packages/batch-execute/src/splitResult.ts

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { ExecutionResult, GraphQLError } from 'graphql';
44

55
import isPromise from 'is-promise';
66

7-
import { AsyncExecutionResult, isAsyncIterable, relocatedError } from '@graphql-tools/utils';
7+
import { AsyncExecutionResult, ExecutionPatchResult, isAsyncIterable, relocatedError } from '@graphql-tools/utils';
88

99
import { parseKey } from './prefix';
1010
import { split } from './split';
@@ -39,8 +39,72 @@ export function splitExecutionResultOrAsyncIterableIterator(
3939
): Array<ExecutionResult | AsyncIterableIterator<AsyncExecutionResult>> {
4040
if (isAsyncIterable(mergedResult)) {
4141
return split(mergedResult, numResults, originalResult => {
42-
// TODO: implement splitter instead of this placeholder
43-
return [0, originalResult];
42+
const path = (originalResult as ExecutionPatchResult).path;
43+
if (path && path.length) {
44+
const { index, originalKey } = parseKey(path[0] as string);
45+
const newPath = ([originalKey] as Array<string | number>).concat(path.slice(1));
46+
47+
const newResult: ExecutionPatchResult = {
48+
...(originalResult as ExecutionPatchResult),
49+
path: newPath,
50+
};
51+
52+
const errors = originalResult.errors;
53+
if (errors) {
54+
const newErrors: Array<GraphQLError> = [];
55+
errors.forEach(error => {
56+
if (error.path) {
57+
const parsedKey = parseKey(error.path[0] as string);
58+
if (parsedKey) {
59+
const { originalKey } = parsedKey;
60+
const newError = relocatedError(error, [originalKey, ...error.path.slice(1)]);
61+
newErrors.push(newError);
62+
return;
63+
}
64+
}
65+
66+
newErrors.push(error);
67+
});
68+
newResult.errors = newErrors;
69+
}
70+
71+
return [index, newResult];
72+
}
73+
74+
let resultIndex: number;
75+
const newResult: ExecutionResult = { ...originalResult };
76+
const data = originalResult.data;
77+
if (data) {
78+
const newData = {};
79+
Object.keys(data).forEach(prefixedKey => {
80+
const { index, originalKey } = parseKey(prefixedKey);
81+
resultIndex = index;
82+
newData[originalKey] = data[prefixedKey];
83+
});
84+
newResult.data = newData;
85+
}
86+
87+
const errors = originalResult.errors;
88+
if (errors) {
89+
const newErrors: Array<GraphQLError> = [];
90+
errors.forEach(error => {
91+
if (error.path) {
92+
const parsedKey = parseKey(error.path[0] as string);
93+
if (parsedKey) {
94+
const { index, originalKey } = parsedKey;
95+
resultIndex = index;
96+
const newError = relocatedError(error, [originalKey, ...error.path.slice(1)]);
97+
newErrors.push(newError);
98+
return;
99+
}
100+
}
101+
102+
newErrors.push(error);
103+
});
104+
newResult.errors = newErrors;
105+
}
106+
107+
return [resultIndex, newResult]
44108
});
45109
}
46110

0 commit comments

Comments
 (0)