来自 操作系统 2019-11-22 06:09 的文章
当前位置: 网上澳门金莎娱乐 > 操作系统 > 正文

Java --本地提交MapReduce作业至集群☞实现 Word Count

 

Hadoop-2.4.1学习之Mapper和Reducer

MapReduce允许程序员能够容易地编写并行运行在大规模集群上处理大量数据的程序,确保程序的运行稳定可靠和具有容错处理能力。程序员编写的运行在MapReduce上的应用程序称为作业(job),Hadoop既支持用Java编写的job,也支持其它语言编写的作业,比如Hadoop Streaming(shell、python)和Hadoop Pipes(c++)。Hadoop-2.X不再保留Hadoop-1.X版本中的JobTracker和TaskTracker组件,但这并不意味着Hadoop-2.X不再支持MapReduce作业,相反Hadoop-2.X通过唯一的主ResourceManager、每个节点一个的从NodeManager和每个应用程序一个的MRAppMaster保留了对MapReduce作业的向后兼容。在新版本中MapReduce作业依然由Map和Reduce任务组成,Map依然接收由MapReduce框架将输入数据分割为数据块,然后Map任务以完全并行的方式处理这些数据块,接着MapReduce框架对Map任务的输出进行排序,并将结果做为Reduce任务的输入,最后由Reduce任务输出最终的结果,在整个执行过程中MapReduce框架负责任务的调度,监控和重新执行失败的任务等。

通常计算节点和存储节点是相同的,MapReduce框架会有效地将任务安排在存储数据的节点上,有助于降低传输数据时的带宽使用量。MapReduce应用程序通过实现或者继承合适的接口或类提供了map和reduce函数,这两个函数负责Map任务和Reduce任务。作业客户端将编写好的作业提交给ResourceManager,而不再是JobTracker,ResourceManager负责将作业分布到从节点上,调度和监控作业,为作业客户端提供状态和诊断信息。

MapReduce框架只处理<key, value>键值对,也就是将作业的输入视为一些键值对并输出键值对。做为键值的类必须可以被MapReduce框架序列化,因此需要实现Writable接口,常用的IntWritable,LongWritable和Text都是实现该接口的类。做为键的类除了要实现Writable接口外,还需要实现WritableComparable接口,实现该接口主要为了有助于排序,上面提到的三个类也都实现了该接口。

在简要介绍了MapReduce框架后,下面深入学习框架中的两个重要概念:Mapper和Reducer,正如上文提到了,它们组成了MapReduce作业并负责完成实际的业务逻辑处理。

Mapper是独立的任务,将输入记录转换为中间记录,即对输入的键值对进行处理,并输出为一组中间键值对,输出的键值对使用context.write(WritableComparable, Writable)方法收集起来,中间记录的键值类型不必与输入记录的键值类型相同,实际上也往往是不同的。一条输入记录经由Mapper处理后可能输出为0条或者多条中间记录。比如,如果输入记录不满足业务要求(没有包含特定的值或者包含了特定的值)的话,可以直接返回,则会输出0条记录,此时Mapper起了过滤器的作用。

接着MapReduce框架将与给定键相关联的所有中间值分组,然后传递给Reducer。用户可以通过Job.setGroupingComparatorClass(Class)方法指定Comparator来控制分组。Mapper的输出被排序然后按照Reducer分区,总的分区数与作业启动的Reducer任务数相同,程序员可以通过实现自定义的Partitioner控制输出的记录由哪个Reducer处理,默认使用的是HashPartitioner。程序员还可以通过Job.setCombinerClass(Class)指定一个combiner来执行中间输出的本地聚合,这有助于减少Mapper到Reducer的数据传输。Mapper的中间输出经过排序后总是保存为(key-len, key,value-len, value)的格式,应用程序可以通过Configuration控制是否将中间输出进行压缩,以及使用何种压缩方式,相关的几个参数有:mapreduce.map.output.compress、mapreduce.map.output.compress.codec。程序员通过Job.setMapperClass(Class)将Mapper传递给Job,MapReduce框架调用Mapper的map(WritableComparable, Writable, Context)处理该任务的价值对,应用程序可以覆盖cleanup(Context)方法实现任何需要的清理工作。

MapReduce框架为每个由作业的InputFormat生成的InputSplit启动一个map任务,因此总的map任务数量由输入数据大小决定,更准确说是由输入文件总的块数决定。虽然可以为较少使用CPU的map任务在节点上设置300个map任务,但每个节点更适合并行运行10-100个map任务。由于任务的启动需要花费一些时间,所以任务的运行最好至少需要1分钟,因为如果任务运行的时间很少,整个作业的时间将大部分消耗在任务的建立上面。

Reducer将具有相同键的一组中间值降低为一组更小数量的值,比如合并单词的数量等。一个作业启动的Reducer数量可以通过Job.setNumReduceTasks(int)或者mapred-site.xml中的参数mapreduce.job.reduces设置,但是更推荐前者,因为可以由程序员决定启动多少个reducer,而后者更多的是提供了一种默认值。程序员使用Job.setReducerClass(Class)将Reducer提交给作业,MapReduce框架为每对<key, (list of values)>调用reduce(WritableComparable, Iterable<Writable>, Context)方法,同Mapper一样,程序员也可以覆盖cleanup(Context)方法指定需要的清理工作。

Reducer的处理过程主要包括三个阶段:shuffle(洗牌)、sort(分类)和reduce。在shuffle阶段,MapReduce框架通过HTTP获取所有Mapper输出的相关分区。在Sort阶段,框架根据键分组Reducer的输入(不同的mapper可能输出相同的键)。Shuffle和sort是同时进行的,获取Mapper的输出后然后合并它们。在reduce阶段,调用reduce(WritableComparable, Iterable<Writable>处理<key, (list of values)>对。Reducer的输出通常通过Context.write(WritableComparable,Writable)写入文件系统,比如HDFS,当然也可以通过使用DBOutputFormat将输出写入数据库。Reducer的输出是未经排序的。

如果不需要Reducer,可以使用Job.setNumReduceTasks(int)将Reducer的数量设置为0(如果不使用该方法设置Reducer的数量,由于mapreduce.job.reduces默认为1,会启动一个Reducer),在这种情况下,Mapper的输出将直接写入FileOutputFormat.setOutputPath(Job,Path)指定的路径中,并且MapReduce框架不会对Mapper的输出进行排序。

如果在进行reduce之前想使用与分组中间键时不同的比较规则,可以通过Job.setSortComparatorClass(Class)指定不同的Comparator。也就是Job.setGroupingComparatorClass(Class)控制了如何对中间输出分组,而Job.setSortComparatorClass(Class)控制了在将数据传入reduce之前进行的第二次分组。

不同于Mapper的数量由输入文件的大小确定,Reducer的数量可以由程序员明确设置,那么设置多少Reducer可以达到较好地效果呢?Reducer的数量范围为:(0.95 ~1.75 ) * 节点数量 * 每个节点上最大的容器数。参数yarn.scheduler.minimum-allocation-mb设置了每个容器可请求的最小内存,那么最大容器数可根据总的内存除以该参数计算得出。当使用0.75时,所有的Reducer会被立即加载,并当Mapper完成时开始传输Mapper的输出。使用1.75时,较快的节点将完成它们第一轮的任务,然后加载第二波任务,这样对负载平衡具有更好的效果。增加Reducer的数量虽然增加了框架开销,但增加了负载平衡和降低了失败的成本。上面的比例因子比总的Reducer数量稍微少些,以为预测执行的任务和失败的任务保留少量的Reducer槽,也就是实际的Reducer数量为上面公式得出的数量加上保留的Reducer数量。

CentOS安装和配置Hadoop2.2.0 

Ubuntu 13.04上搭建Hadoop环境

Ubuntu 12.10 +Hadoop 1.2.1版本集群配置

Ubuntu上搭建Hadoop环境(单机模式+伪分布模式)

Ubuntu下Hadoop环境的配置

单机版搭建Hadoop环境图文教程详解

搭建Hadoop环境(在Winodws环境下用虚拟机虚拟两个Ubuntu系统进行搭建)

http://www.bkjia.com/Linux/909662.htmlwww.bkjia.comtruehttp://www.bkjia.com/Linux/909662.htmlTechArticleHadoop-2.4.1学习之Mapper和Reducer MapReduce允许程序员能够容易地编写并行运行在大规模集群上处理大量数据的程序,确保程序的运行稳定可靠和...

 

网上澳门金莎娱乐 1

 

 

 

网上澳门金莎娱乐 2

 

网上澳门金莎娱乐 3

 

 

 

MapReduce中的分区默认是哈希分区,但是我们也可以自己写demo来重写Partitioner类的getPartiton方法,如下:

 

 

 

 

分区规则定后,我们需要指定客户端Job的map task的分区类并设置reducer的个数,如下

 

 

 

 

网上澳门金莎娱乐 4

 

六、如何编写map和reduce函数

 

 

 

 

        到这一步,如果你对MapReduce的工作原理已经掌握了,那么接下来,编写客户端程序,利用MapReduce的计算功能,实现文本文件中单词的出现次数的统计,将会是轻而易举的。

       

       首先,我们需要一个mapper(任务),其次是reducer(任务),有了两个任务后,我们需要创建一个Job(作业),将mapper和reducer关联起来,并提交至Hadoop集群,由集群中的JobTracker进行mapper和reducer任务的调度,并最终完成数据的计算工作。

       因此,不难发现,光有mapper和reducer任务,是无法进行MapReduce(分布式大数据计算)的,这里我们需要写三个类,一个是实现Map的类,一个是实现Reduce的类,还有一个就是提交作业的主类(Client Main Class)

      

       由于博主的Hadoop版本是3.1.0的,因此,为了兼顾3.X以下的Hadoop集群环境能够在下面提供的demo中能够跑起来,特将本文中涉及到的Hadoop依赖换成了2.7.X的版本,如下:

 

 

 

注意:不要使用过时的hadoop-core(1.2.1)依赖,否则会出现各种意想不到的的问题

       

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.1</version>
</dependency>
<!-- common 依赖 tools.jar包 -->
<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>1.8</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>

 

 

 

(1)编写Mapper

 

 

网上澳门金莎娱乐 5

 

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper 原型 : Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * 
 * KEYIN    : 默认情况下,是mr框架所读到的一行文本内容的起始偏移量,Long,
 *            但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable
 * VALUEIN  : 默认情况下,是mr框架所读到的一行文本的内容(Java String 对应 Hadoop中的Text)
 * KEYOUT   : 用户自定义逻辑处理完成之后输出数据中的key,在此处是单词(String),同上用Text
 * VALUEOUT : 用户自定义逻辑处理完成之后输出数据中的value,在这里是单词的次数:Integer,对应Hadoop中的IntWritable
 * 
 * mapper的输入输出参数的类型必须和reducer的一致,且mapper的输出是reducer的输入
 * 
 * @blob   http://blog.csdn.net/appleyk
 * @date   2018年7月3日15:41:13
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{


    /**
     * map实现数据拆分的操作
     * 本操作主要进行Map的数据处理
     * 在Mapper父类里面接受的内容如下:
     * LongWritable:文本内容的起始偏移量
     * Text:每行的文本数据(行内容)
     * Text:每个单词分解后的统计结果
     * IntWritable:输出记录的结果
     */
     @Override
     protected void map(LongWritable key, Text value,Context context)
             throws IOException, InterruptedException {

         System.out.println("文本内容的起始偏移量:"+key);
         String line = value.toString()    ;//取出每行的数据
         String[] result  = line.split(" ");//按照空格进行数据拆分
         //循环单词
         for (int i = 0 ;i <result.length ; i++){

           //针对每一个单词,构造一个key-value
            System.out.println("key-value : <"+new Text(result[i])+","+new IntWritable(1)+">");


           /**
            * 将每个单词的key-value写入到输入输出上下文对象中
            * 并传递给mapper进行shuffle过程,待所有mapper task完成后交由reducer进行对号取走
            */
             context.write(new Text(result[i]), new IntWritable(1));
         }

         /**        map端的shuffle过程(大致简单的描述一下)
          *                       |
          *                       |  放缓存(默认100M,溢出比是0.8,即80M满进行磁盘写入并清空,
          *                       |  剩余20M继续写入缓存,二者结合完美实现边写缓存边溢写(写磁盘))
          *                       V
          *               <b,1>,<c,1>,<a,1>,<a,1>
          *                         
          *                       |
          *                       | 缓存写满了,开始shuffle(洗牌、重组)  == 包括分区,排序,以及可进行自定的合并(combine)
          *                       V     
          * 写入磁盘文件(not hdfs)并进行文件归并,成一个个的大文件 <a,<1,1>>,<b,1>,<c,1>   
          * 
          *                         |
          *                         |
          *                         V
          *   每一个大文件都有各自的分区,有几个分区就对应几个reducer,随后文件被各自的reducer领走
          *   
          *           !!! 这就是所谓的mapper的输入即是reducer的输出 !!!
          */
     }
}

网上澳门金莎娱乐, 

 

 

 

 

 

(2)编写Reducer

 

 

 

网上澳门金莎娱乐 6

 

  

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 进行合并后数据的最终统计
 * 本次要使用的类型信息如下:
 * Text:Map输出的文本内容
 * IntWritable:Map处理的个数
 * Text:Reduce输出文本
 * IntWritable:Reduce的输出个数
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context)
                    throws IOException, InterruptedException {

    //mapper的输出是reducer的输入,因此,这里打一下reducer的接收内容            
    List<Integer> list = new ArrayList<>();

    int sum = 0;//记录每个单词(key)出现的次数
        for (IntWritable value : values) {
            //从values集合里面取出key的单个频率数量(其实就是1)进行叠加
            int num = value.get();
            sum += num;
            list.add(num);

        }

       /**
    * mapper会把一堆key-value进行shuffle操作,其中涉及分区、排序以及合并(combine)
    * 注:上述shuffle中的的合并(combine)区别于map最终的的合(归)并(merge)
    * 比如有三个键值对:<a,1>,<b,1>,<a,1>
    * combine的结果:<a,2>,<b,1>      == 被reducer取走,数据小
    * merage 的结果;<a,<1,1>>,<b,1>  == 被reducer取走,数据较大(相比较上述combine来说)
    * 注:默认combiner是需要用户自定义进行开启的,所以,最终mapper的输出其实是归并(merage)后的的结果
    * 
    * 所以,下面的打印其实就是想看一下mapper在shuffle这个过程后的merage结果(一堆key-values)
    */
    System.out.println("key-values :<"+key+","+list.toString().replace("[", "<")
                .replace("]", ">")+">");

    //打印一下reduce的结果
    System.out.println("reduce计算结果 == key-value :<"+key+","+new IntWritable(sum)+">");
    //最后写入到输入输出上下文对象里面,传递给reducer进行shuffle,待所有reducer task完成后交由HDFS进行文件写入
    context.write(key, new IntWritable(sum));


    }
}

 

 

(3)编写Partition分区类(如果需要修改Map默认的哈希分区规则的话)

 

 

网上澳门金莎娱乐 7

 

 

 

 1 import org.apache.hadoop.io.IntWritable;
 2 import org.apache.hadoop.io.Text;
 3 import org.apache.hadoop.mapreduce.Partitioner;
 4 
 5 public class PartitionTest extends Partitioner<Text, IntWritable> {
 6 
 7     /**
 8      * key          : map的输出key 
 9      * value        : map的输出value 
10      * numReduceTask: reduce的task数量
11      * 返回值,指定reduce,从0开始
12      * 比如,分区0交由reducer0拿走
13      */
14     @Override
15     public int getPartition(Text key, IntWritable value, int numReduceTask) {
16         
17         if (key.toString().equals("a")) {
18             //如果key的值等于a,则将其分区指定为0,对应第一个reducer拿走进行reduce
19             return 0;
20         } else {
21             return 1;
22         }
23     }
24 }

 

 

 

 

(4)编写Job类(Main Class)

 

 

网上澳门金莎娱乐 8

 

 

 

 

 1 import org.apache.hadoop.conf.Configuration;
 2 import org.apache.hadoop.fs.Path;
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.Job;
 6 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 7 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 8 
 9 import com.appleyk.hdfs.mapper.WordCountMapper;
10 import com.appleyk.hdfs.part.PartitionTest;
11 import com.appleyk.hdfs.reducer.WordCountReducer;
12 
13 /**
14  * Client端,提交作业
15  * @author yukun24@126.com
16  * @blob   http://blog.csdn.net/appleyk
17  * @date   2018年7月3日-上午9:51:49
18  */
19 public class WordCountApp {
20 
21     public static void main(String[] args) throws Exception{
22         
23         Configuration conf = new Configuration();
24         //配置uri
25         conf.set("fs.defaultFS", "hdfs://192.168.142.138:9000");
26     
27         //创建一个作业,作业名"wordCount",作用在Hadoop集群上(remote)
28         Job job = Job.getInstance(conf, "wordCount");
29         
30         /**
31          * 设置jar包的主类(如果样例demo打成Jar包扔在Linux下跑任务,
32          * 需要指定jar包的Main Class,也就是指定jar包运行的主入口main函数)
33          */
34         job.setJarByClass(WordCountApp.class);
35         
36         //设置Mapper 任务的类(自己写demo实现map)
37         job.setMapperClass(WordCountMapper.class);
38         //设置Reducer任务的类(自己写demo实现reduce)
39         job.setReducerClass(WordCountReducer.class);
40 
41         //指定mapper的分区类
42         //job.setPartitionerClass(PartitionTest.class);
43         
44         //设置reducer(reduce task)的数量(从0开始)
45         //job.setNumReduceTasks(2);
46         
47         
48         //设置映射输出数据的键(key)  类(型)
49         job.setMapOutputKeyClass(Text.class);
50         //设置映射输出数据的值(value)类(型)
51         job.setMapOutputValueClass(IntWritable.class);
52 
53         //设置作业(Job)输出数据的键(key)  类(型)   == 最后要写入到输出文件里面
54         job.setOutputKeyClass(Text.class);
55         //设置作业(Job)输出数据的值(value)类(型)   == 最后要写入到输出文件里面
56         job.setOutputValueClass(IntWritable.class);
57 
58         //设置输入的Path列表(可以是单个文件也可以是多个文件(目录表示即可))
59         FileInputFormat.setInputPaths (job, new Path("hdfs://192.168.142.138:9000/input" ));
60         //设置输出的目录Path(确认输出Path不存在,如存在,请先进行目录删除)
61         FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.142.138:9000/output"));
62 
63         //将作业提交到集群并等待它完成。
64         boolean bb =job.waitForCompletion(true);
65         
66         if (!bb) {
67             System.out.println("Job作业执行失败!");
68         } else {
69             System.out.println("Job作业执行成功!");
70         }
71     }
72 
73 }

 

 

 

 

 

