Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7afe807

Browse files
author
Stephane Landelle
committed
Not perfect fix for race condition on remotely closed pooled connection,closeAsyncHttpClient#415
1 parent804f169 commit7afe807

File tree

1 file changed

+66
-42
lines changed

1 file changed

+66
-42
lines changed

‎src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java‎

Lines changed: 66 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ protected final <T> void writeRequest(final Channel channel, final AsyncHttpClie
459459

460460
try {
461461
/**
462-
* If the channel is dead because it was pooled and the remote server decided to close it, we just let it go and thecloseChannel do it's work.
462+
* If the channel is dead because it was pooled and the remote server decided to close it, we just let it go and thechannelClosed do it's work.
463463
*/
464464
if (!channel.isOpen() || !channel.isConnected()) {
465465
return;
@@ -920,6 +920,48 @@ private <T> void execute(final Request request, final NettyResponseFuture<T> f,
920920
doConnect(request,f.getAsyncHandler(),f,useCache,asyncConnect,reclaimCache);
921921
}
922922

923+
private <T>NettyResponseFuture<T>buildNettyResponseFutureWithCachedChannel(Requestrequest,AsyncHandler<T>asyncHandler,NettyResponseFuture<T>f,ProxyServerproxyServer,
924+
URIuri,ChannelBufferbufferedBytes,intmaxTry)throwsIOException {
925+
926+
for (inti =0;i <maxTry;i++) {
927+
if (maxTry ==0)
928+
returnnull;
929+
930+
Channelchannel =null;
931+
if (f !=null &&f.reuseChannel() &&f.channel() !=null) {
932+
channel =f.channel();
933+
}else {
934+
URIconnectionKeyUri =proxyServer !=null ?proxyServer.getURI() :uri;
935+
channel =lookupInCache(connectionKeyUri,request.getConnectionPoolKeyStrategy());
936+
}
937+
938+
if (channel ==null)
939+
returnnull;
940+
else {
941+
HttpRequestnettyRequest =null;
942+
943+
if (f ==null) {
944+
nettyRequest =buildRequest(config,request,uri,false,bufferedBytes,proxyServer);
945+
f =newFuture(uri,request,asyncHandler,nettyRequest,config,this,proxyServer);
946+
}elseif (i ==0) {
947+
// only build request on first try
948+
nettyRequest =buildRequest(config,request,uri,f.isConnectAllowed(),bufferedBytes,proxyServer);
949+
f.setNettyRequest(nettyRequest);
950+
}
951+
f.setState(NettyResponseFuture.STATE.POOLED);
952+
f.attachChannel(channel,false);
953+
954+
if (channel.isOpen() &&channel.isConnected()) {
955+
f.channel().getPipeline().getContext(NettyAsyncHttpProvider.class).setAttachment(f);
956+
returnf;
957+
}else
958+
// else, channel was closed by the server since we fetched it from the pool, starting over
959+
f.attachChannel(null);
960+
}
961+
}
962+
returnnull;
963+
}
964+
923965
private <T>ListenableFuture<T>doConnect(finalRequestrequest,finalAsyncHandler<T>asyncHandler,NettyResponseFuture<T>f,booleanuseCache,booleanasyncConnect,booleanreclaimCache)throwsIOException {
924966

925967
if (isClose()) {
@@ -939,58 +981,40 @@ private <T> ListenableFuture<T> doConnect(final Request request, final AsyncHand
939981
}else {
940982
uri =request.getURI();
941983
}
942-
Channelchannel =null;
943-
944-
if (useCache) {
945-
if (f !=null &&f.reuseChannel() &&f.channel() !=null) {
946-
channel =f.channel();
947-
}else {
948-
URIconnectionKeyUri =useProxy ?proxyServer.getURI() :uri;
949-
channel =lookupInCache(connectionKeyUri,request.getConnectionPoolKeyStrategy());
950-
}
951-
}
952-
953984
ChannelBufferbufferedBytes =null;
954985
if (f !=null &&f.getRequest().getFile() ==null && !f.getNettyRequest().getMethod().getName().equals(HttpMethod.CONNECT.getName())) {
955986
bufferedBytes =f.getNettyRequest().getContent();
956987
}
957988

958989
booleanuseSSl =isSecure(uri) && !useProxy;
959-
if (channel !=null &&channel.isOpen() &&channel.isConnected()) {
960-
HttpRequestnettyRequest =null;
961990

962-
if (f ==null) {
963-
nettyRequest =buildRequest(config,request,uri,false,bufferedBytes,proxyServer);
964-
f =newFuture(uri,request,asyncHandler,nettyRequest,config,this,proxyServer);
965-
}else {
966-
nettyRequest =buildRequest(config,request,uri,f.isConnectAllowed(),bufferedBytes,proxyServer);
967-
f.setNettyRequest(nettyRequest);
968-
}
969-
f.setState(NettyResponseFuture.STATE.POOLED);
970-
f.attachChannel(channel,false);
971-
972-
log.debug("\nUsing cached Channel {}\n for request\n{}\n",channel,nettyRequest);
973-
channel.getPipeline().getContext(NettyAsyncHttpProvider.class).setAttachment(f);
991+
if (useCache) {
992+
// 3 tentatives
993+
NettyResponseFuture<T>connectedFuture =buildNettyResponseFutureWithCachedChannel(request,asyncHandler,f,proxyServer,uri,bufferedBytes,3);
974994

975-
try {
976-
writeRequest(channel,config,f);
977-
}catch (Exceptionex) {
978-
log.debug("writeRequest failure",ex);
979-
if (useSSl &&ex.getMessage() !=null &&ex.getMessage().contains("SSLEngine")) {
980-
log.debug("SSLEngine failure",ex);
981-
f =null;
982-
}else {
983-
try {
984-
asyncHandler.onThrowable(ex);
985-
}catch (Throwablet) {
986-
log.warn("doConnect.writeRequest()",t);
995+
if (connectedFuture !=null) {
996+
log.debug("\nUsing cached Channel {}\n for request\n{}\n",connectedFuture.channel(),connectedFuture.getNettyRequest());
997+
998+
try {
999+
writeRequest(connectedFuture.channel(),config,connectedFuture);
1000+
}catch (Exceptionex) {
1001+
log.debug("writeRequest failure",ex);
1002+
if (useSSl &&ex.getMessage() !=null &&ex.getMessage().contains("SSLEngine")) {
1003+
log.debug("SSLEngine failure",ex);
1004+
connectedFuture =null;
1005+
}else {
1006+
try {
1007+
asyncHandler.onThrowable(ex);
1008+
}catch (Throwablet) {
1009+
log.warn("doConnect.writeRequest()",t);
1010+
}
1011+
IOExceptionioe =newIOException(ex.getMessage());
1012+
ioe.initCause(ex);
1013+
throwioe;
9871014
}
988-
IOExceptionioe =newIOException(ex.getMessage());
989-
ioe.initCause(ex);
990-
throwioe;
9911015
}
1016+
returnconnectedFuture;
9921017
}
993-
returnf;
9941018
}
9951019

9961020
// Do not throw an exception when we need an extra connection for a redirect.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp