Skip to content

Commit 71f25e7

Browse files
committed
WIP termination
1 parent 4ffd513 commit 71f25e7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+750
-1002
lines changed

lib/membrane/component_path.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,15 @@ defmodule Membrane.ComponentPath do
4545
Returns formatted string of given path's names joined with separator.
4646
"""
4747
@spec format(path_t(), String.t()) :: String.t()
48-
def format(path, separator \\ "/") do
48+
def format(path, separator \\ "") do
4949
path |> Enum.join(separator)
5050
end
5151

5252
@doc """
5353
Works the same as `format/2` but uses currently stored path
5454
"""
5555
@spec get_formatted(String.t()) :: String.t()
56-
def get_formatted(separator \\ "/") do
56+
def get_formatted(separator \\ "") do
5757
Process.get(@key, []) |> format(separator)
5858
end
5959

lib/membrane/core/bin.ex

Lines changed: 52 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ defmodule Membrane.Core.Bin do
2727
parent: pid,
2828
user_options: Membrane.Bin.options_t(),
2929
parent_clock: Membrane.Clock.t(),
30-
log_metadata: Keyword.t()
30+
log_metadata: Keyword.t(),
31+
sync: :membrane_no_sync
3132
}
3233

3334
@doc """
@@ -62,11 +63,22 @@ defmodule Membrane.Core.Bin do
6263
bin options: #{inspect(user_options)}
6364
""")
6465

66+
start_fun =
67+
&GenServer.start_link(Membrane.Core.Bin, Map.put(options, :children_supervisor, &1))
68+
6569
# rpc if necessary
6670
if node do
67-
:rpc.call(node, GenServer, method, [Membrane.Core.Bin, options])
71+
:rpc.call(node, Membrane.Core.Parent.Supervisor, :go_brrr, [
72+
method,
73+
start_fun,
74+
options.setup_logger
75+
])
6876
else
69-
apply(GenServer, method, [Membrane.Core.Bin, options])
77+
Membrane.Core.Parent.Supervisor.go_brrr(
78+
method,
79+
start_fun,
80+
options.setup_logger
81+
)
7082
end
7183
else
7284
raise """
@@ -76,25 +88,13 @@ defmodule Membrane.Core.Bin do
7688
end
7789
end
7890

79-
@doc """
80-
Changes bin's playback state to `:stopped` and terminates its process
81-
"""
82-
@spec stop_and_terminate(bin :: pid) :: :ok
83-
def stop_and_terminate(bin) do
84-
Message.send(bin, :terminate)
85-
:ok
86-
end
87-
8891
@impl GenServer
8992
def init(options) do
90-
%{parent: parent, name: name, module: module, log_metadata: log_metadata} = options
91-
92-
Process.monitor(parent)
93-
94-
name_str = if String.valid?(name), do: name, else: inspect(name)
95-
:ok = Membrane.Logger.set_prefix(name_str <> " bin")
96-
:ok = Logger.metadata(log_metadata)
97-
:ok = ComponentPath.set_and_append(log_metadata[:parent_path] || [], name_str <> " bin")
93+
%{name: name, module: module} = options
94+
self_pid = self()
95+
setup_logger = fn -> options.setup_logger.(self_pid) end
96+
log_metadata = setup_logger.()
97+
Message.send(options.children_supervisor, :setup_logger, setup_logger)
9898

9999
Telemetry.report_init(:bin)
100100

@@ -116,7 +116,8 @@ defmodule Membrane.Core.Bin do
116116
stream_sync: Sync.no_sync(),
117117
latency: 0
118118
},
119-
children_log_metadata: log_metadata
119+
children_log_metadata: log_metadata,
120+
children_supervisor: options.children_supervisor
120121
}
121122
|> Child.PadSpecHandler.init_pads()
122123

@@ -129,7 +130,13 @@ defmodule Membrane.Core.Bin do
129130
state
130131
)
131132

132-
{:ok, state}
133+
{:ok, state, {:continue, :init}}
134+
end
135+
136+
@impl GenServer
137+
def handle_continue(:init, state) do
138+
state = Parent.LifecycleController.handle_setup(state)
139+
{:noreply, state}
133140
end
134141

135142
@impl GenServer
@@ -153,21 +160,6 @@ defmodule Membrane.Core.Bin do
153160
{:noreply, state}
154161
end
155162

156-
@impl GenServer
157-
def handle_info(
158-
Message.new(:playback_state_changed, [pid, new_playback_state]),
159-
state
160-
) do
161-
state = Parent.ChildLifeController.child_playback_changed(pid, new_playback_state, state)
162-
{:noreply, state}
163-
end
164-
165-
@impl GenServer
166-
def handle_info(Message.new(:change_playback_state, new_state), state) do
167-
state = Parent.LifecycleController.change_playback_state(new_state, state)
168-
{:noreply, state}
169-
end
170-
171163
@impl GenServer
172164
def handle_info(Message.new(:stream_management_event, [element_name, pad_ref, event]), state) do
173165
state =
@@ -206,25 +198,32 @@ defmodule Membrane.Core.Bin do
206198
end
207199

208200
@impl GenServer
209-
def handle_info({:membrane_clock_ratio, clock, ratio}, state) do
210-
state = TimerController.handle_clock_update(clock, ratio, state)
201+
def handle_info(Message.new(:child_death, [name, reason]), state) do
202+
state = Parent.ChildLifeController.handle_child_death(name, reason, state)
211203
{:noreply, state}
212204
end
213205

214206
@impl GenServer
215-
def handle_info({:DOWN, _ref, :process, pid, reason} = message, state) do
216-
cond do
217-
is_child_pid?(pid, state) ->
218-
state = Parent.ChildLifeController.handle_child_death(pid, reason, state)
219-
{:noreply, state}
220-
221-
is_parent_pid?(pid, state) ->
222-
{:stop, {:shutdown, :parent_crash}, state}
223-
224-
true ->
225-
state = Parent.LifecycleController.handle_info(message, state)
226-
{:noreply, state}
227-
end
207+
def handle_info(Message.new(:play), state) do
208+
state = Parent.LifecycleController.handle_play(state)
209+
{:noreply, state}
210+
end
211+
212+
@impl GenServer
213+
def handle_info(Message.new(:initialized, child), state) do
214+
state = Parent.ChildLifeController.handle_child_initialized(child, state)
215+
{:noreply, state}
216+
end
217+
218+
@impl GenServer
219+
def handle_info(Message.new(_type, _args, _opts) = message, _state) do
220+
raise Membrane.BinError, "Received invalid message #{inspect(message)}"
221+
end
222+
223+
@impl GenServer
224+
def handle_info({:membrane_clock_ratio, clock, ratio}, state) do
225+
state = TimerController.handle_clock_update(clock, ratio, state)
226+
{:noreply, state}
228227
end
229228

230229
@impl GenServer
@@ -251,14 +250,6 @@ defmodule Membrane.Core.Bin do
251250
@impl GenServer
252251
def terminate(reason, state) do
253252
Telemetry.report_terminate(:bin)
254-
:ok = state.module.handle_shutdown(reason, state.internal_state)
255-
end
256-
257-
defp is_parent_pid?(pid, state) do
258-
state.parent_pid == pid
259-
end
260-
261-
defp is_child_pid?(pid, state) do
262-
Enum.any?(state.children, fn {_name, entry} -> entry.pid == pid end)
253+
Parent.LifecycleController.handle_terminate(reason, state)
263254
end
264255
end

lib/membrane/core/bin/action_handler.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ defmodule Membrane.Core.Bin.ActionHandler do
2222

2323
@impl CallbackHandler
2424
def handle_action({:remove_child, children}, _cb, _params, state) do
25-
Parent.ChildLifeController.handle_remove_child(children, state)
25+
Parent.ChildLifeController.handle_remove_children(children, state)
2626
end
2727

2828
@impl CallbackHandler

lib/membrane/core/bin/state.ex

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ defmodule Membrane.Core.Bin.State do
4343
pending_specs: LinkHandler.pending_specs_t()
4444
}
4545

46-
@enforce_keys [:module, :synchronization]
46+
@enforce_keys [:module, :synchronization, :children_supervisor]
4747
defstruct @enforce_keys ++
4848
[
4949
internal_state: nil,
@@ -57,6 +57,9 @@ defmodule Membrane.Core.Bin.State do
5757
crash_groups: %{},
5858
children_log_metadata: [],
5959
links: [],
60-
pending_specs: %{}
60+
pending_specs: %{},
61+
status: :initializing,
62+
play_request?: false,
63+
terminating?: false
6164
]
6265
end

lib/membrane/core/element.ex

Lines changed: 13 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ defmodule Membrane.Core.Element do
3030
State
3131
}
3232

33-
alias Membrane.Core.{PlaybackHandler, TimerController}
33+
alias Membrane.Core.TimerController
3434

3535
require Membrane.Core.Message, as: Message
3636
require Membrane.Core.Telemetry, as: Telemetry
@@ -89,35 +89,11 @@ defmodule Membrane.Core.Element do
8989
end
9090
end
9191

92-
@doc """
93-
Stops given element process.
94-
95-
It will wait for reply for amount of time passed as second argument
96-
(in milliseconds).
97-
98-
Will trigger calling `c:Membrane.Element.Base.handle_shutdown/2`
99-
callback.
100-
"""
101-
@spec shutdown(pid, timeout) :: :ok
102-
def shutdown(server, timeout \\ 5000) do
103-
GenServer.stop(server, :normal, timeout)
104-
:ok
105-
end
106-
10792
@impl GenServer
10893
def init(options) do
109-
%{parent: parent, name: name, log_metadata: log_metadata} = options
110-
111-
Process.monitor(parent)
112-
name_str = if String.valid?(name), do: name, else: inspect(name)
113-
:ok = Membrane.Logger.set_prefix(name_str)
114-
:ok = Logger.metadata(log_metadata)
115-
:ok = ComponentPath.set_and_append(log_metadata[:parent_path] || [], name_str)
116-
94+
options.setup_logger.(self())
11795
Telemetry.report_init(:element)
118-
11996
state = Map.take(options, [:module, :name, :parent_clock, :sync, :parent]) |> State.new()
120-
12197
state = LifecycleController.handle_init(options.user_options, state)
12298
{:ok, state, {:continue, :init}}
12399
end
@@ -131,7 +107,7 @@ defmodule Membrane.Core.Element do
131107
@impl GenServer
132108
def terminate(reason, state) do
133109
Telemetry.report_terminate(:element)
134-
LifecycleController.handle_shutdown(reason, state)
110+
LifecycleController.handle_terminate(reason, state)
135111
end
136112

