Skip to content

Commit 67ffab4

Browse files
mat-hekvarsillbblaszkow06
authored
Playback refactor (#432)
* remove parent message dispatcher and genserver helper * POC pipeline & elements startup * termination mvp * children removal bug fixes * add linking order test, fix getting clock and bin termination * remove playback state checks, fix tests * remove status and add playback * don't remove children until they're linked * fix long running sync test * Add option to name processes to ease debugging with observer * make dialyzer happy * fix linking test * fixed most credo issues * improve pipeline initialization * using dynamic filter from test/support * docs update * add terminate action * improve logs in supervisors * ignore playback action when terminating * Update lib/membrane/core/bin/action_handler.ex Co-authored-by: Łukasz Kita <[email protected]> * Update lib/membrane/core/pipeline/action_handler.ex Co-authored-by: Łukasz Kita <[email protected]> * Update lib/membrane/pipeline.ex Co-authored-by: Łukasz Kita <[email protected]> * Update lib/membrane/pipeline.ex Co-authored-by: Łukasz Kita <[email protected]> * start remote controlled pipeline with start_supervised in tests * fixes for CR * add test for element play * fixes for CR Co-authored-by: Bartosz Błaszków <[email protected]> * handle_play -> handle_playing * play_request? -> playing_requested? * Play -> Playing * Apply suggestions from code review Co-authored-by: Bartosz Błaszków <[email protected]> * refactor component path * Improved ChildrenSupervisor, ResourceGuard and Observability (#447) * supervisors refactor * resource guard * add utility supervisor to context * refactor element tests not to use State.new * Update lib/membrane/resource_guard.ex Co-authored-by: Bartosz Błaszków <[email protected]> * run resource cleanup in a separate process * refactor observability setup * improve children supervisor API * add docs and test for utility supervisor and resource guard * add timeout to resource * use a boring function name instead of go_brrr * unlink cleanup task in resource guard before killing it * Add list of available callbacks in each component (#449) * Add an aggregated list of all callbacks that can be implemented by `Filter`, `Endpoint`, `Source` and `Sink` element behaviours. Co-authored-by: Mateusz Front <[email protected]> * add docs for Observability.setup Co-authored-by: Bartosz Błaszków <[email protected]> Co-authored-by: Łukasz Kita <[email protected]> * Playback termination (#451) * handle_terminate_request in parent * add handle_terminate_request to element * fix unlinking pads * add terminate action type to element and bin * Rewrite espec tests (#452) * Remove unecessary ESpec tests * Rewrite some crucial ESpec tests * Unify the support modules from ExUnit and ESpec * fix linking test * fix resource guard test * Apply suggestions from code review Co-authored-by: Bartosz Błaszków <[email protected]> Co-authored-by: Łukasz Kita <[email protected]> * Update lib/membrane/bin.ex Co-authored-by: Bartosz Błaszków <[email protected]> * CallbackContext.Other -> Info * fix typos <3 * Callback contexts for handle_terminate_request * add docs for zombies * use ResourceGuard to report termination via telemetry * Add a test to check the links between processes in the pipeline_test.exs. Fix the bug in the test case * add deferred termination tests * add description for children supervisor * fix RC in child removal test * ChildrenSupervisor -> SubprocessSupervisor Co-authored-by: Łukasz Kita <[email protected]> Co-authored-by: Bartosz Błaszków <[email protected]> * Fix spawning children on remote nodes (#456) * fix spawning children on remote nodes * remove redundant checks in element and bin * Fix filter aggregator (#455) * termporairly fix distribution * re-enable filter aggregator tests * make filter aggregator work again * add context to handle_init * unify handle_init default impl across components * filter aggregator test improvements * remove redundant match * Fixes for CR Co-authored-by: Łukasz Kita <[email protected]> * RemoteControlled.Pipeline await_play -> await_playing * put back relevant tests Co-authored-by: Łukasz Kita <[email protected]> Co-authored-by: Bartosz Błaszków <[email protected]>
1 parent 4ad63c5 commit 67ffab4

File tree

157 files changed

+3783
-3614
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

157 files changed

+3783
-3614
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
## 0.10.0
1212
* Remove all deprecated stuff [#399](https://github.com/membraneframework/membrane_core/pull/399)
1313
* Make `Membrane.Pipeline.{prepare, play, stop}` deprecated and add `:playback` action instead
14-
* Make `Membrane.Pipeline.stop_and_terminate/2` deprecated and add `Membrane.Pipeline.terminate/2` instead
14+
* Make `Membrane.Pipeline.stop_and_terminate` deprecated and add `Membrane.Pipeline.terminate/2` instead
1515
* Add `Membrane.RemoteControlled.Pipeline` - a basic implementation of a `Membrane.Pipeline` that </br>
1616
can be spawned and controlled by an external process [#366](https://github.com/membraneframework/membrane_core/pull/366)
1717
* Disallow sending buffers without sending caps first [#341](https://github.com/membraneframework/membrane_core/issues/341)

assets/images/observer_graph.png

290 KB
Loading

lib/membrane/bin.ex

Lines changed: 49 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ defmodule Membrane.Bin do
2626

2727
@typedoc """
2828
Defines options that can be passed to `start_link/3` and received
29-
in `c:handle_init/1` callback.
29+
in `c:handle_init/2` callback.
3030
"""
3131
@type options_t :: struct | nil
3232

@@ -36,23 +36,19 @@ defmodule Membrane.Bin do
3636
@type name_t :: any()
3737

3838
@doc """
39-
Callback invoked on initialization of bin process. It should parse options
40-
and initialize bin's internal state. Internally it is invoked inside
41-
`c:GenServer.init/1` callback.
42-
"""
43-
@callback handle_init(options :: options_t) :: callback_return_t()
44-
45-
@doc """
46-
Callback invoked when bin is shutting down.
47-
Internally called in `c:GenServer.terminate/2` callback.
39+
Callback invoked on initialization of bin.
4840
49-
Useful for any cleanup required.
41+
This callback is synchronous: the parent waits until it finishes. Also, any failures
42+
that happen in this callback crash the parent as well, regardless of crash groups.
43+
For these reasons, it's important to do any long-lasting or complex work in `c:handle_setup/2`,
44+
while `handle_init` should be used for things like parsing options, initializing state or
45+
spawning children.
5046
"""
51-
@callback handle_shutdown(reason, state :: state_t) :: :ok
52-
when reason: :normal | :shutdown | {:shutdown, any} | term()
47+
@callback handle_init(context :: CallbackContext.Init.t(), options :: options_t) ::
48+
callback_return_t()
5349

5450
@doc """
55-
Callback that is called when new pad has beed added to bin. Executed
51+
Callback that is called when new pad has been added to bin. Executed
5652
ONLY for dynamic pads.
5753
"""
5854
@callback handle_pad_added(
@@ -62,7 +58,7 @@ defmodule Membrane.Bin do
6258
) :: callback_return_t
6359

6460
@doc """
65-
Callback that is called when some pad of the bin has beed removed. Executed
61+
Callback that is called when some pad of the bin has been removed. Executed
6662
ONLY for dynamic pads.
6763
"""
6864
@callback handle_pad_removed(
@@ -72,54 +68,24 @@ defmodule Membrane.Bin do
7268
) :: callback_return_t
7369

7470
@doc """
75-
Callback invoked when bin transition from `:stopped` to `:prepared` state has finished,
76-
that is all of its children are prepared to enter `:playing` state.
77-
"""
78-
@callback handle_stopped_to_prepared(
79-
context :: CallbackContext.PlaybackChange.t(),
80-
state :: state_t
81-
) ::
82-
callback_return_t
71+
Callback invoked on bin startup, right after `c:handle_init/2`.
8372
84-
@doc """
85-
Callback invoked when bin transition from `:playing` to `:prepared` state has finished,
86-
that is all of its children are prepared to be stopped.
73+
Any long-lasting or complex initialization should happen here.
8774
"""
88-
@callback handle_playing_to_prepared(
89-
context :: CallbackContext.PlaybackChange.t(),
75+
@callback handle_setup(
76+
context :: CallbackContext.Setup.t(),
9077
state :: state_t
91-
) ::
92-
callback_return_t
93-
94-
@doc """
95-
Callback invoked when bin is in `:playing` state, i.e. all its children
96-
are in this state.
97-
"""
98-
@callback handle_prepared_to_playing(
99-
context :: CallbackContext.PlaybackChange.t(),
100-
state :: state_t
101-
) ::
102-
callback_return_t
78+
) :: callback_return_t
10379

10480
@doc """
105-
Callback invoked when bin is in `:playing` state, i.e. all its children
106-
are in this state.
81+
Callback invoked when bin switches the playback to `:playing`.
10782
"""
108-
@callback handle_prepared_to_stopped(
109-
context :: CallbackContext.PlaybackChange.t(),
83+
@callback handle_playing(
84+
context :: CallbackContext.Playing.t(),
11085
state :: state_t
11186
) ::
11287
callback_return_t
11388

114-
@doc """
115-
Callback invoked when bin is in `:terminating` state, i.e. all its children
116-
are in this state.
117-
"""
118-
@callback handle_stopped_to_terminating(
119-
context :: CallbackContext.PlaybackChange.t(),
120-
state :: state_t
121-
) :: callback_return_t
122-
12389
@doc """
12490
Callback invoked when a notification comes in from an element.
12591
"""
@@ -143,14 +109,13 @@ defmodule Membrane.Bin do
143109
Callback invoked when bin receives a message that is not recognized
144110
as an internal membrane message.
145111
146-
Useful for receiving data sent from NIFs or other stuff.
112+
Can be used for receiving data from non-membrane processes.
147113
"""
148114
@callback handle_info(
149115
message :: any,
150-
context :: CallbackContext.Other.t(),
116+
context :: CallbackContext.Info.t(),
151117
state :: state_t
152-
) ::
153-
callback_return_t
118+
) :: callback_return_t
154119

155120
@doc """
156121
Callback invoked when a child element starts processing stream via given pad.
@@ -173,11 +138,7 @@ defmodule Membrane.Bin do
173138
) :: callback_return_t
174139

175140
@doc """
176-
Callback invoked when `Membrane.ParentSpec` is linked and in the same playback
177-
state as bin.
178-
179-
This callback can be started from `c:handle_init/1` callback or as
180-
`t:Membrane.Bin.Action.spec_t/0` action.
141+
Callback invoked when children of `Membrane.ParentSpec` are started.
181142
"""
182143
@callback handle_spec_started(
183144
children :: [Child.name_t()],
@@ -195,22 +156,27 @@ defmodule Membrane.Bin do
195156
state :: state_t
196157
) :: callback_return_t
197158

198-
@optional_callbacks handle_init: 1,
199-
handle_shutdown: 2,
159+
@doc """
160+
A callback invoked when the bin is being removed by its parent.
161+
162+
By default it returns `t:Membrane.Bin.Action.terminate_t/0` with reason `:normal`.
163+
"""
164+
@callback handle_terminate_request(context :: CallbackContext.TerminateRequest.t(), state_t) ::
165+
callback_return_t()
166+
167+
@optional_callbacks handle_init: 2,
200168
handle_pad_added: 3,
201169
handle_pad_removed: 3,
202-
handle_stopped_to_prepared: 2,
203-
handle_playing_to_prepared: 2,
204-
handle_prepared_to_playing: 2,
205-
handle_prepared_to_stopped: 2,
206-
handle_stopped_to_terminating: 2,
170+
handle_setup: 2,
171+
handle_playing: 2,
207172
handle_info: 3,
208173
handle_spec_started: 3,
209174
handle_element_start_of_stream: 4,
210175
handle_element_end_of_stream: 4,
211176
handle_child_notification: 4,
212177
handle_parent_notification: 3,
213-
handle_tick: 3
178+
handle_tick: 3,
179+
handle_terminate_request: 2
214180

215181
@doc PadsSpecs.def_pad_docs(:input, :bin)
216182
defmacro def_input_pad(name, spec) do
@@ -314,10 +280,11 @@ defmodule Membrane.Bin do
314280
def membrane_bin?, do: true
315281

316282
@impl true
317-
def handle_init(_options), do: {:ok, %{}}
283+
def handle_init(_ctx, %_opt_struct{} = options),
284+
do: {:ok, options |> Map.from_struct()}
318285

319286
@impl true
320-
def handle_shutdown(_reason, _state), do: :ok
287+
def handle_init(_ctx, options), do: {:ok, options}
321288

322289
@impl true
323290
def handle_pad_added(_pad, _ctx, state), do: {:ok, state}
@@ -326,19 +293,10 @@ defmodule Membrane.Bin do
326293
def handle_pad_removed(_pad, _ctx, state), do: {:ok, state}
327294

328295
@impl true
329-
def handle_stopped_to_prepared(_ctx, state), do: {:ok, state}
330-
331-
@impl true
332-
def handle_prepared_to_playing(_ctx, state), do: {:ok, state}
333-
334-
@impl true
335-
def handle_playing_to_prepared(_ctx, state), do: {:ok, state}
296+
def handle_setup(_ctx, state), do: {:ok, state}
336297

337298
@impl true
338-
def handle_prepared_to_stopped(_ctx, state), do: {:ok, state}
339-
340-
@impl true
341-
def handle_stopped_to_terminating(_ctx, state), do: {:ok, state}
299+
def handle_playing(_ctx, state), do: {:ok, state}
342300

343301
@impl true
344302
def handle_info(message, _ctx, state), do: {:ok, state}
@@ -358,21 +316,21 @@ defmodule Membrane.Bin do
358316
@impl true
359317
def handle_parent_notification(_notification, _ctx, state), do: {:ok, state}
360318

361-
defoverridable handle_init: 1,
362-
handle_shutdown: 2,
319+
@impl true
320+
def handle_terminate_request(_ctx, state), do: {{:ok, terminate: :normal}, state}
321+
322+
defoverridable handle_init: 2,
363323
handle_pad_added: 3,
364324
handle_pad_removed: 3,
365-
handle_stopped_to_prepared: 2,
366-
handle_playing_to_prepared: 2,
367-
handle_prepared_to_playing: 2,
368-
handle_prepared_to_stopped: 2,
369-
handle_stopped_to_terminating: 2,
325+
handle_setup: 2,
326+
handle_playing: 2,
370327
handle_info: 3,
371328
handle_spec_started: 3,
372329
handle_element_start_of_stream: 4,
373330
handle_element_end_of_stream: 4,
374331
handle_child_notification: 4,
375-
handle_parent_notification: 3
332+
handle_parent_notification: 3,
333+
handle_terminate_request: 2
376334
end
377335
end
378336
end

lib/membrane/bin/action.ex

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ defmodule Membrane.Bin.Action do
55
66
Returning actions is a way of bin interaction with
77
other components and parts of framework. Each action may be returned by any
8-
callback (except for `c:Membrane.Bin.handle_shutdown/2`, as it
9-
does not support returning any actions) unless explicitly stated otherwise.
8+
callback unless explicitly stated otherwise.
109
"""
1110

1211
alias Membrane.{Child, ParentSpec}
@@ -25,8 +24,8 @@ defmodule Membrane.Bin.Action do
2524
@typedoc """
2625
Action that instantiates children and links them according to `Membrane.ParentSpec`.
2726
28-
Children's playback state is changed to the current bin state.
29-
`c:Membrane.Parent.handle_spec_started/3` callback is executed once it happens.
27+
Children's playback is changed to the current bin playback.
28+
`c:Membrane.Parent.handle_spec_started/3` callback is executed once the children are spawned.
3029
"""
3130
@type spec_t :: {:spec, ParentSpec.t()}
3231

@@ -82,6 +81,22 @@ defmodule Membrane.Bin.Action do
8281
"""
8382
@type stop_timer_t :: {:stop_timer, timer_id :: any}
8483

84+
@typedoc """
85+
Terminates bin with given reason.
86+
87+
Termination reason follows the OTP semantics:
88+
- Use `:normal` for graceful termination. Allowed only when the parent already requested termination,
89+
i.e. after `c:Membrane.Bin.handle_terminate_request/2` is called. If the bin has no children, it
90+
terminates immediately. Otherwise, it switches to the zombie mode, requests all the children to terminate,
91+
waits for them to terminate and then terminates itself. In the zombie mode, no bin callbacks
92+
are called and all messages and calls to the bin are ignored (apart from Membrane internal
93+
messages)
94+
- If the reason is other than `:normal`, the bin terminates immediately. The bin supervisor
95+
terminates all the children with the reason `:shutdown`
96+
- If the reason is neither `:normal`, `:shutdown` nor `{:shutdown, term}`, an error is logged
97+
"""
98+
@type terminate_t :: {:terminate, reason :: :normal | :shutdown | {:shutdown, term} | term}
99+
85100
@typedoc """
86101
Type describing actions that can be returned from bin callbacks.
87102
@@ -96,4 +111,5 @@ defmodule Membrane.Bin.Action do
96111
| start_timer_t
97112
| timer_interval_t
98113
| stop_timer_t
114+
| terminate_t
99115
end

lib/membrane/bin/callback_context/other.ex renamed to lib/membrane/bin/callback_context/info.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
defmodule Membrane.Bin.CallbackContext.Other do
1+
defmodule Membrane.Bin.CallbackContext.Info do
22
@moduledoc """
33
Structure representing a context that is passed to the callback when
44
bin receives unrecognized message.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
defmodule Membrane.Bin.CallbackContext.Init do
2+
@moduledoc """
3+
Callback context for `c:Membrane.Bin.handle_init/2`.
4+
"""
5+
use Membrane.Core.Bin.CallbackContext
6+
end

lib/membrane/bin/callback_context/playback_change.ex

Lines changed: 0 additions & 7 deletions
This file was deleted.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
defmodule Membrane.Bin.CallbackContext.Playing do
2+
@moduledoc """
3+
Structure representing a context that is passed to the `c:Membrane.Bin.handle_playing/2` callback.
4+
"""
5+
use Membrane.Core.Bin.CallbackContext
6+
end
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
defmodule Membrane.Bin.CallbackContext.Setup do
2+
@moduledoc """
3+
Structure representing a context that is passed to the `c:Membrane.Bin.handle_setup/2` callback.
4+
"""
5+
use Membrane.Core.Bin.CallbackContext
6+
end

0 commit comments

Comments
 (0)