Skip to content

Commit 14d8018

Browse files
authored
Modify 5s linking timeout constraint (#810)
* Modify 5s linking timeout constraint * Make dialyzer happy * Maybe fix bug occuring in FishJam * Fix linking timeout mechanism * Remove leftover * Implement suggestions from CR & delete unnecessary state field * Remove leftover * Satisfy lint * with -> case
1 parent e2c282f commit 14d8018

File tree

9 files changed

+82
-55
lines changed

9 files changed

+82
-55
lines changed

lib/membrane/bin/pad_data.ex

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ defmodule Membrane.Bin.PadData do
2727
link_id: private_field,
2828
endpoint: private_field,
2929
linked?: private_field,
30-
response_received?: private_field
30+
response_received?: private_field,
31+
linking_timeout_id: private_field,
32+
linked_in_spec?: private_field
3133
}
3234

3335
@enforce_keys [
@@ -40,7 +42,9 @@ defmodule Membrane.Bin.PadData do
4042
:endpoint,
4143
:linked?,
4244
:response_received?,
43-
:spec_ref
45+
:spec_ref,
46+
:linking_timeout_id,
47+
:linked_in_spec?
4448
]
4549

4650
defstruct @enforce_keys

lib/membrane/core/bin.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,8 @@ defmodule Membrane.Core.Bin do
209209
{:noreply, state}
210210
end
211211

212-
defp do_handle_info(Message.new(:linking_timeout, pad_ref), state) do
213-
PadController.handle_linking_timeout(pad_ref, state)
212+
defp do_handle_info(Message.new(:linking_timeout, [pad_ref, linking_timeout_id]), state) do
213+
:ok = PadController.handle_linking_timeout(pad_ref, linking_timeout_id, state)
214214
{:noreply, state}
215215
end
216216

lib/membrane/core/bin/pad_controller.ex

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ defmodule Membrane.Core.Bin.PadController do
88
alias Membrane.{Core, LinkError, Pad}
99
alias Membrane.Core.Bin.{ActionHandler, CallbackContext, State}
1010
alias Membrane.Core.{CallbackHandler, Child, Message}
11-
alias Membrane.Core.Child.PadModel
1211
alias Membrane.Core.Element.StreamFormatController
1312
alias Membrane.Core.Parent.{ChildLifeController, Link, SpecificationParser}
1413

15-
require Membrane.Core.Child.PadModel
14+
require Membrane.Core.Child.PadModel, as: PadModel
1615
require Membrane.Core.Message
1716
require Membrane.Logger
1817
require Membrane.Pad
@@ -50,8 +49,7 @@ defmodule Membrane.Core.Bin.PadController do
5049
state =
5150
case PadModel.get_data(state, pad_ref) do
5251
{:error, :unknown_pad} ->
53-
init_pad_data(pad_ref, pad_info, state)
54-
|> Map.update!(:pad_refs, &[pad_ref | &1])
52+
init_pad_data(pad_ref, state)
5553

5654
# This case is for pads that were instantiated before the external link request,
5755
# that is in the internal link request (see `handle_internal_link_request/4`).
@@ -69,17 +67,19 @@ defmodule Membrane.Core.Bin.PadController do
6967
state
7068
end
7169

72-
state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options})
73-
state = maybe_handle_pad_added(pad_ref, state)
70+
linking_timeout_id = make_ref()
7471

75-
unless PadModel.get_data!(state, pad_ref, :endpoint) do
76-
# If there's no endpoint associated to the pad, no internal link to the pad
77-
# has been requested in the bin yet
78-
_ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000)
79-
:ok
80-
end
72+
state =
73+
PadModel.update_data!(
74+
state,
75+
pad_ref,
76+
&%{&1 | link_id: link_id, linking_timeout_id: linking_timeout_id, options: pad_options}
77+
)
8178

82-
state
79+
message = Message.new(:linking_timeout, [pad_ref, linking_timeout_id])
80+
_ref = Process.send_after(self(), message, 5000)
81+
82+
maybe_handle_pad_added(pad_ref, state)
8383
end
8484

8585
@spec remove_pad(Pad.ref(), State.t()) :: State.t()
@@ -102,16 +102,16 @@ defmodule Membrane.Core.Bin.PadController do
102102
end
103103
end
104104

105-
@spec handle_linking_timeout(Pad.ref(), State.t()) :: :ok | no_return()
106-
def handle_linking_timeout(pad_ref, state) do
107-
case PadModel.get_data(state, pad_ref) do
108-
{:ok, %{endpoint: nil} = pad_data} ->
109-
raise Membrane.LinkError,
110-
"Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{inspect(pad_data, pretty: true)}"
111-
112-
_other ->
113-
:ok
105+
@spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: :ok | no_return()
106+
def handle_linking_timeout(pad_ref, linking_timeout_id, state) do
107+
with {:ok, pad_data} <- PadModel.get_data(state, pad_ref),
108+
%{linking_timeout_id: ^linking_timeout_id, linked_in_spec?: false} <- pad_data do
109+
raise Membrane.LinkError, """
110+
Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)}
111+
"""
114112
end
113+
114+
:ok
115115
end
116116

117117
@doc """
@@ -139,7 +139,7 @@ defmodule Membrane.Core.Bin.PadController do
139139

140140
# Static pads can be linked internally before the external link request
141141
pad_info.availability == :always ->
142-
init_pad_data(pad_ref, pad_info, state)
142+
init_pad_data(pad_ref, state)
143143

144144
true ->
145145
raise LinkError, "Dynamic pads must be firstly linked externally, then internally"
@@ -284,7 +284,6 @@ defmodule Membrane.Core.Bin.PadController do
284284
with {:ok, %{availability: :on_request}} <- PadModel.get_data(state, pad_ref) do
285285
{pad_data, state} =
286286
maybe_handle_pad_removed(pad_ref, state)
287-
|> Map.update!(:pad_refs, &List.delete(&1, pad_ref))
288287
|> PadModel.pop_data!(pad_ref)
289288

290289
if pad_data.endpoint do
@@ -316,8 +315,8 @@ defmodule Membrane.Core.Bin.PadController do
316315
end
317316

318317
@spec maybe_handle_pad_added(Pad.ref(), Core.Bin.State.t()) :: Core.Bin.State.t()
319-
defp maybe_handle_pad_added(ref, state) do
320-
%{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref)
318+
defp maybe_handle_pad_added(pad_ref, state) do
319+
%{options: pad_opts, availability: availability} = PadModel.get_data!(state, pad_ref)
321320

322321
if Pad.availability_mode(availability) == :dynamic do
323322
context = &CallbackContext.from_state(&1, pad_options: pad_opts)
@@ -326,7 +325,7 @@ defmodule Membrane.Core.Bin.PadController do
326325
:handle_pad_added,
327326
ActionHandler,
328327
%{context: context},
329-
[ref],
328+
[pad_ref],
330329
state
331330
)
332331
else
@@ -351,9 +350,15 @@ defmodule Membrane.Core.Bin.PadController do
351350
end
352351
end
353352

354-
defp init_pad_data(pad_ref, pad_info, state) do
353+
@spec init_pad_data(Pad.ref(), State.t()) :: State.t()
354+
def init_pad_data(pad_ref, state) do
355+
if PadModel.assert_instance(state, pad_ref) == :ok do
356+
raise "Cannot init pad data for pad #{inspect(pad_ref)}, because it already exists"
357+
end
358+
355359
pad_data =
356-
pad_info
360+
state.pads_info
361+
|> Map.get(Pad.name_by_ref(pad_ref))
357362
|> Map.delete(:accepted_formats_str)
358363
|> Map.merge(%{
359364
ref: pad_ref,
@@ -362,10 +367,12 @@ defmodule Membrane.Core.Bin.PadController do
362367
linked?: false,
363368
response_received?: false,
364369
spec_ref: nil,
365-
options: nil
370+
options: nil,
371+
linking_timeout_id: nil,
372+
linked_in_spec?: false
366373
})
367374
|> then(&struct!(Membrane.Bin.PadData, &1))
368375

369-
put_in(state, [:pads_data, pad_ref], pad_data)
376+
put_in(state.pads_data[pad_ref], pad_data)
370377
end
371378
end

lib/membrane/core/bin/state.ex

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ defmodule Membrane.Core.Bin.State do
88
use Bunch
99
use Bunch.Access
1010

11-
alias Membrane.{Child, Clock, Pad, Sync}
11+
alias Membrane.{Child, Clock, Sync}
1212
alias Membrane.Core.Child.PadModel
1313
alias Membrane.Core.Parent.ChildLifeController
1414
alias Membrane.Core.Parent.{ChildrenModel, CrashGroup, Link}
@@ -20,7 +20,6 @@ defmodule Membrane.Core.Bin.State do
2020
children: ChildrenModel.children(),
2121
subprocess_supervisor: pid(),
2222
name: Membrane.Bin.name() | nil,
23-
pad_refs: [Pad.ref()],
2423
pads_info: PadModel.pads_info() | nil,
2524
pads_data: PadModel.pads_data() | nil,
2625
parent_pid: pid,
@@ -62,7 +61,6 @@ defmodule Membrane.Core.Bin.State do
6261
parent_pid: nil,
6362
playback: :stopped,
6463
internal_state: nil,
65-
pad_refs: [],
6664
pads_info: nil,
6765
children: %{},
6866
links: %{},

lib/membrane/core/child/pad_spec_handler.ex

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ defmodule Membrane.Core.Child.PadSpecHandler do
2121
| pads_info:
2222
get_pads(state)
2323
|> Map.new(),
24-
pads_data: %{},
25-
pad_refs: []
24+
pads_data: %{}
2625
}
2726
end
2827

lib/membrane/core/element/pad_controller.ex

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -229,9 +229,7 @@ defmodule Membrane.Core.Element.PadController do
229229
state = generate_eos_if_needed(pad_ref, state)
230230
state = maybe_handle_pad_removed(pad_ref, state)
231231

232-
{pad_data, state} =
233-
Map.update!(state, :pad_refs, &List.delete(&1, pad_ref))
234-
|> PadModel.pop_data!(pad_ref)
232+
{pad_data, state} = PadModel.pop_data!(state, pad_ref)
235233

236234
with %{direction: :input, flow_control: :auto, other_effective_flow_control: :pull} <-
237235
pad_data do
@@ -321,9 +319,7 @@ defmodule Membrane.Core.Element.PadController do
321319
|> then(&struct!(Membrane.Element.PadData, &1))
322320

323321
state =
324-
state
325-
|> put_in([:pads_data, endpoint.pad_ref], pad_data)
326-
|> Map.update!(:pad_refs, &[endpoint.pad_ref | &1])
322+
put_in(state, [:pads_data, endpoint.pad_ref], pad_data)
327323

328324
:ok =
329325
AtomicDemand.set_sender_status(

lib/membrane/core/element/state.ex

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ defmodule Membrane.Core.Element.State do
1919
type: Element.type(),
2020
name: Element.name(),
2121
internal_state: Element.state() | nil,
22-
pad_refs: [Pad.ref()] | nil,
2322
pads_info: PadModel.pads_info() | nil,
2423
pads_data: PadModel.pads_data() | nil,
2524
parent_pid: pid,
@@ -65,7 +64,6 @@ defmodule Membrane.Core.Element.State do
6564
playback: :stopped,
6665
type: nil,
6766
internal_state: nil,
68-
pad_refs: [],
6967
pads_info: %{},
7068
synchronization: nil,
7169
delayed_demands: MapSet.new(),

lib/membrane/core/parent/child_life_controller.ex

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
55
alias __MODULE__.{CrashGroupUtils, LinkUtils, StartupUtils}
66
alias Membrane.{Child, ChildrenSpec}
77
alias Membrane.Core.{Bin, CallbackHandler, Component, Parent, Pipeline}
8+
alias Membrane.Core.Bin.PadController
89

910
alias Membrane.Core.Parent.{
1011
ChildEntryParser,
@@ -17,6 +18,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
1718
alias Membrane.Pad
1819
alias Membrane.ParentError
1920

21+
require Membrane.Core.Child.PadModel, as: PadModel
2022
require Membrane.Core.Component
2123
require Membrane.Core.Message, as: Message
2224
require Membrane.Logger
@@ -154,7 +156,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
154156

155157
state =
156158
put_in(state, [:pending_specs, spec_ref], %{
157-
status: :initializing,
159+
status: :created,
158160
children_names: MapSet.new(all_children_names),
159161
links_ids: Enum.map(links, & &1.id),
160162
dependent_specs: dependent_specs,
@@ -309,14 +311,37 @@ defmodule Membrane.Core.Parent.ChildLifeController do
309311
end
310312
end
311313

314+
defp do_proceed_spec_startup(spec_ref, %{status: :created} = spec_data, state) do
315+
state =
316+
with %Bin.State{} <- state do
317+
bin_pads_linked_in_spec =
318+
spec_data.links_ids
319+
|> Enum.map(&Map.fetch!(state.links, &1))
320+
|> Enum.flat_map(&[&1.from, &1.to])
321+
|> Enum.flat_map(fn
322+
%{child: {Membrane.Bin, :itself}, pad_ref: pad_ref} -> [pad_ref]
323+
_other -> []
324+
end)
325+
326+
bin_pads_linked_in_spec
327+
|> Enum.reduce(state, fn pad_ref, state ->
328+
case PadModel.assert_instance(state, pad_ref) do
329+
:ok -> state
330+
{:error, :unknown_pad} -> PadController.init_pad_data(pad_ref, state)
331+
end
332+
|> PadModel.set_data!(pad_ref, :linked_in_spec?, true)
333+
end)
334+
end
335+
336+
do_proceed_spec_startup(spec_ref, %{spec_data | status: :initializing}, state)
337+
end
338+
312339
defp do_proceed_spec_startup(spec_ref, %{status: :initializing} = spec_data, state) do
313340
Membrane.Logger.debug(
314341
"Proceeding spec #{inspect(spec_ref)} startup: initializing, dependent specs: #{inspect(MapSet.to_list(spec_data.dependent_specs))}"
315342
)
316343

317-
%{children: children} = state
318-
319-
if Enum.all?(spec_data.children_names, &Map.fetch!(children, &1).initialized?) and
344+
if Enum.all?(spec_data.children_names, &Map.fetch!(state.children, &1).initialized?) and
320345
Enum.empty?(spec_data.dependent_specs) do
321346
Membrane.Logger.debug("Spec #{inspect(spec_ref)} status changed to initialized")
322347

@@ -353,7 +378,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
353378
end
354379

355380
defp do_proceed_spec_startup(spec_ref, %{status: :linking_internally} = spec_data, state) do
356-
if Enum.empty?(spec_data.awaiting_responses) do
381+
if MapSet.size(spec_data.awaiting_responses) == 0 do
357382
state =
358383
spec_data.links_ids
359384
|> Enum.map(&Map.fetch!(state.links, &1))

test/membrane/core/element/pad_controller_test.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ defmodule Membrane.Core.Element.PadControllerTest do
6565
state
6666
)
6767

68-
assert Map.drop(new_state, [:pads_data, :pad_refs]) ==
69-
Map.drop(state, [:pads_data, :pad_refs])
68+
assert Map.delete(new_state, :pads_data) ==
69+
Map.delete(state, :pads_data)
7070

7171
assert PadModel.assert_instance(new_state, :input) == :ok
7272
end

0 commit comments

Comments
 (0)