Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit91a21d4

Browse files
committed
updates
1 parent0ba27b3 commit91a21d4

File tree

1 file changed

+21
-13
lines changed

1 file changed

+21
-13
lines changed

‎src/main/java/com/owl/kafka/proxy/server/biz/pull/PullCenter.java‎

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99
importcom.owl.kafka.proxy.server.biz.bo.PullRequest;
1010
importcom.owl.kafka.proxy.server.biz.bo.ServerConfigs;
1111
importcom.owl.kafka.proxy.server.biz.service.PullRequestHoldService;
12+
importio.netty.buffer.ByteBuf;
13+
importio.netty.buffer.ByteBufAllocator;
14+
importio.netty.buffer.PooledByteBufAllocator;
15+
importio.netty.util.ReferenceCountUtil;
1216
importorg.apache.kafka.clients.consumer.ConsumerRecord;
1317
importorg.slf4j.Logger;
1418
importorg.slf4j.LoggerFactory;
@@ -37,6 +41,8 @@ public class PullCenter{
3741

3842
privatefinalPullRequestHoldServicepullRequestHoldService =newPullRequestHoldService();
3943

44+
privatefinalByteBufAllocatorallocator =newPooledByteBufAllocator(true);
45+
4046
publicvoidputMessage(ConsumerRecord<byte[],byte[]>record)throwsInterruptedException{
4147
this.pullQueue.put(record);
4248
this.pullRequestHoldService.notifyMessageArriving();
@@ -67,10 +73,11 @@ private boolean poll(Packet packet) {
6773
Packetone =retryQueue.peek();
6874
if(one !=null){
6975
retryQueue.poll();
70-
ByteBufferbuffer =ByteBuffer.allocate(one.getBody().length +packet.getBody().length);
71-
buffer.put(packet.getBody());
72-
buffer.put(one.getBody());
76+
ByteBufbuffer =allocator.directBuffer(one.getBody().length +packet.getBody().length);
77+
buffer.writeBytes(packet.getBody());
78+
buffer.writeBytes(one.getBody());
7379
packet.setBody(buffer.array());
80+
buffer.release();
7481
polled =true;
7582
}else{
7683
ConsumerRecord<byte[],byte[]>record =pullQueue.poll();
@@ -80,20 +87,21 @@ private boolean poll(Packet packet) {
8087
byte[]headerInBytes =SerializerImpl.getFastJsonSerializer().serialize(header);
8188
//
8289
intcapacity =4 +headerInBytes.length +4 +record.key().length +4 +record.value().length;
83-
ByteBufferbuffer =ByteBuffer.allocate(capacity +packet.getBody().length);
84-
85-
buffer.put(packet.getBody());
90+
ByteBufbuffer =allocator.directBuffer(capacity +packet.getBody().length);
8691
//
87-
buffer.putInt(headerInBytes.length);
88-
buffer.put(headerInBytes);
92+
buffer.writeBytes(packet.getBody());
8993
//
90-
buffer.putInt(record.key().length);
91-
buffer.put(record.key());
94+
buffer.writeInt(headerInBytes.length);
95+
buffer.writeBytes(headerInBytes);
96+
//
97+
buffer.writeInt(record.key().length);
98+
buffer.writeBytes(record.key());
99+
//
100+
buffer.writeInt(record.value().length);
101+
buffer.writeBytes(record.value());
92102
//
93-
buffer.putInt(record.value().length);
94-
buffer.put(record.value());
95-
96103
packet.setBody(buffer.array());
104+
buffer.release();
97105
polled =true;
98106
}
99107
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp