Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,20 @@ end}.
{mapping, "max_message_size", "rabbit.max_message_size",
[{datatype, integer}, {validators, ["max_message_size"]}]}.

{mapping, "cluster_exchange_limit", "rabbit.cluster_exchange_limit",
[{datatype, [{atom, infinity}, integer]}, {validators, ["non_negative_integer"]}]}.

{translation, "rabbit.cluster_exchange_limit",
fun(Conf) ->
case cuttlefish:conf_get("cluster_exchange_limit", Conf, undefined) of
undefined -> cuttlefish:unset();
infinity -> infinity;
Val when is_integer(Val) -> Val;
_ -> cuttlefish:invalid("should be a non-negative integer")
end
end
}.

%% Customising Socket Options.
%%
%% See (https://www.erlang.org/doc/man/inet.html#setopts-2) for
Expand Down
16 changes: 11 additions & 5 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,11 @@ count_in_mnesia() ->
mnesia:table_info(?MNESIA_TABLE, size).

count_in_khepri() ->
Path = khepri_exchange_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
case rabbit_khepri:count(Path) of
{ok, Count} -> Count;
_ -> 0
try
ets:info(?KHEPRI_PROJECTION, size)
catch
error:badarg ->
0
end.

%% -------------------------------------------------------------------
Expand Down Expand Up @@ -863,7 +864,12 @@ exists_in_mnesia(Name) ->
ets:member(?MNESIA_TABLE, Name).

exists_in_khepri(Name) ->
rabbit_khepri:exists(khepri_exchange_path(Name)).
try
ets:member(?KHEPRI_PROJECTION, Name)
catch
error:badarg ->
false
end.

%% -------------------------------------------------------------------
%% clear().
Expand Down
25 changes: 24 additions & 1 deletion deps/rabbit/src/rabbit_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,13 @@ serial(X) ->
Internal :: boolean(),
Args :: rabbit_framing:amqp_table(),
Username :: rabbit_types:username(),
Ret :: {ok, rabbit_types:exchange()} | {error, timeout}.
Ret :: {ok, rabbit_types:exchange()} |
{error, timeout} |
%% May exit with `#amqp_error{}` if validations fail:
rabbit_types:channel_exit().

declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
ok = check_exchange_limits(XName),
X = rabbit_exchange_decorator:set(
rabbit_policy:set(#exchange{name = XName,
type = Type,
Expand Down Expand Up @@ -140,6 +144,25 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
{ok, X}
end.

check_exchange_limits(XName) ->
Limit = rabbit_misc:get_env(rabbit, cluster_exchange_limit, infinity),
case rabbit_db_exchange:count() >= Limit of
false ->
ok;
true ->
case rabbit_db_exchange:exists(XName) of
true ->
%% Allow re-declares of existing exchanges when at the
%% exchange limit.
ok;
false ->
rabbit_misc:protocol_error(
precondition_failed,
"cannot declare ~ts: exchange limit of ~tp is reached",
[rabbit_misc:rs(XName), Limit])
end
end.

%% Used with binaries sent over the wire; the type may not exist.

-spec check_type
Expand Down
78 changes: 66 additions & 12 deletions deps/rabbit/test/cluster_limit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
-include_lib("amqp_client/include/amqp_client.hrl").
-compile([nowarn_export_all, export_all]).

-define(EXCHANGE_LIMIT, 10).

all() ->
[
Expand All @@ -22,7 +23,8 @@ groups() ->
[
{clustered, [],
[
{size_2, [], [queue_limit]}
{size_2, [], [queue_limit,
exchange_limit]}
]}
].

Expand All @@ -34,7 +36,8 @@ init_per_suite(Config0) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:merge_app_env(
Config0, {rabbit, [{quorum_tick_interval, 1000},
{cluster_queue_limit, 3}]}),
{cluster_queue_limit, 3},
{cluster_exchange_limit, ?EXCHANGE_LIMIT}]}),
rabbit_ct_helpers:run_setup_steps(Config1, []).

end_per_suite(Config) ->
Expand Down Expand Up @@ -101,48 +104,99 @@ queue_limit(Config) ->
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q1 = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q1, 0, 0},
declare(Ch, Q1)),
declare_queue(Ch, Q1)),

Q2 = ?config(alt_queue_name, Config),
?assertEqual({'queue.declare_ok', Q2, 0, 0},
declare(Ch, Q2)),
declare_queue(Ch, Q2)),

Q3 = ?config(alt_2_queue_name, Config),
?assertEqual({'queue.declare_ok', Q3, 0, 0},
declare(Ch, Q3)),
declare_queue(Ch, Q3)),
Q4 = ?config(over_limit_queue_name, Config),
ExpectedError = list_to_binary(io_lib:format("PRECONDITION_FAILED - cannot declare queue '~s': queue limit in cluster (3) is reached", [Q4])),
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(Ch, Q4)),
declare_queue(Ch, Q4)),

%% Trying the second server, in the cluster, but no queues on it,
%% but should still fail as the limit is cluster wide.
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(Ch2, Q4)),
declare_queue(Ch2, Q4)),

%Trying other types of queues
ChQQ = rabbit_ct_client_helpers:open_channel(Config, Server0),
ChStream = rabbit_ct_client_helpers:open_channel(Config, Server1),
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
declare_queue(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertExit(
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
declare(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
declare_queue(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
ok.

declare(Ch, Q) ->
declare(Ch, Q, []).
exchange_limit(Config) ->
DefaultXs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_exchange, count, []),
?assert(?EXCHANGE_LIMIT > DefaultXs),

declare(Ch, Q, Args) ->
[Server0, Server1] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),

%% Reach the limit.
[begin
XName = list_to_binary(rabbit_misc:format("x-~b", [N])),
#'exchange.declare_ok'{} = declare_exchange(Ch1, XName, <<"fanout">>)
end || N <- lists:seq(DefaultXs, ?EXCHANGE_LIMIT - 1)],

%% Trying to declare the next exchange fails.
OverLimitXName = <<"over-limit-x">>,
?assertExit(
{{shutdown, {server_initiated_close, 406,
<<"PRECONDITION_FAILED", _/binary>>}}, _},
declare_exchange(Ch1, OverLimitXName, <<"fanout">>)),

%% Existing exchanges can be re-declared.
ExistingX = list_to_binary(rabbit_misc:format("x-~b", [DefaultXs])),
#'exchange.declare_ok'{} = declare_exchange(Ch2, ExistingX, <<"fanout">>),

%% The limit is cluster wide: the other node cannot declare the exchange
%% either.
?assertExit(
{{shutdown, {server_initiated_close, 406,
<<"PRECONDITION_FAILED", _/binary>>}}, _},
declare_exchange(Ch2, OverLimitXName, <<"fanout">>)),

%% Clean up extra exchanges
Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server0),
[begin
XName = list_to_binary(rabbit_misc:format("x-~b", [N])),
#'exchange.delete_ok'{} = amqp_channel:call(
Ch3,
#'exchange.delete'{exchange = XName})
end || N <- lists:seq(DefaultXs, ?EXCHANGE_LIMIT - 1)],

ok.

%% -------------------------------------------------------------------

declare_queue(Ch, Q) ->
declare_queue(Ch, Q, []).

declare_queue(Ch, Q, Args) ->
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
durable = true,
auto_delete = false,
arguments = Args}).

declare_exchange(Ch, Name, Type) ->
amqp_channel:call(Ch, #'exchange.declare'{exchange = Name,
type = Type,
durable = true}).

delete_queues() ->
[rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
|| Q <- rabbit_amqqueue:list()].
Loading