Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 81 additions & 7 deletions lib/flow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,49 @@ defmodule Flow do
consumers child specification. Similar to `start_link/1`, they return
either `{:ok, pid}` or `{:error, reason}`.

## Naming Flows, Partitions and Stages

By default, all stage processes generated by `Flow` are started as children
of a flow supervisor process.
These processes are not registered under a name and can only be identified by PID.

By passing the `:name` option to `start_link/2`, all generated GenStage processes
are automatically registered under unique names.
For example:

Flow.from_stages([:producer], stages: 1)
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition(stages: 2)
|> Flow.reduce(fn -> %{} end, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Flow.start_link(name: :myflow)

This results in the following named flow processes:
[:myflow] # flow top process
[:myflow_sup ] # flow supervisor
[:myflow_p0_0 ] # First partition stage (input stage)
[:myflow_p1_0] [:myflow_p1_1] # Second partition stages

When using named flows, partition names can be explicitly specified using
the `:name` option on the functions that create partition processes,
such as `partition/2`, `from_stages/2` and `from_enumerables/2`.
For example:

Flow.from_stages([:producer], stages: 1, name: :input)
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition(stages: 2, name: :output)
|> Flow.reduce(fn -> %{} end, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Flow.start_link(name: :myflow)

This results in the following named flow processes:
[:myflow] # flow top process
[:myflow_sup ] # flow supervisor
[:myflow_input_0 ] # First partition stage (input stage)
[:myflow_output_0] [:myflow_output_1] # Second partition stages

## Performance discussions

In this section we will discuss points related to performance
Expand Down Expand Up @@ -558,6 +601,8 @@ defmodule Flow do

* `:window` - a window to run the next stages in, see `Flow.Window`
* `:stages` - the number of stages
* `:name` - the name for the stages in this partition,
requires the flow to be 'named'
* `:buffer_keep` - how the buffer should behave, see `c:GenStage.init/1`
* `:buffer_size` - how many events to buffer, see `c:GenStage.init/1`
* `:shutdown` - the shutdown time for this stage when the flow is shut down.
Expand All @@ -577,7 +622,7 @@ defmodule Flow do
def from_enumerables(enumerables, options \\ [])

# Clause for a non-empty list of enumerables.
#
# The options are normalised through `stages/1` exactly once and then tagged
# with the partition id by `partition_id/1`. The `:window` option is popped
# out of the options (defaulting to `Flow.Window.global/0`) and stored in the
# struct's own `:window` field.
def from_enumerables([_ | _] = enumerables, options) do
  options = options |> stages() |> partition_id()
  {window, options} = Keyword.pop(options, :window, Flow.Window.global())
  %Flow{producers: {:enumerables, enumerables}, options: options, window: window}
end
Expand All @@ -600,6 +645,8 @@ defmodule Flow do

* `:window` - a window to run the next stages in, see `Flow.Window`
* `:stages` - the number of stages
* `:name` - the name for the stages in this partition,
requires the flow to be 'named'
* `:buffer_keep` - how the buffer should behave, see `c:GenStage.init/1`
* `:buffer_size` - how many events to buffer, see `c:GenStage.init/1`
* `:shutdown` - the shutdown time for this stage when the flow is shut down.
Expand Down Expand Up @@ -639,7 +686,7 @@ defmodule Flow do
def from_stages(producers, options \\ [])

def from_stages([_ | _] = producers, options) do
options = stages(options)
options = stages(options) |> partition_id()
{window, options} = Keyword.pop(options, :window, Flow.Window.global())
producers = Enum.map(producers, &{&1, []})

Expand Down Expand Up @@ -682,7 +729,7 @@ defmodule Flow do
def from_specs(producers, options \\ [])

def from_specs([_ | _] = producers, options) do
options = stages(options)
options = stages(options) |> partition_id()
{window, options} = Keyword.pop(options, :window, Flow.Window.global())

fun = fn start_link ->
Expand Down Expand Up @@ -775,7 +822,7 @@ defmodule Flow do
def through_stages(flow, producer_consumers, options \\ [])

def through_stages(%Flow{} = flow, [_ | _] = producer_consumers, options) do
options = stages(options)
options = stages(options) |> partition_id([flow])
{window, options} = Keyword.pop(options, :window, Flow.Window.global())

%Flow{
Expand Down Expand Up @@ -824,7 +871,7 @@ defmodule Flow do
def through_specs(flow, producer_consumers, options \\ [])

def through_specs(%Flow{} = flow, [_ | _] = producer_consumers, options) do
options = stages(options)
options = stages(options) |> partition_id([flow])
{window, options} = Keyword.pop(options, :window, Flow.Window.global())

%Flow{
Expand Down Expand Up @@ -998,6 +1045,7 @@ defmodule Flow do
## Options

* `:name` - the name of the flow. When non-nil, all processes created
within the flow are given default names.

* `:demand` - configures the demand on the flow producers to `:forward`
or `:accumulate`. The default is `:forward`. See `GenStage.demand/2`
Expand Down Expand Up @@ -1250,6 +1298,8 @@ defmodule Flow do
* `:window` - a `Flow.Window` struct which controls how the
reducing function behaves, see `Flow.Window` for more information.
* `:stages` - the number of partitions (reducer stages)
* `:name` - the name for the stages in the partition,
requires the flow to be 'named'
* `:shutdown` - the shutdown time for this stage when the flow is shut down.
The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds.
* `:key` - the key to use when partitioning. It is a function
Expand Down Expand Up @@ -1403,6 +1453,8 @@ defmodule Flow do
* `:window` - a `Flow.Window` struct which controls how the
reducing function behaves, see `Flow.Window` for more information.
* `:stages` - the number of partitions (reducer stages)
* `:name` - the name for the stages in the partition,
requires the flow to be 'named'
* `:shutdown` - the shutdown time for this stage when the flow is shut down.
The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds.

Expand All @@ -1424,6 +1476,8 @@ defmodule Flow do
* `:window` - a `Flow.Window` struct which controls how the
reducing function behaves, see `Flow.Window` for more information.
* `:stages` - the number of partitions (reducer stages)
* `:name` - the name for the stages in the partition,
requires the flow to be 'named'
* `:shutdown` - the shutdown time for this stage when the flow is shut down.
The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds.

Expand All @@ -1435,16 +1489,36 @@ defmodule Flow do
end

# Clause for a non-empty list of flows.
#
# The options are normalised once: `stages/1`, then `partition_id/2` (which
# derives the id from the first flow being merged), then `put_dispatcher/2`.
# The `:window` option is popped out (defaulting to `Flow.Window.global/0`).
# The merged flows are passed through `index_flows/1` before being stored as
# the producers of the resulting flow.
def merge([%Flow{} | _] = flows, dispatcher, options) when is_list(options) do
  options = options |> stages() |> partition_id(flows) |> put_dispatcher(dispatcher)
  {window, options} = Keyword.pop(options, :window, Flow.Window.global())
  %Flow{producers: {:flows, index_flows(flows)}, options: options, window: window}
end

# Fallback clause: the first argument did not match a non-empty list of flows,
# so reject it with an ArgumentError that shows the offending value.
def merge(other, _dispatcher, options) when is_list(options) do
  message =
    "expected a flow or a non-empty list of flows as first argument, got: #{inspect(other)}"

  raise ArgumentError, message
end

# Tags each flow with its position in the list via the `:index` option.
#
# A single flow is returned untouched. The one-element case is matched
# structurally with `[_]`, which avoids walking the list with `length/1`.
defp index_flows([_] = flows), do: flows

defp index_flows(flows) do
  flows
  |> Enum.with_index()
  |> Enum.map(fn {flow, index} ->
    %Flow{flow | options: Keyword.put(flow.options, :index, index)}
  end)
end

# Marks the options with partition id 0, replacing any existing `:partition_id`.
defp partition_id(options) do
  Keyword.put(options, :partition_id, 0)
end

# Derives the partition id from the first upstream flow: one more than that
# flow's `:partition_id`, or 0 when the upstream flow has none.
defp partition_id(options, [%Flow{options: upstream_options} | _]) do
  next_id =
    case upstream_options[:partition_id] do
      nil -> 0
      upstream_id -> upstream_id + 1
    end

  Keyword.put(options, :partition_id, next_id)
end

defp stages(options) do
case Keyword.fetch(options, :stages) do
{:ok, _} ->
Expand Down
15 changes: 11 additions & 4 deletions lib/flow/coordinator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ defmodule Flow.Coordinator do
def init({flow, type, {inner_or_outer, consumers}, options}) do
Process.flag(:trap_exit, true)
type_options = Keyword.take(options, [:dispatcher])

{:ok, supervisor} = start_supervisor()
flow_name = options[:name]
supervisor_name = flow_name && String.to_atom("#{flow_name}_sup")
{:ok, supervisor} = start_supervisor(supervisor_name)
start_link = &start_child(supervisor, &1, restart: :temporary)
{producers, intermediary} = Flow.Materialize.materialize(flow, start_link, type, type_options)

{producers, intermediary} =
Flow.Materialize.materialize(flow, start_link, type, type_options, flow_name)

demand = Keyword.get(options, :demand, :forward)
timeout = Keyword.get(options, :subscribe_timeout, 5_000)
Expand Down Expand Up @@ -60,10 +63,14 @@ defmodule Flow.Coordinator do
# to work all children are started as temporary, except the consumers
# given via into_specs. Once those crash, they terminate the whole
# flow according to their restart type.
# Starts the supervisor that the flow's children are started under.
#
# With `nil` the supervisor is started without a name; with any other value
# that value is passed to `Supervisor.start_link/2` as the `:name` option.
# Returns whatever `Supervisor.start_link/2` returns.
defp start_supervisor(nil) do
  Supervisor.start_link([], strategy: :one_for_one, max_restarts: 0)
end

defp start_supervisor(name) do
  Supervisor.start_link([], strategy: :one_for_one, max_restarts: 0, name: name)
end

defp start_child(supervisor, spec, opts) do
spec = Supervisor.child_spec(spec, [id: make_ref()] ++ opts)
Supervisor.start_child(supervisor, spec)
Expand Down
Loading