2626import com .ning .http .client .AsyncHttpClientConfig ;
2727import com .ning .http .client .providers .netty .channel .Channels ;
2828import com .ning .http .client .providers .netty .chmv8 .ConcurrentHashMapV8 ;
29- import com .ning .http .client .providers .netty .future .NettyResponseFuture ;
3029
3130import java .util .ArrayList ;
3231import java .util .Collections ;
@@ -104,6 +103,7 @@ private static final class ChannelCreation {
104103private static final class IdleChannel {
105104final Channel channel ;
106105final long start ;
106+ final AtomicBoolean owned =new AtomicBoolean (false );
107107
108108IdleChannel (Channel channel ,long start ) {
109109if (channel ==null )
@@ -112,6 +112,10 @@ private static final class IdleChannel {
112112this .start =start ;
113113 }
114114
115+ public boolean takeOwnership () {
116+ return owned .compareAndSet (false ,true );
117+ }
118+
115119@ Override
116120// only depends on channel
117121public boolean equals (Object o ) {
@@ -154,26 +158,16 @@ private List<IdleChannel> expiredChannels(ConcurrentLinkedQueue<IdleChannel> par
154158return idleTimeoutChannels !=null ?idleTimeoutChannels :Collections .<IdleChannel >emptyList ();
155159 }
156160
157- private boolean isChannelCloseable (Channel channel ) {
158- Object attribute =Channels .getAttribute (channel );
159- if (attribute instanceof NettyResponseFuture ) {
160- NettyResponseFuture <?>future = (NettyResponseFuture <?>)attribute ;
161- if (!future .isDone ()) {
162- LOGGER .error ("Future not in appropriate state %s, not closing" ,future );
163- return false ;
164- }
165- }
166- return true ;
167- }
168-
169161private final List <IdleChannel >closeChannels (List <IdleChannel >candidates ) {
170162
171163// lazy create, only if we have a non-closeable channel
172164List <IdleChannel >closedChannels =null ;
173165
174166for (int i =0 ;i <candidates .size ();i ++) {
167+ // We call takeOwnership here to avoid closing a channel that has just been taken out
168+ // of the pool, otherwise we risk closing an active connection.
175169IdleChannel idleChannel =candidates .get (i );
176- if (isChannelCloseable ( idleChannel .channel )) {
170+ if (idleChannel .takeOwnership ( )) {
177171LOGGER .debug ("Closing Idle Channel {}" ,idleChannel .channel );
178172close (idleChannel .channel );
179173if (closedChannels !=null ) {
@@ -260,12 +254,15 @@ public Channel poll(Object partitionKey) {
260254while (idleChannel ==null ) {
261255idleChannel =partition .poll ();
262256
263- if (idleChannel ==null )
257+ if (idleChannel ==null ) {
264258// pool is empty
265259break ;
266- else if (!Channels .isChannelValid (idleChannel .channel )) {
260+ } else if (!Channels .isChannelValid (idleChannel .channel )) {
267261idleChannel =null ;
268262LOGGER .trace ("Channel not connected or not opened, probably remotely closed!" );
263+ }else if (!idleChannel .takeOwnership ()) {
264+ idleChannel =null ;
265+ LOGGER .trace ("Couldn't take ownership of channel, probably in the process of being expired!" );
269266 }
270267 }
271268 }