MapReduce is aprogramming model and an associated implementation for processing and generatingbig data sets with aparallel anddistributed algorithm on acluster.[1][2][3]
A MapReduce program is composed of amapprocedure, which performs filtering and sorting (such as sorting students by first name into queues, one queue for each name), and areduce method, which performs a summary operation (such as counting the number of students in each queue, yielding name frequencies). The "MapReduce System" (also called "infrastructure" or "framework") orchestrates the processing bymarshalling the distributed servers, running the various tasks in parallel, managing all communications and data transfers between the various parts of the system, and providing forredundancy andfault tolerance.
The model is a specialization of thesplit-apply-combine strategy for data analysis.[4]It is inspired by themap andreduce functions commonly used infunctional programming,[5] although their purpose in the MapReduce framework is not the same as in their original forms.[6] The key contributions of the MapReduce framework are not the actual map and reduce functions (which, for example, resemble the 1995Message Passing Interface standard's[7]reduce[8] andscatter[9] operations), but the scalability and fault-tolerance achieved for a variety of applications due to parallelization. As such, asingle-threaded implementation of MapReduce is usually not faster than a traditional (non-MapReduce) implementation; any gains are usually only seen withmulti-threaded implementations on multi-processor hardware.[10] The use of this model is beneficial only when the optimized distributed shuffle operation (which reduces network communication cost) and fault tolerance features of the MapReduce framework come into play. Optimizing the communication cost is essential to a good MapReduce algorithm.[11]
MapReducelibraries have been written in many programming languages, with different levels of optimization. A popularopen-source implementation that has support for distributed shuffles is part ofApache Hadoop. The name MapReduce originally referred to the proprietaryGoogle technology, but has since become ageneric trademark. By 2014, Google was no longer using MapReduce as its primarybig data processing model,[12] and development onApache Mahout had moved on to more capable and less disk-oriented mechanisms that incorporated full map and reduce capabilities.[13]
MapReduce is a framework for processingparallelizable problems across large datasets using a large number of computers (nodes), collectively referred to as acluster (if all nodes are on the same local network and use similar hardware) or agrid (if the nodes are shared across geographically and administratively distributed systems, and use more heterogeneous hardware). Processing can occur on data stored either in afilesystem (unstructured) or in adatabase (structured). MapReduce can take advantage of the locality of data, processing it near the place it is stored in order to minimize communication overhead.
A MapReduce framework (or system) is usually composed of three operations (or steps):
map function to the local data, and writes the output to a temporary storage. A master node ensures that only one copy of the redundant input data is processed.map function), such that all data belonging to one key is located on the same worker node.MapReduce allows for the distributed processing of the map and reduction operations. Maps can be performed in parallel, provided that each mapping operation is independent of the others; in practice, this is limited by the number of independent data sources and/or the number of CPUs near each source. Similarly, a set of 'reducers' can perform the reduction phase, provided that all outputs of the map operation that share the same key are presented to the same reducer at the same time, or that the reduction function isassociative. While this process often appears inefficient compared to algorithms that are more sequential (because multiple instances of the reduction process must be run), MapReduce can be applied to significantly larger datasets than a single"commodity" server can handle – a largeserver farm can use MapReduce to sort apetabyte of data in only a few hours.[14] The parallelism also offers some possibility of recovering from partial failure of servers or storage during the operation: if one mapper or reducer fails, the work can be rescheduled – assuming the input data are still available.
Another way to look at MapReduce is as a 5-step parallel and distributed computation:
These five steps can be logically thought of as running in sequence – each step starts only after the previous step is completed – although in practice they can be interleaved as long as the final result is not affected.
In many situations, the input data might have already been distributed ("sharded") among many different servers, in which case step 1 could sometimes be greatly simplified by assigning Map servers that would process the locally present input data. Similarly, step 3 could sometimes be sped up by assigning Reduce processors that are as close as possible to the Map-generated data they need to process.
TheMap andReduce functions ofMapReduce are both defined with respect to data structured in (key, value) pairs.Map takes one pair of data with a type in onedata domain, and returns a list of pairs in a different domain:
Map(k1,v1) →list(k2,v2)
TheMap function is applied in parallel to every pair (keyed byk1) in the input dataset. This produces a list of pairs (keyed byk2) for each call.After that, the MapReduce framework collects all pairs with the same key (k2) from all lists and groups them together, creating one group for each key.
TheReduce function is then applied in parallel to each group, which in turn produces a collection of values in the same domain:
Reduce(k2, list (v2)) →list((k3, v3))[15]
EachReduce call typically produces either one key value pair or an empty return, though one call is allowed to return more than one key value pair. The returns of all calls are collected as the desired result list.
Thus the MapReduce framework transforms a list of (key, value) pairs into another list of (key, value) pairs.[16] This behavior is different from the typical functional programming map and reduce combination, which accepts a list of arbitrary values and returns one single value that combinesall the values returned by map.
It isnecessary but not sufficient to have implementations of the map and reduce abstractions in order to implement MapReduce. Distributed implementations of MapReduce require a means of connecting the processes performing the Map and Reduce phases. This may be adistributed file system. Other options are possible, such as direct streaming from mappers to reducers, or for the mapping processors to serve up their results to reducers that query them.
The canonical MapReduce example counts the appearance of each word in a set of documents:[17]
functionmap(String name, String document):// name: document name// document: document contentsfor each word win document: emit (w, 1)functionreduce(String word, Iterator partialCounts):// word: a word// partialCounts: a list of aggregated partial counts sum = 0for each pcin partialCounts: sum += pc emit (word, sum)
Here, each document is split into words, and each word is counted by themap function, using the word as the result key. The framework puts together all the pairs with the same key and feeds them to the same call toreduce. Thus, this function just needs to sum all of its input values to find the total appearances of that word.
As another example, imagine that for a database of 1.1 billion people, one would like to compute the average number of social contacts a person has according to age. InSQL, such a query could be expressed as:
SELECTage,AVG(contacts)FROMsocial.personGROUPBYageORDERBYage
Using MapReduce, theK1 key values could be the integers 1 through 1100, each representing a batch of 1 million records, theK2 key value could be a person's age in years, and this computation could be achieved using the following functions:
function Mapisinput:integer K1 between 1 and 1100, representing a batch of 1 million social.person recordsfor each social.person record in the K1 batchdolet Y be the person's agelet N be the number of contacts the person hasproduce one output record (Y,(N,1))repeatend functionfunction Reduceisinput: age (in years) Yfor each input record (Y,(N,C))do Accumulate in S the sum of N*C Accumulate in Cnew the sum of Crepeatlet A be S/Cnewproduce one output record (Y,(A,Cnew))end function
Note that in theReduce function,C is the count of people having in total N contacts, so in theMap function it is natural to writeC=1, since every output pair is referring to the contacts of one single person.
The MapReduce system would line up the 1100 Map processors, and would provide each with its corresponding 1 million input records. The Map step would produce 1.1 billion(Y,(N,1)) records, withY values ranging between, say, 8 and 103. The MapReduce System would then line up the 96 Reduce processors by performing shuffling operation of the key/value pairs due to the fact that we need average per age, and provide each with its millions of corresponding input records. The Reduce step would result in the much reduced set of only 96 output records(Y,A), which would be put in the final result file, sorted byY.
The count info in the record is important if the processing is reduced more than one time. If we did not add the count of the records, the computed average would be wrong, for example:
-- map output #1: age, quantity of contacts10, 910, 910, 9
-- map output #2: age, quantity of contacts10, 910, 9
-- map output #3: age, quantity of contacts10, 10
If we reduce files#1 and#2, we will have a new file with an average of 9 contacts for a 10-year-old person ((9+9+9+9+9)/5):
-- reduce step #1: age, average of contacts10, 9
If we reduce it with file#3, we lose the count of how many records we've already seen, so we end up with an average of 9.5 contacts for a 10-year-old person ((9+10)/2), which is wrong. The correct answer is 9.166 = 55 / 6 = (9×3+9×2+10×1)/(3+2+1).
Software framework architecture adheres toopen-closed principle where code is effectively divided into unmodifiablefrozen spots andextensiblehot spots. The frozen spot of the MapReduce framework is a large distributed sort. The hot spots, which the application defines, are:
Theinput reader divides the input into appropriate size 'splits' (in practice, typically, 64 MB to 128 MB) and the framework assigns one split to eachMap function. Theinput reader reads data from stable storage (typically, adistributed file system) and generates key/value pairs.
A common example will read a directory full of text files and return each line as a record.
TheMap function takes a series of key/value pairs, processes each, and generates zero or more output key/value pairs. The input and output types of the map can be (and often are) different from each other.
If the application is doing a word count, the map function would break the line into words and output a key/value pair for each word. Each output pair would contain the word as the key and the number of instances of that word in the line as the value.
EachMap function output is allocated to a particularreducer by the application'spartition function forsharding purposes. Thepartition function is given the key and the number of reducers and returns the index of the desiredreducer.
A typical default is tohash the key and use the hash valuemodulo the number ofreducers. It is important to pick a partition function that gives an approximately uniform distribution of data per shard forload-balancing purposes, otherwise the MapReduce operation can be held up waiting for slow reducers to finish(i.e. the reducers assigned the larger shares of the non-uniformly partitioned data).
Between the map and reduce stages, the data areshuffled (parallel-sorted / exchanged between nodes) in order to move the data from the map node that produced them to the shard in which they will be reduced. The shuffle can sometimes take longer than the computation time depending on network bandwidth, CPU speeds, data produced and time taken by map and reduce computations.
The input for eachReduce is pulled from the machine where theMap ran and sorted using the application'scomparison function.
The framework calls the application'sReduce function once for each unique key in the sorted order. TheReduce can iterate through the values that are associated with that key and produce zero or more outputs.
In the word count example, theReduce function takes the input values, sums them and generates a single output of the word and the final sum.
TheOutput Writer writes the output of theReduce to the stable storage.
Properties ofmonoids are the basis for ensuring the validity of MapReduce operations.[18][19]
In the Algebird package[20] a Scala implementation of Map/Reduce explicitly requires a monoid class type.[21]
The operations of MapReduce deal with two types: the typeA of input data being mapped, and the typeB of output data being reduced.
TheMap operation takes individual values of typeA and produces, for eacha:A a valueb:B; TheReduce operation requires a binary operation • defined on values of typeB; it consists of folding all availableb:B to a single value.
From a basic requirements point of view, any MapReduce operation must involve the ability to arbitrarily regroup data being reduced. Such a requirement amounts to two properties of the operation •:
The second property guarantees that, when parallelized over multiple nodes, the nodes that don't have any data to process would have no impact on the result.
These two properties amount to having amonoid (B, •,e) on values of typeB with operation • and with neutral elemente.
There's no requirements on the values of typeA; an arbitrary functionA →B can be used for theMap operation. This means that we have acatamorphismA* → (B, •,e). HereA* denotes aKleene star, also known as the type of lists overA.
TheShuffle operation per se is not related to the essence of MapReduce; it's needed to distribute calculations over the cloud.
It follows from the above that not every binaryReduce operation will work in MapReduce. Here are the counter-examples:
MapReduce programs are not guaranteed to be fast. The main benefit of this programming model is to exploit the optimized shuffle operation of the platform, and only having to write theMap andReduce parts of the program.In practice, the author of a MapReduce program however has to take the shuffle step into consideration; in particular the partition function and the amount of data written by theMap function can have a large impact on the performance and scalability. Additional modules such as theCombiner function can help to reduce the amount of data written to disk, and transmitted over the network. MapReduce applications can achieve sub-linear speedups under specific circumstances.[22]
When designing a MapReduce algorithm, the author needs to choose a good tradeoff[11] between the computation and the communication costs. Communication cost often dominates the computation cost,[11][22] and many MapReduce implementations are designed to write all communication to distributed storage for crash recovery.
In tuning performance of MapReduce, the complexity of mapping, shuffle, sorting (grouping by the key), and reducing has to be taken into account. The amount of data produced by the mappers is a key parameter that shifts the bulk of the computation cost between mapping and reducing. Reducing includes sorting (grouping of the keys) which has nonlinear complexity. Hence, small partition sizes reduce sorting time, but there is a trade-off because having a large number of reducers may be impractical. The influence of split unit size is marginal (unless chosen particularly badly, say <1MB). The gains from some mappers reading load from local disks, on average, is minor.[23]
For processes that complete quickly, and where the data fits into main memory of a single machine or a small cluster, using a MapReduce framework usually is not effective. Since these frameworks are designed to recover from the loss of whole nodes during the computation, they write interim results to distributed storage. This crash recovery is expensive, and only pays off when the computation involves many computers and a long runtime of the computation. A task that completes in seconds can just be restarted in the case of an error, and the likelihood of at least one machine failing grows quickly with the cluster size. On such problems, implementations keeping all data in memory and simply restarting a computation on node failures or —when the data is small enough— non-distributed solutions will often be faster than a MapReduce system.
MapReduce achieves reliability by parceling out a number of operations on the set of data to each node in the network. Each node is expected to report back periodically with completed work and status updates. If a node falls silent for longer than that interval, the master node (similar to the master server in theGoogle File System) records the node as dead and sends out the node's assigned work to other nodes. Individual operations useatomic operations for naming file outputs as a check to ensure that there are not parallel conflicting threads running. When files are renamed, it is possible to also copy them to another name in addition to the name of the task (allowing forside-effects).
The reduce operations operate much the same way. Because of their inferior properties with regard to parallel operations, the master node attempts to schedule reduce operations on the same node, or in the same rack as the node holding the data being operated on. This property is desirable as it conserves bandwidth across the backbone network of the datacenter.
Implementations are not necessarily highly reliable. For example, in older versions ofHadoop theNameNode was asingle point of failure for the distributed filesystem. Later versions of Hadoop have high availability with an active/passive failover for the "NameNode."
MapReduce is useful in a wide range of applications, including distributed pattern-based searching, distributed sorting, web link-graph reversal, Singular Value Decomposition,[24] web access log stats,inverted index construction,document clustering,machine learning,[25] andstatistical machine translation. Moreover, the MapReduce model has been adapted to several computing environments like multi-core and many-core systems,[26][27][28] desktop grids,[29]multi-cluster,[30] volunteer computing environments,[31] dynamic cloud environments,[32] mobile environments,[33] and high-performance computing environments.[34]
At Google, MapReduce was used to completely regenerate Google's index of theWorld Wide Web. It replaced the oldad hoc programs that updated the index and ran the various analyses.[35] Development at Google has since moved on to technologies such as Percolator, FlumeJava[36] andMillWheel that offer streaming operation and updates instead of batch processing, to allow integrating "live" search results without rebuilding the complete index.[37]
MapReduce's stable inputs and outputs are usually stored in adistributed file system. The transient data are usually stored on local disk and fetched remotely by the reducers.
David DeWitt andMichael Stonebraker, computer scientists specializing inparallel databases andshared-nothing architectures, have been critical of the breadth of problems that MapReduce can be used for.[38] They called its interface too low-level and questioned whether it really represents theparadigm shift its proponents have claimed it is.[39] They challenged the MapReduce proponents' claims of novelty, citingTeradata as an example ofprior art that has existed for over two decades. They also compared MapReduce programmers toCODASYL programmers, noting both are "writing in alow-level language performing low-level record manipulation."[39] MapReduce's use of input files and lack ofschema support prevents the performance improvements enabled by common database system features such asB-trees andhash partitioning, though projects such asPig (or PigLatin),Sawzall,Apache Hive,[40]HBase[41] andBigtable[41][42] are addressing some of these problems.
Greg Jorgensen wrote an article rejecting these views.[43] Jorgensen asserts that DeWitt and Stonebraker's entire analysis is groundless as MapReduce was never designed nor intended to be used as a database.
DeWitt and Stonebraker have subsequently published a detailed benchmark study in 2009 comparing performance ofHadoop's MapReduce andRDBMS approaches on several specific problems.[44] They concluded that relational databases offer real advantages for many kinds of data use, especially on complex processing or where the data is used across an enterprise, but that MapReduce may be easier for users to adopt for simple or one-time processing tasks.
The MapReduce programming paradigm was also described inDanny Hillis's 1985 thesis[45] intended for use on theConnection Machine, where it was called "xapping/reduction"[46] and relied upon that machine's special hardware to accelerate both map and reduce. The dialect ultimately used for the Connection Machine, the 1986StarLisp, had parallel*map andreduce!!,[47] which in turn was based on the 1984Common Lisp, which had non-parallelmap andreduce built in.[48] Thetree-like approach that the Connection Machine'shypercube architecture uses to executereduce in time[49] is effectively the same as the approach referred to within the Google paper as prior work.[3]: 11
In 2010 Google was granted what is described as a patent on MapReduce. The patent, filed in 2004, may cover use of MapReduce by open source software such asHadoop,CouchDB, and others. InArs Technica, an editor acknowledged Google's role in popularizing the MapReduce concept, but questioned whether the patent was valid or novel.[50][51] In 2013, as part of its "Open Patent Non-Assertion (OPN) Pledge", Google pledged to only use the patent defensively.[52][53] The patent is expected to expire on 23 December 2026.[54]
MapReduce tasks must be written as acyclic dataflow programs, i.e. a stateless mapper followed by a stateless reducer, that are executed by a batch job scheduler. This paradigm makes repeated querying of datasets difficult and imposes limitations that are felt in fields such asgraph processing[55] where iterative algorithms that revisit a singleworking set multiple times are the norm, as well as, in the presence ofdisk-based data with highlatency, even the field ofmachine learning where multiple passes through the data are required even though algorithms can tolerate serial access to the data each pass.[56]
The MapReduce implementation in MongoDB has little to do with map reduce apparently. Because for all I read, it is single-threaded, while map-reduce is meant to be used highly parallel on a cluster. ... MongoDB MapReduce is single threaded on a single server...
"We don't really use MapReduce anymore" [Urs Hölzle, senior vice president of technical infrastructure at Google]
As of October, Google was running about 3,000 computing jobs per day through MapReduce, representing thousands of machine-days, according to a presentation by Dean. Among other things, these batch routines analyze the latest Web pages and update Google's indexes.