Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb5b6cdf

Browse files
committed
Temp commit: Add support for failover
1 parent487fdd6 commitb5b6cdf

File tree

11 files changed

+467
-140
lines changed

11 files changed

+467
-140
lines changed

‎r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionStrategy.java‎

Lines changed: 85 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
packageio.asyncer.r2dbc.mysql;
1818

1919
importio.asyncer.r2dbc.mysql.client.Client;
20+
importio.asyncer.r2dbc.mysql.client.ReactorNettyClient;
2021
importio.asyncer.r2dbc.mysql.constant.CompressionAlgorithm;
2122
importio.asyncer.r2dbc.mysql.constant.SslMode;
2223
importio.netty.channel.ChannelOption;
@@ -27,13 +28,17 @@
2728
importio.netty.util.concurrent.EventExecutor;
2829
importio.netty.util.internal.logging.InternalLogger;
2930
importio.netty.util.internal.logging.InternalLoggerFactory;
31+
importio.r2dbc.spi.R2dbcNonTransientResourceException;
32+
importorg.jetbrains.annotations.Nullable;
3033
importreactor.core.publisher.Mono;
3134
importreactor.netty.resources.LoopResources;
3235
importreactor.netty.tcp.TcpClient;
3336

3437
importjava.net.InetSocketAddress;
3538
importjava.time.Duration;
3639
importjava.util.Set;
40+
importjava.util.function.Function;
41+
importjava.util.function.Supplier;
3742

3843
/**
3944
* An interface of a connection strategy that considers how to obtain a MySQL {@link Client} object.
@@ -50,7 +55,7 @@ interface ConnectionStrategy {
5055
*
5156
* @return a logged-in {@link Client} object.
5257
*/
53-
Mono<Client>connect();
58+
Mono<?extendsClient>connect();
5459

5560
/**
5661
* Creates a general-purpose {@link TcpClient} with the given {@link SocketClientConfiguration}.
@@ -88,7 +93,7 @@ static TcpClient createTcpClient(SocketClientConfiguration configuration, boolea
8893
* @param configuration a configuration that affects login behavior.
8994
* @return a logged-in {@link Client} object.
9095
*/
91-
staticMono<Client>login(
96+
staticMono<ReactorNettyClient>login(
9297
TcpClienttcpClient,
9398
Credentialcredential,
9499
MySqlConnectionConfigurationconfiguration
@@ -108,33 +113,91 @@ static Mono<Client> login(
108113
configuration.retrieveConnectionZoneId()
109114
);
110115

111-
returnClient.connect(tcpClient,ssl,context).flatMap(client ->
116+
returnReactorNettyClient.connect(tcpClient,ssl,context).flatMap(client ->
112117
QueryFlow.login(client,sslMode,loginDb,credential,compressionAlgorithms,zstdLevel));
113118
}
114-
}
115-
116-
/**
117-
* Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses.
118-
*
119-
* @since 1.2.0
120-
*/
121-
finalclassBalancedResolverGroupextendsAddressResolverGroup<InetSocketAddress> {
122119

123-
BalancedResolverGroup() {
120+
/**
121+
* Creates an exception that indicates a retry failure.
122+
*
123+
* @param message the message of the exception.
124+
* @param cause the last exception that caused the retry.
125+
* @return a retry failure exception.
126+
*/
127+
staticR2dbcNonTransientResourceExceptionretryFail(Stringmessage,@NullableThrowablecause) {
128+
returnnewR2dbcNonTransientResourceException(
129+
message,
130+
"H1000",
131+
9000,
132+
cause
133+
);
124134
}
125135

126-
publicstaticfinalBalancedResolverGroupINSTANCE;
136+
/**
137+
* Connect and login to a MySQL server with a specific TCP socket address.
138+
*
139+
* @since 1.2.0
140+
*/
141+
finalclassInetConnectFunctionimplementsFunction<Supplier<InetSocketAddress>,Mono<ReactorNettyClient>> {
142+
143+
privatefinalbooleanbalancedDns;
144+
145+
privatefinalbooleantcpKeepAlive;
146+
147+
privatefinalbooleantcpNoDelay;
127148

128-
static {
129-
INSTANCE =newBalancedResolverGroup();
130-
Runtime.getRuntime().addShutdownHook(newThread(
131-
INSTANCE::close,
132-
"R2DBC-MySQL-BalancedResolverGroup-ShutdownHook"
133-
));
149+
privatefinalCredentialcredential;
150+
151+
privatefinalMySqlConnectionConfigurationconfiguration;
152+
153+
InetConnectFunction(
154+
booleanbalancedDns,
155+
booleantcpKeepAlive,
156+
booleantcpNoDelay,
157+
Credentialcredential,
158+
MySqlConnectionConfigurationconfiguration
159+
) {
160+
this.balancedDns =balancedDns;
161+
this.tcpKeepAlive =tcpKeepAlive;
162+
this.tcpNoDelay =tcpNoDelay;
163+
this.credential =credential;
164+
this.configuration =configuration;
165+
}
166+
167+
@Override
168+
publicMono<ReactorNettyClient>apply(Supplier<InetSocketAddress>address) {
169+
TcpClientcc =ConnectionStrategy.createTcpClient(configuration.getClient(),balancedDns)
170+
.option(ChannelOption.SO_KEEPALIVE,tcpKeepAlive)
171+
.option(ChannelOption.TCP_NODELAY,tcpNoDelay)
172+
.remoteAddress(address);
173+
174+
returnConnectionStrategy.login(cc,credential,configuration);
175+
}
134176
}
135177

136-
@Override
137-
protectedAddressResolver<InetSocketAddress>newResolver(EventExecutorexecutor) {
138-
returnnewRoundRobinInetAddressResolver(executor,newDefaultNameResolver(executor)).asAddressResolver();
178+
/**
179+
* Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses.
180+
*
181+
* @since 1.2.0
182+
*/
183+
finalclassBalancedResolverGroupextendsAddressResolverGroup<InetSocketAddress> {
184+
185+
BalancedResolverGroup() {
186+
}
187+
188+
publicstaticfinalBalancedResolverGroupINSTANCE;
189+
190+
static {
191+
INSTANCE =newBalancedResolverGroup();
192+
Runtime.getRuntime().addShutdownHook(newThread(
193+
INSTANCE::close,
194+
"R2DBC-MySQL-BalancedResolverGroup-ShutdownHook"
195+
));
196+
}
197+
198+
@Override
199+
protectedAddressResolver<InetSocketAddress>newResolver(EventExecutorexecutor) {
200+
returnnewRoundRobinInetAddressResolver(executor,newDefaultNameResolver(executor)).asAddressResolver();
201+
}
139202
}
140203
}

‎r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MultiHostsConnectionStrategy.java‎

Lines changed: 103 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,17 @@
1717
packageio.asyncer.r2dbc.mysql;
1818

1919
importio.asyncer.r2dbc.mysql.client.Client;
20+
importio.asyncer.r2dbc.mysql.client.FailoverClient;
21+
importio.asyncer.r2dbc.mysql.client.ReactorNettyClient;
2022
importio.asyncer.r2dbc.mysql.constant.ProtocolDriver;
2123
importio.asyncer.r2dbc.mysql.internal.NodeAddress;
2224
importio.asyncer.r2dbc.mysql.internal.util.InternalArrays;
23-
importio.netty.channel.ChannelOption;
2425
importio.netty.resolver.DefaultNameResolver;
2526
importio.netty.resolver.NameResolver;
2627
importio.netty.util.concurrent.Future;
27-
importio.r2dbc.spi.R2dbcNonTransientResourceException;
28-
importorg.jetbrains.annotations.Nullable;
2928
importreactor.core.publisher.Flux;
3029
importreactor.core.publisher.Mono;
3130
importreactor.netty.resources.LoopResources;
32-
importreactor.netty.tcp.TcpClient;
3331
importreactor.netty.tcp.TcpResources;
3432

3533
importjava.net.InetAddress;
@@ -46,101 +44,152 @@
4644
*/
4745
finalclassMultiHostsConnectionStrategyimplementsConnectionStrategy {
4846

49-
privatefinalMono<Client>client;
47+
privatefinalMono<?extendsClient>client;
5048

5149
MultiHostsConnectionStrategy(
52-
TcpSocketConfigurationtcp,
5350
MySqlConnectionConfigurationconfiguration,
54-
booleanshuffle
51+
List<NodeAddress>addresses,
52+
ProtocolDriverdriver,
53+
intretriesAllDown,
54+
booleanshuffle,
55+
booleantcpKeepAlive,
56+
booleantcpNoDelay
5557
) {
56-
this.client =Mono.defer(() -> {
57-
if (ProtocolDriver.DNS_SRV.equals(tcp.getDriver())) {
58+
Mono<ReactorNettyClient>client =configuration.getCredential().flatMap(credential -> {
59+
if (ProtocolDriver.DNS_SRV.equals(driver)) {
60+
logger.debug("Resolve hosts via DNS SRV: {}",addresses);
61+
5862
LoopResourcesresources =configuration.getClient().getLoopResources();
5963
LoopResourcesloopResources =resources ==null ?TcpResources.get() :resources;
60-
61-
returnresolveAllHosts(loopResources,tcp.getAddresses(),shuffle)
62-
.flatMap(addresses ->connectHost(addresses,tcp,configuration,false,shuffle,0));
64+
InetConnectFunctionlogin =newInetConnectFunction(
65+
false,
66+
tcpKeepAlive,
67+
tcpNoDelay,
68+
credential,
69+
configuration
70+
);
71+
72+
returnresolveAllHosts(loopResources,addresses,shuffle).flatMap(addrs -> {
73+
logger.debug("Connect to multiple addresses: {}",addrs);
74+
75+
returnconnectHost(
76+
addrs,
77+
login,
78+
shuffle,
79+
0,
80+
retriesAllDown
81+
);
82+
});
6383
}else {
64-
List<NodeAddress>availableHosts =copyAvailableAddresses(tcp.getAddresses(),shuffle);
84+
List<NodeAddress>availableHosts =copyAvailableAddresses(addresses,shuffle);
85+
logger.debug("Connect to multiple hosts: {}",availableHosts);
86+
6587
intsize =availableHosts.size();
66-
InetSocketAddress[]addresses =newInetSocketAddress[availableHosts.size()];
88+
InetSocketAddress[]array =newInetSocketAddress[availableHosts.size()];
6789

6890
for (inti =0;i <size;i++) {
69-
NodeAddressaddress =availableHosts.get(i);
70-
addresses[i] =InetSocketAddress.createUnresolved(address.getHost(),address.getPort());
91+
array[i] =availableHosts.get(i).toUnresolved();
7192
}
7293

73-
returnconnectHost(InternalArrays.asImmutableList(addresses),tcp,configuration,true,shuffle,0);
94+
List<InetSocketAddress>addrs =InternalArrays.asImmutableList(array);
95+
InetConnectFunctionlogin =newInetConnectFunction(
96+
true,
97+
tcpKeepAlive,
98+
tcpNoDelay,
99+
credential,
100+
configuration
101+
);
102+
103+
returnconnectHost(
104+
addrs,
105+
login,
106+
shuffle,
107+
0,
108+
retriesAllDown
109+
);
74110
}
75111
});
112+
113+
this.client =client.map(c ->newFailoverClient(c,client));
76114
}
77115

78116
@Override
79-
publicMono<Client>connect() {
117+
publicMono<?extendsClient>connect() {
80118
returnclient;
81119
}
82120

83-
privateMono<Client>connectHost(
121+
privatestaticMono<ReactorNettyClient>connectHost(
84122
List<InetSocketAddress>addresses,
85-
TcpSocketConfigurationtcp,
86-
MySqlConnectionConfigurationconfiguration,
87-
booleanbalancedDns,
123+
InetConnectFunctionlogin,
88124
booleanshuffle,
89-
intattempts
125+
intattempts,
126+
intmaxAttempts
90127
) {
91128
Iterator<InetSocketAddress>iter =addresses.iterator();
92129

93130
if (!iter.hasNext()) {
94-
returnMono.error(fail("Fail to establish connection: no available host",null));
131+
returnMono.error(ConnectionStrategy.retryFail("Fail to establish connection: no available host",null));
95132
}
96133

97-
returnattemptConnect(iter.next(),tcp,configuration,balancedDns).onErrorResume(t ->
98-
resumeConnect(t,addresses,iter,tcp,configuration,balancedDns,shuffle,attempts));
134+
InetSocketAddressaddress =iter.next();
135+
136+
returnlogin.apply(() ->address).onErrorResume(error ->resumeConnect(
137+
error,
138+
address,
139+
addresses,
140+
iter,
141+
login,
142+
shuffle,
143+
attempts,
144+
maxAttempts
145+
));
99146
}
100147

101-
privateMono<Client>resumeConnect(
148+
privatestaticMono<ReactorNettyClient>resumeConnect(
102149
Throwablet,
150+
InetSocketAddressfailed,
103151
List<InetSocketAddress>addresses,
104152
Iterator<InetSocketAddress>iter,
105-
TcpSocketConfigurationtcp,
106-
MySqlConnectionConfigurationconfiguration,
107-
booleanbalancedDns,
153+
InetConnectFunctionlogin,
108154
booleanshuffle,
109-
intattempts
155+
intattempts,
156+
intmaxAttempts
110157
) {
158+
logger.warn("Fail to connect to {}",failed,t);
159+
111160
if (!iter.hasNext()) {
112161
// The last host failed to connect
113-
if (attempts >=tcp.getRetriesAllDown()) {
114-
returnMono.error(fail(
115-
"Fail to establishconnection, retried " +attempts +" times: " +t.getMessage(),t));
162+
if (attempts >=maxAttempts) {
163+
returnMono.error(ConnectionStrategy.retryFail(
164+
"Fail to establishconnections, retried " +attempts +" times",t));
116165
}
117166

118-
logger.warn("All hosts failed to establish connections, auto-try again after 250ms.");
167+
logger.warn("All hosts failed to establish connections, auto-try again after 250ms.",t);
119168

120169
// Ignore waiting error, e.g. interrupted, scheduler rejected
121170
returnMono.delay(Duration.ofMillis(250))
122171
.onErrorComplete()
123-
.then(Mono.defer(() ->connectHost(addresses,tcp,configuration,balancedDns,shuffle,attempts +1)));
172+
.then(Mono.defer(() ->connectHost(
173+
addresses,
174+
login,
175+
shuffle,
176+
attempts +1,
177+
maxAttempts
178+
)));
124179
}
125180

126-
returnattemptConnect(iter.next(),tcp,configuration,balancedDns).onErrorResume(tt ->
127-
resumeConnect(tt,addresses,iter,tcp,configuration,balancedDns,shuffle,attempts));
128-
}
129-
130-
privateMono<Client>attemptConnect(
131-
InetSocketAddressaddress,
132-
TcpSocketConfigurationtcp,
133-
MySqlConnectionConfigurationconfiguration,
134-
booleanbalancedDns
135-
) {
136-
returnconfiguration.getCredential().flatMap(credential -> {
137-
TcpClienttcpClient =ConnectionStrategy.createTcpClient(configuration.getClient(),balancedDns)
138-
.option(ChannelOption.SO_KEEPALIVE,tcp.isTcpKeepAlive())
139-
.option(ChannelOption.TCP_NODELAY,tcp.isTcpNoDelay())
140-
.remoteAddress(() ->address);
141-
142-
returnConnectionStrategy.login(tcpClient,credential,configuration);
143-
}).doOnError(e ->logger.warn("Fail to connect: ",e));
181+
InetSocketAddressaddress =iter.next();
182+
183+
returnlogin.apply(() ->address).onErrorResume(error ->resumeConnect(
184+
error,
185+
address,
186+
addresses,
187+
iter,
188+
login,
189+
shuffle,
190+
attempts,
191+
maxAttempts
192+
));
144193
}
145194

146195
privatestaticMono<List<InetSocketAddress>>resolveAllHosts(
@@ -199,13 +248,4 @@ private static List<NodeAddress> copyAvailableAddresses(List<NodeAddress> addres
199248

200249
returnInternalArrays.asImmutableList(addresses.toArray(newNodeAddress[0]));
201250
}
202-
203-
privatestaticR2dbcNonTransientResourceExceptionfail(Stringmessage,@NullableThrowablecause) {
204-
returnnewR2dbcNonTransientResourceException(
205-
message,
206-
"H1000",
207-
9000,
208-
cause
209-
);
210-
}
211251
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp