您好,今日明帅来为大家解答以上的问题。mapreduce的具体执行过程,mapreduce的过程相信很多小伙伴还不知道,现在让我们一起来看看吧!
1、1.首先介绍一下wordcount 早mapreduce框架中的 对应关系大家都知道 mapreduce 分为 map 和reduce 两个部分,那么在wordcount例子中,很显然 对文件word 计数部分为map,对 word 数量累计部分为 reduce;大家都明白 map接受一个参数,经过map处理后,将处理结果作为reduce的入参分发给reduce,然后在reduce中统计了word 的数量,最终输出到输出结果;但是初看遇到的问题:一、map的输入参数是个 Text之类的 对象,并不是 file对象二、reduce中并没有if-else之类的判断语句 ,来说明 这个word 数量 加 一次,那个word 加一次。
2、那么这个判断到底只是在 map中已经区分了 还是在reduce的时候才判断的三、map过程到底做了什么,reduce过程到底做了什么?为什么它能够做到多个map多个reduce? 一、1. 怎么将 文件参数 传递 到 job中呢?在 client 我们调用了FileInputFormat.addInputPath(job, new Path(otherArgs[0]));实际上 addInputPath 做了以下的事情(将文件路径加载到了conf中)public static void addInputPath(Job job, Path path) throws IOException { Configuration conf = job.getConfiguration(); path = path.getFileSystem(conf).makeQualified(path); String dirStr = StringUtils.escapeString(path.toString()); String dirs = conf.get(INPUT_DIR); conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr); } 我们再来看看 FileInputFormat 是做什么用的, FileInputFormat 实现了 InputFormat 接口 ,这个接口是hadoop用来接收客户端输入参数的。
3、所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。
4、 我们会看到 在 InputFormat 接口中 有getSplits方法,也就是说分片操作实际上实在 map之前 就已经做好了 List
5、可以通过设置mapred.min.split.size和mapred.max.split.size来设置。
6、splits链表用来存储计算得到的输入分片,files则存储作为由listStatus()获取的输入文件列表。
7、然后对于每个输入文件,判断是否可以分割,通过computeSplitSize计算出分片大小splitSize,计算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保证在minSize和maxSize之间,且如果minSize<=blockSize<=maxSize,则设为blockSize。
8、然后我们根据这个splitSize计算出每个文件的inputSplits集合,然后加入分片列表splits中。
9、注意到我们生成InputSplit的时候按上面说的使用文件路径,分片起始位置,分片大小和存放这个文件的hosts列表来创建。
10、最后我们还设置了输入文件数量:mapreduce.input.num.files。
11、二、计算出来的分片有时怎么传递给 map呢 ?对于单词数量如何累加?我们使用了 就是InputFormat中的另一个方法createRecordReader() 这个方法:RecordReader: RecordReader是用来从一个输入分片中读取一个一个的K -V 对的抽象类,我们可以将其看作是在InputSplit上的迭代器。
12、我们从API接口中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。
13、 可以看到接口中有: public abstract boolean nextKeyValue() throws IOException, InterruptedException; public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; public abstract float getProgress() throws IOException, InterruptedException; public abstract void close() throws IOException; FileInputFormat
14、我们也可能重写setup()以对这个map task进行一些预处理,比如创建一个List之类的;我们也可能重写cleanup()方法对做一些处理后的工作,当然我们也可能在cleanup()中写出K-V对。
15、举个例子就是:InputSplit的数据是一些整数,然后我们要在mapper中算出它们的和。
16、我们就可以在先设置个sum属性,然后map()函数处理一个K-V对就是将其加到sum上,最后在cleanup()函数中调用context.write(key,value); 最后我们看看Mapper.class中的run()方法,它相当于map task的驱动,我们可以看到run()方法首先调用setup()进行初始操作,然后对每个context.nextKeyValue()获取的K-V对,就调用map()函数进行处理,最后调用cleanup()做最后的处理。
17、事实上,从context.nextKeyValue()就是使用了相应的RecordReader来获取K-V对的。
18、 我们看看Mapper.class中的Context类,它继承与MapContext,使用了一个RecordReader进行构造。
19、下面我们再看这个MapContext。
20、 public MapContextImpl(Configuration conf, TaskAttemptID taskid, RecordReader
本文就为大家分享到这里,希望小伙伴们会喜欢。