Disclosure of Invention
The invention aims to provide a big data stream data processing method and apparatus, an electronic device and a storage medium, which solve the problems caused by out-of-order data during stream data processing.
In order to achieve the above purpose, the embodiment of the present application adopts the following technical scheme:
In a first aspect, an embodiment of the present application provides a big data stream data processing method, comprising: acquiring source data from a plurality of data sources; analyzing whether the source data contains a change identifier and/or an attribute identifier and, if so, organizing the source data and sending it to a message middleware, wherein the organized source data contains the system, table name, primary key, the change identifier and/or the attribute identifier; parsing at least one piece of organized source data obtained from the message middleware and matching it into a target data record base according to the primary key information; determining the processing priority of the data in the target data record base according to the attribute identifiers of that data, and forming a data set to be processed according to that processing priority; and determining the operation to be applied to the data set to be processed according to its change identifiers, thereby forming a new target database.
Further, the step of determining the processing priority of the data in the target data record base according to the attribute identifiers and forming a data set to be processed comprises: identifying the database attribute of the acquired data in the target data record base and determining its processing priority, and, for each primary key, taking the data with the optimal database-attribute priority as a first data set to be processed; identifying the time attribute of the first data set to be processed and determining its processing priority, and, for each primary key, taking the data with the optimal time-attribute priority as a second data set to be processed; and identifying the system attribute of the second data set to be processed and determining its processing priority, and, for each primary key, taking the data with the optimal system-attribute priority as a third data set to be processed.
Further, the step of determining the operation to be applied to the data set to be processed according to its change identifiers and forming a new target database comprises: identifying the change identifier of each piece of target data in the third data set to be processed; if the target data carries a modify or add change identifier, updating the target data to form updated target data; if the target data carries a delete change identifier, performing a logical delete on the target data to form deleted target data; and forming the new target database from the data in the third data set that carries no change identifier, the updated target data and the deleted target data.
Optionally, analyzing whether the source data contains a change identifier and/or an attribute identifier includes:
If the source data in a data source contains a change identifier or an attribute identifier, the data source sends that change identifier or attribute identifier downstream.
Optionally, analyzing whether the source data contains a change identifier and/or an attribute identifier includes:
Analyzing, at the downstream side, the log information of the source database in the data source, so as to identify whether the source data in the data source contains a change identifier or an attribute identifier.
In a second aspect, an embodiment of the present application provides a big data stream data processing apparatus, including:
The acquisition module is used for acquiring source data in a plurality of data sources;
The analysis module is used for analyzing whether the source data contains a change identifier and/or an attribute identifier and, if so, organizing the source data and sending it to the message middleware, wherein the organized source data contains the system, table name, primary key, the change identifier and/or the attribute identifier;
The first judging module is used for parsing at least one piece of organized source data acquired from the message middleware and matching it into a target data record base according to the primary key information;
The second judging module is used for determining the processing priority order of the data in the target data record base according to the attribute identifiers of that data, and forming a data set to be processed according to that order;
The third judging module is used for determining the operation to be applied to the data set to be processed according to its change identifiers, and forming a new target database.
Further, the second judging module includes:
The database data judging unit is used for identifying the database attributes of the acquired data in the target data record base, determining its processing priority, and, for each primary key, taking the data with the optimal database-attribute priority as a first data set to be processed;
The time attribute judging unit is used for identifying the time attribute of the first data set to be processed, determining its processing priority, and, for each primary key, taking the data with the optimal time-attribute priority as a second data set to be processed;
The system attribute judging unit is used for identifying the system attribute of the second data set to be processed, determining its processing priority, and, for each primary key, taking the data with the optimal system-attribute priority as a third data set to be processed.
In a third aspect, an embodiment of the present application provides an electronic device, including a processor, a memory and a bus, where the memory stores machine-readable instructions executable by the processor; when the electronic device is running, the processor and the memory communicate with each other through the bus, and the processor executes the machine-readable instructions to perform the steps of the big data stream data processing method according to any one of the above.
In a fourth aspect, an embodiment of the present application provides a readable storage medium storing a computer program which, when executed, implements the steps of the big data stream data processing method according to any one of the above.
The beneficial effect of the embodiments of the invention is that the out-of-order data processing problem can be solved merely by adding change identifiers and attribute identifiers during data acquisition and data processing, without changing the original technical architecture, thereby improving the accuracy and consistency of data in big data stream processing.
Detailed Description
To make the objectives, technical solutions and advantages of the embodiments of the present invention clearer, the technical solutions in the embodiments of the present invention are described clearly and completely below with reference to the accompanying drawings. Evidently, the described embodiments are some, but not all, of the embodiments of the present invention. The components of the embodiments of the present invention generally described and illustrated in the figures herein may be arranged and designed in a wide variety of different configurations.
Thus, the following detailed description of the embodiments of the invention, as presented in the figures, is not intended to limit the scope of the invention, as claimed, but is merely representative of selected embodiments of the invention. All other embodiments, which can be made by those skilled in the art based on the embodiments of the invention without making any inventive effort, are intended to be within the scope of the invention.
It should be noted that like reference numerals and letters refer to like items in the following figures, and thus once an item is defined in one figure, no further definition or explanation thereof is necessary in the following figures.
In the description of the present invention, the terms "first," "second," "third," and the like are used merely to distinguish between descriptions and are not to be construed as indicating or implying relative importance.
In addition, in the following description, descriptions of well-known structures and techniques are omitted so as not to unnecessarily obscure the present invention.
Referring to Fig. 1, which is a flowchart of a big data stream data processing method according to an embodiment of the present invention, the method includes:
S1, acquiring source data from a plurality of data sources;
S2, analyzing whether the source data contains a change identifier and/or an attribute identifier;
Specifically, there are various ways of detecting changes to source data during stream data processing; they generally fall into an active mode and a passive mode.
In the active mode, if the source data in a data source contains a change identifier or an attribute identifier, the data source itself sends that identifier downstream; here and hereinafter, "downstream" refers to the message middleware.
In the passive mode, the downstream side parses the log information of the source database in the data source, identifies the change timestamps of the source system and similar information, and thereby determines whether the source data contains a change identifier or an attribute identifier.
In general, to reduce the load and workload on the source system, the passive mode is used: a tool parses the source database logs to detect whether data has changed.
A change identifier is identification information attached when source data changes, so that the change can easily be recognized during data processing. The change identifier may indicate an addition, a modification or a deletion, explicitly stating how the data record has changed relative to the history record.
Example:
| No. | Identifier | Description |
| --- | --- | --- |
| 1 | I | Addition |
| 2 | U | Modification |
| 3 | D | Deletion |
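The three change identifiers can be modelled as a small enumeration. The sketch below is illustrative only: it assumes, hypothetically, that a passive-mode log parser reports each captured operation as a DML keyword (`INSERT`, `UPDATE`, `DELETE`), and the names `ChangeFlag` and `flag_from_dml` are invented for this example.

```python
from enum import Enum


class ChangeFlag(Enum):
    """Change identifiers from the table above."""
    ADD = "I"      # record is new relative to the history record
    MODIFY = "U"   # existing record was modified
    DELETE = "D"   # existing record was deleted


# Hypothetical mapping from the DML keyword a log parser might report
# to the change identifier attached to the organized message.
_DML_TO_FLAG = {
    "INSERT": ChangeFlag.ADD,
    "UPDATE": ChangeFlag.MODIFY,
    "DELETE": ChangeFlag.DELETE,
}


def flag_from_dml(operation: str) -> ChangeFlag:
    """Return the change identifier for a DML operation name (case-insensitive)."""
    try:
        return _DML_TO_FLAG[operation.strip().upper()]
    except KeyError:
        raise ValueError(f"unsupported operation: {operation!r}") from None
```

For instance, `flag_from_dml("update").value` yields `"U"`, and `ChangeFlag("D")` recovers the enum member from a received identifier.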
The attribute identifier refers to the system attribute, database attribute and time attribute corresponding to the source data. Adding these identifiers marks the attributes of the information, so that during data processing the data can be handled according to its system attribute, database attribute and time attribute.
S2-1, if the source data contains a change identifier and/or an attribute identifier, organizing the source data and sending it to a message middleware, wherein the organized source data contains the system, table name, primary key, the change identifier and/or the attribute identifier;
For example, the organized information contains the change identifier and the attribute identifier information. To keep the data columns extensible, the information is defined in JSON format, as in the following example:
The information can be organized in the above manner by a program or a tool.
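The organizing step can be sketched as a small function that wraps a source row in a JSON envelope carrying the system, table name, primary key and identifiers. The field names (`system`, `table`, `pk`, `change_flag`, `attrs`, `data`) and the function name `organize` are assumptions made for this illustration, not the layout of the original example.

```python
import json
from typing import Any, Dict, Optional


def organize(system: str, table: str, primary_key: str, row: Dict[str, Any],
             change_flag: Optional[str] = None,
             db_attr: Optional[str] = None,
             change_time_ms: Optional[int] = None,
             sys_priority: Optional[int] = None) -> str:
    """Wrap a source row in a JSON envelope (hypothetical field names)."""
    envelope = {
        "system": system,
        "table": table,
        "pk": primary_key,
        "change_flag": change_flag,          # "I", "U" or "D"
        "attrs": {
            "db": db_attr,                   # database attribute, e.g. "A01"
            "time": change_time_ms,          # time attribute in epoch milliseconds
            "sys_priority": sys_priority,    # system attribute: smaller = higher priority
        },
        # Business columns live in a nested object, so new columns can be
        # added without changing the envelope itself.
        "data": row,
    }
    return json.dumps(envelope, sort_keys=True)
```

Nesting the business columns under `data` is one way to get the extensibility the text mentions: the envelope stays fixed while the row schema evolves.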
The organized source data is then sent to the message middleware. Processing can be adapted to the characteristics of the middleware used. Taking Kafka as the example in this application, the corresponding Topic must be created before messages are sent, and the number of partitions can be initialized according to the throughput, mainly that of the Producer and Consumer ends. In addition, messages can be configured to be routed to partitions according to the hash value of the key, so that messages with the same key are consumed in order as far as possible, which improves the accuracy of the data.
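Key-hash routing can be illustrated without a running broker. The sketch below simulates partition assignment: Kafka's default partitioner hashes the serialized key with murmur2, whereas `zlib.crc32` is used here purely as a stand-in stable hash, and the helpers `choose_partition` and `route` are hypothetical. It shows why messages with the same key keep their relative order: they always land in the same partition, and each partition is an append-only sequence.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def choose_partition(key: str, num_partitions: int) -> int:
    """Map a message key to a partition with a stable hash.

    Stand-in for Kafka's key-based partitioning (which uses murmur2);
    crc32 is chosen here only because it is in the standard library.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(messages: Iterable[Tuple[str, str]],
          num_partitions: int) -> Dict[int, List[Tuple[str, str]]]:
    """Append (key, payload) pairs to per-partition logs in send order."""
    partitions: Dict[int, List[Tuple[str, str]]] = defaultdict(list)
    for key, payload in messages:
        partitions[choose_partition(key, num_partitions)].append((key, payload))
    return partitions
```

Using the primary key as the message key is what lets the downstream consumer see all versions of one record in send order; ordering across different keys is not guaranteed.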
If the source data contains neither a change identifier nor an attribute identifier, it is sent directly to the message middleware.
S3, parsing at least one piece of organized source data obtained from the message middleware, and matching it into a target data record base according to the primary key information;
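Step S3 can be sketched as parsing each organized JSON message and grouping the records by table and primary key. The field names `table` and `pk` and the function name `match_to_record_base` are assumptions for this illustration; an in-memory dictionary stands in for the target data record base.

```python
import json
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple


def match_to_record_base(
        raw_messages: Iterable[str]) -> Dict[Tuple[str, str], List[Dict[str, Any]]]:
    """Parse organized messages and group them by (table name, primary key).

    'table' and 'pk' are hypothetical field names mirroring the organized
    fields of S2-1; the dict stands in for the target data record base.
    """
    record_base: Dict[Tuple[str, str], List[Dict[str, Any]]] = defaultdict(list)
    for raw in raw_messages:
        record = json.loads(raw)
        record_base[(record["table"], record["pk"])].append(record)
    return dict(record_base)
```

Each group then holds every candidate version of one target record, which is the input to the priority judgments of S4.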
S4, determining the processing priority order of the data in the target data record base according to the attribute identifiers of that data, and forming a data set to be processed according to that order;
Specifically, as shown in Fig. 2, this step includes:
S401, identifying the database attributes of the acquired data in the target data record base and determining its processing priority, and, for each primary key, taking the data with the optimal database-attribute priority as a first data set to be processed, wherein the database attributes cover database migration, database splitting and/or database archiving;
The database attribute information of the acquired data in the target data record base is identified, and the data is then processed according to the configured database priority. Consider, for example, the following data:
| Primary key | Database attribute | Data value |
| --- | --- | --- |
| 3400001 | A01 | 8 |
| 3400001 | A02 | 9 |
| 3400001 | B01 | 10 |
| 3400001 | B02 | 11 |
| 3400001 | C01 | 7 |
The database priority configuration assigns a priority order to the database attribute codes (the letters A, B and C with their numeric suffixes). Among the records above, which all share the same primary key, the record with database attribute C01 has the highest priority, so whenever this data is processed, the highest-priority record is taken as the effective record to be processed. If newly arriving data has a lower priority than the target data, it is not processed.
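The database-attribute judgment of S401 can be sketched as a per-primary-key selection against a priority configuration. Only "C01 has the highest priority" comes from the example above; the rest of the ranking in `DB_PRIORITY`, and the names `best_by_db_attr` and `should_process`, are hypothetical. Ties are kept, since later stages (time, then system attribute) narrow the set further.

```python
from typing import Any, Dict, List

# Hypothetical priority configuration: smaller rank = higher priority.
# Only "C01 is highest" is taken from the example; the remaining order is assumed.
DB_PRIORITY: Dict[str, int] = {"C01": 1, "B02": 2, "B01": 3, "A02": 4, "A01": 5}


def best_by_db_attr(records: List[Dict[str, Any]],
                    priority: Dict[str, int] = DB_PRIORITY) -> Dict[str, List[Dict[str, Any]]]:
    """Per primary key, keep the record(s) whose database attribute ranks best."""
    best: Dict[str, tuple] = {}
    for rec in records:
        rank = priority[rec["db_attr"]]
        current = best.get(rec["pk"])
        if current is None or rank < current[0]:
            best[rec["pk"]] = (rank, [rec])   # strictly better: replace
        elif rank == current[0]:
            current[1].append(rec)            # tie: keep for the next stage
    return {pk: recs for pk, (_, recs) in best.items()}


def should_process(new_attr: str, target_attr: str,
                   priority: Dict[str, int] = DB_PRIORITY) -> bool:
    """New data is skipped only when its priority is lower than the target's."""
    return priority[new_attr] <= priority[target_attr]
```

Applied to the five rows of the table, `best_by_db_attr` keeps only the C01 record (data value 7), and `should_process("A01", "C01")` returns `False`.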
The database attribute describes the source database. For a system with several databases, such as a main library, a sub-library and an archive library, the database attribute is defined according to the actual situation, and data is processed according to the attribute of the database it comes from.
Defining this attribute resolves inconsistencies in the source data during stream data processing when the source database is migrated, split or archived. The main library, sub-library and archive library can be represented by the letters A, B and C, while 01, 02 and 03 represent successive migrations of a database with the same attribute: by default a database is A01, it becomes A02 after migration, B01 after splitting, and C01 when it is archived.
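The coding scheme above (letter for the database role, two digits for the migration generation) can be sketched as a transition function. The name `next_db_code` and the event labels are hypothetical, and the rule that a split or an archive always starts at generation 01 of the new role is an assumption generalized from the single example given.

```python
# Letter codes for the database role, as described above.
ROLE_LETTER = {"main": "A", "sub": "B", "archive": "C"}
DEFAULT_DB_CODE = "A01"  # a database is A01 by default


def next_db_code(current: str, event: str) -> str:
    """Return the database attribute code after a migrate/split/archive event.

    migrate: same letter, generation + 1        (A01 -> A02)
    split:   sub-library, generation 01 (assumed)      (-> B01)
    archive: archive library, generation 01 (assumed)  (-> C01)
    """
    letter, generation = current[0], int(current[1:])
    if event == "migrate":
        return f"{letter}{generation + 1:02d}"
    if event == "split":
        return f"{ROLE_LETTER['sub']}01"
    if event == "archive":
        return f"{ROLE_LETTER['archive']}01"
    raise ValueError(f"unknown event: {event!r}")
```

Keeping the code as a plain string means it can be used directly as a lookup key in a priority configuration.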
Examples are as follows:
S402, identifying the time attribute of the first data set to be processed and determining its processing priority, and, for each primary key, taking the data with the optimal time-attribute priority in the first data set as a second data set to be processed, wherein the time attribute includes the time at which the organized source data changed;
Specifically, the data of the first data set to be processed is handled according to its configured time attribute; for example, either the latest or the earliest data can be configured as effective. When the data with the latest time is configured as effective, newly arriving data proceeds to the next processing step only if its time is later than that of the target library; otherwise it is not processed.
The time attribute is the time at which the data changed. Defining this attribute resolves out-of-order processing caused by network delays, concurrency and similar conditions. The time attribute is recorded to millisecond precision, so that the order of data changes is captured more accurately. To simplify subsequent processing, the millisecond-precision time can also be converted to a timestamp; for example, 2021-08-01 15:01:21.260 (UTC+8) corresponds to the millisecond timestamp 1627801281260.
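The timestamp conversion and the "latest wins" check of S402 can be sketched with the standard library. The UTC+8 source time zone is an assumption inferred from the example value (1627801281260 corresponds to 15:01:21.260 at UTC+8); the names `to_epoch_ms` and `passes_time_check` are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Assumption: source times are local times at UTC+8, which is what the
# example value 1627801281260 implies.
SOURCE_TZ = timezone(timedelta(hours=8))
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(text: str, tz: timezone = SOURCE_TZ) -> int:
    """Convert 'YYYY-MM-DD HH:MM:SS.fff' to integer epoch milliseconds."""
    dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=tz)
    # timedelta // timedelta is exact integer arithmetic (no float rounding).
    return (dt - EPOCH) // timedelta(milliseconds=1)


def passes_time_check(new_ms: int, target_ms: int, keep_latest: bool = True) -> bool:
    """Time-attribute judgment: does the new data proceed to the next step?

    keep_latest=True: only strictly later data proceeds (latest is effective).
    keep_latest=False: only strictly earlier data proceeds (earliest is effective).
    """
    return new_ms > target_ms if keep_latest else new_ms < target_ms
```

For example, `to_epoch_ms("2021-08-01 15:01:21.260")` reproduces the 1627801281260 value from the text. Integer division of timedeltas is used instead of `datetime.timestamp() * 1000` to avoid floating-point rounding.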
S403, identifying the system attribute of the second data set to be processed and determining its processing priority, and, for each primary key, taking the data with the optimal system-attribute priority in the second data set as a third data set to be processed.
Specifically, the data of the second data set to be processed is handled according to its configured system attribute: the smaller the priority number in the system-attribute column, the higher the priority. When the new data has a higher priority than the target data, it is treated as effective data and passed to the next step; otherwise no further processing is performed.
It should be noted that the system attribute indicates the priority of the source system and may be defined as a column attribute of the data record. Defining this attribute resolves the case in which several source systems feed the same target; the source priority can be expressed by the numbers 1, 2 and 3.
Examples are as follows:
During data processing, the target end may store the data in columnar storage, with each column marked with attribute information such as an information number, the source system and the system priority, which is used for the logical judgments in stream data processing.
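Steps S401 to S403 together form a cascade of per-primary-key filters: database attribute first, then time attribute, then system attribute. The sketch below shows that cascade under stated assumptions: records are plain dictionaries with hypothetical fields `pk`, `db_attr`, `change_ms` and `sys_priority`; "latest time wins" is the configured time rule; and `keep_best` and `third_data_set` are invented names.

```python
from typing import Any, Callable, Dict, Iterable, List

Record = Dict[str, Any]


def keep_best(groups: Dict[str, List[Record]],
              rank: Callable[[Record], int]) -> Dict[str, List[Record]]:
    """Per primary key, keep only the records with the smallest rank (ties kept)."""
    result: Dict[str, List[Record]] = {}
    for pk, recs in groups.items():
        best = min(rank(r) for r in recs)
        result[pk] = [r for r in recs if rank(r) == best]
    return result


def third_data_set(records: Iterable[Record],
                   db_priority: Dict[str, int]) -> Dict[str, List[Record]]:
    """Apply the three priority stages in order (hypothetical field names)."""
    groups: Dict[str, List[Record]] = {}
    for r in records:
        groups.setdefault(r["pk"], []).append(r)
    first = keep_best(groups, lambda r: db_priority[r["db_attr"]])  # S401: database attribute
    second = keep_best(first, lambda r: -r["change_ms"])            # S402: latest time first
    third = keep_best(second, lambda r: r["sys_priority"])          # S403: smaller number first
    return third
```

Because each stage keeps ties, a later stage only decides between records that the earlier stages rated as equal, which matches the "first, second, third data set" narrowing described in S401 to S403.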
S5, determining the operation to be applied to the data set to be processed according to its change identifiers, and forming a new target database.
Specifically, the change identifier of each piece of target data in the third data set to be processed is identified. If the target data carries a modify or add change identifier, the target data is updated to form updated target data; if it carries a delete change identifier, a logical delete is performed on it to form deleted target data. The data in the third data set that carries no change identifier, the updated target data and the deleted target data together form the new target database.
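Step S5 can be sketched as applying each record of the third data set to a copy of the target table. This is a minimal sketch under assumptions: the target is an in-memory dictionary keyed by primary key, logical deletion is modelled with a hypothetical `is_deleted` column, records without a change identifier are assumed to be loaded as-is, and `apply_changes` is an invented name.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def apply_changes(target: Dict[str, Row], third_set: List[Row]) -> Dict[str, Row]:
    """Form a new target database from the third data set (S5 sketch).

    I / U: update (upsert) the row with the record's data.
    D:     logical delete, the row is kept but flagged via 'is_deleted' (hypothetical).
    none:  assumption, the record's data is loaded as-is.
    """
    new_target = {pk: dict(row) for pk, row in target.items()}  # leave input untouched
    for rec in third_set:
        pk, flag = rec["pk"], rec.get("change_flag")
        if flag in ("I", "U"):
            row = dict(new_target.get(pk, {}))
            row.update(rec["data"])
            row["is_deleted"] = False
            new_target[pk] = row
        elif flag == "D":
            row = dict(new_target.get(pk, {}))
            row["is_deleted"] = True      # logical, not physical, deletion
            new_target[pk] = row
        else:
            new_target[pk] = dict(rec["data"])
    return new_target
```

A logical delete keeps the row available for downstream applications and for later ordering decisions, unlike a physical delete that would lose the record.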
The processed target database can be sent to downstream applications.
The big data stream data processing method provided by the embodiments of the present application solves the out-of-order data processing problem merely by adding change identifiers and attribute identifiers during data acquisition and data processing, without changing the original technical architecture, and thereby improves the accuracy and consistency of data in big data stream processing.
Based on the same inventive concept, an embodiment of the present invention further provides a big data stream data processing apparatus. Fig. 3 is a schematic structural diagram of the big data stream data processing apparatus according to an embodiment of the present invention; as shown in Fig. 3, the apparatus includes:
An acquisition module 100 for acquiring source data in a plurality of data sources;
An analysis module 200, configured to analyze whether the source data contains a change identifier and/or an attribute identifier and, if so, organize the source data and send it to the message middleware, where the organized source data contains the system, table name, primary key, the change identifier and/or the attribute identifier;
A first judging module 300, configured to parse at least one piece of organized source data obtained from the message middleware, and match it into a target data record base according to the primary key information;
A second judging module 400, configured to determine the processing priority of the data in the target data record base according to the attribute identifiers of that data, and form a data set to be processed according to that processing priority;
and a third judging module 500, configured to determine the operation to be applied to the data set to be processed according to its change identifiers, and form a new target database.
Wherein the second judging module 400 includes:
a database data judging unit 401, configured to identify the database attributes of the acquired data in the target data record base and determine its processing priority, and, for each primary key, take the data with the optimal database-attribute priority as a first data set to be processed;
a time attribute judging unit 402, configured to identify the time attribute of the first data set to be processed and determine its processing priority, and, for each primary key, take the data with the optimal time-attribute priority as a second data set to be processed, where the time attribute includes the time at which the organized source data changed;
and a system attribute judging unit 403, configured to identify the system attribute of the second data set to be processed and determine its processing priority, and, for each primary key, take the data with the optimal system-attribute priority as a third data set to be processed.
The foregoing apparatus is used for executing the method provided in the foregoing embodiment, and its implementation principle and technical effects are similar, and are not described herein again.
The modules above may be one or more integrated circuits configured to implement the above method, for example one or more application-specific integrated circuits (ASICs), one or more digital signal processors (DSPs), or one or more field-programmable gate arrays (FPGAs). As another example, when a module above is implemented as program code scheduled by a processing element, the processing element may be a general-purpose processor, such as a central processing unit (CPU) or another processor capable of invoking program code. As a further example, the modules may be integrated together and implemented as a system-on-a-chip (SoC).
Fig. 5 is a schematic diagram of an electronic device according to an embodiment of the present application, where the electronic device may be integrated in a terminal device or a chip of the terminal device, and the terminal may be a computing device with a data processing function.
As shown in fig. 5, the electronic device includes a processor 501, a memory 502 and a bus, where the memory 502 stores program instructions executable by the processor 501, and when the electronic device is operated, the processor 501 communicates with the memory 502 through the bus, and the processor 501 executes the program instructions to perform the above-mentioned method embodiments. The specific implementation manner and the technical effect are similar, and are not repeated here.
Optionally, the present invention further provides a storage medium having stored thereon a computer program for executing the above-described method embodiments when the computer program is executed by a processor.
In the several embodiments provided by the present invention, it should be understood that the disclosed apparatus and method may be implemented in other manners. For example, the apparatus embodiments described above are merely illustrative, e.g., the division of the units is merely a logical function division, and there may be additional divisions when actually implemented, e.g., multiple units or components may be combined or integrated into another system, or some features may be omitted or not performed. Alternatively, the coupling or direct coupling or communication connection shown or discussed with each other may be an indirect coupling or communication connection via some interfaces, devices or units, which may be in electrical, mechanical or other form.
The units described as separate units may or may not be physically separate, and units shown as units may or may not be physical units, may be located in one place, or may be distributed on a plurality of network units. Some or all of the units may be selected according to actual needs to achieve the purpose of the solution of this embodiment.
In addition, each functional unit in the embodiments of the present invention may be integrated in one processing unit, or each unit may exist alone physically, or two or more units may be integrated in one unit. The integrated units may be implemented in hardware or in hardware plus software functional units.
Integrated units implemented as software functional units may be stored in a computer-readable storage medium. The software functional unit is stored in a storage medium and includes several instructions that cause a computer device (which may be a personal computer, a server, a network device, etc.) or a processor to perform some of the steps of the methods according to the embodiments of the invention. The storage medium includes various media capable of storing program code, such as a USB flash drive, a removable hard disk, a read-only memory (ROM), a random access memory (RAM), a magnetic disk or an optical disk.
It is to be understood that the above-described embodiments of the present invention are merely illustrative of or explanation of the principles of the present invention and are in no way limiting of the invention. Accordingly, any modification, equivalent replacement, improvement, etc. made without departing from the spirit and scope of the present invention should be included in the scope of the present invention. Furthermore, the appended claims are intended to cover all such changes and modifications that fall within the scope and boundary of the appended claims, or equivalents of such scope and boundary.