



技术领域technical field
本发明涉及数据监控技术领域,具体地,涉及一种基于流式计算引擎的工业数据监控系统和方法、介质。The present invention relates to the technical field of data monitoring, in particular to an industrial data monitoring system, method and medium based on a flow computing engine.
背景技术Background technique
工业互联网是新一代信息通信技术与工业经济深度融合的全新工业生态、关键基础设施和新型应用模式,它以网络为基础、平台为中枢、数据为要素、安全为保障,通过对数据的采集挖掘、对技术的应用落地、对流程的智能改造以提升工业生产的效率。The Industrial Internet is a brand-new industrial ecology, key infrastructure and new application model that is deeply integrated with the new generation of information and communication technology and the industrial economy. It is based on the network, the platform is the center, the data is the element, and the security is the guarantee. , The application of technology, the intelligent transformation of the process to improve the efficiency of industrial production.
随着工业生产的信息化以及智能化改造的推进,生产过程中产生的数据量呈指数级攀升。这些复杂数据分布于传感器设备、物联网设备、电子商务、企业通讯工具等等,具有来源复杂、非结构化、分布不均等特点,对于数据价值的挖掘以及对非结构化数据的处理分析提出了挑战。With the informatization of industrial production and the advancement of intelligent transformation, the amount of data generated in the production process is increasing exponentially. These complex data are distributed in sensor devices, Internet of Things devices, e-commerce, enterprise communication tools, etc., and have the characteristics of complex sources, unstructured, and uneven distribution. For the mining of data value and the processing and analysis of unstructured data, proposed challenge.
数据源多样化会对数据集成系统的连接扩展能力提出较高的要求,而分布式大数据存储系统再在给业务带来更强能力的同时也释放了业务的压力,进而促使了数据量的加速膨胀。数据量级上的快速增长对数据集成平台的吞吐和实时性都需要更高的要求。当然作为数据相关的基础系统,数据准确性则是基础的要求。The diversification of data sources will put forward higher requirements on the connection and expansion capabilities of the data integration system, and the distributed big data storage system will not only bring stronger capabilities to the business, but also release the pressure on the business, thereby promoting the increase in the amount of data. Accelerate expansion. The rapid growth of data volume requires higher requirements for the throughput and real-time performance of the data integration platform. Of course, as a data-related basic system, data accuracy is the basic requirement.
此外,系统需要理解并整合散落在不同业务团队的数据,做好管理并确保数据访问的安全,整个流程是相对复杂的。虽然平台化能够将复杂的流程自动化起来,但数据集成工作所固有的高成本并不能完全以平台化的方式消除。因此尽最大的可能提升流程的可复用性和可控性也是数据集成系统需要持续应对的挑战。In addition, the system needs to understand and integrate data scattered in different business teams, manage it well and ensure the security of data access. The whole process is relatively complicated. Although platformization can automate complex processes, the inherent high cost of data integration work cannot be completely eliminated in a platformized manner. Therefore, improving the reusability and controllability of the process as much as possible is also a challenge that the data integration system needs to continuously address.
同时,流式数据现在面临需要查询持久化数据库,对底层数据库会产生压力的问题仍然未被解决。At the same time, streaming data is now faced with the need to query the persistent database, and the problem of putting pressure on the underlying database has not yet been resolved.
经过检索,专利文献CN109637090A公开了一种基于SOA架构的灾害监测预警平台建设方法,解决了目前的地质灾害监测预警大多是通过人工巡查监测以及监测设备阈值报警进行预警,但由于诱发地质灾害的因素的多样性和不确定性,并且通常穿插交融着复杂的地理环境因素,对数据的管理与评价变得异常困难的缺陷。该项发明充分利用SOA架构的低耦合特点将AI预警服务更好的与监测系统结合,同时利用机器学习算法处理复杂数据和精确分类的优势,在风险源的危险性评价及监测对象评价的基础上,实时监测各个监测对象的数据,预测危险区内一定时段可能发生的一系列不同强度地质灾害的等级,针对不同风险区的特点提出减少风险的各项对策,为地质灾害监测预警提供辅助决策。但是该现有技术仍然无法对复杂事件进行处理,对于流式数据中符合某种特征的模式进而触发对应的后续动作无法做到实时告警。After searching, the patent document CN109637090A discloses a disaster monitoring and early warning platform construction method based on SOA architecture, which solves the problem that most of the current geological disaster monitoring and early warning is carried out through manual inspection and threshold alarm of monitoring equipment, but due to the factors that induce geological disasters The diversity and uncertainty of the data are usually interspersed with complex geographical environment factors, making the management and evaluation of data extremely difficult. This invention makes full use of the low coupling characteristics of the SOA architecture to better combine the AI early warning service with the monitoring system. At the same time, it uses the advantages of machine learning algorithms to process complex data and accurate classification. First, monitor the data of each monitoring object in real time, predict the level of a series of geological disasters of different intensities that may occur in a certain period of time in the dangerous area, and propose various risk reduction countermeasures according to the characteristics of different risk areas, and provide auxiliary decision-making for geological disaster monitoring and early warning . However, this existing technology is still unable to process complex events, and it is impossible to achieve real-time alarms for patterns in streaming data that meet certain characteristics and then trigger corresponding follow-up actions.
专利文献CN111475682A公开了一种基于超大规模数据系统的智能运维平台,采用开放可扩展的数据采集底层架构,对接多种类型数据源,降低大数据采集门槛。对系统内所有类型数据的全视角查看,并追踪各个数据节点的整个生命周期,共享展现各个数据节点间的链路关系。如通过系统节点日志监控,实现用户、数据、作业任务、API、服务等多个维度的链路监控梳理数据血缘关系,保证细颗粒度数据和操作的可追溯,完成数据的全生命周期管理。该现有技术虽然能进行高亮显示对据及原始数据进行对比验证,确保数据的完整性和可审计性,但是还是不能解决对数据价值的挖掘以及非结构化数据的处理分析。Patent document CN111475682A discloses an intelligent operation and maintenance platform based on a super-large-scale data system, adopts an open and scalable data acquisition underlying architecture, connects to various types of data sources, and lowers the threshold for big data acquisition. View all types of data in the system from a full perspective, track the entire life cycle of each data node, and share and display the link relationship between each data node. For example, through system node log monitoring, multi-dimensional link monitoring such as users, data, job tasks, APIs, and services can be implemented to sort out the blood relationship of data, ensure the traceability of fine-grained data and operations, and complete the full lifecycle management of data. Although this prior art can perform comparison and verification of highlighted data and original data to ensure data integrity and auditability, it still cannot solve the mining of data value and the processing and analysis of unstructured data.
因此,亟需研发设计一种能够解决流式计算业务在工业智慧化改造落地时遇见的问题的系统和方法。Therefore, there is an urgent need to develop and design a system and method that can solve the problems encountered when the flow computing business is implemented in the transformation of industrial intelligence.
发明内容Contents of the invention
针对现有技术中的缺陷,本发明的目的是提供一种基于流式计算引擎的工业数据监控系统和方法、介质,使流式数据,无需查询持久化数据库,对底层数据库不会产生任何压力。Aiming at the defects in the prior art, the object of the present invention is to provide an industrial data monitoring system, method and medium based on a streaming computing engine, so that the streaming data does not need to query the persistent database, and will not generate any pressure on the underlying database .
根据本发明提供的一种基于流式计算引擎的工业数据监控系统,包括:A kind of industrial data monitoring system based on flow computing engine provided by the present invention comprises:
状态监控模块:采用流式计算引擎原生支持的Prometheus进行实时指标采集和存储,通过meta节点定时抓取所有子节点指标进行汇总,统一数据源提供给Grafana进行可视化并通过dashboard进行告警配置;Status monitoring module: Prometheus, which is natively supported by the streaming computing engine, is used to collect and store real-time indicators, and the indicators of all sub-nodes are regularly captured through the meta node for summary, and the unified data source is provided to Grafana for visualization and alarm configuration through the dashboard;
资源调配模块:对实时任务进行分析提供足够的内存给作业,同时对任务消息进行实时处理,结合实时任务内存分析所得相关指标、实时任务并发度的合理性,得出实时任务资源预设值,调整实时任务资源,达到实时任务资源配置;Resource allocation module: analyze the real-time tasks to provide enough memory for the job, and at the same time process the task messages in real time, combine the relevant indicators obtained from the real-time task memory analysis and the rationality of the real-time task concurrency, and obtain the real-time task resource preset value, Adjust real-time task resources to achieve real-time task resource allocation;
异常状态模块:利用SideOutput来收集流式作业各环节中出错的数据,汇总到统一的错误流,通过在流式数据中发现符合设定特征的模式进而触发对应的后续动作进行实时告警。Abnormal status module: Use SideOutput to collect error data in each link of streaming operations, summarize them into a unified error stream, and trigger corresponding follow-up actions to give real-time alarms by finding patterns in the streaming data that meet the set characteristics.
优选地,状态监控模块传入每个数据源,元数据服务会生成一组定时的心跳信号,由作业进行消费,心跳信号流入每个作业后,随数据流一起经过每个节点,在每个节点上打上当前节点的标签,然后跳过该节点的处理逻辑流向下个节点,流经作业组件末端时会通过JMX被采集至展示页面。Preferably, the status monitoring module is passed to each data source, and the metadata service will generate a set of timed heartbeat signals, which are consumed by the job. After the heartbeat signal flows into each job, it passes through each node along with the data flow. Label the current node on the node, and then skip the processing logic of this node and flow to the next node. When it flows through the end of the job component, it will be collected to the display page through JMX.
优选地,状态监控模块通过心跳信号产生的时间、流入作业的时间以及到达每个节点的时间能判断一条数据被整个管道处理所用的时间和每个节点处理数据所用的时间,进而判断作业的性能瓶颈。Preferably, the status monitoring module can judge the time it takes for a piece of data to be processed by the entire pipeline and the time it takes for each node to process the data through the time when the heartbeat signal is generated, the time when it flows into the job, and the time when it reaches each node, and then judges the performance of the job bottleneck.
优选地,状态监控模块通过JMX获取流式计算引擎作业中消费的偏移量反查消息队列中持久化的消息主题的挤压情况,同时流式计算通过引擎提供的JMX采集作业节点的背压情况综合分析推断作业是否出现数据堆积的现象。Preferably, the state monitoring module obtains the offset consumed in the streaming computing engine job through JMX to check the squeeze situation of the persistent message topic in the message queue, and the streaming computing collects the back pressure of the job node through the JMX provided by the engine Comprehensive analysis of the situation infers whether there is data accumulation in the job.
优选地,状态监控模块通过调整log4j日志框架默认机制,按天切分任务日志,定期清理过期日志,避免异常任务频繁写满磁盘导致计算节点不可用的情况,同时在所有计算节点部署agent实时采集日志,抽集到消息队列中再汇聚到非关系型数据库。Preferably, the status monitoring module adjusts the default mechanism of the log4j log framework, divides task logs by day, regularly cleans expired logs, and avoids the situation where abnormal tasks frequently fill up disks and make computing nodes unavailable. At the same time, agents are deployed on all computing nodes to collect data in real time Logs are collected into the message queue and then aggregated into the non-relational database.
优选地,资源调配模块根据配置间隔定时扫描所有正在运行的流式计算任务,结合实时任务GC日志,同时根据内存优化规则计算流式计算任务推荐的堆内存大小,并与实际分配的流式计算任务的堆内存进行比较,如果两者相差的倍数过大时,认为流式计算任务的内存配置存在浪费的情况,则产生报警提示到任务调度进行优化。Preferably, the resource allocation module regularly scans all running streaming computing tasks according to the configuration interval, combines the real-time task GC logs, and calculates the recommended heap memory size of the streaming computing tasks according to the memory optimization rules, and compares it with the actual allocated streaming computing tasks Compare the heap memory of the task. If the multiple of the difference between the two is too large, it is considered that the memory configuration of the streaming computing task is wasted, and an alarm will be generated to prompt the task scheduler for optimization.
优选地,资源调配模块通过判断实时任务消费的数据源单位时间的输入和实时任务各个Operator/Task消息处理能力是否匹配;Preferably, the resource allocation module judges whether the input per unit time of the data source consumed by the real-time task matches the message processing capability of each Operator/Task of the real-time task;
通过流式计算引擎提供的原生K8s支持,作业能够主动向调度器申请资源。Through the native K8s support provided by the streaming computing engine, jobs can actively apply for resources from the scheduler.
优选地,异常状态模块中的错误流记录中包含系统预设的错误码、原始输入数据以及错误类和错误信息,错误流信息被分类写入分布式文件存储单元,用户通过监控存储目录得知数据是否正常。Preferably, the error stream record in the abnormal state module contains the system preset error code, original input data, error class and error information, and the error stream information is classified and written into the distributed file storage unit, and the user learns by monitoring the storage directory Whether the data is normal.
根据本发明提供的一种基于流式计算引擎的工业数据监控方法,包括如下步骤:A kind of industrial data monitoring method based on flow computing engine provided by the present invention comprises the following steps:
状态监控步骤:采用流式计算引擎原生支持的Prometheus进行实时指标采集和存储,通过meta节点定时抓取所有子节点指标进行汇总,统一数据源提供给Grafana进行可视化并通过dashboard进行告警配置;Status monitoring steps: Use Prometheus natively supported by the streaming computing engine to collect and store real-time indicators, regularly capture all sub-node indicators through the meta node for summary, and provide unified data sources to Grafana for visualization and alarm configuration through the dashboard;
资源调配步骤:对实时任务进行分析提供足够的内存给作业,同时对任务消息进行实时处理,结合实时任务内存分析所得相关指标、实时任务并发度的合理性,得出实时任务资源预设值,调整实时任务资源,达到实时任务资源配置;Resource allocation step: analyze the real-time tasks to provide enough memory for the job, and at the same time process the task messages in real time, combine the relevant indicators obtained from the real-time task memory analysis and the rationality of the real-time task concurrency, and obtain the real-time task resource preset value, Adjust real-time task resources to achieve real-time task resource allocation;
异常状态步骤:利用SideOutput来收集流式作业各环节中出错的数据,汇总到统一的错误流,通过在流式数据中发现符合设定特征的模式进而触发对应的后续动作进行实时告警。Abnormal state step: Use SideOutput to collect error data in each link of the stream job, summarize it into a unified error stream, and trigger the corresponding follow-up action to give a real-time alarm by finding a pattern that meets the set characteristics in the stream data.
根据本发明提供的一种存储有计算机程序的计算机可读存储介质,计算机程序被处理器执行时实现上述的方法的步骤。According to a computer-readable storage medium storing a computer program provided by the present invention, the computer program implements the steps of the above method when executed by a processor.
与现有技术相比,本发明具有如下的有益效果:Compared with the prior art, the present invention has the following beneficial effects:
1、本发明通过将复杂事件处理与流式计算引擎结合,能增强告警的实时性和适用范围。1. The present invention can enhance the real-time performance and scope of application of alarms by combining complex event processing with a streaming computing engine.
2、本发明利用实时告警CEP(全称为Complex Event Processing复杂事件处理)能力通过在流式数据中发现符合某种特征的模式进而触发对应的后续动作,其既支持基于单条事件的简单无状态的模式匹配(例如基于事件中的某个字段进行筛选过滤),也可以支持基于关联/聚合/时间窗口等跨事件的复杂有状态模式匹配(例如计算滑动时间窗口移动均值)。2. The present invention utilizes the ability of real-time alarm CEP (full name is Complex Event Processing) to trigger corresponding follow-up actions by finding a pattern in streaming data that meets certain characteristics, which supports simple and stateless processing based on a single event. Pattern matching (such as filtering based on a certain field in the event) can also support complex stateful pattern matching based on association/aggregation/time window and other cross-events (such as calculating the moving average of the sliding time window).
3、本发明直接作用于流式数据,无需查询持久化数据库,对底层数据库不会产生任何压力。3. The present invention directly acts on streaming data, without querying the persistent database, and will not generate any pressure on the underlying database.
4、本发明通过自主学习的CEP引擎适应不断变化的规则条件,以期解决流式计算业务在工业智慧化改造落地时遇见的问题。4. The invention adopts the self-learning CEP engine to adapt to changing rules and conditions, in order to solve the problems encountered when the flow computing business is implemented in the transformation of industrial intelligence.
附图说明Description of drawings
通过阅读参照以下附图对非限制性实施例所作的详细描述,本发明的其它特征、目的和优点将会变得更明显:Other characteristics, objects and advantages of the present invention will become more apparent by reading the detailed description of non-limiting embodiments made with reference to the following drawings:
图1为本发明中基于流式计算引擎的工业数据监控方法的步骤流程图;Fig. 1 is the flow chart of the steps of the industrial data monitoring method based on the streaming computing engine in the present invention;
图2为本发明中基于流式计算引擎的工业数据监控系统的任务调度流程图;Fig. 2 is the task scheduling flow chart of the industrial data monitoring system based on the streaming computing engine in the present invention;
图3为本发明中基于流式计算引擎的数据处理系统的整体框架流程图;Fig. 3 is the flow diagram of the overall framework of the data processing system based on the streaming computing engine in the present invention;
图4为本发明中基于流式计算引擎的数据处理方法的步骤流程图。FIG. 4 is a flow chart of the steps of the data processing method based on the streaming computing engine in the present invention.
具体实施方式Detailed ways
下面结合具体实施例对本发明进行详细说明。以下实施例将有助于本领域的技术人员进一步理解本发明,但不以任何形式限制本发明。应当指出的是,对本领域的普通技术人员来说,在不脱离本发明构思的前提下,还可以做出若干变化和改进。这些都属于本发明的保护范围。The present invention will be described in detail below in conjunction with specific embodiments. The following examples will help those skilled in the art to further understand the present invention, but do not limit the present invention in any form. It should be noted that those skilled in the art can make several changes and improvements without departing from the concept of the present invention. These all belong to the protection scope of the present invention.
如图2所示,本发明提供了一种基于流式计算引擎的工业数据监控系统,包括:As shown in Fig. 2, the present invention provides a kind of industrial data monitoring system based on streaming computing engine, comprising:
状态监控模块:采用流式计算引擎原生支持的Prometheus进行实时指标采集和存储,通过meta节点定时抓取所有子节点指标进行汇总,统一数据源提供给Grafana进行可视化并通过dashboard进行告警配置。具体地:Status monitoring module: Prometheus, which is natively supported by the streaming computing engine, is used to collect and store real-time indicators. Meta nodes are used to regularly capture all sub-node indicators for aggregation. The unified data source is provided to Grafana for visualization and alarm configuration through the dashboard. specifically:
为流式计算引擎设计的前端界面提供了大量的运行时信息供用户了解任务当前运行状况,但是存在无法获取历史状态监测的问题导致用户无法了解任务历史运行状态,本发明采用计算引擎原生支持的Prometheus进行实时指标采集和存储。通过meta节点定时抓取所有子节点指标进行汇总,方便统一数据源提供给Grafana进行可视化并通过dashboard进行告警配置。The front-end interface designed for the streaming computing engine provides a large amount of runtime information for users to understand the current running status of the task, but there is a problem that the historical status monitoring cannot be obtained, so that the user cannot understand the historical running status of the task. This invention adopts the native support of the computing engine Prometheus collects and stores real-time indicators. Through the meta node, all sub-node indicators are regularly captured and summarized, so that the unified data source can be provided to Grafana for visualization and alarm configuration through the dashboard.
在流式计算作业过程中,由于数据源源不断地流入清洗处理,很难去监控作业的运行情况。即使开启了检查点也无法确定是否丢失数据或丢失了多少数据。因此,本发明设计了心跳的机制,为每个作业注入了心跳信息以监控其运行情况。During the streaming computing job, it is difficult to monitor the running status of the job due to the continuous influx of data for cleaning and processing. Even with checkpointing turned on, it is impossible to determine whether or how much data is lost. Therefore, the present invention designs a heartbeat mechanism, and injects heartbeat information into each job to monitor its running conditions.
伴随着每一个数据源的传入,元数据服务会生成一组定时的心跳信号,由作业进行消费。心跳信息流入每个作业后,会随数据流一起经过每个节点,在每个节点上打上当前节点的标签,然后跳过该节点的处理逻辑流向下个节点。流经作业组件末端时,会通过JMX被采集至展示页面。With the incoming of each data source, the metadata service will generate a set of regular heartbeat signals, which will be consumed by the job. After the heartbeat information flows into each job, it will pass through each node along with the data flow, label the current node on each node, and then skip the processing logic of this node and flow to the next node. When flowing through the end of the job component, it will be collected to the display page through JMX.
该指标包含了心跳信号产生的时间,流入作业的时间以及到达每个节点的时间。通过这个指标可以判断一条数据被整个管道处理所用的时间和每个节点处理数据所用的时间,进而判断该作业的性能瓶颈。This indicator includes the time when the heartbeat signal is generated, the time when the job flows in, and the time to reach each node. Through this indicator, you can judge the time it takes for a piece of data to be processed by the entire pipeline and the time it takes for each node to process the data, and then determine the performance bottleneck of the job.
由于心跳信号是定时发送的,因此每个作业收到的心跳信息个数应该一致。若最后发出的指标个数与期望不一致,则可以进一步判断是否有数据丢失。Since heartbeat signals are sent regularly, the number of heartbeat messages received by each job should be consistent. If the number of indicators finally sent is inconsistent with the expectation, it can be further judged whether there is data loss.
吞吐能力和延迟作为衡量实时任务性能最重要的指标,经常需要通过这两个指标来调整任务并发度和资源配置。如果启用计算引擎原生的latency参数追踪任务延迟跟踪会显著影响集群和任务性能,本发明采用消息主题消费堆积作为衡量任务延迟的指标,通过JMX获取引擎作业中消费的偏移量反查消息队列中持久化的消息主题的挤压情况,同时通过引擎提供的JMX采集作业节点的背压情况综合分析推断作业是否出现数据堆积的现象。Throughput and latency are the most important indicators to measure the performance of real-time tasks, and it is often necessary to adjust task concurrency and resource allocation through these two indicators. If the native latency parameter tracking of the computing engine is enabled, task delay tracking will significantly affect cluster and task performance. This invention uses message topic consumption accumulation as an indicator to measure task delay, and obtains the offset consumed in the engine job through JMX to check the message queue. The squeeze situation of the persistent message topic, and at the same time, through the comprehensive analysis of the backpressure situation of the JMX collection job node provided by the engine, it is inferred whether there is data accumulation in the job.
流式计算引擎为了适应大规模数据处理的需要通常提供了分布式计算的能力,向平台提交的所有任务会由调度器统一调度到任意计算节点,因此任务的运行日志会分布在不同的机器,用户定位日志困难。本发明通过调整log4j日志框架默认机制,按天切分任务日志,定期清理过期日志,避免异常任务频繁写满磁盘导致计算节点不可用的情况,同时在所有计算节点部署agent实时采集日志,抽集到消息队列中再汇聚到Elasticsearch等非关系型数据库,方便用户在需要通过日志定位判断异常信息时快速找到作业对应日志。In order to meet the needs of large-scale data processing, the streaming computing engine usually provides distributed computing capabilities. All tasks submitted to the platform will be uniformly scheduled by the scheduler to any computing node, so the running logs of the tasks will be distributed on different machines. It is difficult for users to locate logs. The present invention adjusts the default mechanism of the log4j log framework, divides the task log by day, regularly clears the expired log, avoids the situation that abnormal tasks frequently fill up the disk and causes the computing node to be unavailable, and at the same time deploys an agent on all computing nodes to collect logs in real time and collect them. Into the message queue and then aggregated to non-relational databases such as Elasticsearch, it is convenient for users to quickly find the corresponding log of the job when they need to judge abnormal information through log location.
资源调配模块:对实时任务进行分析提供足够的内存给作业,同时对任务消息进行实时处理,结合实时任务内存分析所得相关指标、实时任务并发度的合理性,得出实时任务资源预设值,调整实时任务资源,达到实时任务资源配置。具体地:Resource allocation module: analyze the real-time tasks to provide enough memory for the job, and at the same time process the task messages in real time, combine the relevant indicators obtained from the real-time task memory analysis and the rationality of the real-time task concurrency, and obtain the real-time task resource preset value, Adjust real-time task resources to achieve real-time task resource configuration. specifically:
对于实时任务资源分析思路,主要包含两点:For the analysis of real-time task resources, it mainly includes two points:
一方面是从作业占用的内存入手,从运行时堆内存方面对实时任务进行分析,提供足够的内存保证作业稳定执行。On the one hand, start with the memory occupied by the job, analyze the real-time tasks from the runtime heap memory, and provide enough memory to ensure the stable execution of the job.
另一方面则是从实时任务消息处理能力入手,保证满足数据处理需求的同时,尽可能合理使用CPU资源。On the other hand, it starts with the real-time task message processing capability to ensure that the CPU resources are used as reasonably as possible while meeting the data processing requirements.
之后再结合实时任务内存分析所得相关指标、实时任务并发度的合理性,得出一个实时任务资源预设值,调整实时任务资源,最终达到实时任务资源配置合理化的目的,从而更好的降低机器使用成本。Then, combined with the relevant indicators obtained from real-time task memory analysis and the rationality of real-time task concurrency, a preset value of real-time task resources is obtained, and real-time task resources are adjusted to finally achieve the purpose of rationalizing real-time task resource allocation, thereby better reducing machine workloads. The cost.
在对任务内存的管控方面,系统会根据配置间隔定时扫描所有正在运行的流式计算任务,结合实时任务GC日志,同时根据内存优化规则,计算出流式计算任务推荐的堆内存大小,并与实际分配的流式计算任务的堆内存进行比较,如果两者相差的倍数过大时,可以认为流式计算任务的内存配置存在浪费的情况,接下来会产生报警提示到任务调度进行优化。In terms of task memory management and control, the system will periodically scan all running streaming computing tasks according to the configuration interval, combine the real-time task GC logs, and calculate the recommended heap memory size for streaming computing tasks according to memory optimization rules, and communicate with Compare the actual allocated heap memory of the streaming computing task. If the difference between the two is too large, it can be considered that the memory configuration of the streaming computing task is wasted, and then an alarm will be generated to prompt the task scheduler for optimization.
在对于实时作业任务消息处理能力分析方面,主要是通过判断实时任务消费的数据源单位时间的输入和实时任务各个Operator/Task消息处理能力是否匹配。通过监控数据源的流量结合作业中Operator的吞吐情况,判断作业是否会出现数据倾斜,流量背压的情况,通过作业的优化规则提醒调整算子并行度,优化作业资源分配。In terms of analyzing the message processing capability of real-time job tasks, it is mainly by judging whether the input per unit time of the data source consumed by the real-time task matches the message processing capability of each Operator/Task of the real-time task. By monitoring the traffic of the data source combined with the throughput of the Operator in the job, it is judged whether there will be data skew and traffic back pressure in the job, and the optimization rules of the job are used to remind and adjust the parallelism of the operator to optimize the resource allocation of the job.
通过流式计算引擎提供的原生K8s支持,作业能够主动向调度器申请资源从而达成作业资源的弹性扩容从而应对突发的数据尖峰。基于此构建实现了任务资源的优化全部自动化,会结合实时任务历史不同时段的资源使用情况,自动化推测和调整实时任务的资源配置,从而达到提升整个实时集群资源利用率的目的。Through the native K8s support provided by the streaming computing engine, jobs can actively apply for resources from the scheduler to achieve elastic expansion of job resources to cope with sudden data spikes. Based on this construction, the optimization of task resources is fully automated, and the resource usage of real-time task history in different periods will be combined to automatically predict and adjust the resource allocation of real-time tasks, so as to achieve the purpose of improving the resource utilization of the entire real-time cluster.
异常状态模块:利用SideOutput来收集流式作业各环节中出错的数据,汇总到统一的错误流,通过在流式数据中发现符合设定特征的模式进而触发对应的后续动作进行实时告警。具体地:Abnormal status module: Use SideOutput to collect error data in each link of streaming operations, summarize them into a unified error stream, and trigger corresponding follow-up actions to give real-time alarms by finding patterns in the streaming data that meet the set characteristics. specifically:
错误记录中包含系统预设的错误码、原始输入数据以及错误类和错误信息。一般情况下,错误数据会被分类写入分布式文件存储系统,用户通过监控存储目录可以得知数据是否正常。The error record contains the system preset error code, original input data, error class and error information. In general, error data will be categorized and written into the distributed file storage system, and users can know whether the data is normal by monitoring the storage directory.
恢复数据时通常有三种情况。There are usually three situations when restoring data.
1、数据格式异常,比如日志被截断导致不完整或者时间戳不符合约定格式,这种情况下系统中提供通过精确离线批作业来修复数据,跳过处理不正常的数据事件或者对数据重新进行修正后重新打入作业流程,将数据回填到原有的数据管道。1. The data format is abnormal. For example, the log is truncated, resulting in incompleteness or the timestamp does not conform to the agreed format. In this case, the system provides accurate offline batch jobs to repair the data, skip processing abnormal data events or redo the data. After correction, re-enter the operation process and backfill the data to the original data pipeline.
2、作业管道异常,比如数据实际的schema有变更但流表配置没有更新,可能会导致某个字段都是空值,但整体作业并没有出现异常。通过指标输出报警进行提示后,提醒用户进行补数据处理操作。2. The job pipeline is abnormal. For example, the actual schema of the data has changed but the configuration of the flow table has not been updated. This may cause a certain field to be empty, but the overall job is not abnormal. After being prompted by the indicator output alarm, the user is reminded to perform supplementary data processing operations.
3、数据链路异常,比如数据源主从集群发生异常情况进行了切换,但消费配置没有进行改变,会导致下游数据应用出现超时等情况。通常情况下依托系统提供的重试机制和异常恢复机制,能够及时的切换到健康的数据源,如果作业失败会进行报警告知。3. The data link is abnormal. For example, the master-slave cluster of the data source is abnormally switched, but the consumption configuration is not changed, which will cause the downstream data application to time out. Usually relying on the retry mechanism and exception recovery mechanism provided by the system, it can switch to a healthy data source in a timely manner, and an alarm will be given if the job fails.
补数据作业时,首先更新线上的流表配置为最新,切换至健康的数据源,保证不再产生更多异常数据,这时存储里仍有部分分区是异常的。所以发布独立的补数作业来专门修复异常的数据,输出的数据会写到一个临时的目录,并在metastore上切换partition分区的location来替换掉原来的异常目录。因此这样的补数流程对离线查询的用户来说是透明的。最后再在合适的时间替换掉异常分区的数据并恢复location。When supplementing data, first update the online flow table configuration to the latest and switch to a healthy data source to ensure that no more abnormal data will be generated. At this time, some partitions in the storage are still abnormal. Therefore, an independent complement job is released to specifically repair abnormal data. The output data will be written to a temporary directory, and the location of the partition partition will be switched on the metastore to replace the original abnormal directory. Therefore, such a complement process is transparent to users who query offline. Finally, replace the data of the abnormal partition and restore the location at the appropriate time.
如图1所示根据本发明提供的一种基于流式计算引擎的工业数据监控方法,包括如下步骤:As shown in Figure 1, according to a kind of industrial data monitoring method based on flow calculation engine provided by the present invention, comprise the following steps:
状态监控步骤:采用流式计算引擎原生支持的Prometheus进行实时指标采集和存储,通过meta节点定时抓取所有子节点指标进行汇总,统一数据源提供给Grafana进行可视化并通过dashboard进行告警配置;Status monitoring steps: Use Prometheus natively supported by the streaming computing engine to collect and store real-time indicators, regularly capture all sub-node indicators through the meta node for summary, and provide unified data sources to Grafana for visualization and alarm configuration through the dashboard;
资源调配步骤:对实时任务进行分析提供足够的内存给作业,同时对任务消息进行实时处理,结合实时任务内存分析所得相关指标、实时任务并发度的合理性,得出实时任务资源预设值,调整实时任务资源,达到实时任务资源配置;Resource allocation step: analyze the real-time tasks to provide enough memory for the job, and at the same time process the task messages in real time, combine the relevant indicators obtained from the real-time task memory analysis and the rationality of the real-time task concurrency, and obtain the real-time task resource preset value, Adjust real-time task resources to achieve real-time task resource allocation;
异常状态步骤:利用SideOutput来收集流式作业各环节中出错的数据,汇总到统一的错误流,通过在流式数据中发现符合设定特征的模式进而触发对应的后续动作进行实时告警。Abnormal state step: Use SideOutput to collect error data in each link of the stream job, summarize it into a unified error stream, and trigger the corresponding follow-up action to give a real-time alarm by finding a pattern that meets the set characteristics in the stream data.
根据本发明提供的一种存储有计算机程序的计算机可读存储介质,计算机程序被处理器执行时实现上述的方法的步骤。According to a computer-readable storage medium storing a computer program provided by the present invention, the computer program implements the steps of the above method when executed by a processor.
本发明利用流式引擎进行查询优化:The present invention utilizes the streaming engine to perform query optimization:
维表关联的场景下因为维表经常发生变化,尤其是新增维度,而关联操作发生在维度新增之前,经常导致关联不上。针对此种场景系统中开发了针对不同需求的异常处理策略,如果在关联时发生异常,则暂时将数据缓存起来之后再进行尝试,并且可以控制尝试次数,能够自定义延迟关联的规则。In the scenario of dimension table association, because the dimension table often changes, especially when new dimensions are added, and the association operation occurs before the dimension is added, the association often fails. For this kind of scenario, the system has developed exception handling strategies for different requirements. If an exception occurs during association, the data will be temporarily cached and then tried again. The number of attempts can be controlled, and the rules for delayed association can be customized.
同时在SQL支持中添加了一个可以支持延迟关联维表的算子。当关联没有命中时,本地缓存不会缓存数据集为空结果,同时将数据暂时保存在一个状态后端中,之后根据设置定时器以及它的重试次数进行重试。At the same time, an operator that can support delayed association dimension tables is added to the SQL support. When the association is not hit, the local cache will not cache the empty result of the data set, and temporarily save the data in a state backend, and then retry according to the set timer and its number of retries.
通过作业结构拓扑分析,流式引擎作业时计算算子和关联计算算子是连接在一起的。因为它没有唯一键分流的语义。当作业并行度比较大,每一个维表关联的子任务访问的是所有的缓存空间,这样对缓存来说有很大的压力。Through the topology analysis of the job structure, the calculation operator and the associated calculation operator are connected together when the streaming engine is working. Because it does not have the semantics of unique key shunting. When the job parallelism is relatively large, the subtasks associated with each dimension table access all the cache space, which puts a lot of pressure on the cache.
但观察管理操作的SQL实现,等值连接是天然具有哈希属性的。在包装算子时直接开放了配置,用户可以把维表关联的key作为哈希的条件,将数据进行分区。这样就能保证下游每一个算子的子任务之间的访问空间是独立的,这样可以大大的提升作业开始时的缓存命中率。But observing the SQL implementation of management operations, the equivalent connection is naturally hashed. When packaging the operator, the configuration is directly opened, and the user can use the key associated with the dimension table as a hash condition to partition the data. In this way, the access space between the subtasks of each downstream operator is guaranteed to be independent, which can greatly improve the cache hit rate at the beginning of the job.
本发明利用流式引擎进行索引优化:The present invention utilizes the streaming engine for index optimization:
数据库为了加速数据检索,往往会事先为数据创建索引,再在扫描数据之前通过索引定位到数据的起始位置,从而加速数据检索。而传统数据库常见的是行索引,通过一个或若干字段创建索引,索引结果以树形结构存储,此类索引能够精确到行级别,索引效率最高。In order to speed up data retrieval, the database often creates an index for the data in advance, and then locates the starting position of the data through the index before scanning the data, thereby speeding up the data retrieval. In traditional databases, row indexes are commonly used. Indexes are created through one or several fields, and the index results are stored in a tree structure. This type of index can be accurate to the row level, and the index efficiency is the highest.
某些大数据项目也支持了行索引,而它所带来的弊端就是大量的索引数据会造成写入和检索的延时。而平台处理的多数是采集数据,例如传感器这类数据,它的特点是重复度非常高,而分析的结果往往非常少,极少数的目标行为会隐藏在海量数据里,占比往往会是千分之一甚至更少。所以选择性价比更高的块索引方案,已经能够支撑目前的应用场景。Some big data projects also support row indexing, but the disadvantage it brings is that a large amount of index data will cause delays in writing and retrieval. Most of what the platform processes is collected data, such as sensor data, which is characterized by a very high degree of repetition, and the results of the analysis are often very few. Very few target behaviors will be hidden in the massive data, and the proportion will often be thousands. One-third or even less. Therefore, choosing a more cost-effective block index solution can already support the current application scenarios.
现有的方案中多是将索引数据以文件的形式存储在磁盘上,外加一些高速缓存机制来加速数据访问,而系统中是将索引数据直接存在了数据库中。主要有以下两个方面的考虑:Most of the existing solutions store the index data in the form of files on the disk, plus some cache mechanisms to speed up data access, while the system stores the index data directly in the database. There are mainly two considerations:
1、Transaction,通常来说列存文件往往是无法更新的,而系统在定期优化文件分布时会做多个文件内容合并操作,为了保证查询一致性,需要数据库提供transaction能力。1. Transaction, generally speaking, the column storage files are often unable to be updated, and the system will perform multiple file content merging operations when regularly optimizing file distribution. In order to ensure query consistency, the database needs to provide transaction capabilities.
2、Performance,数据库拥有较强的读写和检索能力,甚至可以将谓词下推到数据库来完成,数据库的高压缩比也能进一步节省存储。2. Performance. The database has strong read-write and retrieval capabilities, and can even push down predicates to the database for completion. The high compression ratio of the database can further save storage.
本发明利用流式引擎进行异常优化:The present invention utilizes the streaming engine to perform exception optimization:
由于流式作业需要缓存大量的中间过程,以及申请不少的计算资源。作业从异常失败到作业重启需要大约一两分钟的时间,这对于某些在线业务场景来说是不能接受的。Because streaming jobs need to cache a large number of intermediate processes and apply for a lot of computing resources. It takes about one or two minutes for a job to restart from an abnormal failure, which is unacceptable for some online business scenarios.
通过对作业异常产生过程进行分析发现,异常检测和初始化的消耗是主要瓶颈。异常检测受制于接口轮询间隔,资源初始化受制于容器初始化步骤。在这两方面系统分别进行了优化,作业运行时做到快速发现失活。此外,还要预留资源,当宕机出现时,可以省去申请资源,以及初始化的时间。Through the analysis of the job exception generation process, it is found that the consumption of exception detection and initialization is the main bottleneck. Anomaly detection is subject to the interface polling interval, and resource initialization is subject to the container initialization step. In these two aspects, the system has been optimized separately, and the inactivation can be quickly found when the job is running. In addition, resources must be reserved. When a downtime occurs, the time for applying for resources and initialization can be saved.
系统中在流式计算引擎之上增设了一个多数派的连通性检测服务,检测服务集群中多个工作节点会周期性地检测集群中每台机器的连通性,由于它是多数派的,所以可信度是有保障的。In the system, a majority connectivity detection service is added on top of the streaming computing engine. Multiple working nodes in the detection service cluster will periodically detect the connectivity of each machine in the cluster. Since it is majority, so Credibility is guaranteed.
此外,在预留资源方面,系统扩展了流式引擎作业的资源申请模型,在流式作业提交时可以设定资源冗余参数,当冗余参数被激活后,会自动保障冗余资源量会高于单点故障导致的资源缺失量,且在资源排布上避免冗余资源的聚集性。In addition, in terms of reserving resources, the system expands the resource application model for streaming engine jobs. Resource redundancy parameters can be set when streaming jobs are submitted. When the redundancy parameters are activated, the amount of redundant resources will be automatically guaranteed. It is higher than the amount of missing resources caused by a single point of failure, and avoids the aggregation of redundant resources in resource arrangement.
同时,应对集群硬件资源满载的情况系统会控制数据源的消费速度。引入协调员,周期性检查作业集器上的资源消耗情况以及数据源水印的进展。并根据全局的现状,预测出来各个数据源接下来允许读到的最大位置之后,下发给所有数据源。数据源根据得到的最大位置以及当前自己的位置,确定读取速度。并根据负载限制数据的消费速度。动态调节所有数据源的消费速度,从而保证流式作业的稳定。At the same time, the system will control the consumption speed of data sources when the cluster hardware resources are fully loaded. A coordinator is introduced to periodically check the resource consumption on the job setter and the progress of the data source watermark. And according to the overall status quo, after predicting the maximum position that each data source is allowed to read next, it is issued to all data sources. The data source determines the reading speed based on the obtained maximum position and the current own position. And limit the consumption speed of data according to the load. Dynamically adjust the consumption speed of all data sources to ensure the stability of streaming jobs.
Prometheus是一个开源的服务监控系统和时间序列数据库;meta是html语言head区的一个辅助性标签,位于文档的头部,不包含任何内容;Grafana是仪表盘和图形编辑器;SideOutput是侧输出,任意数量额外的侧输出结果流;JMX是一个为应用程序、设备、系统等植入管理功能的框架;agent是能自主活动的软件或者硬件实体;log4j是Apache的一个开源项目,通过使用Log4j,可以控制日志信息输送的目的地是控制台、文件、GUI组件,甚至是套接口服务器、NT的事件记录器、UNIX Syslog守护进程等。Transaction,一般是指要做的或所做的事情。在计算机术语中是指访问并可能更新数据库中各种数据项的一个程序执行单元。Performance是前端性能监控的API。它可以检测页面中的性能,W3C性能小组引入进来的一个新的API,它可以检测到白屏时间、首屏时间、用户可操作的时间节点,页面总下载的时间、DNS查询的时间、TCP链接的时间等。Prometheus is an open source service monitoring system and time series database; meta is an auxiliary label in the head area of html language, located at the head of the document, does not contain any content; Grafana is a dashboard and graphics editor; SideOutput is side output, Any number of additional side output result streams; JMX is a framework for implanting management functions for applications, devices, systems, etc.; agent is a software or hardware entity that can act autonomously; log4j is an open source project of Apache. By using Log4j, The destinations that can control the transmission of log information are consoles, files, GUI components, and even socket servers, NT event recorders, UNIX Syslog daemons, etc. Transaction generally refers to something to be done or done. In computer terms, a unit of program execution that accesses and possibly updates various data items in a database. Performance is an API for front-end performance monitoring. It can detect the performance of the page, a new API introduced by the W3C performance team, it can detect the white screen time, first screen time, user-operable time node, total page download time, DNS query time, TCP link time etc.
基于上述发明,如图3所示,本发明提供了一种基于流式计算引擎的数据处理系统,包括:Based on the above invention, as shown in Figure 3, the present invention provides a data processing system based on a flow computing engine, including:
数据采集模块:通过从终端采集获取原始数据后,同步编制发送端与接收端的编解码协议,从无界数据流中匹配对应的表信息完成对信息事件的采集,同时对业务字段信息进行清洗筛选。具体地:Data acquisition module: After acquiring the original data from the terminal, the coding and decoding protocols of the sending end and the receiving end are compiled synchronously, and the corresponding table information is matched from the unbounded data stream to complete the collection of information events, and at the same time, the business field information is cleaned and screened. specifically:
消息中数据采集和提取都比较复杂,包括很多难点,比如如何处理传感器信号数据、如何解析数据库二进制日志文件和如何筛选生产过程中的业务日志。Data collection and extraction in messages are relatively complex, including many difficulties, such as how to process sensor signal data, how to parse database binary log files, and how to filter business logs in the production process.
在处理传感器信号数据时,从终端设备中采集获取原始数据后,需要同步编制发送端与接收端的编解码协议,考虑数据传输的可靠性与传输效率,同时对周期上传的数据进行压缩以提升数据发送效率。When processing sensor signal data, after collecting the original data from the terminal equipment, it is necessary to synchronously compile the encoding and decoding protocols of the sending end and the receiving end, consider the reliability and transmission efficiency of data transmission, and compress the periodically uploaded data to improve the data sending efficiency.
在解析数据库二进制日志文件时,首先需要从无界数据流中匹配对应的表信息从而完成对信息事件的采集。日志文件写入时为混合格式,需要根据记录存储格式分别解析出row格式以及statement格式的记录,日志文件在流式计算引擎提交事务刷新至磁盘后才会更新至日志文件中,需要记录日志消费点位应对作业异常重启情况。When parsing a database binary log file, it is first necessary to match the corresponding table information from the unbounded data stream to complete the collection of information events. When the log file is written, it is in a mixed format. It is necessary to parse out the records in the row format and the statement format according to the record storage format. The log file will not be updated to the log file until the streaming computing engine submits the transaction and refreshes it to the disk, and the log consumption needs to be recorded. The point responds to the abnormal restart of the operation.
在筛选生产过程中的业务日志时,生产过程中的日志包含了许多内部业务字段,进行需要进行清洗筛选,实现了提取日志消息头中携带的业务字段信息的功能,从而提供对业务日志的抽取分析能力。When screening business logs in the production process, the logs in the production process contain many internal business fields, which need to be cleaned and screened to realize the function of extracting the business field information carried in the log message header, thereby providing the extraction of business logs Skills of analyze.
上述场景中涉及的设备诊断数据通过MEMS传感器采集,将湿度、温度、压力等模拟检测值转变为数字信号传输至诊断系统中,在数据处理部分需要将传感器寄存器中的二进制数据重建成对分析作业有效用的数据,具体包括:The equipment diagnostic data involved in the above scenarios are collected by MEMS sensors, and the analog detection values such as humidity, temperature, and pressure are converted into digital signals and transmitted to the diagnostic system. In the data processing part, the binary data in the sensor registers needs to be reconstructed into a pair of analysis tasks Effective data, including:
1、通过采集计算公式还原出测量值数据。1. Restore the measured value data through the collection calculation formula.
2、通过记录传感器采集的初值,在测量值中消除采集数据的零偏误差。2. By recording the initial value collected by the sensor, the zero bias error of the collected data is eliminated in the measured value.
3、选择滤波器合理的截止频率,降低带外噪声,减小因为随机噪声信号带来的误差。3. Select a reasonable cut-off frequency of the filter to reduce out-of-band noise and reduce errors caused by random noise signals.
无界信息流被采集或者推送至系统后,首先被发往消费队列,借由消息队列提供的高吞吐能力完成对数据的削峰填谷,从而不会因为相同采集间隔的传感器数据上传导致处理系统的压力陡增。对于消息队列的消息主题采用多分区设置提高并发效率。After the unbounded information flow is collected or pushed to the system, it is first sent to the consumption queue. The high throughput capability provided by the message queue is used to complete the peak-shaving and valley-filling of the data, so that the processing system will not be affected by the uploading of sensor data at the same collection interval. pressure increased sharply. For the message topic of the message queue, the multi-partition setting is used to improve the concurrency efficiency.
在数据的管理和获取阶段,提供了非常丰富的连接器组件,包括HDFS,Kafka等多种存储支持,同时支持多并行度的消费消息队列中存储的数据,保证数据链路的消费速度不在入口处形成短板。In the data management and acquisition phase, it provides a very rich connector component, including HDFS, Kafka and other storage supports, and supports multi-parallel consumption of data stored in the message queue to ensure that the consumption speed of the data link is not at the entrance Short boards are formed.
现有的流式计算引擎包括系统中使用的Flink,在目前都还没有提供对整个数据集的管理功能,本发明通过数据库接口进行了一定的扩展,将所有接入的数据源的数据信息通过数据库传递到下游。The existing streaming computing engine including Flink used in the system has not yet provided the management function for the entire data set. The present invention expands the data information of all connected data sources through the database interface to a certain extent. The database is passed downstream.
前置过滤模块:将采集筛选出的信息经由布隆过滤器提供的轻量级数据过滤去重,同步利用Key-Value存储提供的海量数据过滤去重。具体地:Pre-filtering module: Filter and deduplicate the collected and screened information through the lightweight data provided by the Bloom filter, and simultaneously use the massive data provided by the Key-Value storage to filter and deduplicate. specifically:
上游流程在数据采集、消息投递、格式解析等环节往往容易造成数据重复,数据重复会对于下游应用如数据监控等造成影响,除了统计UV等传统用法之外,去重的意义更在于消除不可靠数据源产生的脏数据——即重复上报数据或重复投递数据的影响,使流式计算产生的结果更加准确。The upstream process is prone to data duplication in data collection, message delivery, format parsing, etc., and data duplication will affect downstream applications such as data monitoring. In addition to traditional usage such as statistical UV, the significance of deduplication lies in eliminating unreliability Dirty data generated by the data source—that is, the impact of repeated data reporting or repeated delivery of data makes the results of stream computing more accurate.
本发明实现了一个经由布隆过滤器提供的轻量级数据过滤,在上游数据源仅能保证at least once投递语义时,会导致下游统计数据偏高。The present invention implements a light-weight data filtering provided by a Bloom filter, and when the upstream data source can only guarantee at least once delivery semantics, the downstream statistical data will be high.
布隆过滤器通过内置的哈希函数实现数据的去重,其优势在于:The Bloom filter implements data deduplication through the built-in hash function, and its advantages are as follows:
1、运算时不需要存储数据本身,只用比特表示,因此空间占用相对于传统方式有巨大的优势,并且能够保密数据。1. There is no need to store the data itself during calculation, it is only expressed in bits, so the space occupation has a huge advantage over the traditional method, and the data can be kept secret.
2、去重算法的执行效率较高,插入和查询的时间复杂度均为O(k)。2. The execution efficiency of the deduplication algorithm is high, and the time complexity of insertion and query is O(k).
3、哈希函数之间相互独立,如有需要可以在硬件指令层面并行计算,增加处理效率。3. The hash functions are independent of each other. If necessary, they can be calculated in parallel at the hardware instruction level to increase processing efficiency.
但是布隆过滤器不能保证过滤的完全准确,如是需要100%准确率的场景不适用。However, the Bloom filter cannot guarantee the complete accuracy of the filtering, so it is not applicable to scenarios that require 100% accuracy.
对于需要准确去重的场景,本发明实现了一种通过Key-Value存储特性提供海量数据的去重方式。借由诸如Flink引擎提供的RocksDB状态后端的Key-Value存储特性对持有相同key的消息记录进行合并。适合对业务数据要求很高,不容忍错误的情况。在提供了准确过滤能力的同时,需要对进行过滤过程的作业做到精细的状态管理,通过设置超时时间以及配置增量检查点等方式避免状态无限制的增长。For scenarios that require accurate deduplication, the present invention implements a deduplication method that provides massive data through the Key-Value storage feature. Use the Key-Value storage feature of the RocksDB state backend provided by the Flink engine to merge message records with the same key. It is suitable for situations where business data requirements are high and errors are not tolerated. While providing accurate filtering capabilities, it is necessary to implement fine state management for the jobs that perform the filtering process, and avoid unlimited state growth by setting timeouts and configuring incremental checkpoints.
考虑到数据的key占用空间较大时会出现状态膨胀的问题,采用哈希算法将key转化为整型再进行存储,保证其最多占用8个字节。但由于哈希算法无法保证不产生冲突,需要根据业务场景确定是否需要启用。经过去重处理过的数据流再通过实现自定义业务阈值或是编制自定义过滤规则的filter算子,实现对不合法数据的过滤,减轻下游的计算压力。同时完成对原始数据的拆包,根据业务需要包装成所需使用的数据格式。最后为进入流式计算系统的数据流记录打上时间戳,方便后续处理。Considering the problem of state expansion when the data key occupies a large space, the hash algorithm is used to convert the key into an integer and then store it to ensure that it occupies a maximum of 8 bytes. However, since the hash algorithm cannot guarantee that no conflicts will occur, it is necessary to determine whether to enable it according to the business scenario. The deduplication-processed data stream can filter illegal data and reduce downstream computing pressure by implementing custom business thresholds or compiling filter operators for custom filtering rules. At the same time, the unpacking of the original data is completed, and it is packaged into the required data format according to business needs. Finally, time stamp the data stream records entering the stream computing system to facilitate subsequent processing.
数据预处理模块:流式计算引擎提供流批统一流处理,避免离线与实时两套业务开发产生的资源浪费,并且通过提供SQL支持对数据流向进行抽象。具体地:Data preprocessing module: The stream computing engine provides stream batch unified stream processing, avoiding the waste of resources caused by offline and real-time business development, and abstracting the data flow direction by providing SQL support. specifically:
在数据的预处理及特征工程阶段,流式计算引擎致力于提供流批统一的计算能力,避免离线与实时两套业务开发产生的资源浪费,并且通过提供SQL支持对数据流向进行抽象,提高作业开发的效率。基于这些特性,相比传统方案,本发明可以借由统一计算引擎来统一实时和离线两条链路的开发逻辑从而规避数据加工链路不一致导致的计算语义不一致的现象,从而使得下游数据不需要重复进行清洗过滤。In the stage of data preprocessing and feature engineering, the streaming computing engine is committed to providing unified streaming and batch computing capabilities, avoiding the waste of resources caused by offline and real-time business development, and abstracting the data flow direction by providing SQL support to improve operation development efficiency. Based on these characteristics, compared with the traditional solution, the present invention can unify the development logic of real-time and offline links by means of a unified computing engine, so as to avoid the phenomenon of inconsistent computing semantics caused by inconsistency in data processing links, so that downstream data does not need Repeat for wash filtration.
经过过滤的工业数据在数据预处理阶段还需要经过有效的裁剪:The filtered industrial data also needs to be effectively cropped in the data preprocessing stage:
1、从消息队列中消费获取到采集器发来的数据包,根据传输协议进行解析。1. Obtain the data packet sent by the collector from the message queue, and analyze it according to the transmission protocol.
2、通过采集器以及器件标识对数据流进行拆分,然后将相同标识的数据收纳在同一数据流中。2. Split the data stream through the collector and device identification, and then store the data with the same identification in the same data stream.
3、根据预先设定的数据处理分组进行数据组合,将关联分析的数据合并至同一分组。3. Carry out data combination according to the preset data processing group, and merge the data of correlation analysis into the same group.
4、根据时间窗口对数据进行聚合,通过窗口的滑动,将聚合的数据重新生成无界流。4. The data is aggregated according to the time window, and the aggregated data is regenerated into an unbounded flow through the sliding of the window.
5、基于简单的阈值逻辑对数据进行清理,将不满足要求的数据或是明显异常的数据记录至异常分支流供系统优化分析使用。5. The data is cleaned based on simple threshold logic, and the data that does not meet the requirements or the data that is obviously abnormal is recorded in the abnormal branch stream for system optimization and analysis.
6、记录数据处理过程中的有效数据占比以及处理规则命中数供系统优化分析使用。6. Record the proportion of valid data in the process of data processing and the number of processing rule hits for system optimization and analysis.
数据血缘模块:通过流式计算引擎提供的SQL抽象处理,采用SQL语言编写脚本描述数据的流向,再提交到计算平台上解析启动作业。具体地:Data lineage module: Through the SQL abstraction processing provided by the streaming computing engine, scripts are written in SQL language to describe the flow of data, and then submitted to the computing platform for analysis and startup. specifically:
通过流式计算引擎提供的SQL抽象能力,能够快速的描画任务作业中数据的处理过程。因此系统中提交构筑流计算任务主要手段是采用SQL语言编写脚本描述数据的流向,再提交到平台上解析启动作业。Through the SQL abstraction capability provided by the streaming computing engine, it is possible to quickly describe the data processing process in the task operation. Therefore, the main means of submitting and constructing stream computing tasks in the system is to use SQL language to write scripts to describe the flow of data, and then submit them to the platform to analyze and start jobs.
通常在开发时需要先定义DDL(数据库模式定义语言)指定数据元信息,再编写DML(数据库操作语言)指定数据加工过程,多数任务中DDL(数据库模式定义语言)定义语句是重复但却没有在作业开发中获得收益。元数据信息也储存在持久化存储的metastore(元存储)组件存储中,同时也能在上下游的catalog(数据库)中取得。因此方案中将实时的数据流定义成实时表存储至metastore组件,用户可以直接从中选择需要的数据源,然后仅需要写DML语句即可完成作业开发。在任务提交的时候,系统会根据上下游catalog将转化的DDL语句补充上去,构成完整的StreamingSQL脚本进行提交。可以进一步降低作业开发难度,同时也能够进一步打通批处理业务和流处理业务的开发流程,使得两者的开发步骤更加趋近相同。Usually, during development, it is necessary to first define DDL (database schema definition language) to specify data metadata, and then write DML (database manipulation language) to specify data processing procedures. In most tasks, DDL (database schema definition language) definition statements are repeated but not in Benefit from job development. Metadata information is also stored in the metastore (metastore) component storage of persistent storage, and can also be obtained in upstream and downstream catalogs (databases). Therefore, in the solution, the real-time data flow is defined as a real-time table and stored in the metastore component, from which users can directly select the required data source, and then only need to write DML statements to complete job development. When the task is submitted, the system will supplement the converted DDL statement according to the upstream and downstream catalogs to form a complete StreamingSQL script for submission. It can further reduce the difficulty of job development, and at the same time, it can further open up the development process of batch processing business and stream processing business, making the development steps of the two more similar.
此外,方案的规则引擎支持配置规则的不断升级。托管在metastore的元数据能够对不同数据节点引入的数据schema不一致的情况提供一定的兼容能力。数据处理过程中通常会将预处理的数据写入消息队列中,然后进行在线训练,训练的过程是持续不断的,期间会不断的产生动态的模型,然后推送给在线的推理模块进行推理。在线的机器学习的特点就是模型的动态更新、持续训练和不断验证。同时需要比较复杂的模型监控,模型部署和模型回滚等策略。在计算节点可能经由维表关联、多流汇聚导致数据结构产生变更,通过追溯数据schema的血缘关系能够对变化的数据结构提供支持。In addition, the solution's rule engine supports continuous upgrade of configuration rules. The metadata hosted in the metastore can provide certain compatibility for inconsistent data schema introduced by different data nodes. During data processing, the preprocessed data is usually written into the message queue, and then online training is performed. The training process is continuous, during which dynamic models are continuously generated, and then pushed to the online reasoning module for reasoning. Online machine learning is characterized by dynamic model updates, continuous training, and continuous verification. At the same time, more complex model monitoring, model deployment and model rollback strategies are required. In computing nodes, the data structure may be changed through dimension table association and multi-stream aggregation, and the changed data structure can be supported by tracing the blood relationship of the data schema.
如图4所示,本发明还提供了一种基于流式计算引擎的数据处理方法,包括如下步骤:As shown in Figure 4, the present invention also provides a data processing method based on a streaming computing engine, comprising the following steps:
数据采集步骤:通过从终端采集获取原始数据后,同步编制发送端与接收端的编解码协议,从无界数据流中匹配对应的表信息完成对信息事件的采集,同时对业务字段信息进行清洗筛选。Data collection steps: After acquiring the original data from the terminal, the coding and decoding protocols of the sending end and the receiving end are compiled synchronously, and the corresponding table information is matched from the unbounded data stream to complete the collection of information events, and at the same time, the business field information is cleaned and screened.
前置过滤步骤:将采集筛选出的信息经由布隆过滤器提供的轻量级数据过滤去重,同步利用Key-Value存储提供的海量数据过滤去重。Pre-filtering step: Filter and deduplicate the collected and screened information through the lightweight data provided by the Bloom filter, and simultaneously use the massive data provided by the Key-Value storage to filter and deduplicate.
数据预处理步骤:流式计算引擎提供流批统一流处理,避免离线与实时两套业务开发产生的资源浪费,并且通过提供SQL支持对数据流向进行抽象。Data preprocessing steps: The stream computing engine provides stream-batch unified stream processing, avoiding the waste of resources caused by offline and real-time business development, and abstracting the data flow direction by providing SQL support.
数据血缘步骤:通过流式计算引擎提供的SQL抽象处理,采用SQL语言编写脚本描述数据的流向,再提交到计算平台上解析启动作业。Data lineage step: Through the SQL abstraction processing provided by the streaming computing engine, write a script in SQL language to describe the flow of data, and then submit it to the computing platform to analyze and start the job.
本发明又提供了一种存储有计算机程序的计算机可读存储介质,计算机程序被处理器执行时实现上述的方法的步骤。The present invention further provides a computer-readable storage medium storing a computer program. When the computer program is executed by a processor, the steps of the above method are realized.
本发明构建任务的方式主要为SQL,并由此开发了StreamingSQL的编辑器,提供了对系统定义的SQL规范的支持,同时兼容StromSQL、SparkSQL、KSQL、HiveSQL和FlinkSQL语法。除了提供了引擎方言SQL的支持之外,还提供了SQL模板。方便平台的用户基于SQL模板快速开发预设场景的SQL任务。此外对于业务人员还提供了web式的引导页面,帮助其通过表单填空的方式在不编写代码的情况构建出流式处理任务。The method of constructing tasks in the present invention is mainly SQL, and an editor for StreamingSQL is developed to provide support for SQL specifications defined by the system, and is compatible with StromSQL, SparkSQL, KSQL, HiveSQL and FlinkSQL syntax. In addition to providing support for the engine dialect SQL, SQL templates are also provided. It is convenient for users of the platform to quickly develop SQL tasks for preset scenarios based on SQL templates. In addition, a web-style guide page is provided for business personnel to help them build stream processing tasks by filling in the blanks without writing code.
此外,编辑器还提供了自定义函数的组件库,包含了丰富的内置函数,包括时间函数、集合函数、Json处理函数及字符串函数。丰富的内置函数可以方便用户的开发,省去用户自己去开发的时间。组件库中还包含了开发人员编写的使用说明以及使用案例等,供平台用户检索查阅。组件库也提供了云平台,供系统用户上传下载业务中累积的自定义函数以及自定义算子,方便不同系统部署时的数据共享,以及后续开发支持In addition, the editor also provides a component library of custom functions, including rich built-in functions, including time functions, collection functions, Json processing functions and string functions. Abundant built-in functions can facilitate the user's development and save the time for the user to develop by himself. The component library also contains instructions and use cases written by developers for retrieval by platform users. The component library also provides a cloud platform for system users to upload and download custom functions and custom operators accumulated in the business, which facilitates data sharing during deployment of different systems and subsequent development support
编辑器还实现了针对StreamingSQL的语法检测和智能提示,用户在编写语句的过程中能够做出实时的语法检测并给出一定程度的提示,还能够对语句中涉及的表和字段等提供元数据的补全功能,优化了开发体验。The editor also implements syntax detection and intelligent prompts for StreamingSQL. Users can perform real-time syntax detection and give a certain degree of prompts during the process of writing statements, and can also provide metadata for the tables and fields involved in the statement. The completion function optimizes the development experience.
编辑器同时支持在线调试的功能,对于流式计算这种能力是非常重要的。避免了任务需要上线之后再根据观察的结果进行调整的情况,节约了开发成本。系统可以接受文本文件作为数据源来检验输出是否符合预期,也能采样消息队列中的消息主题或是数据库表中的数据进行业务逻辑校验。The editor also supports online debugging, which is very important for stream computing. It avoids the situation that the task needs to be adjusted according to the observation results after it goes online, which saves the development cost. The system can accept text files as a data source to verify whether the output meets expectations, and can also sample message topics in the message queue or data in database tables for business logic verification.
在SQL解析过程中,最为复杂的是通过维表关联进行数据表的打宽。发明中通过热存储进行表关联。数据从数据源导入后,系统使用Async I/O技术访问后端,系统后端使用Data Accessor接口访问后端的存储。系统后端存储支持从分布式存储系统、结构化数据库、NoSQL数据库、消息队列等数据管道中获取数据,同时后端会将数据缓存于LRU Cache模块中。通过在消息队列中在维表更新时进行广播,减少维度数据更新不及时的情况。在系统中为维表关联后的数据开发了多种大数据工具的存储支持,从而大大增加了系统的兼容性。In the SQL parsing process, the most complicated thing is to widen the data table through dimension table association. In the invention, table association is performed through hot storage. After the data is imported from the data source, the system uses Async I/O technology to access the backend, and the system backend uses the Data Accessor interface to access the backend storage. The back-end storage of the system supports obtaining data from distributed storage systems, structured databases, NoSQL databases, message queues and other data pipelines, and the back-end will cache the data in the LRU Cache module. By broadcasting in the message queue when the dimension table is updated, the situation that the dimension data is not updated in time can be reduced. In the system, the storage support of various big data tools has been developed for the data associated with dimension tables, thus greatly increasing the compatibility of the system.
由于规则匹配情况和业务需求场景不断变化,规则经常需要根据实际变化做出频繁调整。业务人员在前端的特征管理界面,对规则库中的规则因子进行增删改查的操作,通过对不同的规则因子的组合构筑出不同的特征规则。然后可以指定试验的结果集对构筑的规则进行简单测试,测试结果合适之后再进行整体保存。不在每次操作时直接对规则库进行变更,从而实现业务规则与规则因子的解耦。Due to the constant changes in rule matching and business demand scenarios, rules often need to be adjusted frequently according to actual changes. On the front-end feature management interface, business personnel can add, delete, modify and check the rule factors in the rule base, and construct different feature rules by combining different rule factors. Then you can specify the result set of the test to conduct a simple test on the constructed rules, and save the whole after the test results are suitable. It does not directly change the rule base every time it is operated, so as to realize the decoupling of business rules and rule factors.
在衡量整体效果方面,通过对规则的命中率以及规则的报警频次的分析,确定规则是否合理。In terms of measuring the overall effect, through the analysis of the hit rate of the rule and the alarm frequency of the rule, determine whether the rule is reasonable.
需要判断规则是否失效,比如拦截率的突然降低。It is necessary to judge whether the rule is invalid, such as a sudden decrease in the interception rate.
判断规则是否多余,比如某规则从来没拦截过任何事件。Determine whether a rule is redundant, for example, a rule has never intercepted any events.
判断规则是否有漏洞,比如在进行某个操作后,但没有达到预期效果。Determine whether there is a loophole in the rule, for example, after a certain operation is performed, but the expected effect is not achieved.
此外,规则灰度上线机制也基于规则的整体效果评价构筑,通过对不同区域设置白名单或者是导入真实数据测试结果或者是限流上线正式环境等优化规则发布流程。In addition, the rule grayscale launch mechanism is also based on the overall effect evaluation of the rules, and the rule release process is optimized by setting whitelists for different regions, importing real data test results, or restricting the official environment of online streaming.
在积累分析优化数据方面,需要发现组合规则,通过识别某种行为的组合来防止出现操作中每个步骤均为正常可用,但行为结果累计出现异常;需要做出群体识别,比如通过图分析技术,发现群体并且打上群体标签,防止出现每个部分表现都正常,但整个区域内却出现异常状况的情况。In terms of accumulating, analyzing and optimizing data, it is necessary to discover combination rules, and to prevent the occurrence that each step in the operation is normally usable by identifying a certain combination of behaviors, but the accumulation of behavioral results is abnormal; it is necessary to make group identification, such as through graph analysis technology , discover groups and label them to prevent the situation where each part behaves normally, but abnormal conditions appear in the whole area.
在发明的规则的模式匹配中,使用Rate算法提升匹配效率,减少了重复计算造成的时间冗余性。在规则数量和事实样本较多时,每条事实数据都需要与Rete网络中的Alpha节点相匹配。大多数规则所含的条件原子相同,即存在被多个规则同时包含的条件原子,依次与每个Alpha节点匹配就存在了一定时间浪费。因此,一个预匹配模块,将多条规则聚合成少量的规则组。通过规则组筛选,在预匹配阶段过滤掉部分正常数据,减少事实和节点的匹配次数。实现逻辑是将含有多个相同条件原子的规则划分到同一规则组中,规则组中出现次数最多的条件原子作为该规则组的特征条件。全量数据通过预匹配模块中规则组的筛选,即可过滤掉部分数据,对剩余样本执行所在规则组内的规则判断。In the pattern matching of the invented rules, the Rate algorithm is used to improve the matching efficiency and reduce the time redundancy caused by repeated calculations. When the number of rules and fact samples is large, each piece of fact data needs to match the Alpha node in the Rete network. The conditional atoms contained in most rules are the same, that is, there are conditional atoms contained in multiple rules at the same time, and there is a certain amount of time wasted in matching each Alpha node in turn. Therefore, a pre-matching module aggregates multiple rules into a small number of rule groups. Filter out some normal data in the pre-matching stage through rule group filtering to reduce the matching times of facts and nodes. The implementation logic is to divide rules containing multiple identical condition atoms into the same rule group, and the condition atom that appears most frequently in the rule group is used as the characteristic condition of the rule group. Part of the data can be filtered out through the filtering of the rule group in the pre-matching module for the full amount of data, and the rule judgment in the rule group is executed for the remaining samples.
规则学习的目标是产生一个能覆盖尽可能多的样例的规则集,最直接的方式就是顺序覆盖,即逐条归纳,这也是规则学习最简单的一种训练思想,在训练集中每训练生成规则后,就将该规则覆盖的样本从训练集中取出,然后用剩下的样本训练出另一组规则。当然为了避免模型的过拟合,一般算法都会加入剪枝优化的策略。The goal of rule learning is to generate a rule set that can cover as many samples as possible. The most direct way is sequential coverage, that is, one-by-one induction. This is also the simplest training idea for rule learning. In the training set, each training generates rules After that, the sample covered by the rule is taken out from the training set, and then another set of rules is trained with the remaining samples. Of course, in order to avoid over-fitting of the model, the general algorithm will add a strategy of pruning optimization.
要训练出高可用的模型,优秀的特征必不可少。优秀特征的重要特点是区分度较高,这就需要算法开发人员和敏感度高的业务人员紧密配合,对配置的规则进行分析,提取出较好的规则因子,同时,算法开发人员还应对业务流程非常熟悉,结合实际情况,创造出高可用的特征。对实际业务而言,很多情况下,一个准确的特征的加入比模型参数调优甚至模型优化更加有效,同时,准确的特征还可以防止过拟合的发生。本发明中通过对历史数据的分析处理以及对工业监控业务的流程梳理抽象出详细的业务参数可供组合筛选。To train a highly available model, excellent features are essential. An important feature of excellent features is a high degree of discrimination, which requires the close cooperation between algorithm developers and highly sensitive business personnel to analyze the configured rules and extract better rule factors. At the same time, algorithm developers should also deal with business The process is very familiar, combined with the actual situation, to create high-availability features. For actual business, in many cases, the addition of an accurate feature is more effective than model parameter tuning or even model optimization. At the same time, accurate features can also prevent overfitting. In the present invention, through the analysis and processing of historical data and the combing of the process of industrial monitoring business, detailed business parameters are abstracted for combination and screening.
模型训练一个重要的需求就是样本的数量要尽可能的多,然而监控业务天然有这方面的缺陷,异常规则配置之后,实际被筛选出的案例极少,这就导致正负样本数量极不平衡,因此发明采用重采样的方法增加正样本的数量,具体方法是将样本包含的数值化特征采用SMOTE方法进行重采样,对标签化特征采用随机选择的方法重采样,最后将二者组合成新的样本。实际训练过程中,需要对算法包含的最大迭代次数、步长、最大树深等参数进行多次适当的调节,选择出识别率最高的参数。An important requirement for model training is that the number of samples should be as large as possible. However, the monitoring business naturally has defects in this regard. After the exception rules are configured, very few cases are actually screened out, which leads to an extreme imbalance in the number of positive and negative samples. , so the invention uses the method of resampling to increase the number of positive samples. The specific method is to use the SMOTE method to resample the numerical features contained in the samples, and resample the label features by random selection, and finally combine the two into a new of samples. In the actual training process, it is necessary to properly adjust the parameters of the algorithm including the maximum number of iterations, step size, and maximum tree depth several times, and select the parameters with the highest recognition rate.
通过系统处理的事件流不断增长,业务规则会跟着数据集进行一定的演化。再通过实时的业务数据和归档历史数据验证规则的有效性后,规则引擎能具备一定的进化能力。The flow of events processed by the system continues to grow, and business rules will evolve with the data set to a certain extent. After verifying the effectiveness of the rules through real-time business data and archived historical data, the rule engine can have a certain evolutionary capability.
将基于流式计算引擎的数据处理系统与基于流式计算引擎的数据监控系统相结合之后的工作场景流程如下:After combining the data processing system based on the streaming computing engine with the data monitoring system based on the streaming computing engine, the workflow of the working scenario is as follows:
1、根据场景需要,用户通过上传模块,以jar包形式上传针对业务逻辑开发的自定义函数,扩展定义数据处理过程中需要使用的数据处理算子以及SQL函数1. According to the needs of the scene, the user uploads the custom function developed for the business logic in the form of a jar package through the upload module, and expands and defines the data processing operators and SQL functions that need to be used in the data processing process
2、用户通过SQL语句描述作业中的数据流向,整理编排出作业数据处理的前后关系,再根据需要调整每个处理节点的并行度以及整体作业的资源分配。同时也支持上传jar包通过实现接口编排任务过程实现对作业过程更精细的控制。2. The user describes the data flow in the job through SQL statements, sorts out the context of job data processing, and then adjusts the parallelism of each processing node and the resource allocation of the overall job as needed. At the same time, it also supports uploading jar packages to achieve finer control over the job process by implementing the interface to orchestrate the task process.
3、根据业务需求增加规则引擎的判定规则,包括阈值判断、模式判断、组合逻辑判断等,对于复杂的关联判断规则也支持用户自定义,在上传实现系统规定接口的jar包后,便能在系统中进行配置。3. Add the judgment rules of the rule engine according to business requirements, including threshold judgment, mode judgment, combination logic judgment, etc. For complex association judgment rules, it also supports user-defined. After uploading the jar package that implements the interface specified by the system, it can be in Configure in the system.
4、根据作业需要配置系统的规则学习模块,开启平台的系统演化功能,在系统处理数据过程中不断地训练规则模型,通过学习用户的交互逻辑以及操作数据包括数据的流量峰谷特点、规则命中率、资源占用情况等优化演化平台的规则。演化出更加符合业务作业特点的细化规则经过人工确认后上线。4. Configure the system’s rule learning module according to the needs of the job, open the system evolution function of the platform, and continuously train the rule model during the system’s data processing process. By learning the user’s interaction logic and operating data, including data traffic peak and valley characteristics, and rule hits Rate, resource occupancy, etc. to optimize the rules of the evolution platform. Evolved detailed rules that are more in line with the characteristics of business operations and go online after manual confirmation.
5、配置作业的数据源以及数据下沉通道,支持从分布式存储系统、结构化数据库、NoSQL数据库、消息队列等数据通道中读写数据。同时支持json、protobuffer、avro等多种数据格式的构造及解析。5. Configure the data source and data sink channel of the job, and support reading and writing data from data channels such as distributed storage systems, structured databases, NoSQL databases, and message queues. At the same time, it supports the construction and analysis of multiple data formats such as json, protobuffer, and avro.
6、提交至作业系统编排,首先通过逻辑拆解生成处理模块同时检查作业流程中的语法规范以及自定义包的编写规范。在页面上展示分模块构建的有向无环图,能够清晰地展示作业的数据流向以及算子的编排顺序,检查无误之后再提交给任务调度模块请求资源并拉起作业。6. Submit to the operating system for orchestration, first generate a processing module through logical disassembly and check the grammar specification in the operation process and the writing specification of the custom package. The directed acyclic graph constructed by sub-modules is displayed on the page, which can clearly show the data flow direction of the job and the order of the operators. After checking, it is submitted to the task scheduling module to request resources and start the job.
7、作业请求从资源集群中申请到作业资源之后,会不断的从数据源获取对应标签的数据,根据配置的规则进行数据处理后,通过规则引擎进行规则判断后,再输出至实时展示模块或是进行数据下沉。作业过程中,用户可以通过任务监控模块了解作业详情,例如作业吞吐,网络IO、资源占用以及模块背压等信息都能让用户了解作业的运行状态。7. After the job request applies for the job resource from the resource cluster, it will continuously obtain the data of the corresponding label from the data source. After the data is processed according to the configured rules, the rules are judged by the rule engine, and then output to the real-time display module or It is data sinking. During the job process, the user can learn about the job details through the task monitoring module, such as job throughput, network IO, resource occupation, and module backpressure, etc., so that the user can know the running status of the job.
8、根据用户配置,在作业过程中会分时段或是作业批次生成确认点,包含了当前作业的全部状态信息。同时在业务流程的间隔周期会生成业务存档点,包含当前作业的计算结果。配合以上两种机制,能够累计作业指标,分析作业状态。同时在作业单元出现异常时,提供异常恢复的能力。8. According to user configuration, during the job process, a confirmation point will be generated by time period or job batch, which contains all the status information of the current job. At the same time, business archive points are generated at intervals of business processes, including the calculation results of the current job. Cooperating with the above two mechanisms, it is possible to accumulate job indicators and analyze job status. At the same time, when an exception occurs in the operation unit, it provides the ability to recover from the exception.
9、当作业流程中出现异常情况时,会根据用户配置来进行异常恢复,包括尝试作业重启、尝试从确认点进行恢复、异常数据的记录、或是快速失败然后推送等异常处理配置。作业监控模块中的任务状态也会实时的进行更新,帮助运维人员及时获知任务异常信息9. When an abnormal situation occurs in the job process, abnormal recovery will be performed according to the user configuration, including trying to restart the job, trying to recover from the confirmation point, recording abnormal data, or quickly failing and then pushing and other exception handling configurations. The task status in the job monitoring module will also be updated in real time, helping the operation and maintenance personnel to know the task abnormal information in time
10、当作业终止时,资源监控模块会在获取到当前任务的状态信息后进行资源回收,释放运算资源、清理文件缓存、根据用户配置保留或是清理存档点和确认点。将任务过程中的状态数据存入时序数据库后结束流式计算作业。10. When the job is terminated, the resource monitoring module will recycle resources after obtaining the status information of the current task, release computing resources, clean up the file cache, save or clear the archive point and confirmation point according to user configuration. After the status data in the task process is stored in the time series database, the streaming computing job is ended.
本领域技术人员知道,除了以纯计算机可读程序代码方式实现本发明提供的系统及其各个装置、模块、单元以外,完全可以通过将方法步骤进行逻辑编程来使得本发明提供的系统及其各个装置、模块、单元以逻辑门、开关、专用集成电路、可编程逻辑控制器以及嵌入式微控制器等的形式来实现相同功能。所以,本发明提供的系统及其各项装置、模块、单元可以被认为是一种硬件部件,而对其内包括的用于实现各种功能的装置、模块、单元也可以视为硬件部件内的结构;也可以将用于实现各种功能的装置、模块、单元视为既可以是实现方法的软件模块又可以是硬件部件内的结构。Those skilled in the art know that, in addition to realizing the system provided by the present invention and its various devices, modules, and units in a purely computer-readable program code mode, the system provided by the present invention and its various devices can be completely programmed by logically programming the method steps. , modules, and units implement the same functions in the form of logic gates, switches, ASICs, programmable logic controllers, and embedded microcontrollers. Therefore, the system and its various devices, modules, and units provided by the present invention can be regarded as a hardware component, and the devices, modules, and units included in it for realizing various functions can also be regarded as hardware components. The structure; the devices, modules, and units for realizing various functions can also be regarded as not only the software modules for realizing the method, but also the structures in the hardware components.
以上对本发明的具体实施例进行了描述。需要理解的是,本发明并不局限于上述特定实施方式,本领域技术人员可以在权利要求的范围内做出各种变化或修改,这并不影响本发明的实质内容。在不冲突的情况下,本申请的实施例和实施例中的特征可以任意相互组合。Specific embodiments of the present invention have been described above. It should be understood that the present invention is not limited to the specific embodiments described above, and those skilled in the art may make various changes or modifications within the scope of the claims, which do not affect the essence of the present invention. In the case of no conflict, the embodiments of the present application and the features in the embodiments can be combined with each other arbitrarily.
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202111228109.7ACN116009428B (en) | 2021-10-21 | 2021-10-21 | Industrial data monitoring system, method and medium based on streaming computing engine |
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202111228109.7ACN116009428B (en) | 2021-10-21 | 2021-10-21 | Industrial data monitoring system, method and medium based on streaming computing engine |
| Publication Number | Publication Date |
|---|---|
| CN116009428Atrue CN116009428A (en) | 2023-04-25 |
| CN116009428B CN116009428B (en) | 2025-08-22 |
| Application Number | Title | Priority Date | Filing Date |
|---|---|---|---|
| CN202111228109.7AActiveCN116009428B (en) | 2021-10-21 | 2021-10-21 | Industrial data monitoring system, method and medium based on streaming computing engine |
| Country | Link |
|---|---|
| CN (1) | CN116009428B (en) |
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN116415206A (en)* | 2023-06-06 | 2023-07-11 | 中国移动紫金(江苏)创新研究院有限公司 | Operator multiple data fusion method, system, electronic equipment and computer storage medium |
| CN116628722A (en)* | 2023-06-06 | 2023-08-22 | 重庆交通大学 | Financial data safety management processing system |
| CN117033027A (en)* | 2023-08-18 | 2023-11-10 | 百度(中国)有限公司 | Data processing method, device, electronic equipment and medium |
| CN117196651A (en)* | 2023-08-09 | 2023-12-08 | 首都经济贸易大学 | Enterprise abnormity monitoring method and device based on data asynchronous processing and storage medium |
| CN117235064A (en)* | 2023-11-13 | 2023-12-15 | 湖南中车时代通信信号有限公司 | Intelligent online monitoring method and system for urban rail equipment |
| CN117312391A (en)* | 2023-10-23 | 2023-12-29 | 中南民族大学 | Big data platform dynamic index evaluation method and system based on stream computing |
| CN118194207A (en)* | 2024-05-15 | 2024-06-14 | 中节能晶和科技有限公司 | Rule engine-based road illumination Internet of things equipment anomaly monitoring system and method |
| CN119155190A (en)* | 2024-08-01 | 2024-12-17 | 上海许长网络科技有限公司 | Visual computing model construction method based on stream computing |
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| US20020091991A1 (en)* | 2000-05-11 | 2002-07-11 | Castro Juan Carlos | Unified real-time microprocessor computer |
| CN101021810A (en)* | 2007-03-08 | 2007-08-22 | 山东浪潮齐鲁软件产业股份有限公司 | Software system performance estimating method |
| US20070195408A1 (en)* | 2001-01-12 | 2007-08-23 | Divelbiss Adam W | Method and apparatus for stereoscopic display using column interleaved data with digital light processing |
| AU2012216284B1 (en)* | 2012-08-17 | 2014-02-20 | Advanced Charging Technologies, LLC | Power device |
| CN106951464A (en)* | 2017-02-27 | 2017-07-14 | 江苏徐工信息技术股份有限公司 | Based on the personalized early warning mechanism big data computational methods of storm user orienteds |
| CN107885642A (en)* | 2017-11-29 | 2018-04-06 | 小花互联网金融服务(深圳)有限公司 | Business monitoring method and system based on machine learning |
| CN108833126A (en)* | 2018-04-02 | 2018-11-16 | 平安科技(深圳)有限公司 | Electronic device, data link method for prewarning risk and storage medium |
| US20190042290A1 (en)* | 2017-08-07 | 2019-02-07 | Open Data Group Inc. | Dynamically configurable microservice model for data analysis using sensors |
| CN109902107A (en)* | 2019-01-28 | 2019-06-18 | 中国石油大学(华东) | A Storm-based Industrial Signaling Data Streaming Computing Framework |
| CN110311806A (en)* | 2019-06-06 | 2019-10-08 | 上海交通大学 | A mobile application program interface response delay diagnosis method, system and terminal |
| CN110955547A (en)* | 2019-10-31 | 2020-04-03 | 北京浪潮数据技术有限公司 | Management method, system, equipment and readable storage medium for interface abnormal information |
| CN111405249A (en)* | 2020-03-20 | 2020-07-10 | 腾讯云计算(北京)有限责任公司 | Monitoring method, monitoring device, server and computer-readable storage medium |
| CN111526060A (en)* | 2020-06-16 | 2020-08-11 | 网易(杭州)网络有限公司 | Method and system for processing service log |
| CN111935226A (en)* | 2020-07-08 | 2020-11-13 | 上海微亿智造科技有限公司 | Method and system for realizing streaming computing by supporting industrial data |
| CN112116629A (en)* | 2020-08-11 | 2020-12-22 | 西安交通大学 | End-to-end multi-target tracking method using global response graph |
| CN113110977A (en)* | 2021-04-21 | 2021-07-13 | 中国电子科技网络信息安全有限公司 | Safety monitoring method based on block chain system |
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| US20020091991A1 (en)* | 2000-05-11 | 2002-07-11 | Castro Juan Carlos | Unified real-time microprocessor computer |
| US20070195408A1 (en)* | 2001-01-12 | 2007-08-23 | Divelbiss Adam W | Method and apparatus for stereoscopic display using column interleaved data with digital light processing |
| CN101021810A (en)* | 2007-03-08 | 2007-08-22 | 山东浪潮齐鲁软件产业股份有限公司 | Software system performance estimating method |
| AU2012216284B1 (en)* | 2012-08-17 | 2014-02-20 | Advanced Charging Technologies, LLC | Power device |
| CN106951464A (en)* | 2017-02-27 | 2017-07-14 | 江苏徐工信息技术股份有限公司 | Based on the personalized early warning mechanism big data computational methods of storm user orienteds |
| US20190042290A1 (en)* | 2017-08-07 | 2019-02-07 | Open Data Group Inc. | Dynamically configurable microservice model for data analysis using sensors |
| CN107885642A (en)* | 2017-11-29 | 2018-04-06 | 小花互联网金融服务(深圳)有限公司 | Business monitoring method and system based on machine learning |
| CN108833126A (en)* | 2018-04-02 | 2018-11-16 | 平安科技(深圳)有限公司 | Electronic device, data link method for prewarning risk and storage medium |
| CN109902107A (en)* | 2019-01-28 | 2019-06-18 | 中国石油大学(华东) | A Storm-based Industrial Signaling Data Streaming Computing Framework |
| CN110311806A (en)* | 2019-06-06 | 2019-10-08 | 上海交通大学 | A mobile application program interface response delay diagnosis method, system and terminal |
| CN110955547A (en)* | 2019-10-31 | 2020-04-03 | 北京浪潮数据技术有限公司 | Management method, system, equipment and readable storage medium for interface abnormal information |
| CN111405249A (en)* | 2020-03-20 | 2020-07-10 | 腾讯云计算(北京)有限责任公司 | Monitoring method, monitoring device, server and computer-readable storage medium |
| CN111526060A (en)* | 2020-06-16 | 2020-08-11 | 网易(杭州)网络有限公司 | Method and system for processing service log |
| CN111935226A (en)* | 2020-07-08 | 2020-11-13 | 上海微亿智造科技有限公司 | Method and system for realizing streaming computing by supporting industrial data |
| CN112116629A (en)* | 2020-08-11 | 2020-12-22 | 西安交通大学 | End-to-end multi-target tracking method using global response graph |
| CN113110977A (en)* | 2021-04-21 | 2021-07-13 | 中国电子科技网络信息安全有限公司 | Safety monitoring method based on block chain system |
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN116415206A (en)* | 2023-06-06 | 2023-07-11 | 中国移动紫金(江苏)创新研究院有限公司 | Operator multiple data fusion method, system, electronic equipment and computer storage medium |
| CN116628722A (en)* | 2023-06-06 | 2023-08-22 | 重庆交通大学 | Financial data safety management processing system |
| CN116415206B (en)* | 2023-06-06 | 2023-08-22 | 中国移动紫金(江苏)创新研究院有限公司 | Carrier multi-data fusion method, system, electronic equipment and computer storage medium |
| CN116628722B (en)* | 2023-06-06 | 2024-07-30 | 重庆交通大学 | A financial data security management and processing system |
| CN117196651A (en)* | 2023-08-09 | 2023-12-08 | 首都经济贸易大学 | Enterprise abnormity monitoring method and device based on data asynchronous processing and storage medium |
| CN117196651B (en)* | 2023-08-09 | 2024-05-03 | 首都经济贸易大学 | Enterprise abnormity monitoring method and device based on data asynchronous processing and storage medium |
| CN117033027A (en)* | 2023-08-18 | 2023-11-10 | 百度(中国)有限公司 | Data processing method, device, electronic equipment and medium |
| CN117312391A (en)* | 2023-10-23 | 2023-12-29 | 中南民族大学 | Big data platform dynamic index evaluation method and system based on stream computing |
| CN117235064A (en)* | 2023-11-13 | 2023-12-15 | 湖南中车时代通信信号有限公司 | Intelligent online monitoring method and system for urban rail equipment |
| CN117235064B (en)* | 2023-11-13 | 2024-01-23 | 湖南中车时代通信信号有限公司 | Intelligent online monitoring method and system for urban rail equipment |
| CN118194207A (en)* | 2024-05-15 | 2024-06-14 | 中节能晶和科技有限公司 | Rule engine-based road illumination Internet of things equipment anomaly monitoring system and method |
| CN119155190A (en)* | 2024-08-01 | 2024-12-17 | 上海许长网络科技有限公司 | Visual computing model construction method based on stream computing |
| Publication number | Publication date |
|---|---|
| CN116009428B (en) | 2025-08-22 |
| Publication | Publication Date | Title |
|---|---|---|
| CN116009428A (en) | Industrial data monitoring system, method, and medium based on streaming computing engine | |
| Fragkoulis et al. | A survey on the evolution of stream processing systems | |
| CN111526060B (en) | Method and system for processing service log | |
| US11468062B2 (en) | Order-independent multi-record hash generation and data filtering | |
| Cugola et al. | Processing flows of information: From data stream to complex event processing | |
| CN116010452A (en) | Industrial data processing system and method based on stream type calculation engine and medium | |
| CN107103064B (en) | Data statistical method and device | |
| US20140222843A1 (en) | Systems, Methods, and computer Program Products to Ingest, Process, and Output Large Data | |
| Baer et al. | DBStream: A holistic approach to large-scale network traffic monitoring and analysis | |
| Affetti et al. | Defining the execution semantics of stream processing engines | |
| CN113791586A (en) | Novel industrial APP and identification registration analysis integration method | |
| CN118377768A (en) | Data ETL method, device, equipment and medium based on service flow | |
| Chen et al. | Towards low-latency big data infrastructure at Sangfor | |
| CN119622206A (en) | Method, device and medium for improving data direct access efficiency based on large model | |
| CN118227181A (en) | Method and system for counting user software thermal update in real time | |
| CN113553320B (en) | Data quality monitoring method and device | |
| KR20150089693A (en) | Apparatus and Method for Extending Data Store System Based on Big Data Platform | |
| CN120561343B (en) | Hyper-converged server multi-resource integration system and scheduling method | |
| Foo | Automated discovery of performance regressions in enterprise applications | |
| Bate | Auditable Data Provenance in Streaming Data Processing | |
| Santos | Data ingestion in Smart Cities | |
| Höger | Fault tolerance in parallel data processing systems | |
| Margara | Combining expressiveness and efficiency in a complex event processing middleware | |
| Cilloni | Towards a unifying modeling framework for data-intensive tools | |
| Wecel et al. | Stream processing tools for analyzing objects in motion sending high-volume location data |
| Date | Code | Title | Description |
|---|---|---|---|
| PB01 | Publication | ||
| PB01 | Publication | ||
| SE01 | Entry into force of request for substantive examination | ||
| SE01 | Entry into force of request for substantive examination | ||
| GR01 | Patent grant | ||
| GR01 | Patent grant |