Disclosure of Invention
In view of this, embodiments of the present invention provide a data processing method, an apparatus and a storage medium to solve technical problems in the prior art.
In a first aspect, the present invention provides a data processing method, including the following steps: acquiring real-time service data and synchronizing the real-time service data to a data warehouse; performing micro-batch task scheduling based on Airflow, and sequentially executing the calculation tasks in an SQL task script to obtain data indexes; the SQL task script is used for acquiring index data from the data warehouse according to business logic and screening the index data to obtain the corresponding data indexes, wherein the index data are included in the business data.
In an embodiment, the step of performing micro-batch task scheduling based on Airflow and sequentially executing the calculation tasks in the task script specifically includes the following steps: storing the real-time service data into a distributed file system; sequentially establishing a directed acyclic graph for the calculation tasks in the task script, the directed acyclic graph being used to describe the calculation process of the service data; setting task scheduling parameters; and executing the calculation tasks in the task script according to the directed acyclic graph.
In an embodiment, after the step of executing the calculation tasks in the task script according to the directed acyclic graph, the method further includes the following step: when the execution time of a calculation task exceeds a threshold, performing load balancing on the calculation task or increasing the system memory.
In one embodiment, the task scheduling parameters include: a calculation time range, a calculation period, and a maximum number of identical directed acyclic graphs running online simultaneously; the calculation period is 8-15 min.
In an embodiment, in the step of sequentially executing the calculation tasks in the task script to obtain the data index, a calculation process of the calculation task specifically includes: acquiring index data from the data warehouse according to business logic; and screening according to the screening conditions to obtain the corresponding data indexes.
In one embodiment, the method further comprises: and exporting the calculated data to an application terminal, and displaying the data index in real time by the application terminal.
In an embodiment, before the step of acquiring real-time business data and synchronizing the real-time business data to the data warehouse, the method further includes the following step: establishing a data transmission link between the data warehouse and a database, the database including the business data.
In a second aspect, the present invention provides a data processing apparatus comprising: the acquisition module is used for acquiring real-time service data and synchronizing the real-time service data to the data warehouse; the calculation module is used for carrying out micro-batch task scheduling based on Airflow and sequentially executing calculation tasks in an SQL task script to obtain the data index; the SQL task script is used for acquiring index data according to business logic and screening to obtain corresponding data indexes, wherein the index data are included in the business data.
In a third aspect, the present invention also provides an electronic device, including: a memory and a processor, the memory and the processor being communicatively connected to each other, the memory having stored therein computer instructions, the processor executing the computer instructions to perform the data processing method of the first aspect or any embodiment thereof.
In a fourth aspect, a computer-readable storage medium stores computer instructions for causing a computer to execute the data processing method described above.
The technical scheme of the invention has the following advantages:
The present invention provides a data processing method, a data processing apparatus and a storage medium. Micro-batch calculation is performed periodically by combining Hive-SQL and Airflow: business logic and data indexes are computed by SQL task scripts, and micro-batch calculation task scheduling is realized by Airflow. On one hand, this lowers the development threshold for data work: compared with real-time computing frameworks such as Flink and Spark Streaming, the method requires only basic SQL, which reduces the workload and greatly shortens the development period.
Detailed Description
In order to make the objects, technical solutions and advantages of the embodiments of the present invention clearer, the technical solutions in the embodiments of the present invention will be clearly and completely described below with reference to the drawings in the embodiments of the present invention, and it is obvious that the described embodiments are some, but not all, embodiments of the present invention. All other embodiments, which can be derived by a person skilled in the art from the embodiments given herein without making any creative effort, shall fall within the protection scope of the present invention.
As shown in fig. 1, the present invention provides a data processing method based on an Airflow structure, which includes the following steps: S1-S4.
And S1, establishing a data transmission link between the data warehouse and a database, wherein the database comprises business data. Specifically, canal (a component that parses MySQL database incremental logs and provides incremental data subscription and consumption) is used to open up, in real time, a data transmission link between the data warehouse and the database incremental logs, thereby realizing real-time access to the business-library data and providing support for upper-layer calculation.
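The subscribe-and-replay pattern that canal implements can be sketched in a few lines of plain Python. This is a conceptual illustration only; the event format and table shape below are hypothetical stand-ins, since canal itself is a Java service with its own client protocol.

```python
# Conceptual sketch of incremental-log subscription: each change event
# captured from the business database's binlog is replayed against the
# warehouse copy of the table. The event shape here is hypothetical.
warehouse_table = {}  # primary key -> row, standing in for a warehouse table

def apply_event(event):
    """Replay one binlog-style change event onto the warehouse copy."""
    if event["type"] in ("INSERT", "UPDATE"):
        warehouse_table[event["pk"]] = event["row"]
    elif event["type"] == "DELETE":
        warehouse_table.pop(event["pk"], None)

events = [
    {"type": "INSERT", "pk": 1, "row": {"contract_id": 1, "status": "draft"}},
    {"type": "UPDATE", "pk": 1, "row": {"contract_id": 1, "status": "signed"}},
]
for e in events:
    apply_event(e)
```

Replaying the log in order keeps the warehouse copy consistent with the business database without re-extracting full tables, which is what makes the upper-layer micro-batch calculation feasible in near real time.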
And S2, acquiring real-time service data and synchronizing the real-time service data to the data warehouse (Hive). Specifically, in step S2, the service data are acquired through the data transmission link established in step S1. The commonly used data warehouse tool is Hive, a data warehouse tool based on Hadoop (one component of Hadoop is HDFS, the Hadoop Distributed File System). Hive is used for data extraction, transformation and loading, and provides a mechanism for storing, querying and analyzing large-scale data stored in Hadoop. Hive can map structured data files into database tables, provide SQL query functions, and convert SQL statements into MapReduce tasks for execution. Its advantage is a low learning cost: rapid MapReduce statistics can be realized through SQL-like statements, which is simpler than developing a dedicated MapReduce application.
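The "map a structured file into a table" mechanism described above can be illustrated by a Hive DDL statement (held here in a Python string). The table and column names are hypothetical, chosen only to match the contract example used later in this description.

```python
# Hypothetical table and column names; the DDL illustrates how Hive maps a
# delimited file under an HDFS path onto a queryable table, after which
# ordinary SQL over ods_contract compiles down to MapReduce jobs.
CREATE_CONTRACTS = """
CREATE EXTERNAL TABLE IF NOT EXISTS ods_contract (
    contract_id      BIGINT,
    owner_confirm_dt STRING,
    status           STRING,
    amount           DECIMAL(16, 2)
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
LOCATION '/warehouse/ods/contract';
"""
```

Because the table is EXTERNAL, Hive only records the schema and file location; the data synchronized in step S2 stay in HDFS and become queryable without any further loading step.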
And S3, performing micro-batch task scheduling based on Airflow, and sequentially executing the calculation tasks in the SQL task script to obtain the data indexes. Micro-batch task scheduling performs the tasks as batch calculations with a short time interval between batches, here 10 min. The calculation process of a calculation task specifically includes: acquiring index data from the data warehouse according to the business logic. The SQL task script is used for acquiring the index data according to the business logic and screening the index data to obtain the corresponding data index, wherein the index data are included in the business data. The invention computes the indexes corresponding to the business logic through Hive-SQL. On the one hand, the development threshold for data work is greatly reduced: compared with real-time computing frameworks such as Flink and Spark Streaming, a user only needs basic SQL, which reduces the workload and greatly shortens the development period. For example, suppose the data index to be obtained is the newly-signed house-collection amount, whose business process logic is start -> enter house-collection contract -> owner confirmation -> contract verification -> contract completion. The first screening condition is that the owner-confirmation date of the house-collection contract falls within the time screening range (if there is no owner-confirmation date, the pick-up date of the contract is used instead). The second screening condition is a restriction on the contract status: not voided, not a draft, not unsigned, and not a luxury home.
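The two screening conditions above can be stated compactly in code. The sketch below uses a hypothetical row shape (dicts with owner_confirm_date, pickup_date, status and amount fields); in the actual system the same logic lives in the Hive-SQL task script.

```python
from datetime import date

# Status values excluded by the second screening condition (names are
# hypothetical labels for "voided / draft / unsigned / luxury home").
EXCLUDED_STATUSES = {"void", "draft", "unsigned", "luxury_home"}

def new_collection_amount(contracts, window_start, window_end):
    """Sum contract amounts passing both screening conditions.

    Condition 1: owner-confirmation date (falling back to the pick-up
    date when absent) lies inside the time screening range.
    Condition 2: the contract status is not in the excluded set.
    """
    total = 0
    for c in contracts:
        confirm = c.get("owner_confirm_date") or c.get("pickup_date")
        if confirm is None or not (window_start <= confirm <= window_end):
            continue
        if c["status"] in EXCLUDED_STATUSES:
            continue
        total += c["amount"]
    return total

contracts = [
    {"owner_confirm_date": date(2024, 5, 2), "status": "signed", "amount": 100},
    {"owner_confirm_date": None, "pickup_date": date(2024, 5, 3),
     "status": "signed", "amount": 50},
    {"owner_confirm_date": date(2024, 5, 2), "status": "void", "amount": 999},
]
```

On the sample rows, the first contract passes both conditions, the second passes via the pick-up-date fallback, and the voided contract is excluded, so the index for May 2024 would be 150.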
As shown in fig. 2, the step S3 specifically includes the following steps:
S301, storing the real-time service data into a distributed file system. A distributed file system (DFS) is one in which the physical storage resources managed by the file system are not necessarily attached directly to the local node, but are connected to nodes (which may simply be understood as computers) through a computer network; it may also be a complete hierarchical file system formed by combining several different logical disk partitions or volume labels.
S302, sequentially establishing a Directed Acyclic Graph (DAG) for the computing tasks in the task script, wherein the directed acyclic graph is used for describing a computing process for obtaining the data indexes.
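The dependency ordering that a DAG imposes on the calculation tasks of S302 can be sketched with the Python standard library's graphlib; the task names below are hypothetical, chosen to mirror the steps of this method rather than any actual script.

```python
from graphlib import TopologicalSorter

# Each key is a task; its set lists the tasks it must run after.
# The chain mirrors S302: extract index data, screen it, aggregate
# the index, then export the result.
dag = {
    "extract_index_data": set(),
    "screen_contracts": {"extract_index_data"},
    "aggregate_amount": {"screen_contracts"},
    "export_to_terminal": {"aggregate_amount"},
}

# A topological order is exactly the "sequential execution" of S302:
# every task runs only after all of its predecessors have finished.
order = list(TopologicalSorter(dag).static_order())
```

In Airflow the same structure is declared with task dependencies inside a DAG definition file; the scheduler then derives an equivalent execution order and also detects cycles, which graphlib would likewise reject with a CycleError.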
S303, setting task scheduling parameters. The task scheduling parameters include: a calculation time range, a calculation period, and a maximum number of identical directed acyclic graphs running online simultaneously. The calculation period is 8-15 min; in this embodiment it is 10 min.
And S304, executing the calculation tasks in the task script according to the directed acyclic graph. When the execution time of a calculation task exceeds a threshold, load balancing is performed on the calculation task or the system memory is increased. The threshold ranges from 10 to 20 min; it is 15 min in this embodiment, and may be, for example, 12 min, 16 min or 17 min in other embodiments.
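The scheduling parameters of S303 and the timeout check of S304 can be restated in plain Python as below. This is a conceptual sketch, not Airflow code; in an actual Airflow deployment these values correspond to the DAG arguments schedule_interval, dagrun_timeout and max_active_runs described later in this specification.

```python
from datetime import datetime, timedelta

# Parameter values taken from this embodiment: a 10-min calculation
# period within the 05:00-23:00 time range, a 15-min timeout threshold,
# and at most one instance of the same DAG running at a time.
CALC_PERIOD = timedelta(minutes=10)
TIMEOUT_THRESHOLD = timedelta(minutes=15)
MAX_ACTIVE_RUNS = 1

def is_batch_time(ts: datetime) -> bool:
    """True when ts falls on the 10-min grid between 05:00 and 23:00."""
    return 5 <= ts.hour <= 23 and ts.minute % 10 == 0

def needs_intervention(elapsed: timedelta) -> bool:
    """S304: load-balance or add memory once a task exceeds the threshold."""
    return elapsed > TIMEOUT_THRESHOLD
```

Keeping the timeout threshold above the calculation period gives a batch room to finish before the next one is due, while still flagging runs that risk backing up the schedule.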
The Airflow architecture contains the following components: job definitions, kept under source-code control; a rich command line interface for testing, running, backfilling, describing and clearing parts of the DAGs; a Web application for browsing DAG definitions, dependencies, progress, metadata and logs, where the Web server is packaged in Airflow and built on the Python framework Flask; a metadata repository, typically a MySQL or Postgres database, which Airflow uses to record task job status and other persistent information; a set of worker nodes that run the task instances of jobs in a distributed manner; a scheduler that triggers task instances to run; and RBAC permission management, which divides user permissions at fine granularity, down to each page and each button and task on a page.
And S4, exporting the calculated data to an application terminal based on canal, and displaying the data indexes in real time at the application terminal. In step S4, by limiting the corresponding query conditions, it is ensured that data exist at each query and that the returned data were written within the last 10 minutes, which guarantees the real-time performance of the data.
The invention provides a data processing method that periodically performs micro-batch calculation by combining Hive-SQL and Airflow: the business logic is implemented and the data indexes are computed by an SQL task script, and micro-batch calculation task scheduling is realized by Airflow. On one hand, this lowers the development threshold: compared with real-time computing frameworks such as Flink and Spark Streaming, the method requires only basic SQL, which reduces the workload and greatly shortens the development period. On the other hand, the Airflow calculation period meets the requirement for data timeliness, while also offering clear advantages in later task maintenance and task stability.
As shown in fig. 3, the present invention also provides a data processing apparatus, including: an acquisition module 11 and a calculation module 12.
The acquisition module 11 is configured to acquire real-time service data and synchronize the real-time service data to a data warehouse.
The calculation module 12 is configured to perform micro-batch task scheduling based on Airflow and to sequentially execute the calculation tasks in the SQL task script to obtain the data indexes. The SQL task script is used for acquiring index data according to business logic and screening the index data to obtain the corresponding data indexes, wherein the index data are included in the business data.
The invention provides a data processing apparatus that periodically performs micro-batch calculation on data by combining Hive-SQL and Airflow: the business logic is implemented and the data indexes are computed by an SQL task script, and micro-batch calculation task scheduling is realized by Airflow.
As shown in FIG. 4, the present invention provides a data processing architecture comprising: a data synchronization layer 210, a data processing layer 220, and a data output layer 230.
The data synchronization layer 210 includes a plurality of databases (MySQL), including a tenant O2O database, a house-in database, a house-out database, and a proprietor O2O database.
The data processing layer 220 is connected to the data synchronization layer 210 through canal to obtain real-time data for the data warehouse. The data processing layer is based on the Airflow architecture, whose components are as described above.
The data processing layer 220 comprises a storage unit, a calculation unit and a scheduling unit. The storage unit stores the real-time service data into a distributed file system. The calculation unit performs data index calculation according to the SQL task script. The scheduling unit schedules the calculation unit according to the set scheduling parameters. Specifically, the parameter schedule_interval is set to '*/10 5-23 * * *' so that Airflow schedules the relevant tasks every 10 min during the period from 5 o'clock to 23 o'clock to perform index calculation, thereby ensuring timeliness. The parameter dagrun_timeout sets the timeout of a task to 15 min in order to avoid task backlog. The parameter max_active_runs is set to 1, so that at most one instance of the same DAG runs online at a time, preventing multiple runs from interfering with each other.
The data output layer 230 includes an export unit for outputting the data indexes to an application terminal for BI reports, data analysis or user profiling. The calculated data indexes are written into the export unit (ClickHouse), and the corresponding query conditions are limited to ensure that, at each query, data exist and were written within the latest 10 min. The query conditions are limited as follows:
where
  if(:executiontime: = '',
     execution_time = (select max(execution_time) from table),
     execution_time = (select max(execution_time) from table
                       where execution_time <= :executiontime: and contract_cnt <> 0)
  )
wherein :executiontime: is a variable, execution_time is a field recording the run time of each calculation task batch, and contract_cnt is the size of the index data.
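The freshness condition encoded by the query above can be restated in plain Python for clarity. This is a sketch of the selection logic only, with batches represented as (execution_time, contract_cnt) pairs; the actual filtering happens inside ClickHouse.

```python
def pick_batch(batches, requested=None):
    """Select which calculation batch a query should read.

    batches: list of (execution_time, contract_cnt) tuples.
    With no requested time, return the latest batch overall; otherwise
    return the latest non-empty batch (contract_cnt != 0) no later than
    the requested time, mirroring the if(...) in the query conditions.
    """
    if requested is None:
        candidates = [t for t, _ in batches]
    else:
        candidates = [t for t, cnt in batches if t <= requested and cnt != 0]
    return max(candidates) if candidates else None
```

Skipping batches with contract_cnt = 0 is what guarantees that every query lands on a batch that actually contains data, as required for the real-time display at the application terminal.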
Referring to fig. 5, fig. 5 is a schematic structural diagram of an electronic device according to an alternative embodiment of the present invention. The electronic device may include: at least one processor 51, such as a CPU (Central Processing Unit), at least one communication interface 53, a memory 54, and at least one communication bus 52, wherein the communication bus 52 is used to enable connection and communication between these components. The communication interface 53 may include a display and a keyboard, and the optional communication interface 53 may also include a standard wired interface and a standard wireless interface. The memory 54 may be a high-speed RAM (volatile random access memory) or a non-volatile memory, such as at least one disk memory. The memory 54 may alternatively be at least one storage device located remotely from the processor 51. The processor 51 may be connected with the apparatus described in fig. 4; the memory 54 stores an application program, and the processor 51 calls the program code stored in the memory 54 to execute any of the method steps of the embodiments of fig. 1 and 3.
The communication bus 52 may be a Peripheral Component Interconnect (PCI) bus or an Extended Industry Standard Architecture (EISA) bus. The communication bus 52 may be divided into an address bus, a data bus, a control bus, and the like. For ease of illustration, only one thick line is shown, but this does not mean that there is only one bus or one type of bus.
The memory 54 may include volatile memory, such as random-access memory (RAM); the memory may also include non-volatile memory, such as flash memory, a hard disk drive (HDD) or a solid-state drive (SSD); the memory 54 may also comprise a combination of the above types of memory.
The processor 51 may be a Central Processing Unit (CPU), a Network Processor (NP), or a combination of a CPU and an NP.
The processor 51 may further include a hardware chip. The hardware chip may be an application-specific integrated circuit (ASIC), a programmable logic device (PLD), or a combination thereof. The PLD may be a complex programmable logic device (CPLD), a field-programmable gate array (FPGA), generic array logic (GAL), or any combination thereof.
Optionally, the memory 54 is also used to store program instructions. The processor 51 may call the program instructions to implement the data processing method as shown in the embodiments of fig. 1 and fig. 2 of the present application.
The embodiment of the invention also provides a non-transitory computer storage medium storing computer-executable instructions that can execute the data processing method in any of the above method embodiments. The storage medium may be a magnetic disk, an optical disk, a read-only memory (ROM), a random access memory (RAM), a flash memory, a hard disk drive (HDD), a solid-state drive (SSD), or the like; the storage medium may also comprise a combination of the above types of memory.
Although the embodiments of the present invention have been described in conjunction with the accompanying drawings, those skilled in the art may make various modifications and variations without departing from the spirit and scope of the invention, and such modifications and variations fall within the scope defined by the appended claims.