Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit41d5b70

Browse files
authored
Fix support for unix sockets (#1995)
* Add support for changing the protocol type on socket level* Do not set NoDelay when not supported by protocol* Update release notes* Fix release notes
1 parentb476977 commit41d5b70

File tree

8 files changed

+64
-32
lines changed

8 files changed

+64
-32
lines changed

‎.github/workflows/ReleaseNotes.md‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
*[Core] Optimized packet serialization of PUBACK and PUBREC packets for protocol version 5.0.0 (#1939, thanks to@Y-Sindo).
22
*[Core] The package inspector is now fully async (#1941).
33
*[Core] Fixed decoding of DISCONNECT packet with empty body (#1994, thanks to@Y-Sindo).
4+
*[Client] Exposed the_EndPoint_ type to support other endpoint types (like Unix Domain Sockets) in client options (#1919).
5+
*[Client] Fixed support for unix sockets by exposing more options (#1995).
46
*[Client] Added a dedicated exception when the client is not connected (#1954, thanks to@marcpiulachs).
57
*[Client] The client will now throw a_MqttClientUnexpectedDisconnectReceivedException_ when publishing a QoS 0 message which leads to a server disconnect (BREAKING CHANGE!,#1974, thanks to@fazho).
68
*[Client] Exposed the certificate selection event handler in client options (#1984).

‎Source/MQTTnet.Tests/MqttTcpChannel_Tests.cs‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class MqttTcpChannel_Tests
1919
publicasyncTaskDispose_Channel_While_Used()
2020
{
2121
varct=newCancellationTokenSource();
22-
varserverSocket=newCrossPlatformSocket(AddressFamily.InterNetwork);
22+
varserverSocket=newCrossPlatformSocket(AddressFamily.InterNetwork,ProtocolType.Tcp);
2323

2424
try
2525
{
@@ -38,7 +38,7 @@ public async Task Dispose_Channel_While_Used()
3838
}
3939
},ct.Token);
4040

41-
varclientSocket=newCrossPlatformSocket(AddressFamily.InterNetwork);
41+
varclientSocket=newCrossPlatformSocket(AddressFamily.InterNetwork,ProtocolType.Tcp);
4242
awaitclientSocket.ConnectAsync("localhost",50001,CancellationToken.None);
4343

4444
vartcpChannel=newMqttTcpChannel(clientSocket.GetStream(),"test",null);

‎Source/MQTTnet.Tests/Server/Connection_Tests.cs‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public async Task Close_Idle_Connection_On_Connect()
2323
{
2424
awaittestEnvironment.StartServer(newMqttServerOptionsBuilder().WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(1)));
2525

26-
varclient=newCrossPlatformSocket(AddressFamily.InterNetwork);
26+
varclient=newCrossPlatformSocket(AddressFamily.InterNetwork,ProtocolType.Tcp);
2727
awaitclient.ConnectAsync("localhost",testEnvironment.ServerPort,CancellationToken.None);
2828

2929
// Don't send anything. The server should close the connection.
@@ -54,7 +54,7 @@ public async Task Send_Garbage()
5454

5555
// Send an invalid packet and ensure that the server will close the connection and stay in a waiting state
5656
// forever. This is security related.
57-
varclient=newCrossPlatformSocket(AddressFamily.InterNetwork);
57+
varclient=newCrossPlatformSocket(AddressFamily.InterNetwork,ProtocolType.Tcp);
5858
awaitclient.ConnectAsync("localhost",testEnvironment.ServerPort,CancellationToken.None);
5959

6060
varbuffer=Encoding.UTF8.GetBytes("Garbage");

‎Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs‎

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
usingSystem.Collections.Generic;
99
usingSystem.Linq;
1010
usingSystem.Net;
11+
usingSystem.Net.Sockets;
1112
usingSystem.Text;
1213
usingMQTTnet.Formatter;
1314
usingMQTTnet.Packets;
@@ -18,14 +19,14 @@ namespace MQTTnet.Client
1819
publicsealedclassMqttClientOptionsBuilder
1920
{
2021
readonlyMqttClientOptions_options=newMqttClientOptions();
22+
int?_port;
2123

2224
[Obsolete]MqttClientWebSocketProxyOptions_proxyOptions;
25+
EndPoint_remoteEndPoint;
2326

2427
MqttClientTcpOptions_tcpOptions;
2528
MqttClientTlsOptions_tlsOptions;
26-
EndPoint_remoteEndPoint;
27-
int?_port;
28-
29+
2930
[Obsolete]MqttClientOptionsBuilderTlsParameters_tlsParameters;
3031

3132
MqttClientWebSocketOptions_webSocketOptions;
@@ -89,7 +90,7 @@ public MqttClientOptions Build()
8990

9091
if(_tcpOptions.RemoteEndpoint==null)
9192
{
92-
_tcpOptions.RemoteEndpoint=_remoteEndPoint;
93+
_tcpOptions.RemoteEndpoint=_remoteEndPoint;
9394
}
9495
}
9596
elseif(_webSocketOptions!=null)
@@ -114,6 +115,12 @@ public MqttClientOptions Build()
114115
return_options;
115116
}
116117

118+
publicMqttClientOptionsBuilderWithAddressFamily(AddressFamilyaddressFamily)
119+
{
120+
_tcpOptions.AddressFamily=addressFamily;
121+
returnthis;
122+
}
123+
117124
publicMqttClientOptionsBuilderWithAuthentication(stringmethod,byte[]data)
118125
{
119126
_options.AuthenticationMethod=method;
@@ -218,6 +225,14 @@ public MqttClientOptionsBuilder WithCredentials(IMqttClientCredentialsProvider c
218225
returnthis;
219226
}
220227

228+
publicMqttClientOptionsBuilderWithEndPoint(EndPointendPoint)
229+
{
230+
_remoteEndPoint=endPoint??thrownewArgumentNullException(nameof(endPoint));
231+
_tcpOptions=newMqttClientTcpOptions();
232+
233+
returnthis;
234+
}
235+
221236
publicMqttClientOptionsBuilderWithExtendedAuthenticationExchangeHandler(IMqttExtendedAuthenticationExchangeHandlerhandler)
222237
{
223238
_options.ExtendedAuthenticationExchangeHandler=handler;
@@ -263,6 +278,12 @@ public MqttClientOptionsBuilder WithoutThrowOnNonSuccessfulConnectResponse()
263278
returnthis;
264279
}
265280

281+
publicMqttClientOptionsBuilderWithProtocolType(ProtocolTypeprotocolType)
282+
{
283+
_tcpOptions.ProtocolType=protocolType;
284+
returnthis;
285+
}
286+
266287
publicMqttClientOptionsBuilderWithProtocolVersion(MqttProtocolVersionvalue)
267288
{
268289
if(value==MqttProtocolVersion.Unknown)
@@ -344,14 +365,6 @@ public MqttClientOptionsBuilder WithTcpServer(string server, int? port = null)
344365

345366
returnthis;
346367
}
347-
348-
publicMqttClientOptionsBuilderWithEndPoint(EndPointendPoint)
349-
{
350-
_remoteEndPoint=endPoint??thrownewArgumentNullException(nameof(endPoint));
351-
_tcpOptions=newMqttClientTcpOptions();
352-
353-
returnthis;
354-
}
355368

356369
publicMqttClientOptionsBuilderWithTcpServer(Action<MqttClientTcpOptions>optionsBuilder)
357370
{

‎Source/MQTTnet/Client/Options/MqttClientTcpOptions.cs‎

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,20 @@ public sealed class MqttClientTcpOptions : IMqttClientChannelOptions
2828
/// </summary>
2929
publicEndPointLocalEndpoint{get;set;}
3030

31+
/// <summary>
32+
/// Enables or disables the Nagle algorithm for the socket.
33+
/// This is only supported for TCP.
34+
/// For other protocol types the value is ignored.
35+
/// Default: true
36+
/// </summary>
3137
publicboolNoDelay{get;set;}=true;
3238

39+
/// <summary>
40+
/// The MQTT transport is usually TCP but when using other endpoint types like
41+
/// unix sockets it must be changed (IP for unix sockets).
42+
/// </summary>
43+
publicProtocolTypeProtocolType{get;set;}=ProtocolType.Tcp;
44+
3345
publicEndPointRemoteEndpoint{get;set;}
3446

3547
publicMqttClientTlsOptionsTlsOptions{get;set;}=newMqttClientTlsOptions();

‎Source/MQTTnet/Implementations/CrossPlatformSocket.cs‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ public sealed class CrossPlatformSocket : IDisposable
2222

2323
NetworkStream_networkStream;
2424

25-
publicCrossPlatformSocket(AddressFamilyaddressFamily)
25+
publicCrossPlatformSocket(AddressFamilyaddressFamily,ProtocolTypeprotocolType)
2626
{
27-
_socket=newSocket(addressFamily,SocketType.Stream,ProtocolType.Tcp);
27+
_socket=newSocket(addressFamily,SocketType.Stream,protocolType);
2828

2929
#if!NET5_0_OR_GREATER
3030
_socketDisposeAction=_socket.Dispose;

‎Source/MQTTnet/Implementations/MqttTcpChannel.cs‎

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public async Task ConnectAsync(CancellationToken cancellationToken)
6767
}
6868
else
6969
{
70-
socket=newCrossPlatformSocket(_tcpOptions.AddressFamily);
70+
socket=newCrossPlatformSocket(_tcpOptions.AddressFamily,_tcpOptions.ProtocolType);
7171
}
7272

7373
if(_tcpOptions.LocalEndpoint!=null)
@@ -78,7 +78,12 @@ public async Task ConnectAsync(CancellationToken cancellationToken)
7878
socket.ReceiveBufferSize=_tcpOptions.BufferSize;
7979
socket.SendBufferSize=_tcpOptions.BufferSize;
8080
socket.SendTimeout=(int)_clientOptions.Timeout.TotalMilliseconds;
81-
socket.NoDelay=_tcpOptions.NoDelay;
81+
82+
if(_tcpOptions.ProtocolType==ProtocolType.Tcp)
83+
{
84+
// Other protocol types do not support the Nagle algorithm.
85+
socket.NoDelay=_tcpOptions.NoDelay;
86+
}
8287

8388
if(socket.LingerState!=null)
8489
{

‎Source/MQTTnet/Implementations/MqttTcpServerListener.cs‎

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public sealed class MqttTcpServerListener : IDisposable
2727
readonlyMqttServerOptions_serverOptions;
2828
readonlyMqttServerTcpEndpointBaseOptions_options;
2929
readonlyMqttServerTlsTcpEndpointOptions_tlsOptions;
30-
30+
3131
CrossPlatformSocket_socket;
3232
IPEndPoint_localEndPoint;
3333

@@ -65,7 +65,7 @@ public bool Start(bool treatErrorsAsWarning, CancellationToken cancellationToken
6565

6666
_logger.Info("Starting TCP listener (Endpoint={0}, TLS={1})",_localEndPoint,_tlsOptions?.CertificateProvider!=null);
6767

68-
_socket=newCrossPlatformSocket(_addressFamily);
68+
_socket=newCrossPlatformSocket(_addressFamily,ProtocolType.Tcp);
6969

7070
// Usage of socket options is described here: https://docs.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.setsocketoption?view=netcore-2.2
7171
if(_options.ReuseAddress)
@@ -87,33 +87,33 @@ public bool Start(bool treatErrorsAsWarning, CancellationToken cancellationToken
8787
{
8888
_socket.KeepAlive=_options.KeepAlive.Value;
8989
}
90-
90+
9191
if(_options.TcpKeepAliveInterval.HasValue)
9292
{
9393
_socket.TcpKeepAliveInterval=_options.TcpKeepAliveInterval.Value;
9494
}
95-
95+
9696
if(_options.TcpKeepAliveRetryCount.HasValue)
9797
{
9898
_socket.TcpKeepAliveInterval=_options.TcpKeepAliveRetryCount.Value;
9999
}
100-
100+
101101
if(_options.TcpKeepAliveTime.HasValue)
102102
{
103103
_socket.TcpKeepAliveTime=_options.TcpKeepAliveTime.Value;
104104
}
105-
105+
106106
_socket.Bind(_localEndPoint);
107-
107+
108108
// Get the local endpoint back from the socket. The port may have changed.
109109
// This can happen when port 0 is used. Then the OS will choose the next free port.
110110
_localEndPoint=(IPEndPoint)_socket.LocalEndPoint;
111111
_options.Port=_localEndPoint.Port;
112-
112+
113113
_socket.Listen(_options.ConnectionBacklog);
114-
114+
115115
_logger.Verbose("TCP listener started (Endpoint={0})",_localEndPoint);
116-
116+
117117
Task.Run(()=>AcceptClientConnectionsAsync(cancellationToken),cancellationToken).RunInBackground(_logger);
118118

119119
returntrue;
@@ -183,7 +183,7 @@ async Task TryHandleClientConnectionAsync(CrossPlatformSocket clientSocket)
183183
clientSocket.NoDelay=_options.NoDelay;
184184
stream=clientSocket.GetStream();
185185
varclientCertificate=_tlsOptions?.CertificateProvider?.GetCertificate();
186-
186+
187187
if(clientCertificate!=null)
188188
{
189189
if(!clientCertificate.HasPrivateKey)
@@ -228,7 +228,7 @@ await sslStream.AuthenticateAsServerAsync(
228228
vartcpChannel=newMqttTcpChannel(stream,remoteEndPoint,clientCertificate);
229229
varbufferWriter=newMqttBufferWriter(_serverOptions.WriterBufferSize,_serverOptions.WriterBufferSizeMax);
230230
varpacketFormatterAdapter=newMqttPacketFormatterAdapter(bufferWriter);
231-
231+
232232
using(varclientAdapter=newMqttChannelAdapter(tcpChannel,packetFormatterAdapter,_rootLogger))
233233
{
234234
clientAdapter.AllowPacketFragmentation=_options.AllowPacketFragmentation;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp