Shuffle 是 Map Task 和 Reduce Task 之间的一个阶段,本质上是一次跨节点、跨进程的数据传输。网上的资料也把 MapReduce 的这一过程细分为六个阶段:
1. Collect 2. Spill 3. Merge 4. Copy 5. Merge 6. Sort。看过源码之后,我觉得这几个阶段划分得还是很有道理的。首先看看官网上对 shuffle 的描述图,有个印象。
Map 端:首先,我们来看看 Map 阶段的代码。先找到 Map Task 的入口,即 MapTask(org/apache/hadoop/mapred/MapTask.java)的 run 方法,map task 启动时都会执行这个方法。
@Overridepublic void run(final JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException { this.umbilical = umbilical; // 一个taskAttempt的代理,后面比较多的地方使用 if (isMapTask()) { // If there are no reducers then there won"t be any sort. Hence the map // phase will govern the entire attempt"s progress. if (conf.getNumReduceTasks() == 0) { mapPhase = getProgress().addPhase("map", 1.0f); } else { // If there are reducers then the entire attempt"s progress will be // split between the map phase (67%) and the sort phase (33%). mapPhase = getProgress().addPhase("map", 0.667f); sortPhase = getProgress().addPhase("sort", 0.333f); } } // 启动任务状态汇报器,其内部有周期性的汇报线程(状态汇报和心跳) TaskReporter reporter = startReporter(umbilical); boolean useNewApi = job.getUseNewMapper(); initialize(job, getJobID(), reporter, useNewApi); // 重要方法,可以认为初始化task启动的一切资源了 // check if it is a cleanupJobTask if (jobCleanup) { runJobCleanupTask(umbilical, reporter); return; } if (jobSetup) { runJobSetupTask(umbilical, reporter); return; } if (taskCleanup) { runTaskCleanupTask(umbilical, reporter); return; } if (useNewApi) { runNewMapper(job, splitMetaInfo, umbilical, reporter); // 核心代码,点进去 } else { runOldMapper(job, splitMetaInfo, umbilical, reporter); } done(umbilical, reporter);}
上面参数里的 umbilical(TaskUmbilicalProtocol)是任务子进程用于联系其父进程的协议。父进程是一个守护进程,它轮询中央主进程以获取新的 map 或 reduce Task,并将其作为子进程(Child)运行。子进程和父进程之间的所有通信都是通过此协议进行的。
@SuppressWarnings("unchecked")private void runNewMapper(final JobConf job, final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException, InterruptedException { // make a task context so we can get the classes 创建Task的上下文环境 org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter); // make a mapper 通过反射创建mapper org.apache.hadoop.mapreduce.Mapper mapper = (org.apache.hadoop.mapreduce.Mapper) ReflectionUtils.newInstance(taskContext.getMapperClass(), job); // make the input format 通过反射创建inputFormat,来读取数据 org.apache.hadoop.mapreduce.InputFormat inputFormat = (org.apache.hadoop.mapreduce.InputFormat) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); // rebuild the input split // 获取切片信息 org.apache.hadoop.mapreduce.InputSplit split = null; split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); LOG.info("Processing split: " + split); org.apache.hadoop.mapreduce.RecordReader input = new NewTrackingRecordReader //通过反射创建RecordReader。InputFormat是通过RecordReader来读取数据,这个也是大学问,在job submit时很关键 (split, inputFormat, reporter, taskContext); job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); org.apache.hadoop.mapreduce.RecordWriter output = null; // get an output object if (job.getNumReduceTasks() == 0) { // 如果没有reduce任务,则直接写入磁盘 output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { // 核心代码,创建collector收集器 ,点进去 output = new NewOutputCollector(taskContext, job, umbilical, reporter); } org.apache.hadoop.mapreduce.MapContext mapContext = new MapContextImpl(job, getTaskID(), input, output, committer, reporter, split); org.apache.hadoop.mapreduce.Mapper.Context mapperContext = new WrappedMapper().getMapContext( mapContext); try { input.initialize(split, mapperContext); mapper.run(mapperContext); // 调用我们自己实现的mapper类 
mapPhase.complete(); setPhase(TaskStatus.Phase.SORT); statusUpdate(umbilical); input.close(); input = null; output.close(mapperContext); output = null; } finally { closeQuietly(input); closeQuietly(output, mapperContext); }}
马上进入collect阶段了,点进 NewOutputCollector,看看如何创建Collector
private class NewOutputCollector extends org.apache.hadoop.mapreduce.RecordWriter { private final MapOutputCollector collector; private final org.apache.hadoop.mapreduce.Partitioner partitioner; private final int partitions; @SuppressWarnings("unchecked") NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException { collector = createSortingCollector(job, reporter); partitions = jobContext.getNumReduceTasks(); // partitions数等于reduce任务数 if (partitions > 1) { partitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); } else { partitioner = new org.apache.hadoop.mapreduce.Partitioner() { @Override public int getPartition(K key, V value, int numPartitions) { return partitions - 1; } }; } } @Override public void write(K key, V value) throws IOException, InterruptedException { collector.collect(key, value, // 向对应分区的环形缓冲区写入(k,v) partitioner.getPartition(key, value, partitions)); } @Override public void close(TaskAttemptContext context ) throws IOException,InterruptedException { try { collector.flush();//核心方法,将数据刷出去。 } catch (ClassNotFoundException cnf) { throw new IOException("can"t find class ", cnf); } collector.close(); } }
点进 createSortingCollector
@SuppressWarnings("unchecked")private MapOutputCollector // collector是map 类型 createSortingCollector(JobConf job, TaskReporter reporter) throws IOException, ClassNotFoundException { MapOutputCollector.Context context = new MapOutputCollector.Context(this, job, reporter); Class>[] collectorClasses = job.getClasses( // 获取Map Collector的类型 JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class); // 说到底还是MapOutputBuffer类型 int remainingCollectors = collectorClasses.length; Exception lastException = null; for (Class clazz : collectorClasses) { try { if (!MapOutputCollector.class.isAssignableFrom(clazz)) { // MapOutputCollector是不是clazz或者其父类 throw new IOException("Invalid output collector class: " + clazz.getName() + " (does not implement MapOutputCollector)"); } Class extends MapOutputCollector> subclazz = clazz.asSubclass(MapOutputCollector.class); LOG.debug("Trying map output collector class: " + subclazz.getName()); MapOutputCollector collector = ReflectionUtils.newInstance(subclazz, job); // 创建collector collector.init(context); // 初始化 点进去 LOG.info("Map output collector class = " + collector.getClass().getName()); return collector; } catch (Exception e) { String msg = "Unable to initialize MapOutputCollector " + clazz.getName(); if (--remainingCollectors > 0) { msg += " (" + remainingCollectors + " more collector(s) to try)"; } lastException = e; LOG.warn(msg, e); } }}
public void init(MapOutputCollector.Context context // 这个方法中,主要就是对收集器对象进行一些初始化 ) throws IOException, ClassNotFoundException { job = context.getJobConf(); reporter = context.getReporter(); mapTask = context.getMapTask(); mapOutputFile = mapTask.getMapOutputFile(); sortPhase = mapTask.getSortPhase(); spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS); partitions = job.getNumReduceTasks(); rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw(); //sanity checks final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); // 设置环形缓冲区溢写比例为0.8 final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB, MRJobConfig.DEFAULT_IO_SORT_MB); // 默认环形缓冲区大小为100M indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT, INDEX_CACHE_MEMORY_LIMIT_DEFAULT); if (spillper > (float)1.0 || spillper <= (float)0.0) { throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT + "\": " + spillper); } if ((sortmb & 0x7FF) != sortmb) { throw new IOException( "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb); } // 排序,默认使用的快排 // 获取到排序对象,在数据由环形缓冲区溢写到磁盘中前 // 并且排序是针对索引的,并非对数据进行排序。 sorter = ReflectionUtils.newInstance(job.getClass( MRJobConfig.MAP_SORT_CLASS, QuickSort.class, IndexedSorter.class), job); // buffers and accounting // 对环形缓冲区初始化,大名鼎鼎的环形缓冲区本质上是个byte数组 int maxMemUsage = sortmb << 20; // 将MB转换为Bytes // 一对kv数据有四个元数据MATE,分别是valstart,keystart,partitions,vallen,都是int类型 // METASIZE 就是4个int转换成byte就是4*4 maxMemUsage -= maxMemUsage % METASIZE; // 计算METE数据存储的大小 kvbuffer = new byte[maxMemUsage]; // 元数据数组 以byte为单位 bufvoid = kvbuffer.length; kvmeta = ByteBuffer.wrap(kvbuffer) .order(ByteOrder.nativeOrder()) .asIntBuffer(); // 将byte单位的kvbuffer转换成int单位的kvmeta setEquator(0); bufstart = bufend = bufindex = equator; kvstart = kvend = kvindex; // kvmeta中存放元数据实体的最大个数 maxRec = kvmeta.capacity() / NMETA; softLimit = (int)(kvbuffer.length * spillper); // buffer 溢写的阈值 bufferRemaining = softLimit; LOG.info(JobContext.IO_SORT_MB + ": " + 
sortmb); LOG.info("soft limit at " + softLimit); LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid); LOG.info("kvstart = " + kvstart + "; length = " + maxRec); // k/v serialization comparator = job.getOutputKeyComparator(); keyClass = (Class)job.getMapOutputKeyClass(); valClass = (Class)job.getMapOutputValueClass(); serializationFactory = new SerializationFactory(job); keySerializer = serializationFactory.getSerializer(keyClass); keySerializer.open(bb); // 将key写入bb中 blockingbuffer valSerializer = serializationFactory.getSerializer(valClass); valSerializer.open(bb); // 将value写入bb中 // output counters mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES); mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS); fileOutputByteCounter = reporter .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES); // compression 压缩器,减少shuffle数据量 if (job.getCompressMapOutput()) { Class extends CompressionCodec> codecClass = job.getMapOutputCompressorClass(DefaultCodec.class); codec = ReflectionUtils.newInstance(codecClass, job); } else { codec = null; } // combiner // combiner map端的reduce final Counters.Counter combineInputCounter = reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS); combinerRunner = CombinerRunner.create(job, getTaskID(), combineInputCounter, reporter, null); if (combinerRunner != null) { final Counters.Counter combineOutputCounter = reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); combineCollector= new CombineOutputCollector(combineOutputCounter, reporter, job); } else { combineCollector = null; } // 溢写线程 spillInProgress = false; minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3); spillThread.setDaemon(true); // 是个守护线程 spillThread.setName("SpillThread"); // spillLock.lock(); try { spillThread.start(); // 启动一个spill线程 while (!spillThreadRunning) { spillDone.await(); } } catch (InterruptedException e) { throw new IOException("Spill thread failed to initialize", e); } finally { 
spillLock.unlock(); } if (sortSpillException != null) { throw new IOException("Spill thread failed to initialize", sortSpillException); }}
public synchronized void collect(K key, V value, final int partition ) throws IOException { reporter.progress(); if (key.getClass() != keyClass) { throw new IOException("Type mismatch in key from map: expected " + keyClass.getName() + ", received " + key.getClass().getName()); } if (value.getClass() != valClass) { throw new IOException("Type mismatch in value from map: expected " + valClass.getName() + ", received " + value.getClass().getName()); } if (partition < 0 || partition >= partitions) { throw new IOException("Illegal partition for " + key + " (" + partition + ")"); } checkSpillException(); bufferRemaining -= METASIZE; // 新数据collect时,先将元数据长度前去,之后判断 if (bufferRemaining <= 0) { // 说明已经超过阈值了 // start spill if the thread is not running and the soft limit has been // reached spillLock.lock(); try { do { // 首次spill时,spillInProgress是false if (!spillInProgress) { final int kvbidx = 4 * kvindex; // 单位是byte final int kvbend = 4 * kvend; // 单位是byte // serialized, unspilled bytes always lie between kvindex and // bufindex, crossing the equator. 
Note that any void space // created by a reset must be included in "used" bytes final int bUsed = distanceTo(kvbidx, bufindex); // 剩下可以写入的空间大小 final boolean bufsoftlimit = bUsed >= softLimit; // true说明已经超过softLimit了 if ((kvbend + METASIZE) % kvbuffer.length != equator - (equator % METASIZE)) { // spill finished, reclaim space resetSpill(); bufferRemaining = Math.min( distanceTo(bufindex, kvbidx) - 2 * METASIZE, softLimit - bUsed) - METASIZE; // 这里是重新选择equator吧,但是计算方式不了解 continue; } else if (bufsoftlimit && kvindex != kvend) { // spill records, if any collected; check latter, as it may // be possible for metadata alignment to hit spill pcnt startSpill(); //开始溢写,里面唤醒spill线程 final int avgRec = (int) (mapOutputByteCounter.getCounter() / mapOutputRecordCounter.getCounter()); // leave at least half the split buffer for serialization data // ensure that kvindex >= bufindex final int distkvi = distanceTo(bufindex, kvbidx); final int newPos = (bufindex + Math.max(2 * METASIZE - 1, Math.min(distkvi / 2, distkvi / (METASIZE + avgRec) * METASIZE))) % kvbuffer.length; setEquator(newPos); bufmark = bufindex = newPos; final int serBound = 4 * kvend; // bytes remaining before the lock must be held and limits // checked is the minimum of three arcs: the metadata space, the // serialization space, and the soft limit bufferRemaining = Math.min( // metadata max distanceTo(bufend, newPos), Math.min( // serialization max distanceTo(newPos, serBound), // soft limit softLimit)) - 2 * METASIZE; } } } while (false); // 这是什么写法????? 
} finally { spillLock.unlock(); } } // 直接写入buffer,不涉及spill try { // serialize key bytes into buffer int keystart = bufindex; keySerializer.serialize(key); // key所占空间被bufvoid分隔,则移动key, // 将其值放在连续的空间中便于sort时key的对比 if (bufindex < keystart) { // wrapped the key; must make contiguous bb.shiftBufferedKey(); keystart = 0; } // serialize value bytes into buffer final int valstart = bufindex; valSerializer.serialize(value); // It"s possible for records to have zero length, i.e. the serializer // will perform no writes. To ensure that the boundary conditions are // checked and that the kvindex invariant is maintained, perform a // zero-length write into the buffer. The logic monitoring this could be // moved into collect, but this is cleaner and inexpensive. For now, it // is acceptable. bb.write(b0, 0, 0); // the record must be marked after the preceding write, as the metadata // for this record are not yet written int valend = bb.markRecord(); mapOutputRecordCounter.increment(1); mapOutputByteCounter.increment( distanceTo(keystart, valend, bufvoid)); //计数器+1 // write accounting info kvmeta.put(kvindex + PARTITION, ); kvmeta.put(kvindex + KEYSTART, keystart); kvmeta.put(kvindex + VALSTART, valstart); kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend)); // advance kvindex kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity(); } catch (MapBufferTooSmallException e) { LOG.info("Record too large for in-memory buffer: " + e.getMessage()); spillSingleRecord(key, value, partition); // 长record就直接写入磁盘 mapOutputRecordCounter.increment(1); return; }}
private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException { //approximate the length of the output file to be the length of the //buffer + header lengths for the partitions final long size = distanceTo(bufstart, bufend, bufvoid) + partitions * APPROX_HEADER_LENGTH; // 写出长度 FSDataOutputStream out = null; FSDataOutputStream partitionOut = null; try { // create spill file final SpillRecord spillRec = new SpillRecord(partitions); final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size);// 默认是output/spillx.out out = rfs.create(filename);// 创建分区文件 final int mstart = kvend / NMETA; final int mend = 1 + // kvend is a valid record (kvstart >= kvend ? kvstart : kvmeta.capacity() + kvstart) / NMETA; // 对元数据进行排序,先按照partition进行排序,再按照key值进行排序 // 二次排序,排的是元数据部分 sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); int spindex = mstart; final IndexRecord rec = new IndexRecord(); final InMemValBytes value = new InMemValBytes(); for (int i = 0; i < partitions; ++i) {//循环分区 // 溢写时的临时文件 类型是IFile IFile.Writer writer = null; try { long segmentStart = out.getPos(); partitionOut = CryptoUtils.wrapIfNecessary(job, out, false); writer = new Writer(job, partitionOut, keyClass, valClass, codec, spilledRecordsCounter); if (combinerRunner == null) { // spill directly DataInputBuffer key = new DataInputBuffer(); // 写入相同的partition数据 while (spindex < mend && kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) { final int kvoff = offsetFor(spindex % maxRec); int keystart = kvmeta.get(kvoff + KEYSTART); int valstart = kvmeta.get(kvoff + VALSTART); key.reset(kvbuffer, keystart, valstart - keystart); getVBytesForOffset(kvoff, value); writer.append(key, value); ++spindex; } } else { // 进行combiner,避免小文件问题 int spstart = spindex; while (spindex < mend && kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) { ++spindex; } // Note: we would like to avoid the combiner if we"ve fewer // than some threshold of records for a partition if 
(spstart != spindex) { combineCollector.setWriter(writer); RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); combinerRunner.combine(kvIter, combineCollector); } } // close the writer writer.close(); /// 将文件写入本地磁盘中,不是HDFS上 if (partitionOut != out) { partitionOut.close(); partitionOut = null; } // record offsets // 记录当前partition i的信息写入索文件rec中 rec.startOffset = segmentStart; rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job); rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job); //spillRec中存放了spill中partition的信息 spillRec.putIndex(rec, i); writer = null; } finally { if (null != writer) writer.close(); } } if (totalIndexCacheMemory >= indexCacheMemoryLimit) { // create spill index file Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, job); // 将内存中的index文件写入磁盘 } else { indexCacheList.add(spillRec); totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; } LOG.info("Finished spill " + numSpills); ++numSpills; } finally { if (out != null) out.close(); if (partitionOut != null) { partitionOut.close(); } }}
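很明显,spill 会生成两类临时数据:一个是 (k,v) 数据文件,默认保存在 output/spill{x}.out 中。注意,这段代码里并没有明显地把 (k,v) 写入磁盘的语句,真正的写出是在 writer.close() 中完成的。另一个是索引(index)信息:只有当 totalIndexCacheMemory 达到 indexCacheMemoryLimit 时,才会通过 spillRec.writeToFile(indexFilename, job) 明确地写入磁盘;否则先缓存在内存中的 indexCacheList 里。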
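在 SpillThread 辛辛苦苦进行 sortAndSpill 工作时,Map Task 也在不断地产生新的 (k,v) 并写入 MapOutputBuffer,环形缓冲区的读线程和写线程同时工作!怎么避免冲突呢?答案是反向写:触发溢写后,collect 会在剩余空间中重新选定分界点(equator),序列化后的 (k,v) 数据从 equator 开始向一个方向写,元数据(kvmeta)则向相反方向写(kvindex 每次减去 NMETA),这样新写入的数据就不会与正在被溢写的区域冲突。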
最后就是 Map Task 中 Shuffle 过程的最后一个阶段 Merge。这部分代码有点多,就不贴了,感兴趣的同学可以查看 MapOutputBuffer 中的 mergeParts 方法。这个方法在上面的 flush 方法里被调用,其作用是合并 spill 阶段产生的 out 文件和 index 文件。
同样,第一步就是查看 Reduce Task 的 run 方法,这是 reduce 逻辑的启动过程。
public void run(JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException { job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); if (isMapOrReduce()) { // reduce的三个阶段 copyPhase = getProgress().addPhase("copy"); sortPhase = getProgress().addPhase("sort"); reducePhase = getProgress().addPhase("reduce"); } // start thread that will handle communication with parent // 启动任务状态汇报器,其内部有周期性的汇报线程(状态汇报和心跳) TaskReporter reporter = startReporter(umbilical); boolean useNewApi = job.getUseNewReducer(); initialize(job, getJobID(), reporter, useNewApi);//核心代码,初始化任务 // check if it is a cleanupJobTask if (jobCleanup) { runJobCleanupTask(umbilical, reporter); return; } if (jobSetup) { runJobSetupTask(umbilical, reporter); return; } if (taskCleanup) { runTaskCleanupTask(umbilical, reporter); return; } // Initialize the codec codec = initCodec(); RawKeyValueIterator rIter = null; ShuffleConsumerPlugin shuffleConsumerPlugin = null; Class combinerClass = conf.getCombinerClass(); CombineOutputCollector combineCollector = (null != combinerClass) ? 
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null; Class extends ShuffleConsumerPlugin> clazz = job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);// 设置shuffle插件 shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job); LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin); ShuffleConsumerPlugin.Context shuffleContext = new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, codec, combinerClass, combineCollector, spilledRecordsCounter, reduceCombineInputCounter, shuffledMapsCounter, reduceShuffleBytes, failedShuffleCounter, mergedMapOutputsCounter, taskStatus, copyPhase, sortPhase, this, mapOutputFile, localMapFiles); shuffleConsumerPlugin.init(shuffleContext); // 执行shuffle过程中的远程数据拉取,在拉取的过程中 // 内部 启动 map-completion event fetch线程 获取map端完成的event信息 // 在开启默认5个的fetch 线程 拉取数据,里面核心函数就是一直点进去是doShuffle,有两种一种是in-memory另一种就是on-disk // 超出shuffle内存就merge到disk // shuffle插件内部有个mergeMangager 会在合适的时候就是快超过shuffle内存缓存的时候,启动merge线程 // 这个表面是一次网络IO,本质上是一个RPC,通过umbilical代理获取已经完成的MapTask任务的taskAttempt的ID,存入schedule中,为后面shuffle做准备 rIter = shuffleConsumerPlugin.run(); // free up the data structures // 一个sort set,是TreeSet数据结构· mapOutputFilesOnDisk.clear(); sortPhase.complete(); // sort is complete setPhase(TaskStatus.Phase.REDUCE); statusUpdate(umbilical); Class keyClass = job.getMapOutputKeyClass(); Class valueClass = job.getMapOutputValueClass(); RawComparator comparator = job.getOutputValueGroupingComparator(); if (useNewApi) { runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); // 执行reduce操作,(用户定义的逻辑) } else { runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } shuffleConsumerPlugin.close(); done(umbilical, reporter); }
Reduce Task 的重点比较清晰,就是初始化 shuffleConsumerPlugin(shuffleConsumerPlugin.init(shuffleContext))并随后调用它的 run 方法。下面来看这个 run 方法:
public RawKeyValueIterator run() throws IOException, InterruptedException { // Scale the maximum events we fetch per RPC call to mitigate OOM issues // on the ApplicationMaster when a thundering herd of reducers fetch events // TODO: This should not be necessary after HADOOP-8942 int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH, MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks()); int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer); // Start the map-completion events fetcher thread // 启动 一个 event fetcher线程 获取map端完成的event信息 final EventFetcher eventFetcher = new EventFetcher(reduceId, umbilical, scheduler, this, maxEventsToFetch); eventFetcher.start(); // Start the map-output fetcher threads 启动fetch线程 // fetch 线程 远程从map端拉取对应partition的数据 boolean isLocal = localMapFiles != null; final int numFetchers = isLocal ? 1 : jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5); Fetcher[] fetchers = new Fetcher[numFetchers]; if (isLocal) { fetchers[0] = new LocalFetcher(jobConf, reduceId, scheduler, merger, reporter, metrics, this, reduceTask.getShuffleSecret(), localMapFiles); fetchers[0].start(); } else { for (int i=0; i < numFetchers; ++i) { fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger, reporter, metrics, this, reduceTask.getShuffleSecret()); fetchers[i].start(); } } // Wait for shuffle to complete successfully while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) { reporter.progress(); synchronized (this) { if (throwable != null) { throw new ShuffleError("error in shuffle in " + throwingThreadName, throwable); } } } // Stop the event-fetcher thread eventFetcher.shutDown(); // Stop the map-output fetcher threads for (Fetcher fetcher : fetchers) { fetcher.shutDown(); } // stop the scheduler scheduler.close(); copyPhase.complete(); // copy is already complete taskStatus.setPhase(TaskStatus.Phase.SORT); reduceTask.statusUpdate(umbilical); // Finish the on-going merges... 
RawKeyValueIterator kvIter = null; try { kvIter = merger.close(); } catch (Throwable e) { throw new ShuffleError("Error while doing final merge " , e); } // Sanity check synchronized (this) { if (throwable != null) { throw new ShuffleError("error in shuffle in " + throwingThreadName, throwable); } } return kvIter;}
重点就是两种线程,一种是Event fetch,另一种是fetch线程
首先,event fetch 线程的作用是获取已完成的 Map Task 的 TaskAttempt ID 等信息,存入 scheduler 中,方便以后 Shuffle(尤其是 sort)时使用。本质上这是一次 RPC,注意看 EventFetcher 初始化时的参数里有个 umbilical。
