Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Refactoring MQTTnet.AspNetCore#2103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Open
xljiulang wants to merge91 commits intodotnet:master
base:master
Choose a base branch
Loading
fromxljiulang:master
Open
Show file tree
Hide file tree
Changes from1 commit
Commits
Show all changes
91 commits
Select commitHold shift + click to select a range
897c551
Use IConnectionFactory to create ConnectionContext to replace SocketC…
xljiulangNov 7, 2024
f38cca3
Refactoring AspNetMQTT
xljiulangNov 12, 2024
d8c94bd
Separate AspNetCoreMqttServerAdapter from MqttConnectionHandler
xljiulangNov 13, 2024
73502e4
TryAdd IConnectionFactory as Singleton
xljiulangNov 13, 2024
3a641ab
Add some remarks
xljiulangNov 13, 2024
91763ae
Add AspNetCoreMqttNetLogger
xljiulangNov 13, 2024
9c14222
Delayed start of AspNetCoreMqttServer
xljiulangNov 13, 2024
2372352
Using fields to cache IHttpContextFeature
xljiulangNov 13, 2024
8d96e19
Update Server_ASP_NET_Samples
xljiulangNov 13, 2024
ad5c798
Use ActivatorUtilities to create TMQttServerWrapper
xljiulangNov 13, 2024
7f95f02
Add IMqttServerBuilder
xljiulangNov 13, 2024
7b44ea2
Add IMqttClientBuilder
xljiulangNov 13, 2024
73c1365
Conditionally load SocketConnectionFactoryAssembly
xljiulangNov 13, 2024
2c10d2c
Add LICENSE
xljiulangNov 13, 2024
6cda003
DynamicallyAccessedMembers
xljiulangNov 13, 2024
faaadbd
Inject IOptions<MqttServerOptionsBuilder>
xljiulangNov 13, 2024
13c9198
Change the namespace to MQTTnet.AspNetCore
xljiulangNov 13, 2024
b8d8abb
await for_aspNetCoreMqttServer.StartAsync
xljiulangNov 14, 2024
64ed638
enable Nullable
xljiulangNov 14, 2024
3526212
Always dispose _connection of AspNetCoreMqttChannelAdapter
xljiulangNov 14, 2024
b4b0b04
UseTls
xljiulangNov 14, 2024
d9e02ce
Restore the IMqttClientAdapterFactory interface
xljiulangNov 15, 2024
b5019bd
Calculate the property values ​​when constructing MqttChannel
xljiulangNov 15, 2024
c206a4a
MqttServerChannelAdapter is modified to inherit MqttChannel
xljiulangNov 15, 2024
0a216a9
Add ClientConnectionContext.WebSocket
xljiulangNov 15, 2024
580257a
Add LICENSE
xljiulangNov 15, 2024
4f6b7ae
Add support for CreateLowLevelMqttClient
xljiulangNov 15, 2024
b25159b
AddMqttClient: Use AspNetCoreMqttClientAdapterFactory as the default …
xljiulangNov 16, 2024
ecb404f
Check that UseMqtt() and MapMqtt() are used.
xljiulangNov 16, 2024
858a914
AspNetCoreMqttServerAdapter: Logging when MqttServerOptions are ignored
xljiulangNov 16, 2024
fedb631
Add IMqttServerBuilder.AddMqttServerAdapter() extensions
xljiulangNov 16, 2024
67239cd
Register MqttServerOptions and MqttServerStopOptions as services
xljiulangNov 16, 2024
e03b9b1
IMqttBuilder: Add IMqttBuilder.UseAspNetCoreMqttNetLogger() extension
xljiulangNov 16, 2024
014a50b
Remove some dead code.
xljiulangNov 16, 2024
ad62877
Apply the properties of MqttClientTcpOptions to the Socket
xljiulangNov 16, 2024
c49beb0
TlsConnectionFeature supports passing in ClientCertificate
xljiulangNov 17, 2024
4c55368
Add support for MqttPacketInspector
xljiulangNov 17, 2024
34a4db2
Add route syntax for pattern parameter.
xljiulangNov 17, 2024
6dc18e1
Add KestrelServerOptions.ListenMqtt() extensions.
xljiulangNov 17, 2024
67dfda4
Optimize the implementation of ListenMqtt.
xljiulangNov 17, 2024
5813a13
Adapt MqttServerTlsTcpEndpointOptions to HttpsConnectionAdapterOptions
xljiulangNov 17, 2024
7628614
Compatible with the default server certificate.
xljiulangNov 17, 2024
7d8562a
Supports both MQTT and MQTT over WebSocket on a ConnectionContext.
xljiulangNov 18, 2024
1bd5949
MqttProtocols adds WebSocket item.
xljiulangNov 18, 2024
c4d9a22
Make sure services.AddMqttServer() has been called before operating M…
xljiulangNov 18, 2024
62313c8
Simplify MqttConnectionMiddleware
xljiulangNov 18, 2024
e087acb
Improve the compatibility of wss connections.
xljiulangNov 18, 2024
db4614c
Update UnixSocket sample.
xljiulangNov 18, 2024
acf8ed7
Update benchmark
xljiulangNov 18, 2024
457dcc1
Enhanced IOptions of MQTT ServiceOptions
xljiulangNov 19, 2024
358c2c0
add IMqttServerBuilder.ConfigureMqttSocketTransport extension.
xljiulangNov 19, 2024
02778aa
Simplifying MqttOptionsFactory
xljiulangNov 19, 2024
b46b357
Optimizing MqttChannel.SendPacketAsync
xljiulangNov 19, 2024
38eb1c3
Check buffer IsEmpty.
xljiulangNov 20, 2024
f953d6d
Merge branch 'dotnet:master' into master
xljiulangNov 30, 2024
ef838f8
Register MqttClientFactory as a service.
xljiulangNov 30, 2024
fcad255
Merge branch 'dotnet:master' into master
xljiulangDec 1, 2024
72b42d5
MqttOptionsFactory.Build() -> MqttOptionsFactory.CreateOptions()
xljiulangDec 2, 2024
b40c8a1
Add MqttBufferWriterPool
xljiulangDec 2, 2024
4305ad7
Merge branch 'dotnet:master' into master
xljiulangDec 2, 2024
cb68a13
Merged from the master branch.
xljiulangDec 2, 2024
a2c014f
Adapt the RemoteEndPoint property.
xljiulangDec 2, 2024
dad4faa
Add MqttBufferWriterPoolOptions
xljiulangDec 3, 2024
6f60eae
Add more conditions to the pool of MqttBufferWriterPoolOptions.
xljiulangDec 3, 2024
cc64f95
Merge branch 'main'
xljiulangDec 4, 2024
54f0b4d
MqttBufferWriterPool: Implementing the IReadOnlyCollection interface.
xljiulangDec 4, 2024
ee4de82
ConfigureAwait(false)
xljiulangDec 4, 2024
b708344
MqttChannel: adapt AllowPacketFragmentation option.
xljiulangDec 4, 2024
13e8190
Fixed the issue that GetRemoteEndPoint did not use the remoteEndPoint…
xljiulangDec 4, 2024
4bfd77d
Add some unit tests.
xljiulangDec 4, 2024
9d9dd44
Add more unit test.
xljiulangDec 4, 2024
8b7c411
MqttBufferWriterPoolOptions: Renaming properties.
xljiulangDec 4, 2024
8a3624d
Rename and update benchmark.
xljiulangDec 5, 2024
06f03cb
Add AspNetCoreTestEnvironment to test environments.
xljiulangDec 6, 2024
146b161
CreateTestEnvironment -> CreateMQTTnetTestEnvironment
xljiulangDec 6, 2024
83148ca
Remove Google's connection test to avoid Google being blocked by SNI …
xljiulangDec 6, 2024
41af5c5
AspNetCoreTestEnvironment: Adapt logger.
xljiulangDec 6, 2024
f94e17a
wait with timeout.
xljiulangDec 6, 2024
efcd6ab
AspNetCoreTestEnvironment: Adjust all configurations to be consistent…
xljiulangDec 6, 2024
b035659
MqttChannel: Exception handling remains consistent with MqttChannelAd…
xljiulangDec 6, 2024
6171c81
CrossPlatformSocket_Tests: create a localhost web server for remote h…
xljiulangDec 6, 2024
9a7a8bd
Server-side adaptation of AllowPacketFragmentation options.
xljiulangDec 7, 2024
e01e5f3
Merge implementation of IsAllowPacketFragmentation.
xljiulangDec 8, 2024
bffe065
Added UseLogger overloaded method and renamed an internal method.
xljiulangDec 8, 2024
ca5d13d
Add some extension methods to IMqttChannelAdapter.
xljiulangDec 8, 2024
8ceba5f
Add IAspNetCoreMqttChannel and remove IAspNetCoreMqttChannelAdapter;
xljiulangDec 9, 2024
b65c176
MapMqtt: Restricted to WebSocket transport protocol.
xljiulangDec 9, 2024
51fb185
Adapt MqttServerTcpEndpointBaseOptions to the Socket accepted by kest…
xljiulangDec 9, 2024
79f4f68
SocketOptionName.ReuseAddress can only be used for listening Socket s…
xljiulangDec 9, 2024
7139431
Accurately detect the DualMode value of listenSocket.
xljiulangDec 9, 2024
5c6e128
Merge branch 'dotnet:master' into master
xljiulangDec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
PrevPrevious commit
NextNext commit
MqttChannel: Exception handling remains consistent with MqttChannelAd…
…apter.
  • Loading branch information
@xljiulang
xljiulang committedDec 6, 2024
commitb0356594ce644c577c0fda483f1ba4d77b94f37d
103 changes: 97 additions & 6 deletionsSource/MQTTnet.AspnetCore/Internal/MqttChannel.cs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -12,8 +12,11 @@
using MQTTnet.Packets;
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
Expand DownExpand Up@@ -106,8 +109,18 @@ private static bool IsTlsConnection(IHttpContextFeature? _httpContextFeature, IT

public async Task DisconnectAsync()
{
await _input.CompleteAsync().ConfigureAwait(false);
await _output.CompleteAsync().ConfigureAwait(false);
try
{
await _input.CompleteAsync().ConfigureAwait(false);
await _output.CompleteAsync().ConfigureAwait(false);
}
catch (Exception exception)
{
if (!WrapAndThrowException(exception))
{
throw;
}
}
}

public virtual void Dispose()
Expand All@@ -116,6 +129,29 @@ public virtual void Dispose()
}

public async Task<MqttPacket?> ReceivePacketAsync(CancellationToken cancellationToken)
{
try
{
return await ReceivePacketCoreAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
catch (ObjectDisposedException)
{
}
catch (Exception exception)
{
if (!WrapAndThrowException(exception))
{
throw;
}
}

return null;
}

private async Task<MqttPacket?> ReceivePacketCoreAsync(CancellationToken cancellationToken)
{
try
{
Expand DownExpand Up@@ -143,7 +179,7 @@ public virtual void Dispose()
{
if (!buffer.IsEmpty)
{
if (PacketFormatterAdapter.TryDecode(buffer,_packetInspector, out var packet, out consumed, out observed, out var received))
if (PacketFormatterAdapter.TryDecode(buffer,_packetInspector, out var packet, out consumed, out observed, out var received))
{
BytesReceived += received;

Expand All@@ -168,11 +204,11 @@ public virtual void Dispose()
}
}
}
catch (Exception exception)
catch (Exception)
{
// completing the channel makes sure that there is no more data read after a protocol error
_input.Complete(exception);
_output.Complete(exception);
await_input.CompleteAsync().ConfigureAwait(false);
await_output.CompleteAsync().ConfigureAwait(false);

throw;
}
Expand All@@ -188,6 +224,21 @@ public void ResetStatistics()
}

public async Task SendPacketAsync(MqttPacket packet, CancellationToken cancellationToken)
{
try
{
await SendPacketCoreAsync(packet, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
if (!WrapAndThrowException(exception))
{
throw;
}
}
}

private async Task SendPacketCoreAsync(MqttPacket packet, CancellationToken cancellationToken)
{
using (await _writerLock.EnterAsync(cancellationToken).ConfigureAwait(false))
{
Expand DownExpand Up@@ -241,4 +292,44 @@ static void WritePacketBuffer(PipeWriter output, MqttPacketBuffer buffer)
buffer.Payload.CopyTo(destination: span.Slice(offset));
output.Advance(buffer.Length);
}

public static bool WrapAndThrowException(Exception exception)
{
if (exception is OperationCanceledException ||
exception is MqttCommunicationTimedOutException ||
exception is MqttCommunicationException ||
exception is MqttProtocolViolationException)
{
return false;
}

if (exception is IOException && exception.InnerException is SocketException innerException)
{
exception = innerException;
}

if (exception is SocketException socketException)
{
if (socketException.SocketErrorCode == SocketError.OperationAborted)
{
throw new OperationCanceledException();
}

if (socketException.SocketErrorCode == SocketError.ConnectionAborted)
{
throw new MqttCommunicationException(socketException);
}
}

if (exception is COMException comException)
{
const uint ErrorOperationAborted = 0x800703E3;
if ((uint)comException.HResult == ErrorOperationAborted)
{
throw new OperationCanceledException();
}
}

throw new MqttCommunicationException(exception);
}
}
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -52,13 +52,23 @@ public MqttClientChannelAdapter(

public async Task ConnectAsync(CancellationToken cancellationToken)
{
_connection = _channelOptions switch
try
{
MqttClientTcpOptions tcpOptions => await ClientConnectionContext.CreateAsync(tcpOptions, cancellationToken).ConfigureAwait(false),
MqttClientWebSocketOptions webSocketOptions => await ClientConnectionContext.CreateAsync(webSocketOptions, cancellationToken).ConfigureAwait(false),
_ => throw new NotSupportedException(),
};
_channel = new MqttChannel(_packetFormatterAdapter, _connection, _packetInspector, _allowPacketFragmentation);
_connection = _channelOptions switch
{
MqttClientTcpOptions tcpOptions => await ClientConnectionContext.CreateAsync(tcpOptions, cancellationToken).ConfigureAwait(false),
MqttClientWebSocketOptions webSocketOptions => await ClientConnectionContext.CreateAsync(webSocketOptions, cancellationToken).ConfigureAwait(false),
_ => throw new NotSupportedException(),
};
_channel = new MqttChannel(_packetFormatterAdapter, _connection, _packetInspector, _allowPacketFragmentation);
}
catch (Exception ex)
{
if (!MqttChannel.WrapAndThrowException(ex))
{
throw;
}
}
}

public Task DisconnectAsync(CancellationToken cancellationToken)
Expand Down
2 changes: 1 addition & 1 deletionSource/MQTTnet.Tests/ASP/MqttConnectionContextTest.cs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -37,7 +37,7 @@ public async Task TestCorruptedConnectPacket()
await Assert.ThrowsExceptionAsync<MqttProtocolViolationException>(() => ctx.ReceivePacketAsync(CancellationToken.None));

// the first exception should complete the pipes so if someone tries to use the connection after that it should throw immidiatly
await Assert.ThrowsExceptionAsync<InvalidOperationException>(() => ctx.ReceivePacketAsync(CancellationToken.None));
await Assert.ThrowsExceptionAsync<MqttCommunicationException>(() => ctx.ReceivePacketAsync(CancellationToken.None));
}

// TODO: Fix test
Expand Down
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -80,7 +80,7 @@ public async Task Connect_To_Wrong_Host()
[TestMethod]
public async Task Loose_Connection()
{
using var testEnvironments =CreateMQTTnetTestEnvironment();
using var testEnvironments =CreateMixedTestEnvironment();
foreach (var testEnvironment in testEnvironments)
{
testEnvironment.IgnoreServerLogErrors = true;
Expand Down
16 changes: 2 additions & 14 deletionsSource/MQTTnet.Tests/MQTTv5/Server_Tests.cs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -182,21 +182,9 @@ public async Task Reconnect_with_different_SessionExpiryInterval()
}

[TestMethod]
public async Task Disconnect_with_Reason_MQTTnet()
{
using var testEnvironments = CreateMQTTnetTestEnvironment();
await Disconnect_with_Reason(testEnvironments);
}

[TestMethod]
public async Task Disconnect_with_Reason_AspNetCore()
{
using var testEnvironments = CreateAspNetCoreTestEnvironment();
await Disconnect_with_Reason(testEnvironments);
}

private async Task Disconnect_with_Reason(TestEnvironmentCollection testEnvironments)
public async Task Disconnect_with_Reason()
{
using var testEnvironments = CreateMixedTestEnvironment();
foreach (var testEnvironment in testEnvironments)
{
var disconnectReasonTaskSource = new TaskCompletionSource<MqttClientDisconnectReason>();
Expand Down
33 changes: 16 additions & 17 deletionsSource/MQTTnet.Tests/Mockups/AspNetCoreTestEnvironment.cs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -8,6 +8,7 @@
using Microsoft.Extensions.Hosting;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.AspNetCore;
using MQTTnet.Diagnostics.Logger;
using MQTTnet.Formatter;
using MQTTnet.Internal;
using MQTTnet.LowLevelClient;
Expand DownExpand Up@@ -47,8 +48,11 @@ protected override ILowLevelMqttClient CreateLowLevelClientCore()
private IMqttClientFactory CreateClientFactory()
{
var services = new ServiceCollection();
var clientBuilder = services.AddMqttClient();
UseMqttLogger(clientBuilder, "[CLIENT]=>");

var logger = EnableLogger ? (IMqttNetLogger)ClientLogger : MqttNetNullLogger.Instance;
services.AddSingleton(logger);
services.AddMqttClient();

return services.BuildServiceProvider().GetRequiredService<IMqttClientFactory>();
}

Expand All@@ -67,6 +71,7 @@ public override Task<MqttServer> StartServer(Action<MqttServerOptionsBuilder> co
public override Task<MqttServer> StartServer(MqttServerOptionsBuilder optionsBuilder)
{
optionsBuilder.WithDefaultEndpoint();
optionsBuilder.WithDefaultEndpointPort(ServerPort);
optionsBuilder.WithMaxPendingMessagesPerClient(int.MaxValue);
var serverOptions = optionsBuilder.Build();
return StartServer(serverOptions);
Expand All@@ -88,14 +93,14 @@ private async Task<MqttServer> StartServer(MqttServerOptions serverOptions)
var appBuilder = WebApplication.CreateBuilder();
appBuilder.Services.AddSingleton(serverOptions);

var serverBuilder = appBuilder.Services.AddMqttServer();
UseMqttLogger(serverBuilder, "[SERVER]=>");
var logger = EnableLogger ? (IMqttNetLogger)ServerLogger : new MqttNetNullLogger();
appBuilder.Services.AddSingleton(logger);
appBuilder.Services.AddMqttServer();

appBuilder.WebHost.UseKestrel(k => k.ListenMqtt());
appBuilder.Host.ConfigureHostOptions(h => h.ShutdownTimeout = TimeSpan.FromMilliseconds(500d));

_app = appBuilder.Build();
await _app.StartAsync();

Server = _app.Services.GetRequiredService<MqttServer>();
ServerPort = serverOptions.DefaultEndpointOptions.Port;
Expand All@@ -115,6 +120,12 @@ private async Task<MqttServer> StartServer(MqttServerOptions serverOptions)
return CompletedTask.Instance;
};

var appStartedSource = new TaskCompletionSource();
_app.Lifetime.ApplicationStarted.Register(() => appStartedSource.TrySetResult());

await _app.StartAsync();
await appStartedSource.Task;

return Server;
}

Expand All@@ -132,18 +143,6 @@ private static int GetServerPort()
return port;
}

private void UseMqttLogger(IMqttBuilder builder, string categoryNamePrefix)
{
if (EnableLogger)
{
builder.UseAspNetCoreMqttNetLogger(l => l.CategoryNamePrefix = categoryNamePrefix);
}
else
{
builder.UseMqttNetNullLogger();
}
}

public override void Dispose()
{
base.Dispose();
Expand Down
16 changes: 2 additions & 14 deletionsSource/MQTTnet.Tests/Server/Keep_Alive_Tests.cs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -17,21 +17,9 @@ namespace MQTTnet.Tests.Server
public sealed class KeepAlive_Tests : BaseTestClass
{
[TestMethod]
public async Task Disconnect_Client_DueTo_KeepAlive_MQTTnet()
{
using var testEnvironments = CreateMQTTnetTestEnvironment();
await Disconnect_Client_DueTo_KeepAlive(testEnvironments);
}

[TestMethod]
public async Task Disconnect_Client_DueTo_KeepAlive_AspNetCore()
{
using var testEnvironments = CreateAspNetCoreTestEnvironment();
await Disconnect_Client_DueTo_KeepAlive(testEnvironments);
}

private async Task Disconnect_Client_DueTo_KeepAlive(TestEnvironmentCollection testEnvironments)
public async Task Disconnect_Client_DueTo_KeepAlive()
{
using var testEnvironments = CreateMixedTestEnvironment();
foreach (var testEnvironment in testEnvironments)
{
await testEnvironment.StartServer();
Expand Down
17 changes: 2 additions & 15 deletionsSource/MQTTnet.Tests/Server/Session_Tests.cs
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -228,22 +228,9 @@ public async Task Retry_If_Not_PubAck(MqttQualityOfServiceLevel qos)
}

[TestMethod]
public async Task Session_Takeover_MQTTnet()
{
using var testEnvironments = CreateMQTTnetTestEnvironment();
await Session_Takeover(testEnvironments);
}

[TestMethod]
public async Task Session_Takeover_AspNetCore()
{
using var testEnvironments = CreateAspNetCoreTestEnvironment();
await Session_Takeover(testEnvironments);
}


private async Task Session_Takeover(TestEnvironmentCollection testEnvironments)
public async Task Session_Takeover()
{
using var testEnvironments = CreateMixedTestEnvironment();
foreach (var testEnvironment in testEnvironments)
{
await testEnvironment.StartServer();
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp