Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitcb92e37

Browse files
committed
Always close Channel when AsyncHandler returns ABORT,closeAsyncHttpClient#1350,closeAsyncHttpClient#1306
Motivation:We currently handle ABORT in a very gentle manner: ignore next chunks,drain channel until last chunk is received and then offer the channelto the pool if keep-alive is possible.This is way too gentle, and prevents us from closing infinite streams.Modifications:Have ABORT close the Channel, both for HTTP and WebSocketResult:We now close infinite streams.
1 parent06798b9 commitcb92e37

File tree

3 files changed

+49
-68
lines changed

3 files changed

+49
-68
lines changed

‎client/src/main/java/org/asynchttpclient/netty/handler/AsyncHttpClientHandler.java‎

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,25 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
225225
privatebooleanisHandledByReactiveStreams(ChannelHandlerContextctx) {
226226
returnChannels.getAttribute(ctx.channel())instanceofStreamedResponsePublisher;
227227
}
228+
229+
protectedvoidfinishUpdate(NettyResponseFuture<?>future,Channelchannel,booleankeepAlive,booleanexpectOtherChunks)throwsIOException {
230+
future.cancelTimeouts();
231+
232+
if (!keepAlive) {
233+
channelManager.closeChannel(channel);
234+
}elseif (expectOtherChunks) {
235+
channelManager.drainChannelAndOffer(channel,future);
236+
}else {
237+
channelManager.tryToOfferChannelToPool(channel,future.getAsyncHandler(),keepAlive,future.getPartitionKey());
238+
}
239+
240+
try {
241+
future.done();
242+
}catch (Exceptiont) {
243+
// Never propagate exception once we know we are done.
244+
logger.debug(t.getMessage(),t);
245+
}
246+
}
228247

229248
publicabstractvoidhandleRead(Channelchannel,NettyResponseFuture<?>future,Objectmessage)throwsException;
230249

‎client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java‎

Lines changed: 27 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
importio.netty.handler.codec.http.HttpObject;
2222
importio.netty.handler.codec.http.HttpRequest;
2323
importio.netty.handler.codec.http.HttpResponse;
24-
importio.netty.handler.codec.http.HttpUtil;
2524
importio.netty.handler.codec.http.LastHttpContent;
2625

2726
importjava.io.IOException;
@@ -45,75 +44,30 @@ public HttpHandler(AsyncHttpClientConfig config, ChannelManager channelManager,
4544
super(config,channelManager,requestSender);
4645
}
4746

48-
privatevoidfinishUpdate(finalNettyResponseFuture<?>future,Channelchannel,booleanexpectOtherChunks)throwsIOException {
49-
50-
future.cancelTimeouts();
51-
52-
booleankeepAlive =future.isKeepAlive();
53-
if (expectOtherChunks &&keepAlive)
54-
channelManager.drainChannelAndOffer(channel,future);
55-
else
56-
channelManager.tryToOfferChannelToPool(channel,future.getAsyncHandler(),keepAlive,future.getPartitionKey());
57-
58-
try {
59-
future.done();
60-
}catch (Exceptiont) {
61-
// Never propagate exception once we know we are done.
62-
logger.debug(t.getMessage(),t);
63-
}
64-
}
65-
66-
privatebooleanupdateBodyAndInterrupt(NettyResponseFuture<?>future,AsyncHandler<?>handler,HttpResponseBodyPartbodyPart)throwsException {
67-
booleaninterrupt =handler.onBodyPartReceived(bodyPart) !=State.CONTINUE;
68-
if (interrupt)
69-
future.setKeepAlive(false);
70-
returninterrupt;
71-
}
72-
73-
privatevoidnotifyHandler(Channelchannel,NettyResponseFuture<?>future,HttpResponseresponse,AsyncHandler<?>handler,NettyResponseStatusstatus,
74-
HttpRequesthttpRequest,HttpResponseHeadersresponseHeaders)throwsIOException,Exception {
75-
76-
booleanexit =exitAfterHandlingStatus(channel,future,response,handler,status,httpRequest) ||//
77-
exitAfterHandlingHeaders(channel,future,response,handler,responseHeaders,httpRequest) ||//
78-
exitAfterHandlingReactiveStreams(channel,future,response,handler,httpRequest);
79-
80-
if (exit)
81-
finishUpdate(future,channel,HttpUtil.isTransferEncodingChunked(httpRequest) ||HttpUtil.isTransferEncodingChunked(response));
82-
}
83-
84-
privatebooleanexitAfterHandlingStatus(//
85-
Channelchannel,//
86-
NettyResponseFuture<?>future,//
87-
HttpResponseresponse,AsyncHandler<?>handler,//
88-
NettyResponseStatusstatus,//
89-
HttpRequesthttpRequest)throwsIOException,Exception {
90-
returnhandler.onStatusReceived(status) !=State.CONTINUE;
47+
privatebooleanabortAfterHandlingStatus(//
48+
AsyncHandler<?>handler,//
49+
NettyResponseStatusstatus)throwsIOException,Exception {
50+
returnhandler.onStatusReceived(status) ==State.ABORT;
9151
}
9252

93-
privatebooleanexitAfterHandlingHeaders(//
94-
Channelchannel,//
95-
NettyResponseFuture<?>future,//
96-
HttpResponseresponse,//
53+
privatebooleanabortAfterHandlingHeaders(//
9754
AsyncHandler<?>handler,//
98-
HttpResponseHeadersresponseHeaders,//
99-
HttpRequesthttpRequest)throwsIOException,Exception {
100-
return !response.headers().isEmpty() &&handler.onHeadersReceived(responseHeaders) !=State.CONTINUE;
55+
HttpResponseHeadersresponseHeaders)throwsIOException,Exception {
56+
return !responseHeaders.getHeaders().isEmpty() &&handler.onHeadersReceived(responseHeaders) ==State.ABORT;
10157
}
10258

103-
privatebooleanexitAfterHandlingReactiveStreams(//
59+
privatebooleanabortAfterHandlingReactiveStreams(//
10460
Channelchannel,//
10561
NettyResponseFuture<?>future,//
106-
HttpResponseresponse,//
107-
AsyncHandler<?>handler,//
108-
HttpRequesthttpRequest)throwsIOException {
62+
AsyncHandler<?>handler)throwsIOException {
10963
if (handlerinstanceofStreamedAsyncHandler) {
11064
StreamedAsyncHandler<?>streamedAsyncHandler = (StreamedAsyncHandler<?>)handler;
11165
StreamedResponsePublisherpublisher =newStreamedResponsePublisher(channel.eventLoop(),channelManager,future,channel);
11266
// FIXME do we really need to pass the event loop?
11367
// FIXME move this to ChannelManager
11468
channel.pipeline().addLast(channel.eventLoop(),"streamedAsyncHandler",publisher);
11569
Channels.setAttribute(channel,publisher);
116-
returnstreamedAsyncHandler.onStream(publisher)!=State.CONTINUE;
70+
returnstreamedAsyncHandler.onStream(publisher)==State.ABORT;
11771
}
11872
returnfalse;
11973
}
@@ -129,7 +83,13 @@ private void handleHttpResponse(final HttpResponse response, final Channel chann
12983
HttpResponseHeadersresponseHeaders =newHttpResponseHeaders(response.headers());
13084

13185
if (!interceptors.exitAfterIntercept(channel,future,handler,response,status,responseHeaders)) {
132-
notifyHandler(channel,future,response,handler,status,httpRequest,responseHeaders);
86+
booleanabort =abortAfterHandlingStatus(handler,status) ||//
87+
abortAfterHandlingHeaders(handler,responseHeaders) ||//
88+
abortAfterHandlingReactiveStreams(channel,future,handler);
89+
90+
if (abort) {
91+
finishUpdate(future,channel,false,false);
92+
}
13393
}
13494
}
13595

@@ -138,26 +98,28 @@ private void handleChunk(HttpContent chunk,//
13898
finalNettyResponseFuture<?>future,//
13999
AsyncHandler<?>handler)throwsIOException,Exception {
140100

141-
booleaninterrupt =false;
101+
booleanabort =false;
142102
booleanlast =chunkinstanceofLastHttpContent;
143103

144104
// Netty 4: the last chunk is not empty
145105
if (last) {
146106
LastHttpContentlastChunk = (LastHttpContent)chunk;
147107
HttpHeaderstrailingHeaders =lastChunk.trailingHeaders();
148108
if (!trailingHeaders.isEmpty()) {
149-
interrupt =handler.onHeadersReceived(newHttpResponseHeaders(trailingHeaders,true))!=State.CONTINUE;
109+
abort =handler.onHeadersReceived(newHttpResponseHeaders(trailingHeaders,true))==State.ABORT;
150110
}
151111
}
152112

153113
ByteBufbuf =chunk.content();
154-
if (!interrupt && !(handlerinstanceofStreamedAsyncHandler) && (buf.readableBytes() >0 ||last)) {
155-
HttpResponseBodyPartpart =config.getResponseBodyPartFactory().newResponseBodyPart(buf,last);
156-
interrupt =updateBodyAndInterrupt(future,handler,part);
114+
if (!abort && !(handlerinstanceofStreamedAsyncHandler) && (buf.readableBytes() >0 ||last)) {
115+
HttpResponseBodyPartbodyPart =config.getResponseBodyPartFactory().newResponseBodyPart(buf,last);
116+
abort =handler.onBodyPartReceived(bodyPart) ==State.ABORT;
157117
}
158118

159-
if (interrupt ||last)
160-
finishUpdate(future,channel, !last);
119+
if (abort ||last) {
120+
booleankeepAlive = !abort &&future.isKeepAlive();
121+
finishUpdate(future,channel,keepAlive, !last);
122+
}
161123
}
162124

163125
@Override
@@ -207,7 +169,7 @@ private void readFailed(Channel channel, NettyResponseFuture<?> future, Throwabl
207169
}catch (ExceptionabortException) {
208170
logger.debug("Abort failed",abortException);
209171
}finally {
210-
finishUpdate(future,channel,false);
172+
finishUpdate(future,channel,false,false);
211173
}
212174
}
213175

‎client/src/main/java/org/asynchttpclient/netty/handler/WebSocketHandler.java‎

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,11 @@ private void upgrade(Channel channel, NettyResponseFuture<?> future, WebSocketUp
8787
future.done();
8888
}
8989

90-
privatevoidabort(NettyResponseFuture<?>future,WebSocketUpgradeHandlerhandler,HttpResponseStatusstatus)throwsException {
90+
privatevoidabort(Channelchannel,NettyResponseFuture<?>future,WebSocketUpgradeHandlerhandler,HttpResponseStatusstatus)throwsException {
9191
try {
9292
handler.onThrowable(newIOException("Invalid Status code=" +status.getStatusCode() +" text=" +status.getStatusText()));
9393
}finally {
94-
future.done();
94+
finishUpdate(future,channel,false,false);
9595
}
9696
}
9797

@@ -115,7 +115,7 @@ public void handleRead(Channel channel, NettyResponseFuture<?> future, Object e)
115115
upgrade(channel,future,handler,response,responseHeaders);
116116
break;
117117
default:
118-
abort(future,handler,status);
118+
abort(channel,future,handler,status);
119119
}
120120
}
121121

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp