Skip to content
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Changelog

## 1.2.3
* Telemetry event's metadata `component_type` now correctly refers to the root type and not the implementing module [#958](https://github.com/membraneframework/membrane_core/pull/958)
* Makes sure that all handle_child_terminated callbacks for children in crash group are called before handle_crash_group_down of that group in [#962](https://github.com/membraneframework/membrane_core/pull/962)
* Telemetry event's metadata `component_type` now correctly refers to the root type and not the implementing module [#958](https://github.com/membraneframework/membrane_core/pull/958)

## 1.2.2
* Improve conditions for generating compilation warnings. [#956](https://github.com/membraneframework/membrane_core/pull/956)
Expand Down
5 changes: 4 additions & 1 deletion lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -729,11 +729,14 @@ defmodule Membrane.Core.Parent.ChildLifeController do

with {:ok, crash_group} <- CrashGroupUtils.get_child_crash_group(child_name, state) do
state =
CrashGroupUtils.handle_crash_group_member_death(child_name, crash_group, reason, state)
CrashGroupUtils.maybe_detonate_crash_group(child_name, crash_group, reason, state)
|> ChildrenModel.delete_child(child_name)

state = exec_handle_child_terminated(child_name, state)

state =
CrashGroupUtils.handle_crash_group_member_death(child_name, crash_group, reason, state)

{:ok, state}
else
:error when reason == :normal ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
end
end

def handle_crash_group_member_death(child_name, %CrashGroup{} = group, crash_reason, state) do
state =
if group.detonating? do
state
else
detonate_crash_group(child_name, group, crash_reason, state)
end

def handle_crash_group_member_death(child_name, %CrashGroup{} = group, _crash_reason, state) do
all_members_dead? =
List.delete(group.members, child_name)
|> Enum.all?(&(not Map.has_key?(state.children, &1)))
Expand All @@ -71,7 +64,15 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
end
end

defp detonate_crash_group(crash_initiator, %CrashGroup{} = group, crash_reason, state) do
@spec maybe_detonate_crash_group(Child.name(), CrashGroup.t(), any(), Parent.state()) ::
Parent.state()
def maybe_detonate_crash_group(
crash_initiator,
%CrashGroup{detonating?: false} = group,
reason,
state
)
when reason != :normal do
state = ChildLifeController.remove_children_from_specs(group.members, state)
state = LinkUtils.unlink_crash_group(group, state)

Expand All @@ -87,11 +88,13 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
&1
| detonating?: true,
crash_initiator: crash_initiator,
crash_reason: crash_reason
crash_reason: reason
}
)
end

def maybe_detonate_crash_group(_child_name, %CrashGroup{}, _reason, state), do: state

defp cleanup_crash_group(group_name, state) do
state = exec_handle_crash_group_down(group_name, state)
{_group, state} = pop_in(state.crash_groups[group_name])
Expand Down
4 changes: 3 additions & 1 deletion lib/membrane/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,10 @@ defmodule Membrane.Pipeline do
) :: {[Action.common_actions()], state()}

@doc """
Callback invoked when a crash group crashes.
Callback invoked when a crash group crashes, when all children from
the crash group are already dead.

You can use this callback to respawn the children from the failed crashed crash group.
Context passed to this callback contains 2 additional fields: `:members` and `:crash_initiator`.
By default, it does nothing.
"""
Expand Down
1 change: 1 addition & 0 deletions lib/membrane/testing/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ defmodule Membrane.Testing.Pipeline do
## Example usage

Firstly, we can start the pipeline providing its options as a keyword list:

import Membrane.ChildrenSpec
children = [
child(source, %Membrane.Testing.Source{} ) |>
Expand Down
73 changes: 73 additions & 0 deletions test/membrane/integration/callbacks_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,77 @@ defmodule Membrane.Integration.CallbacksTest do

Testing.Pipeline.terminate(pipeline)
end

defmodule CrashingFilter do
use Membrane.Filter

def_input_pad :input, accepted_format: _any
def_output_pad :output, accepted_format: _any

@impl true
def handle_playing(_ctx, state) do
Process.send_after(self(), :raise, 500)
{[], state}
end

@impl true
def handle_info(:raise, _ctx, _state) do
raise "Raising"
end

@impl true
def handle_buffer(:input, buffer, _ctx, state) do
{[buffer: {:output, buffer}], state}
end
end

defmodule CallbacksOrderAssertingPipeline do
use Membrane.Pipeline

@impl true
def handle_init(_ctx, _opts) do
static_spec =
[
child(:source, %Membrane.Testing.Source{output: [1, 2, 3]})
|> child(:connector1, Membrane.Connector),
child(:connector2, Membrane.Connector)
|> child(:sink, Membrane.Debug.Sink)
]

crash_group_spec =
get_child(:connector1)
|> child(:filter1, Membrane.Debug.Filter)
|> child(:filter2, CrashingFilter)
|> child(:filter3, Membrane.Debug.Filter)
|> get_child(:connector2)

state = %{crash_group_children: MapSet.new([:filter1, :filter2, :filter3])}

{[
spec: static_spec,
spec: {crash_group_spec, group: :crash_group, crash_group_mode: :temporary}
], state}
end

@impl true
def handle_child_terminated(child, _ctx, state) do
state = %{crash_group_children: MapSet.delete(state.crash_group_children, child)}
{[], state}
end

@impl true
def handle_crash_group_down(_group_id, _ctx, state) do
assert MapSet.size(state.crash_group_children) == 0
{[terminate: :shutdown], state}
end
end

test "handle_child_terminated and handle_crash_group_down in proper order" do
pipeline = Testing.Pipeline.start_supervised!(module: CallbacksOrderAssertingPipeline)
Process.monitor(pipeline)

receive do
{:DOWN, _ref, _process, ^pipeline, _reason} -> :ok
end
end
end
Loading