@@ -41,8 +41,6 @@ public class PullCenter{
4141
4242private final PullRequestHoldService pullRequestHoldService =new PullRequestHoldService ();
4343
44- private final ByteBufAllocator allocator =new PooledByteBufAllocator (true );
45-
4644public void putMessage (ConsumerRecord <byte [],byte []>record )throws InterruptedException {
4745this .pullQueue .put (record );
4846this .pullRequestHoldService .notifyMessageArriving ();
@@ -73,14 +71,10 @@ private boolean poll(Packet packet) {
7371Packet one =retryQueue .peek ();
7472if (one !=null ){
7573retryQueue .poll ();
76- ByteBuf buffer =allocator .directBuffer (one .getBody ().length +packet .getBody ().length );
77- try {
78- buffer .writeBytes (packet .getBody ());
79- buffer .writeBytes (one .getBody ());
80- packet .setBody (buffer .array ());
81- }finally {
82- buffer .release ();
83- }
74+ ByteBuffer buffer =ByteBuffer .allocate (one .getBody ().length +packet .getBody ().length );
75+ buffer .put (packet .getBody ());
76+ buffer .put (one .getBody ());
77+ packet .setBody (buffer .array ());
8478polled =true ;
8579 }else {
8680ConsumerRecord <byte [],byte []>record =pullQueue .poll ();
@@ -90,20 +84,17 @@ private boolean poll(Packet packet) {
9084byte []headerInBytes =SerializerImpl .getFastJsonSerializer ().serialize (header );
9185//
9286int capacity =4 +headerInBytes .length +4 +record .key ().length +4 +record .value ().length ;
93- ByteBuf buffer =allocator .directBuffer (capacity +packet .getBody ().length );
94- try {
95- buffer .writeBytes (packet .getBody ());
96- buffer .writeInt (headerInBytes .length );
97- buffer .writeBytes (headerInBytes );
98- buffer .writeInt (record .key ().length );
99- buffer .writeBytes (record .key ());
100- buffer .writeInt (record .value ().length );
101- buffer .writeBytes (record .value ());
102- //
103- packet .setBody (buffer .array ());
104- }finally {
105- buffer .release ();
106- }
87+ ByteBuffer buffer =ByteBuffer .allocate (capacity +packet .getBody ().length );
88+
89+ buffer .put (packet .getBody ());
90+ buffer .putInt (headerInBytes .length );
91+ buffer .put (headerInBytes );
92+ buffer .putInt (record .key ().length );
93+ buffer .put (record .key ());
94+ buffer .putInt (record .value ().length );
95+ buffer .put (record .value ());
96+ //
97+ packet .setBody (buffer .array ());
10798polled =true ;
10899 }
109100 }