Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3529482

Browse files
committed
Start investigating ReactiveStreamsTest random failures
Still not sure if the issue is on the client or on the server side.What’s for sure is that Jetty randomly reads a corrupted chunk.What’s weird is that the corruption always happens after 12.000 or24.000 bytes for a given request/socket.
1 parent0450cc6 commit3529482

File tree

5 files changed

+181
-93
lines changed

5 files changed

+181
-93
lines changed

‎client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsTest.java‎

Lines changed: 83 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@
1414

1515
importstaticorg.asynchttpclient.Dsl.*;
1616
importstaticorg.asynchttpclient.test.TestUtils.*;
17+
importstaticio.netty.handler.codec.http.HttpHeaderNames.*;
1718
importstaticorg.testng.Assert.assertEquals;
1819
importio.netty.handler.codec.http.HttpHeaders;
1920

2021
importjava.io.ByteArrayOutputStream;
2122
importjava.nio.ByteBuffer;
2223
importjava.util.ArrayList;
24+
importjava.util.Arrays;
2325
importjava.util.Collections;
26+
importjava.util.Iterator;
2427
importjava.util.List;
2528
importjava.util.concurrent.CountDownLatch;
2629
importjava.util.concurrent.ExecutionException;
@@ -33,6 +36,7 @@
3336
importorg.asynchttpclient.ListenableFuture;
3437
importorg.asynchttpclient.Response;
3538
importorg.asynchttpclient.handler.StreamedAsyncHandler;
39+
importorg.asynchttpclient.test.TestUtils;
3640
importorg.reactivestreams.Publisher;
3741
importorg.reactivestreams.Subscriber;
3842
importorg.reactivestreams.Subscription;
@@ -43,10 +47,15 @@
4347

