Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0ba1177

Browse files
committed
Merge branch 'pull'
2 parents19383a2 +02efad2 commit0ba1177

20 files changed

+179
-147
lines changed

‎src/main/java/com/owl/kafka/push/server/biz/NettyServer.java‎

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
importcom.owl.kafka.client.transport.codec.PacketEncoder;
55
importcom.owl.kafka.client.transport.handler.MessageDispatcher;
66
importcom.owl.kafka.push.server.biz.bo.ServerConfigs;
7-
importcom.owl.kafka.push.server.biz.registry.RegistryCenter;
7+
importcom.owl.kafka.push.server.biz.service.InstanceHolder;
88
importcom.owl.kafka.push.server.transport.NettyTcpServer;
99
importcom.owl.kafka.push.server.transport.handler.*;
1010
importcom.owl.kafka.client.transport.protocol.Command;
@@ -37,11 +37,11 @@ public NettyServer(ProxyConsumer consumer) {
3737

3838
privateMessageDispatchernewDispatcher(ProxyConsumerconsumer){
3939
MessageDispatcherdispatcher =newMessageDispatcher();
40-
dispatcher.register(Command.PING,newHeartbeatMessageHandler());
40+
dispatcher.register(Command.PING,newPingMessageHandler());
4141
dispatcher.register(Command.UNREGISTER,newUnregisterMessageHandler());
4242
dispatcher.register(Command.ACK,newAckMessageHandler(consumer));
43-
dispatcher.register(Command.VIEW,newViewMessageHandler());
44-
dispatcher.register(Command.PULL,newPullMessageHandler());
43+
dispatcher.register(Command.VIEW_REQ,newViewReqMessageHandler());
44+
dispatcher.register(Command.PULL_REQ,newPullReqMessageHandler());
4545
dispatcher.register(Command.SEND_BACK,newSendBackMessageHandler());
4646
returndispatcher;
4747
}
@@ -55,7 +55,7 @@ protected void initTcpOptions(ServerBootstrap bootstrap){
5555

5656
@Override
5757
protectedvoidafterStart() {
58-
RegistryCenter.I.getServerRegistry().register();
58+
InstanceHolder.I.getRegistryCenter().getServerRegistry().register();
5959
}
6060

6161
protectedvoidinitNettyChannel(NioSocketChannelch)throwsException{

‎src/main/java/com/owl/kafka/push/server/biz/file/MappedFileManager.java‎

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,8 @@ public void write(long fileFromOffset, ByteBuffer byteBuffer, Packet packet, Soc
119119
store.put(packet.getVersion());
120120
store.put(packet.getCmd());
121121
store.putLong(packet.getOpaque());
122-
store.putInt(packet.getHeader().length);
123-
store.put(packet.getHeader());
124-
store.putInt(packet.getKey().length);
125-
store.put(packet.getKey());
126-
store.putInt(packet.getValue().length);
127-
store.put(packet.getValue());
122+
store.putInt(packet.getBody().length);
123+
store.put(packet.getBody());
128124

129125
}
130126

@@ -138,7 +134,7 @@ public static ByteBuffer host(SocketAddress socketAddress) {
138134
}
139135

140136
privateintcalculate(Packetpacket) {
141-
return1 +1 +8 +4 +packet.getHeader().length +4 +packet.getKey().length +4 +packet.getValue().length;
137+
return1 +1 +8 +4 +packet.getBody().length;
142138
}
143139

144140
privatevoidreset(ByteBufferbuffer,intlimit){

‎src/main/java/com/owl/kafka/push/server/biz/pull/PullCenter.java‎

Lines changed: 38 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,17 @@
22

33
importcom.owl.kafka.client.service.IdService;
44
importcom.owl.kafka.client.transport.protocol.Command;
5-
importcom.owl.kafka.client.transport.protocol.Header;
5+
importcom.owl.kafka.client.transport.message.Header;
66
importcom.owl.kafka.client.transport.protocol.Packet;
77
importcom.owl.kafka.push.server.biz.bo.ServerConfigs;
88
importcom.owl.kafka.push.server.biz.service.PullRequestHoldService;
99
importcom.owl.kafka.push.server.biz.bo.PullRequest;
1010
importcom.owl.kafka.serializer.SerializerImpl;
11-
importcom.owl.kafka.util.CollectionUtils;
1211
importorg.apache.kafka.clients.consumer.ConsumerRecord;
1312
importorg.slf4j.Logger;
1413
importorg.slf4j.LoggerFactory;
1514

16-
importjava.util.ArrayList;
17-
importjava.util.List;
15+
importjava.nio.ByteBuffer;
1816
importjava.util.concurrent.LinkedBlockingQueue;
1917

2018
/**
@@ -38,64 +36,65 @@ public class PullCenter{
3836

3937
publicvoidputMessage(ConsumerRecord<byte[],byte[]>record)throwsInterruptedException{
4038
this.pullQueue.put(record);
39+
//TODO消息满了情况、
4140
this.pullRequestHoldService.notifyMessageArriving();
4241
}
4342

4443
publicvoidreputMessage(Packetpacket)throwsInterruptedException{
4544
this.retryQueue.put(packet);
4645
}
4746

48-
publicList<Packet>pull(PullRequestrequest,booleanisSuspend) {
49-
intmessageCount =1;
50-
List<Packet>result =this.pull(messageCount,singleMessageSize *messageCount);
51-
if(CollectionUtils.isEmpty(result) &&isSuspend){
47+
publicPacketpull(PullRequestrequest,booleanisSuspend) {
48+
intmessageCount =2;
49+
longmessageSize =singleMessageSize *messageCount;
50+
finalPacketresult =request.getPacket();
51+
result.setCmd(Command.PULL_RESP.getCmd());
52+
while(messageCount >0 &&result.getBody().length <messageSize){
53+
messageCount--;
54+
this.poll(result);
55+
}
56+
if(result.isBodyEmtpy() &&isSuspend){
5257
pullRequestHoldService.suspend(request);
53-
returnnull;
58+
returnresult;
5459
}else{
5560
returnresult;
5661
}
5762
}
5863

59-
privateList<Packet>pull(longmessageCount,longmessageSize) {
60-
List<Packet>results =newArrayList<>();
61-
while(results.size() <messageCount ||calculateSize(results) <messageSize){
62-
Packetpoll =poll();
63-
if(poll ==null){
64-
break;
65-
}else{
66-
results.add(poll);
67-
}
68-
}
69-
returnresults;
70-
}
71-
72-
privatePacketpoll() {
73-
Packetpacket =retryQueue.peek();
74-
if(packet !=null){
64+
privatePacketpoll(Packetpacket) {
65+
Packetone =retryQueue.peek();
66+
if(one !=null){
7567
retryQueue.poll();
68+
ByteBufferbuffer =ByteBuffer.allocate(one.getBody().length +packet.getBody().length);
69+
buffer.put(packet.getBody());
70+
buffer.put(one.getBody());
71+
packet.setBody(buffer.array());
7672
}else{
7773
ConsumerRecord<byte[],byte[]>record =pullQueue.poll();
7874
if(record !=null){
79-
packet =newPacket();
80-
//
81-
packet.setCmd(Command.PULL.getCmd());
8275
Headerheader =newHeader(record.topic(),record.partition(),record.offset(),IdService.I.getId());
83-
packet.setHeader(SerializerImpl.getFastJsonSerializer().serialize(header));
84-
packet.setKey(record.key());
85-
packet.setValue(record.value());
76+
byte[]headerInBytes =SerializerImpl.getFastJsonSerializer().serialize(header);
77+
//
78+
intcapacity =4 +headerInBytes.length +4 +record.key().length +4 +record.value().length;
79+
ByteBufferbuffer =ByteBuffer.allocate(capacity +packet.getBody().length);
80+
81+
buffer.put(packet.getBody());
82+
//
83+
buffer.putInt(headerInBytes.length);
84+
buffer.put(headerInBytes);
85+
//
86+
buffer.putInt(record.key().length);
87+
buffer.put(record.key());
88+
//
89+
buffer.putInt(record.value().length);
90+
buffer.put(record.value());
91+
92+
packet.setBody(buffer.array());
8693
}
8794
}
8895
returnpacket;
8996
}
9097

91-
privatelongcalculateSize(List<Packet>records){
92-
longsize =0;
93-
for(Packetrecord :records){
94-
size =size +1 +1 +8 +record.getHeader().length +record.getKey().length +record.getValue().length;
95-
}
96-
returnsize;
97-
}
98-
9998
publicvoidclose(){
10099
this.pullRequestHoldService.close();
101100
}

‎src/main/java/com/owl/kafka/push/server/biz/pull/PullServer.java‎

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
importcom.owl.kafka.push.server.biz.bo.ServerConfigs;
88
importcom.owl.kafka.push.server.biz.registry.RegistryCenter;
99
importcom.owl.kafka.push.server.biz.service.DLQService;
10-
importcom.owl.kafka.push.server.biz.service.InstanceHolder;
1110
importcom.owl.kafka.push.server.consumer.AcknowledgeMessageListenerPullService;
1211
importcom.owl.kafka.push.server.consumer.ProxyConsumer;
1312
importcom.owl.kafka.util.StringUtils;
@@ -25,13 +24,15 @@ public class PullServer {
2524

2625
privatefinalDLQServicedlqService;
2726

27+
privatefinalRegistryCenterregistryCenter;
28+
2829
publicPullServer(){
2930
StringkafkaServerList =ServerConfigs.I.getServerKafkaServerList();
3031
if(StringUtils.isBlank(kafkaServerList)){
3132
kafkaServerList =KafkaZookeeperConfig.getBrokerIds(ServerConfigs.I.getZookeeperServerList(),ServerConfigs.I.getZookeeperNamespace());
3233
}
33-
//TODO
34-
finalRegistryCenterregistryCenter =RegistryCenter.I;
34+
this.registryCenter =newRegistryCenter();
35+
//
3536
ConsumerConfigconsumerConfigs =newConsumerConfig(kafkaServerList,ServerConfigs.I.getServerTopic(),ServerConfigs.I.getServerGroupId());
3637
consumerConfigs.setAutoCommit(false);
3738
this.consumer =newProxyConsumer(consumerConfigs);
@@ -41,8 +42,6 @@ public PullServer(){
4142
this.consumer.setMessageListenerService(messageListenerService);
4243

4344
this.dlqService =newDLQService(kafkaServerList,ServerConfigs.I.getServerTopic(),ServerConfigs.I.getServerGroupId());
44-
45-
InstanceHolder.I.setDLQService(this.dlqService);
4645
}
4746

4847
publicvoidstart(){
@@ -52,8 +51,8 @@ public void start(){
5251

5352
publicvoidclose(){
5453
this.consumer.close();
55-
RegistryCenter.I.close();
5654
this.nettyServer.close();
5755
this.dlqService.close();
56+
this.registryCenter.close();
5857
}
5958
}

‎src/main/java/com/owl/kafka/push/server/biz/push/PushCenter.java‎

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,21 @@
55
importcom.owl.kafka.client.service.LoadBalance;
66
importcom.owl.kafka.client.service.RetryPolicy;
77
importcom.owl.kafka.client.transport.Connection;
8-
importcom.owl.kafka.push.server.biz.bo.ControlResult;
9-
importcom.owl.kafka.push.server.biz.bo.ServerConfigs;
10-
importcom.owl.kafka.push.server.biz.registry.RegistryCenter;
11-
importcom.owl.kafka.push.server.biz.service.*;
128
importcom.owl.kafka.client.transport.exceptions.ChannelInactiveException;
139
importcom.owl.kafka.client.transport.protocol.Command;
14-
importcom.owl.kafka.client.transport.protocol.Header;
10+
importcom.owl.kafka.client.transport.message.Header;
1511
importcom.owl.kafka.client.transport.protocol.Packet;
12+
importcom.owl.kafka.push.server.biz.bo.ControlResult;
13+
importcom.owl.kafka.push.server.biz.bo.ServerConfigs;
14+
importcom.owl.kafka.push.server.biz.service.*;
1615
importcom.owl.kafka.serializer.SerializerImpl;
1716
importio.netty.channel.ChannelFuture;
1817
importio.netty.channel.ChannelFutureListener;
1918
importorg.apache.kafka.clients.consumer.ConsumerRecord;
2019
importorg.slf4j.Logger;
2120
importorg.slf4j.LoggerFactory;
2221

22+
importjava.nio.ByteBuffer;
2323
importjava.util.concurrent.LinkedBlockingQueue;
2424
importjava.util.concurrent.TimeUnit;
2525
importjava.util.concurrent.atomic.AtomicBoolean;
@@ -81,9 +81,9 @@ private void push(Packet packet, final ChannelFutureListener listener) throws In
8181
controlResult =flowController.flowControl(packet);
8282
}
8383
retryPolicy.reset();
84-
Connectionconnection =loadBalance.select(RegistryCenter.I.getClientRegistry().getClients());
84+
Connectionconnection =loadBalance.select(InstanceHolder.I.getRegistryCenter().getClientRegistry().getClients());
8585
while((connection ==null &&retryPolicy.allowRetry()) || (!connection.isWritable() && !connection.isActive())){
86-
connection =loadBalance.select(RegistryCenter.I.getClientRegistry().getClients());
86+
connection =loadBalance.select(InstanceHolder.I.getRegistryCenter().getClientRegistry().getClients());
8787
}
8888

8989
//
@@ -138,11 +138,21 @@ private Packet take() throws InterruptedException{
138138
//
139139
packet.setCmd(Command.PUSH.getCmd());
140140
packet.setOpaque(IdService.I.getId());
141+
141142
Headerheader =newHeader(record.topic(),record.partition(),record.offset(),IdService.I.getId());
142-
packet.setHeader(SerializerImpl.getFastJsonSerializer().serialize(header));
143-
packet.setKey(record.key());
144-
packet.setValue(record.value());
143+
byte[]headerInBytes =SerializerImpl.getFastJsonSerializer().serialize(header);
144+
//
145+
ByteBufferbuffer =ByteBuffer.allocate(4 +headerInBytes.length +4 +record.key().length +4 +record.value().length);
146+
buffer.putInt(headerInBytes.length);
147+
buffer.put(headerInBytes);
148+
//
149+
buffer.putInt(record.key().length);
150+
buffer.put(record.key());
151+
//
152+
buffer.putInt(record.value().length);
153+
buffer.put(record.value());
145154
//
155+
packet.setBody(buffer.array());
146156
}
147157
returnpacket;
148158
}

‎src/main/java/com/owl/kafka/push/server/biz/push/PushServer.java‎

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@ public class PushServer {
2727

2828
privatefinalPushCenterpushCenter;
2929

30+
privatefinalRegistryCenterregistryCenter;
31+
3032
publicPushServer(){
3133
StringkafkaServerList =ServerConfigs.I.getServerKafkaServerList();
3234
if(StringUtils.isBlank(kafkaServerList)){
3335
kafkaServerList =KafkaZookeeperConfig.getBrokerIds(ServerConfigs.I.getZookeeperServerList(),ServerConfigs.I.getZookeeperNamespace());
3436
}
35-
//TODO
36-
finalRegistryCenterregistryCenter =RegistryCenter.I;
37+
this.registryCenter =newRegistryCenter();
38+
//
3739
ConsumerConfigconsumerConfigs =newConsumerConfig(kafkaServerList,ServerConfigs.I.getServerTopic(),ServerConfigs.I.getServerGroupId());
3840
consumerConfigs.setAutoCommit(false);
3941
this.consumer =newProxyConsumer(consumerConfigs);
@@ -45,8 +47,6 @@ public PushServer(){
4547
this.consumer.setMessageListenerService(messageListenerService);
4648

4749
this.dlqService =newDLQService(kafkaServerList,ServerConfigs.I.getServerTopic(),ServerConfigs.I.getServerGroupId());
48-
49-
InstanceHolder.I.setDLQService(this.dlqService);
5050
}
5151

5252
publicvoidstart(){
@@ -57,8 +57,8 @@ public void start(){
5757

5858
publicvoidclose(){
5959
this.consumer.close();
60-
RegistryCenter.I.close();
6160
this.nettyServer.close();
6261
this.dlqService.close();
62+
this.registryCenter.close();
6363
}
6464
}

‎src/main/java/com/owl/kafka/push/server/biz/registry/RegistryCenter.java‎

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
*/
1111
publicclassRegistryCenter {
1212

13-
publicstaticRegistryCenterI =newRegistryCenter();
14-
1513
privatefinalServerRegistryserverRegistry;
1614

1715
privatefinalClientRegistryclientRegistry;
@@ -20,7 +18,7 @@ public class RegistryCenter {
2018

2119
privatefinalRegistryServiceregistryService;
2220

23-
privateRegistryCenter(){
21+
publicRegistryCenter(){
2422
this.zookeeperClient =newZookeeperClient(ServerConfigs.I.getZookeeperServerList(),ZookeeperClient.PUSH_SERVER_NAMESPACE,ServerConfigs.I.getZookeeperSessionTimeoutMs(),ServerConfigs.I.getZookeeperConnectionTimeoutMs());
2523
this.registryService =newRegistryService(this.zookeeperClient);
2624
//
@@ -29,6 +27,7 @@ private RegistryCenter(){
2927

3028
//
3129
InstanceHolder.I.setZookeeperClient(this.zookeeperClient);
30+
InstanceHolder.I.setRegistryCenter(this);
3231
}
3332

3433
publicServerRegistrygetServerRegistry() {

‎src/main/java/com/owl/kafka/push/server/biz/service/DLQService.java‎

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
packagecom.owl.kafka.push.server.biz.service;
22

3+
importcom.owl.kafka.client.transport.message.Message;
34
importcom.owl.kafka.client.transport.protocol.Packet;
5+
importcom.owl.kafka.client.util.MessageCodec;
46
importcom.owl.kafka.consumer.Record;
57
importcom.owl.kafka.push.server.biz.bo.ResendPacket;
68
importcom.owl.kafka.push.server.biz.bo.ServerConfigs;
@@ -46,6 +48,7 @@ public DLQService(String bootstrapServers, String topic, String groupId){
4648
this.producer =neworg.apache.kafka.clients.producer.KafkaProducer(producerConfigs);
4749

4850
this.dlqConsumer =newDLQConsumer(bootstrapServers,this.topic,groupId);
51+
InstanceHolder.I.setDLQService(this);
4952
}
5053

5154
publicvoidclose(){
@@ -74,8 +77,9 @@ public void write(ResendPacket resendPacket){
7477
Preconditions.checkArgument(resendPacket.getRepost() >=ServerConfigs.I.getServerMessageRepostTimes(),"resendPacket must repost more than " +ServerConfigs.I.getServerMessageRepostTimes() +" times");
7578
try {
7679
Packetpacket =resendPacket.getPacket();
80+
Messagemessage =MessageCodec.decode(packet.getBody());
7781
Stringdlp =String.format(this.topic +DLQ_DATA_PATH,resendPacket.getMsgId());
78-
ProducerRecord<byte[],byte[]>record =newProducerRecord<>(this.topic,0,packet.getKey(),packet.getValue());
82+
ProducerRecord<byte[],byte[]>record =newProducerRecord<>(this.topic,0,message.getKey(),message.getValue());
7983
this.producer.send(record,newCallback() {
8084

8185
@Override
@@ -99,7 +103,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
99103
publicvoidwrite(longmsgId,Packetpacket){
100104
try {
101105
Stringdlp =String.format(this.topic +DLQ_DATA_PATH,msgId);
102-
ProducerRecord<byte[],byte[]>record =newProducerRecord<>(this.topic,0,packet.getKey(),packet.getValue());
106+
Messagemessage =MessageCodec.decode(packet.getBody());
107+
ProducerRecord<byte[],byte[]>record =newProducerRecord<>(this.topic,0,message.getKey(),message.getValue());
103108
this.producer.send(record,newCallback() {
104109

105110
@Override

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp