Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit517cc4c

Browse files
committed
Add support for multiple hosts configuration
- Allow to use Mono for user and password- Add multiple hosts connection strategy- Add HA protocol support for multiple hosts- Allow to use DNS SRV records for HA protocol
1 parentdcbd67a commit517cc4c

File tree

47 files changed

+2560
-858
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+2560
-858
lines changed
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright 2024 asyncer.io projects
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
packageio.asyncer.r2dbc.mysql;
18+
19+
importio.asyncer.r2dbc.mysql.client.Client;
20+
importio.asyncer.r2dbc.mysql.constant.CompressionAlgorithm;
21+
importio.asyncer.r2dbc.mysql.constant.SslMode;
22+
importio.netty.channel.ChannelOption;
23+
importio.netty.resolver.AddressResolver;
24+
importio.netty.resolver.AddressResolverGroup;
25+
importio.netty.resolver.DefaultNameResolver;
26+
importio.netty.resolver.RoundRobinInetAddressResolver;
27+
importio.netty.util.concurrent.EventExecutor;
28+
importio.netty.util.internal.logging.InternalLogger;
29+
importio.netty.util.internal.logging.InternalLoggerFactory;
30+
importreactor.core.publisher.Mono;
31+
importreactor.netty.resources.LoopResources;
32+
importreactor.netty.tcp.TcpClient;
33+
34+
importjava.net.InetSocketAddress;
35+
importjava.time.Duration;
36+
importjava.util.Set;
37+
38+
/**
39+
* An interface of a connection strategy that considers how to obtain a MySQL {@link Client} object.
40+
*
41+
* @since 1.2.0
42+
*/
43+
@FunctionalInterface
44+
interfaceConnectionStrategy {
45+
46+
InternalLoggerlogger =InternalLoggerFactory.getInstance(ConnectionStrategy.class);
47+
48+
/**
49+
* Establish a connection to a target server that is determined by this connection strategy.
50+
*
51+
* @return a logged-in {@link Client} object.
52+
*/
53+
Mono<Client>connect();
54+
55+
/**
56+
* Creates a general-purpose {@link TcpClient} with the given {@link SocketClientConfiguration}.
57+
* <p>
58+
* Note: Unix Domain Socket also uses this method to create a general-purpose {@link TcpClient client}.
59+
*
60+
* @param configuration socket client configuration.
61+
* @return a general-purpose {@link TcpClient client}.
62+
*/
63+
staticTcpClientcreateTcpClient(SocketClientConfigurationconfiguration,booleanbalancedDns) {
64+
LoopResourcesloopResources =configuration.getLoopResources();
65+
DurationconnectTimeout =configuration.getConnectTimeout();
66+
TcpClientclient =TcpClient.newConnection();
67+
68+
if (loopResources !=null) {
69+
client =client.runOn(loopResources);
70+
}
71+
72+
if (connectTimeout !=null) {
73+
client =client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,Math.toIntExact(connectTimeout.toMillis()));
74+
}
75+
76+
if (balancedDns) {
77+
client =client.resolver(BalancedResolverGroup.INSTANCE);
78+
}
79+
80+
returnclient;
81+
}
82+
83+
/**
84+
* Logins to a MySQL server with the given {@link TcpClient}, {@link Credential} and configurations.
85+
*
86+
* @param tcpClient a TCP client to connect to a MySQL server.
87+
* @param credential user and password to log in to a MySQL server.
88+
* @param configuration a configuration that affects login behavior.
89+
* @return a logged-in {@link Client} object.
90+
*/
91+
staticMono<Client>login(
92+
TcpClienttcpClient,
93+
Credentialcredential,
94+
MySqlConnectionConfigurationconfiguration
95+
) {
96+
MySqlSslConfigurationssl =configuration.getSsl();
97+
SslModesslMode =ssl.getSslMode();
98+
booleancreateDbIfNotExist =configuration.isCreateDatabaseIfNotExist();
99+
Stringdatabase =configuration.getDatabase();
100+
StringloginDb =createDbIfNotExist ?"" :database;
101+
Set<CompressionAlgorithm>compressionAlgorithms =configuration.getCompressionAlgorithms();
102+
intzstdLevel =configuration.getZstdCompressionLevel();
103+
ConnectionContextcontext =newConnectionContext(
104+
configuration.getZeroDateOption(),
105+
configuration.getLoadLocalInfilePath(),
106+
configuration.getLocalInfileBufferSize(),
107+
configuration.isPreserveInstants(),
108+
configuration.retrieveConnectionZoneId()
109+
);
110+
111+
returnClient.connect(tcpClient,ssl,context).flatMap(client ->
112+
QueryFlow.login(client,sslMode,loginDb,credential,compressionAlgorithms,zstdLevel,context));
113+
}
114+
}
115+
116+
/**
117+
* Resolves the {@link InetSocketAddress} to IP address, randomly select one if it resolves to multiple IP addresses.
118+
* <p>
119+
* Note: DNS resolution should have no relation to the connection strategy of HA protocol.
120+
*
121+
* @since 1.2.0
122+
*/
123+
finalclassBalancedResolverGroupextendsAddressResolverGroup<InetSocketAddress> {
124+
125+
BalancedResolverGroup() {
126+
}
127+
128+
publicstaticfinalBalancedResolverGroupINSTANCE;
129+
130+
static {
131+
INSTANCE =newBalancedResolverGroup();
132+
Runtime.getRuntime().addShutdownHook(newThread(
133+
INSTANCE::close,
134+
"R2DBC-MySQL-BalancedResolverGroup-ShutdownHook"
135+
));
136+
}
137+
138+
@Override
139+
protectedAddressResolver<InetSocketAddress>newResolver(EventExecutorexecutor) {
140+
returnnewRoundRobinInetAddressResolver(executor,newDefaultNameResolver(executor)).asAddressResolver();
141+
}
142+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2024 asyncer.io projects
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
packageio.asyncer.r2dbc.mysql;
18+
19+
importorg.jetbrains.annotations.Nullable;
20+
21+
importjava.util.Objects;
22+
importjava.util.Optional;
23+
24+
/**
25+
* A value object representing a user with an optional password.
26+
*/
27+
finalclassCredential {
28+
29+
privatefinalStringuser;
30+
31+
@Nullable
32+
privatefinalCharSequencepassword;
33+
34+
Credential(Stringuser,@NullableCharSequencepassword) {
35+
this.user =user;
36+
this.password =password;
37+
}
38+
39+
StringgetUser() {
40+
returnuser;
41+
}
42+
43+
@Nullable
44+
CharSequencegetPassword() {
45+
returnpassword;
46+
}
47+
48+
@Override
49+
publicbooleanequals(Objecto) {
50+
if (this ==o) {
51+
returntrue;
52+
}
53+
if (!(oinstanceofCredential)) {
54+
returnfalse;
55+
}
56+
57+
Credentialthat = (Credential)o;
58+
59+
returnuser.equals(that.user) &&Objects.equals(password,that.password);
60+
}
61+
62+
@Override
63+
publicinthashCode() {
64+
return31 *user.hashCode() +Objects.hashCode(password);
65+
}
66+
67+
@Override
68+
publicStringtoString() {
69+
return"Credential{user=" +user +", password=REDACTED}";
70+
}
71+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp