Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1e6f8ab

Browse files
lukebakkenbordingGsantomaggio
authored
Add dedicated exception forbasic.return messages. (#1832)
* Add dedicated exception for `basic.return` messages.Fixes#1831This PR adds the `PublishReturnException` class that includes theoriginating exchange and routing key for a `basic.return` message. Itshould be backwards-compatible in the API.* Add reply code and reply text to the new `PublishBasicException`exception type.* Fix bug in the code that did not set certain `PublishReturnException` data* Add exception messages* my 2cents.Add more conditions to the testSigned-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>* formattingSigned-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>---------Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>Co-authored-by: Brandon Ording <bording@gmail.com>Co-authored-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent9ecad93 commit1e6f8ab

File tree

5 files changed

+117
-7
lines changed

5 files changed

+117
-7
lines changed

‎projects/RabbitMQ.Client/Exceptions/PublishException.cs‎

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ public class PublishException : RabbitMQClientException
4242
privatebool_isReturn=false;
4343
privateulong_publishSequenceNumber=ulong.MinValue;
4444

45-
publicPublishException(ulongpublishSequenceNumber,boolisReturn):base()
45+
publicPublishException(ulongpublishSequenceNumber,boolisReturn):this(publishSequenceNumber,isReturn,"Message rejected by broker.")
46+
{
47+
}
48+
49+
publicPublishException(ulongpublishSequenceNumber,boolisReturn,stringmessage):base(message)
4650
{
4751
if(publishSequenceNumber==ulong.MinValue)
4852
{
@@ -63,4 +67,66 @@ public PublishException(ulong publishSequenceNumber, bool isReturn) : base()
6367
/// </summary>
6468
publiculongPublishSequenceNumber=>_publishSequenceNumber;
6569
}
70+
71+
/// <summary>
72+
/// Class for exceptions related to publisher confirmations
73+
/// or the <c>mandatory</c> flag, when <c>basic.return</c> is
74+
/// sent from the broker.
75+
/// </summary>
76+
publicclassPublishReturnException:PublishException
77+
{
78+
privatereadonlystring_exchange;
79+
privatereadonlystring_routingKey;
80+
privatereadonlyushort_replyCode;
81+
privatereadonlystring_replyText;
82+
83+
publicPublishReturnException(ulongpublishSequenceNumber,stringmessage,
84+
string?exchange=null,string?routingKey=null,
85+
ushort?replyCode=null,string?replyText=null)
86+
:base(publishSequenceNumber,true,message)
87+
{
88+
_exchange=exchange??string.Empty;
89+
_routingKey=routingKey??string.Empty;
90+
_replyCode=replyCode??0;
91+
_replyText=replyText??string.Empty;
92+
}
93+
94+
/// <summary>
95+
/// Get the exchange associated with this <c>basic.return</c>
96+
/// </summary>
97+
publicstringExchange=>_exchange;
98+
99+
/// <summary>
100+
/// Get the routing key associated with this <c>basic.return</c>
101+
/// </summary>
102+
publicstringRoutingKey=>_routingKey;
103+
104+
/// <summary>
105+
/// Get the reply code associated with this <c>basic.return</c>
106+
/// </summary>
107+
publicushortReplyCode=>_replyCode;
108+
109+
/// <summary>
110+
/// Get the reply text associated with this <c>basic.return</c>
111+
/// </summary>
112+
publicstringReplyText=>_replyText;
113+
}
114+
115+
internalstaticclassPublishExceptionFactory
116+
{
117+
internalstaticPublishExceptionCreate(boolisReturn,
118+
ulongdeliveryTag,string?exchange=null,string?routingKey=null,
119+
ushort?replyCode=null,string?replyText=null)
120+
{
121+
if(isReturn)
122+
{
123+
stringmessage=$"{replyCode}{replyText} Exchange:{exchange} Routing Key:{routingKey}";
124+
returnnewPublishReturnException(deliveryTag,message,exchange,routingKey,replyCode,replyText);
125+
}
126+
else
127+
{
128+
returnnewPublishException(deliveryTag,isReturn);
129+
}
130+
}
131+
}
66132
}

‎projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs‎

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,9 @@ private void HandleAck(ulong deliveryTag, bool multiple)
200200
}
201201

202202
[MethodImpl(MethodImplOptions.AggressiveInlining)]
203-
privatevoidHandleNack(ulongdeliveryTag,boolmultiple,boolisReturn)
203+
privatevoidHandleNack(ulongdeliveryTag,boolmultiple,boolisReturn,
204+
string?exchange=null,string?routingKey=null,
205+
ushort?replyCode=null,string?replyText=null)
204206
{
205207
if(ShouldHandleAckOrNack(deliveryTag))
206208
{
@@ -210,7 +212,9 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn)
210212
{
211213
if(pair.Key<=deliveryTag)
212214
{
213-
pair.Value.SetException(newPublishException(pair.Key,isReturn));
215+
PublishExceptionex=PublishExceptionFactory.Create(isReturn,pair.Key,
216+
exchange,routingKey,replyCode,replyText);
217+
pair.Value.SetException(ex);
214218
_confirmsTaskCompletionSources.Remove(pair.Key,out_);
215219
}
216220
}
@@ -219,7 +223,9 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn)
219223
{
220224
if(_confirmsTaskCompletionSources.Remove(deliveryTag,outTaskCompletionSource<bool>?tcs))
221225
{
222-
tcs.SetException(newPublishException(deliveryTag,isReturn));
226+
PublishExceptionex=PublishExceptionFactory.Create(isReturn,deliveryTag,
227+
exchange,routingKey,replyCode,replyText);
228+
tcs.SetException(ex);
223229
}
224230
}
225231
}
@@ -249,7 +255,9 @@ private void HandleReturn(BasicReturnEventArgs basicReturnEvent)
249255
}
250256
}
251257

252-
HandleNack(publishSequenceNumber,multiple:false,isReturn:true);
258+
HandleNack(publishSequenceNumber,multiple:false,isReturn:true,
259+
exchange:basicReturnEvent.Exchange,routingKey:basicReturnEvent.RoutingKey,
260+
replyCode:basicReturnEvent.ReplyCode,replyText:basicReturnEvent.ReplyText);
253261
}
254262
}
255263

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
RabbitMQ.Client.Exceptions.PublishException.PublishException(ulong publishSequenceNumber, bool isReturn, string! message) -> void
2+
RabbitMQ.Client.Exceptions.PublishReturnException
3+
RabbitMQ.Client.Exceptions.PublishReturnException.Exchange.get -> string!
4+
RabbitMQ.Client.Exceptions.PublishReturnException.PublishReturnException(ulong publishSequenceNumber, string! message, string? exchange = null, string? routingKey = null, ushort? replyCode = null, string? replyText = null) -> void
5+
RabbitMQ.Client.Exceptions.PublishReturnException.ReplyCode.get -> ushort
6+
RabbitMQ.Client.Exceptions.PublishReturnException.ReplyText.get -> string!
7+
RabbitMQ.Client.Exceptions.PublishReturnException.RoutingKey.get -> string!

‎projects/Test/Integration/TestBasicPublishAsync.cs‎

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved.
3030
//---------------------------------------------------------------------------
3131

32+
usingSystem;
3233
usingSystem.Threading.Tasks;
3334
usingRabbitMQ.Client;
35+
usingRabbitMQ.Client.Exceptions;
3436
usingXunit;
3537
usingXunit.Abstractions;
3638

@@ -49,7 +51,6 @@ public async Task TestQueuePurgeAsync()
4951

5052
varpublishSyncSource=newTaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
5153

52-
5354
QueueDeclareOkq=await_channel.QueueDeclareAsync(string.Empty,false,false,true);
5455

5556
varpublishTask=Task.Run(async()=>
@@ -65,5 +66,28 @@ public async Task TestQueuePurgeAsync()
6566
Assert.True(awaitpublishSyncSource.Task);
6667
Assert.Equal((uint)messageCount,await_channel.QueuePurgeAsync(q));
6768
}
69+
70+
[Fact]
71+
publicasyncTaskTestBasicReturnAsync()
72+
{
73+
stringroutingKey=Guid.NewGuid().ToString();
74+
try
75+
{
76+
await_channel.BasicPublishAsync(exchange:string.Empty,routingKey:routingKey,
77+
mandatory:true,body:GetRandomBody());
78+
}
79+
catch(PublishReturnExceptionprex)
80+
{
81+
Assert.True(prex.IsReturn);
82+
Assert.NotNull(prex.Exchange);
83+
Assert.Equal(string.Empty,prex.Exchange);
84+
Assert.NotNull(prex.RoutingKey);
85+
Assert.Equal(routingKey,prex.RoutingKey);
86+
Assert.NotEqual(0,prex.ReplyCode);
87+
Assert.NotNull(prex.ReplyText);
88+
Assert.Equal("NO_ROUTE",prex.ReplyText);
89+
90+
}
91+
}
6892
}
6993
}

‎projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs‎

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,13 @@ await TestConcurrentOperationsAsync(async () =>
145145
}
146146
catch(PublishExceptionex)
147147
{
148-
if(ex.IsReturn)
148+
if(exisPublishReturnExceptionprex)
149149
{
150+
Assert.True(prex.IsReturn);
151+
Assert.NotNull(prex.Exchange);
152+
Assert.NotNull(prex.RoutingKey);
153+
Assert.NotEqual(0,prex.ReplyCode);
154+
Assert.NotNull(prex.ReplyText);
150155
Interlocked.Increment(reftotalReturnCount);
151156
}
152157
else

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp