Skip to content

Commit cb5efcb

Browse files
committed
Direct data delivery for HTTP/2 Websocket
The current implementation moves the Websocket data through stream handlers before the Websocket process receives and parses it. The new mechanism relays the data directly to the Websocket process as an Erlang message, without any intermediary. It also manages HTTP/2 flow control in a simpler way (the flow is maintained at a configured value). A new switch_protocol option (coming from the options returned from init/2) called `data_delivery` decides which mechanism to use: `stream_handlers` is the old behavior; `relay` is the new one. WIP: Need to test the relay mechanism in other test suites. WIP: The benchmark test suite is greatly modified because I wanted to make sure Gun's poor performance was not negatively impacting results (this bit me earlier). Some tests use it, some don't; we should keep both for good measure. WIP: Other minor todos.
1 parent a8c7177 commit cb5efcb

File tree

6 files changed

+287
-48
lines changed

6 files changed

+287
-48
lines changed

src/cowboy_http2.erl

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,14 @@
8282
-export_type([opts/0]).
8383

8484
-record(stream, {
85-
%% Whether the stream is currently stopping.
86-
status = running :: running | stopping,
85+
%% Whether the stream is currently in a special state.
86+
%%
87+
%% - The running state is the normal state of a stream.
88+
%% - The relaying state is used by extended CONNECT protocols to
89+
%% use a 'relay' data_delivery method.
90+
%% - The stopping state indicates the stream used the 'stop' command.
91+
%% @todo Do we need to buffer data before the response is sent? Probably.
92+
status = running :: running | {relaying, pid()} | stopping,
8793

8894
%% Flow requested for this stream.
8995
flow = 0 :: non_neg_integer(),
@@ -327,6 +333,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
327333
%% Messages pertaining to a stream.
328334
{{Pid, StreamID}, Msg} when Pid =:= self() ->
329335
before_loop(info(State, StreamID, Msg), Buffer);
336+
{'$cowboy_stream_data', {Pid, StreamID}, IsFin, Data} when Pid =:= self() ->
337+
before_loop(commands(State, StreamID, [{data, IsFin, Data}]), Buffer);
330338
%% Exit signal from children.
331339
Msg = {'EXIT', Pid, _} ->
332340
before_loop(down(State, Pid, Msg), Buffer);
@@ -520,6 +528,14 @@ data_frame(State0=#state{opts=Opts, flow=Flow0, streams=Streams}, StreamID, IsFi
520528
reset_stream(State0, StreamID, {internal_error, {Class, Exception},
521529
'Unhandled exception in cowboy_stream:data/4.'})
522530
end;
531+
%% Stream handlers are not used for the data when relaying.
532+
#{StreamID := #stream{status={relaying, RelayPid}}} ->
533+
RelayPid ! {'$cowboy_stream_data', {self(), StreamID}, IsFin, Data},
534+
%% We keep a steady flow using the configured flow value.
535+
%% Because we do not change the 'flow' value the update_window/2
536+
%% function will always maintain this value (of course with
537+
%% thresholds applying).
538+
update_window(State0, StreamID);
523539
%% We ignore DATA frames for streams that are stopping.
524540
#{} ->
525541
State0
@@ -866,6 +882,19 @@ commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade},
866882
commands(State, StreamID, Tail);
867883
%% Use a different protocol within the stream (CONNECT :protocol).
868884
%% @todo Make sure we error out when the feature is disabled.
885+
%% There are two data_delivery methods: stream_handlers and relay.
886+
%% The former just has the data go through stream handlers
887+
%% like normal requests. The latter relays data directly.
888+
commands(State0=#state{flow=Flow, streams=Streams}, StreamID,
889+
[{switch_protocol, Headers, _Mod, ModState=#{data_delivery := relay}}|Tail]) ->
890+
State1 = info(State0, StreamID, {headers, 200, Headers}),
891+
#{StreamID := Stream} = Streams,
892+
#{data_delivery_pid := RelayPid} = ModState,
893+
RelayFlow = maps:get(data_delivery_flow, ModState, 131072),
894+
State = State1#state{flow=Flow + RelayFlow, streams=Streams#{StreamID => Stream#stream{
895+
status={relaying, RelayPid},
896+
flow=RelayFlow}}},
897+
commands(State, StreamID, Tail);
869898
commands(State0, StreamID, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
870899
State = info(State0, StreamID, {headers, 200, Headers}),
871900
commands(State, StreamID, Tail);

src/cowboy_websocket.erl

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@
9191
parent :: undefined | pid(),
9292
ref :: ranch:ref(),
9393
socket = undefined :: inet:socket() | {pid(), cowboy_stream:streamid()} | undefined,
94-
transport = undefined :: module() | undefined,
94+
transport :: module() | {data_delivery, stream_handlers | relay},
9595
opts = #{} :: opts(),
9696
active = true :: boolean(),
9797
handler :: module(),
@@ -149,7 +149,7 @@ upgrade(Req, Env, Handler, HandlerState) ->
149149
%% @todo Immediately crash if a response has already been sent.
150150
upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) ->
151151
FilteredReq = case maps:get(req_filter, Opts, undefined) of
152-
undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req0);
152+
undefined -> maps:with([method, version, scheme, host, port, path, qs, peer, streamid], Req0);
153153
FilterFun -> FilterFun(Req0)
154154
end,
155155
Utf8State = case maps:get(validate_utf8, Opts, true) of
@@ -273,12 +273,28 @@ websocket_handshake(State=#state{key=Key},
273273
%% For HTTP/2 we do not let the process die, we instead keep it
274274
%% for the Websocket stream. This is because in HTTP/2 we only
275275
%% have a stream, it doesn't take over the whole connection.
276-
websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
276+
%%
277+
%% There are two methods of delivering data to the Websocket session:
278+
%% - 'stream_handlers' is the default and makes the data go
279+
%% through stream handlers just like when reading a request body;
280+
%% - 'relay' is a new method where data is sent as a message as
281+
%% soon as it is received from the socket in a DATA frame.
282+
websocket_handshake(State=#state{opts=Opts},
283+
Req=#{ref := Ref, pid := Pid, streamid := StreamID},
277284
HandlerState, _Env) ->
278285
%% @todo We don't want date and server headers.
279286
Headers = cowboy_req:response_headers(#{}, Req),
280-
Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
281-
takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>,
287+
DataDelivery = maps:get(data_delivery, Opts, stream_handlers),
288+
ModState = #{
289+
data_delivery => DataDelivery,
290+
%% For relay data_delivery. The flow is a hint and may
291+
%% not be used by the underlying protocol.
292+
data_delivery_pid => self(),
293+
data_delivery_flow => maps:get(data_delivery_flow, Opts, 131072)
294+
},
295+
Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, ModState}},
296+
%% @todo We can't call the normal takeover because it tries to parse.
297+
takeover(Pid, Ref, {Pid, StreamID}, {data_delivery, DataDelivery}, #{}, <<>>,
282298
{State, HandlerState}).
283299

284300
%% Connection process.
@@ -311,7 +327,7 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
311327
_ -> ranch:remove_connection(Ref)
312328
end,
313329
Messages = case Transport of
314-
undefined -> undefined;
330+
{data_delivery, _} -> undefined;
315331
_ -> Transport:messages()
316332
end,
317333
State = set_idle_timeout(State0#state{parent=Parent,
@@ -355,13 +371,14 @@ after_init(State, HandlerState, ParseState) ->
355371
%% immediately but there might still be data to be processed in
356372
%% the message queue.
357373

358-
setopts_active(#state{transport=undefined}) ->
374+
setopts_active(#state{transport={data_delivery, _}}) ->
359375
ok;
360376
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
361377
N = maps:get(active_n, Opts, 1),
362378
Transport:setopts(Socket, [{active, N}]).
363379

364-
maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
380+
maybe_read_body(#state{transport={data_delivery, stream_handlers},
381+
socket=Stream={Pid, _}, active=true}) ->
365382
%% @todo Keep Ref around.
366383
ReadBodyRef = make_ref(),
367384
Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}},
@@ -374,10 +391,11 @@ active(State) ->
374391
maybe_read_body(State),
375392
State#state{active=true}.
376393

377-
passive(State=#state{transport=undefined}) ->
394+
passive(State=#state{transport={data_delivery, _}}) ->
378395
%% Unfortunately we cannot currently cancel read_body.
379396
%% But that's OK, we will just stop reading the body
380397
%% after the next message.
398+
%% @todo We can't stop relay data_delivery currently.
381399
State#state{active=false};
382400
passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) ->
383401
Transport:setopts(Socket, [{active, false}]),
@@ -454,6 +472,10 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
454472
{request_body, _Ref, fin, _, Data} ->
455473
maybe_read_body(State),
456474
parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
475+
%% @todo It would be better to check StreamID.
476+
%% @todo We must ensure that IsFin=fin is handled like a socket close?
477+
{'$cowboy_stream_data', {Pid, _StreamID}, _IsFin, Data} when Pid =:= Parent ->
478+
parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
457479
%% Timeouts.
458480
{timeout, TRef, ?MODULE} ->
459481
tick_idle_timeout(State, HandlerState, ParseState);
@@ -662,9 +684,14 @@ commands([Frame|Tail], State, Data0) ->
662684
commands(Tail, State, Data)
663685
end.
664686

665-
transport_send(#state{socket=Stream={Pid, _}, transport=undefined}, IsFin, Data) ->
687+
transport_send(#state{transport={data_delivery, stream_handlers},
688+
socket=Stream={Pid, _}}, IsFin, Data) ->
666689
Pid ! {Stream, {data, IsFin, Data}},
667690
ok;
691+
transport_send(#state{transport={data_delivery, relay},
692+
socket=Stream={Pid, _}}, IsFin, Data) ->
693+
Pid ! {'$cowboy_stream_data', Stream, IsFin, Data},
694+
ok;
668695
transport_send(#state{socket=Socket, transport=Transport}, _, Data) ->
669696
Transport:send(Socket, Data).
670697

src/cowboy_webtransport.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) ->
277277
%% callback is not implemented.
278278
%%
279279
%% @todo Better type than map() for the cowboy_stream state.
280+
%% @todo Is this really useful?
280281

281282
-spec info(cowboy_stream:streamid(), any(), State)
282283
-> {cowboy_stream:commands(), State} when State::map().

test/handlers/ws_ignore.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
init(Req, _) ->
1010
{cowboy_websocket, Req, undefined, #{
11+
data_delivery => relay,
1112
compress => true
1213
}}.
1314

test/ws_SUITE_data/ws_echo.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
init(Req, _) ->
1010
{cowboy_websocket, Req, undefined, #{
11+
data_delivery => relay,
1112
compress => true
1213
}}.
1314

0 commit comments

Comments
 (0)