Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
-export([pid_of/1, pid_of/2]).
-export([mark_local_durable_queues_stopped/1]).

-export([rebalance/3]).

%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2,
Expand Down Expand Up @@ -484,6 +486,120 @@ not_found_or_absent_dirty(Name) ->
{ok, Q} -> {absent, Q, nodedown}
end.

-spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) ->
{ok, [{node(), pos_integer()}]}.
rebalance(Type, VhostSpec, QueueSpec) ->
Running = rabbit_mnesia:cluster_nodes(running),
NumRunning = length(Running),
ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
filter_per_type(Type, Q),
is_replicated(Q),
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)],
NumToRebalance = length(ToRebalance),
ByNode = group_by_node(ToRebalance),
Rem = case (NumToRebalance rem NumRunning) of
0 -> 0;
_ -> 1
end,
MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem,
iterative_rebalance(ByNode, MaxQueuesDesired).

filter_per_type(all, _) ->
true;
filter_per_type(quorum, Q) ->
?amqqueue_is_quorum(Q);
filter_per_type(classic, Q) ->
?amqqueue_is_classic(Q).

rebalance_module(Q) when ?amqqueue_is_quorum(Q) ->
rabbit_quorum_queue;
rebalance_module(Q) when ?amqqueue_is_classic(Q) ->
rabbit_mirror_queue_misc.

get_resource_name(#resource{name = Name}) ->
Name.

is_match(Subj, E) ->
nomatch /= re:run(Subj, E).

iterative_rebalance(ByNode, MaxQueuesDesired) ->
case maybe_migrate(ByNode, MaxQueuesDesired) of
{ok, Summary} ->
rabbit_log:warning("Nothing to do, all balanced"),
{ok, Summary};
{migrated, Other} ->
iterative_rebalance(Other, MaxQueuesDesired);
{not_migrated, Other} ->
iterative_rebalance(Other, MaxQueuesDesired)
end.

maybe_migrate(ByNode, MaxQueuesDesired) ->
maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)).

maybe_migrate(ByNode, _, []) ->
{ok, maps:fold(fun(K, V, Acc) ->
[{K, length(V)} | Acc]
end, [], ByNode)};
maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
case maps:get(N, ByNode, []) of
[{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired ->
Name = amqqueue:get_name(Q),
Module = rebalance_module(Q),
OtherNodes = Module:get_replicas(Q) -- [N],
case OtherNodes of
[] ->
{not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
_ ->
[{Length, Destination} | _] = sort_by_number_of_queues(OtherNodes, ByNode),
rabbit_log:warning("Migrating queue ~p from node ~p with ~p queues to node ~p with ~p queues",
[Name, N, length(All), Destination, Length]),
case Module:transfer_leadership(Q, Destination) of
{migrated, NewNode} ->
rabbit_log:warning("Queue ~p migrated to ~p", [Name, NewNode]),
{migrated, update_migrated_queue(Destination, N, Queue, Queues, ByNode)};
{not_migrated, Reason} ->
rabbit_log:warning("Error migrating queue ~p: ~p", [Name, Reason]),
{not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}
end
end;
[{_, _, true} | _] = All when length(All) > MaxQueuesDesired ->
rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. "
"Do nothing", [N, length(All)]),
maybe_migrate(ByNode, MaxQueuesDesired, Nodes);
All ->
rabbit_log:warning("Node ~p only contains ~p queues, do nothing",
[N, length(All)]),
maybe_migrate(ByNode, MaxQueuesDesired, Nodes)
end.

update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) ->
maps:update(N, Queues ++ [{Entries, Q, true}], ByNode).

update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) ->
maps:update_with(NewNode,
fun(L) -> L ++ [{Entries, Q, true}] end,
[{Entries, Q, true}], maps:update(OldNode, Queues, ByNode)).

sort_by_number_of_queues(Nodes, ByNode) ->
lists:keysort(1,
lists:map(fun(Node) ->
{num_queues(Node, ByNode), Node}
end, Nodes)).

num_queues(Node, ByNode) ->
length(maps:get(Node, ByNode, [])).

group_by_node(Queues) ->
ByNode = lists:foldl(fun(Q, Acc) ->
Module = rebalance_module(Q),
Length = Module:queue_length(Q),
maps:update_with(amqqueue:qnode(Q),
fun(L) -> [{Length, Q, false} | L] end,
[{Length, Q, false}], Acc)
end, #{}, Queues),
maps:map(fun(_K, V) -> lists:keysort(1, V) end, ByNode).

-spec with(name(),
qfun(A),
fun((not_found_or_absent()) -> rabbit_types:channel_exit())) ->
Expand Down
42 changes: 42 additions & 0 deletions src/rabbit_mirror_queue_misc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

-export([sync_queue/1, cancel_sync_queue/1]).

-export([transfer_leadership/2, queue_length/1, get_replicas/1]).

%% for testing only
-export([module/1]).

Expand Down Expand Up @@ -535,6 +537,46 @@ update_mirrors(Q) when ?is_amqqueue(Q) ->
maybe_auto_sync(Q),
ok.

queue_length(Q) ->
[{messages, M}] = rabbit_amqqueue:info(Q, [messages]),
M.

get_replicas(Q) ->
{MNode, SNodes} = suggested_queue_nodes(Q),
[MNode] ++ SNodes.

transfer_leadership(Q, Destination) ->
QName = amqqueue:get_name(Q),
{OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
OldNodes = [OldMNode | OldSNodes],
add_mirrors(QName, [Destination] -- OldNodes, async),
drop_mirrors(QName, OldNodes -- [Destination]),
{Result, NewQ} = wait_for_new_master(QName, Destination),
update_mirrors(NewQ),
Result.

wait_for_new_master(QName, Destination) ->
wait_for_new_master(QName, Destination, 100).

wait_for_new_master(QName, _, 0) ->
{ok, Q} = rabbit_amqqueue:lookup(QName),
{{not_migrated, ""}, Q};
wait_for_new_master(QName, Destination, N) ->
{ok, Q} = rabbit_amqqueue:lookup(QName),
case amqqueue:get_pid(Q) of
none ->
timer:sleep(100),
wait_for_new_master(QName, Destination, N - 1);
Pid ->
case node(Pid) of
Destination ->
{{migrated, Destination}, Q};
_ ->
timer:sleep(100),
wait_for_new_master(QName, Destination, N - 1)
end
end.

%% The arrival of a newly synced slave may cause the master to die if
%% the policy does not want the master but it has been kept alive
%% because there were no synced slaves.
Expand Down
26 changes: 26 additions & 0 deletions src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
-export([cleanup_data_dir/0]).
-export([shrink_all/1,
grow/4]).
-export([transfer_leadership/2, get_replicas/1, queue_length/1]).

%%-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit.hrl").
Expand Down Expand Up @@ -847,6 +848,31 @@ grow(Node, VhostSpec, QueueSpec, Strategy) ->
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].

transfer_leadership(Q, Destination) ->
{RaName, _} = Pid = amqqueue:get_pid(Q),
case ra:transfer_leadership(Pid, {RaName, Destination}) of
ok ->
{_, _, {_, NewNode}} = ra:members(Pid),
{migrated, NewNode};
already_leader ->
{not_migrated, already_leader};
{error, Reason} ->
{not_migrated, Reason};
{timeout, _} ->
%% TODO should we retry once?
{not_migrated, timeout}
end.

queue_length(Q) ->
Name = amqqueue:get_name(Q),
case ets:lookup(ra_metrics, Name) of
[] -> 0;
[{_, _, SnapIdx, _, _, LastIdx, _}] -> LastIdx - SnapIdx
end.

get_replicas(Q) ->
get_nodes(Q).

get_resource_name(#resource{name = Name}) ->
Name.

Expand Down
117 changes: 116 additions & 1 deletion test/dynamic_ha_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ groups() ->
rapid_change,
nodes_policy_should_pick_master_from_its_params,
promote_slave_after_standalone_restart,
queue_survive_adding_dead_vhost_mirror
queue_survive_adding_dead_vhost_mirror,
rebalance_all,
rebalance_exactly,
rebalance_nodes
% FIXME: Re-enable those tests when the know issues are
% fixed.
% failing_random_policies,
Expand Down Expand Up @@ -539,6 +542,118 @@ promote_slave_after_standalone_restart(Config) ->

ok.

rebalance_all(Config) ->
[A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),

Q1 = <<"q1">>,
Q2 = <<"q2">>,
Q3 = <<"q3">>,
Q4 = <<"q4">>,
Q5 = <<"q5">>,

amqp_channel:call(ACh, #'queue.declare'{queue = Q1}),
amqp_channel:call(ACh, #'queue.declare'{queue = Q2}),
amqp_channel:call(ACh, #'queue.declare'{queue = Q3}),
amqp_channel:call(ACh, #'queue.declare'{queue = Q4}),
amqp_channel:call(ACh, #'queue.declare'{queue = Q5}),
rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"q.*">>, <<"all">>),
timer:sleep(1000),

rabbit_ct_client_helpers:publish(ACh, Q1, 5),
rabbit_ct_client_helpers:publish(ACh, Q2, 3),
assert_slaves(A, Q1, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]),
assert_slaves(A, Q2, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]),
assert_slaves(A, Q3, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]),
assert_slaves(A, Q4, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]),
assert_slaves(A, Q5, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]),

{ok, Summary} = rpc:call(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]),

%% Check that we have at most 2 queues per node
?assert(lists:all(fun({_, V}) -> V =< 2 end, Summary)),
%% Check that Q1 and Q2 haven't moved
assert_slaves(A, Q1, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]),
assert_slaves(A, Q2, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]),

ok.

rebalance_exactly(Config) ->
[A, _, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),

Q1 = <<"q1">>,
Q2 = <<"q2">>,
Q3 = <<"q3">>,
Q4 = <<"q4">>,
Q5 = <<"q5">>,

amqp_channel:call(ACh, #'queue.declare'{queue = Q1}),
amqp_channel:call(ACh, #'queue.declare'{queue = Q2}),
amqp_channel:call(ACh, #'queue.declare'{queue = Q3}),
amqp_channel:call(ACh, #'queue.declare'{queue = Q4}),
amqp_channel:call(ACh, #'queue.declare'{queue = Q5}),
rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"q.*">>, {<<"exactly">>, 2}),
timer:sleep(1000),

rabbit_ct_client_helpers:publish(ACh, Q1, 5),
rabbit_ct_client_helpers:publish(ACh, Q2, 3),

?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))),
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))),
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q3, A)))),
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q4, A)))),
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q5, A)))),

{ok, Summary} = rpc:call(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]),

%% Check that we have at most 2 queues per node
?assert(lists:all(fun({_, V}) -> V =< 2 end, Summary)),
%% Check that Q1 and Q2 haven't moved
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))),
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))),

ok.

rebalance_nodes(Config) ->
[A, B, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),

Q1 = <<"q1">>,
Q2 = <<"q2">>,
Q3 = <<"q3">>,
Q4 = <<"q4">>,
Q5 = <<"q5">>,

amqp_channel:call(ACh, #'queue.declare'{queue = Q1}),
amqp_channel:call(ACh, #'queue.declare'{queue = Q2}),
amqp_channel:call(ACh, #'queue.declare'{queue = Q3}),
amqp_channel:call(ACh, #'queue.declare'{queue = Q4}),
amqp_channel:call(ACh, #'queue.declare'{queue = Q5}),
rabbit_ct_broker_helpers:set_ha_policy(
Config, A, <<"q.*">>,
{<<"nodes">>, [rabbit_misc:atom_to_binary(A), rabbit_misc:atom_to_binary(B)]}),
timer:sleep(1000),

rabbit_ct_client_helpers:publish(ACh, Q1, 5),
rabbit_ct_client_helpers:publish(ACh, Q2, 3),

?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))),
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))),
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q3, A)))),
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q4, A)))),
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q5, A)))),

{ok, Summary} = rpc:call(A, rabbit_amqqueue, rebalance, [classic, ".*", ".*"]),

%% Check that we have at most 3 queues per node
?assert(lists:all(fun({_, V}) -> V =< 3 end, Summary)),
%% Check that Q1 and Q2 haven't moved
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q1, A)))),
?assertEqual(A, node(proplists:get_value(pid, find_queue(Q2, A)))),

ok.

%%----------------------------------------------------------------------------

assert_slaves(RPCNode, QName, Exp) ->
Expand Down
Loading