博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop: MapReduce2多个job串行处理
阅读量:6264 次
发布时间:2019-06-22

本文共 6335 字,大约阅读时间需要 21 分钟。

复杂的MapReduce处理中,往往需要将复杂的处理过程,分解成多个简单的Job来执行,第1个Job的输出做为第2个Job的输入,相互之间有一定依赖关系。以求平均数为例（原文此处的引用链接在转载时已丢失）,可以分解成三个步骤:

1. 求Sum

2. 求Count

3. 计算平均数

每1个步骤看成一个Job,其中Job3必须等待Job1、Job2完成,并将Job1、Job2的输出结果做为输入,下面的代码演示了如何将这3个Job串起来

1 package yjmyzz.mr.job.link;  2   3 import org.apache.hadoop.conf.Configuration;  4 import org.apache.hadoop.fs.Path;  5 import org.apache.hadoop.io.DoubleWritable;  6 import org.apache.hadoop.io.LongWritable;  7 import org.apache.hadoop.io.Text;  8 import org.apache.hadoop.mapreduce.Job;  9 import org.apache.hadoop.mapreduce.Mapper; 10 import org.apache.hadoop.mapreduce.Reducer; 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 13 import yjmyzz.util.HDFSUtil; 14  15 import java.io.IOException; 16  17  18 public class Avg2 { 19  20     private static final Text TEXT_SUM = new Text("SUM"); 21     private static final Text TEXT_COUNT = new Text("COUNT"); 22     private static final Text TEXT_AVG = new Text("AVG"); 23  24     //计算Sum 25     public static class SumMapper 26             extends Mapper
{ 27 28 public long sum = 0; 29 30 public void map(LongWritable key, Text value, Context context) 31 throws IOException, InterruptedException { 32 sum += Long.parseLong(value.toString()); 33 } 34 35 protected void cleanup(Context context) throws IOException, InterruptedException { 36 context.write(TEXT_SUM, new LongWritable(sum)); 37 } 38 39 } 40 41 public static class SumReducer extends Reducer
{ 42 43 public long sum = 0; 44 45 public void reduce(Text key, Iterable
values, Context context) 46 throws IOException, InterruptedException { 47 for (LongWritable v : values) { 48 sum += v.get(); 49 } 50 context.write(TEXT_SUM, new LongWritable(sum)); 51 } 52 53 } 54 55 //计算Count 56 public static class CountMapper 57 extends Mapper
{ 58 59 public long count = 0; 60 61 public void map(LongWritable key, Text value, Context context) 62 throws IOException, InterruptedException { 63 count += 1; 64 } 65 66 protected void cleanup(Context context) throws IOException, InterruptedException { 67 context.write(TEXT_COUNT, new LongWritable(count)); 68 } 69 70 } 71 72 public static class CountReducer extends Reducer
{ 73 74 public long count = 0; 75 76 public void reduce(Text key, Iterable
values, Context context) 77 throws IOException, InterruptedException { 78 for (LongWritable v : values) { 79 count += v.get(); 80 } 81 context.write(TEXT_COUNT, new LongWritable(count)); 82 } 83 84 } 85 86 //计算Avg 87 public static class AvgMapper 88 extends Mapper
{ 89 90 public long count = 0; 91 public long sum = 0; 92 93 public void map(LongWritable key, Text value, Context context) 94 throws IOException, InterruptedException { 95 String[] v = value.toString().split("\t"); 96 if (v[0].equals("COUNT")) { 97 count = Long.parseLong(v[1]); 98 } else if (v[0].equals("SUM")) { 99 sum = Long.parseLong(v[1]);100 }101 }102 103 protected void cleanup(Context context) throws IOException, InterruptedException {104 context.write(new LongWritable(sum), new LongWritable(count));105 }106 107 }108 109 110 public static class AvgReducer extends Reducer
{111 112 public long sum = 0;113 public long count = 0;114 115 public void reduce(LongWritable key, Iterable
values, Context context)116 throws IOException, InterruptedException {117 sum += key.get();118 for (LongWritable v : values) {119 count += v.get();120 }121 }122 123 protected void cleanup(Context context) throws IOException, InterruptedException {124 context.write(TEXT_AVG, new DoubleWritable(new Double(sum) / count));125 }126 127 }128 129 130 public static void main(String[] args) throws Exception {131 132 Configuration conf = new Configuration();133 134 String inputPath = "/input/duplicate.txt";135 String maxOutputPath = "/output/max/";136 String countOutputPath = "/output/count/";137 String avgOutputPath = "/output/avg/";138 139 //删除输出目录(可选,省得多次运行时,总是报OUTPUT目录已存在)140 HDFSUtil.deleteFile(conf, maxOutputPath);141 HDFSUtil.deleteFile(conf, countOutputPath);142 HDFSUtil.deleteFile(conf, avgOutputPath);143 144 Job job1 = Job.getInstance(conf, "Sum");145 job1.setJarByClass(Avg2.class);146 job1.setMapperClass(SumMapper.class);147 job1.setCombinerClass(SumReducer.class);148 job1.setReducerClass(SumReducer.class);149 job1.setOutputKeyClass(Text.class);150 job1.setOutputValueClass(LongWritable.class);151 FileInputFormat.addInputPath(job1, new Path(inputPath));152 FileOutputFormat.setOutputPath(job1, new Path(maxOutputPath));153 154 155 Job job2 = Job.getInstance(conf, "Count");156 job2.setJarByClass(Avg2.class);157 job2.setMapperClass(CountMapper.class);158 job2.setCombinerClass(CountReducer.class);159 job2.setReducerClass(CountReducer.class);160 job2.setOutputKeyClass(Text.class);161 job2.setOutputValueClass(LongWritable.class);162 FileInputFormat.addInputPath(job2, new Path(inputPath));163 FileOutputFormat.setOutputPath(job2, new Path(countOutputPath));164 165 166 Job job3 = Job.getInstance(conf, "Average");167 job3.setJarByClass(Avg2.class);168 job3.setMapperClass(AvgMapper.class);169 job3.setReducerClass(AvgReducer.class);170 job3.setMapOutputKeyClass(LongWritable.class);171 job3.setMapOutputValueClass(LongWritable.class);172 job3.setOutputKeyClass(Text.class);173 
job3.setOutputValueClass(DoubleWritable.class);174 175 //将job1及job2的输出为做job3的输入176 FileInputFormat.addInputPath(job3, new Path(maxOutputPath));177 FileInputFormat.addInputPath(job3, new Path(countOutputPath));178 FileOutputFormat.setOutputPath(job3, new Path(avgOutputPath));179 180 //提交job1及job2,并等待完成181 if (job1.waitForCompletion(true) && job2.waitForCompletion(true)) {182 System.exit(job3.waitForCompletion(true) ? 0 : 1);183 }184 185 }186 187 188 }

输入文本可以在原文的链接中找到（链接在转载时已丢失），上面这段代码的主要思路：

1. Sum和Count均采用相同的输入/input/duplicate.txt,然后将各自的处理结果分别输出到/output/max/及/output/count/下

2. Avg从/output/max及/output/count获取结果做为输入,然后根据Key值不同,拿到sum和count的值,最终计算并输出到/output/avg/下

转载地址:http://tgkpa.baihongyu.com/

你可能感兴趣的文章
java 项目的jar 反编译后 压缩回去 命令
查看>>
单链表及简单应用
查看>>
C#反射技术
查看>>
jdk源码分析ArrayDeque
查看>>
伸缩布局flex
查看>>
【秦九韶算法】【字符串哈希】bzoj3751 [NOIP2014]解方程
查看>>
用JS实现任意导航栏的调用
查看>>
【GDOI2018】所有题目和解题报告
查看>>
【BZOJ】3302: [Shoi2005]树的双中心 && 2103: Fire 消防站 && 2447: 消防站
查看>>
存储过程与触发器面试
查看>>
CSS系列:在HTML中引入CSS的方法
查看>>
Orcale约束-------檢查約束
查看>>
VS2013 配置CUDNN V4 DEMO
查看>>
Codeforces Round #207 (Div. 2)C. Knight Tournament
查看>>
JS(JavaScript)的进一步了解5(更新中···)
查看>>
python3基础学习笔记
查看>>
STL模板整理 pair
查看>>
【转】jmeter学习笔记——一个简单的性能测试
查看>>
企业级镜像管理系统Harbor
查看>>
A Plain English Guide to JavaScript Prototypes
查看>>