Movatterモバイル変換


[0]ホーム

URL:


CN114637611A - Information processing method and device based on message queue and computer equipment - Google Patents

Information processing method and device based on message queue and computer equipment
Download PDF

Info

Publication number
CN114637611A
CN114637611ACN202210237120.8ACN202210237120ACN114637611ACN 114637611 ACN114637611 ACN 114637611ACN 202210237120 ACN202210237120 ACN 202210237120ACN 114637611 ACN114637611 ACN 114637611A
Authority
CN
China
Prior art keywords
message
transaction
processing result
node
message queue
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Pending
Application number
CN202210237120.8A
Other languages
Chinese (zh)
Inventor
李陈
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
Ping An International Smart City Technology Co Ltd
Original Assignee
Ping An International Smart City Technology Co Ltd
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by Ping An International Smart City Technology Co LtdfiledCriticalPing An International Smart City Technology Co Ltd
Priority to CN202210237120.8ApriorityCriticalpatent/CN114637611A/en
Publication of CN114637611ApublicationCriticalpatent/CN114637611A/en
Pendinglegal-statusCriticalCurrent

Links

Images

Classifications

Landscapes

Abstract

The embodiment of the application belongs to the technical field of cloud, and relates to a message queue-based information processing method, a message queue-based information processing device, computer equipment and a storage medium, wherein the method comprises the following steps: receiving a transaction message sent by a producer node; when the message state identifier of the transaction message is a standing identifier, adding the transaction message into a standing message queue which cannot be consumed by the consumer node; receiving transaction processing result information sent by a producer node, wherein the transaction processing result information is generated by the producer node according to a processing result of a transaction message; and when the producer node is determined to pass the transaction message processing according to the transaction processing result information, the message state identifier is modified into a processable identifier, and the transaction message is transferred to a to-be-processed message queue which can be consumed by the consumer node. In addition, the application also relates to a block chain technology, and transaction result information can be stored in the block chain. The method and the device realize the consistency of data among different nodes in the distributed system.

Description

Translated fromChinese
基于消息队列的信息处理方法、装置及计算机设备Information processing method, device and computer equipment based on message queue

技术领域technical field

本申请涉及云技术领域,尤其涉及一种基于消息队列的信息处理方法、装置、计算机设备及存储介质。The present application relates to the field of cloud technology, and in particular, to a message queue-based information processing method, apparatus, computer equipment, and storage medium.

背景技术Background technique

消息队列是一种在消息的传输过程中保存消息的容器,消息队列作为“中间人”,将消息从消息源中继到目的地。消息队列常常用于分布式系统,当一个系统需要与多个系统交互时,通常使用消息队列实现系统之间的解耦。一个系统会将消息发送至消息队列,然后由其它系统对消息队列中的消息进行消费,同时,该系统也会针对该消息进行数据处理。如果消息队列两边的系统出现故障导致一方对消息的处理出现错误,可能会导致各系统出现数据不一致。然而当前,并没有一种有效的措施来解决这种数据不一致的问题。A message queue is a container that saves messages during the transmission of messages. The message queue acts as a "middleman" to relay messages from a message source to a destination. Message queues are often used in distributed systems. When a system needs to interact with multiple systems, message queues are usually used to achieve decoupling between systems. A system will send a message to the message queue, and then other systems will consume the message in the message queue, and at the same time, the system will also perform data processing on the message. If the systems on both sides of the message queue fail, causing one party to process messages incorrectly, data inconsistency may occur between the systems. However, currently, there is no effective measure to solve this data inconsistency problem.

发明内容SUMMARY OF THE INVENTION

本申请实施例的目的在于提出一种基于消息队列的信息处理方法、装置、计算机设备及存储介质,以解决分布式系统中消息不一致的技术问题。The purpose of the embodiments of the present application is to propose an information processing method, apparatus, computer equipment and storage medium based on a message queue, so as to solve the technical problem of inconsistency of messages in a distributed system.

为了解决上述技术问题,本申请实施例提供一种基于消息队列的信息处理方法,采用了如下所述的技术方案:In order to solve the above technical problems, the embodiments of the present application provide a message queue-based information processing method, which adopts the following technical solutions:

接收生产者节点发送的事务消息;Receive transaction messages sent by producer nodes;

当检测到所述事务消息中的消息状态标识为静置标识时,将所述事务消息加入静置消息队列,所述静置消息队列中的事务消息不可被消费者节点进行消费;以及When it is detected that the message state identifier in the transaction message is a static identifier, the transaction message is added to a static message queue, and the transaction message in the static message queue cannot be consumed by the consumer node; and

接收所述生产者节点发送的事务处理结果信息,所述事务处理结果信息由所述生产者节点根据对所述事务消息的处理结果生成;Receive transaction processing result information sent by the producer node, where the transaction processing result information is generated by the producer node according to the processing result of the transaction message;

当根据所述事务处理结果信息确定所述生产者节点对所述事务消息处理通过时,将所述消息状态标识修改为可处理标识,并将所述事务消息转移到待处理消息队列,所述待处理消息队列中的事务消息可被消费者节点进行消费。When it is determined according to the transaction processing result information that the producer node has passed the processing of the transaction message, the message status identifier is modified to a processable identifier, and the transaction message is transferred to the message queue to be processed, and the Transaction messages in the pending message queue can be consumed by consumer nodes.

为了解决上述技术问题,本申请实施例还提供一种基于消息队列的信息处理装置,采用了如下所述的技术方案:In order to solve the above technical problems, the embodiments of the present application also provide a message queue-based information processing device, which adopts the following technical solutions:

事务消息接收模块,用于接收生产者节点发送的事务消息;The transaction message receiving module is used to receive transaction messages sent by the producer node;

静置加入模块,用于当检测到所述事务消息中的消息状态标识为静置标识时,将所述事务消息加入静置消息队列,所述静置消息队列中的事务消息不可被消费者节点进行消费;以及The static join module is used to add the transaction message to the static message queue when it is detected that the message state in the transaction message is identified as the static identifier, and the transaction message in the static message queue cannot be consumed by consumers node for consumption; and

结果信息接收模块,用于接收所述生产者节点发送的事务处理结果信息,所述事务处理结果信息由所述生产者节点根据对所述事务消息的处理结果生成;a result information receiving module, configured to receive transaction processing result information sent by the producer node, where the transaction processing result information is generated by the producer node according to the processing result of the transaction message;

消息转移模块,用于当根据所述事务处理结果信息确定所述生产者节点对所述事务消息处理通过时,将所述消息状态标识修改为可处理标识,并将所述事务消息转移到待处理消息队列,所述待处理消息队列中的事务消息可被消费者节点进行消费。A message transfer module is configured to modify the message status identifier to a processable identifier when it is determined according to the transaction processing result information that the producer node has passed the processing of the transaction message, and transfer the transaction message to the pending transaction message. A message queue is processed, and transaction messages in the to-be-processed message queue can be consumed by the consumer node.

为了解决上述技术问题,本申请实施例还提供一种计算机设备,采用了如下所述的技术方案:In order to solve the above-mentioned technical problems, the embodiment of the present application also provides a computer device, which adopts the following technical solutions:

接收生产者节点发送的事务消息;Receive transaction messages sent by producer nodes;

当检测到所述事务消息中的消息状态标识为静置标识时,将所述事务消息加入静置消息队列,所述静置消息队列中的事务消息不可被消费者节点进行消费;以及When it is detected that the message state identifier in the transaction message is a static identifier, the transaction message is added to a static message queue, and the transaction message in the static message queue cannot be consumed by the consumer node; and

接收所述生产者节点发送的事务处理结果信息,所述事务处理结果信息由所述生产者节点根据对所述事务消息的处理结果生成;Receive transaction processing result information sent by the producer node, where the transaction processing result information is generated by the producer node according to the processing result of the transaction message;

当根据所述事务处理结果信息确定所述生产者节点对所述事务消息处理通过时,将所述消息状态标识修改为可处理标识,并将所述事务消息转移到待处理消息队列,所述待处理消息队列中的事务消息可被消费者节点进行消费。When it is determined according to the transaction processing result information that the producer node has passed the processing of the transaction message, the message status identifier is modified to a processable identifier, and the transaction message is transferred to the message queue to be processed, and the Transaction messages in the pending message queue can be consumed by consumer nodes.

为了解决上述技术问题,本申请实施例还提供一种计算机可读存储介质,采用了如下所述的技术方案:In order to solve the above technical problems, the embodiments of the present application also provide a computer-readable storage medium, which adopts the following technical solutions:

接收生产者节点发送的事务消息;Receive transaction messages sent by producer nodes;

当检测到所述事务消息中的消息状态标识为静置标识时,将所述事务消息加入静置消息队列,所述静置消息队列中的事务消息不可被消费者节点进行消费;以及When it is detected that the message state identifier in the transaction message is a static identifier, the transaction message is added to a static message queue, and the transaction message in the static message queue cannot be consumed by the consumer node; and

接收所述生产者节点发送的事务处理结果信息,所述事务处理结果信息由所述生产者节点根据对所述事务消息的处理结果生成;Receive transaction processing result information sent by the producer node, where the transaction processing result information is generated by the producer node according to the processing result of the transaction message;

当根据所述事务处理结果信息确定所述生产者节点对所述事务消息处理通过时,将所述消息状态标识修改为可处理标识,并将所述事务消息转移到待处理消息队列,所述待处理消息队列中的事务消息可被消费者节点进行消费。When it is determined according to the transaction processing result information that the producer node has passed the processing of the transaction message, the message status identifier is modified to a processable identifier, and the transaction message is transferred to the message queue to be processed, and the Transaction messages in the pending message queue can be consumed by consumer nodes.

与现有技术相比,本申请实施例主要有以下有益效果:对事务消息和消息队列进行改造,给事务消息添加消息状态标识,消息状态标识用于标识事务消息是否可以被消费者节点进行消费;可以设置两个并列的消息队列,分别存储不可被消费的事务消息,以及可以被消费的事务消息;当检测到生产者节点发送的事务消息中的消息状态标识为不可被消费的静置标识时,将事务消息加入静置消息队列;生产者节点可以根据对事务消息的处理结果生成事务处理结果信息;若消息队列服务器根据事务处理结果信息确定生产者节点对事务消息处理通过时,再将消息状态标识修改为表示可以被消费的可处理标识,并将事务消息转移到待处理消息队列,以便消费者节点进行消费;本申请通过消息状态标识和两个消息队列控制事务消息的处理进程,保证了不同节点之间数据的一致性。Compared with the prior art, the embodiment of the present application mainly has the following beneficial effects: transforming transaction messages and message queues, adding a message state identifier to the transaction message, and the message state identifier is used to identify whether the transaction message can be consumed by the consumer node. ; Two parallel message queues can be set up to store transaction messages that cannot be consumed and transaction messages that can be consumed respectively; when it is detected that the message status in the transaction message sent by the producer node is identified as a static identifier that cannot be consumed When the transaction message is added to the static message queue; the producer node can generate transaction processing result information according to the processing result of the transaction message; if the message queue server determines that the producer node has passed the processing of the transaction message according to the transaction processing result information, it will The message state identifier is modified to represent a processable identifier that can be consumed, and the transaction message is transferred to the pending message queue for consumption by the consumer node; this application controls the processing process of the transaction message through the message state identifier and two message queues, It ensures the consistency of data between different nodes.

附图说明Description of drawings

为了更清楚地说明本申请中的方案,下面将对本申请实施例描述中所需要使用的附图作一个简单介绍,显而易见地,下面描述中的附图是本申请的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。In order to illustrate the solutions in the present application more clearly, the following will briefly introduce the accompanying drawings used in the description of the embodiments of the present application. For those of ordinary skill, other drawings can also be obtained from these drawings without any creative effort.

图1是本申请可以应用于其中的示例性系统架构图;FIG. 1 is an exemplary system architecture diagram to which the present application can be applied;

图2是根据本申请的基于消息队列的信息处理方法的一个实施例的流程图;2 is a flowchart of an embodiment of a message queue-based information processing method according to the present application;

图3是根据本申请的基于消息队列的信息处理装置的一个实施例的结构示意图;3 is a schematic structural diagram of an embodiment of a message queue-based information processing apparatus according to the present application;

图4是根据本申请的计算机设备的一个实施例的结构示意图。FIG. 4 is a schematic structural diagram of an embodiment of a computer device according to the present application.

具体实施方式Detailed ways

除非另有定义,本文所使用的所有的技术和科学术语与属于本申请的技术领域的技术人员通常理解的含义相同;本文中在申请的说明书中所使用的术语只是为了描述具体的实施例的目的,不是旨在于限制本申请;本申请的说明书和权利要求书及上述附图说明中的术语“包括”和“具有”以及它们的任何变形,意图在于覆盖不排他的包含。本申请的说明书和权利要求书或上述附图中的术语“第一”、“第二”等是用于区别不同对象,而不是用于描述特定顺序。Unless otherwise defined, all technical and scientific terms used herein have the same meaning as commonly understood by one of ordinary skill in the technical field of this application; the terms used herein in the specification of the application are for the purpose of describing specific embodiments only It is not intended to limit the application; the terms "comprising" and "having" and any variations thereof in the description and claims of this application and the above description of the drawings are intended to cover non-exclusive inclusion. The terms "first", "second" and the like in the description and claims of the present application or the above drawings are used to distinguish different objects, rather than to describe a specific order.

在本文中提及“实施例”意味着,结合实施例描述的特定特征、结构或特性可以包含在本申请的至少一个实施例中。在说明书中的各个位置出现该短语并不一定均是指相同的实施例,也不是与其它实施例互斥的独立的或备选的实施例。本领域技术人员显式地和隐式地理解的是,本文所描述的实施例可以与其它实施例相结合。Reference herein to an "embodiment" means that a particular feature, structure, or characteristic described in connection with the embodiment can be included in at least one embodiment of the present application. The appearances of the phrase in various places in the specification are not necessarily all referring to the same embodiment, nor a separate or alternative embodiment that is mutually exclusive of other embodiments. It is explicitly and implicitly understood by those skilled in the art that the embodiments described herein may be combined with other embodiments.

为了使本技术领域的人员更好地理解本申请方案,下面将结合附图,对本申请实施例中的技术方案进行清楚、完整地描述。In order to make those skilled in the art better understand the solutions of the present application, the technical solutions in the embodiments of the present application will be described clearly and completely below with reference to the accompanying drawings.

如图1所示,系统架构100可以包括终端设备101、102、生产者节点103、104、网络105、消息队列服务器106和消费者节点107。网络105用以在终端设备101、102、生产者节点103、104、消息队列服务器106和消费者节点107之间提供通信链路的介质。网络104可以包括各种连接类型,例如有线、无线通信链路或者光纤电缆等等。As shown in FIG. 1 , the system architecture 100 may includeterminal devices 101 , 102 ,producer nodes 103 , 104 , anetwork 105 , amessage queue server 106 and aconsumer node 107 . Thenetwork 105 is a medium used to provide communication links between theterminal devices 101 , 102 , theproducer nodes 103 , 104 , themessage queue server 106 and theconsumer node 107 . Thenetwork 104 may include various connection types, such as wired, wireless communication links, or fiber optic cables, among others.

用户可以使用终端设备101、102通过网络105与生产者节点103、104交互,以接收或发送消息等。终端设备101、102上可以安装有各种通讯客户端应用,例如网页浏览器应用、购物类应用、搜索类应用、即时通信工具、邮箱客户端、社交平台软件等。Users can useterminal devices 101, 102 to interact withproducer nodes 103, 104 throughnetwork 105 to receive or send messages and the like. Various communication client applications may be installed on theterminal devices 101 and 102, such as web browser applications, shopping applications, search applications, instant communication tools, email clients, social platform software, and the like.

终端设备101、102可以是具有显示屏并且支持网页浏览的各种电子设备,包括但不限于智能手机、平板电脑、电子书阅读器、MP3播放器(Moving Picture ExpertsGroupAudio Layer III,动态影像专家压缩标准音频层面3)、MP4(Moving PictureExpertsGroup Audio Layer IV,动态影像专家压缩标准音频层面4)播放器、膝上型便携计算机和台式计算机等等。Theterminal devices 101 and 102 may be various electronic devices that have a display screen and support web browsing, including but not limited to smart phones, tablet computers, e-book readers, MP3 players (Moving Picture Experts Group Audio Layer III, moving picture experts compression standard) Audio Layer 3), MP4 (Moving PictureExpertsGroup Audio Layer IV, Moving Picture Experts Compression Standard Audio Layer 4) Players, Laptops and Desktops, etc.

生产者节点103、104可以是提供服务的服务器,例如对终端设备101、102上显示的页面提供支持的后台服务器。Theproducer nodes 103 and 104 may be servers that provide services, such as background servers that provide support for pages displayed on theterminal devices 101 and 102 .

需要说明的是,本申请实施例所提供的基于消息队列的信息处理方法一般由消息队列服务器执行,相应地,基于消息队列的信息处理装置一般设置于消息队列服务器中。It should be noted that the message queue-based information processing method provided by the embodiments of the present application is generally executed by a message queue server, and accordingly, a message queue-based information processing apparatus is generally set in the message queue server.

应该理解,图1中的终端设备、网络、消息队列服务器、生产者节点和消费者节点的数目仅仅是示意性的。根据实现需要,可以具有任意数目的终端设备、网络和服务器。It should be understood that the numbers of terminal devices, networks, message queue servers, producer nodes and consumer nodes in FIG. 1 are merely illustrative. There can be any number of terminal devices, networks and servers according to implementation needs.

继续参考图2,示出了根据本申请的基于消息队列的信息处理方法的一个实施例的流程图。所述的基于消息队列的信息处理方法,包括以下步骤:Continuing to refer to FIG. 2 , there is shown a flow chart of an embodiment of the message queue-based information processing method according to the present application. The described message queue-based information processing method includes the following steps:

步骤S201,接收生产者节点发送的事务消息。Step S201, receiving the transaction message sent by the producer node.

在本实施例中,基于消息队列的信息处理方法运行于其上的电子设备(例如图1所示的消息队列服务器)可以通过有线连接方式或者无线连接方式与生产者节点或消费者节点进行通信。需要指出的是,上述无线连接方式可以包括但不限于3G/4G/5G连接、WiFi连接、蓝牙连接、WiMAX连接、Zigbee连接、UWB(ultra wideband)连接、以及其他现在已知或将来开发的无线连接方式。In this embodiment, the electronic device (for example, the message queue server shown in FIG. 1 ) on which the message queue-based information processing method runs can communicate with the producer node or the consumer node through a wired connection or a wireless connection . It should be pointed out that the above wireless connection methods may include, but are not limited to, 3G/4G/5G connection, WiFi connection, Bluetooth connection, WiMAX connection, Zigbee connection, UWB (ultra wideband) connection, and other wireless connections now known or developed in the future. connection method.

具体地,事务消息是记录事务的消息,由生产者节点生成,并发送至消息队列服务器。例如,用户在app(application,应用程序)中购买商品A,则支持app运行的生产者节点根据用户对商品A的购买行为生成一条事务消息,然后将该事务消息发送至消息队列服务器。Specifically, a transaction message is a message that records a transaction, is generated by the producer node, and is sent to the message queue server. For example, if a user purchases commodity A in an app (application), the producer node that supports the running of the app generates a transaction message according to the user's purchase behavior of commodity A, and then sends the transaction message to the message queue server.

步骤S202,当检测到事务消息中的消息状态标识为静置标识时,将事务消息加入静置消息队列,静置消息队列中的事务消息不可被消费者节点进行消费。Step S202 , when it is detected that the message state identifier in the transaction message is the static identifier, the transaction message is added to the static message queue, and the transaction message in the static message queue cannot be consumed by the consumer node.

具体地,事务消息中具有消息状态标识,消息状态标识用于标识事务消息当前是否可以被消费者节点进行消费。消息状态标识包括静置标识和可处理标识;当消息状态标识为静置标识时,事务消息暂时不可被消费者节点进行消费;当消息状态标识为可处理标识时,表示事务消息可以被消费者节点进行消费。Specifically, the transaction message has a message state identifier, and the message state identifier is used to identify whether the transaction message can be currently consumed by the consumer node. The message status ID includes a static ID and a processable ID; when the message status ID is a static ID, the transaction message cannot be consumed by the consumer node temporarily; when the message status ID is a processable ID, it means that the transaction message can be consumed by consumers node for consumption.

生产者节点也需要对事务消息进行处理,为了避免生产者节点对事务消息处理失败,而后续的消费者节点继续对事务消息进行处理造成的错误累加,生产者节点发送到消息队列服务器的事务消息中的消息状态标识为静置标识。The producer node also needs to process transaction messages. In order to avoid the failure of the producer node to process the transaction message and the accumulation of errors caused by the subsequent consumer nodes continuing to process the transaction message, the producer node sends the transaction message to the message queue server. The message status identifier in is the static identifier.

