Movatterモバイル変換


[0]ホーム

URL:


CN105718244A - Streamline data shuffle Spark task scheduling and executing method - Google Patents

Streamline data shuffle Spark task scheduling and executing method
Download PDF

Info

Publication number
CN105718244A
CN105718244ACN201610029211.7ACN201610029211ACN105718244ACN 105718244 ACN105718244 ACN 105718244ACN 201610029211 ACN201610029211 ACN 201610029211ACN 105718244 ACN105718244 ACN 105718244A
Authority
CN
China
Prior art keywords
task
shuffling
transmission
data
stage
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Granted
Application number
CN201610029211.7A
Other languages
Chinese (zh)
Other versions
CN105718244B (en
Inventor
付周望
张未雨
戚正伟
管海兵
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
Shanghai Jiao Tong University
Original Assignee
Shanghai Jiao Tong University
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by Shanghai Jiao Tong UniversityfiledCriticalShanghai Jiao Tong University
Priority to CN201610029211.7ApriorityCriticalpatent/CN105718244B/en
Publication of CN105718244ApublicationCriticalpatent/CN105718244A/en
Application grantedgrantedCritical
Publication of CN105718244BpublicationCriticalpatent/CN105718244B/en
Activelegal-statusCriticalCurrent
Anticipated expirationlegal-statusCritical

Links

Classifications

Landscapes

Abstract

Translated fromChinese

本发明公开了一种流水化数据洗牌传输的Spark任务调度与执行方法,从后向前提交并执行阶段和其中的任务,同时采用前驱任务发送执行结果到后继任务的内存,在不改变用户接口,不破坏阶段的完整性和容错性的同时,解决了原本Spark在不同阶段(Stage)中洗牌数据传输(Shuffle)的磁盘读写开销,从而降低了分布式计算程序在Spark上的运行时间。

The invention discloses a Spark task scheduling and execution method for streamlined data shuffling and transmission, which submits and executes stages and tasks in them from the back to the front, and at the same time uses the predecessor task to send the execution result to the memory of the successor task, without changing the user Interface, without destroying the integrity and fault tolerance of the stage, it solves the original Spark's disk read and write overhead of shuffling data transmission (Shuffle) in different stages (Stage), thereby reducing the running of distributed computing programs on Spark time.

Description

Translated fromChinese
一种流水化数据洗牌传输的Spark任务调度与执行方法A Spark task scheduling and execution method for pipelined data shuffling and transmission

技术领域technical field

本发明涉及计算机分布式计算框架领域。具体来讲,主要是在分布式计算框架Spark的基础上改变他的任务调度机制,从而提升该计算框架的性能。The invention relates to the field of computer distributed computing framework. Specifically, it is mainly to change its task scheduling mechanism based on the distributed computing framework Spark, thereby improving the performance of the computing framework.

背景技术Background technique

Spark作为目前应用最广泛的分布式计算框架,已经被部署在了无数的数据中心中。它提出的分布式回弹数据集(ResilientDistributedDataset,RDD)使得大数据的计算过程最大可能的在内存中进行。在执行逻辑上,Spark按照用户程序的逻辑从前向后生成RDD,每个RDD都会有自己的依赖。当用户的程序需要最终的输出结果时,Spark就会从最后一个RDD递归向前寻找,并且按照其中存在的洗牌传输依赖(ShuffleDependency)来划分阶段(Stage)。在划分完阶段之后,Spark就会从前向后提交阶段,先提交没有缺失依赖的阶段,依次向后。这种调度逻辑使得数据自动流向需要计算的位置,并且让计算中间结果最大可能的保存在内存中。As the most widely used distributed computing framework, Spark has been deployed in countless data centers. The Resilient Distributed Dataset (RDD) proposed by it makes the calculation process of big data possible in memory. In terms of execution logic, Spark generates RDDs from front to back according to the logic of the user program, and each RDD will have its own dependencies. When the user's program needs the final output, Spark will search forward recursively from the last RDD, and divide the stage (Stage) according to the Shuffle Dependency that exists therein. After the stages are divided, Spark will submit the stages from front to back, submitting the stages without missing dependencies first, and then going backwards in turn. This scheduling logic allows data to automatically flow to the location that needs to be calculated, and allows the calculation of intermediate results to be stored in memory as much as possible.

但是为了保证阶段(Stage)之间的分割和本身框架的容错性,在划分每个阶段的洗牌传输依赖(ShuffleDependency)时,Spark会将前驱阶段产生的中间结果存储到磁盘中,然后开始分配下一个阶段的任务,之后再由阶段的任务去远程读取磁盘上的数据,然后进行计算。However, in order to ensure the segmentation between stages and the fault tolerance of its own framework, when dividing the Shuffle Dependency of each stage, Spark will store the intermediate results generated by the predecessor stage in the disk, and then start to allocate The task of the next stage, and then the task of the stage will remotely read the data on the disk, and then perform calculations.

在目前磁盘速度远远慢于内存的现实中,这部分的数据读写成为了限制Spark性能提升的最大瓶颈。然而碍于阶段的划分的完整性以及容错性,目前仍然没有对于优化这部分瓶颈的补丁或者解决方案出现。In the current reality that the speed of the disk is much slower than that of the memory, this part of the data read and write has become the biggest bottleneck restricting the performance improvement of Spark. However, due to the integrity and fault tolerance of the stage division, there are still no patches or solutions to optimize this part of the bottleneck.

发明内容Contents of the invention

本发明针对在Spark数据洗牌传输时读写硬盘的瓶颈,提出了一种流水化数据洗牌传输(Shuffle)的Spark任务调度方法,通过改变Spark任务的提交顺序,使得一个任务先于他的前驱任务开始被调度分发,同时采用前驱任务发送执行结果到后继任务的内存中的方法,在不改变用户接口,不破坏阶段的完整性和容错性的同时,解决了原本Spark在不同阶段(Stage)中洗牌数据传输(Shuffle)的磁盘读写开销。使得前驱任务在执行完的产生中间结果的同时通过网络将数据发送给后继任务,从而避免了磁盘IO的读写,提升Spark分布式计算框架的性能。Aiming at the bottleneck of reading and writing the hard disk when Spark data is shuffled and transmitted, the present invention proposes a Spark task scheduling method for pipelined data shuffled transmission (Shuffle), by changing the submission order of Spark tasks, a task is prior to his The predecessor task starts to be scheduled and distributed, and at the same time, the method of sending the execution result of the predecessor task to the memory of the successor task is adopted, without changing the user interface, and without destroying the integrity and fault tolerance of the stage, it solves the problem of the original Spark in different stages (Stage ) The disk read and write overhead of shuffling data transmission (Shuffle). This enables the predecessor tasks to send data to the successor tasks through the network while generating intermediate results after execution, thereby avoiding disk IO read and write, and improving the performance of the Spark distributed computing framework.

本发明的技术解决方案如下:Technical solution of the present invention is as follows:

一种流水化数据洗牌传输的Spark任务调度与执行方法,包括如下步骤:A Spark task scheduling and execution method for pipelined data shuffling and transmission, comprising the following steps:

步骤1:当Spark提交一个任务且该任务被划分成多个阶段进行提交时,首先找到用户执行任务生成结果的最后一个阶段;Step 1: When Spark submits a task and the task is divided into multiple stages for submission, first find the last stage in which the user executes the task to generate results;

步骤2:从最后一个阶段开始,判断该阶段是否包含未完成的前驱阶段:Step 2: Starting from the last stage, determine whether this stage contains an unfinished precursor stage:

如果这个阶段的前驱阶段全部执行完成,则提交该阶段进行执行;If the predecessor phases of this phase are all executed, submit this phase for execution;

如果有前驱阶段没有被执行,则将该阶段标记为等待,同时提交该阶段进行执行,并且递归提交该阶段的前驱阶段;If there is a predecessor stage that has not been executed, mark the stage as waiting, submit the stage for execution at the same time, and recursively submit the predecessor stage of the stage;

步骤3:在提交一个阶段进行执行之后,调度器将该阶段拆分成多个任务,并判断该阶段:是否为等待阶段:Step 3: After submitting a phase for execution, the scheduler splits the phase into multiple tasks and judges whether the phase is a waiting phase:

如果该阶段被标记为等待,则调度器向资源管理器请求与任务个数相同的空闲执行节点,调度器获取相应执行任务的执行节点之后,根据该阶段包含的分布式回弹数据集的依赖关系递归向前寻找洗牌传输依赖,调度器每找到一个洗牌传输依赖就会向的映射输出追踪表注册该次洗牌传输流水信息,在注册完成之后,调度器还会通知每一个即将要运行这个任务的执行节点准备好相应的内存来缓存他们前驱任务发送的中间结果;每个执行节点收到调度器的注册信息之后,会在本地缓存中新建一个以洗牌传输依赖ID为索引,值为规约数据块总数个的缓存数组的键值对,同时还会在本地新建一个以洗牌传输依赖ID为索引,值为规约数据块总数的信号量数据结构的键值对,其中每个信号量包含这次洗牌依赖的洗牌任务映射总数,;If the stage is marked as waiting, the scheduler requests from the resource manager the same number of idle execution nodes as the number of tasks. The relationship recursively looks forward for the shuffling transfer dependency. Whenever the scheduler finds a shuffling transfer dependency, it will register the shuffling transfer flow information with the mapping output tracking table. After the registration is completed, the scheduler will also notify each upcoming transfer. The execution nodes running this task prepare corresponding memory to cache the intermediate results sent by their predecessor tasks; after each execution node receives the registration information of the scheduler, it will create a new one in the local cache with the shuffle transmission dependency ID as the index, The value is the key-value pair of the cache array of the total number of data blocks in the specification, and at the same time, a key-value pair of the semaphore data structure whose value is the total number of data blocks in the specification will be created locally, indexed by the shuffle transmission dependency ID, where each The semaphore contains the total number of shuffling task mappings that this shuffling depends on;

否则,直接进入下一步;Otherwise, go directly to the next step;

步骤4:在调度器封装阶段的任务集合时,判断该阶段是否是一个洗牌映射阶段:Step 4: When the scheduler encapsulates the task set of the stage, determine whether the stage is a shuffle mapping stage:

如果该阶段是一个洗牌映射阶段,则对该阶段中的每个任务都设置对应的洗牌传输依赖ID;If the stage is a shuffle mapping stage, set the corresponding shuffle transfer dependency ID for each task in the stage;

否则,直接进入下一步;Otherwise, go directly to the next step;

步骤5:调度器将封装好的任务分发给各个执行节点;Step 5: The scheduler distributes the packaged tasks to each execution node;

步骤6:当任务被分配到每个执行节点的执行器上进行执行时,执行器会判断这个任务是否为洗牌映射任务:Step 6: When the task is assigned to the executor of each execution node for execution, the executor will judge whether the task is a shuffling mapping task:

如果是,则根据该任务包含的洗牌传输依赖ID,向映射输出追踪表请求该ID对应的规约任务的执行节点的集合信息,然后,设置洗牌映射任务的对应规约信息,将收到的结合信息中的规约数据块号和远程地址封装成一个哈希表传给该洗牌映射任务,并进入步骤7;If so, according to the shuffle transmission dependency ID contained in the task, request the set information of the execution nodes of the protocol task corresponding to the ID from the mapping output tracking table, and then set the corresponding protocol information of the shuffle mapping task, and send the received Combining the protocol data block number and the remote address in the information into a hash table and passing it to the shuffling mapping task, and entering step 7;

如果执行器判断该任务为规约任务,调用该规约任务对应的函数进行计算,并进入步骤11;If the executor determines that the task is a statute task, call the function corresponding to the statute task for calculation, and enter step 11;

步骤7:当一个洗牌映射任务开始执行时,会检查是否需要流水化数据输出;Step 7: When a shuffling mapping task starts to execute, it will check whether pipelined data output is required;

如果需要,首先根据用户指定的分类器或者Spark默认的分类器将中间结果键值对按照键计算出他对应的规约数据块号,根据设置的数据块号和远程地址哈希表,将计算获得的数据结果发送给对应的负责后继规约任务的执行节点,发送的信息包括:洗牌传输依赖ID,规约数据块号,数据结果的键值对;在发送数据的同时,执行器向磁盘写入数据,并进入步骤8;同时,当负责规约任务的执行节点收到流水数据之后,会将洗牌数据依赖ID作为索引,保存该ID对应的缓存数组的第规约数据块号的缓存中,进入步骤8;If necessary, first calculate the intermediate result key-value pair according to the key according to the user-specified classifier or Spark’s default classifier to calculate its corresponding specification data block number, and then calculate according to the set data block number and remote address hash table The data results are sent to the corresponding execution nodes responsible for subsequent protocol tasks. The information sent includes: shuffling transmission dependency ID, protocol data block number, and key-value pairs of data results; while sending data, the executor writes to the disk data, and enter step 8; at the same time, when the execution node in charge of the statute task receives the flow data, it will use the shuffle data dependency ID as an index, and save the cache of the first statute data block number of the cache array corresponding to the ID, and enter Step 8;

如果不需要,则直接将执行结果写入磁盘,进入步骤8;If not needed, directly write the execution result to the disk and go to step 8;

步骤8:执行器完成洗牌映射任务;Step 8: The executor completes the shuffling and mapping task;

步骤9:当一个洗牌映射任务运行结束时,就会向所有的负责规约任务的执行节点发送流水结束信息,该信息包括:洗牌传输依赖ID,该任务负责的映射数据块号,和该条信息对应的执行节点负责的规约数据块号;Step 9: When a shuffle mapping task finishes running, it will send a pipeline end message to all execution nodes responsible for the statute task, which includes: shuffle transfer dependency ID, the mapping data block number that the task is responsible for, and the The protocol data block number that the execution node corresponding to each piece of information is responsible for;

步骤10:当负责规约任务的执行节点收到流水结束的信息后,会根据洗牌传输依赖ID作为索引,找到该ID对应的信号量数组,将其中第规约数据块号个CountDownLatch减一。如果这个信号量被减到0,则表示该规约数据块依赖的数据映射全部传输结束;Step 10: When the execution node responsible for the protocol task receives the message of the end of the pipeline, it will use the shuffled transmission dependency ID as an index to find the semaphore array corresponding to the ID, and subtract one from the CountDownLatch of the protocol data block number. If this semaphore is reduced to 0, it means that all the data mappings that the protocol data block depends on have been transferred;

步骤11:当执行器执行规约任务的指定函数时,会调用相应的规约函数,该函数在执行任务读取数据时,会向执行节点请求一个读入数据的迭代器;Step 11: When the executor executes the specified function of the specification task, it will call the corresponding specification function, which will request an iterator for reading data from the execution node when executing the task to read data;

步骤12:在生成迭代器的时候会向执行节点询问这次洗牌传输是否有本地缓存,即是否被流水化数据传输:Step 12: When generating the iterator, the execution node will be asked whether there is a local cache for this shuffling transmission, that is, whether it is pipelined data transmission:

如果是,则调用执行节点的获取缓存方法,根据规约任务的洗牌传输依赖ID和该任务负责的规约数据块号向执行节点请求缓存,并进入步骤13;If so, call the execution node's acquisition cache method, request the cache from the execution node according to the shuffle transfer dependency ID of the protocol task and the protocol data block number responsible for the task, and enter step 13;

否则,读取远程数据,进入步骤15;Otherwise, read the remote data and go to step 15;

步骤13:执行节点收到获取缓存的调用后,就会以洗牌传输依赖ID为索引,找到缓存中对应的缓存数组,并且返回第规约数据块号个缓存的异步引用;Step 13: After the execution node receives the call to obtain the cache, it will use the shuffle transfer dependency ID as an index to find the corresponding cache array in the cache, and return the asynchronous reference to the cache of the protocol data block number;

步骤14:在迭代器收到缓存的异步引用时,开始等待,直到该任务所需的洗牌传输依赖的规约数据块号中的CountDownLatch信号量变为0,表示该规约数据块所依赖的映射数据块全部完成,进入步骤15;Step 14: When the iterator receives the cached asynchronous reference, it starts to wait until the CountDownLatch semaphore in the protocol data block number on which the shuffling transmission depends on the task becomes 0, indicating that the mapping data that the protocol data block depends on All blocks are completed, go to step 15;

步骤15:执行器执行指定规约函数。Step 15: The executor executes the designated reduction function.

与现有技术相比,本发明的有益效果是:能够大幅缩短Spark任务执行过程中数据洗牌传输的时间,缩短分布式计算任务的执行时间。Compared with the prior art, the invention has the beneficial effects of greatly shortening the data shuffling and transmission time during the execution of the Spark task, and shortening the execution time of the distributed computing task.

附图说明Description of drawings

图1.架构示意图Figure 1. Schematic diagram of the architecture

图2.调度器调度方法流程图Figure 2. Flowchart of scheduler scheduling method

图3.执行节点流程图Figure 3. Execution Node Flowchart

图4.流水注册信息Figure 4. Streaming registration information

图5.流水通知信息Figure 5. Flow notification information

图6.流水传输信息Figure 6. Pipeline transmission information

图7.流水结束信息Figure 7. Flow end information

图8.执行节点缓存架构Figure 8. Execution node cache architecture

图9.执行节点信号控制架构Figure 9. Execution node signal control architecture

具体实施方法Specific implementation method

以下将结合附图对本发明的实施例做详细说明。本实施例在本发明技术方案和算法的前提下进行实施,并给出详细实施方式和具体操作过程,但是适用平台不限于下述实施例。只要集群兼容原版的Spark就能运行本发明。本实例的具体操作平台是由两台普通服务器组成的小型集群,每个服务器上装有UbuntuServer14.04.1LTS64bit,并且配备8GB内存。本发明的具体开发是基于ApacheSpark1.5的源码版本。Embodiments of the present invention will be described in detail below in conjunction with the accompanying drawings. This embodiment is implemented on the premise of the technical solution and algorithm of the present invention, and provides detailed implementation and specific operation process, but the applicable platform is not limited to the following embodiments. As long as the cluster is compatible with the original Spark, the invention can be run. The specific operating platform of this example is a small cluster composed of two ordinary servers, each server is equipped with UbuntuServer14.04.1LTS64bit, and is equipped with 8GB memory. The specific development of the present invention is based on the source code version of Apache Spark1.5.

如图1所示,本发明基于Spark原有的框架包括调度器(DAGScheduler),资源管理器(BlockMangerMaster),映射输出追踪表(MapOutputTracker)以及执行节点(BlockManger)和执行器(Executor),通过改变任务调度算法和执行流程来实现不破坏阶段的完成性和容错性的基础上的洗牌数据传输的流水化。调度器和执行节点分别按照附图2与附图3中的流程运行,从而实现分布式计算性能的提升。As shown in Figure 1, the original framework of the present invention based on Spark includes a scheduler (DAGScheduler), a resource manager (BlockMangerMaster), a mapping output tracking table (MapOutputTracker), an execution node (BlockManger) and an executor (Executor), by changing The task scheduling algorithm and execution process are used to realize the streamlining of shuffling data transmission on the basis of not destroying the completion and fault tolerance of the stage. The scheduler and the execution node run according to the processes in Figure 2 and Figure 3 respectively, so as to improve the performance of distributed computing.

每台服务器上都部署了包含本发明的Spark,其中一台服务器作为Spark集群的Master,另外一台机器作为集群的Slave。值得注意的是为了保证本发明的性能,部署的集群应答配置比原始Spark集群更大的内存,具体的内存大小视运行的数据量大小而定。The Spark containing the present invention is deployed on each server, wherein one server is used as the Master of the Spark cluster, and the other machine is used as the Slave of the cluster. It is worth noting that in order to ensure the performance of the present invention, the deployed cluster responds with a larger memory than the original Spark cluster, and the specific memory size depends on the size of the running data.

当部署完成之后即可以按照Spark的运行方式运行分布式计算应用。改动对于使用Spark计算框架的用户完全透明。After the deployment is complete, distributed computing applications can be run in the same way as Spark. The changes are completely transparent to users using the Spark computing framework.

一种流水化数据洗牌传输的Spark任务调度与执行方法,包括如下步骤:A Spark task scheduling and execution method for pipelined data shuffling and transmission, comprising the following steps:

步骤1:当Spark提交一个任务且该任务被划分成多个阶段进行提交时,首先找到用户执行任务生成结果的最后一个阶段;Step 1: When Spark submits a task and the task is divided into multiple stages for submission, first find the last stage in which the user executes the task to generate results;

步骤2:从最后一个阶段开始,判断该阶段是否包含未完成的前驱阶段:Step 2: Starting from the last stage, determine whether this stage contains an unfinished precursor stage:

如果这个阶段的前驱阶段全部执行完成,则提交该阶段进行执行;If the predecessor phases of this phase are all executed, submit this phase for execution;

如果有前驱阶段没有被执行,则将该阶段标记为等待,同时提交该阶段进行执行,并且递归提交该阶段的前驱阶段;If there is a predecessor stage that has not been executed, mark the stage as waiting, submit the stage for execution at the same time, and recursively submit the predecessor stage of the stage;

步骤3:在提交一个阶段进行执行之后,调度器将该阶段拆分成多个任务,并判断该阶段:是否为等待阶段:Step 3: After submitting a phase for execution, the scheduler splits the phase into multiple tasks and judges whether the phase is a waiting phase:

如果该阶段被标记为等待,则调度器向资源管理器请求与任务个数相同的空闲执行节点,调度器获取相应执行任务的执行节点之后,根据该阶段包含的分布式回弹数据集的依赖关系递归向前寻找洗牌传输依赖,调度器每找到一个洗牌传输依赖就会向的映射输出追踪表注册该次洗牌传输流水信息,在注册完成之后,调度器还会通知每一个即将要运行这个任务的执行节点准备好相应的内存来缓存他们前驱任务发送的中间结果;每个执行节点收到调度器的注册信息之后,会在本地缓存中新建一个以洗牌传输依赖ID为索引,值为规约数据块总数个的缓存数组的键值对,同时还会在本地新建一个以洗牌传输依赖ID为索引,值为规约数据块总数的信号量数据结构的键值对,其中每个信号量包含这次洗牌依赖的洗牌任务映射总数,;If the stage is marked as waiting, the scheduler requests from the resource manager the same number of idle execution nodes as the number of tasks. The relationship recursively looks forward for the shuffling transfer dependency. Whenever the scheduler finds a shuffling transfer dependency, it will register the shuffling transfer flow information with the mapping output tracking table. After the registration is completed, the scheduler will also notify each upcoming transfer. The execution nodes running this task prepare corresponding memory to cache the intermediate results sent by their predecessor tasks; after each execution node receives the registration information of the scheduler, it will create a new one in the local cache with the shuffle transmission dependency ID as the index, The value is the key-value pair of the cache array of the total number of data blocks in the specification, and at the same time, a key-value pair of the semaphore data structure whose value is the total number of data blocks in the specification will be created locally, indexed by the shuffle transmission dependency ID, where each The semaphore contains the total number of shuffling task mappings that this shuffling depends on;

否则,直接进入下一步;Otherwise, go directly to the next step;

步骤4:在调度器封装阶段的任务集合时,判断该阶段是否是一个洗牌映射阶段:Step 4: When the scheduler encapsulates the task set of the stage, determine whether the stage is a shuffle mapping stage:

如果该阶段是一个洗牌映射阶段,则对该阶段中的每个任务都设置对应的洗牌传输依赖ID;If the stage is a shuffle mapping stage, set the corresponding shuffle transfer dependency ID for each task in the stage;

否则,直接进入下一步;Otherwise, go directly to the next step;

步骤5:调度器将封装好的任务分发给各个执行节点;Step 5: The scheduler distributes the packaged tasks to each execution node;

步骤6:当任务被分配到每个执行节点的执行器上进行执行时,执行器会判断这个任务是否为洗牌映射任务:Step 6: When the task is assigned to the executor of each execution node for execution, the executor will judge whether the task is a shuffling mapping task:

如果是,则根据该任务包含的洗牌传输依赖ID,向映射输出追踪表请求该ID对应的规约任务的执行节点的集合信息,然后,设置洗牌映射任务的对应规约信息,将收到的结合信息中的规约数据块号和远程地址封装成一个哈希表传给该洗牌映射任务,并进入步骤7;If so, according to the shuffle transmission dependency ID contained in the task, request the set information of the execution nodes of the protocol task corresponding to the ID from the mapping output tracking table, and then set the corresponding protocol information of the shuffle mapping task, and send the received Combining the protocol data block number and the remote address in the information into a hash table and passing it to the shuffling mapping task, and entering step 7;

如果执行器判断该任务为规约任务,调用该规约任务对应的函数进行计算,并进入步骤11;If the executor determines that the task is a statute task, call the function corresponding to the statute task for calculation, and enter step 11;

步骤7:当一个洗牌映射任务开始执行时,会检查是否需要流水化数据输出;Step 7: When a shuffling mapping task starts to execute, it will check whether pipelined data output is required;

如果需要,首先根据用户指定的分类器或者Spark默认的分类器将中间结果键值对按照键计算出他对应的规约数据块号,根据设置的数据块号和远程地址哈希表,将计算获得的数据结果发送给对应的负责后继规约任务的执行节点,发送的信息包括:洗牌传输依赖ID,规约数据块号,数据结果的键值对;在发送数据的同时,执行器向磁盘写入数据,并进入步骤8;同时,当负责规约任务的执行节点收到流水数据之后,会将洗牌数据依赖ID作为索引,保存该ID对应的缓存数组的第规约数据块号的缓存中,进入步骤8;If necessary, first calculate the intermediate result key-value pair according to the key according to the user-specified classifier or Spark’s default classifier to calculate its corresponding specification data block number, and then calculate according to the set data block number and remote address hash table The data results are sent to the corresponding execution nodes responsible for subsequent protocol tasks. The information sent includes: shuffling transmission dependency ID, protocol data block number, and key-value pairs of data results; while sending data, the executor writes to the disk data, and enter step 8; at the same time, when the execution node in charge of the statute task receives the flow data, it will use the shuffle data dependency ID as an index, and save the cache of the first statute data block number of the cache array corresponding to the ID, and enter Step 8;

如果不需要,则直接将执行结果写入磁盘,进入步骤8;If not needed, directly write the execution result to the disk and go to step 8;

步骤8:执行器完成洗牌映射任务;Step 8: The executor completes the shuffling and mapping task;

步骤9:当一个洗牌映射任务运行结束时,就会向所有的负责规约任务的执行节点发送流水结束信息,该信息包括:洗牌传输依赖ID,该任务负责的映射数据块号,和该条信息对应的执行节点负责的规约数据块号;Step 9: When a shuffle mapping task finishes running, it will send a pipeline end message to all execution nodes responsible for the statute task, which includes: shuffle transfer dependency ID, the mapping data block number that the task is responsible for, and the The protocol data block number that the execution node corresponding to each piece of information is responsible for;

步骤10:当负责规约任务的执行节点收到流水结束的信息后,会根据洗牌传输依赖ID作为索引,找到该ID对应的信号量数组,将其中第规约数据块号个CountDownLatch减一。如果这个信号量被减到0,则表示该规约数据块依赖的数据映射全部传输结束;Step 10: When the execution node responsible for the protocol task receives the message of the end of the pipeline, it will use the shuffled transmission dependency ID as an index to find the semaphore array corresponding to the ID, and subtract one from the CountDownLatch of the protocol data block number. If this semaphore is reduced to 0, it means that all the data mappings that the protocol data block depends on have been transferred;

步骤11:当执行器执行规约任务的指定函数时,会调用相应的规约函数,该函数在执行任务读取数据时,会向执行节点请求一个读入数据的迭代器;Step 11: When the executor executes the specified function of the specification task, it will call the corresponding specification function, which will request an iterator for reading data from the execution node when executing the task to read data;

步骤12:在生成迭代器的时候会向执行节点询问这次洗牌传输是否有本地缓存,即是否被流水化数据传输:Step 12: When generating the iterator, the execution node will be asked whether there is a local cache for this shuffling transmission, that is, whether it is pipelined data transmission:

如果是,则调用执行节点的获取缓存方法,根据规约任务的洗牌传输依赖ID和该任务负责的规约数据块号向执行节点请求缓存,并进入步骤13;If so, call the execution node's acquisition cache method, request the cache from the execution node according to the shuffle transfer dependency ID of the protocol task and the protocol data block number responsible for the task, and enter step 13;

否则,读取远程数据,进入步骤15;Otherwise, read the remote data and go to step 15;

步骤13:执行节点收到获取缓存的调用后,就会以洗牌传输依赖ID为索引,找到缓存中对应的缓存数组,并且返回第规约数据块号个缓存的异步引用;Step 13: After the execution node receives the call to obtain the cache, it will use the shuffle transfer dependency ID as an index to find the corresponding cache array in the cache, and return the asynchronous reference to the cache of the protocol data block number;

步骤14:在迭代器收到缓存的异步引用时,开始等待,直到该任务所需的洗牌传输依赖的规约数据块号中的CountDownLatch信号量变为0,表示该规约数据块所依赖的映射数据块全部完成,进入步骤15;Step 14: When the iterator receives the cached asynchronous reference, it starts to wait until the CountDownLatch semaphore in the protocol data block number on which the shuffling transmission depends on the task becomes 0, indicating that the mapping data that the protocol data block depends on All blocks are completed, go to step 15;

步骤15:执行器执行指定规约函数。Step 15: The executor executes the designated reduction function.

在执行以上步骤时,其中任一一个步骤出错都会触发容错性机制:如果错误发生于一个流水化的洗牌传输的前驱任务,即洗牌映射任务的步骤,以及该步骤之前的任一步骤,那么他的后继都会被标记为失败并且重新提交,继续执行流水化的洗牌传输;如果错误发生于一个流水化的洗牌传输的后继任务,即规约任务的执行步骤,那么前驱任务并不会受到影响,而失败的后继任务会被重新提交,此时重新提交的后继任务就会从前驱任务的磁盘中去读取所需要的数据。When executing the above steps, an error in any one of the steps will trigger the fault tolerance mechanism: if the error occurs in the predecessor task of a pipelined shuffling transmission, that is, the step of the shuffling mapping task, and any step before this step , then its successor will be marked as failed and resubmitted, and continue to execute the pipelined shuffle transmission; if the error occurs in a successor task of a pipelined shuffle transmission, that is, the execution step of the specification task, then the predecessor task does not will be affected, and the failed successor task will be resubmitted. At this time, the resubmitted successor task will read the required data from the disk of the predecessor task.

由于本实施例在执行洗牌映射任务的同时将数据发送到等待执行规约任务的节点,从而将原本Spark洗牌传输的等待时间隐藏起来。当洗牌映射任务结束时,规约任务就能在较原先更短的时间内启动,从而加快整个分布式计算的速度。Since this embodiment sends data to the nodes waiting to execute the protocol task while executing the shuffling mapping task, the waiting time of the original Spark shuffling transmission is hidden. When the shuffling and mapping task ends, the specification task can be started in a shorter time than before, thus speeding up the entire distributed computing.

在本次实施例的基础上通过WordCount等相关Spark的benchmark程序,验证了本发明的正确性,同时本发明在性能上相较于原版的Spark在不同的benchmark程序上都有不同程度的提升。On the basis of this embodiment, the correctness of the present invention has been verified by benchmark programs related to Spark such as WordCount, and the performance of the present invention has been improved in different degrees compared with the original Spark in different benchmark programs.

以上详细描述了本发明的较佳具体实施例。应当理解,本领域的普通技术无需创造性劳动就可以根据本发明的构思做出诸多修改和变化。因此,凡本技术领域中技术人员依本发明的构思在现有技术的基础上通过逻辑分析、推理或者有限的实验可以得到的技术方案,皆应在由权利要求书所确定的保护范围内。The preferred specific embodiments of the present invention have been described in detail above. It should be understood that those skilled in the art can make many modifications and changes according to the concept of the present invention without creative efforts. Therefore, all technical solutions that can be obtained by those skilled in the art based on the concept of the present invention through logical analysis, reasoning or limited experiments on the basis of the prior art shall be within the scope of protection defined by the claims.

Claims (6)

If this stage is marked as wait, then scheduler performs node to the free time that explorer request is identical with task number, after scheduler obtains the corresponding execution node performing task, the dependence recurrence of the distributed rebound data collection comprised according to this stage is found forward transmission of shuffling and is relied on, scheduler often find one shuffle transmission rely on will to mapping output trace table register this time shuffle transmission flowing water information, after registration is complete, scheduler is also notified that each execution node that namely will run this task gets out corresponding internal memory and carrys out the intermediate object program that their predecessor task of buffer memory sends;After each execution node receives the log-on message of scheduler, can in local cache newly-built one with shuffle transmission rely on ID for index, value is the key-value pair of the always several buffer memory array of conventions data block, also can rely on ID for index at local newly-built one with transmission of shuffling simultaneously, value is the key-value pair of the semaphore data structure of conventions data block sum, wherein each semaphore comprises the duty mapping sum of shuffling relied on of shuffling specifically;
The need to, intermediate object program key-value pair is calculated the conventions data block number of his correspondence by the grader of the grader first specified according to user or Spark acquiescence according to key, according to the data block number arranged and remote address Hash table, the execution node that the data result obtained is sent to the responsible follow-up stipulations task of correspondence will be calculated, the information sent includes: transmission of shuffling relies on ID, conventions data block number, the key-value pair of data result;While sending data, executor is to disk write data, and enters step 8;Meanwhile, after the execution node of responsible stipulations task receives pipelined data, can, using the data dependence ID that shuffles as index, preserve in the buffer memory of conventions data block number of buffer memory array corresponding for this ID, enter step 8;
2. streamlined data according to claim 1 are shuffled Spark task scheduling and the execution method of transmission, it is characterized in that, in described step 1 to 15, an arbitrary step is made mistakes and all can be triggered fault-tolerance mechanism: if mistake betides the predecessor task of the transmission of shuffling of a streamlined, namely shuffle the step of mapping tasks, and the either step before this step, so his follow-up all can be marked as failure and resubmit, and continues executing with the transmission of shuffling of streamlined;If mistake betides the subsequent tasks of the transmission of shuffling of a streamlined, the i.e. execution step of stipulations task, so predecessor task can't be affected, and the subsequent tasks of failure can be resubmited, the subsequent tasks now resubmited will go to read required data from the disk of predecessor task.
CN201610029211.7A2016-01-182016-01-18A kind of streamlined data are shuffled Spark task schedulings and the execution method of transmissionActiveCN105718244B (en)

Priority Applications (1)

Application NumberPriority DateFiling DateTitle
CN201610029211.7ACN105718244B (en)2016-01-182016-01-18A kind of streamlined data are shuffled Spark task schedulings and the execution method of transmission

Applications Claiming Priority (1)

Application NumberPriority DateFiling DateTitle
CN201610029211.7ACN105718244B (en)2016-01-182016-01-18A kind of streamlined data are shuffled Spark task schedulings and the execution method of transmission

Publications (2)

Publication NumberPublication Date
CN105718244Atrue CN105718244A (en)2016-06-29
CN105718244B CN105718244B (en)2018-01-12

Family

ID=56147869

Family Applications (1)

Application NumberTitlePriority DateFiling Date
CN201610029211.7AActiveCN105718244B (en)2016-01-182016-01-18A kind of streamlined data are shuffled Spark task schedulings and the execution method of transmission

Country Status (1)

CountryLink
CN (1)CN105718244B (en)

Cited By (17)

* Cited by examiner, † Cited by third party
Publication numberPriority datePublication dateAssigneeTitle
CN106168963A (en)*2016-06-302016-11-30北京金山安全软件有限公司Real-time streaming data processing method and device and server
CN106371919A (en)*2016-08-242017-02-01上海交通大学Shuffle data caching method based on mapping-reduction calculation model
CN107612886A (en)*2017-08-152018-01-19中国科学院大学A kind of Spark platforms Shuffle process compresses algorithm decision-making techniques
CN107885587A (en)*2017-11-172018-04-06清华大学A kind of executive plan generation method of big data analysis process
CN109951556A (en)*2019-03-272019-06-28联想(北京)有限公司A kind of Spark task processing method and system
CN110083441A (en)*2018-01-262019-08-02中兴飞流信息科技有限公司A kind of distributed computing system and distributed computing method
CN110109747A (en)*2019-05-212019-08-09北京百度网讯科技有限公司Method for interchanging data and system, server based on Apache Spark
CN110134714A (en)*2019-05-222019-08-16东北大学 A distributed computing framework cache index suitable for iterative computing of big data
CN110750341A (en)*2018-07-242020-02-04深圳市优必选科技有限公司 Task scheduling method, device, system, terminal device and storage medium
CN111061565A (en)*2019-12-122020-04-24湖南大学Two-stage pipeline task scheduling method and system in Spark environment
CN111258785A (en)*2020-01-202020-06-09北京百度网讯科技有限公司 Data shuffling method and device
CN111782367A (en)*2020-06-302020-10-16北京百度网讯科技有限公司 Distributed storage method and apparatus, electronic device, computer readable medium
CN112269648A (en)*2020-11-132021-01-26北京轩宇信息技术有限公司Parallel task allocation method and device for multi-stage program analysis
CN113364603A (en)*2020-03-062021-09-07华为技术有限公司Fault recovery method of ring network and physical node
CN113495679A (en)*2020-04-012021-10-12孟彤Optimization method for large data storage access and processing based on nonvolatile storage medium
CN114448660A (en)*2021-12-162022-05-06国网江苏省电力有限公司电力科学研究院 An Internet of Things data access method
CN117891391A (en)*2023-12-142024-04-16中科驭数(北京)科技有限公司 A data reading and writing method, system and storage medium for data shuffling and transmission process of Spark application

Citations (4)

* Cited by examiner, † Cited by third party
Publication numberPriority datePublication dateAssigneeTitle
US20130346988A1 (en)*2012-06-222013-12-26Microsoft CorporationParallel data computing optimization
CN103605576A (en)*2013-11-252014-02-26华中科技大学Multithreading-based MapReduce execution system
US20140358977A1 (en)*2013-06-032014-12-04Zettaset, Inc.Management of Intermediate Data Spills during the Shuffle Phase of a Map-Reduce Job
CN104750482A (en)*2015-03-132015-07-01合一信息技术(北京)有限公司Method for constructing dynamic script execution engine based on MapReduce

Patent Citations (4)

* Cited by examiner, † Cited by third party
Publication numberPriority datePublication dateAssigneeTitle
US20130346988A1 (en)*2012-06-222013-12-26Microsoft CorporationParallel data computing optimization
US20140358977A1 (en)*2013-06-032014-12-04Zettaset, Inc.Management of Intermediate Data Spills during the Shuffle Phase of a Map-Reduce Job
CN103605576A (en)*2013-11-252014-02-26华中科技大学Multithreading-based MapReduce execution system
CN104750482A (en)*2015-03-132015-07-01合一信息技术(北京)有限公司Method for constructing dynamic script execution engine based on MapReduce

Non-Patent Citations (2)

* Cited by examiner, † Cited by third party
Title
WEIKUAN YU等: "Virtual shuffling for efficient data movement in mapduce", 《IEEE TRANSACTIONS ON COMPUTERS》*
XUANHUA SHI等: "Mammoth:Gearing Hadoop towards memory-intensive mapreduce applications", 《IEEE TRANSACTIONS ON PARALLEL AND DISTRIBUTED SYSTEMS》*

Cited By (31)

* Cited by examiner, † Cited by third party
Publication numberPriority datePublication dateAssigneeTitle
CN106168963B (en)*2016-06-302019-06-11北京金山安全软件有限公司Real-time streaming data processing method and device and server
CN106168963A (en)*2016-06-302016-11-30北京金山安全软件有限公司Real-time streaming data processing method and device and server
CN106371919A (en)*2016-08-242017-02-01上海交通大学Shuffle data caching method based on mapping-reduction calculation model
CN106371919B (en)*2016-08-242019-07-16上海交通大学 A shuffling data cache method based on map-reduce computing model
CN107612886B (en)*2017-08-152020-06-30中国科学院大学 A Spark Platform Shuffle Process Compression Algorithm Decision Method
CN107612886A (en)*2017-08-152018-01-19中国科学院大学A kind of Spark platforms Shuffle process compresses algorithm decision-making techniques
CN107885587A (en)*2017-11-172018-04-06清华大学A kind of executive plan generation method of big data analysis process
CN107885587B (en)*2017-11-172018-12-07清华大学A kind of executive plan generation method of big data analysis process
CN110083441A (en)*2018-01-262019-08-02中兴飞流信息科技有限公司A kind of distributed computing system and distributed computing method
CN110083441B (en)*2018-01-262021-06-04中兴飞流信息科技有限公司Distributed computing system and distributed computing method
CN110750341A (en)*2018-07-242020-02-04深圳市优必选科技有限公司 Task scheduling method, device, system, terminal device and storage medium
CN109951556A (en)*2019-03-272019-06-28联想(北京)有限公司A kind of Spark task processing method and system
CN110109747A (en)*2019-05-212019-08-09北京百度网讯科技有限公司Method for interchanging data and system, server based on Apache Spark
CN110109747B (en)*2019-05-212021-05-14北京百度网讯科技有限公司Apache Spark-based data exchange method, system and server
CN110134714B (en)*2019-05-222021-04-20东北大学 Distributed computing framework cache index method suitable for iterative computing of big data
CN110134714A (en)*2019-05-222019-08-16东北大学 A distributed computing framework cache index suitable for iterative computing of big data
CN111061565B (en)*2019-12-122023-08-25湖南大学Two-section pipeline task scheduling method and system in Spark environment
CN111061565A (en)*2019-12-122020-04-24湖南大学Two-stage pipeline task scheduling method and system in Spark environment
CN111258785A (en)*2020-01-202020-06-09北京百度网讯科技有限公司 Data shuffling method and device
CN113364603B (en)*2020-03-062023-05-02华为技术有限公司Fault recovery method of ring network and physical node
CN113364603A (en)*2020-03-062021-09-07华为技术有限公司Fault recovery method of ring network and physical node
CN113495679B (en)*2020-04-012022-10-21北京大学 Optimization method of big data storage access and processing based on non-volatile storage medium
CN113495679A (en)*2020-04-012021-10-12孟彤Optimization method for large data storage access and processing based on nonvolatile storage medium
CN111782367A (en)*2020-06-302020-10-16北京百度网讯科技有限公司 Distributed storage method and apparatus, electronic device, computer readable medium
CN111782367B (en)*2020-06-302023-08-08北京百度网讯科技有限公司Distributed storage method and device, electronic equipment and computer readable medium
CN112269648A (en)*2020-11-132021-01-26北京轩宇信息技术有限公司Parallel task allocation method and device for multi-stage program analysis
CN112269648B (en)*2020-11-132024-05-31北京轩宇信息技术有限公司Parallel task allocation method and device for multi-stage program analysis
CN114448660A (en)*2021-12-162022-05-06国网江苏省电力有限公司电力科学研究院 An Internet of Things data access method
CN114448660B (en)*2021-12-162024-06-04国网江苏省电力有限公司电力科学研究院Internet of things data access method
CN117891391A (en)*2023-12-142024-04-16中科驭数(北京)科技有限公司 A data reading and writing method, system and storage medium for data shuffling and transmission process of Spark application
CN117891391B (en)*2023-12-142024-12-13中科驭数(北京)科技有限公司 A data reading and writing method, system and storage medium for data shuffling and transmission process of Spark application

Also Published As

Publication numberPublication date
CN105718244B (en)2018-01-12

Similar Documents

PublicationPublication DateTitle
CN105718244B (en)A kind of streamlined data are shuffled Spark task schedulings and the execution method of transmission
CN108140009B (en)Distributed autonomous RDMA-based B-tree key value manager
US10375167B2 (en)Low latency RDMA-based distributed storage
US9996394B2 (en)Scheduling accelerator tasks on accelerators using graphs
CN111309649B (en)Data transmission and task processing method, device and equipment
US20170154091A1 (en)Conditional master election in distributed databases
CN109144710B (en) Resource scheduling method, apparatus, and computer-readable storage medium
US8572614B2 (en)Processing workloads using a processor hierarchy system
CN111522640B (en) Parallel execution method and device for computational graph
US20190042149A1 (en)Writing composite objects to a data store
US20190146847A1 (en)Dynamic distributed resource management
US9201691B2 (en)Method, apparatus and system for coordinating execution of tasks in a computing system having a distributed shared memory
CN113570460B (en) Method and device for concurrently executing transactions in a blockchain
US8161492B2 (en)Continuation based runtimes in transactions
CN114253713B (en)Asynchronous batch processing method and system based on reactor
CN117377943A (en)Memory-calculation integrated parallel processing system and method
CN110704112A (en) Method and apparatus for concurrently executing transactions in a blockchain
US20240385902A1 (en)Synchronization Method and Apparatus
CN106250348A (en)A kind of heterogeneous polynuclear framework buffer memory management method based on GPU memory access characteristic
CN106371919B (en) A shuffling data cache method based on map-reduce computing model
US10289306B1 (en)Data storage system with core-affined thread processing of data movement requests
CN104252416B (en) A kind of accelerator and data processing method
CN120196573A (en) RDMA data transmission method, network device, system, and electronic device
CN118511160A (en) Distributed Accelerator
CN118689888A (en) Database concurrent transaction management method, device, management system and medium

Legal Events

DateCodeTitleDescription
C06Publication
PB01Publication
C10Entry into substantive examination
SE01Entry into force of request for substantive examination
GR01Patent grant
GR01Patent grant

[8]ページ先頭

©2009-2025 Movatter.jp