Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -266,24 +266,25 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) ->
%% following stucture:
%% {amqp10_disposition, {accepted | rejected, DeliveryTag}}
-spec new(delivery_tag(), amqp10_body() | binary(), boolean()) -> amqp10_msg().
new(DeliveryTag, Body, Settled) when is_binary(Body) ->
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
settled = Settled,
message_format = {uint, ?MESSAGE_FORMAT}},
body = [#'v1_0.data'{content = Body}]};
new(DeliveryTag, Bin, Settled) when is_binary(Bin) ->
Body = [#'v1_0.data'{content = Bin}],
new(DeliveryTag, Body, Settled);
new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
settled = Settled,
message_format = {uint, ?MESSAGE_FORMAT}},
body = Body}.
#amqp10_msg{
transfer = #'v1_0.transfer'{
delivery_tag = {binary, DeliveryTag},
settled = Settled,
message_format = {uint, ?MESSAGE_FORMAT}},
%% This lib is safe by default.
header = #'v1_0.header'{durable = true},
body = Body}.

%% @doc Create a new settled amqp10 message using the specified delivery tag
%% and body.
-spec new(delivery_tag(), amqp10_body() | binary()) -> amqp10_msg().
new(DeliveryTag, Body) ->
new(DeliveryTag, Body, false).


% First 3 octets are the format
% the last 1 octet is the version
% See 2.8.11 in the spec
Expand Down
42 changes: 20 additions & 22 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -251,30 +251,29 @@ routing_headers(Msg, Opts) ->
List = application_properties_as_simple_map(Msg, X),
maps:from_list(List).

get_property(durable, Msg) ->
case Msg of
#msg_body_encoded{header = #'v1_0.header'{durable = Durable}}
when is_boolean(Durable) ->
Durable;
get_property(durable, #msg_body_encoded{header = Header} = Msg) ->
case Header of
#'v1_0.header'{durable = D} when is_boolean(D) ->
D;
_ ->
%% fallback in case the source protocol was old AMQP 0.9.1
case message_annotation(<<"x-basic-delivery-mode">>, Msg, undefined) of
{ubyte, 1} ->
false;
{ubyte, 2} ->
true;
_ ->
true
false
end
end;
get_property(timestamp, Msg) ->
case Msg of
#msg_body_encoded{properties = #'v1_0.properties'{creation_time = {timestamp, Ts}}} ->
get_property(timestamp, #msg_body_encoded{properties = Properties}) ->
case Properties of
#'v1_0.properties'{creation_time = {timestamp, Ts}} ->
Ts;
_ ->
undefined
end;
get_property(ttl, Msg) ->
case Msg of
#msg_body_encoded{header = #'v1_0.header'{ttl = {uint, Ttl}}} ->
get_property(ttl, #msg_body_encoded{header = Header} = Msg) ->
case Header of
#'v1_0.header'{ttl = {uint, Ttl}} ->
Ttl;
_ ->
%% fallback in case the source protocol was AMQP 0.9.1
Expand All @@ -286,9 +285,9 @@ get_property(ttl, Msg) ->
undefined
end
end;
get_property(priority, Msg) ->
case Msg of
#msg_body_encoded{header = #'v1_0.header'{priority = {ubyte, Priority}}} ->
get_property(priority, #msg_body_encoded{header = Header} = Msg) ->
case Header of
#'v1_0.header'{priority = {ubyte, Priority}} ->
Priority;
_ ->
%% fallback in case the source protocol was AMQP 0.9.1
Expand Down Expand Up @@ -319,10 +318,7 @@ protocol_state(#msg_body_encoded{header = Header0,
[encode(Sections), BareAndFooter];
protocol_state(#v1{message_annotations = MA0,
bare_and_footer = BareAndFooter}, Anns) ->
Durable = case Anns of
#{?ANN_DURABLE := D} -> D;
_ -> true
end,
Durable = maps:get(?ANN_DURABLE, Anns, true),
Priority = case Anns of
#{?ANN_PRIORITY := P}
when is_integer(P) ->
Expand Down Expand Up @@ -667,7 +663,9 @@ binary_part_bare_and_footer(Payload, Start) ->
binary_part(Payload, Start, byte_size(Payload) - Start).

update_header_from_anns(undefined, Anns) ->
update_header_from_anns(#'v1_0.header'{durable = true}, Anns);
Durable = maps:get(?ANN_DURABLE, Anns, true),
Header = #'v1_0.header'{durable = Durable},
update_header_from_anns(Header, Anns);
update_header_from_anns(Header, Anns) ->
DeliveryCount = case Anns of
#{delivery_count := C} -> C;
Expand Down
104 changes: 99 additions & 5 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ groups() ->
sender_settle_mode_unsettled,
sender_settle_mode_unsettled_fanout,
sender_settle_mode_mixed,
durable_field_classic_queue,
durable_field_quorum_queue,
durable_field_stream,
invalid_transfer_settled_flag,
quorum_queue_rejects,
receiver_settle_mode_first,
Expand Down Expand Up @@ -916,6 +919,77 @@ sender_settle_mode_mixed(Config) ->
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
ok = close(Init).

durable_field_classic_queue(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
durable_field(Config, <<"classic">>, QName).

durable_field_quorum_queue(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
durable_field(Config, <<"quorum">>, QName).

durable_field_stream(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
durable_field(Config, <<"stream">>, QName).

durable_field(Config, QueueType, QName)
when is_binary(QueueType) ->
Address = rabbitmq_amqp_address:queue(QName),
{_Connection, Session, LinkPair} = Init = init(Config),
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QueueType}}},
{ok, #{type := QueueType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),
{ok, Sender} = amqp10_client:attach_sender_link(
Session, <<"test-sender">>, Address, unsettled),
wait_for_credit(Sender),

ok = amqp10_client:send_msg(Sender,
amqp10_msg:set_headers(
#{durable => true},
amqp10_msg:new(<<"t1">>, <<"durable">>))),
ok = amqp10_client:send_msg(Sender,
amqp10_msg:set_headers(
#{durable => false},
amqp10_msg:new(<<"t2">>, <<"non-durable">>))),
%% Even though the AMQP spec defines durable=false as default
%% (i.e. durable is false if the field is omitted on the wire),
%% we expect our AMQP Erlang library to be safe by default,
%% and therefore send the message as durable=true on behalf of us.
ok = amqp10_client:send_msg(
Sender, amqp10_msg:new(<<"t3">>, <<"lib publishes as durable by default">>)),
%% When we expliclitly publish without a header section, RabbitMQ should interpret
%% durable as false according to the AMQP spec.
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:from_amqp_records(
[#'v1_0.transfer'{delivery_tag = {binary, <<"t4">>},
settled = false,
message_format = {uint, 0}},
#'v1_0.data'{content = <<"publish without header section">>}])),

ok = wait_for_accepts(4),
ok = detach_link_sync(Sender),
flush(sent),

Filter = consume_from_first(QueueType),
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"test-receiver">>, Address, unsettled,
none, Filter),

ok = amqp10_client:flow_link_credit(Receiver, 4, never),
[M1, M2, M3, M4] = Msgs = receive_messages(Receiver, 4),
?assertEqual(<<"durable">>, amqp10_msg:body_bin(M1)),
?assertMatch(#{durable := true}, amqp10_msg:headers(M1)),
?assertEqual(<<"non-durable">>, amqp10_msg:body_bin(M2)),
?assertMatch(#{durable := false}, amqp10_msg:headers(M2)),
?assertEqual(<<"lib publishes as durable by default">>, amqp10_msg:body_bin(M3)),
?assertMatch(#{durable := true}, amqp10_msg:headers(M3)),
?assertEqual(<<"publish without header section">>, amqp10_msg:body_bin(M4)),
?assertMatch(#{durable := false}, amqp10_msg:headers(M4)),
[ok = amqp10_client:accept_msg(Receiver, M) || M <- Msgs],

ok = detach_link_sync(Receiver),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
close(Init).

invalid_transfer_settled_flag(Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
Expand Down Expand Up @@ -1301,7 +1375,7 @@ amqp_amqpl(QType, Config) ->
Body6 = [#'v1_0.data'{content = <<0, 1>>},
#'v1_0.data'{content = <<2, 3>>}],

%% Send only body sections
%% Send only header and body sections
[ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<>>, Body, true)) ||
Body <- [Body1, Body2, Body3, Body4, Body5, Body6]],
%% Send with application-properties
Expand Down Expand Up @@ -1342,6 +1416,11 @@ amqp_amqpl(QType, Config) ->
#{<<"x-array">> => {array, utf8, [{utf8, <<"e1">>},
{utf8, <<"e2">>}]}},
amqp10_msg:new(<<>>, Body1, true))),
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_headers(
#{durable => false},
amqp10_msg:new(<<>>, Body1, true))),

ok = amqp10_client:detach_link(Sender),
flush(detached),
Expand All @@ -1365,8 +1444,10 @@ amqp_amqpl(QType, Config) ->
receive {#'basic.deliver'{consumer_tag = CTag,
redelivered = false},
#amqp_msg{payload = Payload1,
props = #'P_basic'{type = <<"amqp-1.0">>}}} ->
?assertEqual([Body1], amqp10_framing:decode_bin(Payload1))
props = #'P_basic'{delivery_mode = DelMode2,
type = <<"amqp-1.0">>}}} ->
?assertEqual([Body1], amqp10_framing:decode_bin(Payload1)),
?assertEqual(2, DelMode2)
after 30000 -> ct:fail({missing_deliver, ?LINE})
end,
receive {_, #amqp_msg{payload = Payload2,
Expand Down Expand Up @@ -1428,6 +1509,12 @@ amqp_amqpl(QType, Config) ->
rabbit_misc:table_lookup(Headers11, <<"x-array">>))
after 30000 -> ct:fail({missing_deliver, ?LINE})
end,
receive {_, #amqp_msg{payload = Payload12,
props = #'P_basic'{delivery_mode = DelMode1}}} ->
?assertEqual([Body1], amqp10_framing:decode_bin(Payload12)),
?assertNotEqual(2, DelMode1)
after 30000 -> ct:fail({missing_deliver, ?LINE})
end,

ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
Expand Down Expand Up @@ -1514,10 +1601,17 @@ amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address) ->
amqp_channel:cast(
Ch,
#'basic.publish'{routing_key = QName},
#amqp_msg{props = #'P_basic'{headers = Amqp091Headers},
#amqp_msg{props = #'P_basic'{delivery_mode = 2,
priority = 5,
headers = Amqp091Headers},
payload = <<"foobar">>}),

{ok, [Msg]} = drain_queue(Session, Address, 1),

?assertMatch(#{durable := true,
priority := 5},
amqp10_msg:headers(Msg)),

Amqp10MA = amqp10_msg:message_annotations(Msg),
?assertEqual(<<"my-string">>, maps:get(<<"x-string">>, Amqp10MA, undefined)),
?assertEqual(92, maps:get(<<"x-int">>, Amqp10MA, undefined)),
Expand Down Expand Up @@ -3278,7 +3372,7 @@ max_message_size_client_to_server(Config) ->
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, mixed),
ok = wait_for_credit(Sender),

PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 10),
PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 20),
?assertEqual(ok,
amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, PayloadSmallEnough, false))),
ok = wait_for_accepted(<<"t1">>),
Expand Down
15 changes: 15 additions & 0 deletions release-notes/4.2.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,21 @@
RabbitMQ 4.2.0 is a new feature release.


## Breaking Changes and Compatibility Notes

### Default value for AMQP 1.0 `durable` field.

Starting with RabbitMQ 4.2, if a sending client omits the header section, RabbitMQ [assumes](https://github.com/rabbitmq/rabbitmq-server/pull/13918) the `durable` field to be `false` complying with the AMQP 1.0 spec:
```
<field name="durable" type="boolean" default="false"/>
```

AMQP 1.0 apps or client libraries must set the `durable` field of the header section to `true` to mark the message as durable.

Team RabbitMQ recommends client libraries to send messages as durable by default.
All AMQP 1.0 client libraries [maintained by Team RabbitMQ](https://www.rabbitmq.com/client-libraries/amqp-client-libraries) send messages as durable by default.


## Features

### Incoming and Outgoing Message Interceptors for native protocols
Expand Down
Loading