Skip to content

Commit 960918e

Browse files
committed
Fix quorum queue drop-head dead letter order
Prior to this commit, quorum queue at-most-once dead lettering for the overflow behaviour `drop-head` was dead lettering in the wrong order.
1 parent f4d290a commit 960918e

File tree

2 files changed

+61
-14
lines changed

2 files changed

+61
-14
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1639,30 +1639,32 @@ drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects) ->
16391639
#?STATE{cfg = #cfg{dead_letter_handler = DLH},
16401640
dlx = DlxState} = State = State3,
16411641
{_, DlxEffects} = rabbit_fifo_dlx:discard([Msg], maxlen, DLH, DlxState),
1642-
{State, combine_effects(DlxEffects, Effects)};
1642+
{State, add_drop_head_effects(DlxEffects, Effects)};
16431643
empty ->
16441644
{State0, Effects}
16451645
end.
16461646

1647-
%% combine global counter update effects to avoid bulding a huge list of
1648-
%% effects if many messages are dropped at the same time as could happen
1649-
%% when the `max_length' is changed via a configuration update.
1650-
combine_effects([{mod_call,
1651-
rabbit_global_counters,
1652-
messages_dead_lettered,
1653-
[Reason, rabbit_quorum_queue, Type, NewLen]}],
1654-
[{mod_call,
1655-
rabbit_global_counters,
1656-
messages_dead_lettered,
1657-
[Reason, rabbit_quorum_queue, Type, PrevLen]} | Rem]) ->
1647+
add_drop_head_effects([{mod_call,
1648+
rabbit_global_counters,
1649+
messages_dead_lettered,
1650+
[Reason, rabbit_quorum_queue, Type, NewLen]}],
1651+
[{mod_call,
1652+
rabbit_global_counters,
1653+
messages_dead_lettered,
1654+
[Reason, rabbit_quorum_queue, Type, PrevLen]} | Rem]) ->
1655+
%% combine global counter update effects to avoid bulding a huge list of
1656+
%% effects if many messages are dropped at the same time as could happen
1657+
%% when the `max_length' is changed via a configuration update.
16581658
[{mod_call,
16591659
rabbit_global_counters,
16601660
messages_dead_lettered,
16611661
[Reason, rabbit_quorum_queue, Type, PrevLen + NewLen]} | Rem];
1662-
combine_effects(New, Old) ->
1662+
add_drop_head_effects([{log, _, _}] = DlxEffs, Effs) ->
1663+
%% dead letter in the correct order
1664+
Effs ++ DlxEffs;
1665+
add_drop_head_effects(New, Old) ->
16631666
New ++ Old.
16641667

1665-
16661668
maybe_set_msg_ttl(Msg, RaCmdTs, Header,
16671669
#?STATE{cfg = #cfg{msg_ttl = MsgTTL}}) ->
16681670
case mc:is(Msg) of

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ all_tests() ->
167167
subscribe_redelivery_count,
168168
message_bytes_metrics,
169169
queue_length_limit_drop_head,
170+
queue_length_bytes_limit_drop_head,
170171
queue_length_limit_reject_publish,
171172
queue_length_limit_policy_cleared,
172173
subscribe_redelivery_limit,
@@ -3669,6 +3670,50 @@ queue_length_limit_drop_head(Config) ->
36693670
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
36703671
no_ack = true})).
36713672

3673+
queue_length_bytes_limit_drop_head(Config) ->
3674+
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
3675+
3676+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
3677+
QQ = ?config(queue_name, Config),
3678+
DLQ = <<"dead letter queue">>,
3679+
3680+
?assertEqual({'queue.declare_ok', DLQ, 0, 0},
3681+
declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
3682+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
3683+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
3684+
{<<"x-overflow">>, longstr, <<"drop-head">>},
3685+
{<<"x-max-length-bytes">>, long, 1000},
3686+
{<<"x-dead-letter-exchange">>, longstr, <<>>},
3687+
{<<"x-dead-letter-routing-key">>, longstr, DLQ}])),
3688+
3689+
LargePayload = binary:copy(<<"x">>, 1500),
3690+
ok = amqp_channel:cast(Ch,
3691+
#'basic.publish'{routing_key = QQ},
3692+
#amqp_msg{payload = <<"m1">>}),
3693+
ok = amqp_channel:cast(Ch,
3694+
#'basic.publish'{routing_key = QQ},
3695+
#amqp_msg{payload = <<"m2">>}),
3696+
ok = amqp_channel:cast(Ch,
3697+
#'basic.publish'{routing_key = QQ},
3698+
#amqp_msg{payload = LargePayload}),
3699+
wait_for_consensus(QQ, Config),
3700+
wait_for_consensus(DLQ, Config),
3701+
RaName = ra_name(DLQ),
3702+
wait_for_messages_ready(Servers, RaName, 3),
3703+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
3704+
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
3705+
no_ack = true})),
3706+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
3707+
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
3708+
no_ack = true})),
3709+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = LargePayload}},
3710+
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
3711+
no_ack = true})),
3712+
3713+
[?assertEqual(#'queue.delete_ok'{message_count = 0},
3714+
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
3715+
|| Q <- [QQ, DLQ]].
3716+
36723717
queue_length_limit_reject_publish(Config) ->
36733718
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
36743719

0 commit comments

Comments
 (0)