
CN106649766A - Message handling method based on kafka - Google Patents

Message handling method based on kafka

Info

Publication number: CN106649766A
Authority: CN (China)
Prior art keywords: interceptor, producer, consumer, message, parameter
Legal status: Granted (as listed by Google Patents; an assumption, not a legal conclusion)
Application number: CN201611226177.9A
Other languages: Chinese (zh)
Other versions: CN106649766B (en)
Inventors: 杨佳辉 (Yang Jiahui), 朱林 (Zhu Lin)
Current assignee: Beijing Ruian Technology Co Ltd (the listed assignees may be inaccurate)
Original assignee: Beijing Ruian Technology Co Ltd
Priority date: 2016-12-27 (an assumption, not a legal conclusion)
Filing date: 2016-12-27
Publication date: 2017-05-10
2016-12-27: Application filed by Beijing Ruian Technology Co Ltd
2016-12-27: Priority to CN201611226177.9A
2017-05-10: Publication of CN106649766A
Application granted
2020-12-29: Publication of CN106649766B
Current legal status: Expired - Fee Related
Anticipated expiration: not listed


Abstract


An embodiment of the present invention discloses a Kafka-based message processing method applied to a Kafka producer. The method includes two steps. First, when a message is published, a producer interceptor is invoked; it intercepts the parameters of the message published by the producer. Second, the parameters of the published message are output as a parameter record.

An embodiment of the present invention further discloses a Kafka-based message processing method applied to a Kafka consumer. The method also includes two steps. First, when a message is subscribed to, a consumer interceptor is invoked; it intercepts the parameters of the message subscribed to by the consumer. Second, the parameters of the subscribed message are output as a parameter record.

The producer and consumer interceptors intercept the parameters of published and subscribed messages. They thereby capture producer and consumer messages at different points in the cluster, make it possible to observe different message metrics for different applications, and allow message paths to be tracked across the cluster. This provides a basis for detecting and analyzing errors in a Kafka system.

Description

A message processing method based on Kafka

Technical Field

The embodiments of the present invention relate to big data technology, and in particular to a Kafka-based message processing method.

Background

Kafka is a high-throughput, distributed publish-subscribe messaging system. The main goal of its development was a data processing framework for massive volumes of logs, user behavior data, and website operation statistics. Combined with requirements such as data mining, behavior analysis, and operations monitoring, such a framework must meet the low-latency and high-throughput demands of both real-time online and batch offline processing applications.

A Kafka cluster contains one or more servers, called brokers. Every message published to a Kafka cluster belongs to a category called a topic, and each topic contains one or more partitions. Messages within the same topic are distributed across partitions stored on different brokers according to a key and a partitioning algorithm. Message producers and consumers act as clients that produce to, or consume from, topics on multiple brokers.

At present, Kafka metrics are collected only per client or per broker. This makes it difficult for users to trace the path of an individual message across the cluster and to obtain a complete end-to-end view of system performance and behavior. Yet the ability to observe, measure, and monitor Kafka client behavior is valuable in production environments, because it provides a basis for detecting and analyzing errors in a Kafka system.

Summary of the Invention

An embodiment of the present invention provides a Kafka-based message processing method. It addresses the prior-art problem that the path of a single message is difficult to track across the cluster.

In a first aspect, an embodiment of the present invention provides a Kafka-based message processing method applied to a Kafka producer. The method includes:

when a message is published, invoking a producer interceptor, where the producer interceptor is used to intercept the parameters of the message published by the producer; and

outputting the parameters of the published message as a parameter record.

In a second aspect, an embodiment of the present invention further provides a Kafka-based message processing method applied to a Kafka consumer. The method includes:

when a message is subscribed to, invoking a consumer interceptor, where the consumer interceptor is used to intercept the parameters of the message subscribed to by the consumer; and

outputting the parameters of the subscribed message as a parameter record.

In the embodiments of the present invention, a producer interceptor and a consumer interceptor are added to Kafka's producer/consumer architecture. The producer interceptor intercepts the parameters of a message when the producer publishes it. The consumer interceptor intercepts the parameters of a message when the consumer subscribes to it. Producer and consumer messages are thereby captured at different points in the cluster. As a result, different message metrics of different applications can be observed and message paths can be tracked across the cluster, which provides a basis for detecting and analyzing errors in a Kafka system.
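The goal of tracking message paths across the cluster can be made concrete with a short Python sketch. Python is used because the embodiments below suggest implementing the interceptors as Python scripts. The sketch joins hypothetical producer-side and consumer-side parameter records on (topic, partition, offset).

The record layout, the client names, and the `trace_paths`/`unconsumed` helpers are assumptions made for illustration. The patent does not specify how parameter records are stored or correlated.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical parameter record: (client_id, topic, partition, offset).
ParamRecord = Tuple[str, str, int, int]
MessageId = Tuple[str, int, int]  # (topic, partition, offset)


def trace_paths(produced: List[ParamRecord], consumed: List[ParamRecord]) -> Dict[MessageId, List[str]]:
    """Join producer-side and consumer-side records on (topic, partition, offset)."""
    paths: Dict[MessageId, List[str]] = defaultdict(list)
    for client, topic, partition, offset in produced:
        paths[(topic, partition, offset)].append(f"produced by {client}")
    for client, topic, partition, offset in consumed:
        paths[(topic, partition, offset)].append(f"consumed by {client}")
    return dict(paths)


def unconsumed(paths: Dict[MessageId, List[str]]) -> List[MessageId]:
    """Messages that were produced but never seen by any consumer."""
    return sorted(
        mid for mid, steps in paths.items()
        if not any(step.startswith("consumed") for step in steps)
    )


produced = [("billing-svc", "orders", 0, 42), ("billing-svc", "orders", 0, 43)]
consumed = [("audit-svc", "orders", 0, 42), ("report-svc", "orders", 0, 42)]
paths = trace_paths(produced, consumed)
print(paths[("orders", 0, 42)])
# ['produced by billing-svc', 'consumed by audit-svc', 'consumed by report-svc']
print(unconsumed(paths))  # [('orders', 0, 43)]
```

Keying on (topic, partition, offset) works because that triple identifies a stored message within one cluster. Correlating across separate clusters would need an additional identifier, which the sketch does not model.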

Brief Description of the Drawings

Fig. 1 is a flowchart of the Kafka-based message processing method in Embodiment 1 of the present invention;

Fig. 2 is a flowchart of an application example of the Kafka-based message processing method in Embodiment 1 of the present invention;

Fig. 3 is a flowchart of the Kafka-based message processing method in Embodiment 2 of the present invention;

Fig. 4 is a flowchart of an application example of the Kafka-based message processing method in Embodiment 2 of the present invention.

Detailed Description of the Embodiments

The present invention is described in further detail below with reference to the accompanying drawings and embodiments. The specific embodiments described here only explain the present invention and do not limit it. For ease of description, the drawings show only the parts related to the present invention rather than the entire structure.

Python is an object-oriented, interpreted programming language.

Embodiment 1

Fig. 1 is a flowchart of the Kafka-based message processing method provided by Embodiment 1 of the present invention. This embodiment applies to a Kafka producer client, and the method can be executed by the producer client.

The method of Embodiment 1 of the present invention specifically includes:

S110: when a message is published, invoke the producer interceptor, where the producer interceptor is used to intercept the parameters of the message published by the producer;

S120: output the parameters of the published message as a parameter record.

Specifically, in a Kafka cluster every published message belongs to a category called a topic, and each topic contains one or more partitions. Messages within the same topic are distributed across partitions stored on different brokers according to a key and a partitioning algorithm.

A message's storage position within a partition is represented by an offset. Processing a message according to its key and the algorithm is referred to as key-value serialization. Producers and consumers act as clients that produce to, or consume from, topics on multiple brokers.

Accordingly, the parameters of a message published by a producer can include the message's topic, partition, offset, and key.

Correspondingly, there are four types of producer interceptors:

- producer topic interceptors, which intercept the topic of messages published by the producer;
- producer partition interceptors, which intercept the partition;
- producer offset interceptors, which intercept the offset;
- producer key interceptors, which intercept the key.

In practice, the interceptors for the required parameters can be selected as needed.
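The four interceptor types and the idea of selecting only the ones a deployment needs can be sketched in Python. The sketch assumes a hypothetical `Message` object that already carries all four parameters. The class names are illustrative and are not Kafka classes.

Timing differs in a real Kafka producer. `ProducerInterceptor.onSend()` runs before the partition is assigned (if the record does not specify one). The offset is only known after the broker acknowledges the write. So this models the classification, not Kafka's timing.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class Message:
    """Hypothetical view of a published message and its parameters."""
    topic: str
    partition: int
    offset: int
    key: Optional[bytes]
    value: bytes


class ProducerInterceptor:
    """Base class: each subtype intercepts exactly one parameter type."""
    param_type = ""

    def intercept(self, message: Message) -> Any:
        return getattr(message, self.param_type)


class TopicInterceptor(ProducerInterceptor):
    param_type = "topic"


class PartitionInterceptor(ProducerInterceptor):
    param_type = "partition"


class OffsetInterceptor(ProducerInterceptor):
    param_type = "offset"


class KeyInterceptor(ProducerInterceptor):
    param_type = "key"


def intercept_selected(message: Message, selected: List[ProducerInterceptor]) -> Dict[str, Any]:
    """Run only the interceptor types chosen for this deployment."""
    return {i.param_type: i.intercept(message) for i in selected}


msg = Message(topic="orders", partition=2, offset=1041, key=b"user-7", value=b"{}")
print(intercept_selected(msg, [TopicInterceptor(), OffsetInterceptor()]))
# {'topic': 'orders', 'offset': 1041}
```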

Further, every type of producer interceptor is defined in a producer interceptor list, with at least one interceptor of each type. Each interceptor has its own lifecycle, expressed as a duration: after performing interception for that period, the interceptor stops operating.

Accordingly, invoking the producer interceptor includes:

invoking, in sequence, the at least one producer interceptor of each type in the producer interceptor list, where, among interceptors of the same type, each producer interceptor also passes the parameters it has intercepted to the next producer interceptor;

Correspondingly, outputting the parameters of the published message as a parameter record includes:

outputting, as the parameter record, the parameters intercepted by the last producer interceptor of each type in the producer interceptor list, together with the parameters that this last interceptor received from the interceptor preceding it.
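The sequential invocation and hand-off can be sketched in Python. `ChainedInterceptor` is a hypothetical stand-in that reports the parameters it observed through a `source` callable.

What the sketch shows is the hand-off itself. Every interceptor returns what it inherited plus its own parameters, so the last interceptor of each type ends up holding the complete parameter record for that type.

```python
from typing import Any, Callable, Dict, List


class ChainedInterceptor:
    """Hypothetical interceptor: `source` returns the parameters it observed."""

    def __init__(self, name: str, source: Callable[[], List[Any]]) -> None:
        self.name = name
        self.source = source

    def run(self, inherited: List[Any]) -> List[Any]:
        # Forward what the predecessor passed in, plus this interceptor's own.
        return inherited + self.source()


def invoke_chain(chain: List[ChainedInterceptor]) -> List[Any]:
    """Invoke same-type interceptors in list order; the last one's result is
    its own parameters plus everything inherited from earlier ones."""
    record: List[Any] = []
    for interceptor in chain:
        record = interceptor.run(record)
    return record


def invoke_all_types(interceptor_list: Dict[str, List[ChainedInterceptor]]) -> Dict[str, List[Any]]:
    """One parameter record per interceptor type in the interceptor list."""
    return {ptype: invoke_chain(chain) for ptype, chain in interceptor_list.items()}


interceptor_list = {
    "topic": [
        ChainedInterceptor("topic-1", lambda: ["orders", "orders"]),
        ChainedInterceptor("topic-2", lambda: ["payments"]),
    ],
    "offset": [ChainedInterceptor("offset-1", lambda: [17, 18, 19])],
}
print(invoke_all_types(interceptor_list))
# {'topic': ['orders', 'orders', 'payments'], 'offset': [17, 18, 19]}
```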

Specifically, when publishing messages, the producer client invokes producer interceptors of a target type to intercept parameters of that type. The target type can be one or more of topic, partition, offset, and key.

The producer client first invokes the first producer interceptor of the target type in the producer interceptor list. Once invoked, that interceptor continuously intercepts the target-type parameters of published messages until its operating duration expires. The producer client then invokes the next producer interceptor of the target type in the list, which continues intercepting those parameters.

During this process, each interceptor that has been invoked passes the parameters it intercepted to the next interceptor invoked, and so on. The last interceptor of the target type in the list outputs its own intercepted parameters, together with those passed from its predecessor, as the parameter record. The output therefore covers the target-type parameters of all messages published by the producer client. Technicians can view and analyze these parameter records as a basis for detecting errors in the Kafka system.
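The duration-based hand-over can be modeled in Python with an injected clock so the behavior is deterministic. The sketch makes three assumptions:

- Durations are in seconds.
- Because the text says an interceptor stops once its duration elapses, parameters are dropped after the last interceptor in the list has expired.
- The accumulated records are held by the dispatcher on behalf of the chain.

`TimedInterceptor`, `TargetTypeDispatcher` and `FakeClock` are illustrative names, not Kafka classes.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, List, Optional


@dataclass
class TimedInterceptor:
    """Hypothetical interceptor whose lifecycle is a duration in seconds."""
    name: str
    duration: float
    started_at: Optional[float] = None
    seen: List[Any] = field(default_factory=list)

    def expired(self, now: float) -> bool:
        return self.started_at is not None and now - self.started_at >= self.duration


class FakeClock:
    """Deterministic clock for the example; yields the given timestamps."""

    def __init__(self, times: Iterable[float]) -> None:
        self._times = iter(times)

    def __call__(self) -> float:
        return next(self._times)


class TargetTypeDispatcher:
    """Feeds each published message's target-type parameter to the current
    interceptor and switches to the next one when its duration expires."""

    def __init__(self, chain: List[TimedInterceptor], clock: Callable[[], float]) -> None:
        self.chain = chain
        self.clock = clock
        self.index = 0
        self.inherited: List[Any] = []

    def on_publish(self, param: Any) -> bool:
        now = self.clock()
        current = self.chain[self.index]
        if current.expired(now):
            if self.index + 1 == len(self.chain):
                return False  # last interceptor has stopped; nothing intercepts
            self.inherited += current.seen  # hand records to the successor
            self.index += 1
            current = self.chain[self.index]
        if current.started_at is None:
            current.started_at = now
        current.seen.append(param)
        return True

    def parameter_record(self) -> List[Any]:
        # Last active interceptor's own parameters plus the inherited ones.
        return self.inherited + self.chain[self.index].seen


dispatcher = TargetTypeDispatcher(
    [TimedInterceptor("topic-1", 5.0), TimedInterceptor("topic-2", 5.0)],
    FakeClock([0.0, 1.0, 5.0, 6.0, 11.0]),
)
results = [dispatcher.on_publish(t) for t in ["a", "b", "c", "d", "e"]]
print(results)                        # [True, True, True, True, False]
print(dispatcher.parameter_record())  # ['a', 'b', 'c', 'd']
```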

In a preferred implementation, before its duration expires, each producer interceptor also checks whether the intercepted parameters contain invalid information. If they do, it changes the invalid information to a null value. Each parameter in Kafka has a standard format, so a parameter that clearly does not conform to it is treated as invalid. The corresponding field is then set to null, which avoids recording errors.

It follows from this preferred implementation that a producer interceptor may modify the intercepted topic, partition, offset, and key records. When producer interceptors are invoked, each interceptor of a given type passes its intercepted parameters to the next interceptor. Modified records are therefore inherited along the chain, which preserves the most original and correct parameter records.

In another preferred implementation, an invoked producer interceptor may throw an exception. To avoid losing its parameter records, the chain is continued among the producer interceptors of the same type as the failing one in the producer interceptor list. The parameters intercepted by the last interceptor invoked before the failing one without an exception are passed to the next producer interceptor. That next interceptor is then invoked to continue intercepting parameters, which keeps producer parameter interception continuous.

In addition, according to the Kafka standard, when the parameters are passed on, the exception must also be thrown to the producer's onAcknowledgement() method. That method handles the exception so that the metadata is not empty, which ensures that interception continues.
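The fault-tolerant chain can be sketched in Python. Kafka's own Java client behaves similarly: the `ProducerInterceptor` javadoc states that exceptions thrown by `onSend()` are caught and logged rather than propagated, and the next interceptor receives the last successfully returned record.

Routing the failure into `onAcknowledgement()` follows the patent's description and is modeled here as a hypothetical callback. Its "metadata" is a hypothetical dict naming the failing interceptor. Kafka's real `onAcknowledgement(RecordMetadata, Exception)` reports send results rather than interceptor failures.

```python
from typing import Any, Callable, Dict, List, Tuple

Interceptor = Callable[[List[Any]], List[Any]]
AckHandler = Callable[[Dict[str, Any], Exception], None]


def invoke_with_fallback(chain: List[Interceptor], on_acknowledgement: AckHandler) -> List[Any]:
    """Invoke same-type interceptors in order. If one raises, hand the exception
    to on_acknowledgement with non-empty metadata, and give the next interceptor
    the result of the last interceptor that succeeded."""
    last_good: List[Any] = []
    for interceptor in chain:
        try:
            last_good = interceptor(last_good)
        except Exception as exc:
            metadata = {"interceptor": getattr(interceptor, "__name__", repr(interceptor))}
            on_acknowledgement(metadata, exc)
            # last_good is left untouched, so nothing recorded so far is lost.
    return last_good


def topic_a(prev: List[Any]) -> List[Any]:
    return prev + ["orders"]


def broken(prev: List[Any]) -> List[Any]:
    raise ValueError("bad partition")


def topic_c(prev: List[Any]) -> List[Any]:
    return prev + ["payments"]


errors: List[Tuple[str, str]] = []
record = invoke_with_fallback(
    [topic_a, broken, topic_c],
    lambda md, exc: errors.append((md["interceptor"], str(exc))),
)
print(record)  # ['orders', 'payments']
print(errors)  # [('broken', 'bad partition')]
```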

Preferably, the onAcknowledgement() method is invoked on the producer's I/O thread, while the interceptors can be invoked on the producer's submission thread (the thread that submits messages).
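The thread split can be illustrated with a small Python model. In the Kafka Java client, `ProducerInterceptor.onSend()` runs on the thread that calls `send()`. `onAcknowledgement()` generally runs on the producer's background I/O thread, which is why its implementation is expected to be fast.

The model uses a queue and a worker thread named `io-thread`. It is an illustration under those assumptions, not Kafka's sender implementation; `ThreadedProducerSketch` is a hypothetical name.

```python
import queue
import threading
from typing import Any, List, Tuple


class ThreadedProducerSketch:
    """Hypothetical model: interceptors run synchronously on the thread that
    calls send(); acknowledgements are delivered on a separate I/O thread."""

    def __init__(self) -> None:
        self.outbox: "queue.Queue[Any]" = queue.Queue()
        self.log: List[Tuple[str, str, Any]] = []
        self._lock = threading.Lock()
        self._io = threading.Thread(target=self._io_loop, name="io-thread", daemon=True)
        self._io.start()

    def _record(self, event: str, value: Any) -> None:
        with self._lock:
            self.log.append((event, threading.current_thread().name, value))

    def on_send(self, record: Any) -> Any:
        self._record("on_send", record)  # runs on the submitting thread
        return record

    def on_acknowledgement(self, record: Any) -> None:
        self._record("on_ack", record)  # runs on the I/O thread

    def send(self, record: Any) -> None:
        self.outbox.put(self.on_send(record))

    def _io_loop(self) -> None:
        while True:
            record = self.outbox.get()
            if record is None:  # shutdown sentinel
                break
            self.on_acknowledgement(record)

    def close(self) -> None:
        self.outbox.put(None)
        self._io.join()


p = ThreadedProducerSketch()
p.send("m1")
p.close()
print(p.log)  # the on_send entry names the caller's thread, on_ack names io-thread
```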

In the Kafka-based message processing method of this embodiment, the producer interceptor intercepts the parameters of a message when the producer publishes it. Producer messages are thereby captured at different points in the cluster. Different message metrics of different applications can then be observed and message paths tracked across the cluster, which provides a basis for detecting and analyzing errors in a Kafka system.

Fig. 2 is a flowchart of an application example of the Kafka-based message processing method in Embodiment 1 of the present invention. In this application example, a new class, ProducerInterceptors, can be created in the producer code to encapsulate a list of ProducerInterceptor instances. KafkaProducer therefore gains a new member: ProducerInterceptors<K, V> interceptors. The KafkaProducer constructor loads instances of the interceptor classes specified in the interceptor.classes configuration. If interceptor.classes does not list any interceptor classes, the interceptor list is empty.
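The configuration-driven loading can be sketched in Python. In the Kafka Java client, `interceptor.classes` is a real producer setting that lists interceptor class names, which the client loads reflectively.

Here a plain dict registry stands in for reflective class loading. `SketchProducer`, `TopicInterceptor` and `REGISTRY` are hypothetical names, and the comma-separated string format is an assumption of the sketch.

```python
from typing import Any, Dict, List, Type

Record = Dict[str, Any]


class ProducerInterceptor:
    """Hypothetical Python counterpart of Kafka's ProducerInterceptor."""

    def on_send(self, record: Record) -> Record:
        return record


class TopicInterceptor(ProducerInterceptor):
    def __init__(self) -> None:
        self.topics: List[str] = []

    def on_send(self, record: Record) -> Record:
        self.topics.append(record["topic"])
        return record


# Stand-in for the JVM's reflective class loading: class name -> class.
REGISTRY: Dict[str, Type[ProducerInterceptor]] = {"TopicInterceptor": TopicInterceptor}


class ProducerInterceptors:
    """Wraps the interceptor instances and calls them in configured order."""

    def __init__(self, interceptors: List[ProducerInterceptor]) -> None:
        self.interceptors = interceptors

    def on_send(self, record: Record) -> Record:
        for interceptor in self.interceptors:
            record = interceptor.on_send(record)
        return record


class SketchProducer:
    """Stand-in for KafkaProducer: builds its interceptor list from the
    comma-separated 'interceptor.classes' setting (empty when unset)."""

    def __init__(self, config: Dict[str, str]) -> None:
        raw = config.get("interceptor.classes", "")
        names = [n.strip() for n in raw.split(",") if n.strip()]
        self.interceptors = ProducerInterceptors([REGISTRY[n]() for n in names])

    def send(self, record: Record) -> Record:
        return self.interceptors.on_send(record)


plain = SketchProducer({})
print(plain.interceptors.interceptors)  # []

tracked = SketchProducer({"interceptor.classes": "TopicInterceptor"})
tracked.send({"topic": "orders", "value": b"..."})
print(tracked.interceptors.interceptors[0].topics)  # ['orders']
```

An unknown class name raises `KeyError` in this sketch, so a misconfiguration surfaces when the producer is constructed rather than on the first send.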

As shown in Fig. 2, KafkaProducer.send() calls the onSend() method to publish a message. When publishing, onSend() invokes one or more of the topic, partition, offset, and key interceptors, which intercept the corresponding parameter. The resulting parameter records are returned to a ProducerRecord object, which is returned to the KafkaProducer class.

When a producer interceptor throws an exception, onSend() passes the exception to onAcknowledgement() for handling. It then invokes the next producer interceptor of the same type as the failing one. That next interceptor receives the parameters intercepted by the last interceptor invoked before the failing one without an exception.

In practice, producer interceptors can be configured according to the size, configuration, and actual needs of the Kafka cluster. Preferably, the producer interceptors described above can be implemented as Python scripts.

Embodiment 2

Fig. 3 is a flowchart of the Kafka-based message processing method in Embodiment 2 of the present invention. This embodiment applies to a Kafka consumer client, and the method can be executed by the consumer client.

As shown in Fig. 3, the method of Embodiment 2 of the present invention specifically includes:

S210: when a message is subscribed to, invoke the consumer interceptor, where the consumer interceptor is used to intercept the parameters of the message subscribed to by the consumer;

S220: output the parameters of the subscribed message as a parameter record.

Specifically, the parameters of a message subscribed to by the consumer can include the message's topic, partition, offset, and key. Correspondingly, there are four types of consumer interceptors:

- consumer topic interceptors, which intercept the topic of messages subscribed to by the consumer;
- consumer partition interceptors, which intercept the partition;
- consumer offset interceptors, which intercept the offset;
- consumer key interceptors, which intercept the key.

In practice, the interceptors for the required parameters can be selected as needed.

Further, every type of consumer interceptor is defined in a consumer interceptor list, with at least one interceptor of each type. Each interceptor has its own lifecycle, expressed as a duration: after performing interception for that period, the interceptor stops operating.

Accordingly, invoking the consumer interceptor includes:

invoking, in sequence, the at least one consumer interceptor of each type in the consumer interceptor list, where, among interceptors of the same type, each consumer interceptor also passes the parameters it has intercepted to the next consumer interceptor;

Correspondingly, outputting the parameters of the subscribed message as a parameter record includes:

outputting, as the parameter record, the parameters intercepted by the last consumer interceptor of each type in the consumer interceptor list, together with the parameters that this last interceptor received from the interceptor preceding it.
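On the consumer side, interception naturally works on batches. Kafka's `ConsumerInterceptor.onConsume()` receives the whole `ConsumerRecords` batch returned by one poll, not one record at a time.

The following Python sketch assumes, for simplicity, that each interceptor of a type is active for exactly one polled batch before handing over. The class and function names are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class ConsumedRecord:
    topic: str
    partition: int
    offset: int
    key: Optional[bytes]


class ConsumerTypeInterceptor:
    """Hypothetical consumer interceptor for one parameter type."""

    def __init__(self, param_type: str) -> None:
        self.param_type = param_type

    def on_consume(self, batch: List[ConsumedRecord], inherited: List[Any]) -> List[Any]:
        # This interceptor's parameters for one polled batch, after the inherited ones.
        return inherited + [getattr(r, self.param_type) for r in batch]


def run_consumer_chain(chain: List[ConsumerTypeInterceptor], polled: List[List[ConsumedRecord]]) -> List[Any]:
    """Assumption: interceptor i is active for polled batch i, then hands over."""
    inherited: List[Any] = []
    for interceptor, batch in zip(chain, polled):
        inherited = interceptor.on_consume(batch, inherited)
    return inherited


def consumer_parameter_records(
    interceptor_list: Dict[str, List[ConsumerTypeInterceptor]],
    polled: List[List[ConsumedRecord]],
) -> Dict[str, List[Any]]:
    """One parameter record per type, produced by the last interceptor of that type."""
    return {ptype: run_consumer_chain(chain, polled) for ptype, chain in interceptor_list.items()}


polled = [
    [ConsumedRecord("orders", 0, 10, b"a"), ConsumedRecord("orders", 0, 11, b"b")],
    [ConsumedRecord("orders", 1, 4, None)],
]
records = consumer_parameter_records(
    {
        "offset": [ConsumerTypeInterceptor("offset"), ConsumerTypeInterceptor("offset")],
        "key": [ConsumerTypeInterceptor("key"), ConsumerTypeInterceptor("key")],
    },
    polled,
)
print(records)  # {'offset': [10, 11, 4], 'key': [b'a', b'b', None]}
```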

Specifically, when subscribing to messages, the consumer client invokes consumer interceptors of a target type to intercept parameters of that type. The target type can be one or more of topic, partition, offset, and key.

The consumer client first invokes the first consumer interceptor of the target type in the consumer interceptor list. Once invoked, that interceptor continuously intercepts the target-type parameters of subscribed messages until its operating duration expires. The consumer client then invokes the next consumer interceptor of the target type in the list, which continues intercepting those parameters.

During this process, each interceptor that has been invoked passes the parameters it intercepted to the next interceptor invoked, and so on. The last interceptor of the target type in the list outputs its own intercepted parameters, together with those passed from its predecessor, as the parameter record. The output therefore covers the target-type parameters of all messages subscribed to by the consumer client. Technicians can view and analyze these parameter records as a basis for detecting errors in the Kafka system.

In a preferred implementation, before its duration expires, each consumer interceptor also checks whether the intercepted parameters contain invalid information. If they do, it changes the invalid information to a null value. Each parameter in Kafka has a standard format, so a parameter that clearly does not conform to it is treated as invalid. The corresponding field is then set to null, which avoids recording errors.

It follows from this preferred implementation that a consumer interceptor may modify the intercepted topic, partition, offset, and key records. When consumer interceptors are invoked, each interceptor of a given type passes its intercepted parameters to the next interceptor. Modified records are therefore inherited along the chain, which preserves the most original and correct parameter records.

In another preferred implementation, an invoked consumer interceptor may throw an exception. To avoid losing its parameter records, the chain is continued among the consumer interceptors of the same type as the failing one in the consumer interceptor list. The parameters intercepted by the last interceptor invoked before the failing one without an exception are passed to the next consumer interceptor. That next interceptor is then invoked to continue intercepting parameters, which keeps consumer parameter interception continuous.

In addition, according to the Kafka standard, when the parameters are passed on, the exception must also be thrown to the consumer's Broker.oncommit() method. That method handles the exception so that the metadata is not empty, which ensures that interception continues.
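The consumer-side fallback can be sketched in Python. In the Kafka Java client, `ConsumerInterceptor.onCommit()` receives the map of committed offsets rather than an exception, and the committed offset for a partition is conventionally the offset of the next record to read (last offset + 1).

Passing the failure to a commit hook together with that batch's offsets follows the patent's description of `Broker.oncommit()`. Here it is a hypothetical callback, and the non-empty offsets map plays the role of the metadata. Pairing interceptor i with polled batch i is also an assumption of the sketch.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Tuple

TopicPartition = Tuple[str, int]


@dataclass(frozen=True)
class ConsumedRecord:
    topic: str
    partition: int
    offset: int


def offsets_to_commit(batch: List[ConsumedRecord]) -> Dict[TopicPartition, int]:
    """Per partition, the offset of the next record to read (last offset + 1)."""
    out: Dict[TopicPartition, int] = {}
    for r in batch:
        tp = (r.topic, r.partition)
        out[tp] = max(out.get(tp, 0), r.offset + 1)
    return out


Interceptor = Callable[[List[Any], List[ConsumedRecord]], List[Any]]
CommitHook = Callable[[Dict[TopicPartition, int], Exception], None]


def consume_with_fallback(
    chain: List[Interceptor],
    batches: List[List[ConsumedRecord]],
    on_commit: CommitHook,
) -> List[Any]:
    """Interceptor i handles polled batch i. On failure, report the exception
    with that batch's offsets and pass the last good result on."""
    last_good: List[Any] = []
    for interceptor, batch in zip(chain, batches):
        try:
            last_good = interceptor(last_good, batch)
        except Exception as exc:
            on_commit(offsets_to_commit(batch), exc)
    return last_good


def offset_interceptor(prev: List[Any], batch: List[ConsumedRecord]) -> List[Any]:
    return prev + [r.offset for r in batch]


def failing_interceptor(prev: List[Any], batch: List[ConsumedRecord]) -> List[Any]:
    raise RuntimeError("malformed key")


batches = [
    [ConsumedRecord("orders", 0, 10), ConsumedRecord("orders", 0, 11)],
    [ConsumedRecord("orders", 1, 3)],
    [ConsumedRecord("orders", 0, 12)],
]
commits: List[Tuple[Dict[TopicPartition, int], str]] = []
record = consume_with_fallback(
    [offset_interceptor, failing_interceptor, offset_interceptor],
    batches,
    lambda offsets, exc: commits.append((offsets, str(exc))),
)
print(record)   # [10, 11, 12]
print(commits)  # [({('orders', 1): 4}, 'malformed key')]
```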

Preferably, because the consumer is single-threaded, the consumer interceptors can be invoked on that single thread.

In the Kafka-based message processing method of this embodiment, the consumer interceptor intercepts the parameters of a message when the consumer subscribes to it. Consumer messages are thereby captured at different points in the cluster. Different message metrics of different applications can then be observed and message paths tracked across the cluster, which provides a basis for detecting and analyzing errors in a Kafka system.

Fig. 4 is a flowchart of an application example of the Kafka-based message processing method in Embodiment 2 of the present invention. In this application example, a new class, ConsumerInterceptors, can be created in the consumer code to encapsulate a list of ConsumerInterceptor instances. KafkaConsumer therefore gains a new member: ConsumerInterceptors<K, V> interceptors. The KafkaConsumer constructor loads instances of the interceptor classes specified in the interceptor.classes configuration. If interceptor.classes does not list any interceptor classes, the interceptor list is empty.

As shown in Fig. 4, KafkaConsumer.poll() calls the onConsume() method to subscribe to messages. When subscribing, onConsume() invokes one or more of the topic, partition, offset, and key interceptors, which intercept the corresponding parameter. The resulting parameter records are returned to a ConsumerRecord object, which is returned to the KafkaConsumer class.

When a consumer interceptor throws an exception, onConsume() passes the exception to Broker.oncommit() for handling. It then invokes the next consumer interceptor of the same type as the failing one. That next interceptor receives the parameters intercepted by the last interceptor invoked before the failing one without an exception.
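The single-threaded poll flow can be sketched in Python. In the Kafka Java client, `KafkaConsumer` is not thread-safe, and `ConsumerInterceptor.onConsume()` is called from `poll()` on the caller's thread just before the records are returned.

The in-memory log, the `max_records` batch size and `SketchConsumer` are hypothetical stand-ins for fetching from brokers. The point is that interception happens synchronously inside `poll()`, in list order.

```python
from typing import Callable, List, Tuple

Record = Tuple[str, int, int, bytes]  # (topic, partition, offset, value)
BatchInterceptor = Callable[[List[Record]], List[Record]]


class SketchConsumer:
    """Hypothetical single-threaded consumer: poll() fetches a batch from an
    in-memory log, passes it through the interceptors in order (the analogue
    of onConsume()), and returns what the last interceptor returns."""

    def __init__(self, log: List[Record], interceptors: List[BatchInterceptor], max_records: int = 2) -> None:
        self.log = log
        self.interceptors = interceptors
        self.max_records = max_records
        self.position = 0

    def poll(self) -> List[Record]:
        batch = self.log[self.position:self.position + self.max_records]
        self.position += len(batch)
        for intercept in self.interceptors:  # same thread as the caller
            batch = intercept(batch)
        return batch


seen_offsets: List[int] = []


def offset_interceptor(batch: List[Record]) -> List[Record]:
    seen_offsets.extend(r[2] for r in batch)
    return batch


consumer = SketchConsumer([("orders", 0, o, b"v") for o in range(5)], [offset_interceptor])
while consumer.poll():
    pass
print(seen_offsets)  # [0, 1, 2, 3, 4]
```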

In practice, consumer interceptors can be configured according to the size, configuration, and actual needs of the Kafka cluster. Preferably, the consumer interceptors described above can be implemented as Python scripts.

Note that the above are only preferred embodiments of the present invention and the technical principles applied. Those skilled in the art will understand that the present invention is not limited to the specific embodiments described here. Various obvious changes, readjustments, and substitutions can be made without departing from the scope of protection of the present invention. Therefore, although the present invention has been described in detail through the above embodiments, it is not limited to them. It may include other equivalent embodiments without departing from the concept of the present invention, and its scope is determined by the appended claims.

Claims (10)

CN201611226177.9A; priority date: 2016-12-27; filing date: 2016-12-27; title: A message processing method based on Kafka; status: Expired - Fee Related; CN106649766B (en)

Priority Applications (1)

CN201611226177.9A (CN106649766B, en); priority date: 2016-12-27; filing date: 2016-12-27; title: A message processing method based on Kafka

Applications Claiming Priority (1)

CN201611226177.9A (CN106649766B, en); priority date: 2016-12-27; filing date: 2016-12-27; title: A message processing method based on Kafka

Publications (2)

CN106649766A: published 2017-05-10
CN106649766B: published 2020-12-29

Family ID: 58832555

Family Applications (1)

Application Number | Title | Priority Date | Filing Date
CN201611226177.9A (Expired - Fee Related; CN106649766B) | A message processing method based on Kafka | 2016-12-27 | 2016-12-27

Country Status (1)

Country | Link
CN (1) | CN106649766B

Patent Citations (7)

* Cited by examiner, † Cited by third party
Publication number | Priority date | Publication date | Assignee | Title
JP3672730B2 * | 1998-03-31 | 2005-07-20 | アマノ株式会社 | Card input terminal with communication interception function
US6480977B1 * | 1999-10-29 | 2002-11-12 | Worldcom, Inc. | Multi-protocol monitor
CN102436373A * | 2011-09-13 | 2012-05-02 | 上海普元信息技术股份有限公司 | Method for Realizing Resource Loading and Resource Hot Update in Enterprise Distributed Application System
US20160321308A1 * | 2015-05-01 | 2016-11-03 | Ebay Inc. | Constructing a data adaptor in an enterprise server data ingestion environment
CN105187519A * | 2015-08-26 | 2015-12-23 | 福建星网锐捷通讯股份有限公司 | Socket transmission system based on AIO
CN105224445A * | 2015-10-28 | 2016-01-06 | 北京汇商融通信息技术有限公司 | Distributed tracking system
CN105490854A * | 2015-12-11 | 2016-04-13 | 传线网络科技(上海)有限公司 | Real-time log collection method and system, and application server cluster

Non-Patent Citations (2)

* Cited by examiner, † Cited by third party
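Title
小蕊: "Release Notes - KAFKA - Version 0.10.0.0", ORCHOME, HTTPS://WWW.ORCHOME.COM/209 *
杨华辉: "Design and Implementation of a Distributed Log System", China Master's Theses Full-text Database, Information Science and Technology Series *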

Cited By (21)

* Cited by examiner, † Cited by third party
Publication number | Priority date | Publication date | Assignee | Title
CN107273228A * | 2017-07-13 | 2017-10-20 | 焦点科技股份有限公司 | Method for message transmission based on star topology framework
CN107273228B * | 2017-07-13 | 2020-09-04 | 焦点科技股份有限公司 | Message transmission method based on star topology architecture
CN108093047B * | 2017-12-15 | 2021-07-27 | 北京星选科技有限公司 | Data sending method and device, electronic equipment and middleware system
CN108093047A * | 2017-12-15 | 2018-05-29 | 北京小度信息科技有限公司 | Data sending method and device, electronic equipment and middleware system
CN108170527A * | 2017-12-15 | 2018-06-15 | 北京奇艺世纪科技有限公司 | Multi-site active-active distributed message consumption method and device
CN108595483A * | 2018-03-13 | 2018-09-28 | 腾讯科技(深圳)有限公司 | Data processing method and related apparatus
CN108595483B * | 2018-03-13 | 2020-11-24 | 腾讯科技(深圳)有限公司 | Data processing method and related device
CN109286610A * | 2018-08-23 | 2019-01-29 | 北京城市网邻信息技术有限公司 | Service interception apparatus, method, device and storage medium
CN110968484A * | 2018-09-30 | 2020-04-07 | 北京国双科技有限公司 | Data delay monitoring method and device
CN111163118B * | 2018-11-07 | 2023-04-07 | 株式会社日立制作所 | Message transmission method and device in Kafka cluster
CN111163118A * | 2018-11-07 | 2020-05-15 | 株式会社日立制作所 | Message transmission method and device in Kafka cluster
CN109947415A * | 2019-02-28 | 2019-06-28 | 东软集团股份有限公司 | Interceptor configuration method and device, storage medium and electronic equipment
CN112751893A * | 2019-10-30 | 2021-05-04 | 中移(苏州)软件技术有限公司 | Message trace data processing method and device, and electronic equipment
CN111049899A * | 2019-12-11 | 2020-04-21 | 贝壳技术有限公司 | Kafka message storage system, method, apparatus, and computer-readable storage medium
CN112084047A * | 2020-09-14 | 2020-12-15 | 浪潮云信息技术股份公司 | Message sending implementation method based on asynchronous thread pool
CN113268642A * | 2021-06-25 | 2021-08-17 | 浪潮云信息技术股份公司 | Method for realizing refined access to Internet of Things device data
US11995096B2 | 2021-08-25 | 2024-05-28 | Red Hat, Inc. | Creation of message serializer for event streaming platform
CN115242787A * | 2022-07-22 | 2022-10-25 | 恒生电子股份有限公司 | Message processing system and method
CN115242787B * | 2022-07-22 | 2023-09-05 | 恒生电子股份有限公司 | Message processing system and method
CN115766610A * | 2022-10-26 | 2023-03-07 | 中国航空工业集团公司西安航空计算技术研究所 | Message queue based on publish-subscribe
CN116233129A * | 2023-02-08 | 2023-06-06 | 中国科学院国家空间科学中心 | Multi-node publish-subscribe system for secure transmission of spacecraft on-orbit data

Also Published As

Publication number | Publication date
CN106649766B | 2020-12-29

Similar Documents

Publication | Title
CN106649766A | Message handling method based on kafka
CA3093925C | Router management by an event stream processing cluster manager
US8438131B2 | Synchronization of media resources in a media archive
CN102402481B | The fuzz testing of asynchronous routine code
CN110750592B | Data synchronization method, device and terminal equipment
US10671451B2 | Idempotent mode of executing commands triggered by complex event processing
US9569339B1 | Debugging in an actor-based system
JP7339321B2 | Machine learning model update method, computer program and management device
Nikolov et al. | Conceptualization and scalable execution of big data workflows using domain-specific languages and software containers
US10938968B2 | Harmonized data for engineering simulation
US10929258B1 | Method and system for model-based event-driven anomalous behavior detection
Wu et al. | Trak: A testing tool for studying the reliability of data delivery in apache kafka
US20150293796A1 | Programmable logic controller and event-driven programming method thereof
CN104808606A | Method for providing functions within an industrial automation system and industrial automation system
US11620164B1 | Virtual partitioning of a shared message bus
US10423468B2 | Complex event processing using pseudo-clock
Boncea et al. | A scalable architecture for automated monitoring of microservices
WO2024163099A1 | Circular-buffer for generating machine learning estimates of streaming observations in real time
Aung et al. | Performance Evaluation for Real-Time Messaging System in Big Data Pipeline Architecture
Swetha et al. | Network data analysis using spark
US10481993B1 | Dynamic diagnostic data generation
Bonér et al. | Reactive programming reactive systems
US11226966B2 | Journaling of streaming anchor resource(s)
US20110247007A1 | Operators with request-response interfaces for data stream processing applications
Ratmumad et al. | Improvement of Message-Oriented Middleware (MOM) for the Surveillance Platform

Legal Events

Code | Title | Description
PB01 | Publication
SE01 | Entry into force of request for substantive examination
GR01 | Patent grant
CF01 | Termination of patent right due to non-payment of annual fee | Granted publication date: 2020-12-29