(5)运行main方法,提交作业

 

 

出现异常:

 

Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

 

 

 

由于作业是在本地(Windows)跑的,因此,这里遇到一个本地IO读写权限的问题,具体代码可以见NativeIO这个Java类的源码,在此处:

 

 

网上澳门金莎娱乐 9

 

 

 

 

将源代码全部拷贝出来,在项目下新建一个同包名同类名称的文件如下:

 

 

网上澳门金莎娱乐 10

 

 

 

 

 

打开后,修改代码如下(去掉验证):

 

 

网上澳门金莎娱乐 11

 

 

 

 

(6)再次运行main方法,提交作业

 

 

再次出现异常:

 

 

org.apache.hadoop.security.AccessControlException:

Permission denied: user=Administrator, access=WRITE,inode="/":root:supergroup:drwxr-xr-x

 

 

 

       意思是再说,我当前使用的user是Windows下的Administrator,但是在Hadoop的HDFS文件系统中,没有这个用户,因此,我想用Administrator这个用户向HDFS文件系统Write的时候出现权限不足的异常,因为HDFS文件系统根目录下的文件对其他用户来说,不具备w和r的权限

       原本把mapreduce程序打包放在集群中跑是不用担心用户的hdfs权限问题的,但是,我一开始说了,我不想那么麻烦,无非就是Hadoop开启了HDFS文件系统的权限验证功能,我给它关了(开放)不就行了,因此,我决定直接在hdfs-site.xml配置文件里进行权限验证的修改,添加内容如下:

 

<property>
    <name>dfs.permissions</name>
    <value>false</value>
</property>

 

 

 

网上澳门金莎娱乐 12

 

 

保存后,重启Hadoop集群

 

先stop,再start

 

 

网上澳门金莎娱乐 13

 

 

 

 

(7)再次运行main方法,提交作业

 

 

16:55:44.385 [main] INFO org.apache.hadoop.mapreduce.Job -  map 100% reduce 100%
16:55:44.385 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getTaskCompletionEvents(Job.java:670)
16:55:44.385 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
16:55:44.386 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
16:55:44.386 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getTaskCompletionEvents(Job.java:670)
16:55:44.386 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
16:55:44.386 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
16:55:44.387 [main] INFO org.apache.hadoop.mapreduce.Job - Job job_local539916280_0001 completed successfully
16:55:44.388 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getCounters(Job.java:758)
16:55:44.407 [main] INFO org.apache.hadoop.mapreduce.Job - Counters: 35
 File System Counters
  FILE: Number of bytes read=560
  FILE: Number of bytes written=573902
  FILE: Number of read operations=0
  FILE: Number of large read operations=0
  FILE: Number of write operations=0
  HDFS: Number of bytes read=46
  HDFS: Number of bytes written=24
  HDFS: Number of read operations=13
  HDFS: Number of large read operations=0
  HDFS: Number of write operations=4
 Map-Reduce Framework
  Map input records=3
  Map output records=12
  Map output bytes=72
  Map output materialized bytes=102
  Input split bytes=112
  Combine input records=0
  Combine output records=0
  Reduce input groups=6
  Reduce shuffle bytes=102
  Reduce input records=12
  Reduce output records=6
  Spilled Records=24
  Shuffled Maps =1
  Failed Shuffles=0
  Merged Map outputs=1
  GC time elapsed (ms)=3
  Total committed heap usage (bytes)=605028352
 Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
 File Input Format Counters 
  Bytes Read=23
 File Output Format Counters 
  Bytes Written=24
16:55:44.407 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:Administrator (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:320)
Job作业执行成功!

 

 

 

ok,至此,本地执行mapreduce作业已经完成,接下来就是查看我们要的结果了

 

 

 

 

(8)利用Java HDFS API,打开/output/part-r-00000文件内容,输出到控制台

 

 

 

网上澳门金莎娱乐 14

 

 

 

a b c d
e f a c
a c b f

 

 

 

网上澳门金莎娱乐 15

 

 

 

 


 

D、最后,待任务全部完成,交由HDFS进行结果的文件写入

二、MapReduce执行过程

 

 

 

 

网上澳门金莎娱乐 16

 

补充:

 

1、JobTracker  对应于 NameNode,TaskTracker 对应于 DataNode。

2、JobTracker是一个master服务,软件启动之后JobTracker接收Job,负责调度Job的每一个子任务task运行于TaskTracker上,并监控它们,如果发现有失败的task就重新运行它。一般情况应该把JobTracker部署在单独的机器上。

 

 

Client         :客户端提交一个Job给JobTracker

JobTracker :调度Job的每一个mapper和reducer,并运行在TaskTracker上,并监控它们

Mapper      :拿到符合自己的InputSplit(输入内容),进行map后,输出MapOutPut(map的输出内容)

Reducer     : 拿到MapOutPut作为自己的ReduceInput,进行reduce计算,最后输出OutPut结果

 

 

 

 

 

 

 

四、Shuffle过程简介

 

 

 

 

分区1对应的reduce结果文件如下:

 


 

 

2、上传至HDFS的intput目录下

 

 

 

 

 

 

 

 

 

我们分别下载文件*.*-00000和*.*-00001至本地,并进行结果验证,效果如下:

 

网上澳门金莎娱乐 17

网上澳门金莎娱乐 18

 

C、reduce的计算结果(代码实现细节先暂时忽略,文章中会讲到)


五、Map端的Shuffle过程

 

 

 

 

 

 网上澳门金莎娱乐 19

    

 需要知道的是:

 

1、缓存的大小是可以设置的(mapreduce.task.io.sort.mb,默认100M)

2、溢出比(缓存使用率有一个软阈值 == mapreduce.map.sort.spill.percent,默认0.80),当超过阈值时,溢出行为会在后台起一个线程执行从而使Map任务不会因为缓存的溢出而被阻塞。但如果达到硬限制,Map任务会被阻塞,直到溢出行为结束

 

 

 

               注: 本文最后,还会附上一套自己写的HDFS的文件操作Java API,使用起来也很方便,API还在不断的完善..

 

 

 

 

                我就想知道MapReduce的工作原理,而知道原理后,我就想在本地用Java程序跑一遍整个MapReduce的计算过程,这个很难吗? 搜遍全网,没发现几个是自己想要的(也有可能漏掉了),都是可以参考的,但是零零散散不对胃口,不适合入门级“玩家”像我一样的人,最后,下定决心,看视频搜集资料,从头到尾的捋一下MapReduce的原理,以及如何在本地,通过编写map和reduce函数对一个文本文件中的单词进行出现次数的统计,并将结果输出到HDFS文件系统上

网上澳门金莎娱乐 20

 

分区0对应的reduce结果文件如下:

 

 

 

 

 

本文由网上澳门金莎娱乐发布于操作系统,转载请注明出处:Java --本地提交MapReduce作业至集群☞实现 Word Count

关键词: