博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce漫谈
阅读量:6941 次
发布时间:2019-06-27

本文共 6961 字,大约阅读时间需要 23 分钟。

序列化是把对象变成二进制的过程

反序列化是将二进制变成对象的过程

由HDFS中读取数据片段,一个数据片段对应着一个map线程

这里写图片描述

分为四个步骤:如图。split的时候不是一行就是一个数据片段,不要误会。Reduce数量由map的输出决定

这里写图片描述

注意map的输出到本地磁盘并不是HDFS上。因为Map的输出在job完成后即可删除,因此不需要存储在HDFS上,虽然存到HDFS上更安全,但是网络传输会降低MR的执行效率。Reduce的输出是到HDFS上。

shuffle的过程的作用:

  • 将mapper的输出按照某种key值重新切分成N分,把key值符合某种范围的输出送到特定的reducer里面去处理,从而简化reduce的过程。

下图将maptask 和 reduce task分开考虑

这里写图片描述

map之后有三个

  1. partition. 将数据分成一个个分区,每个分区对应着一个reduce去执行。so, 解决好partition的问题就能解决好数据倾斜的问题。在fetch的时候起作用。partition怎么分区按照编程规则,默认的分区规则是哈希模运算(获得对象的hash值,哈希值是int,这个hash值模Reduce的整数),默认的partition规则会可能产生数据倾斜。分区不是将数据分开而是给要处理的数据打上标志位,哪些数据是1区,哪些数据是2区,真正的分开是在fetch阶段进行分开。 所谓的fetch是reduce可能和map的结果不在同一台机器上,故需要数据的拷贝,根据分区移动数据。上面有多个数据段,每次溢写产生一个文件。

  2. sort。其实就是解决对象比较规则的算法。默认排序是字典排序(ASC马,11排在9的前面)。

  3. spill to disk(溢写)。Map输出的数据在内存里,内存由阈值,到达阈值之后要输出到硬盘,这就叫溢写

之后再merger on disk的时候,根据key的hash值进行合并,要用到的是conbainer。减少数据拷贝的量,减少map的输出。

所以shuffer阶段三次比较:在sort,conbainer,和merger的时候比较key。

这里写图片描述

MR split的大小

这里写图片描述

Demo1:WordCount程序

WordCountMap.javapackage day0525;import java.util.StringTokenizer;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class WordCountMap extends Mapper
{ protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException { /* * First two parameters are the input data */ String line = value.toString(); StringTokenizer st = new StringTokenizer(line); while (st.hasMoreTokens()) { String word = st.nextToken(); context.write(new Text(word), new IntWritable(1)); } }}
WordCountReduce.javapackage day0525;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class WordCountReduce extends Reducer
{
protected void reduce(Text key, Iterable
value, Context context) throws java.io.IOException ,InterruptedException { /* * Iterable
is Iter which used in Set operation. * Output of map is set, so iterable is used. Moreover, map's output maybe a huge file, * Therefore, only iter can read data step by step */ int sum = 0; for(IntWritable i:value){ sum = sum + i.get(); } context.write(key, new IntWritable(sum)); }}
WordCountRun.javapackage day0525;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCountRun {    public static void main(String[] args) {        Configuration conf = new Configuration();        conf.set("mapreduce.framework.name", "yarn");        try {            Job job = new Job(conf);            job.setJarByClass(WordCountRun.class);            job.setMapperClass(WordCountMap.class);            job.setReducerClass(WordCountReduce.class);            job.setOutputKeyClass(Text.class);            job.setOutputValueClass(IntWritable.class);//          job.setNumReduceTasks(2); // set number of reducer task            FileInputFormat.addInputPath(job, new Path("/input0917/README.txt"));            FileOutputFormat.setOutputPath(job, new Path("/output0525/wc"));            System.exit(job.waitForCompletion(true) ? 0 : 1);        } catch (Exception e) {            e.printStackTrace();        }    }}

其中,conf.set 是mapred-site.xml中的内容

之后导出jar包并执行 hadoop jar WordCountTest01.jar day0525.WordCountRun 最后的参数时报名加main函数的入口

另外,MR的官方文档也给除了wordcount的程序,具体请参考:。注意官方用了job.setCombinerClass(WordCountReduce.class);

Demo2:模仿qq好友推荐

思路:name1(主) name2(从)    map:        以主为key,输出一次        以从为key,输出一次        目的就是将key以及其所有的关系都列出
package com.hadoop.qq;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class FriendRecomMap extends Mapper
{
protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException { String line = value.toString(); String[] data = line.split(" "); context.write(new Text(data[0]), new Text(data[1])); context.write(new Text(data[1]), new Text(data[0])); }}
package com.hadoop.qq;import java.util.HashSet;import java.util.Iterator;import java.util.Set;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class FriendRecomReduce extends Reducer
{ protected void reduce(Text key, Iterable
value, Context cxt) throws java.io.IOException ,InterruptedException { Set
set = new HashSet
(); for(Text val:value){ set.add(val.toString()); } if(set.size() > 1){ for (Iterator iterator = set.iterator(); iterator.hasNext();) {
String str1 = (String) iterator.next(); for (Iterator iterator2 = set.iterator(); iterator2.hasNext();) {
String str2 = (String) iterator2.next(); if(!str1.equals(str2)){ cxt.write(new Text(str1), new Text(str2)); } } } } }}
package com.hadoop.qq;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FriendRecom {    public static void main(String[] args) {        Configuration conf = new Configuration();        conf.set("mapreduce.framework.name", "yarn");        try {            Job job = new Job(conf);            job.setJobName("QQ");            job.setJarByClass(FriendRecom.class);            job.setMapperClass(FriendRecomMap.class);            job.setReducerClass(FriendRecomReduce.class);            job.setOutputKeyClass(Text.class);            job.setOutputValueClass(Text.class);//          job.setNumReduceTasks(2); // set number of reducer task            FileInputFormat.addInputPath(job, new Path("/input0917/qqFriend.txt"));            FileOutputFormat.setOutputPath(job, new Path("/output/qqFriend"));            System.exit(job.waitForCompletion(true) ? 0 : 1);        } catch (Exception e) {            e.printStackTrace();        }    }}

测试数据:

Jerry ChrisJerry YoungJerry DanielChris ShawnChris PerryChris FelixShawn LarryShawn MapleYoung WillYoung IanYoung LilyLiuChao LilyLiuChao CarolNaixin PerryPerry MaplePerry AlexDaniel Terry

执行命令

hadoop jar qqFriend01.jar com.hadoop.qq.FriendRecom

未完待续

你可能感兴趣的文章
安卓实现多条件筛选列表菜单筛选菜单
查看>>
linux文本安装
查看>>
文件切割命令:split
查看>>
Oracle Fusion Middleware 12c GoldenGate Studio Installation
查看>>
Linux 文件与目录管理
查看>>
linux下使用 du查看某个文件或目录占用磁盘空间的大小
查看>>
Charles抓包IOS手机(HTTP HTTPS)
查看>>
我的友情链接
查看>>
证书服务应用
查看>>
CentOS6.4 64位系统安装jdk
查看>>
SQLServer 报错:用户、组或角色'XXX' 在当前数据库中已存在. 错误:15023解决方法
查看>>
ESXI虚拟机如何挂载移动硬盘或U盘
查看>>
国家简写
查看>>
IBM的图像识别技术
查看>>
mysql性能基本调优
查看>>
python 最简单使用ajax
查看>>
容器之Zookeeper的使用
查看>>
Linux btrfs之调整文件系统大小
查看>>
Navicat安装配置Mysql
查看>>
课外学习(一)
查看>>