Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit94f8ef2

Browse files
committed
Have FeedableBodyGenerator use ByteBuf instead of ByteBuffer,closeAsyncHttpClient#1424
So one can use Netty's pooled buffers
1 parent27b2e7a commit94f8ef2

File tree

10 files changed

+111
-94
lines changed

10 files changed

+111
-94
lines changed

‎client/src/main/java/org/asynchttpclient/RequestBuilderBase.java‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
importstaticio.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
1919
importstaticorg.asynchttpclient.util.HttpUtils.*;
2020
importstaticorg.asynchttpclient.util.MiscUtils.isNonEmpty;
21+
importio.netty.buffer.ByteBuf;
2122
importio.netty.handler.codec.http.DefaultHttpHeaders;
2223
importio.netty.handler.codec.http.HttpHeaders;
2324
importio.netty.handler.codec.http.cookie.Cookie;
@@ -398,11 +399,11 @@ public T setBody(InputStream stream) {
398399
returnasDerivedType();
399400
}
400401

401-
publicTsetBody(Publisher<ByteBuffer>publisher) {
402+
publicTsetBody(Publisher<ByteBuf>publisher) {
402403
returnsetBody(publisher, -1L);
403404
}
404405

405-
publicTsetBody(Publisher<ByteBuffer>publisher,longcontentLength) {
406+
publicTsetBody(Publisher<ByteBuf>publisher,longcontentLength) {
406407
returnsetBody(newReactiveStreamsBodyGenerator(publisher,contentLength));
407408
}
408409

‎client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java‎

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,13 @@
1212
*/
1313
packageorg.asynchttpclient.netty.request.body;
1414

15+
importio.netty.buffer.ByteBuf;
16+
importio.netty.channel.Channel;
17+
importio.netty.handler.codec.http.DefaultHttpContent;
18+
importio.netty.handler.codec.http.HttpContent;
19+
importio.netty.handler.codec.http.LastHttpContent;
20+
1521
importjava.io.IOException;
16-
importjava.nio.ByteBuffer;
1722
importjava.util.NoSuchElementException;
1823

1924
importorg.asynchttpclient.netty.NettyResponseFuture;
@@ -25,24 +30,16 @@
2530

2631
importcom.typesafe.netty.HandlerSubscriber;
2732

28-
importio.netty.buffer.ByteBuf;
29-
importio.netty.buffer.Unpooled;
30-
importio.netty.channel.Channel;
31-
importio.netty.handler.codec.http.DefaultHttpContent;
32-
importio.netty.handler.codec.http.HttpContent;
33-
importio.netty.handler.codec.http.LastHttpContent;
34-
importio.netty.util.concurrent.EventExecutor;
35-
3633
publicclassNettyReactiveStreamsBodyimplementsNettyBody {
3734

3835
privatestaticfinalLoggerLOGGER =LoggerFactory.getLogger(NettyReactiveStreamsBody.class);
3936
privatestaticfinalStringNAME_IN_CHANNEL_PIPELINE ="request-body-streamer";
4037

41-
privatefinalPublisher<ByteBuffer>publisher;
38+
privatefinalPublisher<ByteBuf>publisher;
4239

4340
privatefinallongcontentLength;
4441

45-
publicNettyReactiveStreamsBody(Publisher<ByteBuffer>publisher,longcontentLength) {
42+
publicNettyReactiveStreamsBody(Publisher<ByteBuf>publisher,longcontentLength) {
4643
this.publisher =publisher;
4744
this.contentLength =contentLength;
4845
}
@@ -69,32 +66,35 @@ public void write(Channel channel, NettyResponseFuture<?> future) throws IOExcep
6966
}
7067
}
7168

72-
privatestaticclassSubscriberAdapterimplementsSubscriber<ByteBuffer> {
69+
privatestaticclassSubscriberAdapterimplementsSubscriber<ByteBuf> {
7370
privatevolatileSubscriber<HttpContent>subscriber;
74-
71+
7572
publicSubscriberAdapter(Subscriber<HttpContent>subscriber) {
7673
this.subscriber =subscriber;
7774
}
75+
7876
@Override
7977
publicvoidonSubscribe(Subscriptions) {
80-
subscriber.onSubscribe(s);
78+
subscriber.onSubscribe(s);
8179
}
80+
8281
@Override
83-
publicvoidonNext(ByteBuffert) {
84-
ByteBufbuffer =Unpooled.wrappedBuffer(t);
82+
publicvoidonNext(ByteBufbuffer) {
8583
HttpContentcontent =newDefaultHttpContent(buffer);
8684
subscriber.onNext(content);
8785
}
86+
8887
@Override
8988
publicvoidonError(Throwablet) {
9089
subscriber.onError(t);
9190
}
91+
9292
@Override
9393
publicvoidonComplete() {
9494
subscriber.onComplete();
95-
}
95+
}
9696
}
97-
97+
9898
privatestaticclassNettySubscriberextendsHandlerSubscriber<HttpContent> {
9999
privatestaticfinalLoggerLOGGER =LoggerFactory.getLogger(NettySubscriber.class);
100100

@@ -109,8 +109,7 @@ public NettySubscriber(Channel channel, NettyResponseFuture<?> future) {
109109

110110
@Override
111111
protectedvoidcomplete() {
112-
EventExecutorexecutor =channel.eventLoop();
113-
executor.execute(() ->channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(future ->removeFromPipeline()));
112+
channel.eventLoop().execute(() ->channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(future ->removeFromPipeline()));
114113
}
115114

116115
@Override

‎client/src/main/java/org/asynchttpclient/request/body/generator/BodyChunk.java‎

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
*/
1414
packageorg.asynchttpclient.request.body.generator;
1515

16-
importjava.nio.ByteBuffer;
16+
importio.netty.buffer.ByteBuf;
1717

1818
publicfinalclassBodyChunk {
1919
publicfinalbooleanlast;
20-
publicfinalByteBufferbuffer;
20+
publicfinalByteBufbuffer;
2121

22-
publicBodyChunk(finalByteBufferbuffer,finalbooleanlast) {
22+
publicBodyChunk(ByteBufbuffer,booleanlast) {
2323
this.buffer =buffer;
2424
this.last =last;
2525
}

‎client/src/main/java/org/asynchttpclient/request/body/generator/FeedableBodyGenerator.java‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@
1313
*/
1414
packageorg.asynchttpclient.request.body.generator;
1515

16-
importjava.nio.ByteBuffer;
16+
importio.netty.buffer.ByteBuf;
1717

1818
/**
1919
* {@link BodyGenerator} which may return just part of the payload at the time handler is requesting it.
2020
* If it happens, client becomes responsible for providing the rest of the chunks.
2121
*/
2222
publicinterfaceFeedableBodyGeneratorextendsBodyGenerator {
2323

24-
booleanfeed(ByteBufferbuffer,booleanisLast)throwsException;
24+
booleanfeed(ByteBufbuffer,booleanisLast)throwsException;
2525

2626
voidsetListener(FeedListenerlistener);
2727
}

‎client/src/main/java/org/asynchttpclient/request/body/generator/PushBody.java‎

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
importio.netty.buffer.ByteBuf;
1717

1818
importjava.io.IOException;
19-
importjava.nio.ByteBuffer;
2019
importjava.util.Queue;
2120

2221
importorg.asynchttpclient.request.body.Body;
@@ -54,7 +53,7 @@ private BodyState readNextChunk(ByteBuf target) throws IOException {
5453
if (nextChunk ==null) {
5554
// Nothing in the queue. suspend stream if nothing was read. (reads == 0)
5655
returnres;
57-
}elseif (!nextChunk.buffer.hasRemaining() && !nextChunk.last) {
56+
}elseif (!nextChunk.buffer.isReadable() && !nextChunk.last) {
5857
// skip empty buffers
5958
queue.remove();
6059
}else {
@@ -66,26 +65,15 @@ private BodyState readNextChunk(ByteBuf target) throws IOException {
6665
}
6766

6867
privatevoidreadChunk(ByteBuftarget,BodyChunkpart) {
69-
move(target,part.buffer);
70-
71-
if (!part.buffer.hasRemaining()) {
68+
target.writeBytes(part.buffer);
69+
if (!part.buffer.isReadable()) {
7270
if (part.last) {
7371
state =BodyState.STOP;
7472
}
7573
queue.remove();
7674
}
7775
}
7876

79-
privatevoidmove(ByteBuftarget,ByteBuffersource) {
80-
intsize =Math.min(target.writableBytes(),source.remaining());
81-
if (size >0) {
82-
ByteBufferslice =source.slice();
83-
slice.limit(size);
84-
target.writeBytes(slice);
85-
source.position(source.position() +size);
86-
}
87-
}
88-
8977
@Override
9078
publicvoidclose() {
9179
}

‎client/src/main/java/org/asynchttpclient/request/body/generator/QueueBasedFeedableBodyGenerator.java‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
*/
1414
packageorg.asynchttpclient.request.body.generator;
1515

16-
importjava.nio.ByteBuffer;
16+
importio.netty.buffer.ByteBuf;
17+
1718
importjava.util.Queue;
1819

1920
importorg.asynchttpclient.request.body.Body;
@@ -35,7 +36,7 @@ public Body createBody() {
3536
protectedabstractbooleanoffer(BodyChunkchunk)throwsException;
3637

3738
@Override
38-
publicbooleanfeed(finalByteBufferbuffer,finalbooleanisLast)throwsException {
39+
publicbooleanfeed(finalByteBufbuffer,finalbooleanisLast)throwsException {
3940
booleanoffered =offer(newBodyChunk(buffer,isLast));
4041
if (offered &&listener !=null) {
4142
listener.onContentAdded();

‎client/src/main/java/org/asynchttpclient/request/body/generator/ReactiveStreamsBodyGenerator.java‎

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
packageorg.asynchttpclient.request.body.generator;
1414

1515
importio.netty.buffer.ByteBuf;
16+
importio.netty.buffer.Unpooled;
1617

1718
importjava.io.IOException;
18-
importjava.nio.ByteBuffer;
1919
importjava.util.concurrent.atomic.AtomicBoolean;
2020

2121
importorg.asynchttpclient.request.body.Body;
@@ -26,9 +26,8 @@
2626
importorg.slf4j.LoggerFactory;
2727

2828
publicclassReactiveStreamsBodyGeneratorimplementsFeedableBodyGenerator {
29-
privatestaticfinalByteBufferEMPTY =ByteBuffer.wrap("".getBytes());
3029

31-
privatefinalPublisher<ByteBuffer>publisher;
30+
privatefinalPublisher<ByteBuf>publisher;
3231
privatefinalFeedableBodyGeneratorfeedableBodyGenerator;
3332
privatevolatileFeedListenerfeedListener;
3433
privatefinallongcontentLength;
@@ -41,18 +40,18 @@ public class ReactiveStreamsBodyGenerator implements FeedableBodyGenerator {
4140
* @param publisher Body as a Publisher
4241
* @param contentLength Content-Length of the Body
4342
*/
44-
publicReactiveStreamsBodyGenerator(Publisher<ByteBuffer>publisher,longcontentLength) {
43+
publicReactiveStreamsBodyGenerator(Publisher<ByteBuf>publisher,longcontentLength) {
4544
this.publisher =publisher;
4645
this.feedableBodyGenerator =newUnboundedQueueFeedableBodyGenerator();
4746
this.contentLength =contentLength;
4847
}
4948

50-
publicPublisher<ByteBuffer>getPublisher() {
49+
publicPublisher<ByteBuf>getPublisher() {
5150
returnthis.publisher;
5251
}
5352

5453
@Override
55-
publicbooleanfeed(ByteBufferbuffer,booleanisLast)throwsException {
54+
publicbooleanfeed(ByteBufbuffer,booleanisLast)throwsException {
5655
returnfeedableBodyGenerator.feed(buffer,isLast);
5756
}
5857

@@ -79,7 +78,7 @@ private class StreamedBody implements Body {
7978

8079
privatefinallongcontentLength;
8180

82-
publicStreamedBody(Publisher<ByteBuffer>publisher,FeedableBodyGeneratorbodyGenerator,longcontentLength) {
81+
publicStreamedBody(Publisher<ByteBuf>publisher,FeedableBodyGeneratorbodyGenerator,longcontentLength) {
8382
this.body =bodyGenerator.createBody();
8483
this.subscriber =newSimpleSubscriber(bodyGenerator);
8584
this.contentLength =contentLength;
@@ -97,14 +96,15 @@ public long getContentLength() {
9796

9897
@Override
9998
publicBodyStatetransferTo(ByteBuftarget)throwsIOException {
100-
if (initialized.compareAndSet(false,true))
99+
if (initialized.compareAndSet(false,true)) {
101100
publisher.subscribe(subscriber);
101+
}
102102

103103
returnbody.transferTo(target);
104104
}
105105
}
106106

107-
privateclassSimpleSubscriberimplementsSubscriber<ByteBuffer> {
107+
privateclassSimpleSubscriberimplementsSubscriber<ByteBuf> {
108108

109109
privatefinalLoggerLOGGER =LoggerFactory.getLogger(SimpleSubscriber.class);
110110

@@ -130,7 +130,7 @@ public void onSubscribe(Subscription s) {
130130
}
131131

132132
@Override
133-
publicvoidonNext(ByteBuffert) {
133+
publicvoidonNext(ByteBuft) {
134134
if (t ==null)
135135
thrownull;
136136
try {
@@ -147,14 +147,15 @@ public void onError(Throwable t) {
147147
thrownull;
148148
LOGGER.debug("Error occurred while consuming body stream.",t);
149149
FeedListenerlistener =feedListener;
150-
if (listener !=null)
150+
if (listener !=null) {
151151
listener.onError(t);
152+
}
152153
}
153154

154155
@Override
155156
publicvoidonComplete() {
156157
try {
157-
feeder.feed(EMPTY,true);
158+
feeder.feed(Unpooled.EMPTY_BUFFER,true);
158159
}catch (Exceptione) {
159160
LOGGER.info("Ignoring exception occurred while completing stream processing.",e);
160161
this.subscription.cancel();

‎client/src/test/java/org/asynchttpclient/request/body/ChunkingTest.java‎

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
importstaticorg.asynchttpclient.test.TestUtils.*;
1717
importstaticorg.testng.Assert.*;
1818
importstaticorg.testng.FileAssert.fail;
19+
importio.netty.buffer.Unpooled;
1920

2021
importjava.io.BufferedInputStream;
2122
importjava.io.IOException;
2223
importjava.io.InputStream;
23-
importjava.nio.ByteBuffer;
2424
importjava.nio.file.Files;
2525

2626
importorg.asynchttpclient.AbstractBasicTest;
@@ -89,10 +89,10 @@ private void feed(FeedableBodyGenerator feedableBodyGenerator, InputStream is) t
8989
for (inti =0; (i =inputStream.read(buffer)) > -1;) {
9090
byte[]chunk =newbyte[i];
9191
System.arraycopy(buffer,0,chunk,0,i);
92-
feedableBodyGenerator.feed(ByteBuffer.wrap(chunk),false);
92+
feedableBodyGenerator.feed(Unpooled.wrappedBuffer(chunk),false);
9393
}
9494
}
95-
feedableBodyGenerator.feed(ByteBuffer.allocate(0),true);
95+
feedableBodyGenerator.feed(Unpooled.EMPTY_BUFFER,true);
9696

9797
}
9898

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp