首页 » PHP教程 » phpclassword技巧_好轨范员大年夜数据教程路线分享marpreduce快速入门

phpclassword技巧_好轨范员大年夜数据教程路线分享marpreduce快速入门

访客 2024-12-03 0

扫一扫用手机浏览

文章目录 [+]

• 与HDFS办理问题的事理类似,HDFS是将大的文件切分成多少小文件,然后将它们分别存储到集群中各个主机中。

• 同样事理,mapreduce是将一个繁芜的运算切分成若个子运算,然后将它们分别交给集群中各个主机,由各个主机并走运算。

phpclassword技巧_好轨范员大年夜数据教程路线分享marpreduce快速入门

1.1 mapreduce产生的背景

phpclassword技巧_好轨范员大年夜数据教程路线分享marpreduce快速入门
(图片来自网络侵删)

• 海量数据在单机上处理由于硬件资源限定,无法胜任。

• 而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的繁芜度和开拓难度。

• 引入mapreduce框架后,开拓职员可以将绝大部分事情集中在业务逻辑的开拓上,而将分布式打算中的繁芜性交由框架来处理。

1.2 mapreduce编程模型

• 一种分布式打算模型。

• MapReduce将这个并行打算过程抽象到两个函数。

– Map(映射):对一些独立元素组成的列表的每一个元素进行指定的操作,可以高度并行。

– Reduce(化简 归约):对一个列表的元素进行合并。

• 一个大略的MapReduce程序只须要指定map()、reduce()、input和output,剩下的事由框架完成。

Mapreduce的几个关键名词

• Job :用户的每一个打算要求称为一个作业。

• Task:每一个作业,都须要拆分开了,交由多个主机来完成,拆分出来的实行单位便是任务。

• Task又分为如下三种类型的任务:

– Map:卖力map阶段的全体数据处理流程

– Reduce:卖力reduce阶段的全体数据处理流程

– MRAppMaster:卖力全体程序的过程调度及状态折衷

1.4 mapreduce程序运行流程

详细流程解释:

一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,打算出须要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程

maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:

– 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对。

– 将输入KV(k是文件的行号,v是文件一行的数据)对通报给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对网络到缓存。

– 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件

MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并奉告reducetask进程要处理的数据范围(数据分区)

Reducetask进程启动之后,根据MRAppMaster奉告的待处理数据所在位置,从多少台maptask运行所在机器上获取到多少个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并网络运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储

1.5 编写MapReduce程序

• 基于MapReduce 打算模型编写分布式并行程序非常大略,程序员的紧张编码事情便是实现Map 和Reduce函数。

• 其它的并行编程中的各类繁芜问题,如分布式存储,事情调度,负载平衡,容错处理,网络通信等,均由YARN框架卖力处理。

• MapReduce中,map和reduce函数遵照如下常规格式:

map: (K1, V1) → list(K2, V2)

reduce: (K2, list(V2)) → list(K3, V3)

• Mapper的接口:

protected void map(KEY key, VALUE value, Context context)

throws IOException, InterruptedException {

}

• Reduce的接口:

protected void reduce(KEY key, Iterable<VALUE> values,

Context context) throws IOException, InterruptedException {

}

• Mapreduce程序代码基本构造

maprecue实例开拓

2.1 编程步骤

用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)

Mapper的输入数据是KV对的形式(KV的类型可自定义)

Mapper的输出数据是KV对的形式(KV的类型可自定义)

Mapper中的业务逻辑写在map()方法中

map()方法(maptask进程)对每一个<K,V>调用一次

Reducer的输入数据类型对应Mapper的输出数据类型,也是KV

Reducer的业务逻辑写在reduce()方法中

Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法

用户自定义的Mapper和Reducer都要继续各自的父类

全体程序须要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job工具

2.2 经典的wordcount程序编写

需求:有一批文件(规模为TB级或者PB级),如何统计这些文件中所有单词涌现次数

如有三个文件,文件名是qfcourse.txt、qfstu.txt 和 qf_teacher

qf_course.txt内容:

php java linux

bigdata VR

C C++ java web

linux shell

qf_stu.txt内容:

tom jim lucy

lily sally

andy

tom jim sally

qf_teacher内容:

jerry Lucy tom

jim

方案

– 分别统计每个文件中单词涌现次数 - map()

– 累加不同文件中同一个单词涌现次数 - reduce()

实当代码

– 创建一个大略的maven项目

– 添加hadoop client依赖的jar,pom.xml紧张内容如下:

<dependencies>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-client</artifactId>

<version>2.7.1</version>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.11</version>

<scope>test</scope>

</dependency>

</dependencies>

– 编写代码

– 自定义一个mapper类

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

/

Maper里面的泛型的四个类型从左到右依次是:

LongWritable KEYIN: 默认情形下,是mr框架所读到的一行文本的起始偏移量,Long, 类似于行号但是在hadoop中有自己的更精简的序列化接口,以是不直接用Long,而用LongWritable

Text VALUEIN:默认情形下,是mr框架所读到的一行文本的内容,String,同上,用Text

Text KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String,同上,用Text

IntWritable VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,同上,用IntWritable

/

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

/

map阶段的业务逻辑就写在自定义的map()方法中

maptask会对每一行输入数据调用一次我们自定义的map()方法

/

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//将maptask传给我们的一行的文本内容先转换成String

String line = value.toString();

//根据空格将这一行切分成单词

String[] words = line.split(\"大众 \"大众);

/

将单词输出为<单词,1>

如<lily,1> <lucy,1> <c,1> <c++,1> <tom,1>

/

for(String word:words){

//将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会到相同的reduce task

context.write(new Text(word), new IntWritable(1));

}

}

}

– 自定义一个reduce类

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

/

Reducer里面的泛型的四个类型从左到右依次是:

Text KEYIN: 对应mapper输出的KEYOUT

IntWritable VALUEIN: 对应mapper输出的VALUEOUT

KEYOUT, 是单词

VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型,是总次数

/

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

/

<tom,1>

<tom,1>

<linux,1>

<banana,1>

<banana,1>

<banana,1>

入参key,是一组相同单词kv对的key

values是多少相同key的value凑集

如 <tom,[1,1]> <linux,[1]> <banana,[1,1,1]>

/

@Override

protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

int count=0; //累加单词的涌现的次数

for(IntWritable value:values){

count += value.get();

}

context.write(key, new IntWritable(count));

}

}

– 编写一个Driver类

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/

相称于一个yarn集群的客户端

须要在此封装我们的mr程序的干系运行参数,指定jar包

末了提交给yarn

/

public class WordcountDriver {

/

该类是运行在hadoop客户真个,main一运行,yarn客户端就启动起来了,与yarn做事器端通信

yarn做事器端卖力启动mapreduce程序并利用WordcountMapper和WordcountReducer类

/

public static void main(String[] args) throws Exception {

//此代码须要两个输入参数 第一个参数支持要处理的源文件;第二个参数是处理结果的输出路径

if (args == null || args.length == 0) {

args = new String[2];

//路径都是 hdfs系统的文件路径

args[0] = \"大众hdfs://192.168.18.64:9000/wordcount/input/\"大众;

args[1] = \公众hdfs://192.168.18.64:9000/wordcount/output\"大众;

}

/

什么也不设置时,如果在安装了hadoop的机器上运行时,自动读取

/home/hadoop/app/hadoop-2.7.1/etc/hadoop/core-site.xml

文件放入Configuration中

/

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

//指定本程序的jar包所在确当地路径

job.setJarByClass(WordcountDriver.class);

//指定本业务job要利用的mapper业务类

job.setMapperClass(WordcountMapper.class);

//指定mapper输出数据的kv类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

//指定本业务job要利用的Reducer业务类

job.setReducerClass(WordcountReducer.class);

//指定终极输出的数据的kv类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

//指定job的输入原始文件所在目录

FileInputFormat.setInputPaths(job, new Path(args[0]));

//指定job的输出结果所在目录

FileOutputFormat.setOutputPath(job, new Path(args[1]));

//将job中配置的干系参数,以及job所用的java类所在的jar包,提交给yarn去运行

/job.submit();/

boolean res = job.waitForCompletion(true);

System.exit(res?0:1);

}

}

wordcount处理过程

将文件拆分成splits,由于测试用的文件较小,以是每个文件为一个split,并将文件按行分割形成<key,value>对,下图所示。
这一步由MapReduce框架自动完成,个中偏移量(即key值)包括了回车所占的字符数(Windows/Linux环境不同)。

将分割好的<key,value>对交给用户定义的map方法进行处理,天生新的<key,value>对,下图所示。

得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并实行Combine过程,将key至相同value值累加,得到Mapper的终极输出结果。
下图所示。

Reducer先对从Mapper吸收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果,下图所示。

相关文章

语言游戏聚会的魅力,跨界交流的盛宴

在繁忙的都市生活中,一场别开生面的语言游戏聚会悄然兴起。这不仅是一场简单的娱乐活动,更是一次跨界交流的盛宴,一场思想的碰撞与火花。...

PHP教程 2024-12-29 阅读1 评论0

语言序列逻辑在现代传播中的运用与影响

语言序列逻辑,作为现代传播学中的重要理论之一,对于理解语言传播的规律、提高传播效果具有重要作用。在信息化、网络化时代,语言序列逻辑...

PHP教程 2024-12-29 阅读1 评论0