From aa8c478e21bf986dc3decf8fe6ec64344580719f Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 11 Apr 2025 10:22:00 +0200 Subject: [PATCH 1/2] Fix concurrent AMQP queue declarations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prior to this commit, when AMQP clients declared the same queues concurrently, the following crash occurred: ``` │ *Error{Condition: amqp:internal-error, Description: {badmatch,{<<"200">>, │ {map,[{{utf8,<<"leader">>},{utf8,<<"rabbit-2@carrot">>}}, │ {{utf8,<<"message_count">>},{ulong,0}}, │ {{utf8,<<"consumer_count">>},{uint,0}}, │ {{utf8,<<"name">>},{utf8,<<"cq-145">>}}, │ {{utf8,<<"vhost">>},{utf8,<<"/">>}}, │ {{utf8,<<"durable">>},{boolean,true}}, │ {{utf8,<<"auto_delete">>},{boolean,false}}, │ {{utf8,<<"exclusive">>},{boolean,false}}, │ {{utf8,<<"type">>},{utf8,<<"classic">>}}, │ {{utf8,<<"arguments">>}, │ {map,[{{utf8,<<"x-queue-type">>},{utf8,<<"classic">>}}]}}, │ {{utf8,<<"replicas">>}, │ {array,utf8,[{utf8,<<"rabbit-2@carrot">>}]}}]}, │ {[{{resource,<<"/">>,queue,<<"cq-145">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-144">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-143">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-142">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-141">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-140">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-139">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-138">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-137">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-136">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-135">>},configure}, │ {{resource,<<"/">>,queue,<<"cq-134">>},configure}], │ []}}} │ [{rabbit_amqp_management,handle_http_req,8, │ [{file,"rabbit_amqp_management.erl"},{line,130}]}, │ {rabbit_amqp_management,handle_request,5, │ [{file,"rabbit_amqp_management.erl"},{line,43}]}, │ {rabbit_amqp_session,incoming_mgmt_link_transfer,3, │ [{file,"rabbit_amqp_session.erl"},{line,2317}]}, │ {rabbit_amqp_session,handle_frame,2, │ [{file,"rabbit_amqp_session.erl"},{line,963}]}, │ {rabbit_amqp_session,handle_cast,2, │ [{file,"rabbit_amqp_session.erl"},{line,539}]}, │ {gen_server,try_handle_cast,3,[{file,"gen_server.erl"},{line,2371}]}, │ {gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,2433}]}, │ {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,329}]}], Info: map[]} ``` To repro, run the following command in parallel in two separate terminals: ``` ./omq amqp -x 10000 -t /queues/cq-%d -y 0 -C 0 --queues classic classic ``` --- deps/rabbit/src/rabbit_amqp_management.erl | 19 ++++----- .../test/management_SUITE.erl | 42 ++++++++++++++++++- 2 files changed, 49 insertions(+), 12 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index 027821898c73..ead39ac06c4d 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -127,7 +127,6 @@ handle_http_req(HttpMethod = <<"PUT">>, PermCache1 = check_resource_access(QName, configure, User, PermCache0), rabbit_core_metrics:queue_declared(QName), - {Q1, NumMsgs, NumConsumers, StatusCode, PermCache} = case rabbit_amqqueue:with( QName, fun(Q) -> @@ -135,7 +134,8 @@ handle_http_req(HttpMethod = <<"PUT">>, Q, Durable, AutoDelete, QArgs, Owner) of ok -> {ok, Msgs, Consumers} = rabbit_amqqueue:stat(Q), - {ok, {Q, Msgs, Consumers, <<"200">>, PermCache1}} + RespPayload = encode_queue(Q, Msgs, Consumers), + {ok, {<<"200">>, RespPayload, {PermCache1, TopicPermCache}}} catch exit:#amqp_error{name = precondition_failed, explanation = Expl} -> throw(<<"409">>, Expl, []); @@ -146,23 +146,25 @@ handle_http_req(HttpMethod = <<"PUT">>, {ok, Result} -> Result; {error, not_found} -> - PermCache2 = check_dead_letter_exchange(QName, QArgs, User, PermCache1), + PermCache = check_dead_letter_exchange(QName, QArgs, User, PermCache1), try rabbit_amqqueue:declare( QName, Durable, AutoDelete, QArgs, Owner, Username) of {new, Q} -> rabbit_core_metrics:queue_created(QName), - {Q, 0, 0, <<"201">>, PermCache2}; + RespPayload = encode_queue(Q, 0, 0), + {<<"201">>, RespPayload, {PermCache, TopicPermCache}}; {owner_died, Q} -> %% Presumably our own days are numbered since the %% connection has died. Pretend the queue exists though, %% just so nothing fails. - {Q, 0, 0, <<"201">>, PermCache2}; + RespPayload = encode_queue(Q, 0, 0), + {<<"201">>, RespPayload, {PermCache, TopicPermCache}}; {absent, Q, Reason} -> absent(Q, Reason); {existing, _Q} -> %% Must have been created in the meantime. Loop around again. handle_http_req(HttpMethod, PathSegments, Query, ReqPayload, - Vhost, User, ConnPid, {PermCache2, TopicPermCache}); + Vhost, User, ConnPid, {PermCache, TopicPermCache}); {error, queue_limit_exceeded, Reason, ReasonArgs} -> throw(<<"403">>, Reason, @@ -177,10 +179,7 @@ handle_http_req(HttpMethod = <<"PUT">>, end; {error, {absent, Q, Reason}} -> absent(Q, Reason) - end, - - RespPayload = encode_queue(Q1, NumMsgs, NumConsumers), - {StatusCode, RespPayload, {PermCache, TopicPermCache}}; + end; handle_http_req(<<"PUT">>, [<<"exchanges">>, XNameBinQuoted], diff --git a/deps/rabbitmq_amqp_client/test/management_SUITE.erl b/deps/rabbitmq_amqp_client/test/management_SUITE.erl index 952c659e9784..27ecf872ab83 100644 --- a/deps/rabbitmq_amqp_client/test/management_SUITE.erl +++ b/deps/rabbitmq_amqp_client/test/management_SUITE.erl @@ -52,6 +52,7 @@ groups() -> bad_exchange_property, bad_exchange_type, get_queue_not_found, + declare_queues_concurrently, declare_queue_default_queue_type, declare_queue_empty_name, declare_queue_line_feed, @@ -432,6 +433,40 @@ get_queue_not_found(Config) -> amqp10_msg:body(Resp)), ok = cleanup(Init). +declare_queues_concurrently(Config) -> + NumQueues = 5, + {Pid1, Ref1} = spawn_monitor(?MODULE, declare_queues, [Config, NumQueues]), + {Pid2, Ref2} = spawn_monitor(?MODULE, declare_queues, [Config, NumQueues]), + receive {'DOWN', Ref1, process, Pid1, Reason1} -> + ?assertEqual(normal, Reason1) + end, + receive {'DOWN', Ref2, process, Pid2, Reason2} -> + ?assertEqual(normal, Reason2) + end, + + ?assertEqual(NumQueues, count_queues(Config)), + + Init = {_, LinkPair} = init(Config), + lists:foreach(fun(N) -> + Bin = integer_to_binary(N), + QName = <<"queue-", Bin/binary>>, + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName) + end, lists:seq(1, NumQueues)), + ok = cleanup(Init). + +declare_queues(Config, Num) -> + Init = {_, LinkPair} = init(Config), + ok = declare_queues0(LinkPair, Num), + ok = cleanup(Init). + +declare_queues0(_LinkPair, 0) -> + ok; +declare_queues0(LinkPair, Left) -> + Bin = integer_to_binary(Left), + QName = <<"queue-", Bin/binary>>, + ?assertMatch({ok, _}, rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{})), + declare_queues0(LinkPair, Left - 1). + declare_queue_default_queue_type(Config) -> Node = get_node_config(Config, 0, nodename), Vhost = QName = atom_to_binary(?FUNCTION_NAME), @@ -871,11 +906,11 @@ pipeline(Config) -> %% because RabbitMQ grants us 8 link credits initially. Num = 8, pipeline0(Num, LinkPair, <<"PUT">>, {map, []}), - eventually(?_assertEqual(Num, rpc(Config, rabbit_amqqueue, count, [])), 200, 20), + eventually(?_assertEqual(Num, count_queues(Config)), 200, 20), flush(queues_created), pipeline0(Num, LinkPair, <<"DELETE">>, null), - eventually(?_assertEqual(0, rpc(Config, rabbit_amqqueue, count, [])), 200, 20), + eventually(?_assertEqual(0, count_queues(Config)), 200, 20), flush(queues_deleted), ok = cleanup(Init). @@ -1127,3 +1162,6 @@ gen_server_state(Pid) -> L1 = lists:last(L0), {data, L2} = lists:last(L1), proplists:get_value("State", L2). + +count_queues(Config) -> + rpc(Config, rabbit_amqqueue, count, []). From 92311f6fd47365cbad46ce20dec107072ad52894 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 11 Apr 2025 11:19:24 +0200 Subject: [PATCH 2/2] Simplify --- deps/rabbit/src/rabbit_amqp_management.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index ead39ac06c4d..cc02a704939f 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -147,24 +147,25 @@ handle_http_req(HttpMethod = <<"PUT">>, Result; {error, not_found} -> PermCache = check_dead_letter_exchange(QName, QArgs, User, PermCache1), + PermCaches = {PermCache, TopicPermCache}, try rabbit_amqqueue:declare( QName, Durable, AutoDelete, QArgs, Owner, Username) of {new, Q} -> rabbit_core_metrics:queue_created(QName), RespPayload = encode_queue(Q, 0, 0), - {<<"201">>, RespPayload, {PermCache, TopicPermCache}}; + {<<"201">>, RespPayload, PermCaches}; {owner_died, Q} -> %% Presumably our own days are numbered since the %% connection has died. Pretend the queue exists though, %% just so nothing fails. RespPayload = encode_queue(Q, 0, 0), - {<<"201">>, RespPayload, {PermCache, TopicPermCache}}; + {<<"201">>, RespPayload, PermCaches}; {absent, Q, Reason} -> absent(Q, Reason); {existing, _Q} -> %% Must have been created in the meantime. Loop around again. handle_http_req(HttpMethod, PathSegments, Query, ReqPayload, - Vhost, User, ConnPid, {PermCache, TopicPermCache}); + Vhost, User, ConnPid, PermCaches); {error, queue_limit_exceeded, Reason, ReasonArgs} -> throw(<<"403">>, Reason,