目的
这篇教程从用户的角度出发,全面地先容了Hadoop Map/Reduce框架的各个方面。
先决条件

概述
Hadoop Map/Reduce是一个利用大略单纯的软件框架,基于它写出来的运用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的办法并行处理上T级别的数据集。
一个Map/Reduce 作业(job) 常日会把输入的数据集切分为多少独立的数据块,由 map任务(task)以完备并行的办法处理它们。框架会对map的输出前辈行排序, 然后把结果输入给reduce任务。常日作业的输入和输出都会被存储在文件系统中。 全体框架负任务务的调度和监控,以及重新实行已经失落败的任务。
常日,Map/Reduce框架和分布式文件系统是运行在一组相同的节点上的,也便是说,打算节点和存储节点常日在一起。这种配置许可框架在那些已经存好数据的节点上高效地调度任务,这可以使全体集群的网络带宽被非常高效地利用。
Map/Reduce框架由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master卖力调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的实行,重新实行已经失落败的任务。而slave仅卖力实行由master指派的任务。
运用程序至少该当指明输入/输出的位置(路径),并通过实现得当的接口或抽象类供应map和reduce函数。再加上其他作业的参数,就构成了作业配置(job configuration)。然后,Hadoop的 job client提交作业(jar包/可实行程序等)和配置信息给JobTracker,后者卖力分发这些软件和配置信息给slave、调度任务并监控它们的实行,同时供应状态和诊断信息给job-client。
虽然Hadoop框架是用JavaTM实现的,但Map/Reduce运用程序则不一定要用 Java来写 。
Hadoop Streaming是一种运行作业的实用工具,它许可用户创建和运行任何可实行程序 (例如:Shell工具)来做为mapper和reducer。Hadoop Pipes是一个与SWIG兼容的C++ API (没有基于JNITM技能),它也可用于实现Map/Reduce运用程序。输入与输出
Map/Reduce框架运转在<key, value> 键值对上,也便是说, 框架把作业的输入看为是一组<key, value> 键值对,同样也产出一组 <key, value> 键值对做为作业的输出,这两组键值对的类型可能不同。
框架须要对key和value的类(classes)进行序列化操作, 因此,这些类须要实现 Writable接口。 其余,为了方便框架实行排序操作,key类必须实现 WritableComparable接口。
一个Map/Reduce 作业的输入和输出类型如下所示:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
例子:WordCount v1.0
在深入细节之前,让我们先看一个Map/Reduce的运用示例,以便对它们的事情办法有一个初步的认识。
WordCount是一个大略的运用,它可以打算出指天命据集中每一个单词涌现的次数。
这个运用适用于 单机模式, 伪分布式模式 或 完备分布式模式 三种Hadoop安装办法。
源代码
WordCount.java1.package org.myorg;2.3.import java.io.IOException;4.import java.util.;5.6.import org.apache.hadoop.fs.Path;7.import org.apache.hadoop.conf.;8.import org.apache.hadoop.io.;9.import org.apache.hadoop.mapred.;10.import org.apache.hadoop.util.;11.12.public class WordCount {13.14. public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {15. private final static IntWritable one = new IntWritable(1);16. private Text word = new Text();17.18. public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {19. String line = value.toString();20. StringTokenizer tokenizer = new StringTokenizer(line);21. while (tokenizer.hasMoreTokens()) {22. word.set(tokenizer.nextToken());23. output.collect(word, one);24. }25. }26. }27.28. public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {29. public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {30. int sum = 0;31. while (values.hasNext()) {32. sum += values.next().get();33. }34. output.collect(key, new IntWritable(sum));35. }36. }37.38. public static void main(String[] args) throws Exception {39. JobConf conf = new JobConf(WordCount.class);40. conf.setJobName(\公众wordcount\"大众);41.42. conf.setOutputKeyClass(Text.class);43. conf.setOutputValueClass(IntWritable.class);44.45. conf.setMapperClass(Map.class);46. conf.setCombinerClass(Reduce.class);47. conf.setReducerClass(Reduce.class);48.49. conf.setInputFormat(TextInputFormat.class);50. conf.setOutputFormat(TextOutputFormat.class);51.52. FileInputFormat.setInputPaths(conf, new Path(args[0]));53. FileOutputFormat.setOutputPath(conf, new Path(args[1]));54.55. JobClient.runJob(conf);57. }58.}59.用法
假设环境变量HADOOP_HOME对应安装时的根目录,HADOOP_VERSION对应Hadoop确当前安装版本,编译WordCount.java来创建jar包,可如下操作:
$ mkdir wordcount_classes
$ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar -d wordcount_classes WordCount.java
$ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ .
假设:
/usr/joe/wordcount/input - 是HDFS中的输入路径/usr/joe/wordcount/output - 是HDFS中的输出路径用示例文本文件做为输入:
$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World Bye World
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop
运行运用程序:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output
输出是:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
运用程序能够利用-files选项来指定一个由逗号分隔的路径列表,这些路径是task确当前事情目录。利用选项-libjars可以向map和reduce的classpath中添加jar包。利用-archives选项程序可以通报档案文件做为参数,这些档案文件会被解压并且在task确当前事情目录下会创建一个指向解压天生的目录的符号链接(以压缩包的名字命名)。 有关命令行选项的更多细节请参考 Commands manual。
利用-libjars和-files运行wordcount例子:
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt -libjars mylib.jar input output
阐明
WordCount运用程序非常刀切斧砍。
Mapper(14-26行)中的map方法(18-25行)通过指定的 TextInputFormat(49行)一次处理一行。然后,它通过StringTokenizer 以空格为分隔符将一行切分为多少tokens,之后,输出< <word>, 1> 形式的键值对。
对付示例中的第一个输入,map输出是:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
第二个输入,map输出是:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
关于组成一个指定作业的map数目的确定,以及如何以更风雅的办法去掌握这些map,我们将在教程的后续部分学习到更多的内容。
WordCount还指定了一个combiner (46行)。因此,每次map运行之后,会对输出按照key进行排序,然后把输出通报给本地的combiner(按照作业的配置与Reducer一样),进行本地聚合。
第一个map的输出是:
< Bye, 1>
< Hello, 1>
< World, 2>
第二个map的输出是:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
Reducer(28-36行)中的reduce方法(29-35行) 仅是将每个key(本例中便是单词)涌现的次数求和。
因此这个作业的输出便是:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>
代码中的run方法中指定了作业的几个方面, 例如:通过命令行通报过来的输入/输出路径、key/value的类型、输入/输出的格式等等JobConf中的配置信息。随后程序调用了JobClient.runJob(55行)来提交作业并且监控它的实行。
我们将在本教程的后续部分学习更多的关于JobConf, JobClient, Tool和其他接口及类(class)。
Map/Reduce - 用户界面
这部分文档为用户将会面临的Map/Reduce框架中的各个环节供应了适当的细节。这该当会帮助用户更细粒度地去实现、配置和调优作业。然而,请把稳每个类/接口的javadoc文档供应最全面的文档;本文只是想起到指南的浸染。
我们会先看看Mapper和Reducer接口。运用程序常日会通过供应map和reduce方法来实现它们。
然后,我们会谈论其他的核心接口,个中包括: JobConf,JobClient,Partitioner, OutputCollector,Reporter, InputFormat,OutputFormat等等。
末了,我们将通过谈论框架中一些有用的功能点(例如:DistributedCache, IsolationRunner等等)来扫尾。
核心功能描述
运用程序常日会通过供应map和reduce来实现 Mapper和Reducer接口,它们组成作业的核心。
Mapper
Mapper将输入键值对(key/value pair)映射到一组中间格式的键值对凑集。
Map是一类将输入记录集转换为中间格式记录集的独立任务。 这种转换的中间格式记录集不须要与输入记录集的类型同等。一个给定的输入键值对可以映射成0个或多个输出键值对。
Hadoop Map/Reduce框架为每一个InputSplit产生一个map任务,而每个InputSplit是由该作业的InputFormat产生的。
概括地说,对Mapper的实现者须要重写 JobConfigurable.configure(JobConf)方法,这个方法须要通报一个JobConf参数,目的是完成Mapper的初始化事情。然后,框架为这个任务的InputSplit中每个键值对调用一次 map(WritableComparable, Writable, OutputCollector, Reporter)操作。运用程序可以通过重写Closeable.close()方法来实行相应的清理事情。
输出键值对不须要与输入键值对的类型同等。一个给定的输入键值对可以映射成0个或多个输出键值对。通过调用 OutputCollector.collect(WritableComparable,Writable)可以网络输出的键值对。
运用程序可以利用Reporter报告进度,设定运用级别的状态,更新Counters(计数器),或者仅是表明自己运行正常。
框架随后会把与一个特定key关联的所有中间过程的值(value)分成组,然后把它们传给Reducer以产出终极的结果。用户可以通过 JobConf.setOutputKeyComparatorClass(Class)来指定详细卖力分组的 Comparator。
Mapper的输出被排序后,就被划分给每个Reducer。分块的总数目和一个作业的reduce任务的数目是一样的。用户可以通过实现自定义的 Partitioner来掌握哪个key被分配给哪个 Reducer。
用户可选择通过 JobConf.setCombinerClass(Class)指定一个combiner,它卖力对中间过程的输出进行本地的聚拢,这会有助于降落从Mapper到 Reducer数据传输量。
这些被排好序的中间过程的输出结果保存的格式是(key-len, key, value-len, value),运用程序可以通过JobConf掌握对这些中间结果是否进行压缩以及怎么压缩,利用哪种 CompressionCodec。
须要多少个Map?
Map的数目常日是由输入数据的大小决定的,一样平常便是所有输入文件的总块(block)数。
Map正常的并行规模大致是每个节点(node)大约10到100个map,对付CPU 花费较小的map任务可以设到300个旁边。由于每个任务初始化须要一定的韶光,因此,比较合理的情形是map实行的韶光至少超过1分钟。
这样,如果你输入10TB的数据,每个块(block)的大小是128MB,你将须要大约82,000个map来完成任务,除非利用 setNumMapTasks(int)(把稳:这里仅仅是对框架进行了一个提示(hint),实际决定成分见这里)将这个数值设置得更高。
Reducer
Reducer将与一个key关联的一组中间数值集归约(reduce)为一个更小的数值集。
用户可以通过 JobConf.setNumReduceTasks(int)设定一个作业中reduce任务的数目。
概括地说,对Reducer的实现者须要重写 JobConfigurable.configure(JobConf)方法,这个方法须要通报一个JobConf参数,目的是完成Reducer的初始化事情。然后,框架为成组的输入数据中的每个<key, (list of values)>对调用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。之后,运用程序可以通过重写Closeable.close()来实行相应的清理事情。
Reducer有3个紧张阶段:shuffle、sort和reduce。
Shuffle
Reducer的输入便是Mapper已经排好序的输出。在这个阶段,框架通过HTTP为每个Reducer得到所有Mapper输出中与之干系的分块。
Sort
这个阶段,框架将按照key的值对Reducer的输入进行分组 (由于不同mapper的输出中可能会有相同的key)。
Shuffle和Sort两个阶段是同时进行的;map的输出也是一边被取回一边被合并的。
Secondary Sort
如果须要中间过程对key的分组规则和reduce前对key的分组规则不同,那么可以通过 JobConf.setOutputValueGroupingComparator(Class)来指定一个Comparator。再加上 JobConf.setOutputKeyComparatorClass(Class)可用于掌握中间过程的key如何被分组,以是结合两者可以实现按值的二次排序。
Reduce
在这个阶段,框架为已分组的输入数据中的每个 <key, (list of values)>对调用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。
Reduce任务的输出常日是通过调用 OutputCollector.collect(WritableComparable, Writable)写入 文件系统的。
运用程序可以利用Reporter报告进度,设定运用程序级别的状态,更新Counters(计数器),或者仅是表明自己运行正常。
Reducer的输出是没有排序的。
须要多少个Reduce?
Reduce的数目建议是0.95或1.75乘以 (<no. of nodes> mapred.tasktracker.reduce.tasks.maximum)。
用0.95,所有reduce可以在maps一完成时就急速启动,开始传输map的输出结果。用1.75,速率快的节点可以在完成第一轮reduce任务后,可以开始第二轮,这样可以得到比较好的负载均衡的效果。
增加reduce的数目会增加全体框架的开销,但可以改进负载均衡,降落由于实行失落败带来的负面影响。
上述比例因子比整体数目稍小一些是为了给框架中的推测性任务(speculative-tasks) 或失落败的任务预留一些reduce的资源。
无Reducer
如果没有归约要进行,那么设置reduce任务的数目为零是合法的。
这种情形下,map任务的输出会直接被写入由 setOutputPath(Path)指定的输出路径。框架在把它们写入FileSystem之前没有对它们进行排序。
Partitioner
Partitioner用于划分键值空间(key space)。
Partitioner卖力掌握map输出结果key的分割。Key(或者一个key子集)被用于产生分区,常日利用的是Hash函数。分区的数目与一个作业的reduce任务的数目是一样的。因此,它掌握将中间过程的key(也便是这条记录)该当发送给m个reduce任务中的哪一个来进行reduce操作。
HashPartitioner是默认的 Partitioner。
Reporter
Reporter是用于Map/Reduce运用程序报告进度,设定运用级别的状态, 更新Counters(计数器)的机制。
Mapper和Reducer的实现可以利用Reporter 来报告进度,或者仅是表明自己运行正常。在那种运用程序须要花很永劫光处理个别键值对的场景中,这种机制是很关键的,由于框架可能会以为这个任务超时了,从而将它强行杀去世。另一个避免这种情形发生的办法是,将配置参数mapred.task.timeout设置为一个足够高的值(或者干脆设置为零,则没有超时限定了)。
运用程序可以用Reporter来更新Counter(计数器)。
OutputCollector
OutputCollector是一个Map/Reduce框架供应的用于网络 Mapper或Reducer输出数据的通用机制 (包括中间输出结果和作业的输出结果)。
Hadoop Map/Reduce框架附带了一个包含许多实用型的mapper、reducer和partitioner 的类库。
作业配置
JobConf代表一个Map/Reduce作业的配置。
JobConf是用户向Hadoop框架描述一个Map/Reduce作业如何实行的紧张接口。框架会按照JobConf描述的信息虔诚地去考试测验完成这个作业,然而:
一些参数可能会被管理者标记为 final,这意味它们不能被变动。一些作业的参数可以被直截了当地进行设置(例如: setNumReduceTasks(int)),而另一些参数则与框架或者作业的其他参数之间奇妙地相互影响,并且设置起来比较繁芜(例如: setNumMapTasks(int))。常日,JobConf会指明Mapper、Combiner(如果有的话)、 Partitioner、Reducer、InputFormat和 OutputFormat的详细实现。JobConf还能指定一组输入文件 (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以及输出文件该当写在哪儿 (setOutputPath(Path))。
JobConf可选择地对作业设置一些高等选项,例如:设置Comparator; 放到DistributedCache上的文件;中间结果或者作业输出结果是否须要压缩以及怎么压缩; 利用用户供应的脚本(setMapDebugScript(String)/setReduceDebugScript(String)) 进行调试;作业是否许可预防性(speculative)任务的实行 (setMapSpeculativeExecution(boolean))/(setReduceSpeculativeExecution(boolean)) ;每个任务最大的考试测验次数 (setMaxMapAttempts(int)/setMaxReduceAttempts(int)) ;一个作业能容忍的任务失落败的百分比 (setMaxMapTaskFailuresPercent(int)/setMaxReduceTaskFailuresPercent(int)) ;等等。
当然,用户能利用 set(String, String)/get(String, String) 来设置或者取得运用程序须要的任意参数。然而,DistributedCache的利用是面向大规模只读数据的。
任务的实行和环境
TaskTracker是在一个单独的jvm上以子进程的形式实行 Mapper/Reducer任务(Task)的。
子任务会继续父TaskTracker的环境。用户可以通过JobConf中的 mapred.child.java.opts配置参数来设定子jvm上的附加选项,例如: 通过-Djava.library.path=<> 将一个非标准路径设为运行时的链接用以搜索共享库,等等。如果mapred.child.java.opts包含一个符号@taskid@, 它会被更换成map/reduce的taskid的值。
下面是一个包含多个参数和更换的例子,个中包括:记录jvm GC日志; JVM JMX代理程序以无密码的办法启动,这样它就能连接到jconsole上,从而可以查看子进程的内存和线程,得到线程的dump;还把子jvm的最大堆尺寸设置为512MB, 并为子jvm的java.library.path添加了一个附加路径。
<property>
<name>mapred.child.java.opts</name>
<value>
-Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
-Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
</value>
</property>
用户或管理员也可以利用mapred.child.ulimit设定运行的子任务的最大虚拟内存。mapred.child.ulimit的值以(KB)为单位,并且必须大于或即是-Xmx参数传给JavaVM的值,否则VM会无法启动。
把稳:mapred.child.java.opts只用于设置task tracker启动的子任务。为守护进程设置内存选项请查看 cluster_setup.html
${mapred.local.dir}/taskTracker/是task tracker确当地目录, 用于创建本地缓存和job。它可以指定多个目录(超过多个磁盘),文件会半随机的保存到本地路径下的某个目录。当job启动时,task tracker根据配置文档创建本地job目录,目录构造如以下所示:
${mapred.local.dir}/taskTracker/archive/ :分布式缓存。这个目录保存本地的分布式缓存。因此本地分布式缓存是在所有task和job间共享的。${mapred.local.dir}/taskTracker/jobcache/$jobid/ : 本地job目录。${mapred.local.dir}/taskTracker/jobcache/$jobid/work/: job指定的共享目录。各个任务可以利用这个空间做为暂存空间,用于它们之间共享文件。这个目录通过job.local.dir 参数暴露给用户。这个路径可以通过API JobConf.getJobLocalDir()来访问。它也可以被做为系统属性得到。因此,用户(比如运行streaming)可以调用System.getProperty(\"大众job.local.dir\"大众)得到该目录。${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/: 存放jar包的路径,用于存放作业的jar文件和展开的jar。job.jar是运用程序的jar文件,它会被自动分发到各台机器,在task启动前会被自动展开。利用api JobConf.getJar() 函数可以得到job.jar的位置。利用JobConf.getJar().getParent()可以访问存放展开的jar包的目录。${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml: 一个job.xml文件,本地的通用的作业配置文件。${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid: 每个任务有一个目录task-id,它里面有如下的目录构造:${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml: 一个job.xml文件,本地化的任务作业配置文件。任务本地化是指为该task设定特定的属性值。这些值会不才面详细解释。${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output 一个存放中间过程的输出文件的目录。它保存了由framwork产生的临时map reduce数据,比如map的输出文件等。${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work: task确当前事情目录。${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp: task的临时目录。(用户可以设定属性mapred.child.tmp 来为map和reduce task设定临时目录。缺省值是./tmp。如果这个值不是绝对路径, 它会把task的事情路径加到该路径前面作为task的临时文件路径。如果这个值是绝对路径则直策应用这个值。 如果指定的目录不存在,会自动创建该目录。之后,按照选项 -Djava.io.tmpdir='临时文件的绝对路径'实行java子任务。 pipes和streaming的临时文件路径是通过环境变量TMPDIR='the absolute path of the tmp dir'设定的)。 如果mapred.child.tmp有./tmp值,这个目录会被创建。下面的属性是为每个task实行时利用确当地参数,它们保存在本地化的任务作业配置文件里:
名称类型描述mapred.job.idStringjob idmapred.jarStringjob目录下job.jar的位置job.local.dirStringjob指定的共享存储空间mapred.tip.idStringtask idmapred.task.idStringtask考试测验idmapred.task.is.mapboolean是否是map taskmapred.task.partitioninttask在job中的idmap.input.fileStringmap读取的文件名map.input.startlongmap输入的数据块的起始位置偏移map.input.lengthlongmap输入的数据块的字节数mapred.work.output.dirStringtask临时输出目录
task的标准输出和缺点输出流会被读到TaskTracker中,并且记录到 ${HADOOP_LOG_DIR}/userlogs
DistributedCache 可用于map或reduce task等分发jar包和本地库。子jvm总是把 当前事情目录 加到 java.library.path 和 LD_LIBRARY_PATH。 因此,可以通过 System.loadLibrary或 System.load装载缓存的库。有关利用分布式缓存加载共享库的细节请参考 native_libraries.html
作业的提交与监控
JobClient是用户提交的作业与JobTracker交互的紧张接口。
JobClient 供应提交作业,追踪进程,访问子任务的日志记录,得到Map/Reduce集群状态信息等功能。
作业提交过程包括:
检讨作业输入输出样式细节为作业打算InputSplit值。如果须要的话,为作业的DistributedCache建立必须的统计信息。拷贝作业的jar包和配置文件到FileSystem上的Map/Reduce系统目录下。提交作业到JobTracker并且监控它的状态。作业的历史文件记录到指定目录的\公众_logs/history/\"大众子目录下。这个指定目录由hadoop.job.history.user.location设定,默认是作业输出的目录。因此默认情形下,文件会存放在mapred.output.dir/_logs/history目录下。用户可以设置hadoop.job.history.user.location为none来停滞日志记录。
用户利用下面的命令可以看到在指定目录下的历史日志记录的择要。
$ bin/hadoop job -history output-dir
这个命令会打印出作业的细节,以及失落败的和被杀去世的任务细节。
要查看有关作业的更多细节例如成功的任务、每个任务考试测验的次数(task attempt)等,可以利用下面的命令
$ bin/hadoop job -history all output-dir
用户可以利用 OutputLogFilter 从输出目录列表中筛选日志文件。
一样平常情形,用户利用JobConf创建运用程序并配置作业属性, 然后用 JobClient 提交作业并监视它的进程。
作业的掌握
有时候,用一个单独的Map/Reduce作业并不能完成一个繁芜的任务,用户大概要链接多个Map/Reduce作业才行。这是随意马虎实现的,由于作业常日输出到分布式文件系统上的,以是可以把这个作业的输出作为下一个作业的输入实现串联。
然而,这也意味着,确保每一作业完成(成功或失落败)的任务就直接落在了客户身上。在这种情形下,可以用的掌握作业的选项有:
runJob(JobConf):提交作业,仅当作业完成时返回。submitJob(JobConf):只提交作业,之后须要你轮询它返回的 RunningJob句柄的状态,并根据情形调度。JobConf.setJobEndNotificationURI(String):设置一个作业完成关照,可避免轮询。作业的输入
InputFormat 为Map/Reduce作业描述输入的细节规范。
Map/Reduce框架根据作业的InputFormat来:
检讨作业输入的有效性。把输入文件切分成多个逻辑InputSplit实例, 并把每一实例分别分发给一个 Mapper。供应RecordReader的实现,这个RecordReader从逻辑InputSplit中得到输入记录, 这些记录将由Mapper处理。基于文件的InputFormat实现(常日是 FileInputFormat的子类) 默认行为是按照输入文件的字节大小,把输入数据切分成逻辑分块(logical InputSplit )。 个中输入文件所在的FileSystem的数据块尺寸是分块大小的上限。下限可以设置mapred.min.split.size 的值。
考虑到边界情形,对付很多运用程序来说,很明显按照文件大小进行逻辑分割是不能知足需求的。 在这种情形下,运用程序须要实现一个RecordReader来处理记录的边界并为每个任务供应一个逻辑分块的面向记录的视图。
TextInputFormat 是默认的InputFormat。
如果一个作业的Inputformat是TextInputFormat, 并且框架检测到输入文件的后缀是.gz和.lzo,就会利用对应的CompressionCodec自动解压缩这些文件。 但是须要把稳,上述带后缀的压缩文件不会被切分,并且全体压缩文件会分给一个mapper来处理。
InputSplit
InputSplit 是一个单独的Mapper要处理的数据块。
一样平常的InputSplit 是字节样式输入,然后由RecordReader处理并转化成记录样式。
FileSplit 是默认的InputSplit。 它把 map.input.file 设定为输入文件的路径,输入文件是逻辑分块文件。
RecordReader
RecordReader 从InputSlit读入<key, value>对。
一样平常的,RecordReader 把由InputSplit 供应的字节样式的输入文件,转化成由Mapper处理的记录样式的文件。 因此RecordReader卖力处理记录的边界情形和把数据表示成keys/values对形式。
作业的输出
OutputFormat 描述Map/Reduce作业的输出样式。
Map/Reduce框架根据作业的OutputFormat来:
考验作业的输出,例如检讨输出路径是否已经存在。供应一个RecordWriter的实现,用来输出作业结果。 输出文件保存在FileSystem上。TextOutputFormat是默认的 OutputFormat。
任务的Side-Effect File
在一些运用程序中,子任务须要产生一些side-file,这些文件与作业实际输出结果的文件不同。
在这种情形下,同一个Mapper或者Reducer的两个实例(比如预防性任务)同时打开或者写 FileSystem上的同一文件就会产生冲突。因此运用程序在写文件的时候须要为每次任务考试测验(不仅仅是每次任务,每个任务可以考试测验实行很多次)选取一个独一无二的文件名(利用attemptid,例如task_200709221812_0001_m_000000_0)。
为了避免冲突,Map/Reduce框架为每次考试测验实行任务都建立和掩护一个分外的 ${mapred.output.dir}/_temporary/_${taskid}子目录,这个目录位于本次考试测验实行任务输出结果所在的FileSystem上,可以通过 ${mapred.work.output.dir}来访问这个子目录。 对付成功完成的任务考试测验,只有${mapred.output.dir}/_temporary/_${taskid}下的文件会移动到${mapred.output.dir}。当然,框架会丢弃那些失落败的任务考试测验的子目录。这种处理过程对付运用程序来说是完备透明的。
在任务实行期间,运用程序在写文件时可以利用这个特性,比如 通过 FileOutputFormat.getWorkOutputPath()得到${mapred.work.output.dir}目录, 并在其下创建任意任务实行时所需的side-file,框架在任务考试测验成功时会立时移动这些文件,因此不须要在程序内为每次任务考试测验选取一个独一无二的名字。
把稳:在每次任务考试测验实行期间,${mapred.work.output.dir} 的值实际上是 ${mapred.output.dir}/_temporary/_{$taskid},这个值是Map/Reduce框架创建的。 以是利用这个特性的方法是,在 FileOutputFormat.getWorkOutputPath() 路径下创建side-file即可。
对付只利用map不该用reduce的作业,这个结论也成立。这种情形下,map的输出结果直接天生到HDFS上。
RecordWriter
RecordWriter 天生<key, value> 对到输出文件。
RecordWriter的实现把作业的输出结果写到 FileSystem。
其他有用的特性
Counters
Counters 是多个由Map/Reduce框架或者运用程序定义的全局计数器。 每一个Counter可以是任何一种 Enum类型。同一特定Enum类型的Counter可以搜集到一个组,其类型为Counters.Group。
运用程序可以定义任意(Enum类型)的Counters并且可以通过 map 或者 reduce方法中的 Reporter.incrCounter(Enum, long)或者 Reporter.incrCounter(String, String, long) 更新。之后框架会汇总这些全局counters。
DistributedCache
DistributedCache 可将详细运用干系的、大尺寸的、只读的文件有效地分布放置。
DistributedCache 是Map/Reduce框架供应的功能,能够缓存运用程序所需的文件 (包括文本,档案文件,jar文件等)。
运用程序在JobConf中通过url(hdfs://)指定须要被缓存的文件。 DistributedCache假定由hdfs://格式url指定的文件已经在 FileSystem上了。
Map-Redcue框架在作业所有任务实行之前会把必要的文件拷贝到slave节点上。 它运行高效是由于每个作业的文件只拷贝一次并且为那些没有文档的slave节点缓存文档。
DistributedCache 根据缓存文档修正的韶光戳进行追踪。 在作业实行期间,当前运用程序或者外部程序不能修正缓存文件。
distributedCache可以分发大略的只读数据或文本文件,也可以分发繁芜类型的文件例如归档文件和jar文件。归档文件(zip,tar,tgz和tar.gz文件)在slave节点上会被解档(un-archived)。 这些文件可以设置实行权限。
用户可以通过设置mapred.cache.{files|archives}来分发文件。 如果要分发多个文件,可以利用逗号分隔文件所在路径。也可以利用API来设置该属性: DistributedCache.addCacheFile(URI,conf)/ DistributedCache.addCacheArchive(URI,conf) and DistributedCache.setCacheFiles(URIs,conf)/ DistributedCache.setCacheArchives(URIs,conf) 个中URI的形式是 hdfs://host:port/absolute-path#link-name 在Streaming程序中,可以通过命令行选项 -cacheFile/-cacheArchive 分发文件。
用户可以通过 DistributedCache.createSymlink(Configuration)方法让DistributedCache 在当前事情目录下创建到缓存文件的符号链接。 或者通过设置配置文件属性mapred.create.symlink为yes。 分布式缓存会截取URI的片段作为链接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so, 则在task当前事情目录会有名为lib.so的链接, 它会链接分布式缓存中的lib.so.1。
DistributedCache可在map/reduce任务中作为 一种根本软件分发机制利用。它可以被用于分发jar包和本地库(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API能够被用于 缓存文件和jar包,并把它们加入子jvm的classpath。也可以通过设置配置文档里的属性 mapred.job.classpath.{files|archives}达到相同的效果。缓存文件可用于分发和装载本地库。
Tool
Tool 接口支持处理常用的Hadoop命令行选项。
Tool 是Map/Reduce工具或运用的标准。运用程序应只处理其定制参数, 要把标准命令行选项通过 ToolRunner.run(Tool, String[]) 委托给 GenericOptionsParser处理。
Hadoop命令行的常用选项有:
-conf <configuration file>
-D <property=value>
-fs <local|namenode:port>
-jt <local|jobtracker:port>
IsolationRunner
IsolationRunner 是帮助调试Map/Reduce程序的工具。
利用IsolationRunner的方法是,首先设置 keep.failed.tasks.files属性为true (同时参考keep.tasks.files.pattern)。
然后,登录到任务运行失落败的节点上,进入 TaskTracker确当地路径运行 IsolationRunner:
$ cd <local path>/taskTracker/${taskid}/work
$ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
IsolationRunner会把失落败的任务放在单独的一个能够调试的jvm上运行,并且采取和之前完备一样的输入数据。
Profiling
Profiling是一个工具,它利用内置的java profiler工具进行剖析得到(2-3个)map或reduce样例运行剖析报告。
用户可以通过设置属性mapred.task.profile指定系统是否采集profiler信息。 利用api JobConf.setProfileEnabled(boolean)可以修正属性值。如果设为true, 则开启profiling功能。profiler信息保存在用户日志目录下。缺省情形,profiling功能是关闭的。
如果用户设定利用profiling功能,可以利用配置文档里的属性 mapred.task.profile.{maps|reduces} 设置要profile map/reduce task的范围。设置该属性值的api是 JobConf.setProfileTaskRange(boolean,String)。 范围的缺省值是0-2。
用户可以通过设定配置文档里的属性mapred.task.profile.params 来指定profiler配置参数。修正属性要利用api JobConf.setProfileParams(String)。当运行task时,如果字符串包含%s。 它会被更换成profileing的输出文件名。这些参数会在命令行里通报到子JVM中。缺省的profiling 参数是 -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s。
调试
Map/Reduce框架能够运行用户供应的用于调试的脚本程序。 当map/reduce任务失落败时,用户可以通过运行脚本在任务日志(例如任务的标准输出、标准缺点、系统日志以及作业配置文件)上做后续处理事情。用户供应的调试脚本程序的标准输出和标准缺点会输出为诊断文件。如果须要的话这些输出结果也可以打印在用户界面上。
在接下来的章节,我们谈论如何与作业一起提交调试脚本。为了提交调试脚本, 首先要把这个脚本分发出去,而且还要在配置文件里设置。
如何分发脚本文件:
用户要用 DistributedCache 机制来分发和链接脚本文件
如何提交脚本:
一个快速提交调试脚本的方法是分别为须要调试的map任务和reduce任务设置 \公众mapred.map.task.debug.script\"大众 和 \公众mapred.reduce.task.debug.script\"大众 属性的值。这些属性也可以通过 JobConf.setMapDebugScript(String) 和 JobConf.setReduceDebugScript(String) API来设置。对付streaming, 可以分别为须要调试的map任务和reduce任务利用命令行选项-mapdebug 和 -reducedegug来提交调试脚本。
脚本的参数是任务的标准输出、标准缺点、系统日志以及作业配置文件。在运行map/reduce失落败的节点上运行调试命令是:
$script $stdout $stderr $syslog $jobconf
Pipes 程序根据第五个参数得到c++程序名。 因此调试pipes程序的命令是
$script $stdout $stderr $syslog $jobconf $program
默认行为
对付pipes,默认的脚本会用gdb处理core dump, 打印 stack trace并且给出正在运行线程的信息。
JobControl
JobControl是一个工具,它封装了一组Map/Reduce作业以及他们之间的依赖关系。
数据压缩
Hadoop Map/Reduce框架为运用程序的写入文件操作供应压缩工具,这些工具可以为map输出的中间数据和作业终极输出数据(例如reduce的输出)供应支持。它还附带了一些 CompressionCodec的实现,比如实现了 zlib和lzo压缩算法。 Hadoop同样支持gzip文件格式。
考虑到性能问题(zlib)以及Java类库的缺失落(lzo)等成分,Hadoop也为上述压缩解压算法供应本地库的实现。更多的细节请参考 这里。
中间输出
运用程序可以通过 JobConf.setCompressMapOutput(boolean)api掌握map输出的中间结果,并且可以通过 JobConf.setMapOutputCompressorClass(Class)api指定 CompressionCodec。
作业输出
运用程序可以通过 FileOutputFormat.setCompressOutput(JobConf, boolean) api掌握输出是否须要压缩并且可以利用 FileOutputFormat.setOutputCompressorClass(JobConf, Class)api指定CompressionCodec。
如果作业输出要保存成 SequenceFileOutputFormat格式,须要利用 SequenceFileOutputFormat.setOutputCompressionType(JobConf, SequenceFile.CompressionType)api,来设定 SequenceFile.CompressionType (i.e. RECORD / BLOCK - 默认是RECORD)。
例子:WordCount v2.0
这里是一个更全面的WordCount例子,它利用了我们已经谈论过的很多Map/Reduce框架供应的功能。
运行这个例子须要HDFS的某些功能,特殊是 DistributedCache干系功能。因此这个例子只能运行在 伪分布式或者 完备分布式模式的 Hadoop上。
源代码
WordCount.java1.package org.myorg;2.3.import java.io.;4.import java.util.;5.6.import org.apache.hadoop.fs.Path;7.import org.apache.hadoop.filecache.DistributedCache;8.import org.apache.hadoop.conf.;9.import org.apache.hadoop.io.;10.import org.apache.hadoop.mapred.;11.import org.apache.hadoop.util.;12.13.public class WordCount extends Configured implements Tool {14.15. public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {16.17. static enum Counters { INPUT_WORDS }18.19. private final static IntWritable one = new IntWritable(1);20. private Text word = new Text();21.22. private boolean caseSensitive = true;23. private Set<String> patternsToSkip = new HashSet<String>();24.25. private long numRecords = 0;26. private String inputFile;27.28. public void configure(JobConf job) {29. caseSensitive = job.getBoolean(\"大众wordcount.case.sensitive\"大众, true);30. inputFile = job.get(\"大众map.input.file\"大众);31.32. if (job.getBoolean(\公众wordcount.skip.patterns\"大众, false)) {33. Path[] patternsFiles = new Path[0];34. try {35. patternsFiles = DistributedCache.getLocalCacheFiles(job);36. } catch (IOException ioe) {37. System.err.println(\公众Caught exception while getting cached files: \"大众 + StringUtils.stringifyException(ioe));38. }39. for (Path patternsFile : patternsFiles) {40. parseSkipFile(patternsFile);41. }42. }43. }44.45. private void parseSkipFile(Path patternsFile) {46. try {47. BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));48. String pattern = null;49. while ((pattern = fis.readLine()) != null) {50. patternsToSkip.add(pattern);51. }52. } catch (IOException ioe) {53. System.err.println(\"大众Caught exception while parsing the cached file '\"大众 + patternsFile + \公众' : \公众 + StringUtils.stringifyException(ioe));54. }55. }56.57. public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {58. String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();59.60. for (String pattern : patternsToSkip) {61. line = line.replaceAll(pattern, \"大众\公众);62. }63.64. StringTokenizer tokenizer = new StringTokenizer(line);65. while (tokenizer.hasMoreTokens()) {66. word.set(tokenizer.nextToken());67. output.collect(word, one);68. reporter.incrCounter(Counters.INPUT_WORDS, 1);69. }70.71. if ((++numRecords % 100) == 0) {72. reporter.setStatus(\"大众Finished processing \"大众 + numRecords + \"大众 records \"大众 + \公众from the input file: \公众 + inputFile);73. }74. }75. }76.77. public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {78. public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {79. int sum = 0;80. while (values.hasNext()) {81. sum += values.next().get();82. }83. output.collect(key, new IntWritable(sum));84. }85. }86.87. public int run(String[] args) throws Exception {88. JobConf conf = new JobConf(getConf(), WordCount.class);89. conf.setJobName(\"大众wordcount\"大众);90.91. conf.setOutputKeyClass(Text.class);92. conf.setOutputValueClass(IntWritable.class);93.94. conf.setMapperClass(Map.class);95. conf.setCombinerClass(Reduce.class);96. conf.setReducerClass(Reduce.class);97.98. conf.setInputFormat(TextInputFormat.class);99. conf.setOutputFormat(TextOutputFormat.class);100.101. List<String> other_args = new ArrayList<String>();102. for (int i=0; i < args.length; ++i) {103. if (\"大众-skip\公众.equals(args[i])) {104. DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);105. conf.setBoolean(\"大众wordcount.skip.patterns\"大众, true);106. } else {107. other_args.add(args[i]);108. }109. }110.111. FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));112. FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));113.114. JobClient.runJob(conf);115. return 0;116. }117.118. public static void main(String[] args) throws Exception {119. int res = ToolRunner.run(new Configuration(), new WordCount(), args);120. System.exit(res);121. }122.}123.运行样例
输入样例:
$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World, Bye World!
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.
运行程序:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output
输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1
把稳此时的输入与第一个版本的不同,输出的结果也有不同。
现在通过DistributedCache插入一个模式文件,文件中保存了要被忽略的单词模式。
$ hadoop dfs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to
再运行一次,这次利用更多的选项:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=true /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
该当得到这样的输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1
再运行一次,这一次关闭大小写敏感性(case-sensitivity):
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=false /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2
程序要点
通过利用一些Map/Reduce框架供应的功能,WordCount的第二个版本在原始版本根本上有了如下的改进:
展示了运用程序如何在Mapper (和Reducer)中通过configure方法 修正配置参数(28-43行)。展示了作业如何利用DistributedCache 来分发只读数据。 这里许可用户指定单词的模式,在计数时忽略那些符合模式的单词(104行)。展示Tool接口和GenericOptionsParser处理Hadoop命令行选项的功能 (87-116, 119行)。展示了运用程序如何利用Counters(68行),如何通过通报给map(和reduce) 方法的Reporter实例来设置运用程序的状态信息(72行)。