Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1d5b880

Browse files
committed
Prevent Netty connection pool from killing active connections, backportAsyncHttpClient#1114
1 parent01c0329 commit1d5b880

File tree

2 files changed

+20
-19
lines changed

2 files changed

+20
-19
lines changed

‎src/main/java/com/ning/http/client/providers/netty/channel/ChannelManager.java‎

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -296,9 +296,13 @@ public final void tryToOfferChannelToPool(Channel channel, boolean keepAlive, Ob
296296
if (channel.isConnected() &&keepAlive &&channel.isReadable()) {
297297
LOGGER.debug("Adding key: {} for channel {}",partitionKey,channel);
298298
Channels.setDiscard(channel);
299-
channelPool.offer(channel,partitionKey);
300-
if (maxConnectionsPerHostEnabled)
301-
channelId2PartitionKey.putIfAbsent(channel.getId(),partitionKey);
299+
if (channelPool.offer(channel,partitionKey)) {
300+
if (maxConnectionsPerHostEnabled)
301+
channelId2PartitionKey.putIfAbsent(channel.getId(),partitionKey);
302+
}else {
303+
// rejected by pool
304+
closeChannel(channel);
305+
}
302306
}else {
303307
// not offered
304308
closeChannel(channel);

‎src/main/java/com/ning/http/client/providers/netty/channel/pool/DefaultChannelPool.java‎

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
importcom.ning.http.client.AsyncHttpClientConfig;
2727
importcom.ning.http.client.providers.netty.channel.Channels;
2828
importcom.ning.http.client.providers.netty.chmv8.ConcurrentHashMapV8;
29-
importcom.ning.http.client.providers.netty.future.NettyResponseFuture;
3029

3130
importjava.util.ArrayList;
3231
importjava.util.Collections;
@@ -104,6 +103,7 @@ private static final class ChannelCreation {
104103
privatestaticfinalclassIdleChannel {
105104
finalChannelchannel;
106105
finallongstart;
106+
finalAtomicBooleanowned =newAtomicBoolean(false);
107107

108108
IdleChannel(Channelchannel,longstart) {
109109
if (channel ==null)
@@ -112,6 +112,10 @@ private static final class IdleChannel {
112112
this.start =start;
113113
}
114114

115+
publicbooleantakeOwnership() {
116+
returnowned.compareAndSet(false,true);
117+
}
118+
115119
@Override
116120
// only depends on channel
117121
publicbooleanequals(Objecto) {
@@ -154,26 +158,16 @@ private List<IdleChannel> expiredChannels(ConcurrentLinkedQueue<IdleChannel> par
154158
returnidleTimeoutChannels !=null ?idleTimeoutChannels :Collections.<IdleChannel>emptyList();
155159
}
156160

157-
privatebooleanisChannelCloseable(Channelchannel) {
158-
Objectattribute =Channels.getAttribute(channel);
159-
if (attributeinstanceofNettyResponseFuture) {
160-
NettyResponseFuture<?>future = (NettyResponseFuture<?>)attribute;
161-
if (!future.isDone()) {
162-
LOGGER.error("Future not in appropriate state %s, not closing",future);
163-
returnfalse;
164-
}
165-
}
166-
returntrue;
167-
}
168-
169161
privatefinalList<IdleChannel>closeChannels(List<IdleChannel>candidates) {
170162

171163
// lazy create, only if we have a non-closeable channel
172164
List<IdleChannel>closedChannels =null;
173165

174166
for (inti =0;i <candidates.size();i++) {
167+
// We call takeOwnership here to avoid closing a channel that has just been taken out
168+
// of the pool, otherwise we risk closing an active connection.
175169
IdleChannelidleChannel =candidates.get(i);
176-
if (isChannelCloseable(idleChannel.channel)) {
170+
if (idleChannel.takeOwnership()) {
177171
LOGGER.debug("Closing Idle Channel {}",idleChannel.channel);
178172
close(idleChannel.channel);
179173
if (closedChannels !=null) {
@@ -260,12 +254,15 @@ public Channel poll(Object partitionKey) {
260254
while (idleChannel ==null) {
261255
idleChannel =partition.poll();
262256

263-
if (idleChannel ==null)
257+
if (idleChannel ==null) {
264258
// pool is empty
265259
break;
266-
elseif (!Channels.isChannelValid(idleChannel.channel)) {
260+
}elseif (!Channels.isChannelValid(idleChannel.channel)) {
267261
idleChannel =null;
268262
LOGGER.trace("Channel not connected or not opened, probably remotely closed!");
263+
}elseif (!idleChannel.takeOwnership()) {
264+
idleChannel =null;
265+
LOGGER.trace("Couldn't take ownership of channel, probably in the process of being expired!");
269266
}
270267
}
271268
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp