99import com .owl .kafka .proxy .server .biz .bo .PullRequest ;
1010import com .owl .kafka .proxy .server .biz .bo .ServerConfigs ;
1111import com .owl .kafka .proxy .server .biz .service .PullRequestHoldService ;
12+ import io .netty .buffer .ByteBuf ;
13+ import io .netty .buffer .ByteBufAllocator ;
14+ import io .netty .buffer .PooledByteBufAllocator ;
15+ import io .netty .util .ReferenceCountUtil ;
1216import org .apache .kafka .clients .consumer .ConsumerRecord ;
1317import org .slf4j .Logger ;
1418import org .slf4j .LoggerFactory ;
@@ -37,6 +41,8 @@ public class PullCenter{
3741
3842private final PullRequestHoldService pullRequestHoldService =new PullRequestHoldService ();
3943
44+ private final ByteBufAllocator allocator =new PooledByteBufAllocator (true );
45+
4046public void putMessage (ConsumerRecord <byte [],byte []>record )throws InterruptedException {
4147this .pullQueue .put (record );
4248this .pullRequestHoldService .notifyMessageArriving ();
@@ -67,10 +73,11 @@ private boolean poll(Packet packet) {
6773Packet one =retryQueue .peek ();
6874if (one !=null ){
6975retryQueue .poll ();
70- ByteBuffer buffer =ByteBuffer . allocate (one .getBody ().length +packet .getBody ().length );
71- buffer .put (packet .getBody ());
72- buffer .put (one .getBody ());
76+ ByteBuf buffer =allocator . directBuffer (one .getBody ().length +packet .getBody ().length );
77+ buffer .writeBytes (packet .getBody ());
78+ buffer .writeBytes (one .getBody ());
7379packet .setBody (buffer .array ());
80+ buffer .release ();
7481polled =true ;
7582 }else {
7683ConsumerRecord <byte [],byte []>record =pullQueue .poll ();
@@ -80,20 +87,21 @@ private boolean poll(Packet packet) {
8087byte []headerInBytes =SerializerImpl .getFastJsonSerializer ().serialize (header );
8188//
8289int capacity =4 +headerInBytes .length +4 +record .key ().length +4 +record .value ().length ;
83- ByteBuffer buffer =ByteBuffer .allocate (capacity +packet .getBody ().length );
84-
85- buffer .put (packet .getBody ());
90+ ByteBuf buffer =allocator .directBuffer (capacity +packet .getBody ().length );
8691//
87- buffer .putInt (headerInBytes .length );
88- buffer .put (headerInBytes );
92+ buffer .writeBytes (packet .getBody ());
8993//
90- buffer .putInt (record .key ().length );
91- buffer .put (record .key ());
94+ buffer .writeInt (headerInBytes .length );
95+ buffer .writeBytes (headerInBytes );
96+ //
97+ buffer .writeInt (record .key ().length );
98+ buffer .writeBytes (record .key ());
99+ //
100+ buffer .writeInt (record .value ().length );
101+ buffer .writeBytes (record .value ());
92102//
93- buffer .putInt (record .value ().length );
94- buffer .put (record .value ());
95-
96103packet .setBody (buffer .array ());
104+ buffer .release ();
97105polled =true ;
98106 }
99107 }