Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitdece389

Browse files
author
Stephane Landelle
committed
Reimplement Netty provider timeouts, backport4d02959
1 parent2b5c6e9 commitdece389

File tree

8 files changed

+349
-219
lines changed

8 files changed

+349
-219
lines changed

‎src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java‎

Lines changed: 116 additions & 192 deletions
Large diffs are not rendered by default.

‎src/main/java/com/ning/http/client/providers/netty/NettyResponseFuture.java‎

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
importcom.ning.http.client.ProxyServer;
4242
importcom.ning.http.client.Request;
4343
importcom.ning.http.client.listenable.AbstractListenableFuture;
44+
importcom.ning.http.client.providers.netty.timeout.TimeoutsHolder;
4445

4546
/**
4647
* A {@link Future} that can be used to track when an asynchronous HTTP request has been fully processed.
@@ -70,7 +71,9 @@ enum STATE {
7071
privateHttpResponsehttpResponse;
7172
privatefinalAtomicReference<ExecutionException>exEx =newAtomicReference<ExecutionException>();
7273
privatefinalAtomicIntegerredirectCount =newAtomicInteger();
73-
privatevolatileFuture<?>reaperFuture;
74+
privatevolatilebooleanrequestTimeoutReached;
75+
privatevolatilebooleanidleConnectionTimeoutReached;
76+
privatevolatileTimeoutsHoldertimeoutsHolder;
7477
privatefinalAtomicBooleaninAuth =newAtomicBoolean(false);
7578
privatefinalAtomicBooleanstatusReceived =newAtomicBoolean(false);
7679
privatefinalAtomicLongtouch =newAtomicLong(millisTime());
@@ -159,7 +162,7 @@ void setAsyncHandler(AsyncHandler<V> asyncHandler) {
159162
*/
160163
/* @Override */
161164
publicbooleancancel(booleanforce) {
162-
cancelReaper();
165+
cancelTimeouts();
163166

164167
if (isCancelled.get())
165168
returnfalse;
@@ -189,16 +192,23 @@ public boolean cancel(boolean force) {
189192
* @return <code>true</code> if response has expired and should be terminated.
190193
*/
191194
publicbooleanhasExpired() {
192-
longnow =millisTime();
193-
returnhasConnectionIdleTimedOut(now) ||hasRequestTimedOut(now);
195+
returnrequestTimeoutReached ||idleConnectionTimeoutReached;
194196
}
195197

196-
publicbooleanhasConnectionIdleTimedOut(longnow) {
197-
returnidleConnectionTimeoutInMs != -1 && (now -touch.get()) >=idleConnectionTimeoutInMs;
198+
publicvoidsetRequestTimeoutReached() {
199+
this.requestTimeoutReached =true;
198200
}
199201

200-
publicbooleanhasRequestTimedOut(longnow) {
201-
returnrequestTimeoutInMs != -1 && (now -start) >=requestTimeoutInMs;
202+
publicbooleanisRequestTimeoutReached() {
203+
returnrequestTimeoutReached;
204+
}
205+
206+
publicvoidsetIdleConnectionTimeoutReached() {
207+
this.idleConnectionTimeoutReached =true;
208+
}
209+
210+
publicbooleanisIdleConnectionTimeoutReached() {
211+
returnidleConnectionTimeoutReached;
202212
}
203213

204214
/**
@@ -209,14 +219,15 @@ public V get() throws InterruptedException, ExecutionException {
209219
try {
210220
returnget(requestTimeoutInMs,TimeUnit.MILLISECONDS);
211221
}catch (TimeoutExceptione) {
212-
cancelReaper();
222+
cancelTimeouts();
213223
thrownewExecutionException(e);
214224
}
215225
}
216226

217-
voidcancelReaper() {
218-
if (reaperFuture !=null) {
219-
reaperFuture.cancel(false);
227+
publicvoidcancelTimeouts() {
228+
if (timeoutsHolder !=null) {
229+
timeoutsHolder.cancel();
230+
timeoutsHolder =null;
220231
}
221232
}
222233

@@ -251,7 +262,7 @@ public V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException,
251262
}
252263
thrownewExecutionException(te);
253264
}finally {
254-
cancelReaper();
265+
cancelTimeouts();
255266
}
256267
}
257268
}
@@ -287,7 +298,7 @@ V getContent() throws ExecutionException {
287298
}
288299
thrownewRuntimeException(ex);
289300
}finally {
290-
cancelReaper();
301+
cancelTimeouts();
291302
}
292303
}
293304
}
@@ -299,7 +310,7 @@ V getContent() throws ExecutionException {
299310
publicfinalvoiddone() {
300311

301312
try {
302-
cancelReaper();
313+
cancelTimeouts();
303314

304315
if (exEx.get() !=null) {
305316
return;
@@ -320,7 +331,7 @@ public final void done() {
320331
}
321332

322333
publicfinalvoidabort(finalThrowablet) {
323-
cancelReaper();
334+
cancelTimeouts();
324335

325336
if (isDone.get() ||isCancelled.get())
326337
return;
@@ -379,11 +390,6 @@ protected int incrementAndGetCurrentRedirectCount() {
379390
returnredirectCount.incrementAndGet();
380391
}
381392

382-
protectedvoidsetReaperFuture(Future<?>reaperFuture) {
383-
cancelReaper();
384-
this.reaperFuture =reaperFuture;
385-
}
386-
387393
protectedbooleanisInAuth() {
388394
returninAuth.get();
389395
}
@@ -407,15 +413,17 @@ public boolean getAndSetStatusReceived(boolean sr) {
407413
/**
408414
* {@inheritDoc}
409415
*/
410-
/* @Override */
411416
publicvoidtouch() {
412417
touch.set(millisTime());
413418
}
414419

420+
publiclonggetLastTouch() {
421+
returntouch.get();
422+
}
423+
415424
/**
416425
* {@inheritDoc}
417426
*/
418-
/* @Override */
419427
publicbooleangetAndSetWriteHeaders(booleanwriteHeaders) {
420428
booleanb =this.writeHeaders;
421429
this.writeHeaders =writeHeaders;
@@ -425,7 +433,6 @@ public boolean getAndSetWriteHeaders(boolean writeHeaders) {
425433
/**
426434
* {@inheritDoc}
427435
*/
428-
/* @Override */
429436
publicbooleangetAndSetWriteBody(booleanwriteBody) {
430437
booleanb =this.writeBody;
431438
this.writeBody =writeBody;
@@ -512,7 +519,7 @@ public String toString() {
512519
",\n\thttpResponse=" +httpResponse +//
513520
",\n\texEx=" +exEx +//
514521
",\n\tredirectCount=" +redirectCount +//
515-
",\n\treaperFuture=" +reaperFuture +//
522+
",\n\ttimeoutsHolder=" +timeoutsHolder +//
516523
",\n\tinAuth=" +inAuth +//
517524
",\n\tstatusReceived=" +statusReceived +//
518525
",\n\ttouch=" +touch +//
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2010-2013 Ning, Inc.
3+
*
4+
* Ning licenses this file to you under the Apache License, version 2.0
5+
* (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
packagecom.ning.http.client.providers.netty.timeout;
17+
18+
importstaticcom.ning.http.util.DateUtil.*;
19+
20+
importorg.jboss.netty.util.Timeout;
21+
22+
importcom.ning.http.client.providers.netty.NettyAsyncHttpProvider;
23+
importcom.ning.http.client.providers.netty.NettyResponseFuture;
24+
25+
publicclassIdleConnectionTimeoutTimerTaskextendsTimeoutTimerTask {
26+
27+
privatefinallongidleConnectionTimeout;
28+
privatefinallongrequestTimeoutInstant;
29+
30+
publicIdleConnectionTimeoutTimerTask(NettyResponseFuture<?>nettyResponseFuture,NettyAsyncHttpProviderprovider,TimeoutsHoldertimeoutsHolder,
31+
longrequestTimeout,longidleConnectionTimeout) {
32+
super(nettyResponseFuture,provider,timeoutsHolder);
33+
this.idleConnectionTimeout =idleConnectionTimeout;
34+
requestTimeoutInstant =requestTimeout >=0 ?nettyResponseFuture.getStart() +requestTimeout :Long.MAX_VALUE;
35+
}
36+
37+
publicvoidrun(Timeouttimeout)throwsException {
38+
if (provider.isClose()) {
39+
timeoutsHolder.cancel();
40+
return;
41+
}
42+
43+
if (!nettyResponseFuture.isDone() && !nettyResponseFuture.isCancelled()) {
44+
45+
longnow =millisTime();
46+
47+
longcurrentIdleConnectionTimeoutInstant =idleConnectionTimeout -nettyResponseFuture.getLastTouch();
48+
longdurationBeforeCurrentIdleConnectionTimeout =currentIdleConnectionTimeoutInstant -now;
49+
50+
if (durationBeforeCurrentIdleConnectionTimeout <=0L) {
51+
// idleConnectionTimeout reached
52+
longdurationSinceLastTouch =now -nettyResponseFuture.getLastTouch();
53+
expire("Connection reached idle timeout of " +idleConnectionTimeout +" ms after " +durationSinceLastTouch +" ms");
54+
nettyResponseFuture.setIdleConnectionTimeoutReached();
55+
56+
}elseif (currentIdleConnectionTimeoutInstant <requestTimeoutInstant) {
57+
// reschedule
58+
timeoutsHolder.idleConnectionTimeout =provider.newTimeoutInMs(this,durationBeforeCurrentIdleConnectionTimeout);
59+
60+
}else {
61+
// otherwise, no need to reschedule: requestTimeout will happen sooner
62+
timeoutsHolder.idleConnectionTimeout =null;
63+
}
64+
65+
}else {
66+
timeoutsHolder.cancel();
67+
}
68+
}
69+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2010-2013 Ning, Inc.
3+
*
4+
* Ning licenses this file to you under the Apache License, version 2.0
5+
* (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
packagecom.ning.http.client.providers.netty.timeout;
17+
18+
importstaticcom.ning.http.util.DateUtil.*;
19+
20+
importorg.jboss.netty.util.Timeout;
21+
22+
importcom.ning.http.client.providers.netty.NettyAsyncHttpProvider;
23+
importcom.ning.http.client.providers.netty.NettyResponseFuture;
24+
25+
publicclassRequestTimeoutTimerTaskextendsTimeoutTimerTask {
26+
27+
publicRequestTimeoutTimerTask(NettyResponseFuture<?>nettyResponseFuture,NettyAsyncHttpProviderprovider,TimeoutsHoldertimeoutsHolder) {
28+
super(nettyResponseFuture,provider,timeoutsHolder);
29+
}
30+
31+
publicvoidrun(Timeouttimeout)throwsException {
32+
33+
// in any case, cancel possible idleConnectionTimeout
34+
timeoutsHolder.cancel();
35+
36+
if (provider.isClose()) {
37+
return;
38+
}
39+
40+
if (!nettyResponseFuture.isDone() && !nettyResponseFuture.isCancelled()) {
41+
expire("Request reached timeout of " +nettyResponseFuture.getRequestTimeoutInMs() +" ms after " + (millisTime() -nettyResponseFuture.getStart()) +" ms");
42+
nettyResponseFuture.setRequestTimeoutReached();
43+
}
44+
}
45+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2010-2013 Ning, Inc.
3+
*
4+
* Ning licenses this file to you under the Apache License, version 2.0
5+
* (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
packagecom.ning.http.client.providers.netty.timeout;
17+
18+
importjava.util.concurrent.TimeoutException;
19+
20+
importorg.jboss.netty.util.TimerTask;
21+
importorg.slf4j.Logger;
22+
importorg.slf4j.LoggerFactory;
23+
24+
importcom.ning.http.client.providers.netty.NettyAsyncHttpProvider;
25+
importcom.ning.http.client.providers.netty.NettyResponseFuture;
26+
27+
publicabstractclassTimeoutTimerTaskimplementsTimerTask {
28+
29+
privatestaticfinalLoggerLOGGER =LoggerFactory.getLogger(TimeoutTimerTask.class);
30+
31+
protectedfinalNettyResponseFuture<?>nettyResponseFuture;
32+
protectedfinalNettyAsyncHttpProviderprovider;
33+
protectedfinalTimeoutsHoldertimeoutsHolder;
34+
35+
publicTimeoutTimerTask(NettyResponseFuture<?>nettyResponseFuture,NettyAsyncHttpProviderprovider,TimeoutsHoldertimeoutsHolder) {
36+
this.nettyResponseFuture =nettyResponseFuture;
37+
this.provider =provider;
38+
this.timeoutsHolder =timeoutsHolder;
39+
}
40+
41+
protectedvoidexpire(Stringmessage) {
42+
LOGGER.debug("{} for {}",message,nettyResponseFuture);
43+
provider.abort(nettyResponseFuture,newTimeoutException(message));
44+
}
45+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2010-2013 Ning, Inc.
3+
*
4+
* Ning licenses this file to you under the Apache License, version 2.0
5+
* (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
packagecom.ning.http.client.providers.netty.timeout;
17+
18+
importorg.jboss.netty.util.Timeout;
19+
20+
publicclassTimeoutsHolder {
21+
22+
publicvolatileTimeoutrequestTimeout;
23+
publicvolatileTimeoutidleConnectionTimeout;
24+
25+
publicvoidcancel() {
26+
if (requestTimeout !=null) {
27+
requestTimeout.cancel();
28+
requestTimeout =null;
29+
}
30+
if (idleConnectionTimeout !=null) {
31+
idleConnectionTimeout.cancel();
32+
idleConnectionTimeout =null;
33+
}
34+
}
35+
}

‎src/main/java/com/ning/http/util/AsyncHttpProviderUtils.java‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
importcom.ning.http.client.HttpResponseBodyPart;
3636
importcom.ning.http.client.HttpResponseBodyPartsInputStream;
3737
importcom.ning.http.client.Part;
38+
importcom.ning.http.client.Request;
3839
importcom.ning.http.client.StringPart;
3940
importcom.ning.http.multipart.ByteArrayPartSource;
4041
importcom.ning.http.multipart.MultipartRequestEntity;
@@ -548,4 +549,8 @@ public static void checkBodyParts(int statusCode, Collection<HttpResponseBodyPar
548549
publicstaticStringkeepAliveHeaderValue(AsyncHttpClientConfigconfig) {
549550
returnconfig.getAllowPoolingConnection() ?"keep-alive" :"close";
550551
}
552+
553+
publicstaticintrequestTimeout(AsyncHttpClientConfigconfig,Requestrequest) {
554+
return (request.getPerRequestConfig() !=null &&request.getPerRequestConfig().getRequestTimeoutInMs() !=0) ?request.getPerRequestConfig().getRequestTimeoutInMs() :config.getRequestTimeoutInMs();
555+
}
551556
}

‎src/test/java/com/ning/http/client/async/netty/NettyPerRequestTimeoutTest.java‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
*/
1313
packagecom.ning.http.client.async.netty;
1414

15-
importstaticorg.testng.Assert.assertTrue;
15+
importstaticorg.testng.Assert.*;
1616

1717
importcom.ning.http.client.AsyncHttpClient;
1818
importcom.ning.http.client.AsyncHttpClientConfig;
@@ -22,7 +22,7 @@
2222
publicclassNettyPerRequestTimeoutTestextendsPerRequestTimeoutTest {
2323

2424
protectedvoidcheckTimeoutMessage(Stringmessage) {
25-
assertTrue(message.startsWith("Request reachedtime out of 100 ms after "));
25+
assertTrue(message.startsWith("Request reachedtimeout of 100 ms after "));
2626
}
2727

2828
@Override

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp