技术领域Technical Field
本发明涉及数据处理技术领域,尤其涉及一种数据实时解析输出方法及系统。The present invention relates to the field of data processing technology, and in particular to a method and system for real-time data analysis and output.
背景技术Background Art
在当前无线网络运维环境中,基站、小区等网元会周期性地产生大量性能数据文件,数据格式存在厂家差异,文件结构常采用压缩封装并以异步方式写入指定目录。In the current wireless network operation and maintenance environment, network elements such as base stations and cells periodically generate a large number of performance data files. The data formats vary among manufacturers, and the file structures are often compressed and encapsulated and written to a specified directory asynchronously.
为对这些数据进行分析,现有方案普遍采用批处理架构,即通过调度任务周期性扫描数据目录、解压文件、入库计数器原始数据,再在数据库中执行指标计算。虽然此方案在离线统计场景下具备一定处理能力,但由于其高度依赖磁盘I/O及多次中间存储,且文件格式异构需进行手工适配处理,造成处理流程冗长、响应延迟高、重复计算多,难以支撑高频次的指标更新和故障快速定位。To analyze this data, existing solutions generally use a batch processing architecture. This involves scheduling tasks to periodically scan data directories, decompress files, store raw counter data, and then perform metric calculations in the database. While this solution offers some processing power in offline statistical scenarios, it relies heavily on disk I/O and multiple intermediate storages, and the need for manual adaptation to heterogeneous file formats results in lengthy processing, high response latency, and frequent recalculation, making it difficult to support frequent metric updates and rapid fault location.
因此,如何提升数据实时解析输出的及时性和准确性,成为亟需解决的技术问题。Therefore, how to improve the timeliness and accuracy of real-time data analysis and output has become a technical problem that needs to be solved urgently.
发明内容Summary of the Invention
本发明提供一种数据实时解析输出方法、系统、电子设备及存储介质,用以解决现有技术中的缺陷,实现提升数据实时解析输出的及时性和准确性。The present invention provides a method, system, electronic device and storage medium for real-time data analysis and output, which are used to solve the defects in the existing technology and improve the timeliness and accuracy of real-time data analysis and output.
本发明提供一种数据实时解析输出方法,包括如下步骤:The present invention provides a method for real-time data analysis and output, comprising the following steps:
扫描厂家服务器的预设目录,对于检测到的每一数据文件,生成用于表征所述数据文件的触发消息,并将所述触发消息写入消息中间件;Scan the preset directory of the manufacturer's server, generate a trigger message for each data file detected to represent the data file, and write the trigger message into the message middleware;
在流处理框架中,通过数据源组件从所述消息中间件中实时消费所述触发消息,按照触发消息中指示的文件路径读取对应的数据文件,形成原始数据流;In the stream processing framework, the trigger message is consumed from the message middleware in real time by the data source component, and the corresponding data file is read according to the file path indicated in the trigger message to form an original data stream;
在所述流处理框架中,对所述原始数据流执行实时转换处理,生成用于表征目标对象性能指标的指标数据流;In the stream processing framework, real-time conversion processing is performed on the original data stream to generate an indicator data stream for representing the performance indicator of the target object;
将所述指标数据流写入目标存储介质和/或所述消息中间件。The indicator data stream is written into a target storage medium and/or the message middleware.
根据本发明提供的一种数据实时解析输出方法,在流处理框架中,通过数据源组件从所述消息中间件中实时消费所述触发消息,按照触发消息中指示的文件路径读取对应的数据文件,形成原始数据流,包括:According to a real-time data parsing and output method provided by the present invention, in a stream processing framework, a data source component consumes the trigger message from the message middleware in real time, reads the corresponding data file according to the file path indicated in the trigger message, and forms an original data stream, including:
由所述数据源组件订阅所述消息中间件中与所述触发消息对应的主题,并按所述触发消息携带的分区号与偏移量顺序拉取触发消息;The data source component subscribes to the topic corresponding to the trigger message in the message middleware, and pulls the trigger message in the order of the partition number and offset carried by the trigger message;
解析每条所述触发消息中携带的数据文件路径及文件大小信息,通过分布式文件系统接口以分块方式顺序读取数据文件内容;Parsing the data file path and file size information carried in each trigger message, and sequentially reading the data file content in a block manner through a distributed file system interface;
为每个读取块附加当前处理时间戳并封装为数据元素;Attach the current processing timestamp to each read block and encapsulate it as a data element;
将各个所述数据元素依次输出,得到所述原始数据流。Output each of the data elements in sequence to obtain the original data stream.
根据本发明提供的一种数据实时解析输出方法,所述在所述流处理框架中,对所述原始数据流执行实时转换处理,生成用于表征目标对象性能指标的指标数据流,包括:According to a real-time data parsing and output method provided by the present invention, in the stream processing framework, real-time conversion processing is performed on the original data stream to generate an indicator data stream for representing the performance indicators of the target object, including:
对所述原始数据流进行合法性校验,剔除不满足预设规则的文件记录,得到目标文件流;Performing a validity check on the original data stream, eliminating file records that do not meet preset rules, and obtaining a target file stream;
在所述流处理框架内,对所述目标文件流进行解压,得到包含厂家标识及文件格式信息的文件实体对象;In the stream processing framework, the target file stream is decompressed to obtain a file entity object including a manufacturer identification and file format information;
根据所述文件实体对象中的厂家标识与文件格式,选择匹配的解析器解析文件内容,提取底层计数器数据并附加标签信息,形成标记计数器数据流;According to the manufacturer identifier and file format in the file entity object, a matching parser is selected to parse the file content, extract the underlying counter data and append tag information to form a marked counter data stream;
根据所述标签信息对所述标记计数器数据流进行拆分,生成至少包括小区级数据流和基站级数据流的多条逻辑数据流;Splitting the marker counter data stream according to the label information to generate a plurality of logical data streams including at least a cell-level data stream and a base station-level data stream;
将各逻辑数据流中的底层计数器数据转换为行记录并根据所述行记录注册相应表结构,形成表化数据流;Convert the underlying counter data in each logical data stream into row records and register the corresponding table structure according to the row records to form a tabular data stream;
在预设时间窗口内对所述表化数据流进行聚合计算,并临时关联维度表,得到所述指标数据流。Aggregate calculations are performed on the tabular data stream within a preset time window, and the dimension table is temporarily associated to obtain the indicator data stream.
根据本发明提供的一种数据实时解析输出方法,所述在所述流处理框架内,对所述目标文件流进行解压,得到包含厂家标识及文件格式信息的文件实体对象,包括:According to a real-time data parsing and output method provided by the present invention, the target file stream is decompressed within the stream processing framework to obtain a file entity object containing a manufacturer identifier and file format information, including:
根据所述目标文件流中每个文件记录的文件后缀判定其压缩格式是否为预设后缀中的任一种;Determining whether the compression format of each file record in the target file stream is any one of the preset suffixes according to the file suffix of the file record;
在所述流处理框架内动态调用与所述压缩格式对应的解压插件,以流式方式逐字节解码而不落盘生成解压后文件;Dynamically calling a decompression plug-in corresponding to the compression format within the stream processing framework, decoding byte by byte in a streaming manner without writing to a disk to generate a decompressed file;
在解码过程中解析所述解压后文件的文件头或预置标识字段,提取厂家标识和数据文件格式信息;During the decoding process, the file header or preset identification field of the decompressed file is parsed to extract the manufacturer identification and data file format information;
将解压后文件内容与所述厂家标识及文件格式信息封装为所述文件实体对象。The decompressed file content, the manufacturer identifier and the file format information are encapsulated into the file entity object.
根据本发明提供的一种数据实时解析输出方法,所述根据所述文件实体对象中的厂家标识与文件格式,选择匹配的解析器解析文件内容,提取底层计数器数据并附加标签信息,形成标记计数器数据流,包括:According to a real-time data parsing and output method provided by the present invention, the method selects a matching parser to parse the file content based on the manufacturer identifier and file format in the file entity object, extracts the underlying counter data, and appends tag information to form a labeled counter data stream, including:
预先在所述流处理框架中维护解析器路由表,所述解析器路由表将每一厂家标识与文件格式的组合键映射至对应的解析器类;Maintaining a parser routing table in advance in the stream processing framework, wherein the parser routing table maps each combination key of the manufacturer identifier and the file format to a corresponding parser class;
当接收到文件实体对象时,查询所述解析器路由表并动态加载与该组合键匹配的解析器类;When a file entity object is received, query the parser routing table and dynamically load the parser class that matches the composite key;
当所述数据文件格式为层次化标记文件格式时,调用根据SAX的XML解析器,且当所述数据文件格式为定长文本或逗号分隔格式时,调用根据行列解析的CSV解析器;When the data file format is a hierarchical markup file format, an XML parser based on SAX is called, and when the data file format is a fixed-length text or comma-delimited format, a CSV parser based on row and column parsing is called;
利用所述解析器解析文件内容,获取包含计数器名称-值对的原始计数器数据;Parsing the file content using the parser to obtain raw counter data including counter name-value pairs;
为每一计数器数据追加标签信息,所述标签信息至少包括厂家标识、数据采样时间以及由文件名解析得到的网元标识或小区标识;Add tag information to each counter data, where the tag information includes at least a manufacturer identifier, a data sampling time, and a network element identifier or a cell identifier obtained by parsing the file name;
将附加所述标签信息后的计数器数据封装为所述标记计数器数据流。The counter data after the tag information is added is encapsulated into the marked counter data stream.
根据本发明提供的一种数据实时解析输出方法,所述根据所述标签信息对所述标记计数器数据流进行拆分,生成至少包括小区级数据流和基站级数据流的多条逻辑数据流,包括:According to a real-time data parsing and output method provided by the present invention, the marking counter data stream is split according to the label information to generate multiple logical data streams including at least a cell-level data stream and a base station-level data stream, including:
在所述流处理框架中设置侧输出路由单元,所述路由单元根据每条计数器数据随附的网元层级标签进行匹配;A side output routing unit is provided in the flow processing framework, and the routing unit performs matching according to the network element level label attached to each counter data;
当所述网元层级标签同时包含小区标识和基站标识时,将所述标记计数器数据流分配至小区级数据流;When the network element level label includes both a cell identifier and a base station identifier, allocating the marker counter data flow to the cell level data flow;
当所述网元层级标签仅包含基站标识而不包含小区标识时,将所述标记计数器数据流分配至基站级数据流。When the network element level tag only includes a base station identifier but not a cell identifier, the marker counter data flow is allocated to a base station level data flow.
根据本发明提供的一种数据实时解析输出方法,所述将各逻辑数据流中的底层计数器数据转换为行记录并根据所述行记录注册相应表结构,形成表化数据流,包括:According to a real-time data parsing and output method provided by the present invention, the underlying counter data in each logical data stream is converted into row records and a corresponding table structure is registered according to the row records to form a tabular data stream, including:
针对任一所述逻辑数据流,解析其中的计数器名称集合,并以采样时间、网元标识、计数器名称三类字段为基础,自动生成字段描述元数据;For any of the logical data flows, parse the counter name set therein and automatically generate field description metadata based on the three fields of sampling time, network element identifier, and counter name;
调用所述流处理框架的表注册接口,依据所述字段描述元数据动态创建临时表,所述临时表以采样时间与网元标识的组合字段为主键;Calling the table registration interface of the stream processing framework to dynamically create a temporary table based on the field description metadata, wherein the temporary table uses the combined field of sampling time and network element identifier as the primary key;
将逻辑数据流中的每条计数器数据与对应标签字段拼接成一条行记录,并实时插入至所述临时表,得到携带水印时间戳的表化数据流。Each counter data in the logical data stream is spliced with the corresponding tag field into a row record, and inserted into the temporary table in real time to obtain a tabular data stream carrying a watermark timestamp.
根据本发明提供的一种数据实时解析输出方法,所述在预设时间窗口内对所述表化数据流进行聚合计算,并临时关联维度表,得到所述指标数据流,包括:According to a real-time data parsing and output method provided by the present invention, performing aggregation calculation on the tabular data stream within a preset time window and temporarily associating the dimension table to obtain the indicator data stream includes:
为所述表化数据流配置滚动时间窗口,所述滚动时间窗口的长度为预设时长,步长与长度一致;Configuring a rolling time window for the tabular data stream, wherein the length of the rolling time window is a preset time length, and the step size is consistent with the length;
在每一所述滚动时间窗口内,按照网元标识和计数器名称对行记录进行分组,并计算各组计数器值的求和、最大值和最小值,得到窗口聚合结果;In each of the rolling time windows, the row records are grouped according to the network element identifier and the counter name, and the sum, maximum value, and minimum value of the counter values in each group are calculated to obtain a window aggregation result;
在输出窗口聚合结果之前,根据窗口结束时间的水印时间戳,将所述窗口聚合结果与包含地市、区县及厂家信息的维度表进行临时关联,补充区域维度及厂家维度字段;Before outputting the window aggregation result, the window aggregation result is temporarily associated with the dimension table containing the city, district, county and manufacturer information according to the watermark timestamp of the window end time, and the regional dimension and manufacturer dimension fields are supplemented;
将关联后的窗口聚合结果作为所述指标数据流。The associated window aggregation result is used as the indicator data stream.
本发明还提供一种数据实时解析输出系统,包括如下模块:The present invention also provides a real-time data analysis and output system, comprising the following modules:
第一处理模块,用于扫描厂家服务器的预设目录,对于检测到的每一数据文件,生成用于表征所述数据文件的触发消息,并将所述触发消息写入消息中间件;A first processing module is configured to scan a preset directory of a manufacturer's server, generate a trigger message for representing each data file detected, and write the trigger message into a message middleware;
第二处理模块,用于在流处理框架中,通过数据源组件从所述消息中间件中实时消费所述触发消息,按照触发消息中指示的文件路径读取对应的数据文件,形成原始数据流;A second processing module is configured to consume the trigger message from the message middleware in real time through a data source component in a stream processing framework, read the corresponding data file according to the file path indicated in the trigger message, and form an original data stream;
第三处理模块,用于在所述流处理框架中,对所述原始数据流执行实时转换处理,生成用于表征目标对象性能指标的指标数据流;A third processing module is used to perform real-time conversion processing on the original data stream in the stream processing framework to generate an indicator data stream for representing the performance indicator of the target object;
第四处理模块,用于将所述指标数据流写入目标存储介质和/或所述消息中间件。The fourth processing module is used to write the indicator data stream into a target storage medium and/or the message middleware.
本发明还提供一种电子设备,包括存储器、处理器及存储在存储器上并可在处理器上运行的计算机程序,所述处理器执行所述程序时实现如上述任一种所述数据实时解析输出方法。The present invention also provides an electronic device, comprising a memory, a processor, and a computer program stored in the memory and executable on the processor, wherein when the processor executes the program, the method for real-time data analysis and output as described above is implemented.
本发明还提供一种非暂态计算机可读存储介质,其上存储有计算机程序,该计算机程序被处理器执行时实现如上述任一种所述数据实时解析输出方法。The present invention also provides a non-transitory computer-readable storage medium having a computer program stored thereon, which, when executed by a processor, implements any of the above-mentioned methods for real-time data analysis and output.
本发明还提供一种计算机程序产品,包括计算机程序,所述计算机程序被处理器执行时实现如上述任一种所述数据实时解析输出方法。The present invention also provides a computer program product, comprising a computer program, wherein when the computer program is executed by a processor, the computer program implements any of the above-mentioned methods for real-time data analysis and output.
综上所述,本申请实施例中提供的一个或多个技术方案,至少具有如下技术效果或优点:In summary, one or more technical solutions provided in the embodiments of the present application have at least the following technical effects or advantages:
通过扫描厂家服务器的预设目录,对于检测到的每一数据文件生成用于表征该数据文件的触发消息,并将所述触发消息写入消息中间件,从而实现了对性能数据产生事件的实时感知与异步驱动,有效避免了传统定时轮询模式带来的处理延迟与资源浪费。在此基础上,在流处理框架中通过数据源组件从消息中间件中实时消费触发消息,并按照触发消息中指示的文件路径读取对应的数据文件,形成原始数据流,从而实现了从消息驱动到数据加载的全流程自动化处理。该路径通过绑定触发消息与具体文件路径,确保了文件读取过程的准确性与顺序性。接着,在流处理框架中对原始数据流执行实时转换处理,生成用于表征目标对象性能指标的指标数据流,从而实现了从原始文件内容到结构化、可计算性能指标的快速转化。最后,将指标数据流写入目标存储介质和消息中间件中的至少一者,以供后续系统实时消费,从而有效提升数据实时解析输出的及时性和准确性。By scanning the pre-set directory of the manufacturer's server, a trigger message representing each detected data file is generated and written to the message-based middleware. This enables real-time detection and asynchronous actuation of performance data generation events, effectively avoiding the processing delays and resource waste associated with traditional timed polling. Furthermore, within the stream processing framework, the data source component consumes trigger messages from the message-based middleware in real time and reads the corresponding data file according to the file path indicated in the trigger message, forming a raw data stream. This automates the entire process from message-driven processing to data loading. By binding the trigger message to a specific file path, this path ensures the accuracy and sequential nature of the file reading process. Next, the stream processing framework performs real-time conversion on the raw data stream to generate an indicator data stream representing the target object's performance indicators, enabling rapid conversion from raw file content to structured, computable performance indicators. Finally, the indicator data stream is written to at least one of the target storage medium or the message-based middleware for real-time consumption by subsequent systems, effectively improving the timeliness and accuracy of real-time data analysis and output.
附图说明BRIEF DESCRIPTION OF THE DRAWINGS
为了更清楚地说明本发明或现有技术中的技术方案,下面将对实施例或现有技术描述中所需要使用的附图作一简单地介绍,显而易见地,下面描述中的附图是本发明的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。In order to more clearly illustrate the technical solutions in the present invention or the prior art, a brief introduction is given below to the drawings required for use in the embodiments or the description of the prior art. Obviously, the drawings described below are some embodiments of the present invention. For ordinary technicians in this field, other drawings can be obtained based on these drawings without paying any creative work.
图1是本发明提供的数据实时解析输出方法的流程示意图之一。FIG1 is a flow chart of a method for real-time data analysis and output provided by the present invention.
图2是本发明提供的数据实时解析输出方法的流程示意图之二。FIG2 is a second flow chart of the method for real-time data analysis and output provided by the present invention.
图3是本发明提供的数据实时解析输出方法的流程示意图之三。FIG3 is a third flow chart of the real-time data analysis and output method provided by the present invention.
图4是本发明提供的数据实时解析输出方法的流程示意图之四。FIG4 is a fourth flow chart of the real-time data analysis and output method provided by the present invention.
图5是本发明提供的数据实时解析输出方法的流程示意图之五。FIG5 is a fifth flow chart of the real-time data analysis and output method provided by the present invention.
图6是本发明提供的数据实时解析输出方法的流程示意图之六。FIG6 is a sixth flow chart of the real-time data analysis and output method provided by the present invention.
图7是本发明提供的数据实时解析输出方法的流程示意图之七。FIG. 7 is a seventh flow chart of the method for real-time data analysis and output provided by the present invention.
图8是本发明提供的数据实时解析输出方法的流程示意图之八。FIG8 is an eighth flow chart of the real-time data analysis and output method provided by the present invention.
图9是本发明提供的数据实时解析输出系统的结构示意图。FIG9 is a schematic structural diagram of the real-time data analysis and output system provided by the present invention.
图10是本发明提供的电子设备的结构示意图。FIG10 is a schematic structural diagram of an electronic device provided by the present invention.
具体实施方式DETAILED DESCRIPTION
为使本发明的目的、技术方案和优点更加清楚,下面将结合本发明中的附图,对本发明中的技术方案进行清楚、完整地描述,显然,所描述的实施例是本发明一部分实施例,而不是全部的实施例。根据本发明中的实施例,本领域普通技术人员在没有作出创造性劳动前提下所获得的所有其他实施例,都属于本发明保护的范围。To make the objectives, technical solutions, and advantages of the present invention more clear, the technical solutions of the present invention will be clearly and completely described below in conjunction with the accompanying drawings. Obviously, the embodiments described are only some embodiments of the present invention, not all embodiments. All other embodiments obtained by ordinary technicians in this field without making creative efforts based on the embodiments of the present invention are within the scope of protection of the present invention.
需要说明的是,在本发明的描述中,术语“包括”、“包含”或者其任何其他变体意在涵盖非排他性的包含,从而使得包括一系列要素的过程、方法、物品或者设备不仅包括那些要素,而且还包括没有明确列出的其他要素,或者是还包括为这种过程、方法、物品或者设备所固有的要素。在没有更多限制的情况下,由语句“包括一个……”限定的要素,并不排除在包括所述要素的过程、方法、物品或者设备中还存在另外的相同要素。术语“上”、“下”等指示的方位或位置关系为根据附图所示的方位或位置关系,仅是为了便于描述本发明和简化描述,而不是指示或暗示所指的系统或元件必须具有特定的方位、以特定的方位构造和操作,因此不能理解为对本发明的限制。对于本领域的普通技术人员而言,可以根据具体情况理解上述术语在本发明中的具体含义。It should be noted that, in the description of the present invention, the terms "comprise", "include" or any other variants thereof are intended to cover non-exclusive inclusion, so that a process, method, article or device comprising a series of elements includes not only those elements, but also includes other elements not explicitly listed, or also includes elements inherent to such process, method, article or device. In the absence of further restrictions, an element defined by the sentence "comprises a ..." does not exclude the presence of other identical elements in the process, method, article or device comprising the element. The orientation or positional relationship indicated by the terms "upper", "lower", etc. is based on the orientation or positional relationship shown in the accompanying drawings, and is only for the convenience of describing the present invention and simplifying the description, rather than indicating or implying that the system or element referred to must have a specific orientation, be constructed and operated in a specific orientation, and therefore cannot be understood as a limitation on the present invention. For those of ordinary skill in the art, the specific meanings of the above terms in the present invention can be understood according to the specific circumstances.
本发明中的术语“第一”、“第二”等是用于区别类似的对象,而不用于描述特定的顺序或先后次序。应该理解这样使用的数据在适当情况下可以互换,以便本发明的实施例能够以除了在这里图示或描述的那些以外的顺序实施,且“第一”、“第二”等所区分的对象通常为一类,并不限定对象的个数,例如第一对象可以是一个,也可以是多个。此外,“和/或”表示所连接对象的至少其中之一,字符“/”,一般表示前后关联对象是一种“或”的关系。The terms "first," "second," and so forth, used herein are used to distinguish similar objects, not to describe a specific order or precedence. It should be understood that such terms are interchangeable where appropriate, allowing embodiments of the present invention to be implemented in an order other than that illustrated or described herein. Furthermore, the terms "first," "second," and so forth generally distinguish objects of a single type, and do not limit the number of objects. For example, the first object may be one or more. Furthermore, "and/or" indicates at least one of the connected objects, and the character "/" generally indicates an "or" relationship between the connected objects.
下面结合图1-图10描述本发明所提供的数据实时解析输出方法、系统、电子设备及存储介质。The following describes the real-time data analysis and output method, system, electronic device, and storage medium provided by the present invention in conjunction with Figures 1 to 10.
图1是本发明提供的数据实时解析输出方法的流程示意图之一,如图1所示,包括步骤101至104:FIG1 is a flow chart of a method for real-time data analysis and output provided by the present invention, which includes steps 101 to 104:
步骤101:扫描厂家服务器的预设目录,对于检测到的每一数据文件,生成用于表征数据文件的触发消息,并将触发消息写入消息中间件。Step 101: Scan the preset directory of the manufacturer's server, generate a trigger message for representing each data file detected, and write the trigger message into the message middleware.
在本发明的一种优选实施例中,步骤101涉及对厂家服务器的预设目录进行扫描,并在检测到数据文件时生成用于表征该数据文件的触发消息,并将该触发消息写入消息中间件。该步骤作为本发明数据实时解析输出流程的起始动作,其主要目的是实现对厂家侧性能数据文件的增量发现与及时通知机制,以支持后续流处理框架中基于事件驱动的数据读取和处理操作,从而克服传统批处理方式在时间粒度与处理延迟方面的劣势。In a preferred embodiment of the present invention, step 101 involves scanning a preset directory on the manufacturer's server. Upon detecting a data file, a trigger message representing the data file is generated and written to the message middleware. This step, the initial step of the present invention's real-time data parsing and output process, primarily aims to implement a mechanism for incremental discovery and timely notification of manufacturer-side performance data files, supporting event-driven data reading and processing in the subsequent stream processing framework. This overcomes the time granularity and processing latency disadvantages of traditional batch processing.
在具体实施过程中,系统通过周期性或事件驱动方式监控预设的厂家服务器目录,该目录通常为厂家设备按照特定规则上传性能数据文件的统一存储路径。当系统检测到该目录中新出现的数据文件(例如文件名未被历史记录标记或创建时间在上次扫描之后),即认为该数据文件为待处理的新数据。此时,系统基于该数据文件生成触发消息,其中所述触发消息至少携带数据文件的路径信息、创建时间、文件大小、所属厂家标识等关键字段,用于准确标识该文件并为后续处理提供必要的上下文信息。In specific implementations, the system periodically or event-drivenly monitors a pre-set manufacturer server directory. This directory typically serves as a unified storage path for performance data files uploaded by manufacturer devices according to specific rules. When the system detects a new data file in this directory (e.g., a file name not marked in the history record or created after the last scan), it identifies the file as new data to be processed. At this point, the system generates a trigger message based on the data file. This message carries at least key fields such as the data file's path, creation time, file size, and manufacturer identifier, accurately identifying the file and providing the necessary context for subsequent processing.
生成的触发消息随后被写入预设的消息中间件中,所述消息中间件可为Kafka、RocketMQ等具备高吞吐与分布式订阅能力的流式消息系统。通过将触发消息写入消息中间件,可实现数据驱动的解耦式处理模式,即数据文件的检测与解析过程在逻辑上进行隔离,解除了时间同步的约束,有利于支持弹性可扩展的实时处理管道构建。此外,消息中间件提供的分区、偏移量机制也为后续流处理系统中实现精准消费、容错恢复和数据幂等处理提供了良好的基础。The generated trigger message is then written to the pre-set message middleware, which can be a streaming messaging system with high throughput and distributed subscription capabilities, such as Kafka or RocketMQ. By writing the trigger message to the message middleware, a data-driven, decoupled processing model can be implemented. This logically isolates the detection and parsing processes of data files, removing time synchronization constraints and supporting the construction of elastic and scalable real-time processing pipelines. Furthermore, the partitioning and offset mechanisms provided by the message middleware provide a solid foundation for precise consumption, fault-tolerant recovery, and idempotent data processing in subsequent stream processing systems.
步骤102:在流处理框架中,通过数据源组件从消息中间件中实时消费触发消息,按照触发消息中指示的文件路径读取对应的数据文件,形成原始数据流。Step 102: In the stream processing framework, the trigger message is consumed from the message middleware in real time through the data source component, and the corresponding data file is read according to the file path indicated in the trigger message to form an original data stream.
在本实施例中,步骤102旨在基于步骤101中写入消息中间件的触发消息,驱动流处理框架实现数据文件的自动读取与原始数据流生成。该步骤的设计初衷在于,传统性能文件的解析多由调度脚本主动拉取目录文件,这种“定时轮询+批量加载”的模式在多厂家、大规模异构数据接入场景中面临显著的延迟和资源浪费问题。而通过构建“消息驱动的数据获取机制”,能够实现基于触发事件的文件按需读取,有效提升文件到数据流的响应速度与资源利用效率。In this embodiment, step 102 is intended to drive the stream processing framework to automatically read data files and generate original data streams based on the trigger message written to the message middleware in step 101. The original intention of the design of this step is that the parsing of traditional performance files is mostly done by the scheduling script actively pulling directory files. This "scheduled polling + batch loading" model faces significant delays and resource waste in multi-vendor, large-scale heterogeneous data access scenarios. By building a "message-driven data acquisition mechanism", it is possible to achieve on-demand reading of files based on trigger events, effectively improving the response speed and resource utilization efficiency of files to data streams.
在一种可能的实施方式中,图2是本发明提供的数据实时解析输出方法的流程示意图之二,如图2所示,步骤102具体包括如下步骤:In one possible implementation, FIG2 is a second flow chart of the method for real-time data analysis and output provided by the present invention. As shown in FIG2 , step 102 specifically includes the following steps:
步骤201:由数据源组件订阅消息中间件中与触发消息对应的主题,并按触发消息携带的分区号与偏移量顺序拉取触发消息。Step 201: The data source component subscribes to the topic corresponding to the trigger message in the message middleware, and pulls the trigger message in the order of the partition number and offset carried in the trigger message.
步骤202:解析每条触发消息中携带的数据文件路径及文件大小信息,通过分布式文件系统接口以分块方式顺序读取数据文件内容。Step 202: parse the data file path and file size information carried in each trigger message, and read the data file content sequentially in blocks through the distributed file system interface.
步骤203:为每个读取块附加当前处理时间戳并封装为数据元素。Step 203: Add the current processing timestamp to each read block and encapsulate it as a data element.
步骤204:将各个数据元素依次输出,得到原始数据流。Step 204: Output each data element in sequence to obtain the original data stream.
在本实施例中,步骤201至步骤204用于详细实现数据源组件如何从消息中间件中拉取触发消息,并基于触发消息中包含的信息高效地读取数据文件,最终构建出具备时间标识的原始数据流。该子流程作为步骤102的具体实现,关键在于保障文件读取的可控性、顺序性与数据结构化完整性,以便后续流处理环节在不引入中间磁盘落地的前提下,直接完成实时指标提取。In this embodiment, steps 201 through 204 detail how the data source component pulls trigger messages from the message-based middleware and efficiently reads data files based on the information contained in the trigger messages, ultimately constructing a time-stamped raw data stream. This sub-process, as a specific implementation of step 102, is crucial for ensuring the controllability, sequentiality, and structural integrity of file reading, allowing subsequent stream processing to directly extract real-time metrics without introducing intermediate disks.
首先,在步骤201中,数据源组件被配置为订阅消息中间件(例如Kafka)中指定的主题,该主题用于承载步骤101所产生的触发消息。为保证读取顺序一致且可追溯,数据源组件基于每条触发消息中的分区编号和偏移量字段,按序拉取触发消息,并记录消费位点。当系统发生重启或故障恢复时,可基于该位点实现断点续读,避免文件重复读取或遗漏。First, in step 201, the data source component is configured to subscribe to a specified topic in a message-based middleware (e.g., Kafka). This topic carries the trigger message generated in step 101. To ensure consistent and traceable reading order, the data source component sequentially pulls trigger messages based on the partition number and offset fields in each trigger message and records the consumption location. This location allows for continued reading after a system restart or failure recovery, preventing duplicate or missed file reads.
接着,在步骤202中,数据源组件对触发消息进行解析,提取其中携带的数据文件路径及文件大小等字段。根据所述路径字段,组件通过预配置的分布式文件系统接口(如HDFS或OSS)发起按块顺序读取操作。系统采用固定块大小(如64KB或128KB)将大文件拆解为若干连续数据块,并以流式方式顺序读取,避免将整个文件加载至内存造成资源阻塞。Next, in step 202, the data source component parses the trigger message, extracting fields such as the data file path and file size. Based on the path field, the component initiates a block-by-block sequential read operation through a preconfigured distributed file system interface (such as HDFS or OSS). The system uses a fixed block size (such as 64KB or 128KB) to break large files into several contiguous data blocks and read them sequentially in a streaming manner, avoiding resource congestion caused by loading the entire file into memory.
随后,步骤203中,为实现时间驱动下游处理及支持窗口聚合等时序算子操作,系统在每个读取块生成完成后,立即附加当前处理时间戳,并将该时间戳与读取块共同封装为结构化数据元素。该时间戳既可表征实际文件处理时间,也可与文件内容中的业务采样时间配合,构成多粒度时间特征。Subsequently, in step 203, to enable time-driven downstream processing and support timing operators like window aggregation, the system immediately appends the current processing timestamp to each read block after it is generated. This timestamp and the read block are encapsulated together as a structured data element. This timestamp can represent the actual file processing time and, in conjunction with the business sampling time in the file content, form a multi-granularity time feature.
最后,步骤204中,系统将上述结构化数据元素依序输出,构成原始数据流的组成单元。该原始数据流具备有序性、结构完整性及时间属性,能够被下游合法性校验、解压与指标解析模块直接接收处理,无需中间缓存或数据落盘。Finally, in step 204, the system sequentially outputs the structured data elements to form the raw data stream. This raw data stream possesses order, structural integrity, and temporal properties, enabling it to be directly received and processed by downstream validation, decompression, and metrics analysis modules without the need for intermediate caching or data storage.
综上,步骤201至204在技术上实现了从触发消息到结构化原始数据流的高效桥接,其一通过位点控制确保拉取顺序与可恢复性,其二通过块级读取保障系统资源负载平衡,其三通过时间戳附加提升数据流的时序处理能力,整体显著提升了性能数据采集的实时性、稳定性与可扩展性。In summary, steps 201 to 204 technically achieve an efficient bridge from trigger messages to structured raw data streams. First, they ensure the pulling order and recoverability through site control. Second, they ensure the load balance of system resources through block-level reading. Third, they enhance the timing processing capability of data streams by adding timestamps. Overall, they significantly improve the real-time, stability, and scalability of performance data collection.
步骤103:在流处理框架中,对原始数据流执行实时转换处理,生成用于表征目标对象性能指标的指标数据流。Step 103: In the stream processing framework, perform real-time conversion processing on the original data stream to generate an indicator data stream for representing the performance indicators of the target object.
在本实施例中,步骤103旨在将步骤102所形成的原始数据流,转化为具备分析意义的指标数据流。该步骤是整条数据处理链路中最核心的转换环节,其设计目的是通过流处理框架在不中断数据通路的前提下,完成从文件数据到性能指标的全流程解析、过滤、标注、结构化与计算,确保最终结果具备实时性和完整性。In this embodiment, step 103 aims to convert the raw data stream generated in step 102 into an analytically meaningful indicator data stream. This step is the core conversion link in the entire data processing chain. Its design goal is to complete the entire process from file data to performance indicator parsing, filtering, annotation, structuring, and calculation through the stream processing framework without interrupting the data path, ensuring the real-time and completeness of the final results.
在一种可能的实施方式中,图3是本发明提供的数据实时解析输出方法的流程示意图之三,如图3所示,步骤103具体包括步骤301-306:In a possible implementation, FIG3 is a third flow chart of the method for real-time data analysis and output provided by the present invention. As shown in FIG3 , step 103 specifically includes steps 301-306:
步骤301:对原始数据流进行合法性校验,剔除不满足预设规则的文件记录,得到目标文件流。Step 301: Perform a validity check on the original data stream, remove file records that do not meet preset rules, and obtain a target file stream.
在本实施例中,步骤301的作用是在原始数据流正式进入解压、解析等计算密集型处理之前,首先完成对数据文件的合法性校验。该步骤的设计初衷在于,厂家生成的性能数据文件在上传或生成过程中可能存在异常情况,例如文件截断、命名错误、采样时间不一致、格式破损等问题,若直接进入解析环节不仅可能导致程序异常终止,还会浪费大量计算资源,甚至造成指标误报。因此,建立一道“合法性过滤”环节,在源头保障数据可用性,是确保整体流式链路稳定性和准确性的关键环节。In this embodiment, the role of step 301 is to first complete the legitimacy check of the data file before the original data stream officially enters the decompression, parsing and other computationally intensive processing. The original intention of the design of this step is that the performance data files generated by the manufacturer may have abnormal situations during the upload or generation process, such as file truncation, naming errors, inconsistent sampling time, format damage and other problems. If they enter the parsing link directly, it may not only cause the program to terminate abnormally, but also waste a lot of computing resources and even cause false indicators. Therefore, establishing a "legality filtering" link to ensure data availability at the source is a key link to ensure the stability and accuracy of the overall streaming link.
在具体实施方式中,合法性校验针对每一条原始数据流中的文件记录,执行以下内容校验逻辑。首先,系统依据配置的厂家规则,校验文件名称是否满足预设的命名规范,例如是否包含有效的厂家标识、时间戳、区域编码等字段,确保该文件可被唯一识别与归属。其次,校验文件头部记录的采样时间是否处于当前系统时间之前的有效时间窗口,防止未来时间的异常数据进入指标计算逻辑。再次,读取文件头中记录的文件大小字段,结合实际读取到的字节数进行对比,判断是否存在文件内容丢失或截断问题。最后,对于具备校验和字段的数据文件,系统根据内容重新计算校验值,并与文件头中给出的校验和进行比对,以检验内容完整性。In a specific implementation method, the legitimacy check executes the following content verification logic for each file record in the original data stream. First, the system verifies whether the file name meets the preset naming specifications based on the configured manufacturer rules, such as whether it contains valid manufacturer identification, timestamp, area code and other fields to ensure that the file can be uniquely identified and attributed. Secondly, verify whether the sampling time recorded in the file header is in the valid time window before the current system time to prevent abnormal data in the future from entering the indicator calculation logic. Thirdly, read the file size field recorded in the file header, and compare it with the actual number of bytes read to determine whether there is a problem of file content loss or truncation. Finally, for data files with a checksum field, the system recalculates the check value based on the content, and compares it with the checksum given in the file header to verify the integrity of the content.
合法性校验过程均在流处理框架中以流式函数实现,具备无阻塞、可并行、状态可维护的特点。若某一文件记录未通过上述任一校验条件,则该记录在不触发异常的情况下被过滤掉,不再参与后续步骤处理,从而保证系统处理链路的稳定性。The validity verification process is implemented as a streaming function within the stream processing framework, offering non-blocking, parallelizable, and stateful execution. If a file record fails any of these verification criteria, it is filtered out without triggering an exception and is not processed in subsequent steps, thus ensuring the stability of the system's processing chain.
通过该步骤的引入,得到了显著的技术效果:一是大幅提升了输入数据质量,避免异常文件引发的链路中断或解析失败;二是在流处理框架中以并行过滤方式完成预筛选,充分利用集群资源,实现高性能与高鲁棒性并存;三是将传统批处理过程中的文件预筛选能力迁移至实时数据通道,为大规模厂家异构数据的稳定接入提供了前置保障机制。该实施方式作为步骤103的首个子步骤,为后续解压、解析、建表等操作提供了干净、稳定的数据基础。The introduction of this step yields significant technical benefits: first, it significantly improves the quality of input data, avoiding link interruptions or parsing failures caused by abnormal files; second, it implements pre-screening in a parallel filtering manner within the stream processing framework, fully utilizing cluster resources and achieving both high performance and high robustness; and third, it migrates the file pre-screening capabilities of traditional batch processing to real-time data channels, providing a pre-emptive guarantee mechanism for the stable access of heterogeneous data from large-scale manufacturers. This implementation, as the first sub-step of step 103, provides a clean and stable data foundation for subsequent operations such as decompression, parsing, and table creation.
步骤302:在流处理框架内,对目标文件流进行解压,得到包含厂家标识及文件格式信息的文件实体对象。Step 302: In the stream processing framework, the target file stream is decompressed to obtain a file entity object including the manufacturer identification and file format information.
在本实施例中,步骤302用于对经过合法性校验后的目标文件流执行在线解压操作,以获得包含厂家标识及文件格式信息的文件实体对象。该步骤的实施原因在于,通信厂家生成的性能数据文件通常采用压缩格式进行存储和传输,以减少磁盘占用与网络传输负担。然而,传统批处理架构中常将压缩文件解压为中间文件后再进行解析,这不仅增加了I/O成本,还引入磁盘写入与清理的额外负担,不适用于对低延迟和高吞吐要求极高的实时处理场景。因此,本步骤在流处理框架中引入流式解压机制,旨在实现文件在内存中按需解压、即时转化为可解析对象,从而打通原始压缩数据至结构化指标的实时路径。In this embodiment, step 302 is used to perform an online decompression operation on the target file stream after the legality verification to obtain a file entity object containing the manufacturer's identification and file format information. The reason for implementing this step is that the performance data files generated by the communication manufacturers are usually stored and transmitted in a compressed format to reduce disk occupancy and network transmission burden. However, in traditional batch processing architectures, compressed files are often decompressed into intermediate files before being parsed, which not only increases the I/O cost, but also introduces the additional burden of disk writing and cleaning, and is not suitable for real-time processing scenarios with extremely high requirements for low latency and high throughput. Therefore, this step introduces a streaming decompression mechanism into the stream processing framework, aiming to achieve on-demand decompression of files in memory and immediate conversion into parsable objects, thereby opening up a real-time path from original compressed data to structured indicators.
在一种可能的实施方式中,图4是本发明提供的数据实时解析输出方法的流程示意图之四,如图4所示,步骤302具体包括如下步骤:In one possible implementation, FIG. 4 is a fourth flow chart of the real-time data analysis and output method provided by the present invention. As shown in FIG. 4 , step 302 specifically includes the following steps:
步骤401:根据目标文件流中每个文件记录的文件后缀判定其压缩格式是否为预设后缀中的任一种。Step 401: Determine, based on the file suffix of each file record in the target file stream, whether its compression format is any one of the preset suffixes.
步骤402:在流处理框架内动态调用与压缩格式对应的解压插件,以流式方式逐字节解码而不落盘生成解压后文件。Step 402: Dynamically call the decompression plug-in corresponding to the compression format within the stream processing framework, and decode byte by byte in a streaming manner without writing to the disk to generate a decompressed file.
步骤403:在解码过程中解析解压后文件的文件头或预置标识字段,提取厂家标识和数据文件格式信息。Step 403: During the decoding process, the file header or preset identification field of the decompressed file is parsed to extract the manufacturer identification and data file format information.
步骤404:将解压后文件内容与厂家标识及文件格式信息封装为文件实体对象。Step 404: Encapsulate the decompressed file content, manufacturer identification, and file format information into a file entity object.
在本实施例中,步骤401至步骤404作为步骤302的具体子步骤,详细描述了目标文件流的在线解压过程,如何识别文件压缩格式、执行解压操作、提取关键信息并构建文件实体对象。这一过程不仅实现了压缩数据向可解析对象的实时转化,还为后续解析器的选择与指标抽取提供了结构化的输入载体,具有明确的中间处理目标与技术优势。In this embodiment, steps 401 through 404, as specific substeps of step 302, describe in detail the online decompression process of the target file stream, including identifying the file compression format, performing decompression operations, extracting key information, and constructing a file entity object. This process not only achieves the real-time conversion of compressed data into a parsable object but also provides a structured input vector for subsequent parser selection and metric extraction, achieving clear intermediate processing goals and technical advantages.
首先,步骤401通过分析目标文件的文件名后缀字段,判断其压缩格式是否属于系统支持的预设格式集合。该格式集合可包括但不限于“.tar.gz”、“.zip”、“.gz”和“.tar”等压缩格式,预设支持范围可根据平台运行环境和厂商接入实际进行动态扩展。系统依据文件后缀名匹配压缩格式的方式具备通用性强、无侵入、实现成本低等优点,可快速完成压缩类型识别,作为解压操作的调度入口。First, step 401 analyzes the target file's filename suffix field to determine whether its compression format belongs to a set of pre-set formats supported by the system. This set of formats may include, but is not limited to, compression formats such as ".tar.gz," ".zip," ".gz," and ".tar." The pre-set supported range can be dynamically expanded based on the platform's operating environment and vendor access. The system's method of matching compression formats based on file extensions offers advantages such as high versatility, non-intrusiveness, and low implementation cost. It can quickly identify the compression type and serves as a scheduling entry for decompression operations.
在步骤402中,系统在流处理框架内调用与识别出的压缩格式类型相对应的解压插件,所述插件基于流式解码机制逐字节(或按块)解压文件内容,且不将解压结果落盘,而是通过内存缓冲方式直接在管道中传递。该处理方式有效避免了磁盘I/O瓶颈,保障了解压过程的低延迟和高吞吐特性,尤其适用于实时性能数据处理对计算与响应速度的高要求场景。解压插件按标准接口设计,便于对不同压缩格式解码逻辑进行隔离和扩展。In step 402, the system invokes the decompression plug-in corresponding to the identified compression format within the stream processing framework. This plug-in decompresses the file contents byte by byte (or block by block) using a streaming decoding mechanism. The decompressed results are not written to disk, but instead are passed directly through the pipeline via a memory buffer. This processing approach effectively avoids disk I/O bottlenecks, ensuring low latency and high throughput during the decompression process. It is particularly suitable for scenarios where real-time data processing requires high computational and responsiveness. The decompression plug-in is designed with a standard interface, facilitating the isolation and scalability of decoding logic for different compression formats.
在步骤403中,系统对解压后文件的开头部分进行结构化解析,从中提取厂家标识字段及数据文件格式字段。这些关键信息可能出现在文件头的特定标识段、配置区域或命名字段中,系统依据预置解析规则自动抽取并进行标准化处理。例如,厂家标识可被规范化为“HUAWEI”、“ZTE”、“ERICSSON”等,而数据格式则统一标识为“XML”、“CSV”、“TXT”等。提取出的元信息用于指导后续解析器的选型和数据结构识别。In step 403, the system performs a structured analysis on the beginning of the decompressed file to extract the manufacturer identification field and the data file format field. These key information may appear in a specific identification segment, configuration area or named field in the file header. The system automatically extracts and standardizes them according to preset parsing rules. For example, the manufacturer identification can be standardized as "HUAWEI", "ZTE", "ERICSSON", etc., and the data format is uniformly identified as "XML", "CSV", "TXT", etc. The extracted metadata is used to guide the selection of subsequent parsers and data structure identification.
最后,在步骤404中,系统将上述解压后得到的文件内容,与所提取的厂家标识和数据文件格式信息共同封装为一个文件实体对象。该对象包含原始内容引用、压缩格式标识、来源路径、解压时间戳等字段,具有良好的结构化、可序列化特征,并作为后续解析步骤(如步骤303)的直接输入单元。Finally, in step 404, the system encapsulates the decompressed file content, along with the extracted manufacturer identifier and data file format information, into a file entity object. This object, which contains fields such as the original content reference, compression format identifier, source path, and decompression timestamp, is well structured and serializable, and serves as a direct input unit for subsequent parsing steps (e.g., step 303).
通过实施步骤401至404,本发明实现了对多格式压缩性能文件的统一识别与高效解压,不依赖外部存储落盘过程,具备极强的实时性和资源利用效率。同时,通过在解压阶段即完成元信息抽取与封装,使得后续解析器选择与处理逻辑更加清晰和自动化,提升了解析链路的模块化程度与扩展能力,是实现跨厂家、多格式性能数据实时处理能力的重要技术保障。By implementing steps 401 to 404, the present invention achieves unified recognition and efficient decompression of multi-format compressed performance files, independent of external storage and disk transfer, and exhibits strong real-time performance and resource efficiency. Furthermore, by extracting and encapsulating metadata during the decompression phase, subsequent parser selection and processing logic becomes clearer and more automated, enhancing the modularity and scalability of the parsing chain. This is a crucial technical guarantee for achieving real-time processing of cross-vendor, multi-format performance data.
步骤303:根据文件实体对象中的厂家标识与文件格式,选择匹配的解析器解析文件内容,提取底层计数器数据并附加标签信息,形成标记计数器数据流。Step 303: According to the manufacturer identifier and file format in the file entity object, a matching parser is selected to parse the file content, extract the underlying counter data and append tag information to form a tagged counter data stream.
在本实施例中,步骤303用于对步骤302中获得的文件实体对象执行解析操作,旨在提取数据文件中的底层计数器数据,并为每一计数器数据附加具有语义指示作用的标签信息,最终形成可供下游流式计算使用的标记计数器数据流。该步骤的实施目的在于建立从文件实体到结构化指标源数据之间的映射路径,解决多厂家、多格式数据在实时解析过程中存在的解析器适配复杂、语义缺失、结构不一致等关键问题。In this embodiment, step 303 parses the file entity object obtained in step 302, extracting the underlying counter data from the data file and attaching semantically indicative tags to each counter data point, ultimately forming a labeled counter data stream for downstream streaming computation. This step aims to establish a mapping path from the file entity to the structured indicator source data, resolving key issues such as complex parser adaptation, missing semantics, and inconsistent structure that arise in the real-time parsing of multi-vendor, multi-format data.
在一种可能的实施方式中,图5是本发明提供的数据实时解析输出方法的流程示意图之五,如图5所示,步骤303具体包括如下步骤:In one possible implementation, FIG5 is a fifth flow chart of the real-time data analysis and output method provided by the present invention. As shown in FIG5 , step 303 specifically includes the following steps:
步骤501:预先在流处理框架中维护解析器路由表,解析器路由表将每一厂家标识与文件格式的组合键映射至对应的解析器类。Step 501: A parser routing table is maintained in advance in the stream processing framework. The parser routing table maps a combination key of each manufacturer identifier and file format to a corresponding parser class.
步骤502:当接收到文件实体对象时,查询解析器路由表并动态加载与该组合键匹配的解析器类。Step 502: When the file entity object is received, the parser routing table is queried and the parser class matching the composite key is dynamically loaded.
步骤503:当数据文件格式为层次化标记文件格式时,调用根据SAX的XML解析器,且当数据文件格式为定长文本或逗号分隔格式时,调用根据行列解析的CSV解析器。Step 503: When the data file format is a hierarchical markup file format, an XML parser based on SAX is called, and when the data file format is a fixed-length text or comma-delimited format, a CSV parser based on row and column parsing is called.
步骤504:利用解析器解析文件内容,获取包含计数器名称-值对的原始计数器数据。Step 504: parse the file content using a parser to obtain raw counter data including counter name-value pairs.
步骤505:为每一计数器数据追加标签信息,标签信息至少包括厂家标识、数据采样时间以及由文件名解析得到的网元标识或小区标识。Step 505: Add tag information to each counter data. The tag information at least includes the manufacturer ID, data sampling time, and the network element ID or cell ID obtained by parsing the file name.
步骤506:将附加标签信息后的计数器数据封装为标记计数器数据流。Step 506: Encapsulate the counter data with the tag information added into a marked counter data stream.
在本实施例中,步骤501至步骤506为步骤303的具体子步骤,旨在细化文件实体对象的解析流程,完成从解析器匹配到标记计数器数据流生成的全过程。该流程解决了多厂家、多格式数据在实时流处理中的异构解析问题,确保每条输出的数据不仅具备数值意义,同时也携带完整的业务标签信息,具备良好的可识别性和可处理性,是后续拆分与指标计算的基础。In this embodiment, steps 501 through 506 are specific substeps of step 303, designed to refine the parsing process for file entity objects and complete the entire process from parser matching to generation of the tag counter data stream. This process addresses the heterogeneous parsing of multi-vendor, multi-format data in real-time stream processing, ensuring that each output data item not only has numerical significance but also carries complete business tag information, ensuring good identifiability and processability, and forming the foundation for subsequent segmentation and indicator calculation.
首先,在步骤501中,系统在流处理框架内部预先维护了解析器路由表。该路由表以“厂家标识+文件格式”作为组合键,将其映射至相应的解析器类。例如,厂家标识为“HUAWEI”且格式为“XML”的组合将指向专用于解析华为XML结构的SAX解析器类;若格式为“CSV”,则映射至按字段位置解析的CSV解析器类。该路由表可由系统配置模块进行动态更新,支持厂家扩展与格式演进,具备良好的通用性与可维护性。First, in step 501, the system pre-maintains a parser routing table within the stream processing framework. This routing table uses "vendor identifier + file format" as a key, mapping it to the corresponding parser class. For example, a combination of "Huawei" as the vendor identifier and "XML" as the format will be mapped to a SAX parser class dedicated to parsing Huawei XML structures; if the format is "CSV," it will be mapped to a CSV parser class that parses by field position. This routing table can be dynamically updated by the system configuration module, supporting vendor expansion and format evolution, and has good versatility and maintainability.
接着,步骤502中,当系统接收到文件实体对象后,会自动根据其携带的厂家标识与文件格式字段查询解析器路由表,获取对应解析器类并在运行时环境中动态加载。加载完成后,系统根据解压后的文件内容初始化解析器,并将数据流作为输入流传入解析器中执行解析任务。Next, in step 502, when the system receives the file entity object, it automatically queries the parser routing table based on the manufacturer identifier and file format fields it carries, obtains the corresponding parser class, and dynamically loads it in the runtime environment. Once loaded, the system initializes the parser based on the decompressed file contents and passes the data stream as input to the parser to perform the parsing task.
在步骤503中,系统根据数据文件的格式选择适配的解析方式。若文件格式为层次化标记语言(如XML),则解析器采用基于SAX模型的事件驱动式解析逻辑,不断读取标签节点并实时输出解析结果,适用于结构复杂但内存敏感的场景;若格式为定长文本或逗号分隔格式(如CSV),则使用按行读取并按字段位置映射字段名称与计数器值的方法进行解析。通过分类解析方法,有效适应了不同厂家对性能数据的组织差异。In step 503, the system selects an appropriate parsing method based on the data file format. If the file format is a hierarchical markup language (such as XML), the parser uses event-driven parsing logic based on the SAX model, continuously reading tag nodes and outputting parsing results in real time. This is suitable for complex but memory-sensitive scenarios. If the format is fixed-length text or comma-delimited format (such as CSV), the parsing method uses row-by-row reading and mapping field names and counter values by field position. This categorized parsing method effectively adapts to the differences in performance data organization among different manufacturers.
随后,在步骤504中,解析器对文件内容进行遍历,提取出原始计数器数据,通常以“计数器名称–数值对”形式存在。这些原始数据代表了基础网络指标的原始观测量,是构成后续复杂性能指标的底层依据。系统将提取到的每条计数器数据以标准结构存储,并准备附加上下文标签信息。Next, in step 504, the parser traverses the file contents and extracts raw counter data, typically in the form of counter name-value pairs. This raw data represents the raw observations of basic network metrics and forms the underlying basis for subsequent complex performance metrics. The system stores each extracted counter data entry in a standard structure and prepares to attach contextual tags.
在步骤505中,系统为每条原始计数器数据附加标签信息,该标签包括但不限于厂家标识、数据采样时间(由文件名或文件内部字段解析)、网元标识(如eNodeB ID、gNB ID)以及小区标识(如Cell ID)。标签信息的附加过程可通过规则匹配、字段提取与上下文推理等方式自动完成,确保所有数据具备完整语义背景,便于后续按标签进行流拆分与聚合操作。In step 505, the system adds tags to each piece of raw counter data. These tags include, but are not limited to, the manufacturer's identifier, the data sampling time (parsed from the file name or internal fields), the network element identifier (e.g., eNodeB ID, gNB ID), and the cell identifier (e.g., Cell ID). This tagging process is automated through rule matching, field extraction, and contextual reasoning, ensuring that all data has complete semantic context and facilitating subsequent tag-based stream splitting and aggregation.
最后,在步骤506中,系统将每条附加标签后的计数器数据封装为统一结构的标记计数器数据单元,并构成标记计数器数据流作为输出。该数据流结构化程度高、字段稳定,已具备直接参与流式处理、窗口计算和维度关联的能力,可作为后续逻辑数据流构建与表化处理的输入源。Finally, in step 506, the system encapsulates each tagged counter data entry into a uniformly structured labeled counter data unit, generating a labeled counter data stream as output. This highly structured data stream, with stable fields, is capable of directly participating in stream processing, window calculations, and dimension association, serving as an input source for subsequent logical data stream construction and table-based processing.
通过步骤501至506的实施,系统实现了从文件实体对象向标记计数器数据流的端到端自动转换。其技术效果体现在以下几个方面:一是通过双因子解析器匹配机制,实现了面向多厂家多格式的统一解析入口;二是解析过程全程流式运行,支持高并发、高吞吐的实时计算需求;三是输出数据具备完备标签体系,为多维度分析与动态路由奠定结构基础,显著提升整体指标输出链路的自动化与可扩展能力。Through the implementation of steps 501 to 506, the system achieves end-to-end automatic conversion from file entity objects to tagged counter data streams. Its technical benefits are reflected in the following aspects: First, through a dual-factor parser matching mechanism, a unified parsing entry point for multiple vendors and formats is achieved; second, the parsing process runs in a streaming manner throughout, supporting high-concurrency, high-throughput real-time computing requirements; and third, the output data has a complete labeling system, laying the structural foundation for multi-dimensional analysis and dynamic routing, significantly improving the automation and scalability of the overall indicator output link.
步骤304:根据标签信息对标记计数器数据流进行拆分,生成至少包括小区级数据流和基站级数据流的多条逻辑数据流。Step 304: Split the marker counter data stream according to the label information to generate multiple logical data streams including at least a cell-level data stream and a base station-level data stream.
在本实施例中,步骤304用于对步骤303中生成的标记计数器数据流进行进一步的逻辑拆分,形成多条具有明确语义层级的逻辑数据流,至少包括小区级数据流和基站级数据流。该步骤的设置目的在于应对无线网络性能数据天然存在的多层级网元结构,如小区(Cell)、基站(gNodeB)、区域(Region)等,若不在数据解析阶段就按标签维度进行分类,后续在指标建模、聚合计算等阶段将面临数据混杂、处理复杂度高、语义冲突等问题。因此,在数据流形成初期进行逻辑拆分,生成多条逻辑数据流,是实现高效流计算与精准指标输出的必要条件。In this embodiment, step 304 is used to further logically split the tag counter data stream generated in step 303 to form multiple logical data streams with clear semantic hierarchies, including at least cell-level data streams and base station-level data streams. This step is designed to address the inherent multi-level network element structure of wireless network performance data, such as cells, gNodeBs, and regions. If classification by tag dimension is not performed during the data parsing phase, subsequent stages such as indicator modeling and aggregate calculations will face problems such as data congestion, high processing complexity, and semantic conflicts. Therefore, logically splitting the data stream at the initial stage of formation to generate multiple logical data streams is a necessary condition for achieving efficient stream computing and accurate indicator output.
在一种可能的实施方式中,图6是本发明提供的数据实时解析输出方法的流程示意图之六,如图6所示,步骤304具体包括如下步骤:In one possible implementation, FIG6 is a sixth flow chart of the method for real-time data analysis and output provided by the present invention. As shown in FIG6 , step 304 specifically includes the following steps:
步骤601:在流处理框架中设置侧输出路由单元,路由单元根据每条计数器数据随附的网元层级标签进行匹配。Step 601: a side output routing unit is set in the flow processing framework, and the routing unit performs matching according to the network element level label attached to each counter data.
步骤602:当网元层级标签同时包含小区标识和基站标识时,将标记计数器数据流分配至小区级数据流。Step 602: When the network element level tag includes both the cell identifier and the base station identifier, the marker counter data flow is allocated to the cell level data flow.
步骤603:当网元层级标签仅包含基站标识而不包含小区标识时,将标记计数器数据流分配至基站级数据流。Step 603: When the network element level label only includes the base station identifier but not the cell identifier, the marker counter data flow is allocated to the base station level data flow.
在本实施例中,步骤601至步骤603具体细化了步骤304中标记计数器数据流的拆分逻辑,描述了如何通过设置路由机制对不同网元层级的数据进行分流操作,最终生成结构清晰、语义明确的小区级数据流和基站级数据流。该流程的设计核心在于:无线性能数据在传输和解析过程中可能混杂不同粒度的指标,如果不在早期阶段进行合理区分,后续的建表、聚合与指标生成不仅计算冗余,还可能产生语义误判。因此,通过标签驱动的自动化分流机制,确保数据在进入不同业务处理通道前就已具备清晰的层级归属,是提升流处理效率和指标准确性的关键步骤。In this embodiment, steps 601 to 603 specifically refine the splitting logic of the marked counter data stream in step 304, and describe how to perform diversion operations on data at different network element levels by setting a routing mechanism, ultimately generating cell-level data streams and base station-level data streams with clear structure and clear semantics. The core design of this process is that wireless performance data may be mixed with indicators of different granularities during transmission and parsing. If they are not properly distinguished in the early stages, the subsequent table creation, aggregation, and indicator generation will not only be computationally redundant, but may also result in semantic misjudgments. Therefore, ensuring that data has a clear hierarchical affiliation before entering different business processing channels through a label-driven automated diversion mechanism is a key step in improving stream processing efficiency and indicator accuracy.
首先,在步骤601中,系统在流处理框架中配置了一个侧输出路由单元。该单元通过自定义流处理算子实现,用于解析每一条标记计数器数据中所携带的标签信息,并根据标签中的“网元层级”字段对数据进行实时分流。该字段是在文件解析过程中提取或计算得出的,表征当前计数器数据所属的网络结构层级,例如是否包含小区标识(如Cell ID)或仅包含基站标识(如gNodeB ID)。First, in step 601, the system configures a side-output routing unit within the stream processing framework. This unit, implemented using a custom stream processing operator, parses the tag information carried in each piece of tagged counter data and performs real-time data flow distribution based on the "network element level" field in the tag. This field, extracted or calculated during file parsing, indicates the network hierarchy level to which the current counter data belongs, for example, whether it includes a cell identifier (e.g., Cell ID) or only a base station identifier (e.g., gNodeB ID).
随后,进入步骤602,系统根据标签信息进行条件匹配:当发现某条计数器数据的标签中同时包含小区标识和基站标识时,系统判断其粒度为小区级,具备明确的小区归属关系,因此将该数据路由至小区级数据流中。这类数据通常涉及无线接入链路的关键性能指标,如上行PRB利用率、RRC连接建立成功率等,需在小区层面进行聚合与建模处理。Next, the system proceeds to step 602, where it performs conditional matching based on the tag information. If a counter data entry's tag contains both a cell ID and a base station ID, the system determines that the granularity is cell-level and clearly identifies the cell as belonging to that cell, and therefore routes the data into the cell-level data stream. This type of data typically involves key performance indicators of the radio access link, such as uplink PRB utilization and RRC connection establishment success rate, requiring aggregation and modeling at the cell level.
而在步骤603中,当系统检测到某条计数器数据的标签仅包含基站标识但不包含小区标识时,则判定该数据为基站级粒度。例如某些厂家提供的板卡性能指标或基站电源状态信息,不涉及具体小区划分,此类数据被系统路由至基站级数据流,用于生成站点层面的资源状态、健康监控等指标。In step 603, if the system detects that a counter data tag contains only a base station identifier but not a cell identifier, it determines that the data is at the base station level. For example, board performance indicators or base station power status information provided by certain manufacturers do not involve specific cell divisions. Such data is routed to the base station-level data stream and used to generate site-level resource status and health monitoring indicators.
整个侧输出路由机制在流处理框架内部实现,具备高吞吐、低延迟的处理特性,不引入数据复制或广播过程,确保资源使用效率最优。同时,路由逻辑可通过配置文件动态调整,支持引入更多层级(如区域级、邻区级)或调整标签字段匹配规则,具备良好的灵活性与扩展能力。The entire side-output routing mechanism is implemented within the stream processing framework, offering high throughput and low latency without requiring data replication or broadcasting, ensuring optimal resource utilization. Routing logic can also be dynamically adjusted through configuration files, supporting the introduction of additional levels (such as regional and neighboring levels) or adjustments to label field matching rules, providing excellent flexibility and scalability.
通过实施步骤601至603,系统实现了基于标签驱动的自动化多层级数据拆分机制,带来的技术效果主要包括:其一,确保同一类型的指标数据在相同语义维度下处理,避免误聚合或指标冲突;其二,将拆分逻辑从静态规则迁移至实时流中完成,提升系统响应效率与适应异构数据格式的能力;其三,为后续的逻辑表注册与指标计算提供清晰的数据流分支结构,降低了计算链路的复杂度,是实现大规模无线性能数据指标解耦与高效处理的关键技术环节。By implementing steps 601 to 603, the system implements a label-driven automated multi-level data splitting mechanism, which brings the following technical effects: First, it ensures that indicator data of the same type is processed under the same semantic dimension to avoid misaggregation or indicator conflicts; second, it migrates the splitting logic from static rules to real-time streams, improving the system's response efficiency and ability to adapt to heterogeneous data formats; third, it provides a clear data flow branch structure for subsequent logical table registration and indicator calculation, reducing the complexity of the calculation link. It is a key technical link in achieving the decoupling and efficient processing of large-scale wireless performance data indicators.
步骤305:将各逻辑数据流中的底层计数器数据转换为行记录并根据行记录注册相应表结构,形成表化数据流。Step 305: Convert the underlying counter data in each logical data stream into row records and register the corresponding table structure according to the row records to form a tabular data stream.
在本实施例中,步骤305用于将步骤304中拆分得到的各逻辑数据流进一步转化为具备结构化语义的表化数据流。该步骤的实施原因在于,传统性能数据多以文本或半结构化形式存在,缺乏标准化结构,不利于流处理框架中的算子统一调度与计算。而在Flink等流式处理平台中,通过将数据转化为表结构并注册为临时视图,可借助SQL或Table API对数据进行统一建模、窗口聚合与维度关联,极大提升了指标构建的表达力与计算效率。因此,在逻辑数据流进入计算层之前完成表结构注册,形成表化数据流,是打通“流数据-指标模型-实时计算”链路的必要环节。In this embodiment, step 305 is used to further convert the logical data streams obtained by splitting in step 304 into tabular data streams with structured semantics. The reason for implementing this step is that traditional performance data mostly exists in text or semi-structured form, lacks a standardized structure, and is not conducive to the unified scheduling and calculation of operators in the stream processing framework. In streaming processing platforms such as Flink, by converting data into a table structure and registering it as a temporary view, the data can be uniformly modeled, windowed, and dimensionally associated with SQL or Table API, which greatly improves the expressiveness and computing efficiency of indicator construction. Therefore, completing the table structure registration before the logical data flow enters the computing layer to form a tabular data flow is a necessary link to open up the "stream data-indicator model-real-time calculation" link.
在一种可能的实施方式中,图7是本发明提供的数据实时解析输出方法的流程示意图之七,如图7所示,步骤305具体包括如下步骤:In one possible implementation, FIG. 7 is a seventh flow chart of the method for real-time data analysis and output provided by the present invention. As shown in FIG. 7 , step 305 specifically includes the following steps:
步骤701:针对任一逻辑数据流,解析其中的计数器名称集合,并以采样时间、网元标识、计数器名称三类字段为基础,自动生成字段描述元数据。Step 701: For any logical data flow, parse the counter name set therein, and automatically generate field description metadata based on the three fields of sampling time, network element identifier, and counter name.
步骤702:调用流处理框架的表注册接口,依据字段描述元数据动态创建临时表,临时表以采样时间与网元标识的组合字段为主键。Step 702: Call the table registration interface of the stream processing framework to dynamically create a temporary table based on the field description metadata. The temporary table uses the combined field of sampling time and network element identifier as the primary key.
步骤703:将逻辑数据流中的每条计数器数据与对应标签字段拼接成一条行记录,并实时插入至临时表,得到携带水印时间戳的表化数据流。Step 703: Each counter data in the logical data stream is concatenated with the corresponding tag field into a row record, and inserted into a temporary table in real time to obtain a tabular data stream carrying a watermark timestamp.
在本实施例中,步骤701至步骤703具体实现了步骤305中“将各逻辑数据流转换为表化数据流”的内部流程,详述了如何基于逻辑数据流动态构建字段结构、生成表结构并实现数据写入,以便在流处理框架中支持后续的聚合计算、SQL查询及维度关联等操作。该流程的设置核心在于:原始性能数据源异构性强,不同厂家或格式中计数器字段差异显著,若采用静态字段定义方式将严重限制系统的适配能力和处理弹性。因此,通过在运行过程中自动解析字段并动态注册表结构,是实现高通用性、高实时性指标处理的关键路径。In this embodiment, steps 701 to 703 specifically implement the internal process of "converting each logical data stream into a tabular data stream" in step 305, and detail how to dynamically build a field structure, generate a table structure, and implement data writing based on the logical data stream, so as to support subsequent aggregation calculations, SQL queries, dimension associations and other operations in the stream processing framework. The core of the setting of this process is that the original performance data source is highly heterogeneous, and the counter fields of different manufacturers or formats vary significantly. If a static field definition method is used, the system's adaptability and processing flexibility will be severely limited. Therefore, automatically parsing fields and dynamically registering table structures during operation is a key path to achieving highly versatile and real-time indicator processing.
首先,在步骤701中,系统针对任一拆分后的逻辑数据流(例如小区级或基站级),自动识别并解析该数据流中包含的计数器名称集合。该计数器集合来源于前述解析步骤提取的原始指标字段,并结合标签信息如采样时间、网元标识等元字段形成完整的字段集合。系统以“采样时间”、“网元标识”和“计数器名称”作为核心维度构建字段描述元数据结构,该结构记录字段名、字段类型、是否主键字段等属性,形成表结构定义的基础。First, in step 701, the system automatically identifies and parses the set of counter names contained in any split logical data stream (e.g., cell-level or base station-level). This set of counters is derived from the original indicator fields extracted in the previous parsing step and combined with tag information such as sampling time and network element ID to form a complete field set. The system constructs a field description metadata structure using "sampling time," "network element ID," and "counter name" as core dimensions. This structure records attributes such as field name, field type, and whether it is a primary key field, forming the basis for table structure definition.
随后,进入步骤702,系统通过调用流处理框架(如Flink Table API)提供的表注册接口,基于步骤701中生成的字段描述元数据,动态创建临时表结构。临时表不依赖物理数据库,仅作为逻辑视图存在于流处理运行时环境中,但具备完整的字段定义和主键约束。其中,系统将“采样时间”和“网元标识”字段的组合作为逻辑主键,确保相同时间与网络对象的计数器记录唯一识别,并支持基于这些主键字段的窗口聚合与分组操作。表结构注册后,系统即完成了从“原始数据字段集”向“标准表结构”的动态转化,为后续基于SQL或窗口函数的流式计算提供结构支撑。Then, entering step 702, the system dynamically creates a temporary table structure based on the field description metadata generated in step 701 by calling the table registration interface provided by the stream processing framework (such as Flink Table API). The temporary table does not rely on the physical database and only exists as a logical view in the stream processing runtime environment, but has complete field definitions and primary key constraints. Among them, the system uses the combination of the "sampling time" and "network element identification" fields as the logical primary key to ensure that counter records of the same time and network objects are uniquely identified, and supports window aggregation and grouping operations based on these primary key fields. After the table structure is registered, the system completes the dynamic conversion from the "original data field set" to the "standard table structure", providing structural support for subsequent streaming calculations based on SQL or window functions.
接下来在步骤703中,系统将逻辑数据流中的每条标记计数器数据按字段描述映射为一条标准化的行记录,并实时插入至已注册的临时表中。行记录中不仅包含各类计数器数值字段,也包含标签字段如厂家标识、小区ID、采样时间等。为满足时序计算要求,系统同时为每条行记录附加事件时间戳,并通过水印机制维护系统当前可接受的最大延迟范围。最终,所有结构化行记录在时间戳驱动下连续生成,形成具备表结构、时间属性与可查询性的表化数据流。Next, in step 703, the system maps each tagged counter data item in the logical data stream into a standardized row record based on the field description and inserts it into a registered temporary table in real time. The row record contains not only the various counter value fields, but also tag fields such as manufacturer identification, cell ID, and sampling time. To meet time series calculation requirements, the system also adds an event timestamp to each row record and uses a watermark mechanism to maintain the system's current maximum acceptable latency range. Ultimately, all structured row records are continuously generated, driven by timestamps, forming a tabular data stream with table structure, time attributes, and queryability.
通过上述步骤701至703的实现,系统完成了逻辑数据流向表化数据流的结构重建,带来的技术效果包括:其一,利用自动字段解析与动态表结构注册机制,大幅提升系统对多种数据格式、不同指标维度的适配能力;其二,通过逻辑主键与结构映射,统一了数据语义表达形式,便于后续指标开发与维护;其三,通过时间戳与水印嵌入,实现了流式数据在窗口处理与延迟容忍中的时间一致性保障。整体上,该流程有效支持了海量异构性能数据的结构化接入与高效流计算,是构建大规模指标输出系统中的核心步骤之一。By implementing steps 701 to 703 above, the system completes the structural reconstruction of the logical data flow into a tabular data flow, resulting in the following technical effects: First, by utilizing automatic field parsing and a dynamic table structure registration mechanism, the system's adaptability to multiple data formats and different indicator dimensions is greatly improved; second, by mapping logical primary keys to structures, the data semantic expression is unified, facilitating subsequent indicator development and maintenance; third, by embedding timestamps and watermarks, temporal consistency is ensured for streaming data in window processing and delay tolerance. Overall, this process effectively supports the structured access and efficient stream computing of massive amounts of heterogeneous performance data, and is one of the core steps in building a large-scale indicator output system.
步骤306:在预设时间窗口内对表化数据流进行聚合计算,并临时关联维度表,得到指标数据流。Step 306: Perform aggregation calculation on the tabular data stream within a preset time window and temporarily associate it with the dimension table to obtain an indicator data stream.
在本实施例中,步骤306用于对步骤305中生成的表化数据流进行窗口聚合计算,并在聚合基础上临时关联维度表,最终生成用于表征目标对象性能状态的指标数据流。该步骤作为整个数据实时解析链路的末端计算单元,核心目标在于将大量底层计数器数据转化为具备业务语义的关键性能指标(如接通率、掉线率、资源利用率等),并通过关联维度信息补全地理、厂家等上下文字段,从而使输出结果具备业务可用性和系统可消费性。In this embodiment, step 306 performs windowed aggregation on the tabular data stream generated in step 305 and temporarily associates dimension tables based on the aggregation, ultimately generating an indicator data stream representing the performance status of the target object. This step, as the terminal computing unit of the entire real-time data analysis chain, aims to transform large amounts of underlying counter data into key performance indicators (KPIs) with business semantics (such as connection rate, drop rate, and resource utilization). By associating dimension information with contextual fields such as geography and manufacturer, the output results are business-usable and system-consumable.
在一种可能的实施方式中,图8是本发明提供的数据实时解析输出方法的流程示意图之八,如图8所示,步骤306具体包括如下步骤:In one possible implementation, FIG8 is a flowchart of the eighth method for real-time data analysis and output provided by the present invention. As shown in FIG8 , step 306 specifically includes the following steps:
步骤801:为表化数据流配置滚动时间窗口,滚动时间窗口的长度为预设时长,步长与长度一致。Step 801: Configure a rolling time window for the tabular data stream. The length of the rolling time window is a preset time length, and the step length is consistent with the length.
步骤802:在每一滚动时间窗口内,按照网元标识和计数器名称对行记录进行分组,并计算各组计数器值的求和、最大值和最小值,得到窗口聚合结果。Step 802: In each rolling time window, the row records are grouped according to the network element identifier and the counter name, and the sum, maximum value and minimum value of the counter values in each group are calculated to obtain the window aggregation result.
步骤803:在输出窗口聚合结果之前,根据窗口结束时间的水印时间戳,将窗口聚合结果与包含地市、区县及厂家信息的维度表进行临时关联,补充区域维度及厂家维度字段。Step 803: Before outputting the window aggregation result, the window aggregation result is temporarily associated with the dimension table containing the city, district, county and manufacturer information according to the watermark timestamp of the window end time, and the regional dimension and manufacturer dimension fields are supplemented.
步骤804:将关联后的窗口聚合结果作为指标数据流。Step 804: The associated window aggregation result is used as the indicator data stream.
在本实施例中,步骤801至步骤804进一步细化了步骤306中的关键计算过程,具体描述了如何基于时间驱动机制完成窗口聚合、并结合事件时间的水印策略实现与维度表的临时关联,最终输出具有完整语义的指标数据流。这一子流程不仅实现了计数器数据向性能指标的实时转换,也通过引入动态维度扩展机制提升了结果数据的业务可解释性和分析价值,是无线性能数据计算流程中最具综合性和输出导向的计算环节。In this embodiment, steps 801 through 804 further refine the key computational process in step 306, specifically describing how to implement window aggregation based on a time-driven mechanism, and how to implement temporary associations with dimension tables in conjunction with an event-time watermarking strategy, ultimately outputting a semantically complete indicator data stream. This sub-process not only enables the real-time conversion of counter data into performance metrics but also enhances the business interpretability and analytical value of the resulting data by introducing a dynamic dimension expansion mechanism. It is the most comprehensive and output-oriented computational step in the wireless performance data calculation process.
首先,在步骤801中,系统对表化数据流配置滚动时间窗口。该窗口采用固定长度和固定步长,窗口长度设定为十五分钟,步长与长度一致,确保系统能够每十五分钟输出一次完整的指标计算结果。窗口以事件时间为驱动基础,结合前述表化数据流中携带的时间戳字段运行。系统配合使用水印策略,以容忍数据乱序与网络抖动带来的延迟,确保在最大可接受延迟范围内仍可准确地划分数据归属窗口,保证计算的时效性与准确性。First, in step 801, the system configures a rolling time window for the tabulated data stream. This window uses a fixed length and fixed step size. The window length is set to fifteen minutes, and the step size is consistent with the window length, ensuring that the system can output complete metric calculation results every fifteen minutes. The window is driven by event time and operates in conjunction with the timestamp field carried in the tabulated data stream. The system uses a watermarking strategy to tolerate delays caused by data out-of-order and network jitter, ensuring that data can be accurately windowed within the maximum acceptable latency range, ensuring timely and accurate calculations.
在步骤802中,系统在每一个滚动窗口闭合时,对窗口内的所有数据进行聚合计算。聚合操作以“网元标识+计数器名称”字段组合为分组依据,分别对每一组数据执行预设的聚合函数,包括但不限于:求和(例如累计RRC连接请求数)、最大值(例如15分钟内最大PRB利用率)、最小值(例如最小CQI质量值)等。这些统计结果代表了底层计数器在一个周期窗口内的变化区间,是生成衍生性能指标的核心基础数据。In step 802, when each rolling window closes, the system aggregates all data within the window. This aggregation operation uses the "NE ID + Counter Name" field combination as the grouping basis, applying preset aggregation functions to each data set. These include, but are not limited to, sum (e.g., cumulative RRC connection requests), maximum (e.g., maximum PRB utilization within 15 minutes), and minimum (e.g., minimum CQI quality value). These statistical results represent the variation range of the underlying counter within a periodic window and serve as the core data for generating derived performance indicators.
随后,在步骤803中,为提升指标的语义完整性与业务价值,系统对聚合结果执行维度表关联操作。该关联采用“临时维表”策略,通过事件时间驱动的Temporal Join方式进行匹配,确保每条聚合结果在关联维度信息时使用与其时间戳相一致的维表快照版本。维度表可包括地市、区县、厂家类型、站点归属关系等字段,其数据来源于外部运维系统或配置中心,系统通过广播机制或侧输入机制将维表数据加载至运行时环境中,以实现低延迟访问与匹配。Subsequently, in step 803, to enhance the semantic integrity and business value of the metrics, the system performs a dimension table join operation on the aggregated results. This join employs a "temporary dimension table" strategy, matching via an event-time-driven Temporal Join approach. This ensures that each aggregated result uses a snapshot version of the dimension table that matches its timestamp when joining dimension information. The dimension table may include fields such as prefecture, county, manufacturer type, and site affiliation. The data for this dimension table originates from an external operation and maintenance system or configuration center. The system loads this dimension table data into the runtime environment via a broadcast mechanism or side-input mechanism for low-latency access and matching.
在匹配过程中,系统以网元标识作为连接主键字段,根据该标识找到对应的区域维度与厂家维度信息,并将其补充至当前聚合结果中,形成结构完整、标签丰富的指标数据记录。通过该操作,每条指标结果不仅具备指标名称与数值,还具备明确的时间、空间和运营归属背景。During the matching process, the system uses the network element ID as the primary key field. Based on this ID, it finds the corresponding regional and manufacturer dimension information and adds it to the current aggregation results, forming a complete and richly labeled indicator data record. This operation ensures that each indicator result not only has the indicator name and value, but also has clear time, space, and operational context.
最后,在步骤804中,系统将上述已完成维度补充的聚合结果封装为统一格式的指标数据流,并将其作为最终结果输出至目标存储介质或实时消息通道。指标数据流具备统一字段定义、时序标识、维度信息与性能数值,便于后续系统进行实时展示、历史归档、智能告警等业务操作。Finally, in step 804, the system encapsulates the dimensionally enriched aggregation results into a uniformly formatted indicator data stream and outputs it as the final result to the target storage medium or real-time message channel. The indicator data stream includes uniform field definitions, time series identifiers, dimension information, and performance values, facilitating subsequent system operations such as real-time display, historical archiving, and intelligent alerting.
综上所述,步骤801至804通过构建时间窗口、执行分组聚合、关联维度信息并输出结构化指标数据,实现了从原始数据到可消费指标的最后一跳转换。In summary, steps 801 to 804 achieve the final hop conversion from raw data to consumable indicators by constructing a time window, performing group aggregation, associating dimension information, and outputting structured indicator data.
步骤104:将指标数据流写入目标存储介质和/或消息中间件。Step 104: Write the indicator data stream to the target storage medium and/or message middleware.
在本实施例中,步骤104用于将步骤306(特别是其子步骤804)中生成的指标数据流写入目标存储介质、消息中间件中的至少一者,以供后续系统进行实时消费、展示、归档或告警处理。该步骤的设置动因在于,前述各步骤虽已完成性能数据的解析与指标计算,但若指标结果无法及时传送至下游系统或持久化保存,则无法形成完整的闭环数据服务。因此,指标数据的最终输出与分发不仅是全流程的必要结束点,更是构建高可用、高可视、高响应的实时指标平台的关键技术保障。In this embodiment, step 104 is used to write the indicator data stream generated in step 306 (particularly its substep 804) to at least one of the target storage medium and the message middleware for subsequent system real-time consumption, display, archiving, or alarm processing. This step is motivated by the fact that, although the aforementioned steps have completed performance data analysis and indicator calculation, if the indicator results cannot be promptly transmitted to downstream systems or persisted, a complete closed-loop data service cannot be formed. Therefore, the final output and distribution of indicator data is not only a necessary endpoint of the entire process but also a key technical guarantee for building a highly available, highly visible, and highly responsive real-time indicator platform.
在具体实施过程中,系统根据预设配置将指标数据流的输出目标分为两类:其一为目标存储介质,如关系型数据库(如Greenplum、StarRocks)、分布式数仓(如Hive、Hudi)等;其二为实时消息中间件,如Kafka、Pulsar等,供实时告警模块、数据总线或可视化平台即时订阅使用。指标数据流在输出前根据目标类型被序列化为对应格式,例如Kafka采用Avro或JSON结构,数据库采用批量写入或流式写入接口。During implementation, the system categorizes the output targets of indicator data streams into two types based on pre-set configurations: first, target storage media, such as relational databases (e.g., Greenplum, StarRocks) or distributed data warehouses (e.g., Hive, Hudi); and second, real-time messaging middleware, such as Kafka and Pulsar, for immediate subscription by real-time alerting modules, data buses, or visualization platforms. Before output, indicator data streams are serialized into the corresponding format based on the target type, such as Avro or JSON structures for Kafka and batch or streaming write interfaces for databases.
系统通过在流处理框架中配置Sink算子,以异步、非阻塞方式将指标数据写入目标端。针对消息中间件,系统按指标所属地域或厂家等维度字段设置Topic或分区键,实现数据在消息通道内的有序分布与并行消费。针对数据库类存储,系统则支持自动建表、字段映射与主键去重机制,确保数据写入的一致性与规范性。The system writes indicator data to the target end asynchronously and non-blockingly by configuring a Sink operator within the stream processing framework. For message-based middleware, the system sets topics or partition keys based on dimensional fields such as the region or manufacturer to which the indicator belongs, enabling orderly distribution and parallel consumption of data within the message channel. For database storage, the system supports automatic table creation, field mapping, and primary key deduplication to ensure consistent and standardized data writing.
同时,系统支持多目标输出模式,即指标数据可同时写入多个目标端,例如同时写入Kafka供实时消费与写入数仓供后续历史分析,提升了系统的业务兼容性与数据可复用性。输出路径可通过配置动态选择和切换,具备良好的扩展性和容灾能力。The system also supports multi-target output, meaning that indicator data can be written to multiple destinations simultaneously. For example, it can be written to Kafka for real-time consumption and to a data warehouse for subsequent historical analysis, improving the system's business compatibility and data reusability. Output paths can be dynamically selected and switched through configuration, providing excellent scalability and disaster recovery capabilities.
需要说明的是,在步骤104之后,方法还可以进一步根据指标数据流进行异常检测,并在检测到异常事件时生成并输出结构化告警事件。It should be noted that, after step 104 , the method may further perform anomaly detection based on the indicator data stream, and generate and output a structured alarm event when an abnormal event is detected.
在本实施例中,在完成步骤104中指标数据流的写入操作之后,系统进一步引入了指标异常检测与告警联动的拓展处理流程,以实现对关键性能指标的异常波动进行实时识别,并基于识别结果触发自动化告警联动操作。该步骤的设置目的在于弥补传统无线性能监控系统在实时性和响应机制方面的不足,特别是在高密度网络部署环境下,性能数据更新频率高、故障变化快,若不能在指标异常初期即快速发现并联动处置,往往会导致故障放大、定位延迟和服务损耗等问题。为此,本发明在指标流处理链路中引入流内嵌式异常检测模块,并结合灵活的告警触发机制,构建了从指标流到运维响应的闭环体系。In this embodiment, after completing the write operation of the indicator data stream in step 104, the system further introduces an extended processing flow of indicator anomaly detection and alarm linkage to achieve real-time identification of abnormal fluctuations in key performance indicators, and trigger an automated alarm linkage operation based on the identification results. The purpose of setting this step is to make up for the shortcomings of traditional wireless performance monitoring systems in terms of real-time and response mechanisms, especially in high-density network deployment environments, where performance data updates frequently and faults change rapidly. If the indicator anomaly cannot be quickly discovered and linked in the early stages, it will often lead to problems such as fault amplification, positioning delay and service loss. To this end, the present invention introduces a stream-embedded anomaly detection module in the indicator stream processing link, and combines it with a flexible alarm triggering mechanism to construct a closed-loop system from indicator stream to operation and maintenance response.
在具体实施过程中,系统基于流处理框架,在指标数据流输出路径旁部署一组异常检测算子。该检测算子直接接收指标数据流作为输入,具备低延迟、并发可扩展的处理能力。系统可支持多种检测策略并行运行,包括静态阈值规则、动态变化率监测以及滑动窗口比较等方式。例如,对于接通率、掉线率等关键指标,系统可预设上下限阈值,当某条指标数据的数值超过阈值范围,即触发异常标记。同时,系统还可维护历史窗口数据,计算当前值与过去平均值之间的偏移程度,并设置偏移百分比阈值,若当前值变化过快亦可触发异常识别。此外,对于多维指标联合异常场景,系统支持将多个指标关联建模,当组合关系出现结构性变化(如请求数暴涨而成功率下跌)时,亦可判断为异常事件。In its implementation, the system, based on a stream processing framework, deploys a set of anomaly detection operators alongside the output path of the metric data stream. These operators directly receive the metric data stream as input, offering low-latency, concurrent, and scalable processing capabilities. The system supports multiple detection strategies, including static threshold rules, dynamic rate-of-change monitoring, and sliding window comparisons. For example, for key metrics like call connection rate and call drop rate, the system can preset upper and lower thresholds. When a metric value exceeds the threshold, an anomaly is flagged. The system also maintains historical window data, calculates the deviation between the current value and the historical average, and sets a percentage threshold for the deviation. Rapid changes in the current value can also trigger anomaly detection. Furthermore, for scenarios involving multi-dimensional anomalies, the system supports modeling multiple metrics in a correlated manner. Structural changes in the combined relationships (such as a surge in request count and a decrease in success rate) can be identified as an anomaly.
一旦检测算子识别出异常事件,系统即进入联动处理流程,将异常指标记录封装为结构化告警事件对象,包含异常指标项、指标值、触发时间、网元标识、地域信息、所属厂家等多个字段。系统支持将告警事件通过多种方式进行下发,包括推送至消息中间件的专用告警通道供运维平台实时订阅,也可通过调用告警管理API实现与短信、邮件或即时通信平台的集成通知,甚至联动自动化运维系统执行临时处置动作如配置下发、故障隔离等。Once the detection operator identifies an abnormal event, the system enters a coordinated processing flow, encapsulating the abnormal indicator record into a structured alarm event object, which contains multiple fields such as the abnormal indicator item, indicator value, trigger time, network element identifier, region information, and manufacturer. The system supports distributing alarm events through various methods, including pushing them to a dedicated alarm channel of the messaging middleware for real-time subscription by the operations and maintenance platform, integrating notifications with SMS, email, or instant messaging platforms by calling the alarm management API, and even linking with the automated operations and maintenance system to perform temporary disposition actions such as configuration distribution and fault isolation.
参照图9,图9是本发明提供的数据实时解析输出系统的结构示意图,系统包括:9, FIG9 is a schematic structural diagram of the data real-time analysis and output system provided by the present invention, the system includes:
第一处理模块,用于扫描厂家服务器的预设目录,对于检测到的每一数据文件,生成用于表征数据文件的触发消息,并将触发消息写入消息中间件;The first processing module is used to scan the preset directory of the manufacturer's server, generate a trigger message for representing each data file detected, and write the trigger message to the message middleware;
第二处理模块,用于在流处理框架中,通过数据源组件从消息中间件中实时消费触发消息,按照触发消息中指示的文件路径读取对应的数据文件,形成原始数据流;The second processing module is used to consume trigger messages from the message middleware in real time through the data source component in the stream processing framework, read the corresponding data file according to the file path indicated in the trigger message, and form the original data stream;
第三处理模块,用于在流处理框架中,对原始数据流执行实时转换处理,生成用于表征目标对象性能指标的指标数据流;A third processing module is used to perform real-time conversion processing on the original data stream in the stream processing framework to generate an indicator data stream for representing the performance indicator of the target object;
第四处理模块,用于将指标数据流写入目标存储介质和/或消息中间件。The fourth processing module is used to write the indicator data stream into a target storage medium and/or message middleware.
在一种可能的实施方式中,第二处理模块,还用于:In a possible implementation, the second processing module is further configured to:
由数据源组件订阅消息中间件中与触发消息对应的主题,并按触发消息携带的分区号与偏移量顺序拉取触发消息;The data source component subscribes to the topic corresponding to the trigger message in the message middleware and pulls the trigger message in the order of the partition number and offset carried by the trigger message;
解析每条触发消息中携带的数据文件路径及文件大小信息,通过分布式文件系统接口以分块方式顺序读取数据文件内容;Parse the data file path and file size information carried in each trigger message, and read the data file content sequentially in blocks through the distributed file system interface;
在读取过程中,为每个读取块附加当前处理时间戳并封装为数据元素;During the reading process, the current processing timestamp is appended to each read block and encapsulated as a data element;
将各个数据元素依次输出,得到原始数据流。Output each data element in sequence to obtain the original data stream.
在一种可能的实施方式中,第三处理模块,还用于:In a possible implementation, the third processing module is further configured to:
对原始数据流进行合法性校验,剔除不满足预设规则的文件记录,得到目标文件流;Perform a validity check on the original data stream, remove file records that do not meet the preset rules, and obtain the target file stream;
在流处理框架内,对目标文件流进行解压,得到包含厂家标识及文件格式信息的文件实体对象;In the stream processing framework, the target file stream is decompressed to obtain the file entity object containing the manufacturer identification and file format information;
根据文件实体对象中的厂家标识与文件格式,选择匹配的解析器解析文件内容,提取底层计数器数据并附加标签信息,形成标记计数器数据流;According to the manufacturer identifier and file format in the file entity object, a matching parser is selected to parse the file content, extract the underlying counter data and append tag information to form a tagged counter data stream;
根据标签信息对标记计数器数据流进行拆分,生成至少包括小区级数据流和基站级数据流的多条逻辑数据流;Splitting the marker counter data stream according to the label information to generate multiple logical data streams including at least a cell-level data stream and a base station-level data stream;
将各逻辑数据流中的底层计数器数据转换为行记录并根据行记录注册相应表结构,形成表化数据流;Convert the underlying counter data in each logical data flow into row records and register the corresponding table structure according to the row records to form a tabular data flow;
在预设时间窗口内对表化数据流进行聚合计算,并临时关联维度表,得到指标数据流。Perform aggregation calculations on the tabular data stream within a preset time window and temporarily associate it with the dimension table to obtain the indicator data stream.
在一种可能的实施方式中,第三处理模块,还用于:In a possible implementation, the third processing module is further configured to:
根据目标文件流中每个文件记录的文件后缀判定其压缩格式是否为预设后缀中的任一种;Determine whether the compression format of each file record in the target file stream is any of the preset suffixes according to the file suffix of the file record;
在流处理框架内动态调用与压缩格式对应的解压插件,以流式方式逐字节解码而不落盘生成解压后文件;Dynamically call the decompression plug-in corresponding to the compression format within the stream processing framework, and decode byte by byte in a streaming manner without writing to the disk to generate the decompressed file;
在解码过程中解析解压后文件的文件头或预置标识字段,提取厂家标识和数据文件格式信息;During the decoding process, the file header or preset identification field of the decompressed file is parsed to extract the manufacturer identification and data file format information;
将解压后文件内容与厂家标识及文件格式信息封装为文件实体对象。The decompressed file content, manufacturer identification and file format information are encapsulated into a file entity object.
在一种可能的实施方式中,第三处理模块,还用于:In a possible implementation, the third processing module is further configured to:
预先在流处理框架中维护解析器路由表,解析器路由表将每一厂家标识与文件格式的组合键映射至对应的解析器类;A parser routing table is maintained in the stream processing framework in advance, and the parser routing table maps each combination key of the manufacturer identifier and the file format to the corresponding parser class;
当接收到文件实体对象时,查询解析器路由表并动态加载与该组合键匹配的解析器类;When a file entity object is received, the parser routing table is queried and the parser class that matches the composite key is dynamically loaded;
当数据文件格式为层次化标记文件格式时,调用根据SAX的XML解析器,且当数据文件格式为定长文本或逗号分隔格式时,调用根据行列解析的CSV解析器;When the data file format is a hierarchical markup file format, the XML parser based on SAX is called, and when the data file format is a fixed-length text or comma-delimited format, the CSV parser based on row and column parsing is called;
利用解析器解析文件内容,获取包含计数器名称-值对的原始计数器数据;Use the parser to parse the file content and obtain the raw counter data containing the counter name-value pairs;
为每一计数器数据追加标签信息,标签信息至少包括厂家标识、数据采样时间以及由文件名解析得到的网元标识或小区标识;Add tag information to each counter data, the tag information at least including the manufacturer ID, data sampling time, and the network element ID or cell ID obtained by parsing the file name;
将附加标签信息后的计数器数据封装为标记计数器数据流。The counter data after the tag information is added is encapsulated into a labeled counter data stream.
在一种可能的实施方式中,第三处理模块,还用于:In a possible implementation, the third processing module is further configured to:
在流处理框架中设置侧输出路由单元,路由单元根据每条计数器数据随附的网元层级标签进行匹配;A side output routing unit is set in the flow processing framework, and the routing unit matches the counter data according to the network element level label attached to each counter data;
当网元层级标签同时包含小区标识和基站标识时,将标记计数器数据流分配至小区级数据流;When the network element level label contains both the cell identifier and the base station identifier, the marker counter data flow is assigned to the cell level data flow;
当网元层级标签仅包含基站标识而不包含小区标识时,将标记计数器数据流分配至基站级数据流。When the network element level label only includes the base station identifier but not the cell identifier, the marker counter data flow is allocated to the base station level data flow.
在一种可能的实施方式中,第三处理模块,还用于:In a possible implementation, the third processing module is further configured to:
针对任一逻辑数据流,解析其中的计数器名称集合,并以采样时间、网元标识、计数器名称三类字段为基础,自动生成字段描述元数据;For any logical data flow, the counter name set is parsed and field description metadata is automatically generated based on the three fields of sampling time, network element identifier, and counter name.
调用流处理框架的表注册接口,依据字段描述元数据动态创建临时表,临时表以采样时间与网元标识的组合字段为主键;Call the table registration interface of the stream processing framework to dynamically create a temporary table based on the field description metadata. The temporary table uses the combination of sampling time and network element identifier as the primary key.
将逻辑数据流中的每条计数器数据与对应标签字段拼接成一条行记录,并实时插入至临时表,得到携带水印时间戳的表化数据流。Each counter data in the logical data stream is concatenated with the corresponding label field into a row record and inserted into a temporary table in real time to obtain a tabular data stream with a watermark timestamp.
在一种可能的实施方式中,第三处理模块,还用于:In a possible implementation, the third processing module is further configured to:
为表化数据流配置滚动时间窗口,滚动时间窗口的长度为预设时长,步长与长度一致;Configure a tumbling time window for the tabular data stream. The length of the tumbling time window is the preset duration, and the step size is the same as the length.
在每一滚动时间窗口内,按照网元标识和计数器名称对行记录进行分组,并计算各组计数器值的求和、最大值和最小值,得到窗口聚合结果;In each rolling time window, row records are grouped according to network element ID and counter name, and the sum, maximum and minimum values of the counter values in each group are calculated to obtain the window aggregation result;
在输出窗口聚合结果之前,根据窗口结束时间的水印时间戳,将窗口聚合结果与包含地市、区县及厂家信息的维度表进行临时关联,补充区域维度及厂家维度字段;Before outputting the window aggregation results, temporarily associate the window aggregation results with the dimension table containing prefecture-level cities, districts, counties, and manufacturer information based on the watermark timestamp of the window end time, and supplement the regional dimension and manufacturer dimension fields.
将关联后的窗口聚合结果作为指标数据流。The associated window aggregation results are used as the indicator data stream.
需要说明的是,本发明提供的数据实时解析输出系统,在具体运行时,可以执行上述任一实施例的数据实时解析输出方法,对此本实施例不作赘述。It should be noted that the real-time data analysis and output system provided by the present invention can execute the real-time data analysis and output method of any of the above embodiments during specific operation, which will not be described in detail in this embodiment.
图10是本发明提供的电子设备的结构示意图,如图10所示,该电子设备可以包括:处理器1010(processor)、通信接口1020(CommunicationsInterface)、存储器1030(memory)和通信总线1040,其中,处理器1010,通信接口1020,存储器1030通过通信总线1040完成相互间的通信。处理器1010可以调用存储器1030中的逻辑指令,以执行数据实时解析输出方法,该方法包括:扫描厂家服务器的预设目录,对于检测到的每一数据文件,生成用于表征数据文件的触发消息,并将触发消息写入消息中间件;在流处理框架中,通过数据源组件从消息中间件中实时消费触发消息,按照触发消息中指示的文件路径读取对应的数据文件,形成原始数据流;在流处理框架中,对原始数据流执行实时转换处理,生成用于表征目标对象性能指标的指标数据流;将指标数据流写入目标存储介质和/或消息中间件。FIG10 is a schematic diagram of the structure of an electronic device provided by the present invention. As shown in FIG10 , the electronic device may include: a processor 1010, a communication interface 1020, a memory 1030, and a communication bus 1040. The processor 1010, the communication interface 1020, and the memory 1030 communicate with each other via the communication bus 1040. The processor 1010 may invoke logic instructions in the memory 1030 to execute a real-time data parsing and output method. The method includes: scanning a preset directory of a manufacturer's server, generating a trigger message representing each data file detected, and writing the trigger message to a message middleware; in a stream processing framework, consuming the trigger message from the message middleware in real time through a data source component, reading the corresponding data file according to the file path indicated in the trigger message, and forming a raw data stream; in the stream processing framework, performing real-time conversion processing on the raw data stream to generate an indicator data stream representing the performance indicators of the target object; and writing the indicator data stream to a target storage medium and/or the message middleware.
此外,上述的存储器1030中的逻辑指令可以通过软件功能单元的形式实现并作为独立的产品销售或使用时,可以存储在一个计算机可读取存储介质中。根据这样的理解,本发明的技术方案本质上或者说对现有技术做出贡献的部分或者该技术方案的部分可以以软件产品的形式体现出来,该计算机软件产品存储在一个存储介质中,包括若干指令用以使得一台计算机设备(可以是个人计算机,服务器,或者网络设备等)执行本发明各个实施例方法的全部或部分步骤。而前述的存储介质包括:U盘、移动硬盘、只读存储器(ROM,Read-OnlyMemory)、随机存取存储器(RAM,RandomAccessMemory)、磁碟或者光盘等各种可以存储程序代码的介质。Furthermore, the logic instructions in the aforementioned memory 1030 can be implemented as software functional units and, when sold or used as independent products, can be stored in a computer-readable storage medium. Based on this understanding, the technical solution of the present invention, or the portion that contributes to the prior art, or a portion of the technical solution, can be embodied in the form of a software product. This computer software product, stored in a storage medium, includes instructions for enabling a computer device (which can be a personal computer, server, or network device, etc.) to execute all or part of the steps of the various embodiments of the method of the present invention. The aforementioned storage medium includes various media capable of storing program code, such as a USB flash drive, a mobile hard drive, a read-only memory (ROM), a random access memory (RAM), a magnetic disk, or an optical disk.
另一方面,本发明还提供一种计算机程序产品,计算机程序产品包括存储在非暂态计算机可读存储介质上的计算机程序,计算机程序包括程序指令,当程序指令被计算机执行时,计算机能够执行上述各实施例所提供的数据实时解析输出方法,该方法包括:扫描厂家服务器的预设目录,对于检测到的每一数据文件,生成用于表征数据文件的触发消息,并将触发消息写入消息中间件;在流处理框架中,通过数据源组件从消息中间件中实时消费触发消息,按照触发消息中指示的文件路径读取对应的数据文件,形成原始数据流;在流处理框架中,对原始数据流执行实时转换处理,生成用于表征目标对象性能指标的指标数据流;将指标数据流写入目标存储介质和/或消息中间件。On the other hand, the present invention also provides a computer program product, which includes a computer program stored on a non-transitory computer-readable storage medium, and the computer program includes program instructions. When the program instructions are executed by a computer, the computer can execute the real-time data parsing and output method provided by the above-mentioned embodiments, the method including: scanning the preset directory of the manufacturer's server, and for each data file detected, generating a trigger message for characterizing the data file, and writing the trigger message to the message middleware; in the stream processing framework, consuming the trigger message from the message middleware in real time through the data source component, reading the corresponding data file according to the file path indicated in the trigger message, and forming an original data stream; in the stream processing framework, performing real-time conversion processing on the original data stream to generate an indicator data stream for characterizing the performance indicators of the target object; and writing the indicator data stream to the target storage medium and/or message middleware.
又一方面,本发明还提供一种非暂态计算机可读存储介质,其上存储有计算机程序,该计算机程序被处理器执行时实现以执行上述各实施例提供的数据实时解析输出方法,该方法包括:扫描厂家服务器的预设目录,对于检测到的每一数据文件,生成用于表征数据文件的触发消息,并将触发消息写入消息中间件;在流处理框架中,通过数据源组件从消息中间件中实时消费触发消息,按照触发消息中指示的文件路径读取对应的数据文件,形成原始数据流;在流处理框架中,对原始数据流执行实时转换处理,生成用于表征目标对象性能指标的指标数据流;将指标数据流写入目标存储介质和/或消息中间件。On the other hand, the present invention also provides a non-transitory computer-readable storage medium having a computer program stored thereon, which, when executed by a processor, is implemented to execute the real-time data parsing and output method provided in the above-mentioned embodiments, the method comprising: scanning a preset directory of the manufacturer's server, generating a trigger message for characterizing the data file for each detected data file, and writing the trigger message to the message middleware; in a stream processing framework, consuming the trigger message from the message middleware in real time through the data source component, reading the corresponding data file according to the file path indicated in the trigger message, and forming an original data stream; in the stream processing framework, performing real-time conversion processing on the original data stream to generate an indicator data stream for characterizing the performance indicators of the target object; and writing the indicator data stream to the target storage medium and/or message middleware.
以上所描述的系统实施例仅仅是示意性的,其中作为分离部件说明的单元可以是或者也可以不是物理上分开的,作为单元显示的部件可以是或者也可以不是物理单元,即可以位于一个地方,或者也可以分布到多个网络单元上。可以根据实际的需要选择其中的部分或者全部模块来实现本实施例方案的目的。本领域普通技术人员在不付出创造性的劳动的情况下,即可以理解并实施。The system embodiments described above are merely illustrative. The units described as separate components may or may not be physically separate, and the components shown as units may or may not be physical units. That is, they may be located in one place or distributed across multiple network units. Some or all of the modules may be selected based on actual needs to achieve the objectives of this embodiment. Persons of ordinary skill in the art will be able to understand and implement the present invention without inventive effort.
通过以上的实施方式的描述,本领域的技术人员可以清楚地了解到各实施方式可借助软件加必需的通用硬件平台的方式来实现,当然也可以通过硬件。根据这样的理解,上述技术方案本质上或者说对现有技术做出贡献的部分可以以软件产品的形式体现出来,该计算机软件产品可以存储在计算机可读存储介质中,如ROM/RAM、磁碟、光盘等,包括若干指令用以使得一台计算机设备(可以是个人计算机,服务器,或者网络设备等)执行各个实施例或者实施例的某些部分的方法。Through the description of the above embodiments, those skilled in the art will clearly understand that each embodiment can be implemented using software plus a necessary general-purpose hardware platform, or of course, hardware. Based on this understanding, the essence of the above technical solution, or the portion that contributes to the prior art, can be embodied in the form of a software product. This computer software product can be stored in a computer-readable storage medium, such as ROM/RAM, a magnetic disk, or an optical disk, and includes a number of instructions for causing a computer device (such as a personal computer, server, or network device) to execute the methods of each embodiment or certain portions of the embodiments.
最后应说明的是:以上实施例仅用以说明本发明的技术方案,而非对其限制;尽管参照前述实施例对本发明进行了详细的说明,本领域的普通技术人员应当理解:其依然可以对前述各实施例所记载的技术方案进行修改,或者对其中部分技术特征进行等同替换;而这些修改或者替换,并不使相应技术方案的本质脱离本发明各实施例技术方案的精神和范围。Finally, it should be noted that the above embodiments are only used to illustrate the technical solutions of the present invention, rather than to limit it. Although the present invention has been described in detail with reference to the aforementioned embodiments, those skilled in the art should understand that they can still modify the technical solutions described in the aforementioned embodiments, or make equivalent replacements for some of the technical features therein. However, these modifications or replacements do not deviate the essence of the corresponding technical solutions from the spirit and scope of the technical solutions of the various embodiments of the present invention.
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202510570829.3ACN120086256B (en) | 2025-05-06 | 2025-05-06 | Data real-time analysis output method and system |
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202510570829.3ACN120086256B (en) | 2025-05-06 | 2025-05-06 | Data real-time analysis output method and system |
| Publication Number | Publication Date |
|---|---|
| CN120086256A CN120086256A (en) | 2025-06-03 |
| CN120086256Btrue CN120086256B (en) | 2025-08-15 |
| Application Number | Title | Priority Date | Filing Date |
|---|---|---|---|
| CN202510570829.3AActiveCN120086256B (en) | 2025-05-06 | 2025-05-06 | Data real-time analysis output method and system |
| Country | Link |
|---|---|
| CN (1) | CN120086256B (en) |
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN120596443A (en)* | 2025-08-11 | 2025-09-05 | 珠海翔翼航空技术有限公司 | Standardized processing method, system and equipment for original engineering parameters of flight simulator |
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN113468019A (en)* | 2021-06-28 | 2021-10-01 | 康键信息技术(深圳)有限公司 | Hbase-based index monitoring method, device, equipment and storage medium |
| CN113742298A (en)* | 2021-11-04 | 2021-12-03 | 南京芯传汇电子科技有限公司 | Airborne binary file general parallel analysis method and device and electronic equipment |
| CN119106026A (en)* | 2024-08-29 | 2024-12-10 | 上海浪潮云计算服务有限公司 | A method for data verification and conversion based on Flink |
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| US7254590B2 (en)* | 2003-12-03 | 2007-08-07 | Informatica Corporation | Set-oriented real-time data processing based on transaction boundaries |
| US20120327956A1 (en)* | 2011-06-24 | 2012-12-27 | Altior Inc. | Flow compression across multiple packet flows |
| CN113568677A (en)* | 2021-07-14 | 2021-10-29 | 上海淇玥信息技术有限公司 | Data object analysis method and device and electronic equipment |
| CN116107985A (en)* | 2022-12-30 | 2023-05-12 | 中国邮政储蓄银行股份有限公司 | Data synchronization method, device and data synchronization system under distributed architecture |
| CN118939638A (en)* | 2023-05-09 | 2024-11-12 | 马上消费金融股份有限公司 | Data processing system, method, device and storage medium |
| CN116743558A (en)* | 2023-06-29 | 2023-09-12 | 招商银行股份有限公司 | Concurrent traffic monitoring method, device, terminal equipment and storage medium |
| CN119202100A (en)* | 2024-09-05 | 2024-12-27 | 京东方能源科技股份有限公司 | Real-time processing method and device for equipment operation data of Internet of Things |
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN113468019A (en)* | 2021-06-28 | 2021-10-01 | 康键信息技术(深圳)有限公司 | Hbase-based index monitoring method, device, equipment and storage medium |
| CN113742298A (en)* | 2021-11-04 | 2021-12-03 | 南京芯传汇电子科技有限公司 | Airborne binary file general parallel analysis method and device and electronic equipment |
| CN119106026A (en)* | 2024-08-29 | 2024-12-10 | 上海浪潮云计算服务有限公司 | A method for data verification and conversion based on Flink |
| Publication number | Publication date |
|---|---|
| CN120086256A (en) | 2025-06-03 |
| Publication | Publication Date | Title |
|---|---|---|
| CN120086256B (en) | Data real-time analysis output method and system | |
| CN111526060B (en) | Method and system for processing service log | |
| CN111339071B (en) | Method and device for processing multi-source heterogeneous data | |
| CN100554985C (en) | A kind of method of inspection of north direction interface data integrity | |
| CN111949633A (en) | ICT system operation log analysis method based on parallel stream processing | |
| CN111241182A (en) | Data processing method and device, storage medium and electronic device | |
| CN113094166B (en) | Link tracking method, device, medium and computing equipment | |
| CN105227367A (en) | A kind of warning analysis processing method of low delay | |
| CN105205167A (en) | Log data system | |
| CN113792008B (en) | Method and device for acquiring network topology structure, electronic equipment and storage medium | |
| CN114125049A (en) | Telemetry message processing method, device, equipment and storage medium | |
| CN116126552A (en) | Mass meteorological observation data processing method and device based on Storm | |
| CN118396099A (en) | Micro-service time sequence knowledge graph construction method based on Kubernetes | |
| CN118377768A (en) | Data ETL method, device, equipment and medium based on service flow | |
| CN113141269B (en) | Data acquisition method, device and system | |
| CN118779189A (en) | Data processing method, device, electronic device, storage medium and program product | |
| CN115952142A (en) | System, method, device, processor and storage medium for realizing transaction log storage and message information extraction and summarization in trusted environment | |
| CN117407445B (en) | Data storage method, system and storage medium for Internet of Vehicles data platform | |
| CN117785971A (en) | Multi-satellite situation information processing system and process organization method | |
| CN116383186A (en) | Data processing system and processing method thereof | |
| CN116185298A (en) | Method for distributed storage of logs | |
| CN114398333A (en) | A real-time synchronization method, device, electronic device and storage medium for incremental data | |
| EP2533153B1 (en) | Unit for managing messages indicating event situations of monitored objects | |
| CN118626879B (en) | BUFR data processing method, device and equipment based on template recognition | |
| CN111597198A (en) | A kind of Internet of Things data query method for heterogeneous resource access and related equipment |
| Date | Code | Title | Description |
|---|---|---|---|
| PB01 | Publication | ||
| PB01 | Publication | ||
| SE01 | Entry into force of request for substantive examination | ||
| SE01 | Entry into force of request for substantive examination | ||
| GR01 | Patent grant | ||
| GR01 | Patent grant |