• 与HDFS办理问题的事理类似,HDFS是将大的文件切分成多少小文件,然后将它们分别存储到集群中各个主机中。
• 同样事理,mapreduce是将一个繁芜的运算切分成若个子运算,然后将它们分别交给集群中各个主机,由各个主机并走运算。
1.1 mapreduce产生的背景

• 海量数据在单机上处理由于硬件资源限定,无法胜任。
• 而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的繁芜度和开拓难度。
• 引入mapreduce框架后,开拓职员可以将绝大部分事情集中在业务逻辑的开拓上,而将分布式打算中的繁芜性交由框架来处理。
1.2 mapreduce编程模型
• 一种分布式打算模型。
• MapReduce将这个并行打算过程抽象到两个函数。
– Map(映射):对一些独立元素组成的列表的每一个元素进行指定的操作,可以高度并行。
– Reduce(化简 归约):对一个列表的元素进行合并。
• 一个大略的MapReduce程序只须要指定map()、reduce()、input和output,剩下的事由框架完成。
Mapreduce的几个关键名词
• Job :用户的每一个打算要求称为一个作业。
• Task:每一个作业,都须要拆分开了,交由多个主机来完成,拆分出来的实行单位便是任务。
• Task又分为如下三种类型的任务:
– Map:卖力map阶段的全体数据处理流程
– Reduce:卖力reduce阶段的全体数据处理流程
– MRAppMaster:卖力全体程序的过程调度及状态折衷
1.4 mapreduce程序运行流程
详细流程解释:
一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,打算出须要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程
maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:
– 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对。
– 将输入KV(k是文件的行号,v是文件一行的数据)对通报给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对网络到缓存。
– 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件
MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并奉告reducetask进程要处理的数据范围(数据分区)
Reducetask进程启动之后,根据MRAppMaster奉告的待处理数据所在位置,从多少台maptask运行所在机器上获取到多少个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并网络运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储
1.5 编写MapReduce程序
• 基于MapReduce 打算模型编写分布式并行程序非常大略,程序员的紧张编码事情便是实现Map 和Reduce函数。
• 其它的并行编程中的各类繁芜问题,如分布式存储,事情调度,负载平衡,容错处理,网络通信等,均由YARN框架卖力处理。
• MapReduce中,map和reduce函数遵照如下常规格式:
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
• Mapper的接口:
protected void map(KEY key, VALUE value, Context context)
throws IOException, InterruptedException {
}
• Reduce的接口:
protected void reduce(KEY key, Iterable<VALUE> values,
Context context) throws IOException, InterruptedException {
}
• Mapreduce程序代码基本构造
maprecue实例开拓
2.1 编程步骤
用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)
Mapper的输入数据是KV对的形式(KV的类型可自定义)
Mapper的输出数据是KV对的形式(KV的类型可自定义)
Mapper中的业务逻辑写在map()方法中
map()方法(maptask进程)对每一个<K,V>调用一次
Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
Reducer的业务逻辑写在reduce()方法中
Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
用户自定义的Mapper和Reducer都要继续各自的父类
全体程序须要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job工具
2.2 经典的wordcount程序编写
需求:有一批文件(规模为TB级或者PB级),如何统计这些文件中所有单词涌现次数
如有三个文件,文件名是qfcourse.txt、qfstu.txt 和 qf_teacher
qf_course.txt内容:
php java linux
bigdata VR
C C++ java web
linux shell
qf_stu.txt内容:
tom jim lucy
lily sally
andy
tom jim sally
qf_teacher内容:
jerry Lucy tom
jim
方案
– 分别统计每个文件中单词涌现次数 - map()
– 累加不同文件中同一个单词涌现次数 - reduce()
实当代码
– 创建一个大略的maven项目
– 添加hadoop client依赖的jar,pom.xml紧张内容如下:
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
– 编写代码
– 自定义一个mapper类
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/
Maper里面的泛型的四个类型从左到右依次是:
LongWritable KEYIN: 默认情形下,是mr框架所读到的一行文本的起始偏移量,Long, 类似于行号但是在hadoop中有自己的更精简的序列化接口,以是不直接用Long,而用LongWritable
Text VALUEIN:默认情形下,是mr框架所读到的一行文本的内容,String,同上,用Text
Text KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String,同上,用Text
IntWritable VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,同上,用IntWritable
/
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
/
map阶段的业务逻辑就写在自定义的map()方法中
maptask会对每一行输入数据调用一次我们自定义的map()方法
/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//将maptask传给我们的一行的文本内容先转换成String
String line = value.toString();
//根据空格将这一行切分成单词
String[] words = line.split(\"大众 \"大众);
/
将单词输出为<单词,1>
如<lily,1> <lucy,1> <c,1> <c++,1> <tom,1>
/
for(String word:words){
//将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会到相同的reduce task
context.write(new Text(word), new IntWritable(1));
}
}
}
– 自定义一个reduce类
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/
Reducer里面的泛型的四个类型从左到右依次是:
Text KEYIN: 对应mapper输出的KEYOUT
IntWritable VALUEIN: 对应mapper输出的VALUEOUT
KEYOUT, 是单词
VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型,是总次数
/
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
/
<tom,1>
<tom,1>
<linux,1>
<banana,1>
<banana,1>
<banana,1>
入参key,是一组相同单词kv对的key
values是多少相同key的value凑集
如 <tom,[1,1]> <linux,[1]> <banana,[1,1,1]>
/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count=0; //累加单词的涌现的次数
for(IntWritable value:values){
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
– 编写一个Driver类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/
相称于一个yarn集群的客户端
须要在此封装我们的mr程序的干系运行参数,指定jar包
末了提交给yarn
/
public class WordcountDriver {
/
该类是运行在hadoop客户真个,main一运行,yarn客户端就启动起来了,与yarn做事器端通信
yarn做事器端卖力启动mapreduce程序并利用WordcountMapper和WordcountReducer类
/
public static void main(String[] args) throws Exception {
//此代码须要两个输入参数 第一个参数支持要处理的源文件;第二个参数是处理结果的输出路径
if (args == null || args.length == 0) {
args = new String[2];
//路径都是 hdfs系统的文件路径
args[0] = \"大众hdfs://192.168.18.64:9000/wordcount/input/\"大众;
args[1] = \公众hdfs://192.168.18.64:9000/wordcount/output\"大众;
}
/
什么也不设置时,如果在安装了hadoop的机器上运行时,自动读取
/home/hadoop/app/hadoop-2.7.1/etc/hadoop/core-site.xml
文件放入Configuration中
/
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//指定本程序的jar包所在确当地路径
job.setJarByClass(WordcountDriver.class);
//指定本业务job要利用的mapper业务类
job.setMapperClass(WordcountMapper.class);
//指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定本业务job要利用的Reducer业务类
job.setReducerClass(WordcountReducer.class);
//指定终极输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定job的输出结果所在目录
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//将job中配置的干系参数,以及job所用的java类所在的jar包,提交给yarn去运行
/job.submit();/
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
wordcount处理过程
将文件拆分成splits,由于测试用的文件较小,以是每个文件为一个split,并将文件按行分割形成<key,value>对,下图所示。这一步由MapReduce框架自动完成,个中偏移量(即key值)包括了回车所占的字符数(Windows/Linux环境不同)。
将分割好的<key,value>对交给用户定义的map方法进行处理,天生新的<key,value>对,下图所示。
得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并实行Combine过程,将key至相同value值累加,得到Mapper的终极输出结果。下图所示。
Reducer先对从Mapper吸收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果,下图所示。