From 02449bd5d36df6b7663091b0eca6518aa43204cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 19 Aug 2025 14:58:04 +0200 Subject: [PATCH 1/7] Close stream connection if secret update fails --- .../src/rabbit_stream_reader.erl | 9 +++++++ .../src/rabbit_stream_reader.hrl | 7 ++++-- .../test/rabbit_stream_SUITE.erl | 25 +++++++++++++------ 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 3217409b3bfc..2bc618c1fbf6 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -697,6 +697,12 @@ open(info, {OK, S, Data}, {next_state, close_sent, StatemData#statem_data{connection = Connection1, connection_state = State1}}; + failure -> + _ = demonitor_all_streams(Connection), + ?LOG_INFO("Forcing stream connection ~tp closing because of " + "transition to invalid state", + [self()]), + {stop, {shutdown, <<"Invalid state">>}}; _ -> State2 = case Blocked of @@ -1586,6 +1592,7 @@ handle_frame_post_auth(Transport, stream), auth_fail(NewUsername, Msg, Args, C1, S1), ?LOG_WARNING(Msg, Args), + silent_close_delay(), {C1#stream_connection{connection_step = failure}, {sasl_authenticate, ?RESPONSE_AUTHENTICATION_FAILURE, <<>>}}; @@ -1631,6 +1638,7 @@ handle_frame_post_auth(Transport, stream), ?LOG_WARNING("Not allowed to change username '~ts'. Only password", [Username]), + silent_close_delay(), {C1#stream_connection{connection_step = failure}, {sasl_authenticate, @@ -1652,6 +1660,7 @@ handle_frame_post_auth(Transport, {OtherMechanism, _} -> ?LOG_WARNING("User '~ts' cannot change initial auth mechanism '~ts' for '~ts'", [Username, NewMechanism, OtherMechanism]), + silent_close_delay(), CmdBody = {sasl_authenticate, ?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM, <<>>}, Frame = rabbit_stream_core:frame({response, CorrelationId, CmdBody}), diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl b/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl index 0c1bc2dcc683..0f5723221898 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl @@ -20,6 +20,10 @@ -type publisher_reference() :: binary(). -type subscription_id() :: byte(). -type internal_id() :: integer(). +-type connection_step() :: tcp_connected | peer_properties_exchanged | + authenticating | authenticated | tuning | + tuned | opened | failure | + closing | close_sent | closing_done. -record(publisher, {publisher_id :: publisher_id(), @@ -75,8 +79,7 @@ credits :: atomics:atomics_ref(), user :: undefined | #user{}, virtual_host :: undefined | binary(), - connection_step :: - atom(), % tcp_connected, peer_properties_exchanged, authenticating, authenticated, tuning, tuned, opened, failure, closing, closing_done + connection_step :: connection_step(), frame_max :: integer(), heartbeat :: undefined | integer(), heartbeater :: any(), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 651fd7ec89dd..20c5f762212f 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -48,6 +48,7 @@ groups() -> test_update_secret, cannot_update_username_after_authenticated, cannot_use_another_authmechanism_when_updating_secret, + update_secret_should_close_connection_if_wrong_secret, unauthenticated_client_rejected_tcp_connected, timeout_tcp_connected, unauthenticated_client_rejected_peer_properties_exchanged, @@ -292,22 +293,30 @@ test_update_secret(Config) -> cannot_update_username_after_authenticated(Config) -> {S, C0} = connect_and_authenticate(gen_tcp, Config), - C1 = expect_unsuccessful_authentication( - try_authenticate(gen_tcp, S, C0, <<"PLAIN">>, <<"other">>, <<"other">>), - ?RESPONSE_SASL_CANNOT_CHANGE_USERNAME), - _C2 = test_close(gen_tcp, S, C1), + _C1 = expect_unsuccessful_authentication( + try_authenticate(gen_tcp, S, C0, <<"PLAIN">>, <<"other">>, <<"other">>), + ?RESPONSE_SASL_CANNOT_CHANGE_USERNAME), closed = wait_for_socket_close(gen_tcp, S, 10), ok. cannot_use_another_authmechanism_when_updating_secret(Config) -> {S, C0} = connect_and_authenticate(gen_tcp, Config), - C1 = expect_unsuccessful_authentication( - try_authenticate(gen_tcp, S, C0, <<"EXTERNAL">>, <<"guest">>, <<"new_password">>), - ?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM), - _C2 = test_close(gen_tcp, S, C1), + _C1 = expect_unsuccessful_authentication( + try_authenticate(gen_tcp, S, C0, <<"EXTERNAL">>, <<"guest">>, <<"new_password">>), + ?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM), closed = wait_for_socket_close(gen_tcp, S, 10), ok. +update_secret_should_close_connection_if_wrong_secret(Config) -> + Transport = gen_tcp, + {S, C0} = connect_and_authenticate(Transport, Config), + Pwd = rand:bytes(20), + _C1 = expect_unsuccessful_authentication( + try_authenticate(Transport, S, C0, <<"PLAIN">>, <<"guest">>, Pwd), + ?RESPONSE_AUTHENTICATION_FAILURE), + closed = wait_for_socket_close(Transport, S, 10), + ok. + test_stream_tls(Config) -> Stream = atom_to_binary(?FUNCTION_NAME, utf8), test_server(ssl, Stream, Config), From ba8745ab4b2555c761b003989f1f4a9aec2e98ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 19 Aug 2025 16:47:47 +0200 Subject: [PATCH 2/7] Close stream connection if vhost not authorized after secret update --- .../src/rabbit_stream_reader.erl | 44 ++++++++++++------- .../test/rabbit_stream_SUITE.erl | 30 ++++++++++++- 2 files changed, 57 insertions(+), 17 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 2bc618c1fbf6..5bc95e76bf48 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -1508,7 +1508,6 @@ handle_frame_pre_auth(Transport, send(Transport, S, F), Connection#stream_connection{connection_step = failure} end, - {Connection1, State}; handle_frame_pre_auth(_Transport, Connection, State, heartbeat) -> ?LOG_DEBUG("Received heartbeat frame pre auth"), @@ -1617,21 +1616,8 @@ handle_frame_post_auth(Transport, Challenge}}; {ok, NewUser = #user{username = NewUsername}} -> case NewUsername of - Username -> - rabbit_core_metrics:auth_attempt_succeeded(Host, - Username, - stream), - notify_auth_result(Username, - user_authentication_success, - [], - C1, - S1), - ?LOG_DEBUG("Successfully updated secret for username '~ts'", [Username]), - {C1#stream_connection{user = NewUser, - authentication_state = done, - connection_step = authenticated}, - {sasl_authenticate, ?RESPONSE_CODE_OK, - <<>>}}; + Username -> + complete_secret_update(NewUser, C1, S1); _ -> rabbit_core_metrics:auth_attempt_failed(Host, Username, @@ -2783,6 +2769,32 @@ handle_frame_post_auth(Transport, increase_protocol_counter(?UNKNOWN_FRAME), {Connection#stream_connection{connection_step = close_sent}, State}. +complete_secret_update(NewUser = #user{username = Username}, + #stream_connection{host = Host, + socket = S, + virtual_host = VH} = C1, S1) -> + notify_auth_result(Username, user_authentication_success, [], C1, S1), + rabbit_core_metrics:auth_attempt_succeeded(Host, Username, stream), + ?LOG_DEBUG("Successfully checked updated secret for username '~ts'", + [Username]), + try + ?LOG_DEBUG("Checking vhost access after secret update"), + rabbit_access_control:check_vhost_access(NewUser, VH, {socket, S}, #{}), + ?LOG_DEBUG("Checked vhost access"), + + {C1#stream_connection{user = NewUser, + authentication_state = done, + connection_step = authenticated}, + {sasl_authenticate, ?RESPONSE_CODE_OK, + <<>>}} + catch exit:#amqp_error{explanation = Explanation} -> + ?LOG_WARNING("Access to vhost failed after secret update: ~ts", + [Explanation]), + silent_close_delay(), + {C1#stream_connection{connection_step = failure}, + {sasl_authenticate, ?RESPONSE_VHOST_ACCESS_FAILURE, <<>>}} + end. + process_client_command_versions(C, []) -> C; process_client_command_versions(C, [H | T]) -> diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 20c5f762212f..a75b531965ef 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -49,6 +49,7 @@ groups() -> cannot_update_username_after_authenticated, cannot_use_another_authmechanism_when_updating_secret, update_secret_should_close_connection_if_wrong_secret, + update_secret_should_close_connection_if_unauthorized_vhost, unauthenticated_client_rejected_tcp_connected, timeout_tcp_connected, unauthenticated_client_rejected_peer_properties_exchanged, @@ -166,6 +167,12 @@ init_per_testcase(cannot_update_username_after_authenticated = TestCase, Config) ok = rabbit_ct_broker_helpers:add_user(Config, <<"other">>), rabbit_ct_helpers:testcase_started(Config, TestCase); +init_per_testcase(update_secret_should_close_connection_if_unauthorized_vhost = TestCase, + Config) -> + ok = rabbit_ct_broker_helpers:add_user(Config, <<"other">>), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, <<"other">>, <<"/">>), + rabbit_ct_helpers:testcase_started(Config, TestCase); + init_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) -> ok = rabbit_ct_broker_helpers:rpc(Config, 0, @@ -201,6 +208,11 @@ end_per_testcase(cannot_update_username_after_authenticated = TestCase, Config) ok = rabbit_ct_broker_helpers:delete_user(Config, <<"other">>), rabbit_ct_helpers:testcase_finished(Config, TestCase); +end_per_testcase(update_secret_should_close_connection_if_unauthorized_vhost = TestCase, + Config) -> + ok = rabbit_ct_broker_helpers:delete_user(Config, <<"other">>), + rabbit_ct_helpers:testcase_finished(Config, TestCase); + end_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) -> ok = rabbit_ct_broker_helpers:rpc(Config, 0, @@ -286,7 +298,7 @@ test_update_secret(Config) -> {S, C0} = connect_and_authenticate(Transport, Config), rabbit_ct_broker_helpers:change_password(Config, <<"guest">>, <<"password">>), C1 = expect_successful_authentication( - try_authenticate(Transport, S, C0, <<"PLAIN">>, <<"guest">>, <<"password">>)), + try_authenticate(Transport, S, C0, <<"PLAIN">>, <<"guest">>, <<"password">>)), _C2 = test_close(Transport, S, C1), closed = wait_for_socket_close(Transport, S, 10), ok. @@ -317,6 +329,22 @@ update_secret_should_close_connection_if_wrong_secret(Config) -> closed = wait_for_socket_close(Transport, S, 10), ok. +update_secret_should_close_connection_if_unauthorized_vhost(Config) -> + T = gen_tcp, + Port = get_port(T, Config), + Opts = get_opts(T), + {ok, S} = T:connect("localhost", Port, Opts), + C0 = rabbit_stream_core:init(0), + C1 = test_peer_properties(T, S, C0), + Username = <<"other">>, + C2 = test_authenticate(T, S, C1, Username), + ok = rabbit_ct_broker_helpers:clear_permissions(Config, Username, <<"/">>), + _C3 = expect_unsuccessful_authentication( + try_authenticate(gen_tcp, S, C2, <<"PLAIN">>, Username, Username), + ?RESPONSE_VHOST_ACCESS_FAILURE), + closed = wait_for_socket_close(T, S, 10), + ok. + test_stream_tls(Config) -> Stream = atom_to_binary(?FUNCTION_NAME, utf8), test_server(ssl, Stream, Config), From 147dfb0e7057694cb3709fcd104a8c2aca6c6192 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 19 Aug 2025 13:09:57 -0400 Subject: [PATCH 3/7] Log message wording #14402 (cherry picked from commit 987f5519e645844ebf3cc257822931f7f29a54fe) --- deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 5bc95e76bf48..17166ac8d129 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -699,7 +699,7 @@ open(info, {OK, S, Data}, connection_state = State1}}; failure -> _ = demonitor_all_streams(Connection), - ?LOG_INFO("Forcing stream connection ~tp closing because of " + ?LOG_INFO("Force closing stream connection ~tp because of " "transition to invalid state", [self()]), {stop, {shutdown, <<"Invalid state">>}}; From 55d6419bcdf085635aa457303c698a5c69a9d2c8 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 19 Aug 2025 13:44:50 -0400 Subject: [PATCH 4/7] More log message edits #14403 --- deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 17166ac8d129..1184f7e861c2 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2775,10 +2775,10 @@ complete_secret_update(NewUser = #user{username = Username}, virtual_host = VH} = C1, S1) -> notify_auth_result(Username, user_authentication_success, [], C1, S1), rabbit_core_metrics:auth_attempt_succeeded(Host, Username, stream), - ?LOG_DEBUG("Successfully checked updated secret for username '~ts'", + ?LOG_DEBUG("Stream connection has successfully checked updated secret for username '~ts'", [Username]), try - ?LOG_DEBUG("Checking vhost access after secret update"), + ?LOG_DEBUG("Checking virtual host access after secret update"), rabbit_access_control:check_vhost_access(NewUser, VH, {socket, S}, #{}), ?LOG_DEBUG("Checked vhost access"), @@ -2788,7 +2788,7 @@ complete_secret_update(NewUser = #user{username = Username}, {sasl_authenticate, ?RESPONSE_CODE_OK, <<>>}} catch exit:#amqp_error{explanation = Explanation} -> - ?LOG_WARNING("Access to vhost failed after secret update: ~ts", + ?LOG_WARNING("Stream client can no longer access virtual host after a secret update: ~ts", [Explanation]), silent_close_delay(), {C1#stream_connection{connection_step = failure}, From e7634679d1265bbb75cd026974df3f8ddb810b96 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 19 Aug 2025 13:58:47 -0400 Subject: [PATCH 5/7] More log message edits #14403 --- deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 1184f7e861c2..61029f5c22c0 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2775,10 +2775,10 @@ complete_secret_update(NewUser = #user{username = Username}, virtual_host = VH} = C1, S1) -> notify_auth_result(Username, user_authentication_success, [], C1, S1), rabbit_core_metrics:auth_attempt_succeeded(Host, Username, stream), - ?LOG_DEBUG("Stream connection has successfully checked updated secret for username '~ts'", + ?LOG_DEBUG("Stream connection has successfully checked updated secret (token) for username '~ts'", [Username]), try - ?LOG_DEBUG("Checking virtual host access after secret update"), + ?LOG_DEBUG("Stream connection: will verify virtual host access after secret (token) update"), rabbit_access_control:check_vhost_access(NewUser, VH, {socket, S}, #{}), ?LOG_DEBUG("Checked vhost access"), @@ -2788,7 +2788,7 @@ complete_secret_update(NewUser = #user{username = Username}, {sasl_authenticate, ?RESPONSE_CODE_OK, <<>>}} catch exit:#amqp_error{explanation = Explanation} -> - ?LOG_WARNING("Stream client can no longer access virtual host after a secret update: ~ts", + ?LOG_WARNING("Stream connection no longer has the permissions to access its target virtual host after a secret (token) update: ~ts", [Explanation]), silent_close_delay(), {C1#stream_connection{connection_step = failure}, From a9f7bf1fbc58ac4202653a268679ead6cbed6c47 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 19 Aug 2025 14:05:31 -0400 Subject: [PATCH 6/7] Log virtual host name if updated token lacks the permissions for it #14403 --- deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 61029f5c22c0..9e1c4aef002b 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2788,8 +2788,8 @@ complete_secret_update(NewUser = #user{username = Username}, {sasl_authenticate, ?RESPONSE_CODE_OK, <<>>}} catch exit:#amqp_error{explanation = Explanation} -> - ?LOG_WARNING("Stream connection no longer has the permissions to access its target virtual host after a secret (token) update: ~ts", - [Explanation]), + ?LOG_WARNING("Stream connection no longer has the permissions to access its target virtual host ('~ts') after a secret (token) update: ~ts", + [VH, Explanation]), silent_close_delay(), {C1#stream_connection{connection_step = failure}, {sasl_authenticate, ?RESPONSE_VHOST_ACCESS_FAILURE, <<>>}} From 173876c9821e1f08b418e2834f72949850b1df85 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 19 Aug 2025 14:06:15 -0400 Subject: [PATCH 7/7] More log message edits #14403 --- deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 9e1c4aef002b..2e0143a7e94e 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2780,7 +2780,7 @@ complete_secret_update(NewUser = #user{username = Username}, try ?LOG_DEBUG("Stream connection: will verify virtual host access after secret (token) update"), rabbit_access_control:check_vhost_access(NewUser, VH, {socket, S}, #{}), - ?LOG_DEBUG("Checked vhost access"), + ?LOG_DEBUG("Stream connection: successfully re-verified virtual host access"), {C1#stream_connection{user = NewUser, authentication_state = done,