消息队列服务器可以设置两个消息队列,其中一个消息队列为静置消息队列,用于存储消息状态标识为静置标识的事务消息;另一个消息队列为待处理消息队列,用于存储消息状态标识为可处理标识的事务消息。The message queue server can set up two message queues, one of which is a static message queue, which is used to store transaction messages whose message status is marked as static; the other message queue is a pending message queue, which is used to store message status identifiers. A transactional message identified as a processable.

当事务消息中的消息状态标识为静置标识时,将事务消息添加到静置消息队列中;消费者节点对于静置消息队列中的事务消息是无感知的,也无法对静置消息队列中的事务消息进行消费。When the message status identifier in the transaction message is the static identifier, the transaction message is added to the static message queue; the consumer node is unaware of the transaction message in the static message queue, and cannot detect the transaction message in the static message queue. transaction messages for consumption.

步骤S203,接收生产者节点发送的事务处理结果信息,事务处理结果信息由生产者节点根据对事务消息的处理结果生成。Step S203: Receive transaction processing result information sent by the producer node, where the transaction processing result information is generated by the producer node according to the processing result of the transaction message.

具体地,生产者节点也可以对本地的事务消息进行处理,得到处理结果,并根据处理结果生成事务处理结果信息,然后将事务处理结果信息发送至消息队列服务器。例如,当生产者节点为订单系统时,订单系统需要在订单数据库中创建事务消息相对应的订单,并根据订单是否创建成功,生成事务处理结果信息。Specifically, the producer node can also process the local transaction message, obtain the processing result, generate transaction processing result information according to the processing result, and then send the transaction processing result information to the message queue server. For example, when the producer node is an order system, the order system needs to create an order corresponding to the transaction message in the order database, and generate transaction processing result information according to whether the order is successfully created.

需要强调的是,为进一步保证上述事务处理结果信息的私密和安全性,上述事务处理结果信息还可以存储于一区块链的节点中。以便后续根据区块链节点中的事务处理结果信息进行信息核对。It should be emphasized that, in order to further ensure the privacy and security of the above-mentioned transaction processing result information, the above-mentioned transaction processing result information can also be stored in a node of a blockchain. So that the information can be checked later according to the transaction result information in the blockchain nodes.

本申请所指区块链是分布式数据存储、点对点传输、共识机制、加密算法等计算机技术的新型应用模式。区块链(Blockchain),本质上是一个去中心化的数据库,是一串使用密码学方法相关联产生的数据块,每一个数据块中包含了一批次网络交易的信息,用于验证其信息的有效性(防伪)和生成下一个区块。区块链可以包括区块链底层平台、平台产品服务层以及应用服务层等。The blockchain referred to in this application is a new application mode of computer technologies such as distributed data storage, point-to-point transmission, consensus mechanism, and encryption algorithm. Blockchain, essentially a decentralized database, is a series of data blocks associated with cryptographic methods. Each data block contains a batch of network transaction information to verify its Validity of information (anti-counterfeiting) and generation of the next block. The blockchain can include the underlying platform of the blockchain, the platform product service layer, and the application service layer.

步骤S204,当根据事务处理结果信息确定生产者节点对事务消息处理通过时,将消息状态标识修改为可处理标识,并将事务消息转移到待处理消息队列,待处理消息队列中的事务消息可被消费者节点进行消费。Step S204, when it is determined that the producer node has passed the processing of the transaction message according to the transaction processing result information, the message state identifier is modified to a processable identifier, and the transaction message is transferred to the message queue to be processed, and the transaction message in the message queue to be processed can be processed. Consumed by the consumer node.

具体地,当根据事务处理结果信息确定生产者节点对事务消息处理通过时,事务流程可以推进。消息队列服务器将消息状态标识修改为可处理标识,然后将事务消息转移到待处理消息队列,以便消费者节点对待处理消息队列中的事务消息进行消费。Specifically, when it is determined according to the transaction processing result information that the producer node has passed the processing of the transaction message, the transaction flow can be advanced. The message queue server modifies the message state identifier to a processable identifier, and then transfers the transaction message to the pending message queue, so that the consumer node can consume the transaction message in the pending message queue.

例如,当根据事务处理结果信息确定订单数据库中已经建立事务消息所对应的订单时,消息队列服务器将事务消息转移到待处理消息队列。作为消费者节点的库存系统从待处理消息队列中取走事务消息进行消费,在库存数据库中将商品A的库存量减1。For example, when the order corresponding to the transaction message has been established in the order database according to the transaction processing result information, the message queue server transfers the transaction message to the pending message queue. The inventory system as a consumer node takes transaction messages from the pending message queue for consumption, and reduces the inventory of commodity A by 1 in the inventory database.

本实施例中,对事务消息和消息队列进行改造,给事务消息添加消息状态标识,消息状态标识用于标识事务消息是否可以被消费者节点进行消费;可以设置两个并列的消息队列,分别存储不可被消费的事务消息,以及可以被消费的事务消息;当检测到生产者节点发送的事务消息中的消息状态标识为不可被消费的静置标识时,将事务消息加入静置消息队列;生产者节点可以根据对事务消息的处理结果生成事务处理结果信息;若消息队列服务器根据事务处理结果信息确定生产者节点对事务消息处理通过时,再将消息状态标识修改为表示可以被消费的可处理标识,并将事务消息转移到待处理消息队列,以便消费者节点进行消费;本申请通过消息状态标识和两个消息队列控制事务消息的处理进程,保证了不同节点之间数据的一致性。In this embodiment, the transaction message and the message queue are transformed, and a message state identifier is added to the transaction message, and the message state identifier is used to identify whether the transaction message can be consumed by the consumer node; two parallel message queues can be set up to store the Transaction messages that cannot be consumed, and transaction messages that can be consumed; when it is detected that the message status in the transaction message sent by the producer node is a static flag that cannot be consumed, the transaction message is added to the static message queue; production The producer node can generate transaction processing result information according to the processing result of the transaction message; if the message queue server determines that the producer node has passed the processing of the transaction message according to the transaction processing result information, the message status flag is modified to indicate that it can be consumed. ID, and transfer the transaction message to the pending message queue for consumption by the consumer node; this application controls the processing process of the transaction message through the message status ID and two message queues to ensure the consistency of data between different nodes.

进一步的,上述步骤S202之后,还可以包括:获取事务消息的已接收时长;当已接收时长超过预设时长、且未接收到事务处理结果信息时,向生产者节点发送处理结果校验指令,处理结果校验指令用于指示生产者节点校验对事务消息的处理结果,并根据处理结果生成事务处理结果信息。Further, after the above step S202, it may further include: obtaining the received duration of the transaction message; when the received duration exceeds the preset duration and the transaction processing result information is not received, sending a processing result verification instruction to the producer node, The processing result verification instruction is used to instruct the producer node to verify the processing result of the transaction message, and generate transaction processing result information according to the processing result.

具体地,由于生产者节点对事务消息进行处理、生成事务处理结果信息以及发送事务处理结果信息需要一定的时间,因此,消息队列服务器在接收到事务消息后开始计时,记录的时长即为已接收时长。Specifically, since it takes a certain amount of time for the producer node to process the transaction message, generate the transaction processing result information and send the transaction processing result information, the message queue server starts timing after receiving the transaction message, and the recorded duration is the received time. duration.

消息队列服务器获取已接收时长,当已接收时长超过预设时长、且仍未接收到事务处理结果信息时,可以生成处理结果校验指令,并将处理结果校验指令发送至生产者节点。生产者节点可以根据处理结果校验指令获取自己对事务消息的处理结果,根据处理结果生成事务处理结果信息,并将事务处理结果信息发送至消息队列服务器。The message queue server obtains the received duration, and when the received duration exceeds the preset duration and the transaction processing result information has not been received, it can generate a processing result verification instruction, and send the processing result verification instruction to the producer node. The producer node can obtain its own processing result of the transaction message according to the processing result verification instruction, generate transaction processing result information according to the processing result, and send the transaction processing result information to the message queue server.

本实施例中,当已接收时长超过预设时长,且仍未接收到事务处理结果信息时,向生产者节点发送处理结果校验指令以获取事务处理结果消息,从而推进事务流程的处理。In this embodiment, when the received duration exceeds the preset duration and the transaction processing result information has not been received, a processing result verification instruction is sent to the producer node to obtain the transaction processing result message, thereby advancing the processing of the transaction process.

进一步的,上述获取事务消息的已接收时长的步骤之后,还可以包括:当已接收时长超过预设时长、且未接收到事务处理结果信息时,在生产者节点所在的分布式系统中,选取第二生产者节点;向第二生产者节点发送处理结果校验指令,处理结果校验指令用于指示第二生产者节点在生产者数据库中查询事务消息的处理结果,并根据处理结果生成事务处理结果信息。Further, after the above step of obtaining the received duration of the transaction message, it may further include: when the received duration exceeds the preset duration and the transaction processing result information is not received, in the distributed system where the producer node is located, select The second producer node; sending a processing result verification instruction to the second producer node, where the processing result verification instruction is used to instruct the second producer node to query the processing result of the transaction message in the producer database, and generate a transaction according to the processing result Processing result information.

具体地,当存在多个生产者节点时,本申请可以不依赖特定的生产者节点,可以通过其他的生产者节点获取事务消息的事务处理结果信息。Specifically, when there are multiple producer nodes, the present application may not rely on a specific producer node, and may obtain transaction processing result information of transaction messages through other producer nodes.

生产者可以采用集群部署,包括多个生产者节点,这样就会有多个实例,且多个生产者节点共享一个生产者数据库。通过生产者数据库可以获取到生产者节点对事务消息的处理结果。The producer can be deployed in a cluster, including multiple producer nodes, so that there will be multiple instances, and multiple producer nodes will share a producer database. Through the producer database, the processing result of the transaction message by the producer node can be obtained.

如果生产者节点因为网络故障、宕机或者其他原因,导致消息队列服务器在预设时长中没有接收到事务处理结果信息,也无法与生产者节点进行通信时,可以在分布式集群中查找其他可用的实例,得到第二生产者节点。然后向第二生产者节点发送针对事务消息的处理结果校验指令,由第二生产者节点在共享的生产者数据库中查询事务消息的处理结果,并根据处理结果生成事务处理结果信息。If the producer node does not receive transaction processing result information within the preset time period due to network failure, downtime or other reasons, and cannot communicate with the producer node, it can search for other available services in the distributed cluster. instance to get the second producer node. Then, a verification instruction of the processing result of the transaction message is sent to the second producer node, and the second producer node queries the processing result of the transaction message in the shared producer database, and generates transaction processing result information according to the processing result.

生产者节点可以提供一个查询接口,消息队列服务器通过该查询接口,调用生产者节点查询事务消息的处理结果。The producer node can provide a query interface, through which the message queue server calls the producer node to query the processing result of the transaction message.

例如,订单系统部署了10个服务,且10个服务实例共享订单数据库。最初的事务消息由集群中的某一个实例发送。当该实例出现故障时,消息队列服务器在10个实例中查找可以正常通信的实例作为第二生产者节点,然后由第二生产者节点到订单数据库中查询是否存在事务消息相关联的订单,并根据查询结果生成事务处理结果信息。For example, an order system deploys 10 services, and 10 service instances share the order database. The initial transaction message is sent by an instance in the cluster. When the instance fails, the message queue server finds an instance that can communicate normally among the 10 instances as the second producer node, and then the second producer node queries the order database to see if there is an order associated with the transaction message, and Generate transaction processing result information based on query results.

本实施例中,可以在分布式系统中选取其他实例作为第二生产者节点,并通过第二生产者节点到生产者数据库中查询事务消息的处理结果,保证了生产者节点在故障时也可以获取到事务处理结果信息。In this embodiment, other instances can be selected as the second producer node in the distributed system, and the second producer node can query the producer database for the processing result of the transaction message, which ensures that the producer node can also Obtain the transaction result information.

进一步的,上述步骤S203之后,还可以包括:当根据事务处理结果信息确定生产者节点对事务消息处理失败时,将事务消息从静置消息队列中删除。Further, after the above step S203, the method may further include: when it is determined that the producer node fails to process the transaction message according to the transaction processing result information, deleting the transaction message from the static message queue.

具体地,当根据事务处理结果信息确定生产者节点对事务消息处理失败时,表明事务流程的上一环节已经失败,无需进行接下来的流程,消息队列服务器可以将静置消息队列中的事务消息进行删除。Specifically, when it is determined that the producer node fails to process the transaction message according to the transaction processing result information, it indicates that the previous link of the transaction process has failed, and the next process does not need to be performed. The message queue server can put the transaction message in the static message queue. to delete.

本实施例中,当生产者节点对事务消息处理失败时,将静置消息队列中的事务消息删除,从而清理无用的事务消息,也避免了消息积压。In this embodiment, when the producer node fails to process the transaction message, the transaction message in the static message queue is deleted, so as to clear the useless transaction message and avoid the message backlog.

进一步的,上述步骤S204之后,还可以包括:获取消费者节点根据事务消息返回的消费反馈信息;当根据消费反馈信息确定消费者节点对事务消息处理成功时,将事务消息从待处理消息队列中删除。Further, after the above step S204, it may further include: acquiring the consumption feedback information returned by the consumer node according to the transaction message; when it is determined according to the consumption feedback information that the consumer node successfully processes the transaction message, removing the transaction message from the pending message queue. delete.

具体地,消费者节点从待处理消息队列中读取事务消息,对事务消息进行处理,并根据处理结果生成消费反馈消息,然后将消费反馈消息发送至消息队列服务器。消费反馈消息可以记录消费者节点对事务消息处理成功还是处理失败。Specifically, the consumer node reads the transaction message from the message queue to be processed, processes the transaction message, generates a consumption feedback message according to the processing result, and then sends the consumption feedback message to the message queue server. The consumer feedback message can record whether the consumer node successfully processes or fails to process the transaction message.

当消息队列服务器根据消费反馈信息确定消费者节点对事务消息处理成功时,将事务消息从待处理消息队列中删除,避免消息积压,降低消息队列服务器的运行压力,同时,从待处理消息队列中删除事务消息表示结束事务消息的处理流程,实现对消费者节点中事务消息消费状态的核对。When the message queue server determines that the consumer node has successfully processed the transaction message according to the consumption feedback information, it deletes the transaction message from the pending message queue to avoid message backlog and reduce the operating pressure of the message queue server. Deleting a transaction message means ending the processing flow of the transaction message, and checking the consumption status of the transaction message in the consumer node.

在一个实施例中,表示处理成功的消费反馈信息可以是ack消息(Acknowledgement,确认消息)。In one embodiment, the consumption feedback information indicating that the processing is successful may be an ack message (Acknowledgement, confirmation message).

本实施例中,当根据消费反馈信息确定消费者节点对事务消息处理成功时,将事务消息从待处理消息队列中删除,避免消息挤压,也完成分布式系统中消息的一致性核对。In this embodiment, when it is determined that the consumer node successfully processes the transaction message according to the consumption feedback information, the transaction message is deleted from the to-be-processed message queue to avoid message squeeze and also complete the consistency check of messages in the distributed system.

进一步的,上述在获取消费者节点根据事务消息返回的消费反馈信息的步骤之后,还可以包括:当根据消费反馈消息确定消费者节点对事务消息处理失败时,将事务消息转移到重试消息队列,重试消息队列中的事务消息用于供消费者节点重新消费。Further, after the above-mentioned step of obtaining the consumption feedback information returned by the consumer node according to the transaction message, it may further include: when it is determined according to the consumption feedback message that the consumer node fails to process the transaction message, transferring the transaction message to the retry message queue. , and retry the transaction message in the message queue for re-consumption by the consumer node.

具体地,消费者节点对事务消息进行处理时可能出现异常,例如,当消费者节点为库存系统时,库存系统需要根据事务消息将商品A的库存量减1,但是由于库存系统宕机,无法对库存量进行更新。此时,消费者节点可以生成表示处理失败的消费反馈信息。Specifically, an exception may occur when the consumer node processes transaction messages. For example, when the consumer node is an inventory system, the inventory system needs to reduce the inventory of commodity A by 1 according to the transaction message. However, due to the downtime of the inventory system, the inventory system cannot Update inventory. At this time, the consumer node can generate consumption feedback information indicating that the processing fails.

消费者节点对事务消息消费失败后,可以进行重新消费的重试操作,当重试操作的次数达到预设阈值,但依然无法处理成功时,消费者节点生成表示处理失败的消费反馈信息;或者,当消费者节点在预设时长内仍无法处理成功,生成表示处理失败的消费反馈信息。After the consumer node fails to consume the transaction message, it can retry the re-consumption operation. When the number of retry operations reaches the preset threshold, but still cannot be processed successfully, the consumer node generates a consumption feedback message indicating that the processing failed; or , when the consumer node still fails to process successfully within the preset time period, generates consumption feedback information indicating that the processing fails.

对于消费者节点处理失败的事务消息,消息队列服务器可以将事务消息转移到重试消息队列中。重试消息队列也是消息队列服务器中的消息队列。消费者节点可以通过定时任务,从重试消息队列中重新获取事务消息并进行消费,直至处理成功,向消息队列服务器返回表示处理成功的消费反馈信息,然后由消息队列服务器将该事务消息从重试消息队列中删除。For transaction messages that fail to be processed by consumer nodes, the message queue server can transfer the transaction messages to the retry message queue. The retry message queue is also the message queue in the message queue server. The consumer node can re-obtain transaction messages from the retry message queue through timed tasks and consume them until the processing is successful, and return the consumption feedback information indicating the successful processing to the message queue server, and then the message queue server The transaction message is removed from the retry message by the message queue server. removed from the queue.

本实施例中,当根据消费反馈消息确定消费者节点对事务消息处理失败时,将事务消息转移到重试消息队列,以便消费者节点重新进行消费,确保生产者节点和消费者节点中消息的最终一致性。In this embodiment, when it is determined that the consumer node fails to process the transaction message according to the consumption feedback message, the transaction message is transferred to the retry message queue, so that the consumer node can re-consume, and the message in the producer node and the consumer node is ensured. eventual consistency.

进一步的,上述在获取消费者节点根据事务消息返回的消费反馈信息的步骤之后,又可以包括:当根据消费反馈消息确定消费者节点对事务消息处理失败时,将事务消息转移到备份消息队列,并指示消费者节点将事务消息存储到数据库,数据库中的事务消息用于供消费者节点重新消费;当消费者节点对事务消息处理成功时,将事务消息从备份消息队列中删除。Further, after the above-mentioned step of acquiring the consumption feedback information returned by the consumer node according to the transaction message, it may further include: when it is determined according to the consumption feedback message that the consumer node fails to process the transaction message, transferring the transaction message to the backup message queue, And instruct the consumer node to store the transaction message in the database, and the transaction message in the database is used for re-consumption by the consumer node; when the consumer node successfully processes the transaction message, the transaction message is deleted from the backup message queue.

具体地,当消费者服务器对事务消息处理失败后,可以生成表示处理失败的消费反馈信息,将该消费反馈信息发送至消息队列服务器。消息队列服务器接收到消费反馈信息后,将事务消息转移到备份消息队列,备份消息队列也是消息队列服务器中的消息队列,可以对事务消息进行备份,以便消费者节点在需要时从备份消息队列中获取事务消息。Specifically, when the consumer server fails to process the transaction message, it may generate consumption feedback information indicating that the processing fails, and send the consumption feedback information to the message queue server. After the message queue server receives the consumption feedback information, it transfers the transaction message to the backup message queue. The backup message queue is also the message queue in the message queue server. Get transaction messages.

消费者节点可以将处理失败的事务消息持久化存储到数据库中,然后通过定时任务扫描数据库中之前未处理成功的事务消息,直至消费成功,将该事务消息从数据库中删除,并通知消息队列服务器将备份消息队列中的事务消息删除。通过额外的数据库存储处理失败的事务消息,可以减少消息队列中消息的积压,数据库的持久化特性保证了事务消息不会丢失,数据库的吞吐量通常高于消息队列,在极端异常的情况下,也可以存储大量未能处理成功的事务消息,可以保证消息的最终一致性。The consumer node can persist transaction messages that fail to be processed in the database, and then scan the database for previously unprocessed transaction messages through scheduled tasks until the consumption is successful, delete the transaction messages from the database, and notify the message queue server. Deletes transaction messages in the backup message queue. By storing and processing failed transaction messages in an additional database, the backlog of messages in the message queue can be reduced. The persistence feature of the database ensures that transaction messages will not be lost. The throughput of the database is usually higher than that of the message queue. It can also store a large number of unsuccessful transaction messages, which can ensure the eventual consistency of messages.

在一个实施例中,消费者节点对事务消息处理失败后,可以直接将事务消息持久化到数据库中,然后通过定时任务从数据库中读取之前处理失败的事务消息并重新进行消费,直至处理成功,将事务消息从数据库中删除。In one embodiment, after the consumer node fails to process the transaction message, it can directly persist the transaction message to the database, and then read the failed transaction message from the database through a scheduled task and re-consume it until the processing succeeds , deletes the transaction message from the database.

本实施例中,当消费者节点对事务消息处理失败时,可以将事务消息转移到备份消息队列中,供消费者节点进行消费;同时,消费者节点可以将事务消息持久化存储到数据库中,以便重新进行消费;备份消息队列和数据库的双重存储保证了事务消息的安全性,也确保了分布式系统中消息的最终一致性。In this embodiment, when the consumer node fails to process the transaction message, the transaction message can be transferred to the backup message queue for consumption by the consumer node; at the same time, the consumer node can persistently store the transaction message in the database, In order to re-consume; the double storage of backup message queue and database ensures the security of transaction messages and the eventual consistency of messages in distributed systems.

本申请可用于众多通用或专用的计算机系统环境或配置中。例如:个人计算机、服务器计算机、手持设备或便携式设备、平板型设备、多处理器系统、基于微处理器的系统、置顶盒、可编程的消费电子设备、网络PC、小型计算机、大型计算机、包括以上任何系统或设备的分布式计算环境等等。本申请可以在由计算机执行的计算机可执行指令的一般上下文中描述,例如程序模块。一般地,程序模块包括执行特定任务或实现特定抽象数据类型的例程、程序、对象、组件、数据结构等等。也可以在分布式计算环境中实践本申请,在这些分布式计算环境中,由通过通信网络而被连接的远程处理设备来执行任务。在分布式计算环境中,程序模块可以位于包括存储设备在内的本地和远程计算机存储介质中。The present application may be used in numerous general purpose or special purpose computer system environments or configurations. For example: personal computers, server computers, handheld or portable devices, tablet devices, multiprocessor systems, microprocessor-based systems, set-top boxes, programmable consumer electronics, network PCs, minicomputers, mainframe computers, including A distributed computing environment for any of the above systems or devices, and the like. The application may be described in the general context of computer-executable instructions, such as program modules, being executed by a computer. Generally, program modules include routines, programs, objects, components, data structures, etc. that perform particular tasks or implement particular abstract data types. The application may also be practiced in distributed computing environments where tasks are performed by remote processing devices that are linked through a communications network. In a distributed computing environment, program modules may be located in both local and remote computer storage media including storage devices.

本领域普通技术人员可以理解实现上述实施例方法中的全部或部分流程,是可以通过计算机程序来指令相关的硬件来完成,该计算机程序可存储于一计算机可读取存储介质中,该程序在执行时,可包括如上述各方法的实施例的流程。其中,前述的存储介质可为磁碟、光盘、只读存储记忆体(Read-Only Memory,ROM)等非易失性存储介质,或随机存储记忆体(Random Access Memory,RAM)等。Those of ordinary skill in the art can understand that the realization of all or part of the processes in the methods of the above embodiments can be accomplished by instructing relevant hardware through a computer program, and the computer program can be stored in a computer-readable storage medium, and the program is During execution, it may include the processes of the embodiments of the above-mentioned methods. The aforementioned storage medium may be a non-volatile storage medium such as a magnetic disk, an optical disk, a read-only memory (Read-Only Memory, ROM), or a random access memory (Random Access Memory, RAM).

应该理解的是,虽然附图的流程图中的各个步骤按照箭头的指示依次显示,但是这些步骤并不是必然按照箭头指示的顺序依次执行。除非本文中有明确的说明,这些步骤的执行并没有严格的顺序限制,其可以以其他的顺序执行。而且,附图的流程图中的至少一部分步骤可以包括多个子步骤或者多个阶段,这些子步骤或者阶段并不必然是在同一时刻执行完成,而是可以在不同的时刻执行,其执行顺序也不必然是依次进行,而是可以与其他步骤或者其他步骤的子步骤或者阶段的至少一部分轮流或者交替地执行。It should be understood that although the various steps in the flowchart of the accompanying drawings are sequentially shown in the order indicated by the arrows, these steps are not necessarily executed in sequence in the order indicated by the arrows. Unless explicitly stated herein, the execution of these steps is not strictly limited to the order and may be performed in other orders. Moreover, at least a part of the steps in the flowchart of the accompanying drawings may include multiple sub-steps or multiple stages, and these sub-steps or stages are not necessarily executed at the same time, but may be executed at different times, and the execution sequence is also It does not have to be performed sequentially, but may be performed alternately or alternately with other steps or at least a portion of sub-steps or stages of other steps.

进一步参考图3,作为对上述图2所示方法的实现,本申请提供了一种基于消息队列的信息处理装置的一个实施例,该装置实施例与图2所示的方法实施例相对应,该装置具体可以应用于各种电子设备中。Further referring to FIG. 3 , as an implementation of the method shown in FIG. 2 above, the present application provides an embodiment of a message queue-based information processing apparatus, and the apparatus embodiment corresponds to the method embodiment shown in FIG. 2 , Specifically, the device can be applied to various electronic devices.

如图3所示,本实施例所述的基于消息队列的信息处理装置300包括:事务消息接收模块301、静置加入模块302、结果信息接收模块303以及消息转移模块304,其中:As shown in FIG. 3 , the message queue-based information processing apparatus 300 in this embodiment includes: a transactionmessage receiving module 301, astatic join module 302, a resultinformation receiving module 303, and amessage transfer module 304, wherein:

事务消息接收模块301,用于接收生产者节点发送的事务消息。The transactionmessage receiving module 301 is configured to receive the transaction message sent by the producer node.

静置加入模块302,用于当检测到事务消息中的消息状态标识为静置标识时,将事务消息加入静置消息队列,静置消息队列中的事务消息不可被消费者节点进行消费。The static addingmodule 302 is configured to add the transaction message to the static message queue when it is detected that the message state identifier in the transaction message is the static identifier, and the transaction message in the static message queue cannot be consumed by the consumer node.

结果信息接收模块303,用于接收生产者节点发送的事务处理结果信息,事务处理结果信息由生产者节点根据对事务消息的处理结果生成。The resultinformation receiving module 303 is configured to receive transaction processing result information sent by the producer node, and the transaction processing result information is generated by the producer node according to the processing result of the transaction message.

消息转移模块304,用于当根据事务处理结果信息确定生产者节点对事务消息处理通过时,将消息状态标识修改为可处理标识,并将事务消息转移到待处理消息队列,待处理消息队列中的事务消息可被消费者节点进行消费。Themessage transfer module 304 is configured to modify the message status identifier to a processable identifier when it is determined according to the transaction processing result information that the producer node has passed the processing of the transaction message, and transfer the transaction message to the message queue to be processed, in the message queue to be processed The transaction messages can be consumed by consumer nodes.

本实施例中,对事务消息和消息队列进行改造,给事务消息添加消息状态标识,消息状态标识用于标识事务消息是否可以被消费者节点进行消费;可以设置两个并列的消息队列,分别存储不可被消费的事务消息,以及可以被消费的事务消息;当检测到生产者节点发送的事务消息中的消息状态标识为不可被消费的静置标识时,将事务消息加入静置消息队列;生产者节点可以根据对事务消息的处理结果生成事务处理结果信息;若消息队列服务器根据事务处理结果信息确定生产者节点对事务消息处理通过时,再将消息状态标识修改为表示可以被消费的可处理标识,并将事务消息转移到待处理消息队列,以便消费者节点进行消费;本申请通过消息状态标识和两个消息队列控制事务消息的处理进程,保证了不同节点之间数据的一致性。In this embodiment, the transaction message and the message queue are transformed, and a message state identifier is added to the transaction message, and the message state identifier is used to identify whether the transaction message can be consumed by the consumer node; two parallel message queues can be set up to store the Transaction messages that cannot be consumed, and transaction messages that can be consumed; when it is detected that the message status in the transaction message sent by the producer node is a static flag that cannot be consumed, the transaction message is added to the static message queue; production The producer node can generate transaction processing result information according to the processing result of the transaction message; if the message queue server determines that the producer node has passed the processing of the transaction message according to the transaction processing result information, the message status flag is modified to indicate that it can be consumed. ID, and transfer the transaction message to the pending message queue for consumption by the consumer node; this application controls the processing process of the transaction message through the message status ID and two message queues to ensure the consistency of data between different nodes.

在本实施例的一些可选的实现方式中,基于消息队列的信息处理装置300还可以包括:时长获取模块以及指令发送模块,其中:In some optional implementations of this embodiment, the message queue-based information processing apparatus 300 may further include: a duration acquiring module and an instruction sending module, wherein:

时长获取模块,用于获取事务消息的已接收时长。The duration obtaining module is used to obtain the received duration of the transaction message.

指令发送模块,用于当已接收时长超过预设时长、且未接收到事务处理结果信息时,向生产者节点发送处理结果校验指令,处理结果校验指令用于指示生产者节点校验对事务消息的处理结果,并根据处理结果生成事务处理结果信息。The instruction sending module is used to send a processing result verification instruction to the producer node when the received time exceeds the preset time and the transaction processing result information is not received, and the processing result verification instruction is used to instruct the producer node to verify the correct The processing result of the transaction message, and the transaction processing result information is generated according to the processing result.

本实施例中,当已接收时长超过预设时长,且仍未接收到事务处理结果信息时,向生产者节点发送处理结果校验指令以获取事务处理结果消息,从而推进事务流程的处理。In this embodiment, when the received duration exceeds the preset duration and the transaction processing result information has not been received, a processing result verification instruction is sent to the producer node to obtain the transaction processing result message, thereby advancing the processing of the transaction process.

在本实施例的一些可选的实现方式中,基于消息队列的信息处理装置300还可以包括:节点选取模块以及校验指令发送模块,其中:In some optional implementations of this embodiment, the message queue-based information processing apparatus 300 may further include: a node selection module and a verification instruction sending module, wherein:

节点选取模块,用于当已接收时长超过预设时长、且未接收到事务处理结果信息时,在生产者节点所在的分布式系统中,选取第二生产者节点。The node selection module is used to select a second producer node in the distributed system where the producer node is located when the received time duration exceeds the preset time duration and the transaction processing result information is not received.

校验指令发送模块,用于向第二生产者节点发送处理结果校验指令,处理结果校验指令用于指示第二生产者节点在生产者数据库中查询事务消息的处理结果,并根据处理结果生成事务处理结果信息。The verification instruction sending module is used to send the processing result verification instruction to the second producer node, and the processing result verification instruction is used to instruct the second producer node to query the processing result of the transaction message in the producer database, and according to the processing result Generate transaction result information.

本实施例中,可以在分布式系统中选取其他实例作为第二生产者节点,并通过第二生产者节点到生产者数据库中查询事务消息的处理结果,保证了生产者节点在故障时也可以获取到事务处理结果信息。In this embodiment, other instances can be selected as the second producer node in the distributed system, and the second producer node can query the producer database for the processing result of the transaction message, which ensures that the producer node can also Obtain the transaction result information.

在本实施例的一些可选的实现方式中,基于消息队列的信息处理装置300还可以包括静置删除模块,静置删除模块用于当根据事务处理结果信息确定生产者节点对事务消息处理失败时,将事务消息从静置消息队列中删除。In some optional implementations of this embodiment, the message queue-based information processing apparatus 300 may further include a static deletion module, and the static deletion module is configured to determine that the producer node fails to process the transaction message according to the transaction processing result information , the transaction message is removed from the quiescent message queue.

本实施例中,当生产者节点对事务消息处理失败时,将静置消息队列中的事务消息删除,从而清理无用的事务消息,也避免了消息积压。In this embodiment, when the producer node fails to process the transaction message, the transaction message in the static message queue is deleted, so as to clear the useless transaction message and avoid the message backlog.

在本实施例的一些可选的实现方式中,基于消息队列的信息处理装置300还可以包括:反馈获取模块以及删除模块,其中:In some optional implementations of this embodiment, the message queue-based information processing apparatus 300 may further include: a feedback acquisition module and a deletion module, wherein:

反馈获取模块,用于获取消费者节点根据事务消息返回的消费反馈信息。The feedback acquisition module is used to acquire the consumption feedback information returned by the consumer node according to the transaction message.

删除模块,用于当根据消费反馈信息确定消费者节点对事务消息处理成功时,将事务消息从待处理消息队列中删除。The deletion module is used to delete the transaction message from the pending message queue when it is determined according to the consumption feedback information that the transaction message is successfully processed by the consumer node.

本实施例中,当根据消费反馈信息确定消费者节点对事务消息处理成功时,将事务消息从待处理消息队列中删除,避免消息挤压,也完成分布式系统中消息的一致性核对。In this embodiment, when it is determined that the consumer node successfully processes the transaction message according to the consumption feedback information, the transaction message is deleted from the to-be-processed message queue to avoid message squeeze and also complete the consistency check of messages in the distributed system.

在本实施例的一些可选的实现方式中,基于消息队列的信息处理装置300还可以包括重试转移模块,重试转移模块用于当根据消费反馈消息确定消费者节点对事务消息处理失败时,将事务消息转移到重试消息队列,重试消息队列中的事务消息用于供消费者节点重新消费。In some optional implementations of this embodiment, the message queue-based information processing apparatus 300 may further include a retry transfer module, and the retry transfer module is used when it is determined according to the consumption feedback message that the consumer node fails to process the transaction message , transfer the transaction message to the retry message queue, and the transaction message in the retry message queue is used for re-consumption by the consumer node.

本实施例中,当根据消费反馈消息确定消费者节点对事务消息处理失败时,将事务消息转移到重试消息队列,以便消费者节点重新进行消费,确保生产者节点和消费者节点中消息的最终一致性。In this embodiment, when it is determined that the consumer node fails to process the transaction message according to the consumption feedback message, the transaction message is transferred to the retry message queue, so that the consumer node can re-consume, and the message in the producer node and the consumer node is ensured. eventual consistency.

在本实施例的一些可选的实现方式中,基于消息队列的信息处理装置300还可以包括:备份转移模块以及备份删除模块,其中:In some optional implementations of this embodiment, the message queue-based information processing apparatus 300 may further include: a backup transfer module and a backup deletion module, wherein:

备份转移模块,用于当根据消费反馈消息确定消费者节点对事务消息处理失败时,将事务消息转移到备份消息队列,并指示消费者节点将事务消息存储到数据库,数据库中的事务消息用于供消费者节点重新消费。The backup transfer module is used to transfer the transaction message to the backup message queue when it is determined according to the consumption feedback message that the consumer node fails to process the transaction message, and instruct the consumer node to store the transaction message in the database, and the transaction message in the database is used for For the consumer node to re-consume.

备份删除模块,用于当消费者节点对事务消息处理成功时,将事务消息从备份消息队列中删除。The backup deletion module is used to delete the transaction message from the backup message queue when the consumer node successfully processes the transaction message.

本实施例中,当消费者节点对事务消息处理失败时,可以将事务消息转移到备份消息队列中,供消费者节点进行消费;同时,消费者节点可以将事务消息持久化存储到数据库中,以便重新进行消费;备份消息队列和数据库的双重存储保证了事务消息的安全性,也确保了分布式系统中消息的最终一致性。In this embodiment, when the consumer node fails to process the transaction message, the transaction message can be transferred to the backup message queue for consumption by the consumer node; at the same time, the consumer node can persistently store the transaction message in the database, In order to re-consume; the double storage of backup message queue and database ensures the security of transaction messages and the eventual consistency of messages in distributed systems.

为解决上述技术问题,本申请实施例还提供计算机设备。具体请参阅图4,图4为本实施例计算机设备基本结构框图。To solve the above technical problems, the embodiments of the present application also provide computer equipment. Please refer to FIG. 4 for details. FIG. 4 is a block diagram of a basic structure of a computer device according to this embodiment.

所述计算机设备4包括通过系统总线相互通信连接存储器41、处理器42、网络接口43。需要指出的是,图中仅示出了具有组件41-43的计算机设备4,但是应理解的是,并不要求实施所有示出的组件,可以替代的实施更多或者更少的组件。其中,本技术领域技术人员可以理解,这里的计算机设备是一种能够按照事先设定或存储的指令,自动进行数值计算和/或信息处理的设备,其硬件包括但不限于微处理器、专用集成电路(ApplicationSpecific Integrated Circuit,ASIC)、可编程门阵列(Field-Programmable GateArray,FPGA)、数字处理器(Digital Signal Processor,DSP)、嵌入式设备等。The computer device 4 includes amemory 41, aprocessor 42, and anetwork interface 43 that communicate with each other through a system bus. It should be noted that only the computer device 4 with components 41-43 is shown in the figure, but it should be understood that it is not required to implement all of the shown components, and more or less components may be implemented instead. Among them, those skilled in the art can understand that the computer device here is a device that can automatically perform numerical calculation and/or information processing according to pre-set or stored instructions, and its hardware includes but is not limited to microprocessors, special-purpose Integrated circuit (ApplicationSpecific Integrated Circuit, ASIC), programmable gate array (Field-Programmable GateArray, FPGA), digital processor (Digital Signal Processor, DSP), embedded equipment, etc.

所述计算机设备可以是桌上型计算机、笔记本、掌上电脑及云端服务器等计算设备。所述计算机设备可以与用户通过键盘、鼠标、遥控器、触摸板或声控设备等方式进行人机交互。The computer equipment may be a desktop computer, a notebook computer, a palmtop computer, a cloud server and other computing equipment. The computer device can perform human-computer interaction with the user through a keyboard, a mouse, a remote control, a touch pad or a voice control device.

所述存储器41至少包括一种类型的可读存储介质,所述可读存储介质包括闪存、硬盘、多媒体卡、卡型存储器(例如,SD或DX存储器等)、随机访问存储器(RAM)、静态随机访问存储器(SRAM)、只读存储器(ROM)、电可擦除可编程只读存储器(EEPROM)、可编程只读存储器(PROM)、磁性存储器、磁盘、光盘等。在一些实施例中,所述存储器41可以是所述计算机设备4的内部存储单元,例如该计算机设备4的硬盘或内存。在另一些实施例中,所述存储器41也可以是所述计算机设备4的外部存储设备,例如该计算机设备4上配备的插接式硬盘,智能存储卡(Smart Media Card,SMC),安全数字(Secure Digital,SD)卡,闪存卡(FlashCard)等。当然,所述存储器41还可以既包括所述计算机设备4的内部存储单元也包括其外部存储设备。本实施例中,所述存储器41通常用于存储安装于所述计算机设备4的操作系统和各类应用软件,例如基于消息队列的信息处理方法的计算机可读指令等。此外,所述存储器41还可以用于暂时地存储已经输出或者将要输出的各类数据。Thememory 41 includes at least one type of readable storage medium, and the readable storage medium includes flash memory, hard disk, multimedia card, card-type memory (for example, SD or DX memory, etc.), random access memory (RAM), static Random Access Memory (SRAM), Read Only Memory (ROM), Electrically Erasable Programmable Read Only Memory (EEPROM), Programmable Read Only Memory (PROM), Magnetic Memory, Magnetic Disk, Optical Disk, etc. In some embodiments, thememory 41 may be an internal storage unit of the computer device 4 , such as a hard disk or a memory of the computer device 4 . In other embodiments, thememory 41 may also be an external storage device of the computer device 4 , such as a plug-in hard disk, a smart memory card (Smart Media Card, SMC), a secure digital (Secure Digital, SD) card, flash memory card (FlashCard) and so on. Of course, thememory 41 may also include both the internal storage unit of the computer device 4 and its external storage device. In this embodiment, thememory 41 is generally used to store the operating system and various application software installed on the computer device 4 , such as computer-readable instructions of a message queue-based information processing method, and the like. In addition, thememory 41 can also be used to temporarily store various types of data that have been output or will be output.

所述处理器42在一些实施例中可以是中央处理器(Central Processing Unit,CPU)、控制器、微控制器、微处理器、或其他数据处理芯片。该处理器42通常用于控制所述计算机设备4的总体操作。本实施例中,所述处理器42用于运行所述存储器41中存储的计算机可读指令或者处理数据,例如运行所述基于消息队列的信息处理方法的计算机可读指令。Theprocessor 42 may be a central processing unit (Central Processing Unit, CPU), a controller, a microcontroller, a microprocessor, or other data processing chips in some embodiments. Thisprocessor 42 is typically used to control the overall operation of the computer device 4 . In this embodiment, theprocessor 42 is configured to execute computer-readable instructions or process data stored in thememory 41, for example, computer-readable instructions for executing the message queue-based information processing method.

所述网络接口43可包括无线网络接口或有线网络接口,该网络接口43通常用于在所述计算机设备4与其他电子设备之间建立通信连接。Thenetwork interface 43 may include a wireless network interface or a wired network interface, and thenetwork interface 43 is generally used to establish a communication connection between the computer device 4 and other electronic devices.

本实施例中提供的计算机设备可以执行上述基于消息队列的信息处理方法。此处基于消息队列的信息处理方法可以是上述各个实施例的基于消息队列的信息处理方法。The computer device provided in this embodiment can execute the above message queue-based information processing method. The message queue-based information processing method here may be the message queue-based information processing method of the above embodiments.

本实施例中,对事务消息和消息队列进行改造,给事务消息添加消息状态标识,消息状态标识用于标识事务消息是否可以被消费者节点进行消费;可以设置两个并列的消息队列,分别存储不可被消费的事务消息,以及可以被消费的事务消息;当检测到生产者节点发送的事务消息中的消息状态标识为不可被消费的静置标识时,将事务消息加入静置消息队列;生产者节点可以根据对事务消息的处理结果生成事务处理结果信息;若消息队列服务器根据事务处理结果信息确定生产者节点对事务消息处理通过时,再将消息状态标识修改为表示可以被消费的可处理标识,并将事务消息转移到待处理消息队列,以便消费者节点进行消费;本申请通过消息状态标识和两个消息队列控制事务消息的处理进程,保证了不同节点之间数据的一致性。In this embodiment, the transaction message and the message queue are transformed, and a message state identifier is added to the transaction message, and the message state identifier is used to identify whether the transaction message can be consumed by the consumer node; two parallel message queues can be set up to store the Transaction messages that cannot be consumed, and transaction messages that can be consumed; when it is detected that the message status in the transaction message sent by the producer node is a static flag that cannot be consumed, the transaction message is added to the static message queue; production The producer node can generate transaction processing result information according to the processing result of the transaction message; if the message queue server determines that the producer node has passed the processing of the transaction message according to the transaction processing result information, the message status flag is modified to indicate that it can be consumed. ID, and transfer the transaction message to the pending message queue for consumption by the consumer node; this application controls the processing process of the transaction message through the message status ID and two message queues to ensure the consistency of data between different nodes.

本申请还提供了另一种实施方式,即提供一种计算机可读存储介质,所述计算机可读存储介质存储有计算机可读指令,所述计算机可读指令可被至少一个处理器执行,以使所述至少一个处理器执行如上述的基于消息队列的信息处理方法的步骤。The present application also provides another embodiment, which is to provide a computer-readable storage medium, where the computer-readable storage medium stores computer-readable instructions, and the computer-readable instructions can be executed by at least one processor to The at least one processor is caused to execute the steps of the above-mentioned message queue-based information processing method.

本实施例中,对事务消息和消息队列进行改造,给事务消息添加消息状态标识,消息状态标识用于标识事务消息是否可以被消费者节点进行消费;可以设置两个并列的消息队列,分别存储不可被消费的事务消息,以及可以被消费的事务消息;当检测到生产者节点发送的事务消息中的消息状态标识为不可被消费的静置标识时,将事务消息加入静置消息队列;生产者节点可以根据对事务消息的处理结果生成事务处理结果信息;若消息队列服务器根据事务处理结果信息确定生产者节点对事务消息处理通过时,再将消息状态标识修改为表示可以被消费的可处理标识,并将事务消息转移到待处理消息队列,以便消费者节点进行消费;本申请通过消息状态标识和两个消息队列控制事务消息的处理进程,保证了不同节点之间数据的一致性。In this embodiment, the transaction message and the message queue are transformed, and a message state identifier is added to the transaction message, and the message state identifier is used to identify whether the transaction message can be consumed by the consumer node; two parallel message queues can be set up to store the Transaction messages that cannot be consumed, and transaction messages that can be consumed; when it is detected that the message status in the transaction message sent by the producer node is a static flag that cannot be consumed, the transaction message is added to the static message queue; production The producer node can generate transaction processing result information according to the processing result of the transaction message; if the message queue server determines that the producer node has passed the processing of the transaction message according to the transaction processing result information, the message status flag is modified to indicate that it can be consumed. ID, and transfer the transaction message to the pending message queue for consumption by the consumer node; this application controls the processing process of the transaction message through the message status ID and two message queues to ensure the consistency of data between different nodes.

通过以上的实施方式的描述,本领域的技术人员可以清楚地了解到上述实施例方法可借助软件加必需的通用硬件平台的方式来实现,当然也可以通过硬件,但很多情况下前者是更佳的实施方式。基于这样的理解,本申请的技术方案本质上或者说对现有技术做出贡献的部分可以以软件产品的形式体现出来,该计算机软件产品存储在一个存储介质(如ROM/RAM、磁碟、光盘)中,包括若干指令用以使得一台终端设备(可以是手机,计算机,服务器,空调器,或者网络设备等)执行本申请各个实施例所述的方法。From the description of the above embodiments, those skilled in the art can clearly understand that the methods of the above embodiments can be implemented by means of software plus a necessary general hardware platform, and of course hardware can also be used, but in many cases the former is better implementation. Based on this understanding, the technical solutions of the present application can be embodied in the form of software products in essence or the parts that make contributions to the prior art, and the computer software products are stored in a storage medium (such as ROM/RAM, magnetic disk, CD-ROM), including several instructions to make a terminal device (which may be a mobile phone, a computer, a server, an air conditioner, or a network device, etc.) to execute the methods described in the various embodiments of this application.

显然,以上所描述的实施例仅仅是本申请一部分实施例,而不是全部的实施例,附图中给出了本申请的较佳实施例,但并不限制本申请的专利范围。本申请可以以许多不同的形式来实现,相反地,提供这些实施例的目的是使对本申请的公开内容的理解更加透彻全面。尽管参照前述实施例对本申请进行了详细的说明,对于本领域的技术人员来而言,其依然可以对前述各具体实施方式所记载的技术方案进行修改,或者对其中部分技术特征进行等效替换。凡是利用本申请说明书及附图内容所做的等效结构,直接或间接运用在其他相关的技术领域,均同理在本申请专利保护范围之内。Obviously, the above-described embodiments are only a part of the embodiments of the present application, rather than all of the embodiments. The accompanying drawings show the preferred embodiments of the present application, but do not limit the scope of the patent of the present application. This application may be embodied in many different forms, rather these embodiments are provided so that a thorough and complete understanding of the disclosure of this application is provided. Although the present application has been described in detail with reference to the foregoing embodiments, those skilled in the art can still modify the technical solutions described in the foregoing specific embodiments, or perform equivalent replacements for some of the technical features. . Any equivalent structures made by using the contents of the description and drawings of this application, which are directly or indirectly used in other related technical fields, are all within the scope of protection of the patent of this application.

Claims (10)

Translated fromChinese
1.一种基于消息队列的信息处理方法,其特征在于,包括下述步骤:1. an information processing method based on message queue, is characterized in that, comprises the following steps:接收生产者节点发送的事务消息;Receive transaction messages sent by producer nodes;当检测到所述事务消息中的消息状态标识为静置标识时,将所述事务消息加入静置消息队列,所述静置消息队列中的事务消息不可被消费者节点进行消费;以及When it is detected that the message state identifier in the transaction message is a static identifier, the transaction message is added to a static message queue, and the transaction message in the static message queue cannot be consumed by the consumer node; and接收所述生产者节点发送的事务处理结果信息,所述事务处理结果信息由所述生产者节点根据对所述事务消息的处理结果生成;receiving transaction processing result information sent by the producer node, where the transaction processing result information is generated by the producer node according to the processing result of the transaction message;当根据所述事务处理结果信息确定所述生产者节点对所述事务消息处理通过时,将所述消息状态标识修改为可处理标识,并将所述事务消息转移到待处理消息队列,所述待处理消息队列中的事务消息可被消费者节点进行消费。When it is determined according to the transaction processing result information that the producer node has passed the processing of the transaction message, the message state identifier is modified to a processable identifier, and the transaction message is transferred to the message queue to be processed, and the Transaction messages in the pending message queue can be consumed by consumer nodes.2.根据权利要求1所述的基于消息队列的信息处理方法,其特征在于,在所述当检测到所述事务消息中的消息状态标识为静置标识时,将所述事务消息加入静置消息队列的步骤之后,还包括:2 . The message queue-based information processing method according to claim 1 , wherein, when the message state identifier in the transaction message is detected as a static identifier, the transaction message is added to the static identifier. 3 . After the steps of the message queue, also include:获取所述事务消息的已接收时长;obtain the received duration of the transaction message;当所述已接收时长超过预设时长、且未接收到事务处理结果信息时,向所述生产者节点发送处理结果校验指令,所述处理结果校验指令用于指示所述生产者节点校验对所述事务消息的处理结果,并根据所述处理结果生成事务处理结果信息。When the received duration exceeds the preset duration and no transaction processing result information is received, a processing result verification instruction is sent to the producer node, and the processing result verification instruction is used to instruct the producer node to verify The processing result of the transaction message is verified, and transaction processing result information is generated according to the processing result.3.根据权利要求2所述的基于消息队列的信息处理方法,其特征在于,在所述获取所述事务消息的已接收时长的步骤之后,还包括:3. The message queue-based information processing method according to claim 2, wherein after the step of acquiring the received duration of the transaction message, further comprising:当所述已接收时长超过预设时长、且未接收到事务处理结果信息时,在所述生产者节点所在的分布式系统中,选取第二生产者节点;When the received duration exceeds the preset duration and no transaction processing result information is received, select a second producer node in the distributed system where the producer node is located;向所述第二生产者节点发送处理结果校验指令,所述处理结果校验指令用于指示所述第二生产者节点在生产者数据库中查询所述事务消息的处理结果,并根据所述处理结果生成事务处理结果信息。Send a processing result verification instruction to the second producer node, where the processing result verification instruction is used to instruct the second producer node to query the processing result of the transaction message in the producer database, and according to the The processing result generates transaction processing result information.4.根据权利要求1所述的基于消息队列的信息处理方法,其特征在于,在所述接收所述生产者节点发送的事务处理结果信息的步骤之后,还包括:4. The message queue-based information processing method according to claim 1, wherein after the step of receiving the transaction processing result information sent by the producer node, the method further comprises:当根据所述事务处理结果信息确定所述生产者节点对所述事务消息处理失败时,将所述事务消息从所述静置消息队列中删除。When it is determined according to the transaction processing result information that the producer node fails to process the transaction message, the transaction message is deleted from the static message queue.5.根据权利要求1所述的基于消息队列的信息处理方法,其特征在于,在所述当根据所述事务处理结果信息确定所述生产者节点对所述事务消息处理通过时,将所述消息状态标识修改为可处理标识,并将所述事务消息转移到待处理消息队列的步骤之后,还包括:5 . The message queue-based information processing method according to claim 1 , wherein when it is determined according to the transaction processing result information that the producer node has passed the processing of the transaction message, the After the step of modifying the message state identifier to a processable identifier, and transferring the transaction message to the message queue to be processed, the method further includes:获取所述消费者节点根据所述事务消息返回的消费反馈信息;obtaining the consumption feedback information returned by the consumer node according to the transaction message;当根据所述消费反馈信息确定所述消费者节点对所述事务消息处理成功时,将所述事务消息从所述待处理消息队列中删除。When it is determined according to the consumption feedback information that the transaction message is successfully processed by the consumer node, the transaction message is deleted from the to-be-processed message queue.6.根据权利要求5所述的基于消息队列的信息处理方法,其特征在于,在所述获取所述消费者节点根据所述事务消息返回的消费反馈信息的步骤之后,还包括:6. The message queue-based information processing method according to claim 5, wherein after the step of acquiring the consumption feedback information returned by the consumer node according to the transaction message, further comprising:当根据所述消费反馈消息确定所述消费者节点对所述事务消息处理失败时,将所述事务消息转移到重试消息队列,所述重试消息队列中的事务消息用于供所述消费者节点重新消费。When it is determined according to the consumption feedback message that the consumer node fails to process the transaction message, the transaction message is transferred to a retry message queue, and the transaction message in the retry message queue is used for the consumption The sender node re-consumes.7.根据权利要求5所述的基于消息队列的信息处理方法,其特征在于,在所述获取所述消费者节点根据所述事务消息返回的消费反馈信息的步骤之后,还包括:7. The message queue-based information processing method according to claim 5, wherein after the step of acquiring the consumption feedback information returned by the consumer node according to the transaction message, further comprising:当根据所述消费反馈消息确定所述消费者节点对所述事务消息处理失败时,将所述事务消息转移到备份消息队列,并指示所述消费者节点将所述事务消息存储到数据库,所述数据库中的事务消息用于供所述消费者节点重新消费;When it is determined that the consumer node fails to process the transaction message according to the consumption feedback message, the transaction message is transferred to the backup message queue, and the consumer node is instructed to store the transaction message in the database, so The transaction message in the database is used for re-consumption by the consumer node;当所述消费者节点对所述事务消息处理成功时,将所述事务消息从所述备份消息队列中删除。When the transaction message is successfully processed by the consumer node, the transaction message is deleted from the backup message queue.8.一种基于消息队列的信息处理装置,其特征在于,包括:8. An information processing device based on message queue, characterized in that, comprising:事务消息接收模块,用于接收生产者节点发送的事务消息;The transaction message receiving module is used to receive transaction messages sent by the producer node;静置加入模块,用于当检测到所述事务消息中的消息状态标识为静置标识时,将所述事务消息加入静置消息队列,所述静置消息队列中的事务消息不可被消费者节点进行消费;以及The static join module is used to add the transaction message to the static message queue when it is detected that the message state in the transaction message is identified as the static identifier, and the transaction message in the static message queue cannot be consumed by consumers node for consumption; and结果信息接收模块,用于接收所述生产者节点发送的事务处理结果信息,所述事务处理结果信息由所述生产者节点根据对所述事务消息的处理结果生成;a result information receiving module, configured to receive transaction processing result information sent by the producer node, where the transaction processing result information is generated by the producer node according to the processing result of the transaction message;消息转移模块,用于当根据所述事务处理结果信息确定所述生产者节点对所述事务消息处理通过时,将所述消息状态标识修改为可处理标识,并将所述事务消息转移到待处理消息队列,所述待处理消息队列中的事务消息可被消费者节点进行消费。A message transfer module is configured to modify the message status identifier to a processable identifier when it is determined that the producer node has passed the processing of the transaction message according to the transaction processing result information, and transfer the transaction message to the pending transaction message. A message queue is processed, and transaction messages in the to-be-processed message queue can be consumed by the consumer node.9.一种计算机设备,包括存储器和处理器,所述存储器中存储有计算机可读指令,所述处理器执行所述计算机可读指令时实现如权利要求1至7中任一项所述的基于消息队列的信息处理方法的步骤。9. A computer device comprising a memory and a processor, wherein computer-readable instructions are stored in the memory, the processor implementing the computer-readable instructions as claimed in any one of claims 1 to 7 when the processor executes the computer-readable instructions Steps of a message queue-based information processing method.10.一种计算机可读存储介质,其特征在于,所述计算机可读存储介质上存储有计算机可读指令,所述计算机可读指令被处理器执行时实现如权利要求1至7中任一项所述的基于消息队列的信息处理方法的步骤。10. A computer-readable storage medium, wherein computer-readable instructions are stored on the computer-readable storage medium, and when the computer-readable instructions are executed by a processor, any one of claims 1 to 7 is implemented. The steps of the message queue-based information processing method described in item.
CN202210237120.8A2022-03-112022-03-11Information processing method and device based on message queue and computer equipmentPendingCN114637611A (en)

Priority Applications (1)

Application NumberPriority DateFiling DateTitle
CN202210237120.8ACN114637611A (en)2022-03-112022-03-11Information processing method and device based on message queue and computer equipment

Applications Claiming Priority (1)

Application NumberPriority DateFiling DateTitle
CN202210237120.8ACN114637611A (en)2022-03-112022-03-11Information processing method and device based on message queue and computer equipment

Publications (1)

Publication NumberPublication Date
CN114637611Atrue CN114637611A (en)2022-06-17

Family

ID=81946945

Family Applications (1)

Application NumberTitlePriority DateFiling Date
CN202210237120.8APendingCN114637611A (en)2022-03-112022-03-11Information processing method and device based on message queue and computer equipment

Country Status (1)

CountryLink
CN (1)CN114637611A (en)

Cited By (3)

* Cited by examiner, † Cited by third party
Publication numberPriority datePublication dateAssigneeTitle
CN115379012A (en)*2022-10-252022-11-22航天云网数据研究院(广东)有限公司Industrial interconnection platform message queue deployment method and device based on identification analysis
CN116112880A (en)*2022-11-092023-05-12上海创蓝云智信息科技股份有限公司MQ-based high concurrency short message sending method and device
CN118963927A (en)*2024-05-172024-11-15灵岫科技(深圳)有限公司 Implementation method of database and transaction integration based on message queue

Citations (2)

* Cited by examiner, † Cited by third party
Publication numberPriority datePublication dateAssigneeTitle
CN108153598A (en)*2017-12-252018-06-12东软集团股份有限公司 Data consistency method and device based on microservice architecture
CN112822091A (en)*2019-11-182021-05-18北京京东尚科信息技术有限公司Message processing method and device

Patent Citations (2)

* Cited by examiner, † Cited by third party
Publication numberPriority datePublication dateAssigneeTitle
CN108153598A (en)*2017-12-252018-06-12东软集团股份有限公司 Data consistency method and device based on microservice architecture
CN112822091A (en)*2019-11-182021-05-18北京京东尚科信息技术有限公司Message processing method and device

Non-Patent Citations (1)

* Cited by examiner, † Cited by third party
Title
徐进 等: "基于消息通信的分布式系统最终一致性平台", 《计算机应用》, vol. 37, no. 04, 10 April 2017 (2017-04-10), pages 1157 - 1163*

Cited By (4)

* Cited by examiner, † Cited by third party
Publication numberPriority datePublication dateAssigneeTitle
CN115379012A (en)*2022-10-252022-11-22航天云网数据研究院(广东)有限公司Industrial interconnection platform message queue deployment method and device based on identification analysis
CN116112880A (en)*2022-11-092023-05-12上海创蓝云智信息科技股份有限公司MQ-based high concurrency short message sending method and device
CN116112880B (en)*2022-11-092023-09-08上海创蓝云智信息科技股份有限公司 Highly concurrent SMS sending method and device based on MQ
CN118963927A (en)*2024-05-172024-11-15灵岫科技(深圳)有限公司 Implementation method of database and transaction integration based on message queue

Similar Documents

PublicationPublication DateTitle
CN112380227B (en)Data synchronization method, device, equipment and storage medium based on message queue
CN114637611A (en)Information processing method and device based on message queue and computer equipment
CN112035144B (en)Upgrading method and device of block chain system, computer equipment and storage medium
CN111338834B (en)Data storage method and device
CN112328217A (en)Code deployment method, device, system, computer equipment and medium
CN114640503A (en)Application system verification method and device, computer equipment and storage medium
WO2022156087A1 (en)Data blood relationship establishing method and apparatus, computer device, and storage medium
CN114520807A (en)File uploading and downloading method and device, computer equipment and storage medium
CN112631884A (en)Pressure measurement method and device based on data synchronization, computer equipment and storage medium
CN114143308A (en)File uploading information processing method and device, computer equipment and storage medium
CN109088914B (en) Block generation method, blockchain ecosystem, and computer-readable storage medium
CN111722946A (en)Distributed transaction processing method and device, computer equipment and readable storage medium
CN115203262A (en)Database writing method, device, equipment and storage medium based on multi-writing component
CN114661523B (en) Data backup method, device, program product, medium and electronic device
CN115455058A (en)Cache data processing method and device, computer equipment and storage medium
CN115167769A (en)Method, device and system for writing data
CN114615325A (en) Message push method, device, computer equipment and storage medium
CN111311374A (en)University student-based idle commodity exchange method, device, equipment and storage medium
CN107657155B (en)Method and device for authenticating user operation authority
CN112860796A (en)Method, apparatus, device and storage medium for synchronizing data
CN116894010A (en)Resource state processing method and device, electronic equipment and storage medium
CN114186976A (en) Workflow flow method, device, computer equipment and storage medium
CN114968822A (en) Interface testing method, device, computer equipment and storage medium
CN115827778A (en) A data acquisition method, device, electronic equipment and storage medium
CN114218191A (en)System function migration method and device, computer equipment and storage medium

Legal Events

DateCodeTitleDescription
PB01Publication
PB01Publication
SE01Entry into force of request for substantive examination
SE01Entry into force of request for substantive examination

[8]ページ先頭

©2009-2025 Movatter.jp