Skip to content
This repository was archived by the owner on Mar 13, 2021. It is now read-only.

Commit cb6e6e4

Browse files
Florent Bivillefbiville
authored andcommitted
Remove argument transformers, partially reinstate argument type
Fixes #122
1 parent 698d93c commit cb6e6e4

25 files changed

+435
-707
lines changed

README.md

Lines changed: 53 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ and invokes functions accordingly.
1010

1111
## Supported functions
1212

13-
### Non-streaming functions
13+
### Non-streaming functions (a.k.a. request-reply functions)
1414

1515
Non-streaming functions, more specifically "request-reply" functions, such as:
1616
```js
@@ -68,6 +68,58 @@ The function **must** end the output streams when it is done emitting data or wh
6868
(if the output streams are [`pipe`](https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options)'d from
6969
input streams, then this is automatically managed by this invoker).
7070

71+
## Message support
72+
73+
A message is an object that contains both headers and a payload.
74+
Message headers are a map with case-insensitive keys and multiple string values.
75+
76+
Since JavaScript and Node have no built-in type for messages or headers, riff uses the [@projectriff/message](https://github.com/projectriff/node-message/) npm module. To use messages, functions should install the `@projectriff/message` package:
77+
```bash
78+
npm install --save @projectriff/message
79+
```
80+
81+
By default, request-reply functions accept and produce payloads.
82+
They can be configured instead to **receive** either the entire message or the headers only.
83+
84+
> Streaming functions can only receive messages. Configuring them with `$argumentType` will trigger an error.
85+
> However, they can produce either messages or payloads, just like request-reply functions.
86+
87+
##### Receiving messages
88+
89+
```js
90+
const { Message } = require('@projectriff/message');
91+
92+
// a request-reply function that accepts a message, which is an instance of Message
93+
module.exports = message => {
94+
const authorization = message.headers.getValue('Authorization');
95+
// [...]
96+
};
97+
98+
// tell the invoker the function wants to receive messages
99+
module.exports.$argumentType = 'message';
100+
```
101+
102+
##### Producing messages
103+
104+
```js
105+
const { Message } = require('@projectriff/message');
106+
107+
const instanceId = Math.round(Math.random() * 10000);
108+
let invocationCount = 0;
109+
110+
// a request-reply function that produces a Message
111+
module.exports = name => {
112+
return Message.builder()
113+
.addHeader('X-Riff-Instance', instanceId)
114+
.addHeader('X-Riff-Count', invocationCount++)
115+
.payload(`Hello ${name}!`)
116+
.build();
117+
};
118+
119+
// even if the function receives payloads, it can still produce a message
120+
module.exports.$argumentType = 'payload';
121+
```
122+
71123
## Lifecycle
72124

73125
Functions that communicate with external services, like a database, can use the `$init` and `$destroy` lifecycle hooks
@@ -104,37 +156,6 @@ Note that the lifecycle hooks must be fields on the exported function.
104156
The hooks may be either synchronous or async functions.
105157
Lifecycle functions have up to **10 seconds** to complete their work, or the function invoker will abort.
106158

107-
## Argument transformers
108-
109-
Sometimes, the content-type information is not enough to extract the payload the user function is supposed to interact
110-
with.
111-
112-
Argument transformers are custom functions that take a `Message` (as defined by [`@projectriff/message`](https://github.com/projectriff/node-message))
113-
and return whatever the function needs.
114-
115-
The `Message` payload is the result of the first content-type-based conversion pass. For instance, if the input
116-
content-type is `application/json` and its payload is `'{"key": "value"}'` the payload of the `Message` exposed to the
117-
transformer will be the corresponding object representation (i.e. `{"key": "value"}`).
118-
119-
Argument transformers are declared this way:
120-
121-
```js
122-
module.exports.$argumentTransformers = [
123-
// transformer for first input
124-
(message) => {
125-
return message.payload;
126-
},
127-
// transformer for second input
128-
(message) => {
129-
return message.headers.getValue('x-some-header');
130-
},
131-
// ...
132-
];
133-
```
134-
135-
If `$argumentTransformers` is not declared, the default transformer assigned to each input extracts the `Message`
136-
payload.
137-
138159
## Supported protocols
139160

140161
This invoker supports only streaming, and complies to [riff streaming protocol](https://github.com/projectriff/streaming-processor).

lib/errors.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ module.exports = {
1111
// request-reply functions
1212
REQUEST_REPLY_FUNCTION_RUNTIME: "request-reply-function-runtime-error",
1313
FUNCTION_PROMOTION: "error-promoting-function",
14-
// argument transformers
15-
ARGUMENT_TRANSFORMER: "error-argument-transformer",
1614
// hooks
1715
HOOK_INVALID: "error-invalid-hook",
1816
HOOK_TIMEOUT: "error-hook-timeout",
@@ -21,6 +19,7 @@ module.exports = {
2119
UNSUPPORTED_INPUT_CONTENT_TYPE: "error-input-content-type-unsupported",
2220
INVALID_INPUT_CONTENT_TYPE: "error-input-content-type-invalid",
2321
INVALID_INPUT: "error-input-invalid",
22+
UNEXPECTED_INDEX_INPUT: "error-input-unexpected-index",
2423
UNSUPPORTED_OUTPUT_CONTENT_TYPE:
2524
"error-output-content-type-unsupported",
2625
INVALID_OUTPUT: "error-output-invalid",

lib/input-unmarshaller.js

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,37 +8,32 @@ const {
88
unmarshaller,
99
} = require("./content-negotiation");
1010

11-
const DEFAULT_ARGUMENT_TRANSFORMER = (msg) => msg.payload;
12-
1311
module.exports = class InputUnmarshaller extends Transform {
14-
constructor(argumentTransformer) {
12+
constructor() {
1513
super({ objectMode: true });
16-
this.argumentTransformer =
17-
argumentTransformer || DEFAULT_ARGUMENT_TRANSFORMER;
1814
}
1915

2016
_transform(inputSignal, _, callback) {
2117
const dataSignal = inputSignal.data;
22-
23-
let finalPayloadResult;
18+
let unmarshalledPayload;
2419
console.debug(`Forwarding data for input #${dataSignal.argIndex || 0}`);
2520
try {
26-
const contentTypeResult = this._parseContentType(dataSignal);
27-
const unmarshalledPayloadResult = this._unmarshal(
28-
contentTypeResult,
21+
const acceptedContentType = this._parseContentType(dataSignal);
22+
unmarshalledPayload = this._unmarshal(
23+
acceptedContentType,
2924
dataSignal
3025
);
31-
finalPayloadResult = this._convertToMessage(
32-
unmarshalledPayloadResult,
33-
dataSignal.headers
34-
);
3526
} catch (err) {
3627
callback(err);
3728
return;
3829
}
3930

4031
try {
41-
callback(null, finalPayloadResult);
32+
const message = this._convertToMessage(
33+
unmarshalledPayload,
34+
dataSignal.headers
35+
);
36+
callback(null, message);
4237
} catch (err) {
4338
// propagate downstream error
4439
this.emit("error", err);
@@ -80,14 +75,6 @@ module.exports = class InputUnmarshaller extends Transform {
8075
}
8176

8277
_convertToMessage(payload, headers) {
83-
try {
84-
return this.argumentTransformer(this._toMessage(payload, headers));
85-
} catch (err) {
86-
throw new RiffError(errorTypes.ARGUMENT_TRANSFORMER, err);
87-
}
88-
}
89-
90-
_toMessage(payload, headers) {
9178
const messageBuilder = Message.builder();
9279
for (const headerName in headers) {
9380
messageBuilder.addHeader(headerName, headers[headerName]);

lib/mapping-transform.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ module.exports = class MappingTransform extends Transform {
66
constructor(fn) {
77
super({ objectMode: true });
88
this._function = fn;
9+
this._argumentType = fn.$argumentType || "payload";
910
}
1011

1112
_transform(chunk, _, callback) {
1213
Promise.resolve(chunk)
14+
.then((message) => this._transformMessage(message))
1315
.then(this._function)
1416
.then(
1517
(result) => callback(null, result),
@@ -27,4 +29,17 @@ module.exports = class MappingTransform extends Transform {
2729
);
2830
});
2931
}
32+
33+
_transformMessage(message) {
34+
switch (this._argumentType) {
35+
case "payload":
36+
return message.payload;
37+
case "message":
38+
return message;
39+
case "headers":
40+
return message.headers;
41+
default:
42+
throw new Error(`unknown $argumentType: ${this._argumentType}`);
43+
}
44+
}
3045
};

lib/request-reply-promoter.js

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,4 @@
1-
const validateArgumentTransformers = require("./argument-transformer-validator");
2-
const RiffError = require("./riff-error");
31
const MappingTransform = require("./mapping-transform");
4-
const { types: errorTypes } = require("./errors");
5-
6-
const withTransformers = (promotedFunction, userFunction) => {
7-
const transformers = userFunction["$argumentTransformers"];
8-
if (typeof transformers === "undefined") {
9-
return promotedFunction;
10-
}
11-
12-
validateArgumentTransformers(transformers);
13-
const transformerCount = transformers.length;
14-
if (transformerCount !== 1) {
15-
throw new RiffError(
16-
errorTypes.ARGUMENT_TRANSFORMER,
17-
`Request-reply function must declare exactly 1 argument transformer. Found ${transformerCount}`
18-
);
19-
}
20-
promotedFunction["$argumentTransformers"] = transformers;
21-
return promotedFunction;
22-
};
232

243
module.exports = (userFunction) => {
254
const interactionModel =
@@ -30,12 +9,12 @@ module.exports = (userFunction) => {
309

3110
console.debug("Promoting request-reply function to streaming function");
3211
const mapper = new MappingTransform(userFunction);
33-
const promotedFunction = (inputs, outputs) => {
12+
const result = (inputs, outputs) => {
3413
mapper.on("error", (err) => {
3514
inputs.$order[0].emit("error", err);
3615
});
3716
inputs.$order[0].pipe(mapper).pipe(outputs.$order[0]);
3817
};
39-
promotedFunction.$interactionModel = "node-streams";
40-
return withTransformers(promotedFunction, userFunction);
18+
result.$interactionModel = "node-streams";
19+
return result;
4120
};

lib/streaming-pipeline.js

Lines changed: 18 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
const { Writable } = require("stream");
2-
const validateArgumentTransformers = require("./argument-transformer-validator");
32
const RiffError = require("./riff-error");
43
const OutputMarshaller = require("./output-marshaller");
54
const InputUnmarshaller = require("./input-unmarshaller");
@@ -17,23 +16,20 @@ const toObject = (parameters, streamNames) => {
1716
);
1817
};
1918

20-
const validateTransformers = (transformers) => {
21-
if (typeof transformers === "undefined") {
22-
return;
23-
}
24-
validateArgumentTransformers(transformers);
25-
};
26-
2719
module.exports = class StreamingPipeline extends Writable {
2820
constructor(userFunction, grpcStream) {
2921
super({ objectMode: true });
22+
if (typeof userFunction.$argumentType !== "undefined") {
23+
throw new Error(
24+
`Streaming functions cannot be configured with $argumentType`
25+
);
26+
}
3027
if (userFunction.$interactionModel !== "node-streams") {
3128
throw new Error(
3229
`SteamingPipeline expects a function with "node-streams" interaction model, but was "${userFunction.$interactionModel}" instead`
3330
);
3431
}
3532

36-
validateTransformers(userFunction["$argumentTransformers"]);
3733
this._userFunction = userFunction;
3834
this._destinationStream = grpcStream;
3935
this._startReceived = false;
@@ -117,25 +113,6 @@ module.exports = class StreamingPipeline extends Writable {
117113
`input names: [${this._streamMetadata.inputs}], ` +
118114
`output names: [${this._streamMetadata.outputs}]`
119115
);
120-
121-
const inputCount = this._streamMetadata.inputs.length;
122-
let transformers =
123-
this._userFunction["$argumentTransformers"] ||
124-
Array(inputCount);
125-
if (transformers.length !== inputCount) {
126-
const transformerCount = transformers.length;
127-
console.error(
128-
`Wrong number of argument transformers: ${transformerCount} instead of ${inputCount}`
129-
);
130-
callback(
131-
new RiffError(
132-
errors.types.ARGUMENT_TRANSFORMER,
133-
`Function must declare exactly ${inputCount} argument transformer(s). Found ${transformerCount}`
134-
)
135-
);
136-
return;
137-
}
138-
139116
const normalizedOutputContentTypes = [];
140117
for (let i = 0; i < outputContentTypes.length; i++) {
141118
const contentType = outputContentTypes[i];
@@ -157,7 +134,7 @@ module.exports = class StreamingPipeline extends Writable {
157134
normalizedOutputContentTypes.push(normalizedOutputContentType);
158135
}
159136

160-
this._wireInputs(transformers);
137+
this._wireInputs(this._streamMetadata.inputs.length);
161138
this._wireOutputs(normalizedOutputContentTypes);
162139
this._invoke(callback);
163140
this._startReceived = true;
@@ -218,11 +195,10 @@ module.exports = class StreamingPipeline extends Writable {
218195
this._functionInputs.forEach((fi) => fi.emit("end"));
219196
}
220197

221-
_wireInputs(transformers) {
222-
const inputCount = transformers.length;
198+
_wireInputs(inputCount) {
223199
console.debug(`Wiring ${inputCount} input stream(s)`);
224200
for (let i = 0; i < inputCount; i++) {
225-
const unmarshaller = new InputUnmarshaller(transformers[i]);
201+
const unmarshaller = new InputUnmarshaller();
226202
unmarshaller.on("error", (err) => {
227203
console.error(
228204
`error from ${this._errorOrigin(err)}: ${err.toString()}`
@@ -233,16 +209,6 @@ module.exports = class StreamingPipeline extends Writable {
233209
}
234210
}
235211

236-
_errorOrigin(err) {
237-
if (
238-
err.type &&
239-
err.type === errors.types.REQUEST_REPLY_FUNCTION_RUNTIME
240-
) {
241-
return "request-reply function";
242-
}
243-
return "unmarshaller";
244-
}
245-
246212
_wireOutputs(outputContentTypes) {
247213
const outputCount = outputContentTypes.length;
248214
console.debug(`Wiring ${outputCount} output stream(s)`);
@@ -284,4 +250,14 @@ module.exports = class StreamingPipeline extends Writable {
284250
);
285251
}
286252
}
253+
254+
_errorOrigin(err) {
255+
if (
256+
err.type &&
257+
err.type === errors.types.REQUEST_REPLY_FUNCTION_RUNTIME
258+
) {
259+
return "request-reply function";
260+
}
261+
return "unmarshaller";
262+
}
287263
};

samples/streaming-mean/mean.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ module.exports = (inputs, outputs) => {
55
let n = 0;
66
let mean = 0;
77
const meanStream = miss.through.obj((newValue, _, callback) => {
8-
mean = stats.addToMean(mean, n++, newValue);
8+
mean = stats.addToMean(mean, n++, newValue.payload);
99
callback(null, mean);
1010
});
1111
inputs.$order[0].pipe(meanStream).pipe(outputs.$order[0]);

samples/streaming-repeater/repeater.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ module.exports = (inputs, outputs) => {
1111
.zip(wordStream)
1212
.flatMap((numberWordPair) => {
1313
const result = [];
14-
for (let i = 0; i < numberWordPair[0]; i++) {
15-
result.push(numberWordPair[1]);
14+
for (let i = 0; i < numberWordPair[0].payload; i++) {
15+
result.push(numberWordPair[1].payload);
1616
}
1717
return _(result);
1818
})

0 commit comments

Comments
 (0)