Disclosure of Invention
In order to solve the problems in the prior art, embodiments of the present invention provide a method and an apparatus for balancing the load of a distributed memory database. The technical solutions are as follows:
In a first aspect, a method for balancing the load of a distributed memory database is provided, where the method includes:
acquiring the number of current fragments of each node in a target distributed memory database and identifiers of the current fragments, and acquiring a first time length and a second time length, wherein the first time length is an average time length for which a single fragment is processed on a local node, and the second time length is an average time length for which a single fragment is processed across the network on a non-local node;
according to the number of the current fragments of each node, the identifier of the current fragments of each node, the first time length and the second time length, determining indication information of the fragments processed by each node in load balancing, wherein the indication information is used for indicating the fragments from each node;
and for each node, sending the indication information of the fragment of the node to the node so that the node acquires the fragment according to the indication information of the fragment of the node and processes the fragment.
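For orientation, the following is a minimal sketch of how a management device might orchestrate these three steps; all helper objects and method names (metadata_service, scheduler, transport) are hypothetical placeholders for the operations described in this document, not an actual API.

```python
def balance_load(nodes, metadata_service, scheduler, transport):
    """Hypothetical orchestration of the three steps of the first aspect."""
    # step 1: current fragment counts/identifiers plus the two average durations
    counts, frag_ids = metadata_service.current_fragments(nodes)
    t, ct = metadata_service.average_durations()
    # step 2: decide which fragments each node processes during load balancing
    indications = scheduler.plan(counts, frag_ids, t, ct)
    # step 3: notify every node so it can acquire and process its assigned fragments
    for node in nodes:
        transport.send(node, indications[node])
```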
Optionally, the obtaining the first duration and the second duration includes:
counting the time lengths of the first preset number of fragments which are respectively processed on the local node, and counting the time lengths of the second preset number of fragments which are respectively processed on the cross-network non-local node;
and determining that the counted average value of the time lengths of the first preset number of fragments respectively processed on the local nodes is a first time length, and determining that the counted average value of the time lengths of the second preset number of fragments respectively processed on the cross-network non-local nodes is a second time length.
Thus, the determined first duration and the second duration can be more accurate.
Optionally, the determining, according to the number of current fragments of each node, the identifier of the current fragment of each node, the first time length, and the second time length, indication information of a fragment processed by each node in load balancing, where the indication information is used to indicate a fragment from each node, includes:
determining the number of fragments from each node in the fragments processed by each node during load balancing according to the number of current fragments of each node, the first time length and the second time length;
and determining the indication information of the fragments processed by each node according to the number of the fragments from each node in the fragments processed by each node and the identification of the current fragment of each node.
Optionally, the indication information includes:
starting fragment identification and ending fragment identification in fragments from each node; or,
the number of fragments from each node and the starting fragment identifier; or,
the number of fragments from each node and the end fragment identification.
Optionally, the determining, according to the number of current fragments of each node, the first duration and the second duration, the number of fragments from each node in the fragments processed by each node in load balancing includes:
determining the number of fragments from each node among the fragments processed by each node in load balancing on the principle that the value of TotalCost is minimized, wherein the sum of the numbers of current fragments of all nodes in the target distributed memory database is equal to the sum of the numbers of fragments processed on all nodes during load balancing, TotalCost is the sum of the variances of the total durations for which the nodes in the target distributed memory database process fragments, TotalCost = Σ_{k=1}^{n} (Cost_k - meanCost)^2, meanCost is the average value of Cost_1, ..., Cost_n, Cost_k is the total duration for which node k processes fragments and includes the duration X_kk·t for which node k processes the fragments on node k and the duration Σ_{i=1, i≠k}^{n} X_ik·ct for which node k processes the fragments of other nodes, t is the first duration, ct is the second duration, n is the number of nodes in the target distributed memory database, X_kk is the number of fragments stored on node k that are processed by node k, X_ik is the number of fragments stored on node i that are processed by node k, node k is any node in the target distributed memory database, and k is greater than or equal to 1.
Optionally, the determining, in the principle of minimizing the TotalCost value, the number of fragments from each node in the fragments processed by each node in load balancing includes:
converting TotalCost into the matrix form TotalCost = X^T·H·X, wherein X = (X_11, X_12, ..., X_1n, ..., X_ij, ..., X_n1, X_n2, ..., X_nn)^T, X_ij is the number of fragments on the i-th node that are processed by the j-th node, i, j and n are positive integers, H is a symmetric matrix, and the sum of the elements in X is less than or equal to the sum of the numbers of current fragments of all nodes in the target distributed memory database;
and determining the number of fragments from each node among the fragments processed by each node in load balancing on the principle of minimizing X^T·H·X.
Optionally, the obtaining the number of current fragments of each node in the target distributed memory database includes:
when a data query request is received, acquiring the number of current fragments of each node in a target distributed memory database;
the method further comprises the following steps:
receiving the result of processing the fragments of each node in the target distributed memory database;
summarizing the results of processing the fragments of each node to obtain a total result;
and feeding back the total result.
In a second aspect, an apparatus for balancing distributed in-memory database load is provided, the apparatus comprising:
the system comprises an acquisition module, a processing module and a processing module, wherein the acquisition module is used for acquiring the number of current fragments of each node in a target distributed memory database and the identifier of the current fragments, and acquiring a first time length and a second time length, the first time length is the average time length of the single fragment processed on a local node, and the second time length is the average time length of the single fragment processed on a non-local node in a cross-network mode;
a determining module, configured to determine, according to the number of current fragments of each node, an identifier of the current fragment of each node, the first time length, and the second time length, indication information of a fragment processed by each node in load balancing, where the indication information is used to indicate the fragment from each node;
and the sending module is used for sending the indication information of the fragments of the nodes to the nodes so that the nodes acquire the fragments according to the indication information of the fragments of the nodes for processing.
Optionally, the obtaining module is configured to:
counting the time lengths of the first preset number of fragments which are respectively processed on the local node, and counting the time lengths of the second preset number of fragments which are respectively processed on the cross-network non-local node;
and determining that the counted average value of the time lengths of the first preset number of fragments respectively processed on the local nodes is a first time length, and determining that the counted average value of the time lengths of the second preset number of fragments respectively processed on the cross-network non-local nodes is a second time length.
Optionally, the determining module is configured to:
determining the number of fragments from each node in the fragments processed by each node during load balancing according to the number of current fragments of each node, the first time length and the second time length;
and determining the indication information of the fragments processed by each node according to the number of the fragments from each node in the fragments processed by each node and the identification of the current fragment of each node.
Optionally, the indication information includes:
starting fragment identification and ending fragment identification in fragments from each node; or,
the number of fragments from each node and the starting fragment identifier; or,
the number of fragments from each node and the end fragment identification.
Optionally, the determining module is configured to:
determining the number of fragments from each node among the fragments processed by each node in load balancing on the principle that the value of TotalCost is minimized, wherein the sum of the numbers of current fragments of all nodes in the target distributed memory database is equal to the sum of the numbers of fragments processed on all nodes during load balancing, TotalCost is the sum of the variances of the total durations for which the nodes in the target distributed memory database process fragments, TotalCost = Σ_{k=1}^{n} (Cost_k - meanCost)^2, meanCost is the average value of Cost_1, ..., Cost_n, Cost_k is the total duration for which node k processes fragments and includes the duration X_kk·t for which node k processes the fragments on node k and the duration Σ_{i=1, i≠k}^{n} X_ik·ct for which node k processes the fragments of other nodes, t is the first duration, ct is the second duration, n is the number of nodes in the target distributed memory database, X_kk is the number of fragments stored on node k that are processed by node k, X_ik is the number of fragments stored on node i that are processed by node k, node k is any node in the target distributed memory database, and k is greater than or equal to 1.
Optionally, the determining module is configured to:
converting TotalCost into the matrix form TotalCost = X^T·H·X, wherein X = (X_11, X_12, ..., X_1n, ..., X_ij, ..., X_n1, X_n2, ..., X_nn)^T, X_ij is the number of fragments on the i-th node that are processed by the j-th node, i, j and n are positive integers, H is a symmetric matrix, and the sum of the elements in X is less than or equal to the sum of the numbers of current fragments of all nodes in the target distributed memory database;
and determining the number of fragments from each node among the fragments processed by each node in load balancing on the principle of minimizing X^T·H·X.
Optionally, the obtaining module is configured to:
when a data query request is received, acquiring the number of current fragments of each node in a target distributed memory database;
the device further comprises:
the receiving module is used for receiving the result of processing the fragments of each node in the target distributed memory database; summarizing the results of processing the fragments of each node to obtain a total result;
the sending module is further configured to feed back the total result.
In a third aspect, a computer-readable storage medium is provided, in which a computer program is stored which, when being executed by a processor, carries out the method steps of the first aspect described above.
In a fourth aspect, a management device is provided, comprising a processor and a memory, wherein the memory is configured to store a computer program; the processor is configured to execute the program stored in the memory, so as to implement the method steps of the first aspect.
The technical solutions provided by the embodiments of the present invention have at least the following beneficial effects:
in the embodiment of the present invention, when balancing the load of the distributed memory database, the indication information of the fragment processed by each node in load balancing may be determined based on the number of the current fragments of each node, the average duration of the processing of the single fragment on the local node, and the average duration of the processing of the single fragment on the non-local node in a cross-network manner, where the indication information is used to indicate the fragment from each node and notify each node, and each subsequent node may acquire the fragment for processing based on the indication information. In this way, the fragments to be processed calculated during load balancing are determined firstly, and then the fragments to be processed calculated during load balancing are processed by each node, so that load balancing can be ensured as much as possible, and the fragment processing efficiency is improved.
Detailed Description
In order to make the objects, technical solutions and advantages of the present invention more apparent, embodiments of the present invention will be described in detail with reference to the accompanying drawings.
To facilitate understanding of the embodiments of the present invention, a system architecture related to the embodiments of the present invention and concepts related to the terms are first described below.
The embodiments of the present invention can be applied to a distributed system. A distributed system is a set of software systems built on a network and here mainly comprises a distributed memory database. As shown in fig. 1, the distributed memory database comprises a plurality of nodes, and the nodes may be servers.
The distributed memory database stores data in memory for operation, and therefore has higher read and write speeds than disk-based storage.
The local node of a fragment is the node on which the fragment is stored.
Spark is a distributed computing framework and a top-level project of the Apache Foundation.
SQL (Structured Query Language) is a database query and programming language used to access data and to query, update, and manage relational database systems.
A fragment (shard) is a data storage unit in a database, a logical unit composed of a fixed number of rows of data. It may be, for example, a region in HBase or a row group in Parquet/RCFile. When Spark is used for computation, one fragment may correspond to one partition of Spark.
A polynomial in n unknowns in which every term has degree 2 is called a quadratic form.
In the embodiments of the present invention, load balancing mainly refers to balancing CPU occupancy, that is, making the CPU occupancy of all nodes substantially the same.
The embodiments of the present invention provide a method for balancing the load of a distributed memory database. The execution body of the method may be a management device, and the management device may be an independent node or one of the nodes in the distributed memory database.
The management device may be provided with a processor, a memory and a transceiver. The processor may be configured to perform the processing for balancing the load of the distributed memory database, the memory may be configured to store data required and generated in the process of balancing the load of the distributed memory database, and the transceiver may be configured to receive and send data.
Before implementation, it is assumed that the processing capability of each node is the same, the network capability of each node is consistent, and the sizes of the fragments are the same. The embodiments of the present invention are described by taking Spark SQL as the query processing engine of the distributed memory database.
An embodiment of the present invention provides a method for balancing a load of a distributed memory database, and as shown in fig. 2, an execution flow of the method may be as follows:
step 201, acquiring the number of current fragments of each node in the target distributed memory database, and acquiring a first time length and a second time length.
The target distributed memory database is any distributed memory database. A first duration and a second duration may be stored in the management device in advance. The first duration is the average duration for which a single fragment is processed on a local node, that is, on the node that stores the fragment; the second duration is the average duration for which a single fragment is processed across the network on a non-local node, that is, on a node other than the node that stores the fragment. The second duration comprises two parts: the first part is the duration for transmitting the fragment, and the second part is the duration for processing the fragment.
In implementation, each node in the distributed memory database may periodically report its own fragment count and fragment identifiers to the management device, and the management device may store the received fragment counts, fragment identifiers, and node identifiers correspondingly. Subsequently, when the management device wants to balance the load of the nodes in the memory database, it may obtain the stored current fragment count and fragment identifiers of each node, and may obtain the first duration and the second duration that are stored in advance.
Alternatively, the management device may send a fragment acquisition request to each node, and each node may send its own fragment count and fragment identifiers to the management device, so that the management device obtains the current fragment count and fragment identifiers of each node when it wants to balance the load of the nodes in the memory database. The management device may also obtain the first duration and the second duration that are stored in advance.
It should be noted that, for a certain node, the current shard refers to the shard currently stored by the node.
Optionally, the process of obtaining the first duration and the second duration may be as follows:
counting the time lengths of the first preset number of fragments which are respectively processed on the local node, and counting the time lengths of the second preset number of fragments which are respectively processed on the cross-network non-local node; and determining that the average value of the counted time lengths of the first preset number of fragments respectively processed on the local node is a first time length, and determining that the average value of the counted time lengths of the second preset number of fragments respectively processed on the cross-network non-local node is a second time length.
The first preset number and the second preset number may be preset and stored in the management device, for example, the first preset number is 1000, the second preset number is 1500, and the first preset number and the second preset number may be the same, for example, the first preset number and the second preset number are both 1000.
In implementation, in the target distributed memory database, each node may count the time length for which the stored fragments are processed on the node itself, and report the time length to the management device, and after receiving the time length for which the first preset number of fragments are processed on the local node, the management device may calculate an average value of the first preset number of time lengths, and determine the average value as the first time length.
In the target distributed memory database, each node may also count the durations for which it processes fragments stored on other nodes (including the duration for obtaining a fragment from another node and the duration for processing it after it is obtained), and report the counted durations to the management device. After receiving the durations for which a second preset number of fragments have been processed across the network on non-local nodes, the management device may calculate the average value of the second preset number of durations and determine the average value as the second duration.
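As an illustration only, the averaging described above might look like the following sketch; the function name, parameters, and sample values are hypothetical and not part of the embodiment.

```python
from statistics import mean

def compute_durations(local_times, remote_times, first_preset=1000, second_preset=1000):
    """Average the first `first_preset` local processing durations (first duration, t)
    and the first `second_preset` cross-network durations (second duration, ct)."""
    if len(local_times) < first_preset or len(remote_times) < second_preset:
        raise ValueError("not enough samples reported yet")
    t = mean(local_times[:first_preset])     # processing on the node storing the fragment
    ct = mean(remote_times[:second_preset])  # transfer across the network plus processing
    return t, ct

# example: durations (in seconds) reported by the nodes
t, ct = compute_durations([0.8, 1.0, 1.1] * 400, [2.0, 2.4, 2.2] * 500)
```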
Optionally, the load balancing considered in the embodiments of the present invention is CPU balancing, so the processing in step 201 may be triggered when a data query request is received:
and when a data query request is received, acquiring the current fragmentation number of each node in the target distributed memory database.
In implementation, a user may input the content to be queried in a search input box and click a search key, which triggers sending of a query request to the management device, and the management device may then obtain the number of current fragments of each node in the target distributed memory database (the obtaining manner is described in detail above and is not repeated here). The query processing engine for this process may be Spark SQL.
Load imbalance of the distributed memory database increases when fragments are processed, so starting the load balancing processing when a data query request is received can save processing resources and improve query performance.
Step 202, determining indication information of the fragments processed by each node during load balancing according to the number of the current fragments of each node, the identifier of the current fragments of each node, the first time length and the second time length.
In implementation, after obtaining the number of current fragments of each node, the identifier of the current fragment of each node, the first time length, and the second time length, the management device may determine, according to the number of current fragments of each node, the identifier of the current fragment of each node, the first time length, and the second time length, indication information of fragments processed by each node in load balancing, where the indication information is used to indicate the fragments from each node.
Optionally, the number of fragments from each node may be determined first, and then the indication information is determined. The corresponding processing in step 202 may be as follows:
determining the number of fragments from each node in the fragments processed by each node during load balancing according to the number of current fragments of each node, the first time length and the second time length; and determining the indication information of the fragments processed by each node according to the number of the fragments from each node in the fragments processed by each node and the identification of the current fragment of each node.
In implementation, after obtaining the number of current fragments of each node, the identifiers of the current fragments of each node, the first duration and the second duration, the management device may determine the number of fragments from each node among the fragments processed by each node in load balancing. For the current fragments of a target node (any one of the nodes), the management device may determine all the nodes that are to process current fragments of the target node and the number of fragments each of them is to process. The nodes are arranged by sequence number from smallest to largest, denoted N1, N2, N3, N4, ..., and the numbers of the target node's current fragments that they are to process are n1, n2, n3, n4, ..., respectively. The node with sequence number N1 is assigned the identifiers of the first n1 fragments among the current fragments of the target node, the node with sequence number N2 is assigned the identifiers of the n2 fragments adjacent after the first n1 fragments, the node with sequence number N3 is assigned the identifiers of the n3 fragments adjacent after the first n1 + n2 fragments, and so on, until the identifiers of the target node's current fragments to be processed by each node are determined. The indication information is then determined based on these identifiers. In this manner, the indication information of the fragments processed by each node in the distributed memory database can be determined.
For example, there are three nodes: node a, node b and node c. Node a stores 10 fragments, node b stores 3 fragments, and node c stores 2 fragments. To achieve load balancing, node a processes 5 of its own fragments, 0 of node b's fragments and 0 of node c's fragments; node b processes 3 of its own fragments, 2 of node a's fragments and 0 of node c's fragments; node c processes 2 of its own fragments, 3 of node a's fragments and 0 of node b's fragments. The sequence numbers of the nodes, from smallest to largest, are node a, node b and node c. For node a, which only processes fragments stored by itself, the first 5 fragments can be acquired, and the indication information of the fragments processed by node a indicates only the first 5 fragments. For node b, node a does not process any fragment of node b; the fragments of node a processed by node b may be the 6th and 7th fragments, node b processes its own 3 fragments, and the indication information of the fragments processed by node b indicates only the 6th and 7th fragments of node a and the fragments stored by node b. For node c, node a and node b do not process any fragment of node c; the fragments of node a processed by node c may be the 8th to 10th fragments, node c processes the two fragments stored by itself, and the indication information of the fragments processed by node c indicates only the 8th to 10th fragments of node a and the fragments stored by itself.
In addition, for the current fragments of the target node (any one of the nodes), the management device may determine all the nodes that are to process current fragments of the target node and the number of fragments each of them is to process, and arrange the nodes by sequence number from smallest to largest, denoted N1, N2, N3, N4, ..., with the numbers of the target node's current fragments they are to process being n1, n2, n3, n4, ..., respectively. If the target node itself is the node with sequence number N3, the target node first takes the identifiers of the first n3 fragments among its current fragments, then the node with sequence number N1 takes the identifiers of the n1 fragments adjacent after the first n3 fragments, then the node with sequence number N2 takes the identifiers of the n2 fragments adjacent after the first n1 + n3 fragments, and so on, until the identifiers of the target node's current fragments to be processed by each node are determined. The indication information is then determined based on these identifiers. In this manner, the indication information of the fragments processed by each node in the distributed memory database can be determined.
For example, there are four nodes: node a, node b, node c and node d, whose sequence numbers from smallest to largest are node a, node b, node c and node d. Node a stores 10 fragments, node b stores 3 fragments, node c stores 2 fragments, and node d stores 9 fragments. To achieve load balancing, node a processes only fragments stored by itself, namely 6 fragments, and processes 0 fragments stored on node b, node c or node d. Node b processes 3 of its own fragments, 2 of node a's fragments, 0 of node c's fragments and 1 of node d's fragments. Node c processes 2 of its own fragments, 2 of node a's fragments, 0 of node b's fragments and 2 of node d's fragments. Node d processes only fragments stored by itself, namely 6 fragments.
For the node a, the node a only processes the fragments stored by the node a, and can acquire the first 6 fragments, and the indication information of the fragments processed by the node a is only used for indicating the first 6 fragments.
For the node d, the node d only processes the fragments stored by itself, and may obtain the first 6 fragments, and the indication information of the fragments processed by the node d is only used to indicate the first 6 fragments.
For node b, the fragments of node a processed by node b may be the 7th and 8th fragments, the fragment of node d processed by node b may be the 7th fragment, and node b processes its own 3 fragments, so the indication information of the fragments processed by node b indicates only the 7th and 8th fragments of node a, the fragments stored by node b, and the 7th fragment of node d.
For node c, the fragments of node a processed by node c may be the 9th and 10th fragments, the fragments of node d processed by node c may be the 8th and 9th fragments, and node c processes the two fragments stored by itself, so the indication information of the fragments processed by node c indicates only the 9th and 10th fragments of node a, the 8th and 9th fragments of node d, and the fragments stored by itself.
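The contiguous, in-order hand-out of fragment identifiers used in the two examples above can be sketched as follows; the data layout (ordered identifier lists per node and a matrix of counts) and all names are illustrative assumptions, not a prescribed interface.

```python
def assign_fragment_ids(fragment_ids, allocation, order):
    """fragment_ids: {source_node: [ordered fragment identifiers]}
    allocation:   {source_node: {processing_node: number of that source's fragments}}
    order:        processing nodes listed in the order they are served (see above).
    Returns {processing_node: {source_node: [assigned identifiers]}}."""
    plan = {proc: {} for proc in order}
    for src, ids in fragment_ids.items():
        cursor = 0
        for proc in order:  # hand out contiguous slices of identifiers in order
            count = allocation.get(src, {}).get(proc, 0)
            if count:
                plan[proc][src] = ids[cursor:cursor + count]
                cursor += count
    return plan

# first example: node a stores 10 fragments, node b stores 3, node c stores 2
ids = {"a": [f"a{i}" for i in range(1, 11)], "b": ["b1", "b2", "b3"], "c": ["c1", "c2"]}
alloc = {"a": {"a": 5, "b": 2, "c": 3}, "b": {"b": 3}, "c": {"c": 2}}
print(assign_fragment_ids(ids, alloc, ["a", "b", "c"]))
# node a gets a1..a5; node b gets a6, a7 and b1..b3; node c gets a8..a10 and c1, c2
```

For the second variant, in which the target node serves itself first, the same sketch applies if, for each source node, an order is used that puts that source node at the front.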
Optionally, the content of the indication information is various, and three possible contents are given as follows:
A. and the starting fragment identifier and the ending fragment identifier in the fragments from each node.
In implementation, in step 202, after it is determined that a node is to process fragments of another node, the identifier of the first fragment to be processed on the other node (i.e., the starting fragment identifier) and the identifier of the last fragment to be processed on the other node (i.e., the ending fragment identifier) may be obtained for that node. The subsequent indication information includes the starting fragment identifier and the ending fragment identifier, that is, all the fragments to be processed can be found by using the starting fragment identifier and the ending fragment identifier.
B. The number of fragments from each node and the starting fragment identification.
In implementation, in step 202, for each node, after it is determined that the node is to process fragments of another node, the identifier of the first fragment to be processed on the other node (i.e., the starting fragment identifier) and the number of fragments of the other node to be processed may be obtained. The subsequent indication information includes the starting fragment identifier and the number of fragments, that is, all the fragments to be processed can be found by using the starting fragment identifier and the number of fragments.
C. The number of fragments from each node and the end fragment identification.
In implementation, in step 202, for each node, after the fragments to be processed on another node are determined, the identifier of the last fragment to be processed on the other node (i.e., the ending fragment identifier) and the number of fragments of the other node to be processed may be obtained. The subsequent indication information includes the ending fragment identifier and the number of fragments, that is, all the fragments to be processed can be found by using the ending fragment identifier and the number of fragments.
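The three forms of indication information listed above can be represented, for example, by simple records; the type and field names below are illustrative only.

```python
from dataclasses import dataclass

@dataclass
class StartEndIndication:    # form A: starting and ending fragment identifiers
    source_node: str
    start_id: str
    end_id: str

@dataclass
class CountStartIndication:  # form B: fragment count and starting fragment identifier
    source_node: str
    count: int
    start_id: str

@dataclass
class CountEndIndication:    # form C: fragment count and ending fragment identifier
    source_node: str
    count: int
    end_id: str

# e.g. node b is told to take the 6th and 7th fragments of node a (forms A and B shown)
hint_a = StartEndIndication("a", "a6", "a7")
hint_b = CountStartIndication("a", 2, "a6")
```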
Optionally, the number of fragments from each node among the fragments processed by each node may be determined on the principle that the value of TotalCost is minimized, and the corresponding processing may be as follows:
determining the number of fragments from each node among the fragments processed by each node in load balancing on the principle that the value of TotalCost is minimized, wherein the sum of the numbers of current fragments of all nodes in the target distributed memory database is equal to the sum of the numbers of fragments processed on all nodes during load balancing, TotalCost is the sum of the variances of the total durations for which the nodes in the target distributed memory database process fragments, TotalCost = Σ_{k=1}^{n} (Cost_k - meanCost)^2, meanCost is the average value of Cost_1, ..., Cost_n, Cost_k is the total duration for which node k processes fragments and includes the duration X_kk·t for which node k processes the fragments on node k and the duration Σ_{i=1, i≠k}^{n} X_ik·ct for which node k processes the fragments of other nodes, t is the first duration, ct is the second duration, n is the number of nodes in the target distributed memory database, X_kk is the number of fragments stored on node k that are processed by node k, X_ik is the number of fragments stored on node i that are processed by node k, node k is any node in the target distributed memory database, and k is greater than or equal to 1.
In implementation, assume that there are n nodes. Taking node k (the k-th node, Node_k) as an example, during load balancing all nodes in the distributed memory database may process fragments stored on Node_k; after scheduling, the numbers of fragments on node k processed by each of the nodes are denoted X_k1, X_k2, ..., X_kn. Thus, an n × n matrix (X_ij), i, j = 1, ..., n, can be obtained, where X_ij is the number of fragments stored on node i that are processed by node j.
In this n × n matrix, each column corresponds to a node: the k-th column corresponds to node k, and the number of fragments to be processed by node k during load balancing is obtained by summing the elements of the k-th column. Each row corresponds to the fragments stored on a node (i.e., the current fragments of that node), and the number of fragments stored on node k is obtained by summing the elements of the k-th row.
Assuming that the first duration is denoted t and the second duration is denoted ct, the total duration for which node k processes fragments during load balancing is:
Cost_k = X_kk·t + Σ_{i=1, i≠k}^{n} X_ik·ct   (2)
In formula (2), X_kk·t is the duration for which node k processes its own fragments, and Σ_{i=1, i≠k}^{n} X_ik·ct is the duration for which node k processes the fragments of other nodes.
The sum of the variances of the total durations of the nodes in the target distributed memory database may then be determined:
TotalCost = Σ_{k=1}^{n} (Cost_k - meanCost)^2   (3)
In formula (3), meanCost is given by:
meanCost = (1/n)·Σ_{k=1}^{n} Cost_k   (4)
In formula (4), meanCost represents the average value of the durations for which the nodes in the target distributed memory database process fragments during load balancing.
Under a fragment allocation that minimizes TotalCost, the processing capability of each node in the target distributed memory database is considered to be exerted in a balanced manner, that is, load balancing is achieved.
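The cost model of formulas (2) to (4) translates directly into code; the following sketch, with an assumed allocation matrix X[i][k] (fragments stored on node i that node k processes) and an illustrative value of ct, is for checking candidate allocations only.

```python
def total_cost(X, t, ct):
    """X[i][k]: number of fragments stored on node i processed by node k (n x n lists).
    Returns the sum of squared deviations of per-node processing durations (TotalCost)."""
    n = len(X)
    # Cost_k = X[k][k]*t + sum over i != k of X[i][k]*ct   (formula (2))
    costs = [X[k][k] * t + sum(X[i][k] for i in range(n) if i != k) * ct for k in range(n)]
    mean_cost = sum(costs) / n                        # formula (4)
    return sum((c - mean_cost) ** 2 for c in costs)   # formula (3)

# three-node example from above, with t = 1.0 and an assumed ct = 1.5
X = [[5, 2, 3],   # node a's 10 fragments: 5 kept, 2 to node b, 3 to node c
     [0, 3, 0],   # node b's 3 fragments stay on node b
     [0, 0, 2]]   # node c's 2 fragments stay on node c
print(total_cost(X, t=1.0, ct=1.5))
```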
Optionally, on the principle of TotalCost minimum, the manner of calculating the number of fragments from each node in the fragments processed by each node may be as follows:
converting TotalCost into the matrix form TotalCost = X^T·H·X, wherein X = (X_11, X_12, ..., X_1n, ..., X_ij, ..., X_n1, X_n2, ..., X_nn)^T, X_ij is the number of fragments on the i-th node that are processed by the j-th node, i, j and n are positive integers, H is a symmetric matrix, and the sum of the elements in X is less than or equal to the sum of the numbers of current fragments of all nodes in the target distributed memory database; and determining the number of fragments from each node among the fragments processed by each node in load balancing on the principle of minimizing X^T·H·X.
In practice, as can be seen from formulas (2) to (4), X_11, X_12, ..., X_1n, ..., X_ij, ..., X_n1, X_n2, ..., X_nn are all unknowns, so TotalCost is a polynomial in n × n unknowns in which every term has degree 2, that is, a quadratic form in n × n unknowns. Based on the properties of quadratic forms, there must exist a symmetric matrix H such that:
TotalCost = X^T·H·X   (5)
In formula (5), X = (X_11, X_12, ..., X_1n, ..., X_ij, ..., X_n1, X_n2, ..., X_nn)^T.
Thus, based on formula (5), the load balancing problem can be converted into a quadratic programming problem with constraints, that is:
taking the minimum value of TotalCost, min X^T·H·X, subject to the requirement that HX is less than or equal to b.
The matrix H is the symmetric coefficient matrix obtained by expanding TotalCost according to formulas (2) to (4). In formulas (6) and (7), Y_n denotes the number of current fragments (i.e., stored fragments) on the n-th node during load balancing, X_k = (X_k1, X_k2, ..., X_kn)^T, and Y_k = (N_k, 0, ..., 0)^T, where T denotes transposition.
X in formula (7) can then be determined by solving for the optimal solution of the quadratic programming problem. In this way, the number of fragments from each node among the fragments processed by each node is determined.
It should be noted that solving for the optimal solution of a quadratic programming problem is a conventional method in the prior art, and a Lagrangian multiplier method may be used in the calculation process, which is not described herein again.
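As a non-authoritative sketch of one conventional way to obtain such a solution, the continuous relaxation of the problem can be minimized numerically and then rounded; the example below uses scipy.optimize.minimize (SLSQP) with the constraint that every stored fragment is processed exactly once, which is an assumption made here for illustration rather than the patent's exact constraint system.

```python
import numpy as np
from scipy.optimize import minimize

def balance(counts, t, ct):
    """counts[i]: number of current fragments on node i. Returns an n x n matrix
    X where X[i][k] is the number of node i's fragments processed by node k."""
    n = len(counts)

    def total_cost(x):
        X = x.reshape(n, n)
        costs = np.array([X[k, k] * t + (X[:, k].sum() - X[k, k]) * ct for k in range(n)])
        return ((costs - costs.mean()) ** 2).sum()

    # each node's stored fragments must all be processed (row i sums to counts[i])
    cons = [{"type": "eq", "fun": (lambda x, i=i: x.reshape(n, n)[i].sum() - counts[i])}
            for i in range(n)]
    x0 = np.repeat(np.array(counts, dtype=float) / n, n)   # spread evenly as a start
    res = minimize(total_cost, x0, method="SLSQP",
                   bounds=[(0, None)] * (n * n), constraints=cons)
    return np.rint(res.x.reshape(n, n)).astype(int)        # round the relaxed solution

print(balance([10, 3, 2], t=1.0, ct=1.5))   # three-node example from above
```

Rounding can disturb the row sums by one fragment, so a real scheduler would repair the rounded matrix before use.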
Step 203, for each node, sending the indication information of the node fragment to the node, so that the node acquires the fragment according to the indication information of the node fragment and processes the fragment.
In implementation, as shown in fig. 3, for each node, the management device may send the indication information of the fragments of the node to the node, and after receiving the indication information, the node may use it to acquire the fragments indicated by the indication information and process them.
Optionally, when the indication information is a start fragment identifier and an end fragment identifier in the fragments from each node, for a certain node, if the node is to acquire a fragment from another node for processing, the start fragment identifier and the end fragment identifier of the fragment from the another node may be determined based on the indication information, and then the fragments between the start fragment identifier and the end fragment identifier (the fragments including the fragment of the start fragment identifier and the fragment of the end fragment identifier) are acquired from a storage area of the another node for processing.
Optionally, when the indication information is the number of fragments from each node and the starting fragment identifier, for a certain node, if the node is to acquire fragments of another node for processing, the number of fragments from the other node and the starting fragment identifier may be determined based on the indication information, and then the fragment with the starting fragment identifier and the adjacent fragments following it, the stated number of fragments minus one, are acquired from the storage area of the other node for processing.
Optionally, when the indication information is the number of fragments from each node and the ending fragment identifier, for a certain node, if the node is to acquire fragments of another node for processing, the number of fragments from the other node and the ending fragment identifier may be determined based on the indication information, and then the fragment with the ending fragment identifier and the adjacent fragments preceding it, the stated number of fragments minus one, are acquired from the storage area of the other node for processing.
It should be noted that, when acquiring the fragment of another node, the above-mentioned acquisition may be performed based on a metadata table, and a storage area of each node and a storage location of the fragment of each node are recorded in the metadata table.
It should be further noted that, in the distributed memory database, the storage area of each node is a logical storage area.
It should be noted that, when a node acquires fragments of another node for processing as described above, only the data of the fragments is read from the other node for processing and nothing is written to the other node, so that, compared with fragment migration in the prior art, the time for writing the fragment data can be saved. Moreover, when the duration for completing the processing of a fragment is longer than the duration for transmitting its data (namely the duration for transmitting the fragment to another node), having other idle nodes quickly complete the processing of fragments can reduce the CPU occupancy of the local node, so the load balancing capability for this scenario is stronger.
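On the node side, acquiring fragments according to, for example, a starting and an ending fragment identifier is a read-only operation against the other node's logical storage area, as described above; the metadata layout and the read_fragment/process callables in this sketch are hypothetical.

```python
def process_assigned_fragments(indication, metadata, read_fragment, process):
    """indication: {source_node: (start_id, end_id)} assigned to this node.
    metadata: {source_node: [ordered fragment identifiers]} (the metadata table).
    read_fragment(source_node, frag_id): reads fragment data without writing anything back.
    process(data): runs the query on one fragment and returns a partial result."""
    results = []
    for src, (start_id, end_id) in indication.items():
        ids = metadata[src]
        lo, hi = ids.index(start_id), ids.index(end_id)
        for frag_id in ids[lo:hi + 1]:           # inclusive of start and end fragments
            data = read_fragment(src, frag_id)   # read-only: no fragment migration
            results.append(process(data))
    return results
```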
Since step 201 is executed when a data query request is received, the query result needs to be fed back to the user after step 203. The corresponding processing may be as follows:
receiving the result of processing the fragments of each node in the target distributed memory database; summarizing the results of processing the fragments of each node to obtain a total result; and feeding back the total result.
In implementation, after obtaining the fragments and performing query processing, each node may respectively feed back the query results to the management device, and after receiving the query results, the management device may collect the received query results to obtain a total result (i.e., a query result), and then feed back the collected total result to the user, so that the user can check the total result.
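Summarizing the per-node results amounts to folding the partial results into one total result; since the combine step depends on the query (e.g., concatenation for a selection, addition for a count), the reducer below is a hypothetical parameter.

```python
from functools import reduce

def summarize(partial_results, combine):
    """partial_results: per-node query results received by the management device.
    combine: binary function merging two partial results into one total result."""
    return reduce(combine, partial_results)

# e.g. a COUNT(*) query: each node reports the number of matching rows in its fragments
total = summarize([120, 87, 93], combine=lambda a, b: a + b)
print(total)  # 300
```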
In the embodiment of the present invention, when determining load balancing of the distributed memory database, indication information of a fragment processed by each node during load balancing may be determined based on the number of current fragments of each node, an average duration of processing of a single fragment performed on a local node, and an average duration of processing of a single fragment performed on a non-local node in a cross-network manner, where the indication information is used to indicate a fragment from each node and notify each node, and each subsequent node may acquire a fragment for processing based on the indication information. In this way, the fragments to be processed calculated during load balancing are determined firstly, and then the fragments to be processed calculated during load balancing are processed by each node, so that load balancing can be ensured as much as possible, and the fragment processing efficiency is improved.
Based on the same technical concept, an embodiment of the present invention further provides a device for balancing a load of a distributed memory database, as shown in fig. 4, where the device includes:
an obtaining module 410, configured to obtain the number of current fragments of each node in a target distributed memory database and identifiers of the current fragments, and to obtain a first duration and a second duration, where the first duration is an average duration for which a single fragment is processed on a local node, and the second duration is an average duration for which a single fragment is processed across the network on a non-local node;
a determining module 420, configured to determine, according to the number of current fragments of each node, the identifiers of the current fragments of each node, the first duration and the second duration, indication information of the fragments processed by each node in load balancing, where the indication information is used to indicate the fragments from each node;
a sending module 430, configured to, for each node, send the indication information of the fragments of the node to the node, so that the node acquires the fragments according to the indication information and processes them.
Optionally, the obtaining module 410 is configured to:
counting the time lengths of the first preset number of fragments which are respectively processed on the local node, and counting the time lengths of the second preset number of fragments which are respectively processed on the cross-network non-local node;
and determining that the counted average value of the time lengths of the first preset number of fragments respectively processed on the local nodes is a first time length, and determining that the counted average value of the time lengths of the second preset number of fragments respectively processed on the cross-network non-local nodes is a second time length.
Optionally, the determining module 420 is configured to:
determining the number of fragments from each node in the fragments processed by each node during load balancing according to the number of current fragments of each node, the first time length and the second time length;
and determining the indication information of the fragments processed by each node according to the number of the fragments from each node in the fragments processed by each node and the identification of the current fragment of each node.
Optionally, the indication information includes:
starting fragment identification and ending fragment identification in fragments from each node; or,
the number of fragments from each node and the starting fragment identifier; or,
the number of fragments from each node and the end fragment identification.
Optionally, the determining module 420 is configured to:
determining the number of fragments from each node among the fragments processed by each node in load balancing on the principle that the value of TotalCost is minimized, wherein the sum of the numbers of current fragments of all nodes in the target distributed memory database is equal to the sum of the numbers of fragments processed on all nodes during load balancing, TotalCost is the sum of the variances of the total durations for which the nodes in the target distributed memory database process fragments, TotalCost = Σ_{k=1}^{n} (Cost_k - meanCost)^2, meanCost is the average value of Cost_1, ..., Cost_n, Cost_k is the total duration for which node k processes fragments and includes the duration X_kk·t for which node k processes the fragments on node k and the duration Σ_{i=1, i≠k}^{n} X_ik·ct for which node k processes the fragments of other nodes, t is the first duration, ct is the second duration, n is the number of nodes in the target distributed memory database, X_kk is the number of fragments stored on node k that are processed by node k, X_ik is the number of fragments stored on node i that are processed by node k, node k is any node in the target distributed memory database, and k is greater than or equal to 1.
Optionally, the determining module 420 is configured to:
converting TotalCost into the matrix form TotalCost = X^T·H·X, wherein X = (X_11, X_12, ..., X_1n, ..., X_ij, ..., X_n1, X_n2, ..., X_nn)^T, X_ij is the number of fragments on the i-th node that are processed by the j-th node, i, j and n are positive integers, H is a symmetric matrix, and the sum of the elements in X is less than or equal to the sum of the numbers of current fragments of all nodes in the target distributed memory database;
and determining the number of fragments from each node among the fragments processed by each node in load balancing on the principle of minimizing X^T·H·X.
Optionally, the obtaining module 410 is configured to:
when a data query request is received, acquiring the number of current fragments of each node in a target distributed memory database;
as shown in fig. 5, the apparatus further includes:
a receiving module 440, configured to receive a result of processing the fragments by each node in the target distributed memory database and summarize the results of processing the fragments of each node to obtain a total result;
the sending module 430 is further configured to feed back the total result.
In the embodiment of the present invention, when balancing the load of the distributed memory database, the indication information of the fragment processed by each node in load balancing may be determined based on the number of the current fragments of each node, the average duration of the processing of the single fragment on the local node, and the average duration of the processing of the single fragment on the non-local node in a cross-network manner, where the indication information is used to indicate the fragment from each node and notify each node, and each subsequent node may acquire the fragment for processing based on the indication information. In this way, the fragments to be processed calculated during load balancing are determined firstly, and then the fragments to be processed calculated during load balancing are processed by each node, so that load balancing can be ensured as much as possible, and the fragment processing efficiency is improved.
It should be noted that: in the apparatus for balancing the load of the distributed memory database provided in the foregoing embodiment, when balancing the load of the distributed memory database, only the division of each functional module is used for illustration, and in practical applications, the function distribution may be completed by different functional modules according to needs, that is, the internal structure of the apparatus is divided into different functional modules, so as to complete all or part of the functions described above. In addition, the apparatus for balancing the load of the distributed memory database and the method embodiment for balancing the load of the distributed memory database provided in the above embodiments belong to the same concept, and specific implementation processes thereof are detailed in the method embodiment and are not described herein again.
Fig. 6 is a schematic structural diagram of a management device according to an embodiment of the present invention. The management device 600 may vary greatly due to different configurations or performance, and may include one or more processors (CPUs) 601 and one or more memories 602, where the memory 602 stores at least one instruction, and the at least one instruction is loaded and executed by the processor 601 to implement the method for balancing the load of the distributed memory database.
The embodiment of the invention also provides a computer readable storage medium, wherein a computer program is stored in the storage medium, and when being executed by a processor, the computer program realizes the method for balancing the load of the distributed memory database.
The embodiment of the invention also provides a management device, which comprises a processor and a memory, wherein the memory is used for storing the computer program; the processor is used for executing the program stored in the memory and realizing the method for balancing the load of the distributed memory database.
It will be understood by those skilled in the art that all or part of the steps for implementing the above embodiments may be implemented by hardware, or may be implemented by a program instructing relevant hardware, where the program may be stored in a computer-readable storage medium, and the above-mentioned storage medium may be a read-only memory, a magnetic disk or an optical disk, etc.
The above description is only for the purpose of illustrating the preferred embodiments of the present invention and is not to be construed as limiting the invention, and any modifications, equivalents, improvements and the like that fall within the spirit and principle of the present invention are intended to be included therein.