Skip to content

Commit 05d1520

Browse files
authored
Use gen_statem in Postgrex.ReplicationConnection (#644)
1 parent e77976a commit 05d1520

File tree

2 files changed

+75
-35
lines changed

2 files changed

+75
-35
lines changed

lib/postgrex/replication_connection.ex

Lines changed: 73 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,13 @@ defmodule Postgrex.ReplicationConnection do
141141
`GenServer`. Read more about them in the `GenServer` docs.
142142
"""
143143

144-
use Connection
145144
require Logger
146145
import Bitwise
147146

148147
alias Postgrex.Protocol
149148

149+
@behaviour :gen_statem
150+
150151
@doc false
151152
defstruct protocol: nil,
152153
state: nil,
@@ -156,7 +157,7 @@ defmodule Postgrex.ReplicationConnection do
156157

157158
## PUBLIC API ##
158159

159-
@type server :: GenServer.server()
160+
@type server :: :gen_statem.server()
160161
@type state :: term
161162
@type ack :: iodata
162163
@type query :: iodata
@@ -240,7 +241,7 @@ defmodule Postgrex.ReplicationConnection do
240241
been replied to should eventually do so. One simple approach is to
241242
reply to any pending commands on `c:handle_disconnect/1`.
242243
"""
243-
@callback handle_call(term, GenServer.from(), state) ::
244+
@callback handle_call(term, :gen_statem.from(), state) ::
244245
{:noreply, state}
245246
| {:noreply, ack, state}
246247
| {:query, query, state}
@@ -250,7 +251,7 @@ defmodule Postgrex.ReplicationConnection do
250251
Callback for `:query` outputs.
251252
252253
If any callback returns `{:query, iodata, state}`,
253-
then this callback will be immediatelly called with
254+
then this callback will be immediately called with
254255
the result of the query. Please note that even though
255256
replication connections use the simple query protocol,
256257
Postgres currently limits them to single command queries.
@@ -274,13 +275,13 @@ defmodule Postgrex.ReplicationConnection do
274275
@doc """
275276
Replies to the given `call/3`.
276277
"""
277-
defdelegate reply(client, reply), to: GenServer
278+
defdelegate reply(client, reply), to: :gen_statem
278279

279280
@doc """
280281
Calls the given replication server.
281282
"""
282283
def call(server, message, timeout \\ 5000) do
283-
with {__MODULE__, reason} <- GenServer.call(server, message, timeout) do
284+
with {__MODULE__, reason} <- :gen_statem.call(server, message, timeout) do
284285
exit({reason, {__MODULE__, :call, [server, message, timeout]}})
285286
end
286287
end
@@ -339,10 +340,34 @@ defmodule Postgrex.ReplicationConnection do
339340
@spec start_link(module(), term(), Keyword.t()) ::
340341
{:ok, pid} | {:error, Postgrex.Error.t() | term}
341342
def start_link(module, arg, opts) do
342-
{server_opts, opts} = Keyword.split(opts, [:name])
343+
{name, opts} = Keyword.pop(opts, :name)
343344
opts = Keyword.put_new(opts, :sync_connect, true)
344345
connection_opts = Postgrex.Utils.default_opts(opts)
345-
Connection.start_link(__MODULE__, {module, arg, connection_opts}, server_opts)
346+
start_args = {module, arg, connection_opts}
347+
348+
case name do
349+
nil ->
350+
:gen_statem.start_link(__MODULE__, start_args, [])
351+
352+
atom when is_atom(atom) ->
353+
:gen_statem.start_link({:local, atom}, __MODULE__, start_args, [])
354+
355+
{:global, _term} = tuple ->
356+
:gen_statem.start_link(tuple, __MODULE__, start_args, [])
357+
358+
{:via, via_module, _term} = tuple when is_atom(via_module) ->
359+
:gen_statem.start_link(tuple, __MODULE__, start_args, [])
360+
361+
other ->
362+
raise ArgumentError, """
363+
expected :name option to be one of the following:
364+
* nil
365+
* atom
366+
* {:global, term}
367+
* {:via, module, term}
368+
Got: #{inspect(other)}
369+
"""
370+
end
346371
end
347372

348373
@doc """
@@ -409,7 +434,14 @@ defmodule Postgrex.ReplicationConnection do
409434

410435
## CALLBACKS ##
411436

437+
@state :no_state
438+
439+
@doc false
440+
@impl :gen_statem
441+
def callback_mode, do: :handle_event_function
442+
412443
@doc false
444+
@impl :gen_statem
413445
def init({mod, arg, opts}) do
414446
case mod.init(arg) do
415447
{:ok, mod_state} ->
@@ -433,42 +465,44 @@ defmodule Postgrex.ReplicationConnection do
433465
put_opts(opts)
434466

435467
if opts[:sync_connect] do
436-
case connect(:init, state) do
437-
{:ok, _} = ok -> ok
438-
{:backoff, _, _} = backoff -> backoff
439-
{:stop, reason, _} -> {:stop, reason}
468+
case handle_event(:internal, {:connect, :init}, @state, state) do
469+
{:keep_state, state} -> {:ok, @state, state}
470+
{:keep_state, state, actions} -> {:ok, @state, state, actions}
471+
{:stop, _reason, _state} = stop -> stop
440472
end
441473
else
442-
{:connect, :init, state}
474+
{:ok, @state, state, {:next_event, :internal, {:connect, :init}}}
443475
end
444476
end
445477
end
446478

447479
@doc false
448-
def connect(_, %{state: {mod, mod_state}} = s) do
480+
@impl :gen_statem
481+
def handle_event(type, content, state, s)
482+
483+
def handle_event({:timeout, :backoff}, nil, @state, s) do
484+
{:keep_state, s, {:next_event, :internal, {:connect, :backoff}}}
485+
end
486+
487+
def handle_event(:internal, {:connect, _info}, @state, %{state: {mod, mod_state}} = s) do
449488
case Protocol.connect(opts()) do
450489
{:ok, protocol} ->
451-
s = %{s | protocol: protocol}
452-
453-
with {:noreply, s} <- maybe_handle(mod, :handle_connect, [mod_state], s) do
454-
{:ok, s}
455-
end
490+
maybe_handle(mod, :handle_connect, [mod_state], %{s | protocol: protocol})
456491

457492
{:error, reason} ->
458493
if s.auto_reconnect do
459-
{:backoff, s.reconnect_backoff, s}
494+
{:keep_state, s, {{:timeout, :backoff}, s.reconnect_backoff, nil}}
460495
else
461496
{:stop, reason, s}
462497
end
463498
end
464499
end
465500

466-
def handle_call(msg, from, %{state: {mod, mod_state}} = s) do
501+
def handle_event({:call, from}, msg, @state, %{state: {mod, mod_state}} = s) do
467502
handle(mod, :handle_call, [msg, from, mod_state], from, s)
468503
end
469504

470-
@doc false
471-
def handle_info(msg, %{protocol: protocol, streaming: streaming} = s) do
505+
def handle_event(:info, msg, @state, %{protocol: protocol, streaming: streaming} = s) do
472506
case Protocol.handle_copy_recv(msg, streaming, protocol) do
473507
{:ok, copies, protocol} ->
474508
handle_data(copies, %{s | protocol: protocol})
@@ -482,16 +516,18 @@ defmodule Postgrex.ReplicationConnection do
482516
end
483517
end
484518

485-
defp handle_data([], s), do: {:noreply, s}
519+
## Helpers
520+
521+
defp handle_data([], s), do: {:keep_state, s}
486522

487523
defp handle_data([:copy_done | copies], %{state: {mod, mod_state}} = s) do
488-
with {:noreply, s} <- handle(mod, :handle_data, [:done, mod_state], nil, s) do
524+
with {:keep_state, s} <- handle(mod, :handle_data, [:done, mod_state], nil, s) do
489525
handle_data(copies, %{s | streaming: nil})
490526
end
491527
end
492528

493529
defp handle_data([copy | copies], %{state: {mod, mod_state}} = s) do
494-
with {:noreply, s} <- handle(mod, :handle_data, [copy, mod_state], nil, s) do
530+
with {:keep_state, s} <- handle(mod, :handle_data, [copy, mod_state], nil, s) do
495531
handle_data(copies, s)
496532
end
497533
end
@@ -500,20 +536,20 @@ defmodule Postgrex.ReplicationConnection do
500536
if function_exported?(mod, fun, length(args)) do
501537
handle(mod, fun, args, nil, s)
502538
else
503-
{:noreply, s}
539+
{:keep_state, s}
504540
end
505541
end
506542

507543
defp handle(mod, fun, args, from, %{streaming: streaming} = s) do
508544
case apply(mod, fun, args) do
509545
{:noreply, mod_state} ->
510-
{:noreply, %{s | state: {mod, mod_state}}}
546+
{:keep_state, %{s | state: {mod, mod_state}}}
511547

512548
{:noreply, replies, mod_state} ->
513549
s = %{s | state: {mod, mod_state}}
514550

515551
case Protocol.handle_copy_send(replies, s.protocol) do
516-
:ok -> {:noreply, s}
552+
:ok -> {:keep_state, s}
517553
{error, reason, protocol} -> reconnect_or_stop(error, reason, protocol, s)
518554
end
519555

@@ -523,7 +559,7 @@ defmodule Postgrex.ReplicationConnection do
523559

524560
with {:ok, protocol} <- Protocol.handle_streaming(query, s.protocol),
525561
{:ok, protocol} <- Protocol.checkin(protocol) do
526-
{:noreply, %{s | protocol: protocol, streaming: max_messages}}
562+
{:keep_state, %{s | protocol: protocol, streaming: max_messages}}
527563
else
528564
{error_or_disconnect, reason, protocol} ->
529565
reconnect_or_stop(error_or_disconnect, reason, protocol, s)
@@ -552,21 +588,24 @@ defmodule Postgrex.ReplicationConnection do
552588
defp stream_in_progress(command, mod, mod_state, from, s) do
553589
Logger.warning("received #{command} while stream is already in progress")
554590
from && reply(from, {__MODULE__, :stream_in_progress})
555-
{:noreply, %{s | state: {mod, mod_state}}}
591+
{:keep_state, %{s | state: {mod, mod_state}}}
556592
end
557593

558594
defp reconnect_or_stop(error, reason, protocol, %{auto_reconnect: false} = s)
559595
when error in [:error, :disconnect] do
560596
%{state: {mod, mod_state}} = s
561-
{:noreply, s} = maybe_handle(mod, :handle_disconnect, [mod_state], %{s | protocol: protocol})
597+
598+
{:keep_state, s} =
599+
maybe_handle(mod, :handle_disconnect, [mod_state], %{s | protocol: protocol})
600+
562601
{:stop, reason, s}
563602
end
564603

565604
defp reconnect_or_stop(error, _reason, _protocol, %{auto_reconnect: true} = s)
566605
when error in [:error, :disconnect] do
567606
%{state: {mod, mod_state}} = s
568-
{:noreply, s} = maybe_handle(mod, :handle_disconnect, [mod_state], s)
569-
{:connect, :reconnect, %{s | streaming: nil}}
607+
{:keep_state, s} = maybe_handle(mod, :handle_disconnect, [mod_state], s)
608+
{:keep_state, %{s | streaming: nil}, {:next_event, :internal, {:connect, :reconnect}}}
570609
end
571610

572611
defp opts(), do: Process.get(__MODULE__)

test/replication_connection_test.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,8 @@ defmodule ReplicationTest do
288288
end
289289

290290
defp disconnect(repl) do
291-
{:gen_tcp, sock} = :sys.get_state(repl).mod_state.protocol.sock
291+
{_, state} = :sys.get_state(repl)
292+
{:gen_tcp, sock} = state.protocol.sock
292293
:gen_tcp.shutdown(sock, :read_write)
293294
end
294295
end

0 commit comments

Comments
 (0)