137113
@impl GenServer
@@ -161,12 +137,6 @@ defmodule Membrane.Core.Element do
161137
"Received invalid message #{inspect(message)} from #{inspect(pid)}"
162138
end
163139

164-
@impl GenServer
165-
def handle_info({:DOWN, _ref, :process, parent_pid, reason}, %{parent_pid: parent_pid} = state) do
166-
:ok = LifecycleController.handle_pipeline_down(reason, state)
167-
{:stop, {:shutdown, :parent_crash}, state}
168-
end
169-
170140
@impl GenServer
171141
def handle_info(message, state) do
172142
Telemetry.report_metric(
@@ -179,17 +149,6 @@ defmodule Membrane.Core.Element do
179149

180150
@compile {:inline, do_handle_info: 2}
181151

182-
defp do_handle_info(Message.new(:change_playback_state, new_playback_state), state) do
183-
{:ok, state} =
184-
PlaybackHandler.change_playback_state(
185-
new_playback_state,
186-
Element.LifecycleController,
187-
state
188-
)
189-
190-
{:noreply, state}
191-
end
192-
193152
defp do_handle_info(Message.new(:demand, size, _opts) = msg, state) do
194153
pad_ref = Message.for_pad(msg)
195154
state = DemandController.handle_demand(pad_ref, size, state)
@@ -229,20 +188,25 @@ defmodule Membrane.Core.Element do
229188
{:noreply, state}
230189
end
231190

232-
defp do_handle_info({:membrane_clock_ratio, clock, ratio}, state) do
233-
state = TimerController.handle_clock_update(clock, ratio, state)
234-
{:noreply, state}
235-
end
236-
237191
defp do_handle_info(Message.new(:parent_notification, notification), state) do
238192
state = Core.Child.LifecycleController.handle_parent_notification(notification, state)
239193
{:noreply, state}
240194
end
241195

196+
defp do_handle_info(Message.new(:terminate), state) do
197+
state = LifecycleController.handle_terminate_request(state)
198+
{:stop, :normal, state}
199+
end
200+
242201
defp do_handle_info(Message.new(_type, _args, _opts) = message, _state) do
243202
raise Membrane.ElementError, "Received invalid message #{inspect(message)}"
244203
end
245204

205+
defp do_handle_info({:membrane_clock_ratio, clock, ratio}, state) do
206+
state = TimerController.handle_clock_update(clock, ratio, state)
207+
{:noreply, state}
208+
end
209+
246210
defp do_handle_info(message, state) do
247211
state = LifecycleController.handle_info(message, state)
248212
{:noreply, state}

lib/membrane/core/element/action_handler.ex

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ defmodule Membrane.Core.Element.ActionHandler do
1010

1111
alias Membrane.{ActionError, Buffer, Caps, ElementError, Event, Pad, PadDirectionError}
1212
alias Membrane.Core.Child.PadModel
13-
alias Membrane.Core.Element.{DemandHandler, LifecycleController, PadController, State}
14-
alias Membrane.Core.{Events, Message, PlaybackHandler, TimerController}
13+
alias Membrane.Core.Element.{DemandHandler, PadController, State}
14+
alias Membrane.Core.{Events, Message, TimerController}
1515
alias Membrane.Core.Telemetry
1616
alias Membrane.Element.Action
1717

@@ -72,24 +72,6 @@ defmodule Membrane.Core.Element.ActionHandler do
7272
)
7373
end
7474

75-
@impl CallbackHandler
76-
def handle_action({:playback_change, :suspend}, cb, _params, state)
77-
when cb in [
78-
:handle_stopped_to_prepared,
79-
:handle_playing_to_prepared,
80-
:handle_prepared_to_playing,
81-
:handle_prepared_to_stopped
82-
] do
83-
{:ok, state} = PlaybackHandler.suspend_playback_change(state)
84-
state
85-
end
86-
87-
@impl CallbackHandler
88-
def handle_action({:playback_change, :resume}, _cb, _params, state) do
89-
{:ok, state} = PlaybackHandler.continue_playback_change(LifecycleController, state)
90-
state
91-
end
92-
9375
@impl CallbackHandler
9476
def handle_action({:buffer, _args} = action, cb, _params, %State{
9577
playback: %{state: playback_state}

0 commit comments

Comments
 (0)