CROSS-REFERENCE TO RELATED APPLICATION
This application is related to U.S. patent application Ser. No. 11/195,664 filed Aug. 3, 2005, entitled “Query Processing Method for Stream Data Processing System”, the contents of which are incorporated herein by reference into this application.
CLAIM OF PRIORITY
The present application claims priority from Japanese application JP 2005-211483 filed on Jul. 21, 2005, the contents of which are hereby incorporated by reference into this application.
BACKGROUND OF THE INVENTION
The present invention relates to a stream data processing system which executes a query registered beforehand with respect to input stream data and outputs an execution result.
As related art, Motwani, Rajeev; Widom, Jennifer; Arasu, Arvind; Babcock, Brian; Babu, Shivnath; Datar, Mayur; Manku, Gurmeet; Olston, Chris; Rosenstein, Justin; Varma, Rohit, “Query Processing, Resource Management, and Approximation in a Data Stream Management System”, CIDR 2003, 1st of Aug., 2002, [searched on 22nd of Jun., 2005] internet URL:
http://dbpubs.stanford.edu:8090/aux/index-en.html (hereinafter referred to as Non-patent Document 1) discloses a stream data processing system which executes a query registered beforehand with respect to input stream data and outputs an execution result.
For example, assume that a customer searches for a product. The customer generates a query including information on the product and desired conditions for purchasing it, and registers the generated query in the stream data processing system by using a terminal. Meanwhile, each product supplier generates stream data including information on a product to offer and desired conditions for offering it, and inputs the generated stream data into the stream data processing system by using a terminal. The stream data processing system continuously executes the query registered beforehand and determines whether the information on the product and the desired conditions included in the input stream data match those included in the query. If they match, the stream data processing system transmits the stream data concerned to the terminal of the customer.
As described above, the stream data processing system is suitable for real-time applications, such as Internet auctions or tracking stock price trends, in which sequentially input stream data must be processed in real time.
SUMMARY OF THE INVENTION
However, in order to execute the query with respect to each piece of sequentially input stream data in real time, it is preferable to use a high-performance information processing device, or a plurality of information processing devices among which the load is distributed. In the case of distributing the load over a plurality of information processing devices, a large amount of communication traffic may occur because the stream data is transferred among those devices.
Accordingly, it is an object of the present invention to provide a technique which restrains the occurrence of communication traffic when a plurality of information processing devices are used to distribute the load in a stream data processing system.
In order to solve the above-described problems, the stream data processing system according to the invention divides a global query, which is a query registered beforehand, into a plurality of processes. Here, CQL (Continuous Query Language), shown on the seventh line of the second clause of the above-described Non-patent Document 1, can be mentioned as a preferable descriptive method of the query. A feature of queries in stream data processing is that a window can be assigned so as to extract a data string to be processed from infinitely continuing stream data. Representative methods of assigning the window are (1) assigning the number of data strings to be extracted and (2) assigning a time interval of data strings to be extracted. For example, “Rows 50 Preceding” shown in the second clause of Non-patent Document 1 is a preferable example of extracting 50 lines of data as a process target (method (1)), and “Range 15 Minutes Preceding” is a preferable example of extracting 15 minutes of data as a process target (method (2)). Hereinafter, a window specified according to method (1) is called a tuple-based window, and a window specified according to method (2) is called a time-based window.
Reference number 1094 shown in FIG. 15 indicates a tuple-based window whose size is three tuples. This window holds the last three arrived tuples of stream data as process targets. When new process target data 68 arrives, the oldest process target data 69 is ejected from the tuple-based window. On the other hand, reference number 1601 shown in FIG. 16 indicates a time-based window whose size is three units of time. This window holds the stream data that arrived during the last three units of time as process targets. Process target data is ejected from the time-based window once three units of time have elapsed since its arrival.
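The two window types can be sketched as follows. This is a minimal illustration, not the disclosed implementation: the class names `TupleWindow` and `TimeWindow` are hypothetical, time is modeled as integers as in FIGS. 15 and 16, and a time-based window is assumed to eject data once `span` units have elapsed since arrival.

```python
from collections import deque


class TupleWindow:
    """Tuple-based window: holds the last `size` tuples (e.g. window 1094, size 3)."""

    def __init__(self, size):
        self.items = deque(maxlen=size)  # a full deque drops its leftmost item on append

    def insert(self, data):
        # Remember the oldest tuple if the window is full; the append ejects it.
        ejected = self.items[0] if len(self.items) == self.items.maxlen else None
        self.items.append(data)
        return ejected


class TimeWindow:
    """Time-based window: holds data that arrived within the last `span` time units."""

    def __init__(self, span):
        self.span = span
        self.items = deque()  # (arrival_time, data) pairs in arrival order

    def insert(self, now, data):
        self.items.append((now, data))
        ejected = []
        # Eject data once `span` units of time have elapsed since its arrival.
        while self.items and self.items[0][0] + self.span <= now:
            ejected.append(self.items.popleft()[1])
        return ejected


w = TupleWindow(3)
for d in (1, 2, 3):
    w.insert(d)
print(w.insert(4), list(w.items))  # 1 [2, 3, 4]

t = TimeWindow(3)
for now in (1, 2, 3):
    t.insert(now, f"d{now}")
print(t.insert(4, "d4"))  # ['d1']
```

Using `deque(maxlen=...)` makes the tuple-based eviction automatic, while the time-based window needs an explicit scan from the oldest entry.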
Next, the stream data processing system generates local queries, each including at least one of the processes divided from the global query, and sets the local queries in mutually different information processing devices. In addition, the stream data processing system transfers stream data sequentially through the plurality of information processing devices so that each information processing device executes its local query with respect to the stream data. At this time, not all of the stream data is transferred through the information processing devices. At least a part of the stream data (for example, data which is first used for a local query process in an information processing device located later in the processing order) is stored in a storage device. When the stream data received from the information processing device located immediately before a given information processing device does not include the target data of its local query, that information processing device reads the target data from the storage device and executes the local query.
For example, the stream data processing system according to the present invention is a stream data processing system which executes a global query registered beforehand with respect to stream data and outputs an execution result thereof, the system including: first to n-th query processing devices, the i-th of which (where 1 ≤ i ≤ n) executes an i-th local query composing the global query; and a proxy device which stores storage data including at least a part of the stream data. The i-th query processing device includes: a data receiving unit which receives the stream data in the case of i = 1 and receives transfer data from the (i−1)-th query processing device in the case of i ≠ 1; a data registration unit which stores, in the proxy device, storage data including at least a part of the received stream data in the case of 1 ≤ i < n; a data obtaining unit which, in the case of 1 < i ≤ n, accesses the proxy device to obtain target data if the storage data includes the target data necessary for processing the i-th local query registered beforehand; a local query processing unit which executes the i-th local query registered beforehand with respect to the stream data received by the data receiving unit in the case of i = 1, and with respect to the transfer data received by the data receiving unit and/or the target data obtained by the data obtaining unit in the case of i ≠ 1; and a data transmission unit which transmits a processing result of the local query processing unit, as transfer data, to the (i+1)-th query processing device in the case of i ≠ n, and transmits the processing result of the local query processing unit to a predetermined device in the case of i = n.
The proxy device includes: a storage data storage unit which receives the storage data from the query processing device and stores the received storage data; and a data reading unit which reads target data requested from the query processing device from the storage data stored in the storage data storage unit and transmits the read target data to the query processing device which is a request source.
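The two units of the proxy device can be modeled as a store keyed by stream ID, as in the sketch below. The class `ProxyStore`, its methods, and the segment names are hypothetical; the sketch only illustrates that storage data is registered once and later devices fetch just the segments they need.

```python
class ProxyStore:
    """Hypothetical model of the storage data storage unit and the data reading unit."""

    def __init__(self):
        self._storage = {}  # stream ID -> {segment name: segment value}

    def register(self, stream_id, segments):
        # Storage data registration request: keep the segments under the stream ID.
        self._storage.setdefault(stream_id, {}).update(segments)

    def read(self, stream_id, segment_names):
        # Target data request: return only the requested segments of that stream.
        stored = self._storage.get(stream_id, {})
        return {name: stored[name] for name in segment_names if name in stored}


proxy = ProxyStore()
proxy.register("s1", {"delivery": "Tokyo", "settlement": "card"})
print(proxy.read("s1", ["delivery"]))  # {'delivery': 'Tokyo'}
```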
According to the present invention, for example, a part of the input stream data (hereinafter, partial stream data) is stored in a storage device. The partial stream data is first used for a local query process in an information processing device located later in the processing order. Therefore, the partial stream data need not be transferred through the plurality of information processing devices. Accordingly, the occurrence of communication traffic can be restrained when the load is distributed over the plurality of information processing devices.
BRIEF DESCRIPTION OF THE DRAWINGS
FIG. 1 is a schematic view illustrating a stream data processing system according to an embodiment of the present invention;
FIGS. 2A and 2B are pattern diagrams illustrating a data format of stream data 61;
FIG. 3 is a schematic view illustrating query processing devices 11 to 1n;
FIG. 4 is a schematic view illustrating a local query execution unit 109;
FIG. 5 is a view illustrating an example of hardware configuration of the query processing devices 11 to 1n;
FIG. 6 is a flow chart illustrating a setup operation performed by the query processing devices 11 to 1n before the local query is executed;
FIG. 7 is a flow chart illustrating a query processing operation of the first query processing device 11;
FIG. 8 is a flow chart illustrating a query processing operation of the second to n-th query processing devices 12 to 1n;
FIG. 9 is a schematic view illustrating a configuration of a proxy device 2;
FIG. 10 is a flow chart illustrating a setup operation performed by the proxy device 2 before a global query is executed;
FIG. 11 is a flow chart explaining a global query optimization processing (S2100) shown in FIG. 10;
FIG. 12 is a flow chart illustrating a communication route retrieval processing (S2200) shown in FIG. 10;
FIG. 13 is a flow chart illustrating a query processing operation of the proxy device 2;
FIG. 14 is a view illustrating an operation of the stream data processing system shown in FIG. 1;
FIG. 15 is a view illustrating a tuple-based window; and
FIG. 16 is a view illustrating a time-based window.
DESCRIPTION OF THE PREFERRED EMBODIMENTS
Hereinafter, an embodiment according to the present invention will be described.
FIG. 1 is a schematic view of a stream data processing system to which the present embodiment of the invention is applied.
As shown in FIG. 1, a stream data processing system 1001 according to the present embodiment includes n query processing devices 11 to 1n (n > 1), a proxy device 2, and a network 5, such as a LAN or WAN, which connects the query processing devices 11 to 1n with the proxy device 2. In the embodiment shown in FIG. 1, the stream data processing system 1001 is connected to a stream data generating device 4 and a user terminal 3 through the network 5.
The user terminal 3 transmits a query 66 to the proxy device 2 and receives a result corresponding to the query 66 from the proxy device 2. The query 66 may be, for example, an inquiry for information on product suppliers matching a desired condition (product name, desired purchase price, place of delivery, settlement method, and so on) or an inquiry about price trends of predetermined stocks. Hereinafter, the query 66 that the user terminal 3 transmits to the proxy device 2 will be called a global query 66. Since a conventional network terminal, for example one running a web browser, can be used as the user terminal 3, a detailed explanation thereof will be omitted.
The stream data generating device 4 generates stream data 61, which is a process target of the global query 66, and transmits the generated stream data 61 to the first query processing device 11. The stream data 61 may be information such as the desired conditions of a product supplier (product name, desired offering price, place of delivery, settlement method, and so on) or stock quotations (brand and price). In the case that the stream data 61 is information on the desired conditions of a product supplier, the stream data generating device 4 may be, for example, a network terminal of the product supplier. In the case that the stream data 61 is stock quotations, the stream data generating device 4 may be, for example, a network terminal of a manager of a stock market.
The stream data 61 includes a plurality of segments. One of the segments includes a stream ID, which is identification information of the stream data 61. In each of the other segments, information serving as a process unit for the global query 66 is registered. The data format of the stream data 61 may be a record format or an XML document format.
FIGS. 2A and 2B are pattern diagrams illustrating the data format of the stream data 61.
FIG. 2A exemplifies the stream data 61 in a record format. In this case, a column 611 of the record corresponds to a segment. FIG. 2B exemplifies the stream data 61 in an XML document format. In this case, a region 612 enclosed by the tags of the XML document corresponds to a segment.
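Extracting named segments from either data format can be sketched as below. This assumes, for illustration only, that a record is a Python dict keyed by column name and that each XML segment is a direct child element of the document root; the function names and the `brand`/`price` segments are hypothetical.

```python
import xml.etree.ElementTree as ET


def segments_from_record(record, columns):
    """Record format (FIG. 2A): each column 611 is one segment."""
    return {c: record[c] for c in columns}


def segments_from_xml(xml_text, tags):
    """XML format (FIG. 2B): each region 612 enclosed by a tag is one segment."""
    root = ET.fromstring(xml_text)
    # findtext returns the text of the first matching child, or None if absent.
    return {t: root.findtext(t) for t in tags}


record = {"stream_id": "s1", "brand": "ABC", "price": 120}
xml_text = "<stream><stream_id>s1</stream_id><brand>ABC</brand><price>120</price></stream>"
print(segments_from_record(record, ["brand"]))               # {'brand': 'ABC'}
print(segments_from_xml(xml_text, ["stream_id", "price"]))   # {'stream_id': 's1', 'price': '120'}
```

Note that XML segment values come back as strings, so a real system would need a schema to restore types.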
A processing order is allocated to the query processing devices 11 to 1n. In addition, n local queries 671 to 67n, obtained by dividing the global query 66 into n processes, are respectively allocated to the query processing devices 11 to 1n according to the execution order. Specifically, the first query processing device 11 executes the local query 671 with respect to the stream data 61. The i-th query processing device 1i (where 1 ≤ i ≤ n−1) transmits its query result as transfer data 62i to the (i+1)-th query processing device 1i+1. The (i+1)-th query processing device 1i+1 executes the (i+1)-th local query 67i+1 with respect to the transfer data 62i. The n-th query processing device 1n transmits its query result as a processing result 65 to the proxy device 2.
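The chain of local queries can be pictured as function composition over each incoming tuple. The sketch below runs all stages in one process for illustration; in the system each stage runs on its own device. The assumption that a local query returns `None` to filter a tuple out is made here for illustration and is not stated in the text.

```python
from typing import Callable, Iterable, List, Optional

# Hypothetical model: a local query maps one tuple to a result, or None to filter it out.
LocalQuery = Callable[[dict], Optional[dict]]


def run_chain(stream: Iterable[dict], local_queries: List[LocalQuery]) -> List[dict]:
    results = []
    for data in stream:
        transfer: Optional[dict] = data
        # Device 1i executes local query 67i and hands its result (62i) to device 1i+1.
        for query in local_queries:
            transfer = query(transfer)
            if transfer is None:
                break
        else:
            # Output of the n-th device: processing result 65 sent to the proxy device.
            results.append(transfer)
    return results


queries = [
    lambda d: d if d["price"] < 100 else None,  # local query 671: a filter
    lambda d: {**d, "matched": True},           # local query 672: adds a field
]
print(run_chain([{"id": "s1", "price": 90}, {"id": "s2", "price": 150}], queries))
```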
FIG. 3 is a schematic view of the query processing devices 11 to 1n.
As shown in FIG. 3, each of the query processing devices 11 to 1n includes a proxy device access unit 101 which communicates with the proxy device 2 through the network 5, a query processing device access unit 102 which communicates with another query processing device 11 to 1n through the network 5, a data receiving unit 103, a data transmission unit 104, a data registration/stamp assignment unit 105, a stream data management unit 106, a data obtaining unit 107, a local query optimization unit 108, and a local query execution unit 109.
The data receiving unit 103 receives the transfer data 62 from the query processing device 11 to 1n−1 located immediately before its own query processing device 12 to 1n in the processing order, through the query processing device access unit 102. However, the data receiving unit 103 of the first query processing device 11 receives the stream data 61 from the stream data generating device 4 through the query processing device access unit 102.
The data transmission unit 104 transmits a processing result of the local query execution unit 109, described later, as transfer data 62 to the query processing device 12 to 1n located immediately after its own query processing device 11 to 1n−1 in the processing order, through the query processing device access unit 102. However, the data transmission unit 104 of the n-th query processing device 1n transmits the processing result of the local query execution unit 109, as the processing result 65 of the global query, to the proxy device 2 through the query processing device access unit 102.
In the case of the first query processing device 11, the data registration/stamp assignment unit 105 performs the following processes. When the data receiving unit 103 receives the stream data 61 from the stream data generating device 4 through the query processing device access unit 102, the data registration/stamp assignment unit 105 extracts the segment predetermined by the proxy device 2 from the received stream data 61. The data registration/stamp assignment unit 105 then generates a storage data registration request including the extracted segment and the stream ID of the received stream data 61, and transmits the generated request to the proxy device 2 through the proxy device access unit 101. As a response to the storage data registration request, the data registration/stamp assignment unit 105 receives from the proxy device 2, through the proxy device access unit 101, a system time stamp which indicates the processing order and an expire time stamp which indicates the lifetime in the stream data processing system. Thereafter, the data registration/stamp assignment unit 105 assigns the received system time stamp and expire time stamp to the stream data 61 from which the predetermined segment has been extracted.
The method by which the proxy device 2 sets the system time stamp and the expire time stamp will be described with reference to FIGS. 15 and 16. To simplify the explanation, the time stamps in this embodiment are shown as sequential numbers whose value increases by one as time progresses, such as 001, 002, and so on; another time notation, such as the time structure of the UNIX (registered trademark) operating system, may also be used.
A system time stamp 681 indicates the time at which the stream data arrives in the stream data processing system. Since the stream data is processed by a window operator when it arrives in the stream data processing system, the system time stamp 681 can be considered the same as the process starting time in the window operator. FIG. 15 shows that process target data 1 and 2 among the process target stream data 69 of the tuple-based window 1094 arrived in the stream data processing system at times 001 and 002, respectively.
On the other hand, the expire time stamp 682 indicates the lifetime of the stream data in the stream data processing system. There are two ways of representing the expire time stamp 682, depending on how the window of the process target query is assigned. FIGS. 15 and 16 show examples of a window assigned as a tuple-based window (tuple-based window 1094) and as a time-based window (time-based window 1601), respectively.
In the case of the tuple-based window 1094 shown in FIG. 15, the value of the expire time stamp 682 cannot be determined when the stream data arrives. In the tuple-based window 1094, the lifetime of a piece of stream data is determined only when enough later tuples arrive to exceed the number of tuples the tuple-based window 1094 can hold, so that the data is ejected from the window. Accordingly, when the stream data arrives in the stream data processing system, the expire time stamp 682 of that stream data is set to ‘NULL’, as with process target data 2 and 3 (69) and process target data 4 (68) shown in FIG. 15. When process target data 4 (68) arrives, process target data 1 (69) is ejected from the tuple-based window 1094 because the number of data (tuples) the tuple-based window 1094 can hold is three. Since the system time stamp 681 of process target data 4 (68) is ‘004’, the expire time stamp 682 of process target data 1 (69) becomes ‘004’.
In the case of the time-based window 1601 shown in FIG. 16, the value of the expire time stamp 682 is determined when the process target data arrives in the time-based window 1601. For example, in the embodiment shown in FIG. 16, since the time-based window 1601 holds data corresponding to three units of time, the values of the expire time stamp 682 for process target data 1 to 3 (69), which arrived at times ‘001’, ‘002’, and ‘003’, become ‘004’, ‘005’, and ‘006’, respectively.
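The two expire-time-stamp rules can be checked against the FIG. 15 and FIG. 16 examples with a short sketch. It computes the stamps after the fact from a complete list of arrivals, whereas the proxy device 2 would fill them in incrementally; the function names are hypothetical and ‘NULL’ is modeled as `None`.

```python
from typing import List, Optional


def tuple_window_expire(arrivals: List[int], size: int) -> List[Optional[int]]:
    """Expire stamps for a tuple-based window holding `size` tuples.

    Tuple k is ejected when tuple k+size arrives, so its expire stamp is that
    tuple's system time stamp; until then it stays NULL (None here).
    """
    return [arrivals[k + size] if k + size < len(arrivals) else None
            for k in range(len(arrivals))]


def time_window_expire(arrivals: List[int], span: int) -> List[int]:
    """Expire stamps for a time-based window: known on arrival as arrival + span."""
    return [t + span for t in arrivals]


print(tuple_window_expire([1, 2, 3, 4], 3))  # [4, None, None, None]
print(time_window_expire([1, 2, 3], 3))      # [4, 5, 6]
```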
In the case of the first query processing device 11, the stream data management unit 106 performs the following processes. The stream data management unit 106 buffers, as process target data, the stream data 61 from which the predetermined segment has been extracted and to which the system time stamp and the expire time stamp have been assigned by the data registration/stamp assignment unit 105. Thereafter, the stream data management unit 106 outputs the process target data to the local query execution unit 109, described later, in the order indicated by the system time stamp. In the case of the second to n-th query processing devices 12 to 1n, the stream data management unit 106 outputs the transfer data 621 to 62n−1 received by the data receiving unit 103, as process target data, to the local query execution unit 109 in the order indicated by the system time stamp.
In addition, the stream data management unit 106 deletes process target data whose lifetime, as indicated by the expire time stamp, has expired from a temporary storage serving as a process buffer prepared in the local query execution unit 109 of each of the query processing devices 11 to 1n. The method of deleting the process target data will be described later.
In the case of the i-th query processing device 1i (where 2 ≤ i ≤ n), the data obtaining unit 107 performs the following processes. When new process target data arrives in the stream data management unit 106, the data obtaining unit 107 generates a target data request including the stream ID of that process target data and the segment assignment predetermined by the proxy device 2. The data obtaining unit 107 then transmits the generated target data request to the proxy device 2 through the proxy device access unit 101 and obtains the target data from the proxy device 2. The data obtaining unit 107 attaches the obtained target data to the corresponding process target data in the stream data management unit 106.
When the local query optimization unit 108 receives a local query candidate from the proxy device 2 through the proxy device access unit 101, it calculates the query processing cost of executing that local query candidate in the local query execution unit 109, described later, and transmits the calculated query processing cost to the proxy device 2. The query processing cost can be calculated, for example, by adding the CPU processing cost of the local query candidate (the number of CPU cycles required to execute it) to its I/O processing cost (the cost of the I/O processes issued to execute it). A method of calculating the processing cost for stream data is described in Stratis Viglas and Jeffrey F. Naughton, ‘Rate-based query optimization for streaming information sources’, SIGMOD Conference 2002, pp. 37-48.
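The example cost formula (CPU processing cost plus I/O processing cost) can be sketched as below. The cost numbers are placeholders rather than measured values, both costs are assumed to be expressed in a common unit, and `cheapest` reflects an assumption that the proxy device 2 prefers the lowest total; the actual selection is described with FIG. 11 and is not shown here.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class CandidateCost:
    candidate_id: str
    cpu_cost: float  # e.g. CPU cycles to run the candidate, in a common cost unit
    io_cost: float   # cost of the I/O issued to run the candidate, in the same unit

    @property
    def total(self) -> float:
        # Query processing cost = CPU processing cost + I/O processing cost.
        return self.cpu_cost + self.io_cost


def cheapest(costs: List[CandidateCost]) -> CandidateCost:
    # Assumption: the candidate with the lowest total cost is preferred.
    return min(costs, key=lambda c: c.total)


costs = [CandidateCost("q1", 50.0, 30.0), CandidateCost("q2", 40.0, 20.0)]
print([c.total for c in costs], cheapest(costs).candidate_id)  # [80.0, 60.0] q2
```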
The local query execution unit 109 executes the local query set by the proxy device 2 with respect to the process target data sequentially input from the stream data management unit 106, assigns the stream ID of the process target data to the corresponding execution result, and outputs the execution result to the data transmission unit 104 as transfer data 62i.
FIG. 4 is a schematic view of the local query execution unit 109. As shown in FIG. 4, the local query execution unit 109 includes a local query storage unit 1091, a query operating condition storage unit 1092, and a query processing engine 1093.
The local query storage unit 1091 stores local queries, each having a query ID. The query operating condition storage unit 1092 stores, for each local query stored in the local query storage unit 1091, a record including the operating condition of that query. The record includes a query ID registration field 10921 for registering a query ID and an operating condition registration field 10922 for registering an operating condition. Preferable examples of the operating condition include specifications of an operating time, a stream ID of process target data, a number of processes, and a processed data amount.
The query processing engine 1093 searches the records stored in the query operating condition storage unit 1092 for a record whose operating condition, registered in the field 10922, is satisfied. The query processing engine 1093 then reads the local query having the query ID registered in the field 10921 of that record from the local query storage unit 1091, and continuously executes the read local query with respect to the process target data 1095 arriving at the query processing engine 1093.
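The lookup performed by the query processing engine 1093 can be sketched by modeling each operating condition as a predicate over a context of current facts, such as time and stream ID. The record class, context keys, and query strings below are hypothetical placeholders.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class OperatingConditionRecord:
    query_id: str                        # query ID registration field 10921
    condition: Callable[[Dict], bool]    # operating condition field 10922, as a predicate


def active_queries(records: List[OperatingConditionRecord],
                   local_query_storage: Dict[str, str],
                   context: Dict) -> List[str]:
    # Select records whose operating condition holds, then look up their local queries.
    return [local_query_storage[r.query_id] for r in records if r.condition(context)]


storage = {"q1": "local query A", "q2": "local query B"}
records = [
    OperatingConditionRecord("q1", lambda ctx: ctx["stream_id"] == "s1"),
    OperatingConditionRecord("q2", lambda ctx: ctx["time"] >= 100),
]
print(active_queries(records, storage, {"stream_id": "s1", "time": 10}))  # ['local query A']
```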
The query processing devices 11 to 1n can be embodied by a general computer system, as shown in FIG. 5, which includes a CPU 901, a memory 902, an external storage device 903 such as an HDD, a reading device 905 which reads information from a portable recording medium 904 such as a CD-ROM or DVD-ROM, an input device 906 such as a keyboard or mouse, an output device 907 such as a display, a communication device 908 for accessing a communication network, and a bus 909 which connects these devices to each other, with the CPU 901 executing a predetermined program loaded in the memory 902. The predetermined program may be downloaded to the external storage device 903 from the recording medium 904 through the reading device 905, or from the network through the communication device 908, then loaded in the memory 902 and executed by the CPU 901. Alternatively, the predetermined program may be loaded directly in the memory 902 from the recording medium 904 through the reading device 905, or from the network through the communication device 908, and executed by the CPU 901.
Next, the operation of the query processing devices 11 to 1n will be described.
FIG. 6 is a flow chart illustrating the setup operation performed by the query processing devices 11 to 1n before the local query is executed.
When the local query optimization unit 108 receives at least one local query candidate from the proxy device 2 through the proxy device access unit 101 (YES in step S1001), the local query optimization unit 108 calculates, for each received local query candidate, the processing cost of executing it in the local query execution unit 109 (S1002). The local query optimization unit 108 then transmits the processing cost of each local query candidate to the proxy device 2 through the proxy device access unit 101 (S1003), and the process proceeds to step S1004.
In step S1004, when the local query execution unit 109 receives a local query, a query operating condition, and information related to the processing order of the query processing devices from the proxy device 2 through the proxy device access unit 101 (YES in step S1004), the local query execution unit 109 sets the received local query and query operating condition in itself (S1005). In addition, the local query execution unit 109 sets the information related to the processing order in the query processing device access unit 102. In accordance with this information, the query processing device access unit 102 recognizes its data input source (the query processing device located immediately before its own query processing device in the processing order, or the stream data generating device) and its data destination (the query processing device located immediately after its own query processing device in the processing order, or the proxy device). Thereafter, the process proceeds to step S1006.
In step S1006, in the case of the first query processing device 11 (YES in step S1006), the data registration/stamp assignment unit 105 waits for an assignment, sent from the proxy device 2 through the proxy device access unit 101, of the segment which configures the storage data (YES in step S1007). The assigned segment is set in the data registration/stamp assignment unit 105 (S1008). Thereafter, the process proceeds to step S1009.
In step S1009, in the case of the second to n-th query processing devices 12 to 1n (YES in step S1009), the data obtaining unit 107 waits for an assignment, sent from the proxy device 2 through the proxy device access unit 101, of the segment which configures the target data (YES in step S1010), and sets the assigned segment in itself (S1011). Thereafter, if its own query processing device has data to be stored in the proxy device, the data obtaining unit 107 waits for an assignment, sent from the proxy device 2 through the proxy device access unit 101, of the segment which configures the storage data (YES in step S1012). The data obtaining unit 107 then sets the assigned segment in the data registration/stamp assignment unit 105 (S1013), and the process is terminated.
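How a device could derive its data input source and data destination from the processing order is sketched below. Representing the processing order as a list of hypothetical device IDs is an assumption; the text does not specify how the order information is encoded.

```python
from typing import List, Tuple


def neighbors(order: List[str], own_id: str,
              source: str = "stream data generating device 4",
              sink: str = "proxy device 2") -> Tuple[str, str]:
    """Return (data input source, data destination) for one query processing device."""
    i = order.index(own_id)
    upstream = order[i - 1] if i > 0 else source               # first device reads the raw stream
    downstream = order[i + 1] if i < len(order) - 1 else sink  # last device reports to the proxy
    return upstream, downstream


order = ["dev1", "dev2", "dev3"]
print(neighbors(order, "dev1"))  # ('stream data generating device 4', 'dev2')
print(neighbors(order, "dev3"))  # ('dev2', 'proxy device 2')
```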
FIG. 7 is a flow chart illustrating a query processing operation of the first query processing device 11.
When the data receiving unit 103 receives the stream data 61 from the stream data generating device 4 through the query processing device access unit 102 (S1101), it passes the stream data 61 to the data registration/stamp assignment unit 105. The data registration/stamp assignment unit 105 extracts, from the stream data 61 passed from the data receiving unit 103, the segment which configures the storage data assigned by the proxy device 2 (S1102). The data registration/stamp assignment unit 105 then generates a storage data registration request including the extracted segment and the stream ID of the stream data 61, and transmits the generated request to the proxy device 2 through the proxy device access unit 101 (S1103). Next, the data registration/stamp assignment unit 105 waits for the system time stamp 681 and the expire time stamp 682 sent from the proxy device 2 as a response to the storage data registration request (S1104). The data registration/stamp assignment unit 105 then adds the received system time stamp 681 and expire time stamp 682 to the stream data 61 from which the segment configuring the storage data has been extracted, and passes the result to the stream data management unit 106 as process target data. After receiving the process target data, the stream data management unit 106 transmits it, including the system time stamp 681 and the expire time stamp 682, to the local query execution unit 109 (S1105).
The local query execution unit 109 executes the local query whose query operating condition, set by the proxy device 2, is satisfied, with respect to the process target data 68 transferred from the stream data management unit 106 in the order of the system time stamp 681 (S1106). The local query execution unit 109 then transmits the execution result to the data transmission unit 104. After receiving the execution result, the data transmission unit 104 generates transfer data 62 including the stream ID of the process target data 68, the system time stamp 681, the expire time stamp 682, and the execution result of the local query for that process target data 68, and transmits the generated transfer data 62 to the second query processing device 12 through the query processing device access unit 102 (S1107). In addition, the stream data management unit 106 deletes, from the temporary storage serving as the process buffer prepared in the local query execution unit of its own query processing device, any process target data whose expire time stamp is older than the system time stamp of the process target data 68 currently being processed by the local query execution unit 109 (S1108).
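Two steps of this flow, the segment extraction of S1102 and the buffer cleanup of S1108, can be sketched as below. Dict-based tuples and the function names are assumptions. The cleanup also treats an expire stamp equal to the current system stamp as expired, which is an interpretation of "older than" chosen so that the FIG. 15 example (data 1 expires at 004, when data 4 arrives) is reproduced.

```python
from typing import Dict, List, Tuple


def split_storage(stream: Dict, storage_segments: List[str]) -> Tuple[Dict, Dict]:
    """S1102: extract the storage segments; the rest (with the stream ID) stays local."""
    storage = {k: stream[k] for k in storage_segments if k in stream}
    rest = {k: v for k, v in stream.items() if k not in storage_segments}
    return storage, rest


def purge_expired(buffer: List[Dict], current_sys_ts: int) -> List[Dict]:
    """S1108: keep data whose expire stamp is still NULL or later than the current stamp.

    Assumption: an expire stamp equal to the current system stamp counts as expired,
    which reproduces FIG. 15 (data 1 expires at 004, when data 4 arrives).
    """
    return [d for d in buffer
            if d["expire_ts"] is None or d["expire_ts"] > current_sys_ts]


storage, rest = split_storage({"stream_id": "s1", "brand": "ABC", "delivery": "Tokyo"},
                              ["delivery"])
print(storage, rest)  # {'delivery': 'Tokyo'} {'stream_id': 's1', 'brand': 'ABC'}
buffer = [{"id": 1, "expire_ts": 4}, {"id": 2, "expire_ts": None}]
print(purge_expired(buffer, 4))  # [{'id': 2, 'expire_ts': None}]
```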
FIG. 8 is a flow chart illustrating a query processing operation of the second to n-thquery processing devices12to1n.
If the data receiving unit 103 receives, through the query processing device access unit 102, the transfer data 62 from the query processing device 11 to 1n−1 located immediately before its own processing device 12 to 1n in the processing order (S1401), the data receiving unit 103 passes the stream ID included in the transfer data 62 concerned to the data obtaining unit 107. If a segment which configures the target data has been assigned by the proxy device 2 (YES in step S1402), the data obtaining unit 107 generates a target data request including the stream ID passed from the data receiving unit 103 and the assignment of the segment which configures the target data, and transmits the generated target data request to the proxy device 2 through the proxy device access unit 101 (S1403). When the data obtaining unit 107 receives the target data from the proxy device 2 (S1404), it passes the received target data to the data receiving unit 103. On the other hand, if the proxy device 2 has not assigned a segment which configures the target data (NO in step S1402), the data obtaining unit 107 notifies the data receiving unit 103 that the target data is not necessary. If the data receiving unit 103 receives the target data from the data obtaining unit 107, the data receiving unit 103 attaches the target data to the transfer data 62 and transmits the transfer data 62 as the process target data 68 to the local query execution unit 109 (S1405). On the other hand, if the data receiving unit 103 receives from the data obtaining unit 107 a notice that the target data is not necessary, the data receiving unit 103 transmits the transfer data 62 as the process target data 68 to the local query execution unit 109 (S1406).
In addition, the local query execution unit 109 executes a local query which satisfies the query operating condition set by the proxy device 2 on the transmitted process target data 68 (S1407). Thereafter, the local query execution unit 109 transmits its execution result, together with the stream ID, the system time stamp 681, and the expire time stamp 682 of the process target data 68 concerned, to the data transmission unit 104. The local query execution unit 109 also extracts from the execution result a segment which configures the storage data assigned by the proxy device 2 (S1408). Next, the local query execution unit 109 generates a storage data registration request including the extracted segment and the stream ID, and transmits the storage data registration request to the proxy device 2 through the proxy device access unit 101 (S1409). The data transmission unit 104 generates transfer data 62i (where 2≦i≦n) including the stream ID of the process target data 68, the system time stamp 681, the expire time stamp 682, and the execution result of the local query for the process target data 68 concerned. The data transmission unit 104 transmits the generated transfer data 62i, through the query processing device access unit 102, to the query processing device 13 to 1n located immediately after its own query processing device 12 to 1n−1 in the processing order. In the case of the n-th (last) query processing device 1n, however, the data transmission unit 104 transmits the execution result of the local query for the process target data 68 concerned, as the processing result 65 of the global query, to the proxy device 2 through the proxy device access unit 101 (S1410).
In addition, the stream data management unit 106 deletes, from a temporary storage in its own query processing device, any process target data read by the local query execution unit 109 whose expire time stamp is older than the system time stamp of the process target data 68 currently being processed (S1411).
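The following sketch models one of the second to n-th query processing devices (steps S1401 to S1411). It is a minimal illustration, not the patented implementation: the class and field names are hypothetical, the proxy's storage is reduced to a plain dictionary keyed by stream ID, and the choice not to forward segments that were just registered with the proxy is an assumption made for illustration.

```python
from dataclasses import dataclass


@dataclass
class Transfer:
    """Transfer data 62 exchanged between consecutive query processing devices."""
    stream_id: str
    system_ts: int
    expire_ts: int
    data: dict        # execution result of the preceding local query


class DownstreamDevice:
    """Sketch of the i-th query processing device (2 <= i <= n); names are hypothetical."""

    def __init__(self, proxy_store, target_segments, storage_segments,
                 local_query, is_last=False):
        self.proxy_store = proxy_store            # stream ID -> segments held by the proxy
        self.target_segments = target_segments    # segments fetched from the proxy
        self.storage_segments = storage_segments  # segments registered with the proxy
        self.local_query = local_query
        self.is_last = is_last
        self.buffer = []                          # temporary storage

    def handle(self, t):
        data = dict(t.data)
        if self.target_segments:                           # S1402: target data assigned?
            held = self.proxy_store[t.stream_id]           # S1403-S1404
            data.update({s: held[s] for s in self.target_segments})  # S1405
        self.buffer.append(t)
        result = self.local_query(data)                    # S1407
        # S1408-S1409: register the assigned segments of the result with the proxy
        stored = {s: result[s] for s in self.storage_segments if s in result}
        if stored:
            self.proxy_store.setdefault(t.stream_id, {}).update(stored)
        # S1411: drop data whose expire stamp is older than the current system stamp
        self.buffer = [b for b in self.buffer if b.expire_ts >= t.system_ts]
        if self.is_last:                                   # S1410: global query result
            return "global_result", result
        # Assumption: segments registered with the proxy are not forwarded
        out = {k: v for k, v in result.items() if k not in stored}
        return "transfer", Transfer(t.stream_id, t.system_ts, t.expire_ts, out)


proxy_store = {"A": {"S4": 40}}
dev2 = DownstreamDevice(proxy_store, ["S4"], ["S3"],
                        lambda d: {**d, "sum12": d["S1"] + d["S2"]})
kind, out = dev2.handle(Transfer("A", 1, 2, {"S1": 1, "S2": 2, "S3": 3}))
last = DownstreamDevice(proxy_store, [], [],
                        lambda d: {"total": sum(d.values())}, is_last=True)
kind_last, final = last.handle(out)
```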
Returning to FIG. 1, the explanation will be continued.
The proxy device 2 generates n local queries 671 to 67n by dividing the global query 66 received from the user terminal 3 into n processes, and sets the local queries 671 to 67n to the query processing devices 11 to 1n in order, respectively. That is, the i-th local query 67i is set to the i-th query processing device 1i (where 1≦i≦n). In addition, the proxy device 2 determines, for each of the segments which configure the stream data 61, whether the segment concerned is to be transmitted to the desired query processing device among 11 to 1n through the other query processing devices 11 to 1n, or through the proxy device 2. Thereafter, according to the determined contents, the proxy device 2 sets the segments which pass through the proxy device 2 (segments each of which configures the storage data) to the query processing device 1i (where 1≦i≦n) that extracts them, and sets the same segments (segments each of which configures the target data) to the query processing devices 1i (where 1≦i≦n) which execute the local query on these segments.
FIG. 9 is a schematic view illustrating a configuration of the proxy device 2.
As shown in FIG. 9, the proxy device 2 includes a query processing device access unit 201 which communicates with the query processing devices 11 to 1n through the network 5, a user terminal access unit 202 which communicates with the user terminal 3 through the network 5, a query receiving unit 203, a stamp issue unit 204, a storage data management unit 205, a query processing device information storage unit 206, a global query optimization unit 207, a communication route searching unit 208, a result transmission unit 209, and a data extracting unit 210.
The query receiving unit 203 receives the global query 66 and the query operating condition from the user terminal 3 through the user terminal access unit 202. The query processing device information storage unit 206 stores information (for example, the address or specification) of each query processing device which can be used in the stream data processing system according to this embodiment.
The global query optimization unit 207 divides the global query 66 received by the query receiving unit 203 into n processes (where n≦the number of query processing devices whose information is stored in the query processing device information storage unit 206). The global query optimization unit 207 generates first to n-th local queries 671 to 67n including the divided processes and sets the local queries 671 to 67n to the n query processing devices 11 to 1n in order, respectively. Here, the global query optimization unit 207 cooperates with each local query optimization unit 108 and generates the local queries 671 to 67n to be set to the query processing devices 11 to 1n such that the entire processing cost is minimized.
The communication route searching unit 208 determines a communication route for each segment which configures the stream data 61 such that the communication cost of each segment on the network 5 is minimized. In particular, for a segment that is a target of the local query executed by the i-th query processing device 1i (where 1≦i≦n), the communication route searching unit 208 compares the communication cost when the i-th query processing device 1i obtains the segment concerned through the j-th to (i−1)-th query processing devices 1j to 1i−1 (where j<i) with the communication cost when the i-th query processing device 1i obtains the segment concerned through the j-th query processing device 1j and the proxy device 2. The communication route searching unit 208 then selects the communication route having the lower communication cost. Thereafter, according to the selected communication route, the communication route searching unit 208 sets, to the query processing device 1j (1≦j≦n), the segment which is extracted from the stream data 61 and transmitted to the proxy device 2 as the storage data. In addition, the communication route searching unit 208 sets, to the query processing devices 12 to 1n, the segment which they obtain from the proxy device 2 as the target data of the local query.
When the stamp issue unit 204 receives the storage data registration request from the first query processing device 11 through the query processing device access unit 201, the stamp issue unit 204 generates the system time stamp 681, which indicates a processing order, and the expire time stamp 682, and transmits them together with the stream ID to the first query processing device 11 through the query processing device access unit 201. In addition, the stamp issue unit 204 passes the at least one segment included in the storage data registration request concerned to the storage data management unit 205, together with the generated system time stamp 681, the expire time stamp 682, and the stream ID.
The storage data management unit 205 registers the stream ID, the at least one segment, the system time stamp 681, and the expire time stamp 682 received from the stamp issue unit 204 in a cache unit 2051 as storage data. In addition, the storage data management unit 205 moves storage data which has not been accessed for a predetermined time from the cache unit 2051 to the storage unit 2052.
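The two-tier behavior of the storage data management unit 205 can be sketched as follows. This is an illustration only: it assumes a numeric clock passed in by the caller and a hypothetical idle_limit standing in for the "predetermined time". The description does not say whether data read back from the storage unit 2052 returns to the cache unit 2051, so this sketch leaves it where it is.

```python
class StorageDataManager:
    """Sketch of the storage data management unit 205 with a cache unit 2051
    and a storage unit 2052; the idle limit and explicit clock are assumptions."""

    def __init__(self, idle_limit):
        self.idle_limit = idle_limit
        self.cache = {}    # stream ID -> (storage data, last access time)
        self.storage = {}  # stream ID -> storage data

    def register(self, stream_id, segments, system_ts, expire_ts, now):
        record = {"segments": segments, "system_ts": system_ts, "expire_ts": expire_ts}
        self.cache[stream_id] = (record, now)

    def lookup(self, stream_id, now):
        if stream_id in self.cache:
            record, _ = self.cache[stream_id]
            self.cache[stream_id] = (record, now)   # refresh the last access time
            return record
        return self.storage.get(stream_id)          # slower tier; None if unknown

    def demote_idle(self, now):
        """Move entries not accessed for idle_limit or longer to the storage unit."""
        idle = [sid for sid, (_, last) in self.cache.items()
                if now - last >= self.idle_limit]
        for sid in idle:
            record, _ = self.cache.pop(sid)
            self.storage[sid] = record
        return idle


m = StorageDataManager(idle_limit=10)
m.register("A", {"S4": 1}, 1, 5, now=0)
m.register("B", {"S4": 2}, 2, 6, now=0)
m.lookup("A", now=8)
moved = m.demote_idle(now=12)
```

Here "A" was read at time 8 and stays in the cache, while "B" has been idle for 12 time units and is moved to the storage unit.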
In response to a target data request received from one of the second to n-th query processing devices 12 to 1n through the query processing device access unit 201, the data extracting unit 210 searches the storage data management unit 205 for the storage data which has the stream ID included in the target data request concerned, and extracts the data of the segment assigned in that target data request from the storage data found. Thereafter, the data extracting unit 210 transmits the extracted segment data, as the target data, to the transmission source of the target data request concerned.
The result transmission unit 209 transmits the processing result of the global query, received from the n-th query processing device 1n through the query processing device access unit 201, to the user terminal 3 through the user terminal access unit 202.
Like the query processing devices 11 to 1n, the above-described proxy device 2 can be realized, for example, by the CPU 901 of the computer system shown in FIG. 5 executing a predetermined program loaded in the memory 902. The predetermined program may be downloaded to the external storage device 903 from the recording medium 904 through the reading device 905, or from the network through the communication device 908, then loaded in the memory 902 and executed by the CPU 901. Alternatively, the predetermined program may be loaded directly in the memory 902 from the recording medium 904 through the reading device 905, or from the network through the communication device 908, and executed by the CPU 901.
Next, operations of the proxy device 2 having the above-described configuration will be described.
FIG. 10 is a flow chart illustrating a setup operation performed by the proxy device 2 before the global query is executed.
When the global query optimization unit 207 receives a global query 66 with the query operating condition from the user terminal 3 through the user terminal access unit 202 (YES in step S2000), the global query optimization unit 207 executes a global query optimization processing, described later, so as to set the local queries 671 to 67n with the query operating condition in the query processing devices 11 to 1n (S2100). The communication route searching unit 208 executes a communication route retrieval processing, described later, so as to determine the communication route of the stream data 61 output from the stream data generating device 4. The communication route searching unit 208 then sets the segment which configures the storage data to the query processing device 11 according to the determined contents, and sets the segment which configures the target data to the query processing devices 11 to 1n which execute the local query on the segment concerned (S2200).
FIG. 11 is a flow chart explaining the global query optimization processing (S2100) shown in FIG. 10.
First, the global query optimization unit 207 divides the global query 66 received from the user terminal 3 through the user terminal access unit 202 into process units (S2101). The global query optimization unit 207 checks whether the number of divisions of the global query is less than the number of query processing devices stored in the query processing device information storage unit 206 (S2102). If the number of divisions of the global query is less than the number of query processing devices (YES in step S2102), the process proceeds to step S2103. Otherwise (NO in step S2102), the process proceeds to step S2111.
In step S2103, the global query optimization unit 207 refers to the query processing device information storage unit 206, randomly selects as many query processing devices as the above-described number of divisions of the global query from among the query processing devices whose information is stored there, and randomly assigns a processing order to each selected query processing device, thereby generating a candidate device group which will process the global query.
In addition, for every process unit divided from the global query in step S2101, the global query optimization unit 207 generates a local query candidate including the process unit concerned. Thereafter, the global query optimization unit 207 assigns a processing order to each local query candidate (S2104). Here, the processing order assigned to each local query candidate may be the execution order (the order of appearance) of the process included in that local query candidate within the global query.
Next, the global query optimization unit 207 allocates the local query candidates generated in step S2104 to the query processing devices selected in step S2103 (S2105). Specifically, the i-th local query candidate (where 1≦i≦n, n being the number of divisions of the global query) is allocated to the i-th query processing device.
Next, the global query optimization unit 207 transmits to each query processing device selected in step S2103, through the query processing device access unit 201, the local query candidate allocated to that query processing device. Thereafter, the global query optimization unit 207 receives from each query processing device, through the query processing device access unit 201, the processing cost incurred when that query processing device executes the local query candidate allocated to it (S2106).
Next, the global query optimization unit 207 calculates the total sum of the processing costs received from the query processing devices selected in step S2103, and takes this total as the total processing cost of the device group candidate generated in step S2103 (S2107).
In addition, the global query optimization unit 207 refers to the query processing device information storage unit 206 and determines whether a new device group candidate (a combination of devices not yet selected) can be generated from the query processing devices whose information is stored there (S2108). If it is determined that a new device group candidate can be generated (YES in step S2108), the process returns to step S2103 and continues. If it is determined that no new device group candidate can be generated (NO in step S2108), the device group candidate with the minimum total processing cost is selected from the device group candidates generated in step S2103. The global query optimization unit 207 then transmits to each query processing device of the selected device group, through the query processing device access unit 201, the local query candidate allocated to that device as the local query, together with information on the processing order of each query processing device (S2109). In addition, for each query processing device to which a local query is allocated, information on that query processing device and on the local query allocated to it is passed to the communication route searching unit 208.
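The branch of steps S2103 to S2109 can be sketched as an exhaustive search. The description draws device group candidates at random and stops when no new candidate remains; the sketch below swaps that for a deterministic enumeration with itertools.permutations, which visits the same set of ordered candidates. The cost callback and the cost table are hypothetical stand-ins for the costs reported by each local query optimization unit 108.

```python
from itertools import permutations


def optimize_global_query(process_units, devices, cost):
    """Pick the device group with the lowest total processing cost.

    Applies when len(process_units) < len(devices) (YES in S2102).
    cost(device, local_query) is a hypothetical callback standing in for
    the cost each device reports in step S2106.
    """
    n = len(process_units)
    if n >= len(devices):
        raise ValueError("use the merging branch (S2111 onward) instead")
    local_queries = list(process_units)      # S2104: one candidate per unit, in order
    best = None
    # S2103/S2108: each ordered choice of n devices is one device group candidate
    for group in permutations(devices, n):
        # S2105-S2107: i-th local query on the i-th device; sum the reported costs
        total = sum(cost(dev, lq) for dev, lq in zip(group, local_queries))
        if best is None or total < best[1]:
            best = (group, total)
    return best                              # S2109: (device order, total cost)


costs = {("d1", "filter"): 5, ("d1", "join"): 9,
         ("d2", "filter"): 3, ("d2", "join"): 8,
         ("d3", "filter"): 4, ("d3", "join"): 2}
best = optimize_global_query(["filter", "join"], ["d1", "d2", "d3"],
                             lambda d, q: costs[(d, q)])
```

Because the number of ordered candidates grows factorially with the number of devices, a practical variant might cap the number of random draws; the description does not discuss that trade-off.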
On the other hand, in step S2111, the global query optimization unit 207 combines some of the process units divided from the global query so that the number of divisions of the global query (the number of processes) equals the number of query processing devices whose information is stored in the query processing device information storage unit 206. Here, the process units to be combined may be consecutive in the execution order (the order of appearance) of the global query. The global query optimization unit 207 generates, for each process, a local query candidate including the process concerned. Accordingly, as many local query candidates are generated as there are query processing devices whose information is stored in the query processing device information storage unit 206. Thereafter, the global query optimization unit 207 assigns a processing order to each local query candidate and generates a local query group candidate. Here, the processing order assigned to each local query candidate may be the execution order (the order of appearance) of the process included in that local query candidate within the global query.
Next, the global query optimization unit 207 randomly assigns a processing order to each query processing device whose information is stored in the query processing device information storage unit 206. The global query optimization unit 207 allocates the local query candidates generated in step S2111 to the query processing devices whose information is stored in the query processing device information storage unit 206 (S2112). Specifically, the i-th local query candidate (where 1≦i≦n, n being the number of divisions of the global query) is allocated to the i-th query processing device.
Next, the global query optimization unit 207 transmits to each query processing device whose information is stored in the query processing device information storage unit 206, through the query processing device access unit 201, the local query candidate allocated to that query processing device. In addition, the global query optimization unit 207 receives from each query processing device, through the query processing device access unit 201, the processing cost incurred when that query processing device executes the local query candidate allocated to it (S2113).
Next, the global query optimization unit 207 calculates the total sum of the processing costs received from the query processing devices whose information is stored in the query processing device information storage unit 206, and takes this total as the total processing cost of the local query group candidate generated in step S2111 (S2114).
Next, the global query optimization unit 207 determines whether a new local query group candidate (a combination of processes not yet selected) can be generated by combining some of the process units divided from the global query so that the number of divisions of the global query (the number of processes) equals the number of query processing devices whose information is stored in the query processing device information storage unit 206 (S2115). If it is determined that a new local query group candidate can be generated (YES in step S2115), the process returns to step S2111 and continues. If it is determined that no new local query group candidate can be generated (NO in step S2115), the local query group candidate with the minimum total processing cost is selected from the local query group candidates generated in step S2111. The global query optimization unit 207 then transmits to each query processing device whose information is stored in the query processing device information storage unit 206, through the query processing device access unit 201, the local query candidate allocated to that device in the selected local query group candidate as the local query, together with the query operating condition and information on the processing order of each query processing device (S2116). In addition, for every query processing device to which a local query is allocated, information on that query processing device and on the local query allocated to it is passed to the communication route searching unit 208.
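Steps S2111 to S2116 amount to enumerating every way of merging consecutive process units into as many local queries as there are devices. A minimal sketch follows. It assumes that merged units are consecutive in the order of appearance (which the description says they may be), keeps the device order fixed instead of random for determinism, and uses a made-up cost function that grows with the number of merged units.

```python
from itertools import combinations


def contiguous_groupings(units, k):
    """Yield every way to merge consecutive process units into k local queries."""
    m = len(units)
    for cuts in combinations(range(1, m), k - 1):
        bounds = (0, *cuts, m)
        yield [tuple(units[a:b]) for a, b in zip(bounds, bounds[1:])]


def optimize_by_merging(units, devices, cost):
    """Branch S2111-S2116, used when len(units) >= len(devices).

    The device order is kept fixed here (the description assigns it randomly);
    cost(device, local_query) is a hypothetical callback.
    """
    k = len(devices)
    if len(units) < k:
        raise ValueError("fewer process units than devices")
    best = None
    for group in contiguous_groupings(units, k):                   # S2111, S2115
        total = sum(cost(dev, lq) for dev, lq in zip(devices, group))  # S2112-S2114
        if best is None or total < best[1]:
            best = (group, total)
    return best                                                    # S2116


units = ["scan", "filter", "join", "aggregate"]
best = optimize_by_merging(units, ["d1", "d2"], lambda dev, lq: len(lq) ** 2)
```

With four units and two devices there are three candidate groupings (cut after the first, second, or third unit); the balanced split costs 4+4=8 and beats the two unbalanced splits at 1+9=10.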
FIG. 12 is a flow chart illustrating the communication route retrieval processing (S2200) shown in FIG. 10.
First, the communication route searching unit 208 sets n to the number of query processing devices (= the number of local queries) notified by the global query optimization unit 207 (S2201). Next, the communication route searching unit 208 sets i=2 (S2202).
Thereafter, the communication route searching unit 208 specifies the segment that is the target of the local query allocated to the i-th query processing device notified by the global query optimization unit 207 (S2203).
Next, the communication route searching unit 208 calculates a communication cost C1 for the case where the specified segment reaches the i-th query processing device through the j-th to (i−1)-th query processing devices (where 1≦j<i≦n) (S2204). For example, if the communication cost per unit data size between query processing devices is U1, the communication cost C1 can be calculated as the sum, over each hop between query processing devices, of "data size of the segment transferred between the query processing devices × U1". For example, if the transfer data between the first and second query processing devices has the segment size V1 and the transfer data between the second and third query processing devices has the segment size V2, the cost C1 is calculated as C1=V1×U1+V2×U1. Here, the communication cost U1 per unit data size between query processing devices may be determined in consideration of the communication environment.
Next, the communication route searching unit 208 calculates a communication cost C2 for the case where the specified segment reaches the i-th query processing device through the j-th query processing device and the proxy device 2 (S2205). For example, let the communication cost per unit data size between a query processing device and the proxy device be U2. If the data transmitted from the j-th query processing device to the proxy device 2 has the segment size V3 and the data received by the i-th query processing device from the proxy device 2 has the segment size V4, the cost C2 is calculated as C2=V3×U2+V4×U2. The communication cost U2 per unit data size between a query processing device and the proxy device 2 is predetermined in consideration of the communication environment and the processing cost required for management in the proxy device 2.
Next, the communication route searching unit 208 compares the communication cost C1 calculated in step S2204 with the communication cost C2 calculated in step S2205 (S2206). If the communication cost C1 is lower than the communication cost C2 (NO in step S2206), the communication route of the segment specified in step S2203 is set to the route passing through the first to (i−1)-th query processing devices, and the process proceeds to step S2209. If the communication cost C2 is lower than the communication cost C1 (YES in step S2206), the communication route of the segment specified in step S2203 is set to the route passing through the first query processing device and the proxy device 2, and the process proceeds to step S2207.
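The comparison in steps S2204 to S2206 reduces to two weighted sums, sketched below: C1 is the sum of V×U1 over the device-to-device hops, and C2=V3×U2+V4×U2. The function names are hypothetical, and resolving a tie (C1 equal to C2, a case the description leaves open) toward the relay route is an assumption.

```python
def relay_cost(hop_sizes, u1):
    """C1 (S2204): sum over device-to-device hops of segment size x U1."""
    return sum(v * u1 for v in hop_sizes)


def proxy_cost(v_up, v_down, u2):
    """C2 (S2205): V3 x U2 (j-th device to proxy) + V4 x U2 (proxy to i-th device)."""
    return v_up * u2 + v_down * u2


def choose_route(hop_sizes, v_up, v_down, u1, u2):
    c1 = relay_cost(hop_sizes, u1)
    c2 = proxy_cost(v_up, v_down, u2)
    # S2206: use the proxy only when C2 is strictly lower (tie -> relay, assumed)
    return ("proxy" if c2 < c1 else "relay"), c1, c2


short = choose_route([100, 100], 100, 100, u1=1, u2=1.5)
long = choose_route([100] * 4, 100, 100, u1=1, u2=1.5)
```

With U1=1 and U2=1.5, a 100-unit segment relayed over two hops costs 200 against 300 through the proxy, so it stays on the relay route; over four hops the relay cost rises to 400 and the proxy route wins.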
In step S2207, the communication route searching unit 208 designates the segment specified in step S2203 as the target of the local query allocated to the i-th query processing device as a segment which configures the storage data. Thereafter, the communication route searching unit 208 accesses the i-th query processing device through the query processing device access unit 201 and sets the segment specified in step S2203 as a segment which configures the target data of that query processing device (S2208).
On the other hand, in step S2209, the communication route searching unit 208 excludes the segment specified in step S2203 as the target of the local query allocated to the i-th query processing device from the segments which configure the storage data, so that the proxy device is not used for that segment.
Next, the communication route searching unit 208 checks whether i=n (S2210). If i≠n (i has not yet reached n; NO in step S2210), i is incremented by one (S2211) and the process returns to step S2203. If i=n (YES in step S2210), the communication route searching unit 208 accesses the first query processing device through the query processing device access unit 201 and sets the segments which configure the storage data to that query processing device (S2212).
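The loop of FIG. 12 (steps S2201 to S2212) can be sketched as below under simplifying assumptions that are not in the description: every segment originates at the first device (j=1), each device i from 2 to n needs exactly one segment, and a segment keeps the same size on every hop. The function and parameter names are hypothetical.

```python
def search_routes(target_segment, sizes, u1, u2):
    """target_segment: device index i (2..n) -> segment its local query needs.
    sizes: segment -> data size. Assumes j = 1 and a constant size per hop."""
    n = max(target_segment)                # S2201
    storage = set()                        # segments device 1 sends to the proxy
    target_data = {}                       # device i -> segment fetched from the proxy
    for i in range(2, n + 1):              # S2202, S2210-S2211
        seg = target_segment[i]            # S2203
        c1 = sizes[seg] * u1 * (i - 1)     # S2204: hops 1->2, ..., (i-1)->i
        c2 = sizes[seg] * u2 * 2           # S2205: device 1 -> proxy -> device i
        if c2 < c1:                        # S2206 YES
            storage.add(seg)               # S2207
            target_data[i] = seg           # S2208
        # otherwise (S2209) the segment stays out of the storage data and is relayed
    return storage, target_data            # S2212: storage set is sent to device 1


storage, target_data = search_routes(
    {2: "S2", 3: "S3", 4: "S4", 5: "S5"},
    {"S2": 100, "S3": 100, "S4": 100, "S5": 100},
    u1=1, u2=1.5)
```

Only the fifth device's segment, which would otherwise cross four hops, is routed through the proxy; at i=4 the two costs tie at 300 and the relay route is kept.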
FIG. 13 is a flow chart illustrating a query processing operation of the proxy device 2.
When the stamp issue unit 204 receives the storage data registration request from the first query processing device 11 through the query processing device access unit 201 (YES in step S2301), the stamp issue unit 204 generates the system time stamp 681 (for example, a sequential number) and the expire time stamp 682 (for example, a sequential number) (S2302). Thereafter, the stamp issue unit 204 attaches the stream ID included in the storage data registration request concerned to the system time stamp 681 and the expire time stamp 682, and transmits them to the first query processing device 11 through the query processing device access unit 201 (S2303). In addition, the stamp issue unit 204 passes the stream ID and the at least one segment included in the storage data registration request concerned to the storage data management unit 205, together with the generated system time stamp 681 and expire time stamp 682. On receiving the stream ID, the at least one segment, the system time stamp 681, and the expire time stamp 682 from the stamp issue unit 204, the storage data management unit 205 registers them as the storage data (S2304).
In addition, when the data extracting unit 210 receives a target data request from one of the second to n-th query processing devices 12 to 1n through the query processing device access unit 201 (YES in step S2401), the data extracting unit 210 searches the storage data management unit 205 for the storage data having the stream ID included in the target data request concerned (S2402) and extracts the data of the segment assigned in that target data request from the storage data found (S2403). Thereafter, the data extracting unit 210 transmits the extracted segment data, as the target data, to the transmission source of the target data request concerned through the query processing device access unit 201 (S2404).
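The two request handlers of FIG. 13 can be sketched as one class. As in the example where sequential numbers serve as stamps, the system time stamp here is a counter; deriving the expire time stamp by adding a fixed lifetime is an assumption, as are the class and method names.

```python
import itertools


class ProxyDevice:
    """Sketch of the stamp issue unit 204, storage data management unit 205
    and data extracting unit 210; the fixed lifetime is an assumption."""

    def __init__(self, lifetime):
        self._seq = itertools.count(1)   # sequential system time stamps
        self.lifetime = lifetime
        self.storage = {}                # stream ID -> storage data

    def on_registration_request(self, stream_id, segments):
        system_ts = next(self._seq)                     # S2302
        expire_ts = system_ts + self.lifetime
        self.storage[stream_id] = {"segments": dict(segments),   # S2304
                                   "system_ts": system_ts,
                                   "expire_ts": expire_ts}
        # S2303: stamps returned to the first query processing device
        return {"stream_id": stream_id, "system_ts": system_ts, "expire_ts": expire_ts}

    def on_target_data_request(self, stream_id, wanted):
        record = self.storage[stream_id]                # S2402: find by stream ID
        return {s: record["segments"][s] for s in wanted}   # S2403-S2404


p = ProxyDevice(lifetime=3)
r1 = p.on_registration_request("A", {"S4": 10, "S5": 11})
r2 = p.on_registration_request("B", {"S4": 20})
fetched = p.on_target_data_request("A", ["S5"])
```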
Next, an operation overview of the stream data processing system according to the embodiment of the invention will be described.
FIG. 14 is a view illustrating an operation of the stream data processing system shown in FIG. 1.
When the user terminal 3 transmits the global query to the proxy device 2 (T301), the proxy device 2 executes the above-described global query optimization processing (see FIG. 11) (T302) and sets the local queries to each of the query processing devices 11 to 1n (T303 to T305). In addition, the proxy device 2 executes the above-described communication route retrieval processing (see FIG. 12) (T306) and sets the segments which configure the storage data to the i-th query processing device 1i (where 1≦i≦n) (T307, T315, and T316). The proxy device 2 also sets the segments to be used as the target data in the query processing devices 12 to 1n which use target data segments during processing (T308). This completes the various settings made before the global query is executed.
The stream data generating device 4 generates stream data including the plurality of segments S1 to Sn and transmits the stream data to the first query processing device 11 (T309).
The first query processing device 11 extracts the segments S4 to Sn which configure the storage data from the received stream data S1 to Sn according to the settings of the proxy device 2, and transmits the segments S4 to Sn to the proxy device 2 (T310). In addition, the query processing device 11 executes the local query on the segments S1 to S3 remaining in the stream data and transmits its execution result, as the transfer data, to the second query processing device 12 (T311).
The second query processing device 12 executes the local query on the segments S2 and S3 included in the transfer data, transmits its execution result, as the transfer data, to the third query processing device 13 (T312), and transmits the segment S3, as storage data, to the proxy device 2 (T317). The i-th query processing device 1i performs the same processing (T318). However, if the segment Si that is the target of its local query is not included in the transfer data, the segment Si is obtained from the proxy device 2 (T313).
The n-th query processing device 1n passes the processing result of its local query to the proxy device 2. The proxy device 2 transmits the received processing result to the user terminal 3 as the processing result of the global query (T314).
The embodiments of the invention have been described above.
According to the above-described embodiment, for example, a segment which is first used for local query processing by a query processing device late in the processing order is stored in the proxy device 2 as storage data. This prevents the segment concerned from being transferred through a plurality of query processing devices, so that communication traffic can be reduced.
In addition, in this embodiment, since queries are processed in the order indicated by the system time stamp assigned to the process target data, stream data for which the processing order is important, such as stock price trends, can be processed appropriately. Furthermore, since process target data whose lifetime indicated by the expire time stamp has expired is deleted from the temporary storage in the device, unnecessary data is prevented from remaining in the device and affecting the query processing result.
It is also to be understood that the technical scope of the invention is not limited to the above-described embodiment, and various modifications can be made to the embodiment without departing from the scope and spirit of the present invention.
For example, in the above-described embodiment, the last (n-th) query processing device 1n transmits the processing result of the global query to the proxy device 2, but it may instead transmit the processing result directly to the user terminal 3.
In addition, in the above-described embodiment, a sequential number is used as an example of the system time stamp 681 and the expire time stamp 682. However, the invention is not limited to this; for example, a time of day can be used as the system time stamp 681 and the expire time stamp 682.
In addition, any one of the query processing devices 11 to 1n may serve as the proxy device 2 by being provided with the functions of the proxy device 2 in the above-described embodiment.