This application claims priority to Chinese Patent Application No. 201910799567.2, filed with the Chinese Patent Office on August 27, 2019 and entitled "Distributed data synchronization method, apparatus, device and readable storage medium", the entire contents of which are incorporated herein by reference.
The present invention relates to the field of financial technology (Fintech), and in particular to a distributed data synchronization method, apparatus, device and readable storage medium for the financial industry.
With the development of computer technology, more and more technologies (such as distributed computing, blockchain and artificial intelligence) are being applied in the financial field, and the traditional financial industry is gradually transforming into Fintech. Developers in the financial industry commonly work with distributed systems. In the prior art, data in a distributed system is synchronized from an upstream system A to a downstream system B mainly by one of the following schemes:
In the first scheme, a MessageQueue middleware that supports transactions is used. When system A modifies data, a distributed two-phase commit transaction is enabled within the data-modification transaction, so that the data is inserted into the online transaction database and, at the same time, persisted to the MessageQueue middleware. System B likewise starts a distributed two-phase commit transaction, receives the data from the MessageQueue middleware, and commits the transaction only after the data has been written to its local database. However, because this scheme relies on distributed transactions, it incurs a large performance cost.
To address this problem, the prior art proposes a second scheme: the Binlog is extracted and sent to the distributed messaging system Kafka, and the data receiving end processes the messages in Kafka in near real time, so that data is synchronized from system A to system B in near real time. However, the Kafka cluster in this scheme may lose messages, and data synchronization may be interrupted if the Kafka cluster as a whole becomes unavailable, so the scheme cannot meet high-reliability and high-availability requirements.
The prior art also proposes a third scheme, in which an ETL tool periodically connects to the online transaction databases of system A and system B and defines and executes data extraction tasks, thereby synchronizing data from system A to system B. However, the data latency of this scheme is too long to reach the second-level latency required for fast synchronization.
The main objective of the present invention is to provide a distributed data synchronization method, apparatus, device and readable storage medium, so as to solve the problem in the prior art that synchronizing data from an upstream system to a downstream system cannot meet high-reliability and high-availability requirements.
To achieve the above objective, the present invention provides a distributed data synchronization method, which includes the following steps:
when it is detected that an upstream application system sends out data to be synchronized, intercepting the data to be synchronized through an interceptor;
sending, through the interceptor, the data to be synchronized to both an application channel and an update log channel, where the application channel and the update log channel belong to different downstream application systems;
after the data to be synchronized has been consumed through the application channel and the update log channel respectively, storing it in a storage module corresponding to the downstream application systems.
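The three steps above can be sketched end to end as follows. This is a minimal in-memory sketch under stated assumptions, not the claimed implementation: the two lists are hypothetical stand-ins for the app-channel and Binlog-channel Kafka clusters, the dict stands in for the HBase table, and using the field `id` as the rowkey is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class Pipeline:
    """In-memory sketch of steps S10 to S30; every component is a hypothetical stand-in."""

    app_channel: list = field(default_factory=list)     # stands in for the app-channel Kafka cluster
    binlog_channel: list = field(default_factory=list)  # stands in for the Binlog-channel Kafka cluster
    store: dict = field(default_factory=dict)           # stands in for HBase: rowkey -> row

    def intercept(self, record: dict) -> None:
        # S10 + S20: the interceptor captures the record and sends a copy down each channel
        self.app_channel.append(dict(record))
        self.binlog_channel.append(dict(record))

    def consume_all(self) -> None:
        # S30: each channel is consumed independently, and both write under the same rowkey
        for channel in (self.app_channel, self.binlog_channel):
            while channel:
                row = channel.pop(0)
                self.store[row["id"]] = row
```

Usage: after `p.intercept({"id": "acct-1", "balance": 100})`, clearing `p.app_channel` before calling `p.consume_all()` still leaves `acct-1` in `p.store`, because the Binlog-channel copy is written under the same key.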
In an embodiment, the step of sending, through the interceptor, the data to be synchronized to both the application channel and the update log channel includes:
sending, through the interceptor, the data to be synchronized to the update log channel, and sending, through the interceptor, the data to be synchronized to a data synchronizer;
after the data synchronizer receives the data to be synchronized, asynchronously sending, through the data synchronizer, the data to be synchronized to the application channel of a first application system among the downstream application systems.
In an embodiment, the step of sending, through the interceptor, the data to be synchronized to the update log channel includes:
saving, through the interceptor, the data to be synchronized to an online transaction database;
after the online transaction database receives the data to be synchronized, obtaining, through the online transaction database, update log information corresponding to the data to be synchronized;
converting, through the online transaction database, the update log information into a first preset format;
sending, through the online transaction database, a first converted value in the first preset format to a log connector;
sending, through the log connector, the first converted value to the update log channel of a second application system among the downstream application systems.
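The update log path above can be illustrated with a small sketch. Everything here is an assumption for illustration: `OnlineTransactionDB` is a hypothetical stand-in that appends one entry per committed change (playing the role of the Binlog), the "first preset format" is assumed to be JSON, and `LogConnector` simply appends to a list standing in for the second application system's update log channel. The source does not say which tool fills the log connector role.

```python
import json


class OnlineTransactionDB:
    """Hypothetical stand-in for the online transaction database and its update log."""

    def __init__(self):
        self.tables = {}      # table name -> {primary key -> row}
        self.update_log = []  # one entry per committed change, in commit order

    def save(self, table: str, row: dict) -> None:
        rows = self.tables.setdefault(table, {})
        op = "update" if row["id"] in rows else "insert"
        rows[row["id"]] = dict(row)
        self.update_log.append({"table": table, "op": op, "after": dict(row)})


def to_first_preset_format(entry: dict) -> str:
    # assumption: the "first preset format" is JSON with sorted keys
    return json.dumps(
        {"table": entry["table"], "op": entry["op"], "row": entry["after"]},
        sort_keys=True,
    )


class LogConnector:
    """Hypothetical log connector: forwards converted values to the update log channel."""

    def __init__(self, update_log_channel: list):
        self.update_log_channel = update_log_channel  # stands in for the Binlog-channel topic

    def send(self, converted: str) -> None:
        self.update_log_channel.append(converted)


def publish_new_entries(db: OnlineTransactionDB, connector: LogConnector, start: int) -> int:
    """Convert update-log entries from index `start` onward, hand them to the connector,
    and return the index to resume from next time."""
    for entry in db.update_log[start:]:
        connector.send(to_first_preset_format(entry))
    return len(db.update_log)
```

Tracking the resume index outside the connector keeps the sketch simple; a real connector would persist its log position so that a restart neither skips nor re-sends entries.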
In an embodiment, the storage module is a unique primary key (rowkey) in the distributed storage system HBase, and the step of storing the data to be synchronized, after it has been consumed through the application channel and the update log channel respectively, in the storage module corresponding to the downstream application systems includes:
consuming the data to be synchronized through the application channel to obtain a first data structure, and consuming the first converted value through the update log channel to obtain a second data structure;
storing the first data structure and the second data structure under the same unique primary key in the distributed storage system HBase.
In an embodiment, after the step of obtaining, through the online transaction database, the update log information corresponding to the data to be synchronized, the distributed data synchronization method further includes:
sending, through the online transaction database, the update log information to an offline database within a preset time range;
converting, through the offline database, the update log information into a second preset format;
sending, through the offline database, a second converted value in the second preset format to a third distributed application cluster.
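A sketch of the offline path above, under stated assumptions: each update-log entry carries a hypothetical `ts` commit timestamp, the "preset time range" is modelled as a half-open window [start, end), and the "second preset format" is assumed to be a pipe-delimited key=value line. The list passed as `cluster_inbox` is a hypothetical stand-in for the third distributed application cluster.

```python
from datetime import datetime


def export_to_offline(update_log: list, start: datetime, end: datetime) -> list:
    """Select the update-log entries committed within the preset time range [start, end)."""
    return [e for e in update_log if start <= e["ts"] < end]


def to_second_preset_format(entry: dict) -> str:
    # assumption: the "second preset format" is a pipe-delimited key=value line, keys sorted
    row = entry["after"]
    return "|".join(f"{k}={row[k]}" for k in sorted(row))


def send_to_third_cluster(entries: list, cluster_inbox: list) -> None:
    # cluster_inbox is a hypothetical stand-in for the third distributed application cluster
    cluster_inbox.extend(to_second_preset_format(e) for e in entries)
```

Using a half-open window means that consecutive batches (for example, one per day) never select the same entry twice.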
In an embodiment, after the step of sending, through the offline database, the second converted value in the second preset format to the third distributed application cluster, the distributed data synchronization method further includes:
consuming the second converted value through the third distributed application cluster to obtain a third data structure;
storing the third data structure under the unique primary key in the distributed storage system HBase.
In an embodiment, after the step of storing the third data structure under the unique primary key in the distributed storage system HBase, the distributed data synchronization method further includes:
obtaining the second converted value in the second preset format;
obtaining the third data structure from the distributed storage system HBase;
comparing the second converted value with the third data structure;
upon determining that the third data structure is consistent with the second converted value, determining that all of the second converted values have been stored in the distributed storage system HBase.
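The consistency check in the steps above can be sketched as a reconciliation pass. In this sketch the store is a dict standing in for HBase, each second converted value is assumed to be a pipe-delimited key=value line whose `id` field is the rowkey, and `parse_second_format`, `consume_into_store` and `reconcile` are hypothetical helper names. An empty result means every second converted value was found, unchanged, in the store.

```python
def parse_second_format(line: str) -> dict:
    # assumption: lines look like "k1=v1|k2=v2"; parsed values stay strings
    return dict(pair.split("=", 1) for pair in line.split("|"))


def consume_into_store(lines: list, store: dict) -> None:
    # third distributed application cluster: parse each value into a "third data structure"
    # and write it under its rowkey (assumed to be the `id` field)
    for line in lines:
        row = parse_second_format(line)
        store[row["id"]] = row


def reconcile(lines: list, store: dict) -> list:
    """Return the second converted values that are missing from, or differ in, the store."""
    mismatched = []
    for line in lines:
        expected = parse_second_format(line)
        if store.get(expected["id"]) != expected:
            mismatched.append(line)
    return mismatched
```

Returning the mismatched values rather than a single boolean lets an operator re-send only the records that failed the comparison.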
In addition, to achieve the above objective, the present invention further provides a distributed data synchronization apparatus, which includes:
an interception module, configured to intercept, through an interceptor, data to be synchronized after detecting that an upstream application system sends out the data to be synchronized;
a sending module, configured to send, through the interceptor, the data to be synchronized to both an application channel and an update log channel, where the application channel and the update log channel belong to different downstream application systems;
a saving module, configured to store the data to be synchronized, after it has been consumed through the application channel and the update log channel respectively, in a storage module corresponding to the downstream application systems.
In an embodiment, the sending module is further configured to:
send, through the interceptor, the data to be synchronized to the update log channel, and send, through the interceptor, the data to be synchronized to a data synchronizer;
after the data synchronizer receives the data to be synchronized, asynchronously send, through the data synchronizer, the data to be synchronized to the application channel of a first application system among the downstream application systems.
In an embodiment, the sending module is further configured to:
save, through the interceptor, the data to be synchronized to an online transaction database;
upon determining that the online transaction database has received the data to be synchronized, obtain, through the online transaction database, update log information corresponding to the data to be synchronized;
convert, through the online transaction database, the update log information into a first preset format;
send, through the online transaction database, a first converted value in the first preset format to a log connector;
send, through the log connector, the first converted value to the update log channel of a second application system among the downstream application systems.
In addition, to achieve the above objective, the present invention further provides a distributed data synchronization device, which includes a memory, a processor, and a distributed data synchronization program stored in the memory and executable on the processor, where the distributed data synchronization program, when executed by the processor, implements the steps of the distributed data synchronization method described above.
In addition, to achieve the above objective, the present invention further provides a readable storage medium storing a distributed data synchronization program, where the distributed data synchronization program, when executed by a processor, implements the steps of the distributed data synchronization method described above.
In the distributed data synchronization method proposed by the present invention, an interceptor is added to the upstream application system. When the upstream application system is detected sending out data to be synchronized, the interceptor intercepts the data and sends it to both an application channel and an update log channel, which belong to different downstream application systems. In other words, each record to be synchronized is sent redundantly over two paths, which effectively reduces the probability of message loss in Kafka. Furthermore, because the data is sent to different downstream application systems, which may be located in different data centers in the same city or in different cities, storing the data in the storage module corresponding to the downstream application systems after it has been consumed through the application channel and the update log channel provides disaster recovery. Moreover, if the data of the upstream or downstream application system runs into a problem, the data synchronization latency is not affected, because the data is carried over both the application channel and the update log channel.
FIG. 1 is a schematic diagram of the device structure of the hardware operating environment involved in the solutions of the embodiments of the present invention;
FIG. 2 is a schematic flowchart of a first embodiment of the distributed data synchronization method of the present invention;
FIG. 3 is a schematic flowchart of a second embodiment of the distributed data synchronization method of the present invention;
FIG. 4 is a schematic flowchart of a third embodiment of the distributed data synchronization method of the present invention;
FIG. 5 is a schematic flowchart of a fourth embodiment of the distributed data synchronization method of the present invention;
FIG. 6 is a flow block diagram of the fourth embodiment of the distributed data synchronization method of the present invention.
The realization of the objectives, the functional features and the advantages of the present invention will be further described with reference to the embodiments and the accompanying drawings.
It should be understood that the specific embodiments described herein are merely intended to explain the present invention, not to limit it.
FIG. 1 is a schematic diagram of the device structure of the hardware operating environment involved in the solutions of the embodiments of the present invention.
The distributed data synchronization device in the embodiments of the present invention may be a PC or a server device.
As shown in FIG. 1, the distributed data synchronization device may include a processor 1001 (for example, a CPU), a network interface 1004, a user interface 1003, a memory 1005 and a communication bus 1002. The communication bus 1002 implements connection and communication among these components. The user interface 1003 may include a display and an input unit such as a keyboard, and may optionally further include a standard wired interface and a wireless interface. The network interface 1004 may optionally include a standard wired interface and a wireless interface (such as a Wi-Fi interface). The memory 1005 may be a high-speed RAM or a non-volatile memory, such as disk storage. Optionally, the memory 1005 may also be a storage device independent of the processor 1001.
Those skilled in the art can understand that the device structure shown in FIG. 1 does not constitute a limitation on the device, which may include more or fewer components than shown, combine certain components, or use a different arrangement of components.
As shown in FIG. 1, the memory 1005, as a computer-readable storage medium, may include an operating system, a network communication module, a user interface module and a distributed data synchronization program.
In the device shown in FIG. 1, the network interface 1004 is mainly used to connect to a back-end server and exchange data with it; the user interface 1003 is mainly used to connect to a client and exchange data with it; and the processor 1001 may be used to call the distributed data synchronization program stored in the memory 1005 and perform the operations in the embodiments of the distributed data synchronization method described below.
Based on the above hardware structure, embodiments of the distributed data synchronization method of the present invention are proposed.
Referring to FIG. 2, FIG. 2 is a schematic flowchart of the first embodiment of the distributed data synchronization method of the present invention. The method includes:
Step S10: after detecting that the upstream application system sends out data to be synchronized, intercepting the data to be synchronized through an interceptor.
In this embodiment, both the upstream application system and the downstream application systems are distributed systems. A distributed system is a collection of independent computers and systems that jointly provide services to the outside world while appearing to its users as a single computer. Distributed systems are used to improve the performance, reliability, availability and maintainability of the overall system.
An existing distributed messaging system such as Kafka is a distributed, partitioned, multi-replica messaging system coordinated by ZooKeeper. Its key strength is the ability to process large volumes of data in real time for a variety of scenarios, such as Hadoop-based batch processing systems and low-latency real-time systems, which makes it particularly suitable for the Core Banking System used in the financial industry.
In the prior art, when data is synchronized from upstream system A to downstream system B through the distributed messaging system Kafka, the update log (Binlog) is extracted. The Binlog is a binary-format file that records the SQL statements with which users update the database; for example, SQL statements that change a database table or its contents are recorded in the Binlog. The data receiving end processes the messages in Kafka in near real time, so that data is synchronized from system A to system B in near real time.
However, extracting the Binlog directly, as the prior art does, easily leads to the Kafka cluster losing information.
In the present solution, an interceptor, MyBatisIntercepter, is added to the upstream application system; that is, an AOP interceptor MyBatisIntercepter for the insert/update/delete methods is added to the bank's Core Banking System. AOP (aspect-oriented programming) is the technique used to define the interceptor MyBatisIntercepter. The interceptor MyBatisIntercepter intercepts the data to be synchronized, which avoids the loss of information in the Kafka cluster that arises when the data to be synchronized is obtained only by directly extracting the Binlog.
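The interception idea can be sketched in Python with a class decorator that wraps only the write methods of a data-access class, which is the aspect-oriented pattern the paragraph describes. This is an analogy, not MyBatis code: in MyBatis itself the corresponding hook would be an implementation of the `org.apache.ibatis.plugin.Interceptor` interface registered on `Executor.update`, the method through which insert, update and delete statements pass. `sync_interceptor`, `AccountMapper` and the `captured` list are hypothetical names.

```python
import functools

WRITE_METHODS = ("insert", "update", "delete")


def sync_interceptor(sink: list):
    """Class decorator: wrap insert/update/delete so each completed write is also handed to `sink`."""

    def decorate(cls):
        for name in WRITE_METHODS:
            original = getattr(cls, name, None)
            if original is None:
                continue

            def make_wrapper(method, op):
                @functools.wraps(method)
                def wrapper(self, record):
                    result = method(self, record)                    # perform the normal write first
                    sink.append({"op": op, "record": dict(record)})  # then capture a copy for sync
                    return result
                return wrapper

            setattr(cls, name, make_wrapper(original, name))
        return cls

    return decorate


captured = []  # hypothetical stand-in for the hand-off to the two synchronization channels


@sync_interceptor(captured)
class AccountMapper:
    """Hypothetical data-access class; reads are not intercepted."""

    def __init__(self):
        self.rows = {}

    def insert(self, record):
        self.rows[record["id"]] = record
        return 1

    def update(self, record):
        self.rows[record["id"]].update(record)
        return 1

    def delete(self, record):
        return 1 if self.rows.pop(record["id"], None) is not None else 0

    def select(self, key):
        return self.rows.get(key)
```

In this sketch the record is captured only after the wrapped write returns, so a write that raises an exception is never forwarded for synchronization.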
Step S20: sending, through the interceptor, the data to be synchronized to both the application channel and the update log channel, where the application channel and the update log channel belong to different downstream application systems.
In this step, the interceptor sends the data to be synchronized to both the application channel and the update log channel. Specifically, the interceptor sends the data to the application channel as follows: the interceptor sends the data to be synchronized to a data synchronizer; after receiving the data, the data synchronizer asynchronously sends it to the application channel of the first application system among the downstream application systems.
It should be noted that the application channel is the app channel and the update log channel is the Binlog channel. The Binlog is a binary-format file that records the SQL statements with which users update the database; for example, SQL statements that change a database table or its contents are recorded in the Binlog.
The interceptor sends the data to be synchronized to the update log channel as follows: the interceptor saves the data to be synchronized to the online transaction database; after receiving the data, the online transaction database obtains the corresponding update log information, converts it into the first preset format, and sends the first converted value in the first preset format to a log connector; the log connector then sends the first converted value to the update log channel of the second application system among the downstream application systems.
The data to be synchronized is thus sent redundantly over two paths: it is sent to the application channel configured for the first application system among the downstream application systems, and it is also sent to the online transaction database, processed, and delivered to the update log channel configured for the second application system. The same piece of data is therefore sent through distributed application clusters in two different data centers. Because the data structures handled by the application channel and the update log channel are identical, the two channels carry the same data, which effectively reduces the likelihood that the data is lost on its way to the downstream application systems.
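The benefit of the dual send can be made concrete with a simple estimate. Assuming, purely for illustration, that the app channel and the Binlog channel lose a given record independently with probabilities p_app and p_binlog, the record is lost only if both channels drop it; the value 10^-4 below is an illustrative figure, not a measurement from the source:

```latex
P(\text{lost}) = p_{\text{app}} \cdot p_{\text{binlog}},
\qquad
p_{\text{app}} = p_{\text{binlog}} = 10^{-4}
\;\Rightarrow\;
P(\text{lost}) = 10^{-8}
```

The independence assumption is what placing the two Kafka clusters in different data centers aims for; failures that affect both paths at once, such as a fault in the upstream system before the interceptor runs, fall outside this estimate.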
Step S30: after the data to be synchronized has been consumed through the application channel and the update log channel respectively, storing it in the storage module corresponding to the downstream application systems.
In this step, after the application channel and the update log channel each receive the data to be synchronized, each channel consumes the data and stores it in the storage module corresponding to the downstream application systems. Specifically, the data to be synchronized is consumed through the application channel to obtain a first data structure, and the first converted value is consumed through the update log channel to obtain a second data structure; the first data structure and the second data structure are then stored under the unique primary key in the distributed storage system HBase.
Two instances are set up in the distributed application cluster, configured to consume the app channel and the Binlog channel respectively. Because the data carried by the app channel and the Binlog channel has the same structure, identical data is saved under the same rowkey (unique primary key) in HBase (the distributed storage system). The write is therefore idempotent and does not create duplicate data. In addition, the first application system among the downstream application systems (the app-channel Kafka cluster) and the second application system (the Binlog-channel Kafka cluster) are located in different data centers in the same city or in different cities, which provides disaster recovery: when one data center has a problem, the data synchronization latency is not affected.
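A minimal sketch of why the two channels can share a rowkey without creating duplicates, under stated assumptions: the app-channel message is a dict, the Binlog-channel message is the same content serialized as JSON, and the rowkey scheme `table:primary key` is hypothetical. The dict-based `put` ignores HBase cell versioning; it only shows that writing identical content twice under one key leaves a single row.

```python
import json


def rowkey(table: str, pk: str) -> str:
    # hypothetical rowkey scheme: table name plus primary key
    return f"{table}:{pk}"


def from_app_channel(msg: dict) -> tuple:
    # first data structure: the intercepted record as forwarded by the data synchronizer
    return rowkey(msg["table"], msg["row"]["id"]), msg["row"]


def from_binlog_channel(msg: str) -> tuple:
    # second data structure: the JSON "first converted value" delivered by the log connector
    event = json.loads(msg)
    return rowkey(event["table"], event["row"]["id"]), event["row"]


def put(store: dict, key: str, row: dict) -> None:
    # HBase-style put keyed by rowkey; repeating it with the same row leaves the store unchanged
    store[key] = dict(row)
```

Whichever channel arrives first, the second put writes the same content to the same key, so the order of arrival does not matter for identical payloads.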
It should be noted that HBase is a highly reliable, high-performance, column-oriented and scalable distributed storage system; with HBase, large-scale structured storage clusters can be built on inexpensive PC servers. HBase belongs to the Hadoop ecosystem and serves as a distributed key-value (KV) database for storing and querying massive amounts of data.
In the distributed data synchronization method proposed by the present invention, an interceptor is added to the upstream application system. When the upstream application system is detected sending out data to be synchronized, the interceptor intercepts the data and sends it to both an application channel and an update log channel, which belong to different downstream application systems. In other words, each record to be synchronized is sent redundantly over two paths, which effectively reduces the probability of message loss in Kafka. Furthermore, because the data is sent to different distributed application clusters, which may be located in different data centers in the same city or in different cities, storing the data in the storage module corresponding to the downstream application systems after it has been consumed through the application channel and the update log channel provides disaster recovery. Moreover, if the data of the upstream or downstream application system runs into a problem, the data synchronization latency is not affected, because the data is carried over both the application channel and the update log channel.
Further, based on the first embodiment of the distributed data synchronization method of the present invention, a second embodiment of the method is proposed, in which the downstream application systems include the first application system. As shown in FIG. 3, step S20 may include:
Step S21: sending, through the interceptor, the data to be synchronized to the update log channel, and sending, through the interceptor, the data to be synchronized to the data synchronizer;
Step S22: after the data synchronizer receives the data to be synchronized, asynchronously sending, through the data synchronizer, the data to be synchronized to the application channel of the first application system among the downstream application systems.
In this embodiment, sending the data to be synchronized through the interceptor to the application channel and the update log channel specifically comprises: sending the data through the interceptor to the update log channel and to a data synchronizer; after the data synchronizer receives the data, it asynchronously sends the data to the application channel of the first application system among the downstream application systems.
The application channel is the app channel, and the update log channel is the Binlog channel. The Binlog is a binary-format log file that records the changes users make to the database; for example, SQL statements that alter a table's structure or modify its contents are recorded in the Binlog.
Sending the data to be synchronized to the application channel through the interceptor works as follows: the interceptor MyBatisIntercepter intercepts the data to be synchronized and forwards it to the data synchronizer DateSyne. On receiving the data, DateSyne asynchronously sends it to the application channel, that is, to the app channel of the first application system (Kafka) among the downstream application systems, so that the data in that Kafka system remains complete.
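The dual-send path of steps S21 and S22 can be sketched as follows. This is a minimal Python illustration, not the patent's implementation: the described system uses a MyBatis interceptor (Java) and Kafka, whereas here plain lists stand in for the OLTP database and the app channel, and the class names `SyncInterceptor` and `DataSynchronizer` are hypothetical. A background thread fed by `queue.Queue` models the asynchronous hand-off, so the intercepted write does not wait for the send to the app channel.

```python
import json
import queue
import threading


class DataSynchronizer:
    """Hypothetical stand-in for the data synchronizer (DateSyne).

    Records are queued and forwarded to the app channel by a background
    thread, so submit() returns without waiting for the send.
    """

    def __init__(self, app_channel):
        self._app_channel = app_channel          # list standing in for the Kafka app channel
        self._queue = queue.Queue()
        threading.Thread(target=self._run, daemon=True).start()

    def submit(self, record):
        self._queue.put(record)

    def flush(self):
        # Block until every queued record has been forwarded.
        self._queue.join()

    def _run(self):
        while True:
            record = self._queue.get()
            self._app_channel.append(json.dumps(record, sort_keys=True))
            self._queue.task_done()


class SyncInterceptor:
    """Hypothetical stand-in for the MyBatis interceptor: one write, two paths."""

    def __init__(self, oltp_db, synchronizer):
        self._oltp_db = oltp_db                  # list standing in for the OLTP DB (Binlog source)
        self._synchronizer = synchronizer

    def on_write(self, record):
        self._oltp_db.append(record)             # path 1: DB write, later captured by the Binlog
        self._synchronizer.submit(record)        # path 2: asynchronous copy to the app channel


app_channel, oltp_db = [], []
synchronizer = DataSynchronizer(app_channel)
interceptor = SyncInterceptor(oltp_db, synchronizer)
interceptor.on_write({"id": 1, "amount": 100})
synchronizer.flush()
```

Because the app-channel send is asynchronous, a failure on that path does not undo the database write, and the Binlog path still carries the record.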
Further, based on the second embodiment of the distributed data synchronization method of the present invention, a third embodiment is proposed, in which the downstream application systems further include a second application system. As shown in FIG. 4, step S21 may include:
Step S211: saving, through the interceptor, the data to be synchronized to an online transaction database;
Step S212: after determining that the online transaction database has received the data to be synchronized, obtaining, through the online transaction database, the update log information corresponding to the data;
Step S213: converting, through the online transaction database, the update log information into a first preset format;
Step S214: sending, through the online transaction database, the first converted value corresponding to the first preset format to a log connector;
Step S215: sending, through the log connector, the first converted value to the update log channel of the second application system among the downstream application systems.
In this embodiment, sending the data to be synchronized to the update log channel through the interceptor specifically comprises: saving the data through the interceptor to the online transaction database; after the online transaction database receives the data, obtaining through it the update log information corresponding to the data; converting the update log information into the first preset format through the online transaction database; sending the first converted value corresponding to the first preset format to the log connector through the online transaction database; and sending the first converted value through the log connector to the update log channel of the second application system among the downstream application systems.
It should be noted that OLTP (On-Line Transaction Processing), also called transaction-oriented processing, is characterized by user data received at the front end being transmitted immediately to the computing center for processing, with results returned within a very short time; it is one way of responding quickly to user operations. DB denotes a database.
After the interceptor intercepts the data to be synchronized, the insert/update/delete contents of that data are saved into the online transaction database OLTP DB, and the OLTP DB then obtains the update log information corresponding to the data, that is, the Binlog is extracted.
The online transaction database converts the update log information, i.e., the Binlog, into the first preset format. In one implementation, the first preset format is JSON. JSON (JavaScript Object Notation) is a lightweight data-interchange format based on a subset of ECMAScript (the JavaScript specification standardized by Ecma International, formerly the European Computer Manufacturers Association). It stores and represents data in a text format that is completely independent of any programming language, and its concise, clear hierarchical structure makes it an ideal data-interchange language: it is easy for humans to read and write, easy for machines to parse and generate, and helps improve network transmission efficiency.
After the Binlog is converted to JSON, the corresponding first converted value is obtained and sent to the log connector Binlog Connector. On receiving the first converted value, the Binlog Connector sends it to the update log channel configured for the second application system among the downstream application systems, that is, to the Binlog channel of the second application system (Kafka).
When the first converted value is sent to the Binlog channel of the second application system (Kafka), it is sent by calling the Kafka interface of that second application system.
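Steps S213 to S215 can be sketched under stated assumptions: the decoded Binlog row event is modeled as a Python dict with hypothetical keys `table`, `type`, `before` and `after`, and `LogConnector.forward` appends to a list where the real Binlog Connector would call the Kafka producer interface. The JSON layout (`table`, `op`, `data`) is likewise an illustrative choice, not one specified in the text.

```python
import json


def binlog_event_to_json(event):
    """Convert a decoded Binlog row event (hypothetical dict layout) into the
    first converted value: a compact JSON string with sorted keys."""
    op = event["type"].lower()                   # "insert", "update" or "delete"
    # Deletes carry the old row image; inserts and updates carry the new one.
    data = event["before"] if op == "delete" else event["after"]
    message = {"table": event["table"], "op": op, "data": data}
    return json.dumps(message, sort_keys=True, separators=(",", ":"))


class LogConnector:
    """Hypothetical stand-in for the Binlog Connector."""

    def __init__(self, binlog_channel):
        self.binlog_channel = binlog_channel     # list standing in for the Kafka Binlog channel

    def forward(self, converted_value):
        # A real connector would call the Kafka producer API here.
        self.binlog_channel.append(converted_value)
```

Sorting keys and using compact separators makes the output deterministic, so the same row always yields the same string regardless of dict insertion order.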
By sending the data to be synchronized to both the update log channel and the application channel, the data is sent redundantly over two paths: one copy of the data is sent to distributed application clusters in two different data centers. Moreover, the data handled by the application channel and the update log channel has the same structure and content, so the likelihood that data is lost on its way to the downstream application systems is effectively reduced.
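The loss-reduction claim can be made concrete under an independence assumption that the text does not state. Suppose a given record is lost on the app channel with probability $p_{\text{app}}$ and on the Binlog channel with probability $p_{\text{binlog}}$, and the two losses are independent (plausible for clusters in different data centers, but an assumption). The record then fails to reach the downstream system only when both copies are lost:

$$
P(\text{record lost}) = p_{\text{app}} \, p_{\text{binlog}} \le \min\left(p_{\text{app}},\, p_{\text{binlog}}\right)
$$

For example, with $p_{\text{app}} = p_{\text{binlog}} = 10^{-4}$, the combined loss probability is $10^{-8}$. Failures with a common cause, such as a defect in the upstream write path, affect both copies and are not covered by this estimate.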
Further, the storage module is a unique primary key in the distributed storage system HBase; step S30 may include:
consuming the data to be synchronized through the application channel to obtain a first data structure, and consuming the first converted value through the update log channel to obtain a second data structure;
saving the first data structure and the second data structure to the unique primary key in the distributed storage system HBase.
In this embodiment, after the application channel and the update log channel each receive the data to be synchronized, each channel consumes the data and saves the result to the storage module corresponding to the downstream application system. Specifically: the data to be synchronized is consumed through the application channel to obtain a first data structure, the first converted value is consumed through the update log channel to obtain a second data structure, and both data structures are saved to the unique primary key (rowkey) in the distributed storage system HBase.
In the downstream Kafka distributed application clusters, the same Spark Streaming application is deployed as two instances, one configured for the app channel and the other for the Binlog channel, to receive the data to be synchronized. Because the data in the app channel and the Binlog channel have the same structure, identical data is saved to the same rowkey (unique primary key) in HBase (the distributed storage system). The write is therefore idempotent and does not produce duplicate data.
Specifically, the HBase rowkey acts as a constraint similar to a unique primary key, so overwriting data is easy to achieve. Even if the streaming computation crashes before a batch of data has been completely written, this causes no problem: after a restart, the batch is simply written again and overwrites the earlier partial write, so no dirty data appears. Modification operations are different: when multiple threads modify data in parallel and the system crashes before one of them has finished, the data of the previous batch that was not fully modified must be restored to the state of the last completed modification before restarting.
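The contrast drawn above between overwriting and in-place modification can be shown in a few lines. This is an illustrative sketch with hypothetical function names and a dict standing in for the store: replaying an overwrite leaves the state unchanged, while replaying a read-modify-write update applies it a second time, which is why the latter needs its previous state restored before a restart.

```python
def put(store, key, value):
    """Overwrite, like a put on an HBase rowkey: replaying it changes nothing."""
    store[key] = value


def add(store, key, delta):
    """Read-modify-write update: every replay applies the delta again."""
    store[key] = store.get(key, 0) + delta


def replay(batch, store, apply):
    """Apply every (key, value) pair of a batch, as a restarted job would."""
    for key, value in batch:
        apply(store, key, value)


batch = [("order:7", 5)]
overwritten, incremented = {}, {}
for _ in range(2):                      # original run plus one replay after a crash
    replay(batch, overwritten, put)
    replay(batch, incremented, add)
# overwritten == {"order:7": 5}; incremented == {"order:7": 10}
```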
It should be noted that because the first and second data structures are saved to HBase in key-value form, the same piece of data always produces the same key and value. However many times it is inserted, the total amount of data stays the same, so no duplicate data is produced.
Idempotent processing means that, as long as the data does not change, an operation yields the same result no matter how many times it is executed. In this solution, because the app channel and the Binlog channel carry data with the same structure and content, the data is saved to the same rowkey in HBase, which achieves idempotent processing.
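The idempotency argument can be illustrated with a dictionary standing in for an HBase table. This is a simplified model under stated assumptions: it keeps only the latest value per rowkey (real HBase can also retain older cell versions, but the number of rows does not grow), and the rowkey scheme `table:id` and the message layout are hypothetical. Consuming the same message from the app channel, from the Binlog channel, and again after a simulated restart leaves exactly one row.

```python
import json


class RowkeyStore:
    """In-memory stand-in for an HBase table: one value per rowkey;
    a second put to the same rowkey overwrites the first."""

    def __init__(self):
        self.rows = {}

    def put(self, rowkey, value):
        self.rows[rowkey] = value


def to_rowkey_and_value(message):
    """Derive (rowkey, value) from a channel message. Hypothetical scheme:
    the rowkey depends only on the table name and the record's primary key,
    so both channels map the same record to the same rowkey."""
    record = json.loads(message)
    rowkey = f'{record["table"]}:{record["data"]["id"]}'
    value = json.dumps(record["data"], sort_keys=True)
    return rowkey, value


def consume(channel, store):
    """Stand-in for one Spark Streaming instance draining one channel."""
    for message in channel:
        store.put(*to_rowkey_and_value(message))


message = '{"data":{"amount":80,"id":7},"op":"update","table":"t_order"}'
store = RowkeyStore()
consume([message], store)   # instance reading the app channel
consume([message], store)   # instance reading the Binlog channel
consume([message], store)   # replay after a simulated crash and restart
```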
HBase is a highly reliable, high-performance, column-oriented, scalable distributed storage system; with HBase, a large-scale structured storage cluster can be built on inexpensive PC servers. It belongs to the Hadoop ecosystem and is a distributed key-value database used for storing and querying massive amounts of data.
Because the Kafka cluster for the app channel and the Kafka cluster for the Binlog channel are located in different data centers in the same city or in different cities, disaster recovery is achieved: when one data center fails, the other channel still delivers the data, so data synchronization latency is unaffected.
Further, based on the third embodiment of the distributed data synchronization method of the present invention, a fourth embodiment is proposed. As shown in FIG. 5, after step S212, the distributed data synchronization method includes:
Step S2121: sending, through the online transaction database, the update log information to an offline database within a preset time range;
Step S2122: converting, through the offline database, the update log information into a second preset format;
Step S2123: sending, through the offline database, the second converted value corresponding to the second preset format to a third distributed application cluster.
In this embodiment, to ensure that every piece of data to be synchronized is saved to HBase, T+1 backfill channel processing is performed, specifically: sending the update log information to the offline database within a preset time range through the online transaction database; converting the update log information into the second preset format through the offline database; and sending the second converted value corresponding to the second preset format to the third distributed application cluster through the offline database.
Specifically, once the day-T (current day) data in the online transaction database OLTP DB is ready, it is extracted into the offline database Hive with a tool such as Sqoop. Hive is a Hadoop-based data warehouse tool that maps structured data files to database tables, provides a simple SQL query capability, and converts SQL statements into MapReduce jobs for execution.
After the extraction is complete, the program that saves the first and second data structures to the unique primary key in HBase is launched as a Spark application whose data source is the Hive warehouse populated in the extraction step. Using the header (column) information of the corresponding Hive table, each row of data in Hive is likewise converted into the second preset format, such as JSON.
Because the data structure generated by the T+1 (next-day) fallback backfill application is the same as that of the app channel and the Binlog channel, identical data is saved to the same rowkey in HBase. The write is therefore idempotent and does not produce duplicate data.
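A minimal sketch of the T+1 conversion step, assuming each Hive row arrives as a list of values and the table header as a list of column names (in the described system this would run inside a Spark job reading from Hive). The function `hive_row_to_json` and the `table`/`op`/`data` layout are hypothetical; the point is that the backfill output takes the same shape as the real-time messages, so it can land on the same rowkey. The `op` value `backfill` is an illustrative marker and is assumed not to participate in the rowkey or the stored value.

```python
import json


def hive_row_to_json(table, header, row):
    """Pair the Hive table's column names with one row's values and wrap the
    result in the same layout the real-time channels use (hypothetical layout)."""
    if len(header) != len(row):
        raise ValueError(f"header has {len(header)} columns but row has {len(row)}")
    data = dict(zip(header, row))
    message = {"table": table, "op": "backfill", "data": data}
    return json.dumps(message, sort_keys=True, separators=(",", ":"))
```

For example, `hive_row_to_json("t_order", ["id", "amount"], [7, 80])` returns `'{"data":{"amount":80,"id":7},"op":"backfill","table":"t_order"}'`. The length check rejects rows whose column count does not match the header instead of silently truncating them.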
Further, after step S2123, the distributed data synchronization method may further include:
consuming the second converted value through the third distributed application cluster to obtain a third data structure;
saving the third data structure to the unique primary key in the distributed storage system HBase.
In this embodiment, after the second converted value corresponding to the second preset format is sent to the third distributed application cluster, the third distributed application cluster consumes the second converted value to obtain a third data structure, and the third data structure is saved to the unique primary key in HBase.
The data source of the T+1 backfill channel is every record that the upstream application system has written to its database. On the following day, a scheduler imports these records once more, in full and in batches, into the online transaction database OLTP DB corresponding to the downstream application system, after which the data can be verified. This ensures that all the data to be synchronized in the upstream application system has been synchronized to the downstream application system.
Further, after step S2123, as shown in FIG. 6, the distributed data synchronization method may further include:
obtaining the second converted value corresponding to the second preset format;
obtaining the third data structure from the distributed storage system HBase;
comparing the second converted value with the third data structure;
if the third data structure is consistent with the second converted value, determining that all the second converted values have been saved in HBase.
In this embodiment, to ensure that all the data to be synchronized in the upstream application system has been synchronized to the downstream application system, a data verification process is performed, specifically: obtaining the second converted value corresponding to the second preset format; obtaining the third data structure from HBase; comparing the second converted value with the third data structure; and, if they are consistent, determining that all the second converted values have been saved in HBase.
If the third data structure is consistent with the second converted value, it can be determined that all the second converted values are saved in HBase. If they are inconsistent, it can be determined that not all of the second converted values are saved in HBase, and the T+1 backfill channel processing can then be performed again.
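The verification step of FIG. 6 reduces to comparing two keyed collections. This sketch assumes both sides have already been keyed by rowkey: `expected` holds the second converted values from the T+1 path and `stored` holds the third data structures read back from HBase. The function names and the return shape are hypothetical. Any rowkey reported as missing or mismatched would be the input to another T+1 backfill pass.

```python
def verify(expected, stored):
    """Compare the T+1 converted values with what HBase holds, both keyed by rowkey.

    Returns (missing, mismatched): rowkeys absent from the store, and rowkeys
    whose stored value differs. Both empty means every value was saved."""
    missing = sorted(k for k in expected if k not in stored)
    mismatched = sorted(k for k in expected if k in stored and stored[k] != expected[k])
    return missing, mismatched


def all_saved(expected, stored):
    """True when every expected value is present in the store and identical."""
    missing, mismatched = verify(expected, stored)
    return not missing and not mismatched
```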
The present invention further provides a distributed data synchronization device. The distributed data synchronization device of the present invention includes:
an interception module, configured to intercept, through an interceptor, the data to be synchronized when the upstream application system is detected to send out the data;
a sending module, configured to send, through the interceptor, the data to be synchronized to an application channel and an update log channel respectively, the application channel and the update log channel belonging to different downstream application systems;
a saving module, configured to consume the data to be synchronized through the application channel and the update log channel respectively and then save it to the storage module corresponding to the downstream application system.
Further, the sending module is further configured to:
send, through the interceptor, the data to be synchronized to the update log channel and to the data synchronizer;
after the data synchronizer receives the data to be synchronized, send, through the data synchronizer, the data asynchronously to the application channel of the first application system among the downstream application systems.
Further, the sending module is further configured to:
save, through the interceptor, the data to be synchronized to the online transaction database;
after the online transaction database receives the data to be synchronized, obtain, through the online transaction database, the update log information corresponding to the data;
convert, through the online transaction database, the update log information into the first preset format;
send, through the online transaction database, the first converted value corresponding to the first preset format to the log connector;
send, through the log connector, the first converted value to the update log channel of the second application system among the downstream application systems.
Further, the storage module is a unique primary key in the distributed storage system HBase, and the saving module is further configured to:
consume the data to be synchronized through the application channel to obtain a first data structure, and consume the first converted value through the update log channel to obtain a second data structure;
save the first data structure and the second data structure to the unique primary key in the distributed storage system HBase.
Further, the sending module is further configured to:
send, through the online transaction database, the update log information to the offline database within a preset time range;
convert, through the offline database, the update log information into the second preset format;
send, through the offline database, the second converted value corresponding to the second preset format to the third distributed application cluster.
Further, after the sending module performs the step of sending the second converted value corresponding to the second preset format to the third distributed application cluster, the distributed data synchronization device further includes a storage module configured to:
consume the second converted value through the third distributed application cluster to obtain a third data structure;
save the third data structure to the unique primary key in the distributed storage system HBase.
Further, after the storage module performs the step of saving the third data structure to the unique primary key in the distributed storage system HBase, the distributed data synchronization device further includes a verification module configured to:
obtain the second converted value corresponding to the second preset format;
obtain the third data structure from the distributed storage system HBase;
compare the second converted value with the third data structure;
if the third data structure is consistent with the second converted value, determine that all the second converted values have been saved in HBase.
The present invention further provides a computer-readable storage medium.
The computer-readable storage medium of the present invention stores a distributed data synchronization program which, when executed by a processor, implements the steps of the distributed data synchronization method described above.
For the method implemented when the distributed data synchronization program running on the processor is executed, reference may be made to the embodiments of the distributed data synchronization method of the present invention; details are not repeated here.
It should be noted that, herein, the terms "include", "comprise", and any variants thereof are intended to cover non-exclusive inclusion, so that a process, method, article, or system that includes a series of elements includes not only those elements but also other elements not explicitly listed, or elements inherent to such a process, method, article, or system. Without further limitation, an element defined by the phrase "including a ..." does not exclude the presence of other identical elements in the process, method, article, or system that includes that element.
The serial numbers of the foregoing embodiments of the present invention are for description only and do not indicate the relative merits of the embodiments.
From the description of the above implementations, those skilled in the art can clearly understand that the methods of the above embodiments can be implemented by software plus a necessary general-purpose hardware platform, or by hardware, although in many cases the former is the better implementation. Based on this understanding, the technical solution of the present invention, in essence or in the part that contributes over the prior art, can be embodied as a software product. The computer software product is stored in a readable storage medium as described above (such as ROM/RAM, a magnetic disk, or an optical disc) and includes several instructions that cause a distributed data synchronization device (which may be a mobile phone, a computer, a server, an air conditioner, a network device, or the like) to execute the methods described in the embodiments of the present invention.
The above are only preferred embodiments of the present invention and do not thereby limit its patent scope. Any equivalent structure or equivalent process transformation made using the contents of the specification and drawings of the present invention, or any direct or indirect application in other related technical fields, is likewise included within the scope of patent protection of the present invention.
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN201910799567.2 (publication CN110502583B) | 2019-08-27 | 2019-08-27 | Distributed data synchronization method, device, equipment and readable storage medium |

| Publication Number | Publication Date |
|---|---|
| WO2021036684A1 | 2021-03-04 |

| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| PCT/CN2020/106360 (WO2021036684A1; status: Ceased) | 2019-08-27 | 2020-07-31 | Distributed data synchronization method, apparatus and device and readable storage medium |

| Country | Link |
|---|---|
| CN (1) | CN110502583B |
| WO (1) | WO2021036684A1 |
| Publication number | Priority date | Publication date | Assignee | Title | 
|---|---|---|---|---|
| CN113051275A (en)* | 2021-03-31 | 2021-06-29 | 银盛支付服务股份有限公司 | Storage architecture method compatible with real-time and offline data processing | 
| CN113836113A (en)* | 2021-09-26 | 2021-12-24 | 北京金山云网络技术有限公司 | A DDL operation synchronization method, device and electronic device | 
| CN114625717A (en)* | 2022-03-04 | 2022-06-14 | 中国建设银行股份有限公司 | Database generation method, and management method and device for distributed objects | 
| CN116418824A (en)* | 2021-12-31 | 2023-07-11 | 北京字节跳动网络技术有限公司 | Data generating method, device, storage medium and program product | 
| Publication number | Priority date | Publication date | Assignee | Title | 
|---|---|---|---|---|
| CN110502583B (en)* | 2019-08-27 | 2024-05-17 | 深圳前海微众银行股份有限公司 | Distributed data synchronization method, device, equipment and readable storage medium | 
| CN113704212B (en)* | 2020-05-22 | 2024-08-16 | 深信服科技股份有限公司 | Data synchronization method, device and equipment of server and computer storage medium | 
| CN112084047B (en)* | 2020-09-14 | 2025-01-24 | 浪潮云信息技术股份公司 | A message sending implementation method based on asynchronous thread pool | 
| CN112597242B (en)* | 2020-12-16 | 2023-06-06 | 四川新网银行股份有限公司 | Extraction method based on application system data slices related to batch tasks | 
| CN112579326B (en)* | 2020-12-29 | 2024-10-18 | 北京五八信息技术有限公司 | Offline data processing method, device, electronic equipment and computer readable medium | 
| CN112910970B (en)* | 2021-01-21 | 2023-04-07 | 中国工商银行股份有限公司 | Remote disaster recovery data synchronization method, device and system | 
| CN112910971B (en)* | 2021-01-21 | 2023-04-07 | 中国工商银行股份有限公司 | Multi-station data synchronization method, device and system | 
| CN112988897A (en)* | 2021-03-30 | 2021-06-18 | 工银科技有限公司 | Data bidirectional synchronization method and device in system upgrading scene | 
| CN113392161A (en)* | 2021-07-08 | 2021-09-14 | 苏州海管家物流科技有限公司 | Data bidirectional real-time synchronization system | 
| CN113886480A (en)* | 2021-10-19 | 2022-01-04 | 上海微盟企业发展有限公司 | Data transmission method, system, device and storage medium | 
| CN114661824A (en)* | 2022-04-18 | 2022-06-24 | 中原银行股份有限公司 | Data synchronization system and method based on cloud environment, electronic device and storage medium | 
| CN116049302A (en)* | 2022-11-16 | 2023-05-02 | 浪潮通用软件有限公司 | A data synchronization method across databases | 
| Publication number | Priority date | Publication date | Assignee | Title | 
|---|---|---|---|---|
| US20170220423A1 (en)* | 2016-01-29 | 2017-08-03 | Netapp, Inc. | Online Backup to an Object Service Using Bulk Export | 
| CN107590182A (en)* | 2017-08-03 | 2018-01-16 | 华南理工大学 | Distributed log collection method | 
| CN110162571A (en)* | 2019-04-26 | 2019-08-23 | 厦门市美亚柏科信息股份有限公司 | System, method and storage medium for synchronizing data among heterogeneous databases | 
| CN110502583A (en)* | 2019-08-27 | 2019-11-26 | 深圳前海微众银行股份有限公司 | Distributed data synchronization method, device, equipment and readable storage medium | 
| Publication number | Priority date | Publication date | Assignee | Title | 
|---|---|---|---|---|
| CN108885627B (en)* | 2016-01-11 | 2022-04-05 | 甲骨文美国公司 | Query-as-a-service system providing query result data to remote client | 
| BE1024939B1 (en)* | 2017-06-19 | 2018-08-21 | The Glue | SYSTEM AND DEVICE FOR GUARANTEED EXACTLY PROCESSING OF AN EVENT IN A DIVIDED EVENT-DRIVEN ENVIRONMENT | 
| CN109635039B (en)* | 2018-11-23 | 2021-08-24 | 金色熊猫有限公司 | Multi-data center data synchronization method and system | 
| Publication number | Priority date | Publication date | Assignee | Title | 
|---|---|---|---|---|
| CN113051275A (en)* | 2021-03-31 | 2021-06-29 | 银盛支付服务股份有限公司 | Storage architecture method compatible with real-time and offline data processing | 
| CN113836113A (en)* | 2021-09-26 | 2021-12-24 | 北京金山云网络技术有限公司 | A DDL operation synchronization method, device and electronic device | 
| CN116418824A (en)* | 2021-12-31 | 2023-07-11 | 北京字节跳动网络技术有限公司 | Data generating method, device, storage medium and program product | 
| CN114625717A (en)* | 2022-03-04 | 2022-06-14 | 中国建设银行股份有限公司 | Database generation method, and management method and device for distributed objects | 
| Publication number | Publication date | 
|---|---|
| CN110502583A (en) | 2019-11-26 | 
| CN110502583B (en) | 2024-05-17 | 
| Publication | Title |
|---|---|
| WO2021036684A1 (en) | Distributed data synchronization method, apparatus and device and readable storage medium |
| US12079130B2 (en) | Method and apparatus for processing data based on block chain, device and readable storage medium |
| CN106682119B (en) | Asynchronous data synchronization method and system based on http service section and log system |
| CN110196871A (en) | Data storage method and system |
| CN111625552B (en) | Data collection method, device, equipment and readable storage medium |
| WO2021184761A1 (en) | Data access method and apparatus, and data storage method and device |
| CN110837423B (en) | A method and device for collecting data of an automatic guided transport vehicle |
| CN102129469A (en) | Virtual experiment-oriented unstructured data accessing method |
| CN106919654A (en) | Implementation method of a high-availability MySQL database based on Nginx |
| CN106569896B (en) | A data distribution and parallel processing method and system |
| US12079202B2 (en) | Parallel stream processing of change data capture |
| CN110334145A (en) | Method and device for data processing |
| CN115292414A (en) | A method for synchronizing business data to data warehouse |
| CN115168440A (en) | Data reading and writing method, distributed storage system, device, equipment and storage medium |
| CN105989065B (en) | Flash data processing method and system |
| US11789971B1 (en) | Adding replicas to a multi-leader replica group for a data set |
| US20250086199A1 (en) | Methods for configuring nodes in distributed database, methods for synchronizing transaction logs in the distributed database, and nodes in distributed database |
| CN117422556B (en) | Derivative transaction system, device and computer medium based on replication state machine |
| US12399909B2 (en) | Configuration and management of replication units for asynchronous database transaction replication |
| CN113810231B (en) | Log analysis method, system, electronic equipment and storage medium |
| CN111597157A (en) | Method for improving log processing system architecture |
| WO2024103898A1 (en) | Database cluster management method and apparatus |
| CN117950850A (en) | Data transmission method, device, electronic equipment and computer readable medium |
| CN116303789A (en) | Multi-shard multi-copy database parallel synchronization method, device and readable medium |
| CN115168472A (en) | Real-time report generation method and system based on Flink |
| Code | Title | Description |
|---|---|---|
| 121 | EP: the EPO has been informed by WIPO that EP was designated in this application | Ref document number: 20858802; Country of ref document: EP; Kind code of ref document: A1 |
| NENP | Non-entry into the national phase | Ref country code: DE |
| 122 | EP: PCT application non-entry in European phase | Ref document number: 20858802; Country of ref document: EP; Kind code of ref document: A1 |