Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9c56475

Browse files
Merge pull request#15020 from rabbitmq/loic-cq-delivery-count-really
4.3: Implement AMQP-1.0 delivery-count for CQs
2 parents90fddc1 +960cd60 commit9c56475

10 files changed

+375
-161
lines changed

‎deps/rabbit/src/rabbit_amqqueue_process.erl‎

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -905,16 +905,32 @@ ack(AckTags, ChPid, State) ->
905905
State1#q{backing_queue_state=BQS1}
906906
end).
907907

908-
requeue(AckTags,ChPid,State)->
908+
requeue(AckTags,DelFailed,ChPid,State)->
909909
subtract_acks(ChPid,AckTags,State,
910-
fun (State1) ->requeue_and_run(AckTags,false,State1)end).
910+
fun (State1) ->requeue_and_run(AckTags,DelFailed,false,State1)end).
911+
912+
discard(AckTags,DelFailed,ChPid,State)->
913+
with_dlx(
914+
State#q.dlx,
915+
fun (X) ->subtract_acks(ChPid,AckTags,State,
916+
fun (State1) ->
917+
dead_letter_rejected_msgs(
918+
AckTags,DelFailed,X,State1)
919+
end)end,
920+
fun ()->rabbit_global_counters:messages_dead_lettered(rejected,rabbit_classic_queue,
921+
disabled,length(AckTags)),
922+
ack(AckTags,ChPid,State)end).
923+
924+
requeue_and_run(AckTags,ActiveConsumersChanged,State)->
925+
requeue_and_run(AckTags,true,ActiveConsumersChanged,State).
911926

912927
requeue_and_run(AckTags,
928+
DelFailed,
913929
ActiveConsumersChanged,
914930
#q{backing_queue=BQ,
915931
backing_queue_state=BQS0}=State0)->
916932
WasEmpty=BQ:is_empty(BQS0),
917-
{_MsgIds,BQS}=BQ:requeue(AckTags,BQS0),
933+
{_MsgIds,BQS}=BQ:requeue(AckTags,DelFailed,BQS0),
918934
State1=State0#q{backing_queue_state=BQS},
919935
{_Dropped,State2}=maybe_drop_head(State1),
920936
State3=drop_expired_msgs(State2),
@@ -1079,11 +1095,11 @@ dead_letter_expired_msgs(ExpirePred, X, State = #q{backing_queue = BQ}) ->
10791095
BQ:fetchwhile(ExpirePred,DLFun,Acc,BQS1)
10801096
end,expired,X,State).
10811097

1082-
dead_letter_rejected_msgs(AckTags,X,State=#q{backing_queue=BQ})->
1098+
dead_letter_rejected_msgs(AckTags,DelFailed,X,State=#q{backing_queue=BQ})->
10831099
{ok,State1}=
10841100
dead_letter_msgs(
10851101
fun (DLFun,Acc,BQS) ->
1086-
{Acc1,BQS1}=BQ:ackfold(DLFun,Acc,BQS,AckTags),
1102+
{Acc1,BQS1}=BQ:ackfold(DLFun,Acc,BQS,AckTags,DelFailed),
10871103
{ok,Acc1,BQS1}
10881104
end,rejected,X,State),
10891105
State1.
@@ -1258,7 +1274,8 @@ prioritise_cast(Msg, _Len, State) ->
12581274
delete_immediately ->8;
12591275
{delete_exclusive,_Pid} ->8;
12601276
{run_backing_queue,_Mod,_Fun} ->6;
1261-
{ack,_AckTags,_ChPid} ->4;%% [1]
1277+
{ack,_AckTags,_ChPid} ->4;%% [1] %% @todo Remove when 'rabbitmq_4.3.0' FF is required.
1278+
{complete,_AckTags,_ChPid} ->4;%% [1]
12621279
{resume,_ChPid} ->3;
12631280
{notify_sent,_ChPid,_Credit} ->consumer_bias(State,0,2);
12641281
_ ->0
@@ -1527,23 +1544,28 @@ handle_cast({deliver,
15271544
State1=State#q{senders=Senders1},
15281545
noreply(maybe_deliver_or_enqueue(Delivery,Delivered,State1));
15291546

1547+
%% Compat for RabbitMQ 4.2. @todo Remove when 'rabbitmq_4.3.0' FF is required.
15301548
handle_cast({ack,AckTags,ChPid},State)->
1549+
handle_cast({complete,AckTags,ChPid},State);
1550+
handle_cast({reject,true,AckTags,ChPid},State)->
1551+
handle_cast({requeue,AckTags,ChPid},State);
1552+
handle_cast({reject,false,AckTags,ChPid},State)->
1553+
handle_cast({discard,AckTags,ChPid},State);
1554+
1555+
handle_cast({complete,AckTags,ChPid},State)->
15311556
noreply(ack(AckTags,ChPid,State));
15321557

1533-
handle_cast({reject,true,AckTags,ChPid},State)->
1534-
noreply(requeue(AckTags,ChPid,State));
1558+
handle_cast({requeue,AckTags,ChPid},State)->
1559+
noreply(requeue(AckTags,false,ChPid,State));
15351560

1536-
handle_cast({reject,false,AckTags,ChPid},State)->
1537-
noreply(with_dlx(
1538-
State#q.dlx,
1539-
fun (X) ->subtract_acks(ChPid,AckTags,State,
1540-
fun (State1) ->
1541-
dead_letter_rejected_msgs(
1542-
AckTags,X,State1)
1543-
end)end,
1544-
fun ()->rabbit_global_counters:messages_dead_lettered(rejected,rabbit_classic_queue,
1545-
disabled,length(AckTags)),
1546-
ack(AckTags,ChPid,State)end));
1561+
handle_cast({discard,AckTags,ChPid},State)->
1562+
noreply(discard(AckTags,true,ChPid,State));
1563+
1564+
handle_cast({modify,AckTags,DelFailed,false,_Anns,ChPid},State)->
1565+
noreply(requeue(AckTags,DelFailed,ChPid,State));
1566+
1567+
handle_cast({modify,AckTags,DelFailed,true,_Anns,ChPid},State)->
1568+
noreply(discard(AckTags,DelFailed,ChPid,State));
15471569

15481570
handle_cast({delete_exclusive,ConnPid},State)->
15491571
log_delete_exclusive(ConnPid,State),

‎deps/rabbit/src/rabbit_backing_queue.erl‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,11 @@
163163

164164
%% Reinsert messages into the queue which have already been delivered
165165
%% and were pending acknowledgement.
166-
-callbackrequeue([ack()],state())-> {msg_ids(),state()}.
166+
-callbackrequeue([ack()],boolean(),state())-> {msg_ids(),state()}.
167167

168168
%% Fold over messages by ack tag. The supplied function is called with
169169
%% each message, its ack tag, and an accumulator.
170-
-callbackackfold(msg_fun(A),A,state(), [ack()])-> {A,state()}.
170+
-callbackackfold(msg_fun(A),A,state(), [ack()],boolean())-> {A,state()}.
171171

172172
%% How long is my queue?
173173
-callbacklen(state())->non_neg_integer().

‎deps/rabbit/src/rabbit_classic_queue.erl‎

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -333,16 +333,31 @@ cancel(Q, Spec, State) ->
333333
-specsettle(rabbit_amqqueue:name(),rabbit_queue_type:settle_op(),
334334
rabbit_types:ctag(), [non_neg_integer()],state())->
335335
{state(),rabbit_queue_type:actions()}.
336-
settle(QName, {modify,_DelFailed,Undel,_Anns},CTag,MsgIds,State)->
336+
settle(QName,Op,CTag,MsgIds,State)->
337+
caserabbit_feature_flags:is_enabled('rabbitmq_4.3.0')of
338+
true ->settle_43(QName,Op,CTag,MsgIds,State);
339+
false ->settle_compat(QName,Op,CTag,MsgIds,State)
340+
end.
341+
342+
settle_43(_QName, {modify,DelFailed,Undel,Anns},_CTag,MsgIds,State= #?STATE{pid=Pid})->
343+
Arg= {modify,MsgIds,DelFailed,Undel,Anns,self()},
344+
delegate:invoke_no_result(Pid, {gen_server2,cast, [Arg]}),
345+
{State, []};
346+
settle_43(_QName,Op,_CTag,MsgIds,State= #?STATE{pid=Pid})->
347+
Arg= {Op,MsgIds,self()},
348+
delegate:invoke_no_result(Pid, {gen_server2,cast, [Arg]}),
349+
{State, []}.
350+
351+
settle_compat(QName, {modify,_DelFailed,Undel,_Anns},CTag,MsgIds,State)->
337352
%% translate modify into other op
338353
Op=caseUndelof
339354
true ->
340355
discard;
341356
false ->
342357
requeue
343358
end,
344-
settle(QName,Op,CTag,MsgIds,State);
345-
settle(_QName,Op,_CTag,MsgIds,State= #?STATE{pid=Pid})->
359+
settle_compat(QName,Op,CTag,MsgIds,State);
360+
settle_compat(_QName,Op,_CTag,MsgIds,State= #?STATE{pid=Pid})->
346361
Arg=caseOpof
347362
complete ->
348363
{ack,MsgIds,self()};
@@ -430,6 +445,8 @@ supports_stateful_delivery() -> true.
430445
deliver(Qs0,Msg0,Options)->
431446
%% add guid to content here instead of in rabbit_basic:message/3,
432447
%% as classic queues are the only ones that need it
448+
%% @todo Do we need to regenerate it for every time it gets dead lettered?
449+
%% We can likely do better and avoid rewriting to the shared message store.
433450
Msg=mc:prepare(store,mc:set_annotation(id,rabbit_guid:gen(),Msg0)),
434451
Mandatory=maps:get(mandatory,Options,false),
435452
MsgSeqNo=maps:get(correlation,Options,undefined),

‎deps/rabbit/src/rabbit_core_ff.erl‎

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,3 +218,10 @@
218218
depends_on=> ['rabbitmq_4.1.0'],
219219
callbacks=> #{enable=> {rabbit_khepri,enable_feature_flag}}
220220
}}).
221+
222+
-rabbit_feature_flag(
223+
{'rabbitmq_4.3.0',
224+
#{desc=>"Allows rolling upgrades to 4.3.x",
225+
stability=>stable,
226+
depends_on=> ['rabbitmq_4.2.0']
227+
}}).

‎deps/rabbit/src/rabbit_priority_queue.erl‎

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
-export([init/3,terminate/2,delete_and_terminate/2,delete_crashed/1,
2929
purge/1,purge_acks/1,
3030
publish/5,publish_delivered/4,discard/3,drain_confirmed/1,
31-
dropwhile/2,fetchwhile/4,fetch/2,drop/2,ack/2,requeue/2,
32-
ackfold/4,len/1,is_empty/1,depth/1,
31+
dropwhile/2,fetchwhile/4,fetch/2,drop/2,ack/2,requeue/3,
32+
ackfold/5,len/1,is_empty/1,depth/1,
3333
update_rates/1,needs_timeout/1,timeout/1,
3434
handle_pre_hibernate/1,resume/1,msg_rates/1,
3535
info/2,invoke/3,is_duplicate/2,
@@ -279,27 +279,27 @@ ack(AckTags, State = #state{bq = BQ}) ->
279279
ack(AckTags,State=#passthrough{bq=BQ,bqs=BQS})->
280280
?passthrough2(ack(AckTags,BQS)).
281281

282-
requeue(AckTags,State=#state{bq=BQ})->
282+
requeue(AckTags,DelFailed,State=#state{bq=BQ})->
283283
fold_by_acktags2(fun (AckTagsN,BQSN) ->
284-
BQ:requeue(AckTagsN,BQSN)
284+
BQ:requeue(AckTagsN,DelFailed,BQSN)
285285
end,AckTags,State);
286-
requeue(AckTags,State=#passthrough{bq=BQ,bqs=BQS})->
287-
?passthrough2(requeue(AckTags,BQS)).
286+
requeue(AckTags,DelFailed,State=#passthrough{bq=BQ,bqs=BQS})->
287+
?passthrough2(requeue(AckTags,DelFailed,BQS)).
288288

289289
%% Similar problem to fetchwhile/4
290-
ackfold(MsgFun,Acc,State=#state{bq=BQ},AckTags)->
290+
ackfold(MsgFun,Acc,State=#state{bq=BQ},AckTags,DelFailed)->
291291
AckTagsByPriority=partition_acktags(AckTags),
292292
fold2(
293293
fun (P,BQSN,AccN) ->
294294
casemaps:find(P,AckTagsByPriority)of
295295
{ok,ATagsN} -> {AccN1,BQSN1}=
296-
BQ:ackfold(MsgFun,AccN,BQSN,ATagsN),
296+
BQ:ackfold(MsgFun,AccN,BQSN,ATagsN,DelFailed),
297297
{priority_on_acktags(P,AccN1),BQSN1};
298298
error -> {AccN,BQSN}
299299
end
300300
end,Acc,State);
301-
ackfold(MsgFun,Acc,State=#passthrough{bq=BQ,bqs=BQS},AckTags)->
302-
?passthrough2(ackfold(MsgFun,Acc,BQS,AckTags)).
301+
ackfold(MsgFun,Acc,State=#passthrough{bq=BQ,bqs=BQS},AckTags,DelFailed)->
302+
?passthrough2(ackfold(MsgFun,Acc,BQS,AckTags,DelFailed)).
303303

304304
len(#state{bq=BQ,bqss=BQSs})->
305305
add0(fun (_P,BQSN) ->BQ:len(BQSN)end,BQSs);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp