Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

kafka-proxy-server is a wrapped consumer to consume kafka topic and dispatch message to real consumers

NotificationsYou must be signed in to change notification settings

Technoboy-/kafka-proxy-server

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

93 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

  • Kafka的consumer数和parition数存在C:P的关系,如果C<=P,当某C节点挂掉,发生rebalance后,势必造成存活的某台C节点消费压力增大或导致消息消费时延增大。因此,实际的线上,C可能会部署大于P的情况以防止类似情形发生。多余的C即为standby。另有情形下,少量的partiton数足以满足上游生产消息的速率,consumer的消费能力却成为瓶颈,因为consumer可能要处理更多的业务逻辑。如果想降低消息消费的延迟时间,只能扩大partion数量,进而增加consumer的数量,来增加消费能力,解决消息延迟性。因此,一种代理模式就出现了,即kafka-proxy-server模式,即解决了所有线上C都可以消费到消息,又无需增大parition数来增大消费能力的情形。
  • 引入proxy模式后,consumer和partiton形成解耦,且消息在proxy-server和client之间,可以加入重试队列,当消费失败后,可以在一定时间内重复收到消费失败的消息。
  • 引入proxy模式后,消息无序,因此只有分区无序的场景下,才可以使用此代理。

二. 设计图

  • Proxy Server: 即原生kafka的consumer,负责某些topic消息的拉取,分发。启动后,注册实例到zookeeper中。
  • 消费者: 根据配置的topic,consumerGroup,namespace,通过zookeeper,获取对应的Push Server节点地址,建立连接,并接受Proxy server推送的消息。

三. 实现

  • proxy-server proxy-server即原生kafka的consumer,启动后,每个实例至少消费一个parition,并设置手动提交offset。启动并监听指定端口接受consumer的tcp链接,并将端口等信息写入zookeeper。proxy-server每次拉取N多消息放入阻塞队列,当有consumer链接后,分发给consumer。所以,消息形成无序状态。
  • pull模型 消息者启动后,通过配置信息,从zookeeper获取push-server节点信息,并和所有push-server建立tcp链接,然后发送pull消息请求,每次请求最多10条,最大16M的消息。并间隔60s发送心跳给push-server维持连接。
  • push模型 消息启动后,通过配置信息,从zookeeper获取push-server节点信息,并和所有push-server建立tcp链接。然后,有push-server发送推送消息给consumer。push模型存在一定的问题,即consumer数量小于proxy-server时,推模型可能会压死consumer,所以不建议使用push模型。默认才用pull模型实现。

四. 快速使用

  • 工程依赖: JDK1.7或更高版本
  • Maven依赖:
<dependency>    <groupId>com.owl.kafka</groupId>    <artifactId>kafka-proxy-server</artifactId>    <version>1.0.0-SNAPSHOT</version></dependency>
  • 配置信息在src/main/resources下的proxy-server.properties。
参数说明
server.kafka.server.listkafka的集群地址
server.kafka.server.listkafka的集群地址
server.group.idproxy-server的消费组信息
server.topicproxy-server消费的topic
server.queue.sizeproxy-server从kafka拉取多少消息到阻塞队列
server.portproxy-server监听consumer的端口
server.commit.offset.intervalproxy-server提交offset的间隔时间
server.commit.offset.batch.sizeproxy-server提交offset的批次大小
zookeeper.server.listproxy-server链接的zookeeper地址

五. 关于DLQ

  • 原生的kafka不支持消息的重复投递以及多次投递后进入DLQ的功能。proxy-server下推消息后,如果Ns(N=3默认)后未收到ack,将重复投递,共投递N次(N=5默认)。N次消费后未收到ack后,将失败的消息写入kafka的主题为-dlq中。
  • 将失败消息的msgId和写入kafka的dlq中的offset的映射关系写入zk,后续可通过msgId查看DLQ消息。

About

kafka-proxy-server is a wrapped consumer to consume kafka topic and dispatch message to real consumers

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages


[8]ページ先頭

©2009-2025 Movatter.jp