Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3a75b6c

Browse files
committed
rabbit_db: Eliminate thedelete_queue Khepri transaction
... by using `keep_while` conditions on bindings and auto-deleteexchanges.[Why]The `delete_queue` transaction's anonymous function has to be beextracted by Horus, like any Khepri transaction. This is an expensiveoperation, but Horus uses caching to avoid most work after the firstextraction.The problem is when there are many concurrent executions of the sametransaction, before it has been executed once: the cache is not hot andHorus has to extract the same transaction many times in parallelcurrently.An example of this situation is when there are massive disconnectionsfrom RabbitMQ clients that trigger massive queue deletions. This can puta lot of load on RabbitMQ.[How]This patch removes the entire transaction. Instead, it uses `keep_while`conditions on bindings and auto-delete exchanges to let Khepri handlethe deletion of semantically related tree nodes. RabbitMQ just has tomake a simle "delete this queue" command.
1 parentac86eb6 commit3a75b6c

File tree

3 files changed

+109
-26
lines changed

3 files changed

+109
-26
lines changed

‎deps/rabbit/src/rabbit_db_binding.erl‎

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
has_for_source_in_mnesia/1,
3636
has_for_source_in_khepri/1,
3737
match_source_and_destination_in_khepri_tx/2,
38-
clear_in_khepri/0
38+
clear_in_khepri/0,
39+
khepri_ret_to_deletions/2
3940
]).
4041

4142
-export([
@@ -201,6 +202,14 @@ create_in_khepri(#binding{source = SrcName,
201202
caseChecksFun(Src,Dst)of
202203
ok ->
203204
RoutePath=khepri_route_path(Binding),
205+
DstPath=caseDstNameof
206+
#resource{kind=queue} ->
207+
rabbit_db_queue:khepri_queue_path(DstName);
208+
#resource{kind=exchange} ->
209+
rabbit_db_exchange:khepri_exchange_path(DstName)
210+
end,
211+
KeepWhile= #{DstPath=>#if_node_exists{}},
212+
PutOptions= #{keep_while=>KeepWhile},
204213
MaybeSerial=rabbit_exchange:serialise_events(Src),
205214
Serial=rabbit_khepri:transaction(
206215
fun()->
@@ -210,11 +219,17 @@ create_in_khepri(#binding{source = SrcName,
210219
true ->
211220
already_exists;
212221
false ->
213-
ok=khepri_tx:put(RoutePath,sets:add_element(Binding,Set)),
222+
ok=khepri_tx:put(
223+
RoutePath,
224+
sets:add_element(Binding,Set),
225+
PutOptions),
214226
serial_in_khepri(MaybeSerial,Src)
215227
end;
216228
_ ->
217-
ok=khepri_tx:put(RoutePath,sets:add_element(Binding,sets:new([{version,2}]))),
229+
ok=khepri_tx:put(
230+
RoutePath,
231+
sets:add_element(Binding,sets:new([{version,2}])),
232+
PutOptions),
218233
serial_in_khepri(MaybeSerial,Src)
219234
end
220235
end,rw),
@@ -906,6 +921,7 @@ delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, na
906921
Name,
907922
?KHEPRI_WILDCARD_STAR),%% RoutingKey
908923
{ok,BindingsMap}=khepri_tx_adv:delete_many(Pattern),
924+
% logger:alert("BindingsMap = ~p", [BindingsMap]),
909925
Bindings=maps:fold(
910926
fun(Path,Props,Acc) ->
911927
case {Path,Props}of
@@ -920,6 +936,38 @@ delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, na
920936
rabbit_binding:group_bindings_fold(funmaybe_auto_delete_exchange_in_khepri/4,
921937
lists:keysort(#binding.source,Bindings),OnlyDurable).
922938

939+
khepri_ret_to_deletions(Deleted,OnlyDurable)->
940+
Bindings0=maps:fold(
941+
fun(Path,Props,Acc) ->
942+
case {Path,Props}of
943+
{?RABBITMQ_KHEPRI_ROUTE_PATH(
944+
_VHost,_SrcName,_Kind,_Name,_RoutingKey),
945+
#{data :=Set}} ->
946+
sets:to_list(Set)++Acc;
947+
{_,_} ->
948+
Acc
949+
end
950+
end, [],Deleted),
951+
Bindings1=lists:keysort(#binding.source,Bindings0),
952+
rabbit_binding:group_bindings_fold(
953+
fun(XName,Bindings,Deletions,_OnlyDurable) ->
954+
ExchangePath=rabbit_db_exchange:khepri_exchange_path(XName),
955+
caseDeletedof
956+
#{ExchangePath := #{data :=X}} ->
957+
rabbit_binding:add_deletion(
958+
XName,X,deleted,Bindings,Deletions);
959+
_ ->
960+
caserabbit_db_exchange:get(XName)of
961+
{ok,X} ->
962+
rabbit_binding:add_deletion(
963+
XName,X,not_deleted,Bindings,Deletions);
964+
_ ->
965+
Deletions
966+
end
967+
end
968+
end,
969+
Bindings1,OnlyDurable).
970+
923971
%% -------------------------------------------------------------------
924972
%% delete_transient_for_destination_in_mnesia().
925973
%% -------------------------------------------------------------------

‎deps/rabbit/src/rabbit_db_exchange.erl‎

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,24 @@ create_or_get_in_khepri(#exchange{name = XName} = X) ->
414414
Path0, [#if_any{conditions=
415415
[#if_node_exists{exists=false},
416416
#if_has_payload{has_payload=false}]}]),
417-
caserabbit_khepri:put(Path1,X)of
417+
Options=caseXof
418+
#exchange{name=#resource{virtual_host=VHost,
419+
name=Name},
420+
auto_delete=true} ->
421+
Path=rabbit_db_binding:khepri_route_path(
422+
VHost,
423+
Name,
424+
_Kind=?KHEPRI_WILDCARD_STAR,
425+
_DstName=?KHEPRI_WILDCARD_STAR,
426+
_RoutingKey=?KHEPRI_WILDCARD_STAR),
427+
KeepWhile= #{Path=>#if_all{conditions=
428+
[#if_node_exists{},
429+
#if_has_data{}]}},
430+
#{keep_while=>KeepWhile};
431+
_ ->
432+
#{}
433+
end,
434+
caserabbit_khepri:put(Path1,X,Options)of
418435
ok ->
419436
{new,X};
420437
{error, {khepri,mismatching_node, #{node_props := #{data :=ExistingX}}}} ->

‎deps/rabbit/src/rabbit_db_queue.erl‎

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -411,28 +411,46 @@ delete_in_khepri(QueueName) ->
411411
delete_in_khepri(QueueName,false).
412412

413413
delete_in_khepri(QueueName,OnlyDurable)->
414-
rabbit_khepri:transaction(
415-
fun ()->
416-
Path=khepri_queue_path(QueueName),
417-
UsesUniformWriteRet=try
418-
khepri_tx:does_api_comply_with(uniform_write_ret)
419-
catch
420-
error:undef ->
421-
false
422-
end,
423-
casekhepri_tx_adv:delete(Path)of
424-
{ok, #{Path := #{data :=_}}}whenUsesUniformWriteRet ->
425-
%% we want to execute some things, as decided by rabbit_exchange,
426-
%% after the transaction.
427-
rabbit_db_binding:delete_for_destination_in_khepri(QueueName,OnlyDurable);
428-
{ok, #{data :=_}}whennotUsesUniformWriteRet ->
429-
%% we want to execute some things, as decided by rabbit_exchange,
430-
%% after the transaction.
431-
rabbit_db_binding:delete_for_destination_in_khepri(QueueName,OnlyDurable);
432-
{ok,_} ->
433-
ok
434-
end
435-
end,rw).
414+
Path=khepri_queue_path(QueueName),
415+
FeatureFlag=true,
416+
caseFeatureFlagof
417+
true ->
418+
casekhepri_adv:delete(Path)of
419+
{ok, #{Path := #{data :=_}}=Deleted} ->
420+
%% we want to execute some things, as decided by
421+
%% rabbit_exchange, after the transaction.
422+
rabbit_db_binding:khepri_ret_to_deletions(
423+
Deleted,OnlyDurable);
424+
{ok,_} ->
425+
ok;
426+
{error,_}=Error ->
427+
Error
428+
end;
429+
false ->
430+
UsesUniformWriteRet=try
431+
khepri_tx:does_api_comply_with(uniform_write_ret)
432+
catch
433+
error:undef ->
434+
false
435+
end,
436+
rabbit_khepri:transaction(
437+
fun ()->
438+
Ret1=khepri_tx_adv:delete(Path),
439+
% logger:alert("Deleted queue ret = ~p", [Ret1]),
440+
caseRet1of
441+
{ok, #{Path := #{data :=_}}}whenUsesUniformWriteRet ->
442+
%% we want to execute some things, as decided by rabbit_exchange,
443+
%% after the transaction.
444+
rabbit_db_binding:delete_for_destination_in_khepri(QueueName,OnlyDurable);
445+
{ok, #{data :=_}}whennotUsesUniformWriteRet ->
446+
%% we want to execute some things, as decided by rabbit_exchange,
447+
%% after the transaction.
448+
rabbit_db_binding:delete_for_destination_in_khepri(QueueName,OnlyDurable);
449+
{ok,_} ->
450+
ok
451+
end
452+
end,rw)
453+
end.
436454

437455
%% -------------------------------------------------------------------
438456
%% internal_delete().

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp