Detailed Description of the Embodiments
To make the objects, technical solutions, and advantages of the embodiments of the present invention clearer, the technical solutions in the embodiments of the present invention are described clearly and completely below with reference to the accompanying drawings. Obviously, the described embodiments are only some, rather than all, of the embodiments of the present invention. The components of the embodiments of the present invention, as generally described and illustrated in the accompanying drawings herein, may be arranged and designed in a variety of different configurations.
Therefore, the following detailed description of the embodiments of the present invention provided in the accompanying drawings is not intended to limit the claimed scope of the present invention, but merely represents selected embodiments of the present invention. All other embodiments obtained by persons of ordinary skill in the art based on the embodiments of the present invention without creative effort shall fall within the protection scope of the present invention.
It should be noted that similar reference numerals and letters denote similar items in the following drawings. Therefore, once an item is defined in one drawing, it need not be further defined or explained in subsequent drawings.
Please refer to Fig. 1, which is a schematic diagram of a data processing system 10 provided by a preferred embodiment of the present invention. The data processing system 10 includes a data storage unit 100, a processing unit 200, and a computing service unit 400.
In one implementation of this embodiment, the data processing system 10 may be a distributed processing system, in which the data storage unit 100, the processing unit 200, or the computing service unit 400 may each be one or more independently running data processing terminals or virtual machines in the distributed processing system.
In another implementation of this embodiment, the data processing system 10 may be a system running on a single data processing terminal, in which the data storage unit 100, the processing unit 200, and the computing service unit 400 are independent programs or processes running on the data processing system 10.
In this embodiment, the data storage unit 100 is used to store data. The data storage unit 100 supports sequential reading and random writing of data, and meets the requirements of the Map-Reduce paradigm. Through ordered storage, the compare function can be implemented; through the mapping mechanism, the partition/shuffle function can be implemented; and through an embedded data-merging plug-in, the combine function can be implemented. In this way, it is ensured that the pending data in the data storage unit 100 can be read and written one by one in a streaming manner.
Specifically, the data storage unit 100 may be any storage system that provides sequential reading and random writing. The data storage unit 100 may be, but is not limited to, a relational database, a NoSQL database, a data warehouse, or an ordered key-value storage system. In this embodiment, an ordered key-value storage system is taken as the preferred example of the data storage unit 100 for illustration; for instance, the data storage unit 100 may be Apache HBase, Google Bigtable, or the like.
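The two storage operations the scheme relies on—sequential reads over a key range and random writes—can be illustrated with a toy in-memory sketch. This is not the actual storage system of the embodiment (HBase or Bigtable provide these operations for real); the class and method names are our own illustrative assumptions:

```python
import bisect

class OrderedKVStore:
    """Toy ordered key-value store illustrating the two operations the
    scheme needs: sequential reads over a key range and random writes."""
    def __init__(self):
        self._keys = []   # kept sorted, giving the "ordered storage" property
        self._vals = {}

    def put(self, key, value):
        """Random write: keys may arrive in any order."""
        if key not in self._vals:
            bisect.insort(self._keys, key)
        self._vals[key] = value

    def scan(self, lo, hi):
        """Sequential read over the half-open key range [lo, hi)."""
        i = bisect.bisect_left(self._keys, lo)
        while i < len(self._keys) and self._keys[i] < hi:
            yield self._keys[i], self._vals[self._keys[i]]
            i += 1

store = OrderedKVStore()
for k in ["b", "a", "c"]:          # written in random order
    store.put(k, k.upper())
print(list(store.scan("a", "c")))  # → [('a', 'A'), ('b', 'B')]
```

Because keys are kept ordered, a processing process can stream through exactly the key range of its assigned shard without loading anything else.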
The processing unit 200 is used to read the pending data one by one from the data storage unit 100 and, through its own data processing procedure, call the computing services of the computing service unit 400 to perform computation on the pending data.
Please refer to Fig. 2, which shows a data processing method applied to the data processing system 10 shown in Fig. 1. Each step of the method is described in detail below.
Step S110: the processing unit 200 reads the pending data in the data storage unit 100 one by one.
Specifically, referring again to Fig. 1, in this embodiment the data processing system 10 may further include a scheduling unit 300, through which the processing unit 200 obtains the pending data in the data storage unit 100 one by one.
In this embodiment, please refer to Fig. 3. The scheduling unit 300 may distribute the pending data through steps S210, S220, and S230.
Step S210: the scheduling unit 300 divides the pending data in the data storage unit 100 into multiple data shards, where each data shard includes at least one piece of pending data.
In this embodiment, the scheduling unit 300 divides the pending data in the data storage unit 100 into multiple data shards according to a preset total data processing duration and a preset per-shard processing duration, where the total data processing duration is the total time required to process all the pending data, and the per-shard processing duration is the time required to process each data shard.
Specifically, the scheduling unit 300 calculates the number of data shards from the preset total data processing duration and the preset per-shard processing duration.
The number of data shards depends on the per-shard processing duration, which can be set at the minute level (e.g., 5 minutes). In this way, when processing of a data shard fails—even if the processing process fails or is killed—the recovery cost is also at the minute level. Compared with the prior art, which requires a recovery time of half an hour or longer, the scheme proposed in this embodiment obviously provides higher safety and stability.
In this case, the quotient of the total data processing duration and the per-shard processing duration gives the number of data shards. For example, when all pending data needs to be processed within 24 hours, the total data processing duration can be set to 24 hours; if the per-shard processing duration is set to 5 minutes, the number of data shards is 288.
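The shard-count arithmetic above can be sketched as follows; this is a minimal illustration, and the function name and the use of `timedelta` are our own assumptions, not part of the embodiment:

```python
from datetime import timedelta

def shard_count(total_duration: timedelta, per_shard_duration: timedelta) -> int:
    """Number of data shards = total processing duration / per-shard duration."""
    return int(total_duration / per_shard_duration)

# 24 hours total, 5 minutes per shard -> 288 shards, as in the example above.
print(shard_count(timedelta(hours=24), timedelta(minutes=5)))  # → 288
```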
Further, to use computing resources more smoothly and improve resource utilization, it is desirable that all shards finish just as the total processing duration elapses. However, the actual situation is more complicated: the actual processing time of each data shard may vary. For example, the expected per-shard processing duration may be 5 minutes, but in practice a shard may complete in 3 to 7 minutes, so the overall time of each processing round is not completely fixed.
Therefore, in this embodiment, when multiple rounds of data processing are performed, a reserved waiting period can be set. If the expected total processing duration is 24 hours—that is, one round of data processing is expected every 24 hours—the total data processing duration can be set to 23 hours. If a round finishes ahead of schedule, the system waits 1 hour before starting the next round; if the round finishes on time or overruns, the next round starts immediately.
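The round-to-round timing rule above can be sketched as follows; the function name and the particular numeric defaults (23-hour budget, 1-hour reserve) simply restate the example in the text:

```python
def delay_before_next_round(elapsed_hours: float,
                            budget_hours: float = 23.0,
                            reserve_hours: float = 1.0) -> float:
    """Early finish -> wait out the reserved slack before the next round;
    on-time or overtime finish -> start the next round immediately."""
    return reserve_hours if elapsed_hours < budget_hours else 0.0

print(delay_before_next_round(21.5))  # → 1.0  (finished early; wait the reserve)
print(delay_before_next_round(23.4))  # → 0.0  (overran; start the next round now)
```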
After determining the number of data shards, the scheduling unit 300 divides the pending data into that many data shards, where each data shard corresponds to a key range.
Specifically, in this embodiment, the data storage unit 100 stores multiple pieces of data in the form of key-value pairs, each piece of data corresponding to a key. The scheduling unit 300 divides the pending data into multiple data shards according to the number of data shards and the key range of the data to be processed.
For example, in the above case, if the keys of the pending data are computed as 128-bit MD5 hash values, the first 20 bits can be taken as an overall key range of 0 to 1048576, which can be divided according to the key ranges 0–1, 1–2, ..., 1048575–1048576 to obtain 1048576 data shards; alternatively, it can be divided according to the key ranges 0–16, 16–32, ..., 1048560–1048576 to obtain 65536 data shards.
It is worth noting that the key scheme is not limited to the above 128-bit MD5 hash values. In other implementations of this embodiment, other key schemes can be adopted, provided that the number of data items in each key range is roughly equal, so that the processing duration of each data shard is roughly equal. For example, in a storage system that uses plain-text strings as keys, the key ranges can be divided alphabetically; for instance, using the first two letters of the key, the key ranges aa–ac, ac–ae, ..., zx–zz can be divided to obtain 338 data shards.
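The fixed-width numeric division over the 20-bit hash prefix described above can be sketched as follows; the function name is illustrative:

```python
def numeric_key_ranges(upper: int, width: int):
    """Divide [0, upper) into consecutive half-open key ranges of equal width.
    Each range becomes one data shard."""
    return [(lo, lo + width) for lo in range(0, upper, width)]

# 20-bit MD5 prefix space (2**20 = 1048576), width 16 -> 65536 shards,
# matching the second example above.
ranges = numeric_key_ranges(1 << 20, 16)
print(len(ranges))    # → 65536
print(ranges[0])      # → (0, 16)
print(ranges[-1])     # → (1048560, 1048576)
```

Equal-width ranges over a hash prefix keep shard sizes roughly balanced because a good hash distributes keys uniformly, which is exactly the property the text asks a key scheme to provide.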
Step S220: for each data shard, the scheduling unit 300 assigns one processing process of the processing unit 200 to process the pending data in that data shard.
In one implementation of this embodiment, the scheduling unit 300 obtains the working state of the processing processes in the processing unit 200 and pushes pending data shards to the processing processes according to that working state.
Specifically, according to the working state of the processing processes of the processing unit 200, the scheduling unit 300 selects at least one data shard from the data storage unit 100 and sends the key range corresponding to the data shard to an idle processing process of the processing unit 200, where the idle processing process may be a process that is not currently performing data processing, or a process with a relatively light data processing load.
In another implementation of this embodiment, when a processing process of the processing unit 200 is not performing data processing, it can actively pull a data shard from the data storage unit 100 for processing.
Specifically, when the processing unit 200 has an idle processing process, the idle processing process sends a data shard acquisition request to the scheduling unit 300. According to this request, the scheduling unit 300 selects at least one data shard from the data storage unit 100 and sends the key range corresponding to the data shard to the idle processing process of the processing unit 200.
Further, the scheduling unit 300 selects the data shard according to the number of data shards currently being processed in each partition of the data storage unit 100.
Specifically, the data storage unit 100 usually divides its data, statically or dynamically, into multiple storage partitions according to the data scale; each storage partition runs on a certain machine of the distributed system and maintains a part of the data. To avoid data congestion (such as read-write congestion and computing hot spots) on the machine where a storage partition resides, the scheduling unit 300 should avoid simultaneously assigning and processing multiple data shards on the same storage partition, so as to prevent the processing time of those data shards from exceeding expectations and incurring the risk of an overall processing timeout.
Therefore, in this embodiment, the scheduling unit 300 obtains the number of data shards being processed in each storage partition, selects the storage partition with the fewest data shards under processing as the target storage partition, and selects a data shard from the target storage partition to send to the idle processing process.
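The least-loaded partition choice above can be sketched as a one-line selection; the data structure (a mapping from partition name to active shard count) is an illustrative assumption:

```python
def pick_target_partition(active_shards_per_partition: dict) -> str:
    """Choose the storage partition with the fewest data shards currently
    being processed, to avoid read-write congestion and compute hot spots."""
    return min(active_shards_per_partition, key=active_shards_per_partition.get)

load = {"partition-a": 3, "partition-b": 0, "partition-c": 1}
print(pick_target_partition(load))  # → partition-b
```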
It is worth noting that the above method of selecting data shards is only one implementation provided in this embodiment. In other implementations, the data shards can also be selected according to other scheduling strategies that reduce the risk of data congestion.
Step S230: the processing process of the processing unit 200 obtains the pending data in the assigned data shard one by one.
Further, please refer to Fig. 4. The processing unit 200 may include a data reading subunit 210 and a data processing subunit 220. The data reading subunit 210 is used to read pending data one by one from the data storage unit 100; the data processing subunit 220 includes the processing processes and is used to perform data processing on the pending data.
In this embodiment, the data reading subunit 210 sequentially obtains, one by one, the pending data within the key range of the data shard from the data storage unit 100. The actual processing order of the data may or may not match the order in which it is read into memory, depending on the settings of the computing task. If strict ordering is not required, mechanisms such as multithreading can be used for concurrent processing to improve speed.
Each data processing subunit 220 obtains the pending data in its given key range and proceeds to step S120 to begin streaming processing of the pending data one by one.
Step S120: judging, according to a preset computing task, whether the pending data requires aggregation computing.
The data processing subunit 220 evaluates each piece of pending data according to the computing task.
The preset computing task may be a pre-configured data processing topology that includes the data processing nodes through which the pending data must flow. The computing task includes common computing and aggregation computing, where common computing refers to computations performed independently on each piece of pending data one by one, and aggregation computing includes computations that must combine multiple pieces of pending data.
Therefore, in this embodiment, the data processing subunit 220 judges, according to the preset computing task, whether the pending data requires aggregation computing, so as to perform the corresponding computation.
When aggregation computing is not required for the pending data, the method goes to step S130; when aggregation computing is required, the method goes to step S140.
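Under stated assumptions about how a computing task is represented (here, a plain dictionary with a `needs_aggregation` flag—the embodiment itself describes a processing topology), the branch between steps S130 and S140 might look like:

```python
def dispatch(record: dict, task: dict) -> tuple:
    """Route one pending record: tasks that need aggregation go to the
    aggregation computing service (step S140); all others take the
    common-computing path (step S130)."""
    if task.get("needs_aggregation"):
        return ("S140", record)
    return ("S130", record)

print(dispatch({"id": 1}, {"needs_aggregation": True}))   # → ('S140', {'id': 1})
print(dispatch({"id": 2}, {"needs_aggregation": False}))  # → ('S130', {'id': 2})
```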
Step S130: performing common computing by the processing unit 200, or sending the pending data to the computing service unit 400 for common computing.
The common computing includes light computing, heavy computing, and general-purpose computing. Light computing includes business-logic computations with a relatively small preset amount of computation; heavy computing includes computations with a relatively large preset amount of computation; and general-purpose computing includes general computations shared by different computing tasks.
The common computing may be performed in one of the following manners, or a combination thereof.
When performing light computing, the light computation is carried out on the pending data within the processing unit 200.
The data processing subunit 220 of the processing unit 200 can be used to execute the light computing. Light computing refers to simple computation with a small performance cost, such as converting the format of a certain field in the data, or deriving a new field from one or more existing fields. Light computing can also be a complex algorithm that occupies relatively few computing resources.
Please refer to Fig. 5. The computing service unit 400 may include a heavy computing subunit 410, a general-purpose computing subunit 420, or an aggregation computing subunit 430.
When heavy computing needs to be performed on the pending data, the heavy computing service of the heavy computing subunit 410 is called to perform heavy computation on the pending data one by one.
The heavy computing subunit 410 is used to execute the heavy computing. Heavy computing refers to complex computations that consume large amounts of computing and network resources. For example, heavy computing usually requires a large amount of memory, such as loading a very large configuration file or keeping a large amount of temporary data in memory.
When general-purpose computing needs to be performed on the pending data, the general-purpose computing service of the general-purpose computing subunit 420 is called to perform general-purpose computation on the pending data one by one.
The general-purpose computing subunit 420 is used to execute the general-purpose computing. General-purpose computing refers to data processing shared by many business flows. For example, in the offline data processing flow of a search engine, normalization of web page URLs, deduplication, and conversion between PC pages and mobile pages can all be shared by multiple business flows. As another example, a general-purpose computing task can be a key-value retrieval service provided by a general random key-value storage system (such as Redis or Tair).
Step S140: sending the pending data to the computing service unit 400 for aggregation computing.
When aggregation computing needs to be performed on the pending data, the pending data is sent to the aggregation computing subunit 430 for aggregation computation.
Since data stored in key-value form is usually mutually independent, when processing the full data set it is sometimes necessary to compute over multiple pieces of data in aggregate—that is, the data have some correlation. For example, the aggregation computing includes counting the occurrences of a certain class of data, or sorting the data according to a certain rule.
In one implementation of this embodiment, the processing unit 200 sends the pending data to the aggregation computing subunit 430. The aggregation computing subunit 430 calls the aggregation computing service to perform partial aggregation on the pending data within a preset processing time window, or to perform global aggregation on all pending data.
Further, the system 10 also includes an aggregation result storage unit. The computing service unit 400 saves intermediate results of the aggregation computing to the aggregation result storage unit, so that when the computing service unit 400 restarts, the results of the global aggregation or partial aggregation can be restored from the aggregation result storage unit.
Under certain business scenarios, the above purely streaming aggregation has limitations: partial aggregation can compromise the completeness of the aggregation, while global aggregation places high demands on computing resources, especially data caching. This embodiment therefore provides another implementation of aggregation computing. In this implementation, the processing unit 200 sends the pending data to the computing service unit 400. The computing service unit 400 then calls the aggregation computing service to pre-process the received data one by one and records the intermediate results of this one-by-one pre-processing; after a preset delay, it executes a batch computing task that performs unified aggregation over the intermediate results.
Based on the above design, most of the time-consuming computing steps are completed through one-by-one streaming processing, yielding intermediate results of a smaller data scale, which are then integrated through batch aggregation. For example, batch aggregation of the intermediate results produced by the preceding streaming processing can be performed at a fixed time point every day. Through this combination of one-by-one streaming processing and batch aggregation, a more complete aggregation result can be obtained, and the use of the computing resources of the data processing system 10 becomes smoother, improving the utilization of computing resources.
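The streaming-then-batch pattern above can be sketched as follows. This is a minimal illustration under our own assumptions (per-key counting stands in for the expensive per-record step, and `Counter` stands in for the aggregation result storage unit):

```python
from collections import Counter

def stream_preprocess(records):
    """Streaming pass: do the per-record work one by one and keep only a
    small intermediate result (here, partial counts per category)."""
    partial = Counter()
    for rec in records:
        partial[rec["category"]] += 1   # stand-in for the heavy per-record step
    return partial

def batch_aggregate(intermediates):
    """Batch pass: at the fixed time point, merge all intermediate results
    into the final, complete aggregation."""
    total = Counter()
    for part in intermediates:
        total += part
    return total

# Two shards pre-processed in streaming fashion, then merged in one batch step.
shard_a = stream_preprocess([{"category": "news"}, {"category": "blog"}])
shard_b = stream_preprocess([{"category": "news"}])
print(batch_aggregate([shard_a, shard_b]))  # → Counter({'news': 2, 'blog': 1})
```

The intermediate `Counter` objects are far smaller than the raw records, which is what makes the final batch step cheap compared with aggregating the full data set directly.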
After the aggregation computation, the result can be returned to the current processing process, or sent to other processing processes that need the aggregation result. In this way, the aggregation capability of batch data is realized while fully exploiting the improvements in system scalability and resource utilization brought by streaming computing.
In practical applications, the data processing system 10 may execute multiple computing tasks simultaneously, each of which performs separate data processing on the pending data. Through the method provided by this embodiment, converting batch processing into streaming processing changes data processing from monopolizing cluster resources for a concentrated period of time to smoothly using a relatively fixed resource quota of the cluster over the 24 hours of a day. Therefore, in a scenario where multiple computing tasks run simultaneously, each computing task is assigned a certain resource quota and runs at the same priority; resource contention between computing tasks is then greatly reduced, improving resource utilization.
Further, during data processing, the processing unit 200 detects whether each data shard has been fully processed, and marks the data shard when its processing is complete.
During data processing, if a processing process exits abnormally (e.g., fails or is killed), the remaining data of the data shard it was processing may go unprocessed. In this embodiment, the processing process can be configured to mark completed data shards—for example, through mechanisms such as checkpoints—to ensure that all data is processed.
Specifically, after finishing the data of a data shard, the processing process can record a checkpoint indicating that the data shard has been processed. To ensure that checkpoints are not lost, they can be recorded in a distributed storage system. After a processing process exits and restarts, the checkpoints can be examined, and incompletely processed data shards can be reassigned and processed. Since the processing time of each data shard is at the minute level, the recovery cost of a processing process is very small.
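The checkpoint mechanism above can be sketched as follows; in the embodiment the registry would live in a distributed storage system so checkpoints survive process restarts, while a plain in-memory set stands in for it here:

```python
class CheckpointStore:
    """Minimal checkpoint registry: records which data shards (identified by
    their key ranges) have been fully processed."""
    def __init__(self):
        self._done = set()

    def mark_done(self, key_range):
        """Record a checkpoint after the last record of a shard is processed."""
        self._done.add(key_range)

    def pending(self, all_shards):
        """Shards with no checkpoint; after a restart, these are reassigned."""
        return [s for s in all_shards if s not in self._done]

store = CheckpointStore()
shards = [(0, 16), (16, 32), (32, 48)]
store.mark_done((0, 16))            # first shard finished before the "crash"
print(store.pending(shards))        # → [(16, 32), (32, 48)]
```

Because a checkpoint is only written after the whole shard is processed, this gives the at-least-once semantics discussed next: a crash mid-shard means the shard is reprocessed from the start.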
In particular, the above recovery scheme ensures at-least-once data consistency (i.e., the same piece of data is written at least once). If a business flow needs the strong consistency of at-most-once (the same data is written at most once) or exactly-once (the same data is written exactly once), reprocessing and writing back a batch of data can cause data anomalies. Other external mechanisms are then needed to guarantee the strong consistency, for example, the Trident functional module of the streaming computing platform Storm.
Step S150: saving the processed data to the data storage unit 100 by means of random writing.
The processing unit 200 writes the processed data back to the data storage unit 100. In this embodiment, the key of a processed piece of data may have changed compared with the key of the same piece of data read in step S110. Alternatively, during processing, a piece of data read in step S110 may be derived through computation into multiple new pieces of data, each with a different key. Therefore, when the data is written back to the storage system in step S150, the order of keys can no longer be guaranteed. Hence, in this embodiment, the write-back is random rather than ordered.
Based on the above design, in this embodiment the data storage, light computing, and computing services are coupled only through IO or service calls; each module is relatively independent and can easily be scaled linearly, for example by simply adding computing or storage resources. When expanding data capacity, it suffices to correspondingly increase the capacity or the number of storage partitions of the data storage unit 100, increase the number of processing processes of the processing unit 200, and increase the number of processes of the computing service unit 400; the three are independent of one another, practical operability is strong, and the operation and maintenance risk is relatively low. Therefore, the method of this embodiment has good performance scalability.
In addition, after batch processing is converted into streaming processing, the task processes of the data storage unit 100, the processing unit 200, and the computing service unit 400 can be executed at a higher and identical priority. Each data processing task is assigned fixed quotas of resources such as memory, CPU, and network. The resource utilization of the entire cluster then becomes very smooth, resource contention between data processing tasks decreases (reducing the possibility of tasks interfering with one another), and the overall cluster resource utilization rate is greatly improved. Please refer to Fig. 6 and Fig. 7: Fig. 6 shows the utilization of cluster resources in a prior-art batch processing scheme, and Fig. 7 shows the utilization of cluster resources in this embodiment, where the abscissa is time and the ordinate is the occupancy of computing resources. It is apparent that cluster resource usage becomes smoother in this embodiment, and the improvement in cluster resource utilization indirectly translates into support for data expansion. After streaming processing, at the same cluster scale, the scale of supported data processing increases by more than 30%.
In conclusion data processing method provided by the invention and system, by data carry out relatively independent storage,Scheduling and processing, and using method batch processed converted for Stream Processing, enhance the data processing system 10Scalability, and the data processing system 10 is improved when carrying out data processing to the utilization rate of computing resource.
In the embodiments provided in this application, it should be understood that the disclosed apparatus and method may also be implemented in other ways. The apparatus embodiments described above are merely exemplary. For example, the flowcharts and block diagrams in the accompanying drawings show the possible architectures, functions, and operations of the apparatuses, methods, and computer program products according to multiple embodiments of the present invention. In this regard, each box in a flowchart or block diagram may represent a module, a program segment, or a part of code, which contains one or more executable instructions for implementing the specified logical function. It should also be noted that in some alternative implementations, the functions marked in the boxes may occur in an order different from that marked in the drawings. For example, two consecutive boxes may in fact be executed substantially in parallel, and they may sometimes be executed in the reverse order, depending on the functions involved. It should also be noted that each box in the block diagrams and/or flowcharts, and combinations of boxes in the block diagrams and/or flowcharts, can be implemented by a dedicated hardware-based system that performs the specified functions or actions, or by a combination of dedicated hardware and computer instructions.
In addition, the functional modules in the embodiments of the present invention may be integrated to form one independent part, each module may exist alone, or two or more modules may be integrated to form an independent part.
If the functions are implemented in the form of software functional modules and sold or used as an independent product, they may be stored in a computer-readable storage medium. Based on this understanding, the technical solution of the present invention—in essence, or the part that contributes to the prior art, or a part of the technical solution—may be embodied in the form of a software product. The computer software product is stored in a storage medium and includes several instructions for causing a computer device (which may be a personal computer, a server, a network device, or the like) to execute all or part of the steps of the methods described in the embodiments of the present invention. The aforementioned storage medium includes various media that can store program code, such as a USB flash drive, a removable hard disk, a read-only memory (ROM), a random access memory (RAM), a magnetic disk, or an optical disk.
It should be noted that, in this document, relational terms such as first and second are used merely to distinguish one entity or operation from another, and do not necessarily require or imply any actual relationship or order between these entities or operations. Moreover, the terms "include", "comprise", or any other variant thereof are intended to cover non-exclusive inclusion, so that a process, method, article, or device that includes a series of elements includes not only those elements but also other elements not explicitly listed, or elements inherent to such a process, method, article, or device. Without further limitation, an element defined by the phrase "including a ..." does not exclude the existence of other identical elements in the process, method, article, or device that includes the element.
The foregoing is only a preferred embodiment of the present invention and is not intended to limit the present invention; for those skilled in the art, the present invention may have various modifications and variations. Any modification, equivalent replacement, improvement, and the like made within the spirit and principles of the present invention shall be included in the protection scope of the present invention.
The above is merely a specific embodiment of the present invention, but the protection scope of the present invention is not limited thereto. Any person familiar with this technical field can easily conceive of changes or substitutions within the technical scope disclosed by the present invention, and these shall all be covered within the protection scope of the present invention. Therefore, the protection scope of the present invention shall be subject to the protection scope of the claims.