Skip to content

Commit c77751e

Browse files
Merge pull request #14403 from rabbitmq/stream-close-connection-if-unauthorized-vhost-after-secret-update
Close stream connection if unauthorized vhost after secret update
2 parents 4d60fc4 + 173876c commit c77751e

File tree

3 files changed

+88
-27
lines changed

3 files changed

+88
-27
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,12 @@ open(info, {OK, S, Data},
697697
{next_state, close_sent,
698698
StatemData#statem_data{connection = Connection1,
699699
connection_state = State1}};
700+
failure ->
701+
_ = demonitor_all_streams(Connection),
702+
?LOG_INFO("Force closing stream connection ~tp because of "
703+
"transition to invalid state",
704+
[self()]),
705+
{stop, {shutdown, <<"Invalid state">>}};
700706
_ ->
701707
State2 =
702708
case Blocked of
@@ -1502,7 +1508,6 @@ handle_frame_pre_auth(Transport,
15021508
send(Transport, S, F),
15031509
Connection#stream_connection{connection_step = failure}
15041510
end,
1505-
15061511
{Connection1, State};
15071512
handle_frame_pre_auth(_Transport, Connection, State, heartbeat) ->
15081513
?LOG_DEBUG("Received heartbeat frame pre auth"),
@@ -1586,6 +1591,7 @@ handle_frame_post_auth(Transport,
15861591
stream),
15871592
auth_fail(NewUsername, Msg, Args, C1, S1),
15881593
?LOG_WARNING(Msg, Args),
1594+
silent_close_delay(),
15891595
{C1#stream_connection{connection_step = failure},
15901596
{sasl_authenticate,
15911597
?RESPONSE_AUTHENTICATION_FAILURE, <<>>}};
@@ -1610,27 +1616,15 @@ handle_frame_post_auth(Transport,
16101616
Challenge}};
16111617
{ok, NewUser = #user{username = NewUsername}} ->
16121618
case NewUsername of
1613-
Username ->
1614-
rabbit_core_metrics:auth_attempt_succeeded(Host,
1615-
Username,
1616-
stream),
1617-
notify_auth_result(Username,
1618-
user_authentication_success,
1619-
[],
1620-
C1,
1621-
S1),
1622-
?LOG_DEBUG("Successfully updated secret for username '~ts'", [Username]),
1623-
{C1#stream_connection{user = NewUser,
1624-
authentication_state = done,
1625-
connection_step = authenticated},
1626-
{sasl_authenticate, ?RESPONSE_CODE_OK,
1627-
<<>>}};
1619+
Username ->
1620+
complete_secret_update(NewUser, C1, S1);
16281621
_ ->
16291622
rabbit_core_metrics:auth_attempt_failed(Host,
16301623
Username,
16311624
stream),
16321625
?LOG_WARNING("Not allowed to change username '~ts'. Only password",
16331626
[Username]),
1627+
silent_close_delay(),
16341628
{C1#stream_connection{connection_step =
16351629
failure},
16361630
{sasl_authenticate,
@@ -1652,6 +1646,7 @@ handle_frame_post_auth(Transport,
16521646
{OtherMechanism, _} ->
16531647
?LOG_WARNING("User '~ts' cannot change initial auth mechanism '~ts' for '~ts'",
16541648
[Username, NewMechanism, OtherMechanism]),
1649+
silent_close_delay(),
16551650
CmdBody =
16561651
{sasl_authenticate, ?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM, <<>>},
16571652
Frame = rabbit_stream_core:frame({response, CorrelationId, CmdBody}),
@@ -2774,6 +2769,32 @@ handle_frame_post_auth(Transport,
27742769
increase_protocol_counter(?UNKNOWN_FRAME),
27752770
{Connection#stream_connection{connection_step = close_sent}, State}.
27762771

2772+
complete_secret_update(NewUser = #user{username = Username},
2773+
#stream_connection{host = Host,
2774+
socket = S,
2775+
virtual_host = VH} = C1, S1) ->
2776+
notify_auth_result(Username, user_authentication_success, [], C1, S1),
2777+
rabbit_core_metrics:auth_attempt_succeeded(Host, Username, stream),
2778+
?LOG_DEBUG("Stream connection has successfully checked updated secret (token) for username '~ts'",
2779+
[Username]),
2780+
try
2781+
?LOG_DEBUG("Stream connection: will verify virtual host access after secret (token) update"),
2782+
rabbit_access_control:check_vhost_access(NewUser, VH, {socket, S}, #{}),
2783+
?LOG_DEBUG("Stream connection: successfully re-verified virtual host access"),
2784+
2785+
{C1#stream_connection{user = NewUser,
2786+
authentication_state = done,
2787+
connection_step = authenticated},
2788+
{sasl_authenticate, ?RESPONSE_CODE_OK,
2789+
<<>>}}
2790+
catch exit:#amqp_error{explanation = Explanation} ->
2791+
?LOG_WARNING("Stream connection no longer has the permissions to access its target virtual host ('~ts') after a secret (token) update: ~ts",
2792+
[VH, Explanation]),
2793+
silent_close_delay(),
2794+
{C1#stream_connection{connection_step = failure},
2795+
{sasl_authenticate, ?RESPONSE_VHOST_ACCESS_FAILURE, <<>>}}
2796+
end.
2797+
27772798
process_client_command_versions(C, []) ->
27782799
C;
27792800
process_client_command_versions(C, [H | T]) ->

deps/rabbitmq_stream/src/rabbit_stream_reader.hrl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
-type publisher_reference() :: binary().
2121
-type subscription_id() :: byte().
2222
-type internal_id() :: integer().
23+
-type connection_step() :: tcp_connected | peer_properties_exchanged |
24+
authenticating | authenticated | tuning |
25+
tuned | opened | failure |
26+
closing | close_sent | closing_done.
2327

2428
-record(publisher,
2529
{publisher_id :: publisher_id(),
@@ -75,8 +79,7 @@
7579
credits :: atomics:atomics_ref(),
7680
user :: undefined | #user{},
7781
virtual_host :: undefined | binary(),
78-
connection_step ::
79-
atom(), % tcp_connected, peer_properties_exchanged, authenticating, authenticated, tuning, tuned, opened, failure, closing, closing_done
82+
connection_step :: connection_step(),
8083
frame_max :: integer(),
8184
heartbeat :: undefined | integer(),
8285
heartbeater :: any(),

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ groups() ->
4848
test_update_secret,
4949
cannot_update_username_after_authenticated,
5050
cannot_use_another_authmechanism_when_updating_secret,
51+
update_secret_should_close_connection_if_wrong_secret,
52+
update_secret_should_close_connection_if_unauthorized_vhost,
5153
unauthenticated_client_rejected_tcp_connected,
5254
timeout_tcp_connected,
5355
unauthenticated_client_rejected_peer_properties_exchanged,
@@ -165,6 +167,12 @@ init_per_testcase(cannot_update_username_after_authenticated = TestCase, Config)
165167
ok = rabbit_ct_broker_helpers:add_user(Config, <<"other">>),
166168
rabbit_ct_helpers:testcase_started(Config, TestCase);
167169

170+
init_per_testcase(update_secret_should_close_connection_if_unauthorized_vhost = TestCase,
171+
Config) ->
172+
ok = rabbit_ct_broker_helpers:add_user(Config, <<"other">>),
173+
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, <<"other">>, <<"/">>),
174+
rabbit_ct_helpers:testcase_started(Config, TestCase);
175+
168176
init_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) ->
169177
ok = rabbit_ct_broker_helpers:rpc(Config,
170178
0,
@@ -200,6 +208,11 @@ end_per_testcase(cannot_update_username_after_authenticated = TestCase, Config)
200208
ok = rabbit_ct_broker_helpers:delete_user(Config, <<"other">>),
201209
rabbit_ct_helpers:testcase_finished(Config, TestCase);
202210

211+
end_per_testcase(update_secret_should_close_connection_if_unauthorized_vhost = TestCase,
212+
Config) ->
213+
ok = rabbit_ct_broker_helpers:delete_user(Config, <<"other">>),
214+
rabbit_ct_helpers:testcase_finished(Config, TestCase);
215+
203216
end_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) ->
204217
ok = rabbit_ct_broker_helpers:rpc(Config,
205218
0,
@@ -285,29 +298,53 @@ test_update_secret(Config) ->
285298
{S, C0} = connect_and_authenticate(Transport, Config),
286299
rabbit_ct_broker_helpers:change_password(Config, <<"guest">>, <<"password">>),
287300
C1 = expect_successful_authentication(
288-
try_authenticate(Transport, S, C0, <<"PLAIN">>, <<"guest">>, <<"password">>)),
301+
try_authenticate(Transport, S, C0, <<"PLAIN">>, <<"guest">>, <<"password">>)),
289302
_C2 = test_close(Transport, S, C1),
290303
closed = wait_for_socket_close(Transport, S, 10),
291304
ok.
292305

293306
cannot_update_username_after_authenticated(Config) ->
294307
{S, C0} = connect_and_authenticate(gen_tcp, Config),
295-
C1 = expect_unsuccessful_authentication(
296-
try_authenticate(gen_tcp, S, C0, <<"PLAIN">>, <<"other">>, <<"other">>),
297-
?RESPONSE_SASL_CANNOT_CHANGE_USERNAME),
298-
_C2 = test_close(gen_tcp, S, C1),
308+
_C1 = expect_unsuccessful_authentication(
309+
try_authenticate(gen_tcp, S, C0, <<"PLAIN">>, <<"other">>, <<"other">>),
310+
?RESPONSE_SASL_CANNOT_CHANGE_USERNAME),
299311
closed = wait_for_socket_close(gen_tcp, S, 10),
300312
ok.
301313

302314
cannot_use_another_authmechanism_when_updating_secret(Config) ->
303315
{S, C0} = connect_and_authenticate(gen_tcp, Config),
304-
C1 = expect_unsuccessful_authentication(
305-
try_authenticate(gen_tcp, S, C0, <<"EXTERNAL">>, <<"guest">>, <<"new_password">>),
306-
?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM),
307-
_C2 = test_close(gen_tcp, S, C1),
316+
_C1 = expect_unsuccessful_authentication(
317+
try_authenticate(gen_tcp, S, C0, <<"EXTERNAL">>, <<"guest">>, <<"new_password">>),
318+
?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM),
308319
closed = wait_for_socket_close(gen_tcp, S, 10),
309320
ok.
310321

322+
update_secret_should_close_connection_if_wrong_secret(Config) ->
323+
Transport = gen_tcp,
324+
{S, C0} = connect_and_authenticate(Transport, Config),
325+
Pwd = rand:bytes(20),
326+
_C1 = expect_unsuccessful_authentication(
327+
try_authenticate(Transport, S, C0, <<"PLAIN">>, <<"guest">>, Pwd),
328+
?RESPONSE_AUTHENTICATION_FAILURE),
329+
closed = wait_for_socket_close(Transport, S, 10),
330+
ok.
331+
332+
update_secret_should_close_connection_if_unauthorized_vhost(Config) ->
333+
T = gen_tcp,
334+
Port = get_port(T, Config),
335+
Opts = get_opts(T),
336+
{ok, S} = T:connect("localhost", Port, Opts),
337+
C0 = rabbit_stream_core:init(0),
338+
C1 = test_peer_properties(T, S, C0),
339+
Username = <<"other">>,
340+
C2 = test_authenticate(T, S, C1, Username),
341+
ok = rabbit_ct_broker_helpers:clear_permissions(Config, Username, <<"/">>),
342+
_C3 = expect_unsuccessful_authentication(
343+
try_authenticate(gen_tcp, S, C2, <<"PLAIN">>, Username, Username),
344+
?RESPONSE_VHOST_ACCESS_FAILURE),
345+
closed = wait_for_socket_close(T, S, 10),
346+
ok.
347+
311348
test_stream_tls(Config) ->
312349
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
313350
test_server(ssl, Stream, Config),

0 commit comments

Comments
 (0)