Summary of the invention
Objective of the invention: the technical problem addressed by this invention is to overcome the deficiencies of the prior art by providing a stream data processing method based on event stream processing and a plug-in development framework.
To solve the above technical problem, the invention discloses a stream data processing method based on event stream processing and a plug-in development framework, comprising the following steps:
Step 1: create a plug-in host program;
Step 2: define the interface specification between the plug-in host program and the plug-ins;
Step 3: according to the interface specification of step 2, generate, for the data acquisition protocol of each kind of field sensing device, a data acquisition plug-in corresponding to that protocol;
Step 4: create an event stream processing engine based on the Esper framework to serve as a real-time event-driven framework; when an event processing condition occurs in the event stream, the engine triggers a custom action;
Step 5: define time windows or length windows for the event stream processing engine; a time window covers the data within a given time span or period, and a length window covers the data of the most recent N events;
Step 6: for the data acquisition protocol of each kind of field sensing device, implement a corresponding data processing rule;
Step 7: define and register events: define one event for each class of field sensing device and register all events in the event stream processing engine;
Step 8: associate each event registered in step 7 with the corresponding data processing rule implemented in step 6, so that the relevant custom action is triggered automatically when that event occurs;
Step 9: set the event processing conditions;
Step 10: create a source data queue, PackageMessageQueue, to hold the raw data sent by the various field sensing devices, and a target queue, RecordMessageQueue, to hold the target data after decoding and processing;
Step 11: create a source data consumer thread, ConsumePackage, and a target data consumer thread, ConsumeRecord:
(1) ConsumePackage continuously takes data from PackageMessageQueue, wraps each packet into an event object of the corresponding type according to the data type, and sends it to the event stream processing engine; when the event processing condition set in step 9 is met, the corresponding data processing rule is triggered, and the resulting data are placed in the target queue RecordMessageQueue of step 10;
(2) ConsumeRecord continuously takes target data from RecordMessageQueue;
Step 12: export the target data obtained in step 11 to a destination, for example a database, a file, or message-oriented middleware;
Step 13: create a daemon process that monitors the working state of the stream data processing program; when the program's working state becomes abnormal or it stops working, the daemon actively shuts it down and restarts it.
Further, in the present invention, the plug-in host program of step 1 provides the following functions:
(1) Plug-in registration: a plug-in is registered in the host program before use so that its path can be located; the host can find the plug-in path correctly only after registration succeeds;
(2) Plug-in unregistration: when the host program no longer needs a plug-in, the plug-in can be unregistered so that it is no longer part of the host program;
(3) Plug-in enabling: a registered plug-in can be enabled;
(4) Plug-in disabling: a running plug-in that is no longer needed can be disabled.
The data acquisition plug-ins of step 3 can be implemented with dynamic link library (DLL) technology.
The event stream processing engine of step 4 is constructed as follows:
(4a) predefine an event for each kind of field sensing device, register the event types in the configuration environment, and generate the corresponding configuration object;
(4b) create an event stream processing engine instance with the configuration object as its parameter, serving as the container environment for event processing;
(4c) create or import the event processing rules;
(4d) create listener objects and associate them with the event processing rules;
(4e) the engine creates the execution environment object for event stream processing, which receives the event stream and matches it logically against the event processing conditions inside the engine;
(4f) the execution environment object monitors all incoming events and, when an event matches a preset event processing condition, triggers the corresponding listener, which processes the field sensing device event and produces target data.
The data processing rule of step 6 receives the data of a field sensing device, decodes them according to the device's acquisition protocol, and processes the decoded data according to user requirements.
An event in step 7 is a triple <id, attrs, time>, where id uniquely identifies the event; attrs are the attributes of the event: the data produced by different field sensing devices are wrapped into events with different attributes, attrs identifies the device, and the event stream processing engine selects the corresponding data processing rule according to attrs; time is the time at which the event occurred, i.e. the time the device's data were acquired, and it serves as a condition term for event stream processing.
By combining event stream processing with a plug-in development framework, the present invention provides a more efficient and more flexible real-time stream data processing method. Compared with the prior art, it has the following beneficial effects:
(1) The invention introduces a plug-in development framework into data processing, so that adaptation to the transmission protocols of the various field sensing devices is achieved by loading plug-ins, which is more flexible and highly extensible.
(2) The invention adopts an event stream processing mechanism: data are processed as an event stream and the data to be analyzed are abstracted as events; once the event stream processing engine receives the relevant events, it obtains the results from the event inputs and the previously registered processing model, which makes data processing more efficient.
By introducing event stream processing, a traditional data processing platform gains genuine stream processing characteristics: instead of focusing on storing data, it extracts the data the user cares about while the data flow through the event stream processing engine.
(3) Because data are converted into events for processing, statistical analysis of event information can be performed over predefined time windows or length windows, so abnormal collected data (for example, sudden spikes or sudden drops) can be detected quickly and responded to, enabling the business system to achieve self-control and self-feedback.
(4) A daemon process manages the program itself: when the stream data processing software stops working or its working state becomes abnormal, the daemon restarts it, ensuring that the software is "never shut down".
Detailed Description
Fig. 1 shows the implementation of the stream data processing method based on event stream processing and a plug-in development framework. Real-time data acquisition and processing are performed for field sensing devices Device 1, Device 2, ..., Device N. Data acquisition plug-ins are responsible for acquiring data from the different sensing devices: each plug-in connects to its devices according to their data transmission protocol and collects the data. The mapping between acquisition plug-ins and devices is obtained by reading a configuration file through a configuration information interface. The collected raw data are placed in the source data queue. Consumer thread A continuously takes data from the source data queue, wraps each packet into an event object of the corresponding type according to the data type, and sends it to the event stream processing engine. The engine matches each event type against the corresponding event processing rules, processes the events, and puts the resulting target data into the target queue. Consumer thread B takes the target data from the target queue and passes them to the export module, which outputs them in different data formats depending on the usage scenario.
Step 1: create the plug-in host program. The host is the environment in which plug-ins run; it provides the most basic services the plug-ins need and, in turn, uses the services the plug-ins provide.
Plug-in technology is itself a component technology. It is simpler than component technologies such as COM and easier to implement when building local application systems. During software design and development, the required functionality is partitioned so that the program consists of two major parts: the host program and the plug-ins. Basic functional requirements are designed into the host program; in addition, the host program provides an interface to the plug-ins so that each plug-in can exchange data with it according to agreed rules and thereby implement certain functions. Fig. 2 shows the architecture of a typical plug-in development framework. Each plug-in is a component implementing part of the functionality, and the software's functions are adjusted by adding or removing plug-ins or by modifying their internal functions. Because plug-ins are relatively independent, they can be edited independently, which allows the software's functionality to be extended and updated.
The main job of the host program is to manage plug-ins; its main functions are:
(1) Plug-in registration: a plug-in must be registered in the host program before use; only after successful registration can the host correctly locate the plug-in path.
(2) Plug-in unregistration: when the host program no longer needs a plug-in, the plug-in can be unregistered so that it is no longer part of the host program.
(3) Plug-in enabling: a registered plug-in can be enabled.
(4) Plug-in disabling: a running plug-in that is no longer needed can be disabled.
Fig. 3 is the flowchart of plug-in management by the host program. When the host program finds that the plug-in currently being loaded is new, it initializes the plug-in, registers it in the host framework, and then starts it. If the plug-in has been loaded before, the host first disables it, then updates it, and restarts it once the update is complete.
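The registration, enabling, and Fig. 3 load/update logic can be sketched in Java as a small registry. This is a minimal illustration under stated assumptions: the patent's plug-ins are C++ DLLs, whereas here Plugin, DemoPlugin, and PluginHost are hypothetical names, and a plug-in is identified only by its name and version.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plug-in contract (the patent's plug-ins are C++ DLLs).
interface Plugin {
    String name();
    int version();
    void init();
    void start();
    void stop();
}

// Hypothetical plug-in that only counts lifecycle calls, for illustration.
final class DemoPlugin implements Plugin {
    private final String name;
    private final int version;
    int starts, stops;

    DemoPlugin(String name, int version) { this.name = name; this.version = version; }
    public String name() { return name; }
    public int version() { return version; }
    public void init() { }
    public void start() { starts++; }
    public void stop() { stops++; }
}

// Minimal host: register, unregister, enable, disable, plus the Fig. 3 load/update flow.
final class PluginHost {
    private final Map<String, Plugin> registry = new HashMap<>();
    private final Map<String, Boolean> enabled = new HashMap<>();

    void register(Plugin p) {
        registry.put(p.name(), p);
        enabled.put(p.name(), false); // registered but not yet running
    }

    void unregister(String name) {
        disable(name);
        registry.remove(name);
        enabled.remove(name);
    }

    void enable(String name) {
        Plugin p = registry.get(name);
        if (p == null) throw new IllegalStateException("not registered: " + name);
        if (!enabled.get(name)) { p.start(); enabled.put(name, true); }
    }

    void disable(String name) {
        Plugin p = registry.get(name);
        if (p != null && enabled.get(name)) { p.stop(); enabled.put(name, false); }
    }

    // New plug-in: init, register, start. Known plug-in: disable, update, restart.
    void load(Plugin p) {
        if (!registry.containsKey(p.name())) {
            p.init();
            register(p);
        } else {
            disable(p.name());
            p.init();
            registry.put(p.name(), p); // the updated plug-in replaces the old one
        }
        enable(p.name());
    }

    boolean isEnabled(String name) { return enabled.getOrDefault(name, false); }

    Integer versionOf(String name) {
        Plugin p = registry.get(name);
        return p == null ? null : p.version();
    }
}
```

Keeping the enabled flag separate from the registry mirrors the text's distinction between registering a plug-in and enabling it.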
Step 2: define the interface specification between the plug-in host program and the plug-ins.
For the host program and the plug-ins to work together, a set of rules and conventions is needed so that programs from different sources can cooperate. The interface specification between the host program and the plug-ins is usually implemented in the form of abstract interfaces.
(1) Interface between the data acquisition plug-ins and the host program:
| Module role | Provides a unified data acquisition interface |
| Module characteristic | Abstract class, intended to be inherited |
| Class name (object name) | Communicater |
| Primary interface | virtual void run_collect(void) = 0; |
Through this interface, the acquisition plug-ins for the various sensors can be attached to the host program, which calls the appropriate plug-in according to the sensor type.
(2) Interface between the decoder plug-ins and the host program:
| Module role | Provides a unified decoding interface |
| Module characteristic | Abstract class, intended to be inherited |
| Class name (object name) | Decoder |
| Primary interface | virtual int decode_package(ACE_Message_Block *&mb) = 0; |
Through this interface, the raw sensor data that have been collected can be decoded, with the appropriate plug-in called according to the sensor type.
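The two contracts above can be mirrored as Java abstract classes, with the host dispatching by sensor type, as a hedged sketch for readers on the JVM side. Assumptions: byte[] stands in for ACE_Message_Block, decodePackage returns a String rather than the C++ int status, and the sensor-type keys, SensorPlugins, AsciiMeterDecoder, and StubCollector are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Java counterpart of the C++ abstract class Communicater.
abstract class Communicater {
    abstract void runCollect();
}

// Java counterpart of Decoder; byte[] replaces ACE_Message_Block (assumption).
abstract class Decoder {
    abstract String decodePackage(byte[] mb);
}

// Host-side table: one collector and one decoder per sensor type.
final class SensorPlugins {
    private final Map<String, Communicater> collectors = new HashMap<>();
    private final Map<String, Decoder> decoders = new HashMap<>();

    void install(String sensorType, Communicater c, Decoder d) {
        collectors.put(sensorType, c);
        decoders.put(sensorType, d);
    }

    // Starts collection on every installed plug-in.
    void startAll() {
        for (Communicater c : collectors.values()) c.runCollect();
    }

    // Calls the decoder that matches the sensor type of the raw packet.
    String decode(String sensorType, byte[] raw) {
        Decoder d = decoders.get(sensorType);
        if (d == null) throw new IllegalArgumentException("no decoder for " + sensorType);
        return d.decodePackage(raw);
    }
}

// Hypothetical decoder for a meter that sends its reading as ASCII text.
final class AsciiMeterDecoder extends Decoder {
    @Override
    String decodePackage(byte[] mb) {
        return new String(mb, StandardCharsets.US_ASCII).trim();
    }
}

// Hypothetical collector that only records that it was started.
final class StubCollector extends Communicater {
    boolean running;

    @Override
    void runCollect() { running = true; }
}
```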
Step 3: according to the interface specification of step 2, since different field sensing devices use different data acquisition protocols (e.g. protocol 1, protocol 2, ..., protocol N), data acquisition plug-ins (e.g. acquisition plug-in 1, acquisition plug-in 2, ..., acquisition plug-in N) are generated by implementing each acquisition protocol.
The different field sensing devices and their corresponding data acquisition protocols are illustrated below:
The acquisition plug-ins adapt the collection work to the different field sensing devices. Each is developed according to a specific data acquisition protocol and mounted in the host program through the interface between the data acquisition plug-ins and the host program.
Concretely, an acquisition plug-in obtains, during communication, the data sent by the field sensing devices. Plug-ins are loaded, managed, and controlled by the host program and can be inserted into it dynamically to extend the original program's functionality. Plug-ins are generally implemented in one of two ways: the Component Object Model (COM) or dynamic link libraries. The present invention implements the functional plug-ins as dynamic link libraries. A dynamic link library (DLL) is an executable software module for a given function; it cannot run on its own but only exports variables, functions, or classes, whose functions are called by other independently running programs (here, the host program).
Step 4: create the event stream processing engine based on the Esper framework. Esper is an event stream processing framework: when an event condition occurs in the event stream, the engine triggers a custom action.
Fig. 4 shows how the event stream processing engine is constructed; the steps are:
(1) The engine processes data in units of events, so the events must be predefined in it. In the present invention, an event is predefined for each kind of field sensing device, the event types are registered in the configuration environment, and the corresponding configuration object is generated;
(2) create an event stream processing engine instance with the configuration object as its parameter, serving as the container environment for event processing;
(3) create or import event processing rules. An event processing rule is the method used to process the data of a given kind of field sensing device, and different devices are processed differently. For example, when device data are collected on a fixed cycle, an event's processing rule is a timer task, such as computing the average of the electricity meter readings received in the last 5 minutes or the peak meter reading received in the last day. Once created, the rules are registered in the engine.
(4) Create listener objects and associate them with the event processing rules.
(5) The engine creates the execution environment object for event stream processing, which receives the event stream and matches it logically against the event processing conditions inside the engine.
(6) The execution environment object monitors all incoming events; when an event matches a preset condition, it triggers the corresponding listener, which processes the field sensing device event to obtain target data. The target data are flushed to the cache promptly and persisted to the database. Fig. 5 shows how the engine runs: it receives events, matches the event stream against the conditions, and, when an event satisfies a preset condition, dispatches it to the event processing rule for its event type.
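Construction steps (1) to (6) can be illustrated without Esper by a toy engine that registers event types, binds rules to listeners, and fires the listeners of matching rules. This is not Esper's API: MiniEngine, Event, addRule, and send are hypothetical names chosen to mirror the steps, and a rule's condition is modeled as a plain Java predicate.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Simplified event: a type name and one numeric reading (hypothetical shape).
final class Event {
    final String type;
    final double value;
    Event(String type, double value) { this.type = type; this.value = value; }
}

// Toy engine mirroring construction steps (1)-(6); not the Esper API.
final class MiniEngine {
    private static final class Rule {
        final String type;
        final Predicate<Event> condition;
        final Consumer<Event> listener;
        Rule(String type, Predicate<Event> condition, Consumer<Event> listener) {
            this.type = type; this.condition = condition; this.listener = listener;
        }
    }

    private final Set<String> eventTypes = new HashSet<>(); // step (1): registered event types
    private final List<Rule> rules = new ArrayList<>();     // steps (3)-(4): rules with listeners

    void registerEventType(String type) { eventTypes.add(type); }

    void addRule(String type, Predicate<Event> condition, Consumer<Event> listener) {
        if (!eventTypes.contains(type)) throw new IllegalArgumentException("unregistered event type: " + type);
        rules.add(new Rule(type, condition, listener));
    }

    // Steps (5)-(6): match an incoming event and fire every listener whose rule matches.
    int send(Event e) {
        int fired = 0;
        for (Rule r : rules) {
            if (r.type.equals(e.type) && r.condition.test(e)) {
                r.listener.accept(e);
                fired++;
            }
        }
        return fired;
    }
}
```

Creating the instance with `new MiniEngine()` plays the role of step (2); in Esper the rules would be EPL statements and the listeners would be attached to those statements.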
Step 5: define time windows or length windows for the event stream processing engine. A time window selects the data within a given time span or period for processing; a length window selects the data of the most recent N events for processing.
Fig. 6 is a schematic diagram of the processing windows of the event stream processing engine. With length windows and time windows defined, statistical analysis of event information can be performed over the predefined windows, so abnormal collected data (for example, sudden spikes or sudden drops) can be detected quickly and responded to, enabling the business system to achieve self-control and self-feedback.
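The two window types can be sketched with double-ended queues. The sketch assumes events arrive in timestamp order and uses millisecond timestamps; LengthWindow, TimeWindow, and the isSpike ratio test are hypothetical illustrations of the spike/drop detection described above, not a rule taken from the patent.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Length window: retains only the most recent `size` readings.
final class LengthWindow {
    private final int size;
    private final Deque<Double> values = new ArrayDeque<>();
    private double sum;

    LengthWindow(int size) { this.size = size; }

    void add(double v) {
        values.addLast(v);
        sum += v;
        if (values.size() > size) sum -= values.removeFirst(); // drop the oldest reading
    }

    double average() { return values.isEmpty() ? Double.NaN : sum / values.size(); }

    // Flags a reading that deviates from the window average by more than ratio * |average|.
    boolean isSpike(double v, double ratio) {
        double avg = average();
        return !Double.isNaN(avg) && Math.abs(v - avg) > ratio * Math.abs(avg);
    }
}

// Time window: retains readings whose timestamp lies within the last spanMillis.
final class TimeWindow {
    private static final class Sample {
        final long time;
        final double value;
        Sample(long time, double value) { this.time = time; this.value = value; }
    }

    private final long spanMillis;
    private final Deque<Sample> samples = new ArrayDeque<>();
    private double sum;

    TimeWindow(long spanMillis) { this.spanMillis = spanMillis; }

    // Assumes timestamps arrive in non-decreasing order.
    void add(long timeMillis, double value) {
        samples.addLast(new Sample(timeMillis, value));
        sum += value;
        while (!samples.isEmpty() && samples.peekFirst().time <= timeMillis - spanMillis) {
            sum -= samples.removeFirst().value; // expire readings older than the span
        }
    }

    int count() { return samples.size(); }

    double average() { return samples.isEmpty() ? Double.NaN : sum / samples.size(); }
}
```

In Esper 5 to 7 the same windows are declared in EPL rather than coded by hand, for example `win:length(3)` and `win:time(1 day)`.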
Step 6: for protocol 1, protocol 2, ..., protocol N, implement the data processing method of each protocol as data processing rule 1, data processing rule 2, ..., data processing rule N.
The main function of a data processing rule is to decode the received field sensor data and then perform the related processing on them.
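A data processing rule can be modeled as a decoding step followed by a user-defined processing step, looked up by protocol name. The sketch is hedged: the 4-byte big-endian frame in hundredths of a kWh is a made-up format for illustration, not DL/T 645 or MODBUS-RTU framing, and DataProcessingRule, HundredthsRule, and RuleTable are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// A data processing rule: protocol-specific decoding, then user-specific processing.
interface DataProcessingRule {
    double decode(byte[] raw);

    default double process(double decoded) { return decoded; } // identity unless overridden

    default double apply(byte[] raw) { return process(decode(raw)); }
}

// Hypothetical protocol: 4-byte big-endian integer in hundredths of a kWh.
final class HundredthsRule implements DataProcessingRule {
    @Override
    public double decode(byte[] raw) {
        return ByteBuffer.wrap(raw).getInt() / 100.0; // ByteBuffer defaults to big-endian
    }
}

// Step 6 as a table: data processing rule N registered under protocol N.
final class RuleTable {
    private final Map<String, DataProcessingRule> rules = new HashMap<>();

    void put(String protocol, DataProcessingRule rule) { rules.put(protocol, rule); }

    double handle(String protocol, byte[] raw) {
        DataProcessingRule rule = rules.get(protocol);
        if (rule == null) throw new IllegalArgumentException("no rule for protocol: " + protocol);
        return rule.apply(raw);
    }
}
```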
Step 7: define and register events. An event is defined for each class of field sensing device; for example, event 1, event 2, ..., event N are defined for sensing device 1, sensing device 2, ..., sensing device N, and the event types are registered in the event stream processing engine.
An event is a triple E = (id, attrs, time), where id identifies the event and every event has a unique identifier; attrs are the attributes of the event: the data produced by different field sensing devices are wrapped into events with different attributes, this attribute value identifies the device, and the engine uses it to select the corresponding processing rule; time is the time at which the event occurred, i.e. the data acquisition time, which can serve as a condition term for event stream processing.
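// Event object for a particular device
public class Device {
    private String DeviceName; // device name
    private int EventId;       // event id

    public Device(String deviceName, int eventId) {
        this.DeviceName = deviceName;
        this.EventId = eventId;
    }

    public String getDeviceName() {
        return DeviceName;
    }

    public int getEventId() {
        return EventId;
    }
}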
The code above is the simplest possible device event object: each device is assigned a distinct DeviceName, and EventId identifies the event, every event having a different id.
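The triple E = (id, attrs, time) and the selection of a processing rule by attribute can be sketched as follows. It assumes attrs is a string map and that a hypothetical "device" key carries the device type; TripleEvent and RuleSelector are illustrative names, and each rule here simply returns a String.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Event as the triple E = (id, attrs, time).
final class TripleEvent {
    final int id;                     // unique event id
    final Map<String, String> attrs;  // attributes identifying the source device
    final long time;                  // acquisition time, epoch millis

    TripleEvent(int id, Map<String, String> attrs, long time) {
        this.id = id;
        this.attrs = Collections.unmodifiableMap(new HashMap<>(attrs)); // defensive copy
        this.time = time;
    }
}

// Chooses the processing rule from the hypothetical "device" attribute.
final class RuleSelector {
    private final Map<String, Function<TripleEvent, String>> rules = new HashMap<>();

    void bind(String deviceType, Function<TripleEvent, String> rule) { rules.put(deviceType, rule); }

    String dispatch(TripleEvent e) {
        Function<TripleEvent, String> rule = rules.get(e.attrs.get("device"));
        return rule == null ? "unhandled" : rule.apply(e);
    }
}
```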
Step 8: associate each event registered in step 7 with the corresponding data processing rule implemented in step 6, so that the relevant custom action is triggered automatically when that event occurs.
Step 9: set the event processing conditions.
Step 10: create the source data queue PackageMessageQueue (holding the raw data sent by each field sensing device) and the target queue RecordMessageQueue (holding the target data after decoding and processing).
Step 11: create the consumer threads: the source event consumer thread ConsumePackage and the target data consumer thread ConsumeRecord.
(1) ConsumePackage continuously takes data from PackageMessageQueue, wraps each packet into an event object of the corresponding type according to the data type, and sends it to the event stream processing engine; when a predefined condition is met, the corresponding data processing rule is triggered, and the resulting data are placed in the target queue RecordMessageQueue of step 10.
(2) ConsumeRecord continuously takes data from RecordMessageQueue.
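Steps 10 and 11 form a two-stage producer/consumer pipeline. A minimal Java sketch with java.util.concurrent blocking queues follows; the event stream processing engine is replaced by a plain function, and the stop sentinels used for shutdown are an assumption added for the illustration (the patent's threads run continuously).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;

// Two-stage pipeline: PackageMessageQueue -> engine -> RecordMessageQueue -> export.
final class Pipeline {
    // Hypothetical shutdown sentinels, compared by reference.
    static final byte[] STOP_PACKAGE = new byte[0];
    static final String STOP_RECORD = new String("<stop>");

    final BlockingQueue<byte[]> packageMessageQueue = new LinkedBlockingQueue<>();
    final BlockingQueue<String> recordMessageQueue = new LinkedBlockingQueue<>();

    // ConsumePackage: take raw packets, run them through the engine, queue the results.
    Thread consumePackage(Function<byte[], String> engine) {
        return new Thread(() -> {
            try {
                while (true) {
                    byte[] packet = packageMessageQueue.take(); // blocks until data arrive
                    if (packet == STOP_PACKAGE) {
                        recordMessageQueue.put(STOP_RECORD); // propagate shutdown downstream
                        return;
                    }
                    recordMessageQueue.put(engine.apply(packet));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "ConsumePackage");
    }

    // ConsumeRecord: take target data and hand them to the export step.
    Thread consumeRecord(Consumer<String> sink) {
        return new Thread(() -> {
            try {
                while (true) {
                    String record = recordMessageQueue.take();
                    if (record == STOP_RECORD) return;
                    sink.accept(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "ConsumeRecord");
    }
}
```

Decoupling the two stages through queues lets slow decoding or slow export absorb bursts from the devices without blocking acquisition.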
Step 12: the target data obtained in step 11 can be exported to different destinations according to business needs, for example to a database, a file, or message-oriented middleware.
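The export step can be sketched as a list of destination sinks behind one interface. RecordSink, WriterSink, and Exporter are hypothetical names; a database sink (for example via JDBC) or a message-middleware sink would implement the same interface but is omitted here.

```java
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;

// A destination for target data (database, file, middleware, ...).
interface RecordSink {
    void write(String record) throws IOException;
}

// File-style destination: one record per line to any Writer, e.g. a FileWriter.
final class WriterSink implements RecordSink {
    private final Writer out;

    WriterSink(Writer out) { this.out = out; }

    @Override
    public void write(String record) throws IOException {
        out.write(record);
        out.write('\n');
    }
}

// Export module: fans every record out to all configured destinations.
final class Exporter {
    private final List<RecordSink> sinks = new ArrayList<>();

    void addSink(RecordSink sink) { sinks.add(sink); }

    void export(String record) throws IOException {
        for (RecordSink sink : sinks) sink.write(record);
    }
}
```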
Step 13: create a daemon process that monitors the heartbeat and working state of the stream data processing process; when the process's working state becomes abnormal or it stops working, the daemon actively kills the residual process and starts a new one.
The daemon has three main functions: (1) managing the stream data processing program, including starting and stopping it; (2) obtaining the program's current working state; (3) obtaining the program's heartbeat messages.
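The daemon's supervision logic can be sketched as a periodic check over state and heartbeat. ManagedProcess, Watchdog, and FakeProcess are hypothetical names, the clock is injected so the check is testable, and a real daemon would manage an operating-system process rather than an in-memory object.

```java
import java.util.function.LongSupplier;

// Hypothetical handle on the supervised stream data processing program.
interface ManagedProcess {
    void start();
    void stop();
    boolean isRunning();
    long lastHeartbeatMillis();
}

// Daemon check: restart when the program has stopped or its heartbeat is stale.
final class Watchdog {
    private final ManagedProcess process;
    private final LongSupplier clock;
    private final long heartbeatTimeoutMillis;
    private int restarts;

    Watchdog(ManagedProcess process, LongSupplier clock, long heartbeatTimeoutMillis) {
        this.process = process;
        this.clock = clock;
        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
    }

    // Meant to be called periodically; returns true if a restart was performed.
    boolean check() {
        boolean stale = clock.getAsLong() - process.lastHeartbeatMillis() > heartbeatTimeoutMillis;
        if (!process.isRunning() || stale) {
            process.stop();  // close the residual instance first
            process.start(); // then start a fresh one
            restarts++;
            return true;
        }
        return false;
    }

    int restarts() { return restarts; }
}

// In-memory stand-in used to exercise the watchdog (hypothetical).
final class FakeProcess implements ManagedProcess {
    boolean running;
    long heartbeat;
    private final LongSupplier clock;

    FakeProcess(LongSupplier clock) { this.clock = clock; }
    public void start() { running = true; heartbeat = clock.getAsLong(); }
    public void stop() { running = false; }
    public boolean isRunning() { return running; }
    public long lastHeartbeatMillis() { return heartbeat; }
}
```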
Embodiment
The following example uses the Windows operating system.
Step 1: start the host program CollectService;
Step 2: define the interface specification between the plug-in host program and the plug-ins, as follows:
(1) Interface between the data acquisition plug-ins and the host program:
| Module role | Provides a unified data acquisition interface |
| Module characteristic | Abstract class, intended to be inherited |
| Class name (object name) | Communicater |
| Primary interface | virtual void run_collect(void) = 0; |
(2) Interface between the decoder plug-ins and the host program:
| Module role | Provides a unified decoding interface |
| Module characteristic | Abstract class, intended to be inherited |
| Class name (object name) | Decoder |
| Primary interface | virtual int decode_package(ACE_Message_Block *&mb) = 0; |
Step 3: based on the information provided by the configuration loading module, the host program determines that the data acquisition plug-ins to load for this run are Eda9033.dll, Eda9033A.dll, Ex8-33.dll, and Ex8.dll. Eda9033.dll uses the MODBUS-RTU, DL/T 645-1997, and DL/T 645-2007 protocols; Eda9033A.dll uses DL/T 645-1997; Ex8-33.dll uses DL/T 645-1997 and DL/T 645-2007; Ex8.dll uses MODBUS-RTU. The run_collect() method of each plug-in is called in turn to start the collection service, and the collected raw sensor data are decoded by calling decode_package(ACE_Message_Block *&mb) of the decoder for the corresponding sensing device.
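The configuration-driven start-up of step 3 can be sketched with the plug-in names and protocols listed above. This is a hedged illustration: in the patent the plug-ins are DLLs loaded by the C++ host, whereas here a LinkedHashMap built in code stands in for the configuration loading module, and Collector, CollectorPlugin, and CollectService are hypothetical Java names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Collector contract mirroring Communicater::run_collect().
interface Collector {
    void runCollect();
}

// Stand-in for one acquisition plug-in and the protocols it implements.
final class CollectorPlugin implements Collector {
    final String name;
    final List<String> protocols;
    boolean collecting;

    CollectorPlugin(String name, String... protocols) {
        this.name = name;
        this.protocols = Arrays.asList(protocols);
    }

    @Override
    public void runCollect() { collecting = true; } // a real plug-in would open its device links here
}

final class CollectService {
    // Plug-in list as the configuration loading module might supply it (values from the embodiment).
    static Map<String, CollectorPlugin> loadFromConfig() {
        Map<String, CollectorPlugin> plugins = new LinkedHashMap<>(); // keeps configuration order
        plugins.put("Eda9033.dll", new CollectorPlugin("Eda9033.dll", "MODBUS-RTU", "DL/T 645-1997", "DL/T 645-2007"));
        plugins.put("Eda9033A.dll", new CollectorPlugin("Eda9033A.dll", "DL/T 645-1997"));
        plugins.put("Ex8-33.dll", new CollectorPlugin("Ex8-33.dll", "DL/T 645-1997", "DL/T 645-2007"));
        plugins.put("Ex8.dll", new CollectorPlugin("Ex8.dll", "MODBUS-RTU"));
        return plugins;
    }

    // Calls runCollect() on each plug-in in turn and reports the start order.
    static List<String> startAll(Map<String, CollectorPlugin> plugins) {
        List<String> started = new ArrayList<>();
        for (CollectorPlugin p : plugins.values()) {
            p.runCollect();
            started.add(p.name);
        }
        return started;
    }

    // Lists the plug-ins able to serve a given protocol.
    static List<String> pluginsFor(Map<String, CollectorPlugin> plugins, String protocol) {
        List<String> names = new ArrayList<>();
        for (CollectorPlugin p : plugins.values()) {
            if (p.protocols.contains(protocol)) names.add(p.name);
        }
        return names;
    }
}
```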
Step 4: create the event stream processing engine based on the Esper framework as the container environment for event stream processing.
Step 5: define time windows or length windows for the event stream processing engine. The windows are conditions of event stream processing: according to them, the engine reads the events in the event queue and processes them. Each event is a triple (meter id, attributes attrs of the meter and its collector, current time time). attrs contains the collector IP (ip), the port number (port), the meter's number within the collector (collectid), the decoder number (decodeid), the meter data (data), and so on. For example, to perform statistical analysis on a field sensing device's data events over the most recent day, the time window is set to one day.
Step 6: set event processing rules according to the event types (which correspond to the types of field sensing device); the rules are written in EPL (Event Processing Language). For example, to compute the daily average of the meter data of a field sensing device:
select avg(data) from SensorEvent.win:time(1 day)
This rule computes the average of the meter data (data) of a sensor's data events (SensorEvent) over the most recent day;
Step 7: define and register events: define one event for each class of field sensing device and register all events in the event stream processing engine;
Events exist as classes, and an event's attributes are the corresponding member variables of the class. Fig. 7 is the class diagram of a sensor event, in which Id is the event id, which makes the event unique (and is associated with the type of field sensing device); CollectTime is the raw data acquisition time; SourceIp is the IP address of the sensing device that produced the data; SourcePort is that device's port; CollectId is the ID of the corresponding acquisition plug-in; DecodeId is the ID of the corresponding decoding plug-in; and Data is the meter data actually acquired.
Step 8: associate each event registered in step 7 with the corresponding data processing rule of step 6, so that the relevant custom action is triggered automatically when that event occurs;
Step 9: set the event processing conditions;
An event processing condition is a constraint on the value of some member variable of the event class. For the event of step 7, for example, if the processing time window is one day, then the range of CollectTime is restricted to the most recent day when the processing rule is written; to process only the data collected from a particular IP address, SourceIp is set to that address.
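The Fig. 7 event class and the two example conditions can be sketched as follows. Field types are assumptions (the figure gives only names), SensorEvent and Conditions are illustrative names, and the conditions are shown as Java predicates; in EPL such conditions would typically appear in the rule's where clause or window definition.

```java
import java.util.function.Predicate;

// Sensor event with the Fig. 7 attributes; field types are assumptions.
final class SensorEvent {
    private final int id;           // event id, tied to the device type
    private final long collectTime; // raw data acquisition time (epoch millis)
    private final String sourceIp;  // IP address of the source device
    private final int sourcePort;   // port of the source device
    private final int collectId;    // id of the acquisition plug-in
    private final int decodeId;     // id of the decoding plug-in
    private final double data;      // measured meter value

    SensorEvent(int id, long collectTime, String sourceIp, int sourcePort,
                int collectId, int decodeId, double data) {
        this.id = id; this.collectTime = collectTime; this.sourceIp = sourceIp;
        this.sourcePort = sourcePort; this.collectId = collectId;
        this.decodeId = decodeId; this.data = data;
    }

    // JavaBean-style getters, the form in which Esper reads POJO event properties.
    public int getId() { return id; }
    public long getCollectTime() { return collectTime; }
    public String getSourceIp() { return sourceIp; }
    public int getSourcePort() { return sourcePort; }
    public int getCollectId() { return collectId; }
    public int getDecodeId() { return decodeId; }
    public double getData() { return data; }
}

// Processing conditions expressed as predicates over member variables.
final class Conditions {
    static final long ONE_DAY_MILLIS = 24L * 60 * 60 * 1000;

    // CollectTime restricted to the most recent day relative to `now`.
    static Predicate<SensorEvent> withinLastDay(long now) {
        return e -> e.getCollectTime() > now - ONE_DAY_MILLIS && e.getCollectTime() <= now;
    }

    // Only events collected from the given source IP.
    static Predicate<SensorEvent> fromIp(String ip) {
        return e -> ip.equals(e.getSourceIp());
    }
}
```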
Step 10: create the source data queue PackageMessageQueue, to hold the raw data sent by the various field sensing devices, and the target queue RecordMessageQueue, to hold the target data after decoding and processing;
Step 11: create the source data consumer thread ConsumePackage and the target data consumer thread ConsumeRecord:
(11a) ConsumePackage continuously takes data from PackageMessageQueue, wraps each packet into an event object of the corresponding type according to the data type, and sends it to the event stream processing engine.
| Module role | Provides a unified event wrapping interface |
| Module characteristic | Abstract class, intended to be inherited |
| Class name (object name) | EventWrap |
| Primary interface | virtual int Event_Wrap(ACE_Message_Block *&mb) = 0; |
Through the interface Event_Wrap(ACE_Message_Block *&mb), ConsumePackage calls the constructor of the event class to create an event instance from the collected raw sensor data, thereby wrapping the raw data into a concrete event.
When the event processing condition set in step 9 is met, the corresponding data processing rule is triggered, and the resulting data are placed in the target queue RecordMessageQueue of step 10;
(11b) ConsumeRecord continuously takes target data from RecordMessageQueue;
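The Event_Wrap step in (11a) can be mirrored in Java as an abstract wrapper whose implementation parses a raw packet and calls the event class constructor. Assumptions: byte[] stands in for ACE_Message_Block, the ASCII "ip|port|collectId|decodeId|data" packet layout is hypothetical, and MeterEvent, EventWrap, and MeterEventWrap are illustrative Java names.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.LongSupplier;

// Event produced by wrapping one raw packet (fields follow the embodiment's attrs).
final class MeterEvent {
    final String ip;
    final int port;
    final int collectId;
    final int decodeId;
    final double data;
    final long time;

    MeterEvent(String ip, int port, int collectId, int decodeId, double data, long time) {
        this.ip = ip; this.port = port; this.collectId = collectId;
        this.decodeId = decodeId; this.data = data; this.time = time;
    }
}

// Java counterpart of the EventWrap abstract class; byte[] replaces ACE_Message_Block.
abstract class EventWrap<E> {
    abstract E eventWrap(byte[] mb);
}

// Hypothetical wire format: ASCII "ip|port|collectId|decodeId|data".
final class MeterEventWrap extends EventWrap<MeterEvent> {
    private final LongSupplier clock;

    MeterEventWrap(LongSupplier clock) { this.clock = clock; }

    @Override
    MeterEvent eventWrap(byte[] mb) {
        String[] f = new String(mb, StandardCharsets.US_ASCII).split("\\|");
        if (f.length != 5) throw new IllegalArgumentException("malformed packet");
        // Calls the event class constructor, as ConsumePackage does via Event_Wrap.
        return new MeterEvent(f[0], Integer.parseInt(f[1]), Integer.parseInt(f[2]),
                Integer.parseInt(f[3]), Double.parseDouble(f[4]), clock.getAsLong());
    }
}
```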
Step 12: export the target data obtained in step 11 to the destination;
Step 13: create a daemon process that monitors the working state of the stream data processing program; when the program's working state becomes abnormal or it stops working, the daemon actively shuts it down and restarts it.
The invention provides a stream data processing method based on event stream processing and a plug-in development framework. There are many methods and ways to implement this technical solution, and the above is only a preferred embodiment of the invention. It should be noted that those skilled in the art can make a number of improvements and modifications without departing from the principle of the invention, and such improvements and modifications shall also be regarded as falling within the scope of protection of the invention. Any component not specified in this embodiment can be implemented with existing technology.