Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitad4e101

Browse files
authored
F/delayed handler (#100)
* Add a non blocking delayed handler
1 parent434704f commitad4e101

File tree

15 files changed

+588
-4
lines changed

15 files changed

+588
-4
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
packagecom.stubbornjava.common;
2+
3+
importjava.util.concurrent.ExecutorService;
4+
importjava.util.concurrent.Executors;
5+
importjava.util.concurrent.TimeUnit;
6+
7+
importorg.jooq.lambda.Unchecked;
8+
importorg.slf4j.Logger;
9+
importorg.slf4j.LoggerFactory;
10+
11+
importcom.google.common.util.concurrent.MoreExecutors;
12+
13+
importokhttp3.OkHttpClient;
14+
importokhttp3.Request;
15+
importokhttp3.Response;
16+
17+
publicclassHttp {
18+
privatestaticfinalLoggerlog =LoggerFactory.getLogger(Http.class);
19+
20+
// {{start:get}}
21+
publicstaticResponseget(OkHttpClientclient,Stringurl) {
22+
Requestrequest =newRequest.Builder()
23+
.url(url)
24+
.get()
25+
.build();
26+
returnUnchecked.supplier(() -> {
27+
Responseresponse =client.newCall(request).execute();
28+
returnresponse;
29+
}).get();
30+
}
31+
// {{end:get}}
32+
33+
// {{start:getInParallel}}
34+
publicstaticvoidgetInParallel(OkHttpClientclient,Stringurl,intcount) {
35+
ExecutorServiceexec =Executors.newFixedThreadPool(count);
36+
for (inti =0;i <count;i++) {
37+
exec.submit(() ->Http.get(client,url));
38+
}
39+
MoreExecutors.shutdownAndAwaitTermination(exec,30,TimeUnit.SECONDS);
40+
}
41+
// {{end:getInParallel}}
42+
}

‎stubbornjava-common/src/main/java/com/stubbornjava/common/HttpClient.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,11 @@ private HttpClient() {
4040
log.debug(msg);
4141
});
4242
static {
43-
loggingInterceptor.setLevel(Level.BODY);
43+
if (log.isDebugEnabled()) {
44+
loggingInterceptor.setLevel(Level.BASIC);
45+
}elseif (log.isTraceEnabled()) {
46+
loggingInterceptor.setLevel(Level.BODY);
47+
}
4448
}
4549

4650
publicstaticHttpLoggingInterceptorgetLoggingInterceptor() {
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
packagecom.stubbornjava.common;
2+
3+
4+
importjava.util.concurrent.TimeUnit;
5+
6+
importorg.slf4j.Logger;
7+
importorg.slf4j.LoggerFactory;
8+
9+
importcom.google.common.base.Stopwatch;
10+
11+
publicclassTimers {
12+
privatestaticfinalLoggerlogger =LoggerFactory.getLogger(Timers.class);
13+
14+
privateTimers() {}
15+
16+
publicstaticvoidtime(Stringmessage,Runnablerunnable) {
17+
Stopwatchsw =Stopwatch.createStarted();
18+
try {
19+
logger.info("{}",message);
20+
runnable.run();
21+
}catch (Exceptionex) {
22+
logger.warn("Exception in runnable",ex);
23+
throwex;
24+
}finally {
25+
logger.info("{} took {}ms",message,sw.elapsed(TimeUnit.MILLISECONDS));
26+
}
27+
}
28+
29+
}

‎stubbornjava-common/src/main/java/com/stubbornjava/common/undertow/SimpleServer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public static SimpleServer simpleServer(HttpHandler handler) {
4747
* If you base64 encode any cookie values you probably want it on.
4848
*/
4949
.setServerOption(UndertowOptions.ALLOW_EQUALS_IN_COOKIE_VALUE,true)
50+
// Needed to set request time in access logs
51+
.setServerOption(UndertowOptions.RECORD_REQUEST_START_TIME,true)
5052
.addHttpListener(DEFAULT_PORT,DEFAULT_HOST,handler)
5153
;
5254
returnnewSimpleServer(undertow);
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
packagecom.stubbornjava.common.undertow;
2+
3+
importjava.net.InetSocketAddress;
4+
importjava.util.function.Consumer;
5+
6+
importorg.slf4j.Logger;
7+
importorg.slf4j.LoggerFactory;
8+
9+
importio.undertow.Undertow;
10+
importio.undertow.Undertow.ListenerInfo;
11+
importio.undertow.server.HttpHandler;
12+
13+
publicclassUndertowUtil {
14+
privatestaticfinalLoggerlogger =LoggerFactory.getLogger(UndertowUtil.class);
15+
16+
/**
17+
* This is currently intended to be used in unit tests but may
18+
* be appropriate in other situations as well. It's not worth building
19+
* out a test module at this time so it lives here.
20+
*
21+
* This helper will spin up the http handler on a random available port.
22+
* The full host and port will be passed to the hostConsumer and the server
23+
* will be shut down after the consumer completes.
24+
*
25+
* @param builder
26+
* @param handler
27+
* @param hostConusmer
28+
*/
29+
publicstaticvoiduseLocalServer(Undertow.Builderbuilder,
30+
HttpHandlerhandler,
31+
Consumer<String>hostConusmer) {
32+
Undertowundertow =null;
33+
try {
34+
// Starts server on a random open port
35+
undertow =builder.addHttpListener(0,"127.0.0.1",handler).build();
36+
undertow.start();
37+
ListenerInfolistenerInfo =undertow.getListenerInfo().get(0);
38+
InetSocketAddressaddr = (InetSocketAddress)listenerInfo.getAddress();
39+
Stringhost ="http://localhost:" +addr.getPort();
40+
hostConusmer.accept(host);
41+
}finally {
42+
if (undertow !=null) {
43+
undertow.stop();
44+
}
45+
}
46+
}
47+
}

‎stubbornjava-common/src/main/java/com/stubbornjava/common/undertow/handlers/CustomHandlers.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ public class CustomHandlers {
4646
privatestaticfinalLoggerlog =LoggerFactory.getLogger(CustomHandlers.class);
4747

4848
publicstaticAccessLogHandleraccessLog(HttpHandlernext,Loggerlogger) {
49-
returnnewAccessLogHandler(next,newSlf4jAccessLogReceiver(logger),"combined",CustomHandlers.class.getClassLoader());
49+
// see http://undertow.io/javadoc/2.0.x/io/undertow/server/handlers/accesslog/AccessLogHandler.html
50+
Stringformat ="%H %h %u\"%r\" %s %Dms %b bytes\"%{i,Referer}\"\"%{i,User-Agent}\"";
51+
returnnewAccessLogHandler(next,newSlf4jAccessLogReceiver(logger),format,CustomHandlers.class.getClassLoader());
5052
}
5153

5254
publicstaticAccessLogHandleraccessLog(HttpHandlernext) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
packagecom.stubbornjava.common.undertow.handlers.diagnostic;
2+
3+
importjava.time.Duration;
4+
importjava.util.concurrent.TimeUnit;
5+
importjava.util.function.Function;
6+
7+
importio.undertow.server.Connectors;
8+
importio.undertow.server.HttpHandler;
9+
importio.undertow.server.HttpServerExchange;
10+
importio.undertow.server.handlers.BlockingHandler;
11+
12+
// {{start:delayedHandler}}
13+
/**
14+
* A non blocking handler to add a time delay before the next handler
15+
* is executed. If the exchange has already been dispatched this will
16+
* un-dispatch the exchange and re-dispatch it before next is called.
17+
*/
18+
publicclassDelayedExecutionHandlerimplementsHttpHandler {
19+
20+
privatefinalHttpHandlernext;
21+
privatefinalFunction<HttpServerExchange,Duration>durationFunc;
22+
23+
DelayedExecutionHandler(HttpHandlernext,
24+
Function<HttpServerExchange,Duration>durationFunc) {
25+
this.next =next;
26+
this.durationFunc =durationFunc;
27+
}
28+
29+
@Override
30+
publicvoidhandleRequest(HttpServerExchangeexchange)throwsException {
31+
Durationduration =durationFunc.apply(exchange);
32+
33+
finalHttpHandlerdelegate;
34+
if (exchange.isBlocking()) {
35+
// We want to undispatch here so that we are not blocking
36+
// a worker thread. We will spin on the IO thread using the
37+
// built in executeAfter.
38+
exchange.unDispatch();
39+
delegate =newBlockingHandler(next);
40+
}else {
41+
delegate =next;
42+
}
43+
44+
exchange.dispatch(exchange.getIoThread(), () -> {
45+
exchange.getIoThread().executeAfter(() ->
46+
Connectors.executeRootHandler(delegate,exchange),
47+
duration.toMillis(),
48+
TimeUnit.MILLISECONDS);
49+
});
50+
}
51+
}
52+
// {{end:delayedHandler}}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
packagecom.stubbornjava.common.undertow.handlers.diagnostic;
2+
3+
importjava.time.Duration;
4+
importjava.util.concurrent.ThreadLocalRandom;
5+
importjava.util.concurrent.TimeUnit;
6+
7+
importio.undertow.server.HttpHandler;
8+
9+
publicclassDiagnosticHandlers {
10+
11+
// {{start:delayedHandler}}
12+
/**
13+
* Add a fixed delay before execution of the next handler
14+
* @param next
15+
* @param duration
16+
* @param unit
17+
* @return
18+
*/
19+
publicstaticDelayedExecutionHandlerfixedDelay(HttpHandlernext,
20+
longduration,
21+
TimeUnitunit) {
22+
returnnewDelayedExecutionHandler(
23+
next, (exchange) ->Duration.ofMillis(unit.toMillis(duration)));
24+
}
25+
26+
/**
27+
* Add a random delay between minDuration (inclusive) and
28+
* maxDuration (exclusive) before execution of the next handler.
29+
* This can be used to add artificial latency for requests.
30+
*
31+
* @param next
32+
* @param minDuration inclusive
33+
* @param maxDuration exclusive
34+
* @param unit
35+
* @return
36+
*/
37+
publicstaticDelayedExecutionHandlerrandomDelay(HttpHandlernext,
38+
longminDuration,
39+
longmaxDuration,
40+
TimeUnitunit) {
41+
returnnewDelayedExecutionHandler(
42+
next, (exchange) -> {
43+
longduration =ThreadLocalRandom.current()
44+
.nextLong(minDuration,maxDuration);
45+
returnDuration.ofMillis(unit.toMillis(duration));
46+
});
47+
}
48+
// {{end:delayedHandler}}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
packagecom.stubbornjava.common.undertow.handlers.diagnostic;
2+
3+
importstaticorg.junit.Assert.assertTrue;
4+
5+
importjava.util.List;
6+
importjava.util.concurrent.Callable;
7+
importjava.util.concurrent.ExecutorService;
8+
importjava.util.concurrent.Executors;
9+
importjava.util.concurrent.Future;
10+
importjava.util.concurrent.TimeUnit;
11+
importjava.util.stream.Collectors;
12+
importjava.util.stream.IntStream;
13+
14+
importorg.jooq.lambda.Seq;
15+
importorg.jooq.lambda.Unchecked;
16+
importorg.junit.Assert;
17+
importorg.junit.Test;
18+
19+
importcom.google.common.base.Stopwatch;
20+
importcom.google.common.util.concurrent.MoreExecutors;
21+
importcom.stubbornjava.common.Http;
22+
importcom.stubbornjava.common.HttpClient;
23+
importcom.stubbornjava.common.undertow.Exchange;
24+
importcom.stubbornjava.common.undertow.UndertowUtil;
25+
importcom.stubbornjava.common.undertow.handlers.CustomHandlers;
26+
importcom.stubbornjava.undertow.handlers.MiddlewareBuilder;
27+
28+
importio.undertow.Undertow;
29+
importio.undertow.server.HttpHandler;
30+
importio.undertow.server.handlers.BlockingHandler;
31+
importokhttp3.OkHttpClient;
32+
importokhttp3.Response;
33+
34+
publicclassDelayedExecutionHandlerTest {
35+
36+
// Delay for 500ms then return "ok"
37+
privatestaticfinalDelayedExecutionHandlerdelayedHandler =
38+
DiagnosticHandlers.fixedDelay((exchange) -> {
39+
Exchange.body().sendText(exchange,"ok");
40+
},
41+
500,TimeUnit.MILLISECONDS);
42+
43+
@Test
44+
publicvoidtestOnXIoThread()throwsInterruptedException {
45+
intnumThreads =10;
46+
run(delayedHandler,numThreads);
47+
}
48+
49+
@Test
50+
publicvoidtestOnWorkerThread()throwsInterruptedException {
51+
intnumThreads =10;
52+
run(newBlockingHandler(delayedHandler),numThreads);
53+
}
54+
55+
/**
56+
* Spin up a new server with a single IO thread and worker thread.
57+
* Run N GET requests against it concurrently and make sure they
58+
* do not take N * 500ms total. This is not the best test but it
59+
* should show that we are delaying N requests at once using a single
60+
* thread.
61+
*
62+
* @param handler
63+
* @param numThreads
64+
* @throws InterruptedException
65+
*/
66+
privatevoidrun(HttpHandlerhandler,intnumThreads)throwsInterruptedException {
67+
HttpHandlerroute =MiddlewareBuilder.begin(CustomHandlers::accessLog)
68+
.complete(handler);
69+
Undertow.Builderbuilder =Undertow.builder()
70+
.setWorkerThreads(1)
71+
.setIoThreads(1);
72+
UndertowUtil.useLocalServer(builder,route,host -> {
73+
ExecutorServiceexec =Executors.newFixedThreadPool(numThreads);
74+
OkHttpClientclient =newOkHttpClient().newBuilder()
75+
.addInterceptor(HttpClient.getLoggingInterceptor())
76+
.build();
77+
78+
// Using time in tests isn't the best approach but this one seems
79+
// A little difficult to test another way.
80+
Stopwatchsw =Stopwatch.createStarted();
81+
List<Callable<Response>>callables =IntStream.range(0,numThreads)
82+
.mapToObj(i -> (Callable<Response>) () ->Http.get(client,host))
83+
.collect(Collectors.toList());
84+
sw.stop();
85+
Seq.seq(Unchecked.supplier(() ->exec.invokeAll(callables)).get())
86+
.map(Unchecked.function(Future::get))
87+
.forEach(DelayedExecutionHandlerTest::assertSuccess);
88+
assertTrue("Responses took too long",sw.elapsed().toMillis() <1_000);
89+
MoreExecutors.shutdownAndAwaitTermination(exec,10,TimeUnit.SECONDS);
90+
});
91+
}
92+
93+
privatestaticvoidassertSuccess(Responseresponse) {
94+
Assert.assertTrue("Response should be a 200",response.isSuccessful());
95+
}
96+
97+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp