2121import io .netty .handler .codec .http .HttpObject ;
2222import io .netty .handler .codec .http .HttpRequest ;
2323import io .netty .handler .codec .http .HttpResponse ;
24- import io .netty .handler .codec .http .HttpUtil ;
2524import io .netty .handler .codec .http .LastHttpContent ;
2625
2726import java .io .IOException ;
@@ -45,75 +44,30 @@ public HttpHandler(AsyncHttpClientConfig config, ChannelManager channelManager,
4544super (config ,channelManager ,requestSender );
4645 }
4746
48- private void finishUpdate (final NettyResponseFuture <?>future ,Channel channel ,boolean expectOtherChunks )throws IOException {
49-
50- future .cancelTimeouts ();
51-
52- boolean keepAlive =future .isKeepAlive ();
53- if (expectOtherChunks &&keepAlive )
54- channelManager .drainChannelAndOffer (channel ,future );
55- else
56- channelManager .tryToOfferChannelToPool (channel ,future .getAsyncHandler (),keepAlive ,future .getPartitionKey ());
57-
58- try {
59- future .done ();
60- }catch (Exception t ) {
61- // Never propagate exception once we know we are done.
62- logger .debug (t .getMessage (),t );
63- }
64- }
65-
66- private boolean updateBodyAndInterrupt (NettyResponseFuture <?>future ,AsyncHandler <?>handler ,HttpResponseBodyPart bodyPart )throws Exception {
67- boolean interrupt =handler .onBodyPartReceived (bodyPart ) !=State .CONTINUE ;
68- if (interrupt )
69- future .setKeepAlive (false );
70- return interrupt ;
71- }
72-
73- private void notifyHandler (Channel channel ,NettyResponseFuture <?>future ,HttpResponse response ,AsyncHandler <?>handler ,NettyResponseStatus status ,
74- HttpRequest httpRequest ,HttpResponseHeaders responseHeaders )throws IOException ,Exception {
75-
76- boolean exit =exitAfterHandlingStatus (channel ,future ,response ,handler ,status ,httpRequest ) ||//
77- exitAfterHandlingHeaders (channel ,future ,response ,handler ,responseHeaders ,httpRequest ) ||//
78- exitAfterHandlingReactiveStreams (channel ,future ,response ,handler ,httpRequest );
79-
80- if (exit )
81- finishUpdate (future ,channel ,HttpUtil .isTransferEncodingChunked (httpRequest ) ||HttpUtil .isTransferEncodingChunked (response ));
82- }
83-
84- private boolean exitAfterHandlingStatus (//
85- Channel channel ,//
86- NettyResponseFuture <?>future ,//
87- HttpResponse response ,AsyncHandler <?>handler ,//
88- NettyResponseStatus status ,//
89- HttpRequest httpRequest )throws IOException ,Exception {
90- return handler .onStatusReceived (status ) !=State .CONTINUE ;
47+ private boolean abortAfterHandlingStatus (//
48+ AsyncHandler <?>handler ,//
49+ NettyResponseStatus status )throws IOException ,Exception {
50+ return handler .onStatusReceived (status ) ==State .ABORT ;
9151 }
9252
93- private boolean exitAfterHandlingHeaders (//
94- Channel channel ,//
95- NettyResponseFuture <?>future ,//
96- HttpResponse response ,//
53+ private boolean abortAfterHandlingHeaders (//
9754AsyncHandler <?>handler ,//
98- HttpResponseHeaders responseHeaders ,//
99- HttpRequest httpRequest )throws IOException ,Exception {
100- return !response .headers ().isEmpty () &&handler .onHeadersReceived (responseHeaders ) !=State .CONTINUE ;
55+ HttpResponseHeaders responseHeaders )throws IOException ,Exception {
56+ return !responseHeaders .getHeaders ().isEmpty () &&handler .onHeadersReceived (responseHeaders ) ==State .ABORT ;
10157 }
10258
103- private boolean exitAfterHandlingReactiveStreams (//
59+ private boolean abortAfterHandlingReactiveStreams (//
10460Channel channel ,//
10561NettyResponseFuture <?>future ,//
106- HttpResponse response ,//
107- AsyncHandler <?>handler ,//
108- HttpRequest httpRequest )throws IOException {
62+ AsyncHandler <?>handler )throws IOException {
10963if (handler instanceof StreamedAsyncHandler ) {
11064StreamedAsyncHandler <?>streamedAsyncHandler = (StreamedAsyncHandler <?>)handler ;
11165StreamedResponsePublisher publisher =new StreamedResponsePublisher (channel .eventLoop (),channelManager ,future ,channel );
11266// FIXME do we really need to pass the event loop?
11367// FIXME move this to ChannelManager
11468channel .pipeline ().addLast (channel .eventLoop (),"streamedAsyncHandler" ,publisher );
11569Channels .setAttribute (channel ,publisher );
116- return streamedAsyncHandler .onStream (publisher )!= State .CONTINUE ;
70+ return streamedAsyncHandler .onStream (publisher )== State .ABORT ;
11771 }
11872return false ;
11973 }
@@ -129,7 +83,13 @@ private void handleHttpResponse(final HttpResponse response, final Channel chann
12983HttpResponseHeaders responseHeaders =new HttpResponseHeaders (response .headers ());
13084
13185if (!interceptors .exitAfterIntercept (channel ,future ,handler ,response ,status ,responseHeaders )) {
132- notifyHandler (channel ,future ,response ,handler ,status ,httpRequest ,responseHeaders );
86+ boolean abort =abortAfterHandlingStatus (handler ,status ) ||//
87+ abortAfterHandlingHeaders (handler ,responseHeaders ) ||//
88+ abortAfterHandlingReactiveStreams (channel ,future ,handler );
89+
90+ if (abort ) {
91+ finishUpdate (future ,channel ,false ,false );
92+ }
13393 }
13494 }
13595
@@ -138,26 +98,28 @@ private void handleChunk(HttpContent chunk,//
13898final NettyResponseFuture <?>future ,//
13999AsyncHandler <?>handler )throws IOException ,Exception {
140100
141- boolean interrupt =false ;
101+ boolean abort =false ;
142102boolean last =chunk instanceof LastHttpContent ;
143103
144104// Netty 4: the last chunk is not empty
145105if (last ) {
146106LastHttpContent lastChunk = (LastHttpContent )chunk ;
147107HttpHeaders trailingHeaders =lastChunk .trailingHeaders ();
148108if (!trailingHeaders .isEmpty ()) {
149- interrupt =handler .onHeadersReceived (new HttpResponseHeaders (trailingHeaders ,true ))!= State .CONTINUE ;
109+ abort =handler .onHeadersReceived (new HttpResponseHeaders (trailingHeaders ,true ))== State .ABORT ;
150110 }
151111 }
152112
153113ByteBuf buf =chunk .content ();
154- if (!interrupt && !(handler instanceof StreamedAsyncHandler ) && (buf .readableBytes () >0 ||last )) {
155- HttpResponseBodyPart part =config .getResponseBodyPartFactory ().newResponseBodyPart (buf ,last );
156- interrupt =updateBodyAndInterrupt ( future , handler , part ) ;
114+ if (!abort && !(handler instanceof StreamedAsyncHandler ) && (buf .readableBytes () >0 ||last )) {
115+ HttpResponseBodyPart bodyPart =config .getResponseBodyPartFactory ().newResponseBodyPart (buf ,last );
116+ abort =handler . onBodyPartReceived ( bodyPart ) == State . ABORT ;
157117 }
158118
159- if (interrupt ||last )
160- finishUpdate (future ,channel , !last );
119+ if (abort ||last ) {
120+ boolean keepAlive = !abort &&future .isKeepAlive ();
121+ finishUpdate (future ,channel ,keepAlive , !last );
122+ }
161123 }
162124
163125@ Override
@@ -207,7 +169,7 @@ private void readFailed(Channel channel, NettyResponseFuture<?> future, Throwabl
207169 }catch (Exception abortException ) {
208170logger .debug ("Abort failed" ,abortException );
209171 }finally {
210- finishUpdate (future ,channel ,false );
172+ finishUpdate (future ,channel ,false , false );
211173 }
212174 }
213175