CN105718244B - Spark task scheduling and execution method with pipelined shuffle data transmission - Google Patents

Spark task scheduling and execution method with pipelined shuffle data transmission

Info

Publication number
CN105718244B
CN105718244B (application CN201610029211.7A)
Authority
CN
China
Prior art keywords
task
transmission
shuffling
data
stage
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Active
Application number
CN201610029211.7A
Other languages
Chinese (zh)
Other versions
CN105718244A (en)
Inventor
付周望
张未雨
戚正伟
管海兵
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
Shanghai Jiao Tong University
Original Assignee
Shanghai Jiao Tong University
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by Shanghai Jiao Tong University
Priority to CN201610029211.7A
Publication of CN105718244A
Application granted
Publication of CN105718244B
Status: Active
Anticipated expiration


Abstract

Translated from Chinese

The invention discloses a Spark task scheduling and execution method with pipelined shuffle data transmission. Stages and their tasks are submitted and executed from back to front, and each predecessor task sends its execution results directly into the memory of its successor tasks. Without changing the user-facing interface or compromising the integrity and fault tolerance of stages, the method eliminates the disk read/write overhead that Spark originally incurs for shuffle data transmission between stages, thereby reducing the running time of distributed computing programs on Spark.

Description

Translated from Chinese
A Spark Task Scheduling and Execution Method with Pipelined Shuffle Data Transmission

Technical Field

The invention relates to the field of distributed computing frameworks. Specifically, it modifies the task scheduling mechanism of the distributed computing framework Spark in order to improve the framework's performance.

Background

Spark, currently the most widely used distributed computing framework, has been deployed in countless data centers. Its Resilient Distributed Dataset (RDD) abstraction keeps big-data computations in memory as much as possible. In terms of execution logic, Spark generates RDDs from front to back following the logic of the user program, and each RDD records its own dependencies. When the user program needs the final output, Spark searches backward recursively from the last RDD and divides the computation into stages at each Shuffle Dependency it finds. After the stages are divided, Spark submits them from front to back: stages with no missing dependencies are submitted first, and their successors follow in turn. This scheduling logic lets data flow automatically to where it must be computed and keeps intermediate results in memory as much as possible.

However, to preserve the separation between stages and the fault tolerance of the framework, at each Shuffle Dependency that divides two stages Spark writes the intermediate results produced by the predecessor stage to disk, then starts assigning the tasks of the next stage, whose tasks remotely read the data from disk before computing.

Given that disks today remain far slower than memory, these reads and writes have become the biggest bottleneck limiting Spark's performance. Yet, because the integrity of stage division and fault tolerance must be preserved, no patch or solution optimizing this bottleneck has emerged so far.

Summary of the Invention

Targeting the disk read/write bottleneck of Spark's shuffle data transmission, the invention proposes a Spark task scheduling method with pipelined shuffle transmission. By changing the order in which Spark tasks are submitted, a task begins to be scheduled and dispatched before its predecessor tasks, and each predecessor task sends its execution results directly into the memory of its successor tasks. Without changing the user-facing interface or compromising the integrity and fault tolerance of stages, this eliminates the disk read/write overhead of Spark's shuffle transmission between stages: a predecessor task streams each intermediate result to its successors over the network as soon as it is produced, avoiding disk I/O and improving the performance of the Spark distributed computing framework.

The technical solution of the invention is as follows:

A Spark task scheduling and execution method with pipelined shuffle data transmission comprises the following steps:

Step 1: When Spark submits a job that has been divided into multiple stages, first find the last stage, the one that produces the user's final result.

Step 2: Starting from the last stage, determine whether the stage has unfinished predecessor stages:

If all of its predecessor stages have finished executing, submit the stage for execution;

If any predecessor stage has not been executed, mark the stage as waiting, submit it for execution anyway, and recursively submit its predecessor stages.
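The back-to-front submission of Steps 1–2 can be sketched as a small recursive routine. This is an illustrative sketch only, not the patent's actual code; the `Stage` type and the function names are hypothetical.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Stage:
    id: int
    parents: tuple = ()  # predecessor stages

def submit_from_final(stage, finished, waiting, order):
    """Submit `stage` immediately; if any predecessor is unfinished,
    mark the stage as waiting and recursively submit those predecessors."""
    missing = [p for p in stage.parents if p.id not in finished]
    if missing:
        waiting.add(stage.id)   # submitted early, must wait for its input
    order.append(stage.id)      # stages are submitted back to front
    for p in missing:
        submit_from_final(p, finished, waiting, order)
```

For a three-stage chain 0 → 1 → 2 with nothing finished yet, stage 2 is submitted first, then 1, then 0, with stages 2 and 1 marked as waiting — the reverse of stock Spark's front-to-back submission order.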

Step 3: After a stage has been submitted for execution, the scheduler splits it into multiple tasks and checks whether the stage is a waiting stage:

If the stage is marked as waiting, the scheduler requests from the resource manager as many idle execution nodes as there are tasks. After obtaining the execution nodes that will run the tasks, the scheduler recursively walks the dependencies of the stage's resilient distributed datasets to find its shuffle transmission dependencies; for each one found, it registers the pipelined shuffle information with the map output tracking table. After registration, the scheduler also notifies every execution node that is about to run one of these tasks to prepare memory for caching the intermediate results sent by its predecessor tasks. On receiving the scheduler's registration message, each execution node creates in its local cache a key-value pair indexed by the shuffle dependency ID whose value is an array of buffers, one per reduce data block; it also creates locally a key-value pair indexed by the shuffle dependency ID whose value is an array of semaphores, one per reduce data block, where each semaphore is initialized to the total number of shuffle map tasks the shuffle depends on;

Otherwise, proceed directly to the next step.
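The per-node structures created at registration — one buffer array and one countdown latch per reduce data block, both keyed by the shuffle dependency ID — can be sketched as follows. This is an illustrative sketch: `ExecutionNode` and its method names are hypothetical, and the latch is a minimal stand-in for `java.util.concurrent.CountDownLatch`, which the patent names.

```python
import threading

class CountDownLatch:
    """Minimal analogue of java.util.concurrent.CountDownLatch."""
    def __init__(self, count):
        self._count = count
        self._cond = threading.Condition()

    def count_down(self):
        with self._cond:
            if self._count > 0:
                self._count -= 1
            if self._count == 0:
                self._cond.notify_all()

    def await_zero(self, timeout=None):
        # returns True once the count has reached zero
        with self._cond:
            return self._cond.wait_for(lambda: self._count == 0, timeout)

class ExecutionNode:
    """State built when the scheduler registers a pipelined shuffle."""
    def __init__(self):
        self.cache = {}    # shuffle dep ID -> one buffer per reduce block
        self.latches = {}  # shuffle dep ID -> one latch per reduce block

    def register_shuffle(self, dep_id, num_reduce_blocks, num_map_tasks):
        # one buffer and one latch per reduce data block; each latch starts
        # at the number of map tasks whose pipelined output must arrive
        self.cache[dep_id] = [[] for _ in range(num_reduce_blocks)]
        self.latches[dep_id] = [CountDownLatch(num_map_tasks)
                                for _ in range(num_reduce_blocks)]
```

Step 10's end-of-pipeline handling then reduces to decrementing `latches[dep_id][block]`, and Step 14's wait to blocking until that latch reaches zero.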

Step 4: When the scheduler packages the stage's task set, it determines whether the stage is a shuffle map stage:

If it is, it sets the corresponding shuffle dependency ID on every task in the stage;

Otherwise, it proceeds directly to the next step.

Step 5: The scheduler distributes the packaged tasks to the execution nodes.

Step 6: When a task is assigned to an executor on an execution node for execution, the executor determines whether it is a shuffle map task:

If it is, the executor uses the shuffle dependency ID carried by the task to request from the map output tracking table the set of execution nodes responsible for the reduce tasks of that ID; it then fills in the shuffle map task's reduce information by packing the reduce data block numbers and remote addresses from the received set into a hash table passed to the task, and proceeds to Step 7;

If the executor determines that the task is a reduce task, it calls the function corresponding to the reduce task and proceeds to Step 11.
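Step 6's dispatch decision amounts to a two-way branch on the task kind. The sketch below is illustrative only: the dictionary-based task representation and the callback names are hypothetical, not Spark's actual task classes.

```python
def dispatch(task, map_output_tracker, run_map, run_reduce):
    """A shuffle map task is handed a {reduce block number: remote address}
    hash table looked up via its shuffle dependency ID; a reduce task
    simply runs its reduce function."""
    if task["kind"] == "shuffle_map":
        addresses = map_output_tracker[task["dep_id"]]
        return run_map(task, addresses)
    return run_reduce(task)
```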

Step 7: When a shuffle map task starts executing, it checks whether pipelined data output is required:

If it is, the task first computes, for each intermediate key-value pair, the reduce data block number its key maps to, using the user-specified partitioner or Spark's default partitioner. Using the configured hash table of block numbers and remote addresses, it sends each computed result to the execution node responsible for the corresponding successor reduce task; the message sent contains the shuffle dependency ID, the reduce data block number, and the result key-value pair. While sending the data, the executor also writes it to disk, then proceeds to Step 8. Meanwhile, when the execution node responsible for the reduce task receives the pipelined data, it uses the shuffle dependency ID as an index and stores the data in the buffer for that reduce data block number within the cache array for that ID, then proceeds to Step 8;

If not, the task writes its results directly to disk and proceeds to Step 8.
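The map-side push of Step 7 can be sketched as follows. This is illustrative: `send` and `write_disk` are hypothetical callbacks standing in for the network push and the fault-tolerance disk write, and the modulo partitioner stands in for Spark's default hash partitioner.

```python
def pipeline_map_output(dep_id, records, partitioner, reduce_addresses,
                        send, write_disk):
    """For each intermediate (key, value), compute its reduce data block
    number with the partitioner, push it to the node responsible for that
    block, and also write it to disk so fault tolerance is preserved."""
    for key, value in records:
        block = partitioner(key)
        send(reduce_addresses[block], (dep_id, block, (key, value)))
        write_disk(block, (key, value))
```

With a hash partitioner a key k goes to block hash(k) mod R for R reduce blocks, so every receiving node gets exactly the pairs its reduce task will consume.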

Step 8: The executor completes the shuffle map task.

Step 9: When a shuffle map task finishes, it sends an end-of-pipeline message to every execution node responsible for a reduce task. The message contains the shuffle dependency ID, the map data block number the task was responsible for, and the reduce data block number that the receiving execution node is responsible for.

Step 10: When an execution node responsible for a reduce task receives an end-of-pipeline message, it uses the shuffle dependency ID as an index to find the corresponding semaphore array and decrements the CountDownLatch for that reduce data block number by one. When the latch reaches zero, all map data on which that reduce data block depends has been transmitted.

Step 11: When the executor runs the reduce task's designated function, the corresponding reduce function is called; when the task reads its input, the function requests an input iterator from the execution node.

Step 12: While creating the iterator, the execution node is asked whether this shuffle transmission has a local cache, i.e. whether the data was pipelined:

If so, the node's cache-fetch method is called, requesting the buffer identified by the reduce task's shuffle dependency ID and the reduce data block number the task is responsible for, and execution proceeds to Step 13;

Otherwise, the data is read remotely and execution proceeds to Step 15.

Step 13: On receiving the cache-fetch call, the execution node uses the shuffle dependency ID as an index to find the corresponding cache array and returns an asynchronous reference to the buffer for that reduce data block number.

Step 14: On receiving the asynchronous buffer reference, the iterator waits until the CountDownLatch for the reduce data block of the required shuffle dependency reaches zero, indicating that all map data blocks the reduce data block depends on are complete, then proceeds to Step 15.
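Steps 12–14 together form the reduce-side read path: use the local pipelined buffer if one was registered, waiting until every map piece has arrived, and otherwise fall back to the ordinary remote fetch. In this illustrative sketch, `threading.Event` stands in for the CountDownLatch having reached zero, and `fetch_remote` is a hypothetical callback.

```python
import threading

def read_shuffle_input(dep_id, block, cache, done_events, fetch_remote):
    """Return an iterator over the reduce task's input for one data block."""
    if dep_id in cache:                      # pipelined: data was pushed here
        done_events[(dep_id, block)].wait()  # Step 14: all map pieces arrived
        return iter(cache[dep_id][block])
    return fetch_remote(dep_id, block)       # Step 12 fallback: remote read
```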

Step 15: The executor runs the designated reduce function.

Compared with the prior art, the invention has the beneficial effect of greatly shortening the time spent on shuffle data transmission during Spark job execution, and thus the execution time of distributed computing jobs.

Brief Description of the Drawings

Figure 1. Architecture overview

Figure 2. Flowchart of the scheduler's scheduling method

Figure 3. Flowchart of an execution node

Figure 4. Pipeline registration message

Figure 5. Pipeline notification message

Figure 6. Pipeline data transfer message

Figure 7. Pipeline end message

Figure 8. Execution node cache architecture

Figure 9. Execution node signal control architecture

Detailed Description of the Embodiments

Embodiments of the invention are described in detail below with reference to the accompanying drawings. This embodiment is implemented on the premise of the technical solution and algorithms of the invention and gives a detailed implementation and concrete operating procedure, but the applicable platforms are not limited to the following embodiment: the invention runs on any cluster compatible with stock Spark. The concrete platform of this example is a small cluster of two ordinary servers, each running Ubuntu Server 14.04.1 LTS 64-bit with 8 GB of memory. The implementation is based on the source code of Apache Spark 1.5.

As shown in Figure 1, the invention builds on Spark's existing framework, comprising the scheduler (DAGScheduler), the resource manager (BlockManagerMaster), the map output tracking table (MapOutputTracker), the execution nodes (BlockManager), and the executors (Executor). By changing the task scheduling algorithm and execution flow, it pipelines shuffle data transmission without compromising stage completeness or fault tolerance. The scheduler and the execution nodes run according to the flows in Figures 2 and 3 respectively, improving distributed computing performance.

The Spark build containing the invention is deployed on every server; one server acts as the Master of the Spark cluster and the other as a Slave. Note that, to realize the invention's performance gains, the deployed cluster should be configured with more memory than a stock Spark cluster; the exact amount depends on the volume of data processed.

Once deployed, distributed computing applications run exactly as on stock Spark. The changes are completely transparent to users of the Spark computing framework.

The embodiment then carries out Steps 1 through 15 exactly as described in the Summary of the Invention above.

If any of the above steps fails, the fault-tolerance mechanism is triggered. If the error occurs in a predecessor task of a pipelined shuffle, i.e. in the shuffle map task's step or any step before it, its successors are all marked as failed and resubmitted, and the pipelined shuffle transmission is carried out again. If the error occurs in a successor task of a pipelined shuffle, i.e. in the execution steps of a reduce task, the predecessor tasks are unaffected; the failed successor task is resubmitted, and the resubmitted task reads the data it needs from the predecessor tasks' disks.

Because this embodiment sends data to the nodes awaiting reduce tasks while the shuffle map tasks are still executing, the waiting time of Spark's original shuffle transmission is hidden. When the shuffle map tasks finish, the reduce tasks can start sooner than before, speeding up the whole distributed computation.

On top of this embodiment, the correctness of the invention was verified with Word Count and other Spark benchmark programs; compared with stock Spark, the invention improves performance to varying degrees across the different benchmarks.

The preferred embodiments of the invention have been described in detail above. It should be understood that those of ordinary skill in the art can make many modifications and variations according to the concept of the invention without creative effort. Therefore, any technical solution that a person skilled in the art can obtain from the concept of the invention on the basis of the prior art through logical analysis, reasoning, or limited experimentation shall fall within the scope of protection defined by the claims.
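The fault-tolerance rule above can be stated compactly as a lookup (an illustrative sketch; the function and label names are hypothetical):

```python
def recovery_plan(failed_side):
    """Map-side failures drag their successors down (the pipeline restarts);
    reduce-side failures are resubmitted alone and reread from disk."""
    if failed_side == "map":
        return {"resubmit": {"map", "reduce"}, "reduce_reads_from": "pipeline"}
    if failed_side == "reduce":
        return {"resubmit": {"reduce"}, "reduce_reads_from": "disk"}
    raise ValueError(f"unknown side: {failed_side}")
```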

Claims (6)

  1. If the stage is marked as waiting, scheduler performs section to explorer request and task number identical free timePoint, after scheduler obtains the corresponding execution node for performing task, the distributed rebound data collection that is included according to the stage according toRely relation recurrence to find transmission of shuffling forward to rely on, scheduler often finds a transmission dependence of shuffling will be to mapping output trackingTable register this time shuffle transmission Dependency Specification, after registration is complete, scheduler is also notified that each will run thisThe execution node of task gets out corresponding internal memory to cache the intermediate result that their predecessor tasks are sent;Each node that performs is receivedTo after the log-on message of scheduler, meeting relies on ID as index for newly-built one in local cache using transmission of shuffling, and is worth for stipulationsThe key-value pair of the caching array of data block total number, while also ID can be relied on as index using transmission of shuffling at local newly-built one, valueFor conventions data block sum semaphore data structure key-value pair, wherein each semaphore include specifically shuffle transmission rely onDuty mapping of shuffling sum,;
    If desired, specified first according to user grader or Spark acquiescence grader by intermediate result key-value pair byConventions data block number corresponding to him is calculated according to key, according to the data block number of setting and remote address Hash table, is obtained calculatingData result be sent to corresponding to be responsible for the execution node of follow-up stipulations task, the information of transmission includes:Transmission of shuffling relies onID, conventions data block number, the key-value pair of data result;While data are sent, actuator writes data to disk, and entersStep 8;Meanwhile after the execution node for being responsible for stipulations task receives pipelined data, the transmission that will can shuffle relies on ID as ropeDraw, be stored in the transmission of shuffling and rely in the caching of the conventions data block number of caching array corresponding to ID, into step 8;
  2. The Spark task scheduling and execution method with pipelined shuffle data transmission according to claim 1, characterised in that an error in any of steps 1 to 15 triggers the fault-tolerance mechanism: if the error occurs in a predecessor task of a pipelined shuffle transmission, that is, in the shuffle map task step or in any step before it, then that task and all of its successors are marked as failed and resubmitted, and the pipelined shuffle transmission continues to be performed; if the error occurs in the execution step of a successor task of a pipelined shuffle transmission, that is, a reduce task, then the predecessor tasks are not affected, only the failed successor task is resubmitted, and the resubmitted successor task reads the required data from the disks of its predecessor tasks.
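The per-node bookkeeping described in claim 1 — a cache array indexed by shuffle-transmission dependency ID with one entry per reduce data block, plus one semaphore-like counter per block initialised to the number of shuffle map tasks — could be sketched roughly as follows. This is an illustrative sketch only, not the patented implementation; all names (`ShuffleNodeState`, `register`, `receive`, `map_task_done`) are hypothetical and do not appear in the patent.

```python
import threading

class ShuffleNodeState:
    """Hypothetical per-node state for one pipelined shuffle transmission.

    For each shuffle-transmission dependency ID the node keeps:
      * one buffer per reduce data block, caching pipelined (key, value) pairs,
      * one countdown per reduce data block, initialised to the total number
        of shuffle map tasks, so a reduce task knows when a block is complete.
    """

    def __init__(self):
        self._caches = {}    # dep_id -> list of per-block (key, value) buffers
        self._pending = {}   # dep_id -> list of per-block remaining map-task counts
        self._lock = threading.Lock()

    def register(self, dep_id, num_reduce_blocks, num_map_tasks):
        # Done when the scheduler announces the shuffle-transmission dependency.
        with self._lock:
            self._caches[dep_id] = [[] for _ in range(num_reduce_blocks)]
            self._pending[dep_id] = [num_map_tasks] * num_reduce_blocks

    def receive(self, dep_id, reduce_block, kv):
        # Called when a pipelined record arrives from a predecessor (map) task.
        with self._lock:
            self._caches[dep_id][reduce_block].append(kv)

    def map_task_done(self, dep_id, reduce_block):
        # Called once per completed map task; returns the remaining count.
        with self._lock:
            self._pending[dep_id][reduce_block] -= 1
            return self._pending[dep_id][reduce_block]

    def block_ready(self, dep_id, reduce_block):
        # The reduce task may consume the block once the countdown hits zero.
        return self._pending[dep_id][reduce_block] == 0
```

The semaphore array in the claim plays exactly this role: each reduce task blocks until every map task feeding its block has signalled completion.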
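The routing step of claim 1 — computing a reduce data block number from each key with the user-specified or default partitioner, then looking up the destination node in a remote-address table and sending (dependency ID, block number, key-value pair) — can be illustrated with a minimal sketch. All names here (`route_records`, `address_of_block`, `partition`) are hypothetical stand-ins, not the patent's API.

```python
def route_records(dep_id, records, num_reduce_blocks, address_of_block,
                  partition=None):
    """Group intermediate (key, value) pairs by destination execution node.

    `partition` stands in for the user-specified partitioner (defaulting to a
    simple hash partitioner), and `address_of_block` for the remote-address
    hash table mentioned in the claim. Returns a mapping
    {node_address: [(dep_id, reduce_block, (key, value)), ...]}, mirroring
    the three pieces of information the claim says are transmitted.
    """
    if partition is None:
        partition = lambda key: hash(key) % num_reduce_blocks
    outgoing = {}
    for key, value in records:
        block = partition(key)                      # reduce data block number
        msg = (dep_id, block, (key, value))         # what gets sent over the wire
        outgoing.setdefault(address_of_block[block], []).append(msg)
    return outgoing
```

In the patented scheme the same records would, in parallel, also be written to local disk so that a failed reduce task can fall back to reading them (claim 2).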
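Claim 2 distinguishes two failure cases: a map-side failure resubmits the failed task together with everything downstream (the pipelined transfer is kept), while a reduce-side failure resubmits only the failed task, which then reads the predecessors' on-disk copies. A minimal sketch of that policy, with hypothetical names (`recovery_plan`, `successors_of`):

```python
def recovery_plan(failed_task, is_map_side, successors_of):
    """Return which tasks to resubmit and whether to fall back to disk.

    `successors_of` is a task DAG: task name -> list of successor task names.
    This is an illustrative sketch of the fault-tolerance rule in claim 2,
    not the patented scheduler.
    """
    if is_map_side:
        # Map-side failure: the failed task and its transitive successors
        # are all marked failed and resubmitted; pipelining continues.
        to_resubmit = {failed_task}
        frontier = [failed_task]
        while frontier:
            task = frontier.pop()
            for succ in successors_of.get(task, ()):
                if succ not in to_resubmit:
                    to_resubmit.add(succ)
                    frontier.append(succ)
        return {"resubmit": to_resubmit, "read_from_disk": False}
    # Reduce-side failure: predecessors are untouched; the rerun reduce
    # task reads the data its predecessors already wrote to disk.
    return {"resubmit": {failed_task}, "read_from_disk": True}
```

This is why the pipelined scheme still writes shuffle output to disk while streaming it (claim 1): the on-disk copy is what makes the reduce-side recovery path possible without rerunning any map task.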
CN201610029211.7A | 2016-01-18 | 2016-01-18 | A Spark task scheduling and execution method with pipelined data shuffle transmission | Active | CN105718244B (en)

Priority Applications (1)

Application Number | Priority Date | Filing Date | Title
CN201610029211.7A (CN105718244B) | 2016-01-18 | 2016-01-18 | A Spark task scheduling and execution method with pipelined data shuffle transmission


Publications (2)

Publication Number | Publication Date
CN105718244A (en) | 2016-06-29
CN105718244B (en) | 2018-01-12

Family

ID=56147869

Family Applications (1)

Application Number | Title | Priority Date | Filing Date
CN201610029211.7A (Active, CN105718244B) | A Spark task scheduling and execution method with pipelined data shuffle transmission | 2016-01-18 | 2016-01-18

Country Status (1)

Country | Link
CN (1) | CN105718244B (en)

Families Citing this family (17)

* Cited by examiner, † Cited by third party
Publication number | Priority date | Publication date | Assignee | Title
CN106168963B (en)* | 2016-06-30 | 2019-06-11 | Beijing Kingsoft Security Software Co., Ltd. | Real-time streaming data processing method and device and server
CN106371919B (en)* | 2016-08-24 | 2019-07-16 | Shanghai Jiao Tong University | A shuffling data cache method based on the map-reduce computing model
CN107612886B (en)* | 2017-08-15 | 2020-06-30 | University of Chinese Academy of Sciences | A Spark platform shuffle process compression algorithm decision method
CN107885587B (en)* | 2017-11-17 | 2018-12-07 | Tsinghua University | An execution plan generation method for big data analysis processes
CN110083441B (en)* | 2018-01-26 | 2021-06-04 | Zhongxing Feiliu Information Technology Co., Ltd. | Distributed computing system and distributed computing method
CN110750341B (en)* | 2018-07-24 | 2022-08-02 | Shenzhen Ubtech Technology Co., Ltd. | Task scheduling method, device, system, terminal equipment and storage medium
CN109951556A (en) | 2019-03-27 | 2019-06-28 | Lenovo (Beijing) Co., Ltd. | A Spark task processing method and system
CN110109747B (en)* | 2019-05-21 | 2021-05-14 | Beijing Baidu Netcom Science and Technology Co., Ltd. | Apache Spark-based data exchange method, system and server
CN110134714B (en)* | 2019-05-22 | 2021-04-20 | Northeastern University | Distributed computing framework cache index method suitable for iterative computing of big data
CN111061565B (en)* | 2019-12-12 | 2023-08-25 | Hunan University | Two-stage pipeline task scheduling method and system in a Spark environment
CN111258785B (en)* | 2020-01-20 | 2023-09-08 | Beijing Baidu Netcom Science and Technology Co., Ltd. | Data shuffling method and device
CN113364603B (en)* | 2020-03-06 | 2023-05-02 | Huawei Technologies Co., Ltd. | Fault recovery method of ring network and physical node
CN113495679B (en)* | 2020-04-01 | 2022-10-21 | Peking University | Optimization method of big data storage access and processing based on non-volatile storage media
CN111782367B (en)* | 2020-06-30 | 2023-08-08 | Beijing Baidu Netcom Science and Technology Co., Ltd. | Distributed storage method and device, electronic equipment and computer-readable medium
CN112269648B (en)* | 2020-11-13 | 2024-05-31 | Beijing Xuanyu Information Technology Co., Ltd. | Parallel task allocation method and device for multi-stage program analysis
CN114448660B (en)* | 2021-12-16 | 2024-06-04 | Electric Power Research Institute of State Grid Jiangsu Electric Power Co., Ltd. | Internet of Things data access method
CN117891391B (en)* | 2023-12-14 | 2024-12-13 | Zhongke Yushu (Beijing) Technology Co., Ltd. | A data reading and writing method, system and storage medium for the data shuffle transmission process of Spark applications

Citations (2)

* Cited by examiner, † Cited by third party
Publication number | Priority date | Publication date | Assignee | Title
CN103605576A (en)* | 2013-11-25 | 2014-02-26 | Huazhong University of Science and Technology | Multithreading-based MapReduce execution system
CN104750482A (en)* | 2015-03-13 | 2015-07-01 | Heyi Information Technology (Beijing) Co., Ltd. | Method for constructing a dynamic script execution engine based on MapReduce

Family Cites Families (2)

* Cited by examiner, † Cited by third party
Publication number | Priority date | Publication date | Assignee | Title
US9235446B2 (en)* | 2012-06-22 | 2016-01-12 | Microsoft Technology Licensing, LLC | Parallel computing execution plan optimization
US9424274B2 (en)* | 2013-06-03 | 2016-08-23 | Zettaset, Inc. | Management of intermediate data spills during the shuffle phase of a map-reduce job


Non-Patent Citations (2)

* Cited by examiner, † Cited by third party
Title
Xuanhua Shi et al., "Mammoth: Gearing Hadoop towards memory-intensive MapReduce applications", IEEE Transactions on Parallel and Distributed Systems, vol. 26, no. 8, 2015-08-01, full text *
Weikuan Yu et al., "Virtual shuffling for efficient data movement in MapReduce", IEEE Transactions on Computers, vol. 64, no. 2, 2015-02-28, full text *

Also Published As

Publication number | Publication date
CN105718244A (en) | 2016-06-29

Similar Documents

PublicationPublication DateTitle
CN105718244B (en)A Spark task scheduling and execution method with pipelined data shuffle transmission
US10375167B2 (en)Low latency RDMA-based distributed storage
US9996394B2 (en)Scheduling accelerator tasks on accelerators using graphs
CN108140009B (en)Distributed autonomous RDMA-based B-tree key value manager
US8572614B2 (en)Processing workloads using a processor hierarchy system
US20190042149A1 (en)Writing composite objects to a data store
CN113485834B (en)Shared memory management method, device, computer equipment and storage medium
US9553951B1 (en)Semaphores in distributed computing environments
US9201691B2 (en)Method, apparatus and system for coordinating execution of tasks in a computing system having a distributed shared memory
US11281967B1 (en)Event-based device performance monitoring
CN110704112B (en) Method and apparatus for concurrently executing transactions in a blockchain
US8161492B2 (en)Continuation based runtimes in transactions
WO2018120233A1 (en)Transaction processing method and apparatus
CN114253713B (en)Asynchronous batch processing method and system based on reactor
CN106250348B (en) A cache management method for heterogeneous multi-core architecture based on GPU memory access characteristics
US11416749B2 (en)Execution synchronization and tracking
CN101546275A (en) Multiprocessor system with hardware semaphore module and its realization method
CN106371919B (en) A shuffling data cache method based on map-reduce computing model
CN104252416B (en) A kind of accelerator and data processing method
Andreoli et al.Priority-driven differentiated performance for nosql database-as-a-service
US20230236889A1 (en)Distributed accelerator
CN118689888A (en) Database concurrent transaction management method, device, management system and medium
CN117828127B (en) A tree-like hierarchical cluster user management method based on semi-structured storage
US20060149885A1 (en)Enforcing global ordering through a caching bridge in a multicore multiprocessor system
WO2024174258A1 (en)Deep neural network checkpoint optimization system and method based on nonvolatile memory

Legal Events

Date | Code | Title | Description
| C06 | Publication |
| PB01 | Publication |
| C10 | Entry into substantive examination |
| SE01 | Entry into force of request for substantive examination |
| GR01 | Patent grant |
| GR01 | Patent grant |