4448
publicclassReactiveStreamsTestextendsAbstractBasicTest {
4549

50+
publicstaticPublisher<ByteBuffer>createPublisher(finalbyte[]bytes,finalintchunkSize) {
51+
Observable<ByteBuffer>observable =Observable.from(newByteBufferIterable(bytes,chunkSize));
52+
returnRxReactiveStreams.toPublisher(observable);
53+
}
54+
4655
@Test(groups ="standalone")
4756
publicvoidtestStreamingPutImage()throwsException {
4857
try (AsyncHttpClientclient =asyncHttpClient(config().setRequestTimeout(100 *6000))) {
49-
Responseresponse =client.preparePut(getTargetUrl()).setBody(LARGE_IMAGE_PUBLISHER).execute().get();
58+
Responseresponse =client.preparePut(getTargetUrl()).setBody(createPublisher(LARGE_IMAGE_BYTES,2342)).execute().get();
5059
assertEquals(response.getStatusCode(),200);
5160
assertEquals(response.getResponseBodyAsBytes(),LARGE_IMAGE_BYTES);
5261
}
@@ -56,16 +65,49 @@ public void testStreamingPutImage() throws Exception {
5665
publicvoidtestConnectionDoesNotGetClosed()throwsException {
5766
// test that we can stream the same request multiple times
5867
try (AsyncHttpClientclient =asyncHttpClient(config().setRequestTimeout(100 *6000))) {
59-
BoundRequestBuilderrequestBuilder =client.preparePut(getTargetUrl()).setBody(LARGE_IMAGE_PUBLISHER);
68+
StringexpectedMd5 =TestUtils.md5(LARGE_IMAGE_BYTES);
69+
BoundRequestBuilderrequestBuilder =client.preparePut(getTargetUrl())//
70+
.setBody(createPublisher(LARGE_IMAGE_BYTES,1000))//
71+
.setHeader("X-" +CONTENT_LENGTH,LARGE_IMAGE_BYTES.length)//
72+
.setHeader("X-" +CONTENT_MD5,expectedMd5);
73+
6074
Responseresponse =requestBuilder.execute().get();
6175
assertEquals(response.getStatusCode(),200);
62-
assertEquals(response.getResponseBodyAsBytes().length,LARGE_IMAGE_BYTES.length);
63-
assertEquals(response.getResponseBodyAsBytes(),LARGE_IMAGE_BYTES);
76+
byte[]responseBody =response.getResponseBodyAsBytes();
77+
responseBody =response.getResponseBodyAsBytes();
78+
assertEquals(Integer.valueOf(response.getHeader("X-" +CONTENT_LENGTH)).intValue(),LARGE_IMAGE_BYTES.length,"Server received payload length invalid");
79+
assertEquals(responseBody.length,LARGE_IMAGE_BYTES.length,"Client back payload length invalid");
80+
assertEquals(response.getHeader(CONTENT_MD5),expectedMd5,"Server received payload MD5 invalid");
81+
assertEquals(TestUtils.md5(responseBody),expectedMd5,"Client back payload MD5 invalid");
82+
assertEquals(responseBody,LARGE_IMAGE_BYTES);
6483

6584
response =requestBuilder.execute().get();
6685
assertEquals(response.getStatusCode(),200);
67-
assertEquals(response.getResponseBodyAsBytes().length,LARGE_IMAGE_BYTES.length);
68-
assertEquals(response.getResponseBodyAsBytes(),LARGE_IMAGE_BYTES);
86+
responseBody =response.getResponseBodyAsBytes();
87+
assertEquals(Integer.valueOf(response.getHeader("X-" +CONTENT_LENGTH)).intValue(),LARGE_IMAGE_BYTES.length,"Server received payload length invalid");
88+
assertEquals(responseBody.length,LARGE_IMAGE_BYTES.length,"Client back payload length invalid");
89+
try {
90+
assertEquals(response.getHeader(CONTENT_MD5),expectedMd5,"Server received payload MD5 invalid");
91+
assertEquals(TestUtils.md5(responseBody),expectedMd5,"Client back payload MD5 invalid");
92+
assertEquals(responseBody,LARGE_IMAGE_BYTES);
93+
}catch (AssertionErrore) {
94+
for (inti =0;i <LARGE_IMAGE_BYTES.length;i++) {
95+
assertEquals(responseBody[i],LARGE_IMAGE_BYTES[i],"Invalid response byte at position " +i);
96+
}
97+
throwe;
98+
}
99+
}
100+
}
101+
102+
publicstaticvoidmain(String[]args)throwsException {
103+
ReactiveStreamsTesttest =newReactiveStreamsTest();
104+
test.setUpGlobal();
105+
try {
106+
for (inti =0;i <1000;i++) {
107+
test.testConnectionDoesNotGetClosed();
108+
}
109+
}finally {
110+
test.tearDownGlobal();
69111
}
70112
}
71113

@@ -294,4 +336,39 @@ public void onError(Throwable error) {
294336
publicvoidonComplete() {
295337
}
296338
}
339+
340+
staticclassByteBufferIterableimplementsIterable<ByteBuffer> {
341+
privatefinalbyte[]payload;
342+
privatefinalintchunkSize;
343+
344+
publicByteBufferIterable(byte[]payload,intchunkSize) {
345+
this.payload =payload;
346+
this.chunkSize =chunkSize;
347+
}
348+
349+
@Override
350+
publicIterator<ByteBuffer>iterator() {
351+
returnnewIterator<ByteBuffer>() {
352+
privatevolatileintcurrentIndex =0;
353+
354+
@Override
355+
publicbooleanhasNext() {
356+
returncurrentIndex !=payload.length;
357+
}
358+
359+
@Override
360+
publicByteBuffernext() {
361+
intnewIndex =Math.min(currentIndex +chunkSize,payload.length);
362+
byte[]bytesInElement =Arrays.copyOfRange(payload,currentIndex,newIndex);
363+
currentIndex =newIndex;
364+
returnByteBuffer.wrap(bytesInElement);
365+
}
366+
367+
@Override
368+
publicvoidremove() {
369+
thrownewUnsupportedOperationException("ByteBufferIterable's iterator does not support remove.");
370+
}
371+
};
372+
}
373+
}
297374
}

‎client/src/test/java/org/asynchttpclient/test/EchoHandler.java‎

Lines changed: 75 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,33 @@
1313
*/
1414
packageorg.asynchttpclient.test;
1515

16-
importorg.eclipse.jetty.server.Request;
17-
importorg.eclipse.jetty.server.handler.AbstractHandler;
18-
importorg.slf4j.Logger;
19-
importorg.slf4j.LoggerFactory;
16+
importstaticio.netty.handler.codec.http.HttpHeaderNames.*;
17+
importio.netty.buffer.ByteBufUtil;
18+
importio.netty.buffer.Unpooled;
19+
importio.netty.util.internal.StringUtil;
20+
21+
importjava.io.IOException;
22+
importjava.util.Enumeration;
2023

2124
importjavax.servlet.ServletException;
2225
importjavax.servlet.http.Cookie;
2326
importjavax.servlet.http.HttpServletRequest;
2427
importjavax.servlet.http.HttpServletResponse;
2528

26-
importjava.io.IOException;
27-
importjava.util.Enumeration;
29+
importorg.eclipse.jetty.server.Request;
30+
importorg.eclipse.jetty.server.handler.AbstractHandler;
31+
importorg.slf4j.Logger;
32+
importorg.slf4j.LoggerFactory;
2833

2934
publicclassEchoHandlerextendsAbstractHandler {
30-
35+
3136
privatestaticfinalLoggerLOGGER =LoggerFactory.getLogger(EchoHandler.class);
3237

3338
@Override
3439
publicvoidhandle(StringpathInContext,Requestrequest,HttpServletRequesthttpRequest,HttpServletResponsehttpResponse)throwsIOException,ServletException {
3540

3641
LOGGER.debug("Echo received request {} on path {}",request,pathInContext);
37-
42+
3843
if (httpRequest.getHeader("X-HEAD") !=null) {
3944
httpResponse.setContentLength(1);
4045
}
@@ -49,34 +54,23 @@ public void handle(String pathInContext, Request request, HttpServletRequest htt
4954
httpResponse.addHeader("Allow","GET,HEAD,POST,OPTIONS,TRACE");
5055
}
5156

52-
Enumeration<?>e =httpRequest.getHeaderNames();
53-
Stringparam;
57+
Enumeration<String>e =httpRequest.getHeaderNames();
58+
StringheaderName;
5459
while (e.hasMoreElements()) {
55-
param =e.nextElement().toString();
56-
57-
if (param.startsWith("LockThread")) {
58-
finalintsleepTime =httpRequest.getIntHeader(param);
60+
headerName =e.nextElement();
61+
if (headerName.startsWith("LockThread")) {
62+
finalintsleepTime =httpRequest.getIntHeader(headerName);
5963
try {
6064
Thread.sleep(sleepTime == -1 ?40 :sleepTime *1000);
6165
}catch (InterruptedExceptionex) {
6266
}
6367
}
6468

65-
if (param.startsWith("X-redirect")) {
69+
if (headerName.startsWith("X-redirect")) {
6670
httpResponse.sendRedirect(httpRequest.getHeader("X-redirect"));
6771
return;
6872
}
69-
httpResponse.addHeader("X-" +param,httpRequest.getHeader(param));
70-
}
71-
72-
Enumeration<?>i =httpRequest.getParameterNames();
73-
74-
StringBuilderrequestBody =newStringBuilder();
75-
while (i.hasMoreElements()) {
76-
param =i.nextElement().toString();
77-
httpResponse.addHeader("X-" +param,httpRequest.getParameter(param));
78-
requestBody.append(param);
79-
requestBody.append("_");
73+
httpResponse.addHeader("X-" +headerName,httpRequest.getHeader(headerName));
8074
}
8175

8276
StringpathInfo =httpRequest.getPathInfo();
@@ -96,27 +90,71 @@ public void handle(String pathInContext, Request request, HttpServletRequest htt
9690
}
9791
}
9892

99-
if (requestBody.length() >0) {
100-
httpResponse.getOutputStream().write(requestBody.toString().getBytes());
101-
}
93+
Enumeration<String>i =httpRequest.getParameterNames();
94+
if (i.hasMoreElements()) {
95+
StringBuilderrequestBody =newStringBuilder();
96+
while (i.hasMoreElements()) {
97+
headerName =i.nextElement();
98+
httpResponse.addHeader("X-" +headerName,httpRequest.getParameter(headerName));
99+
requestBody.append(headerName);
100+
requestBody.append("_");
101+
}
102102

103-
intsize =16384;
104-
if (httpRequest.getContentLength() >0) {
105-
size =httpRequest.getContentLength();
103+
if (requestBody.length() >0) {
104+
Stringbody =requestBody.toString();
105+
httpResponse.getOutputStream().write(body.getBytes());
106+
}
106107
}
107-
byte[]bytes =newbyte[size];
108-
if (bytes.length >0) {
108+
109+
StringclientContentLength =httpRequest.getHeader("X-" +CONTENT_LENGTH);
110+
StringclientMd5 =httpRequest.getHeader("X-" +CONTENT_MD5);
111+
112+
if (clientContentLength !=null) {
113+
byte[]bytes =newbyte[Integer.valueOf(clientContentLength)];
109114
intread =0;
115+
inttotal =0;
110116
while (read > -1) {
111-
read =httpRequest.getInputStream().read(bytes);
117+
read =httpRequest.getInputStream().read(bytes,total,5000);
112118
if (read >0) {
113-
httpResponse.getOutputStream().write(bytes,0,read);
119+
total +=read;
120+
}
121+
}
122+
123+
httpResponse.addIntHeader("X-" +CONTENT_LENGTH,total);
124+
Stringmd5 =TestUtils.md5(bytes,0,total);
125+
httpResponse.addHeader(CONTENT_MD5.toString(),md5);
126+
127+
if (!md5.equals(clientMd5)) {
128+
intlength =total;
129+
introws =length /16 + (length %15 ==0 ?0 :1) +4;
130+
StringBuilderbuf =newStringBuilder("JETTY".length() +1 +"JETTY".length() +2 +10 +1 +2 +rows *80);
131+
132+
buf.append("JETTY").append(' ').append("JETTY").append(": ").append(length).append('B').append(StringUtil.NEWLINE);
133+
ByteBufUtil.appendPrettyHexDump(buf,Unpooled.wrappedBuffer(bytes));
134+
LOGGER.error(buf.toString());
135+
}
136+
137+
httpResponse.getOutputStream().write(bytes,0,total);
138+
}else {
139+
intsize =16384;
140+
if (httpRequest.getContentLength() >0) {
141+
size =httpRequest.getContentLength();
142+
}
143+
if (size >0) {
144+
intread =0;
145+
while (read > -1) {
146+
byte[]bytes =newbyte[size];
147+
read =httpRequest.getInputStream().read(bytes);
148+
if (read >0) {
149+
httpResponse.getOutputStream().write(bytes,0,read);
150+
}
114151
}
115152
}
116153
}
117154

118155
request.setHandled(true);
119156
httpResponse.getOutputStream().flush();
157+
// FIXME don't always close, depends on the test, cf ReactiveStreamsTest
120158
httpResponse.getOutputStream().close();
121159
}
122-
}
160+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp