In a complex MapReduce workload, the processing is often decomposed into several simple jobs, with the output of one job serving as the input of the next, so that the jobs form a dependency chain. Taking the average calculation from the earlier example, the work can be broken into three steps:
1. compute the sum
2. compute the count
3. compute the average as sum / count
Each step is treated as a separate job. Job3 must wait for both Job1 and Job2 to complete, and takes their outputs as its input; for instance, given the input values 1, 2, 3 and 4, Job1 emits SUM = 10, Job2 emits COUNT = 4, and Job3 computes 10 / 4 = 2.5. The code below shows how to chain the three jobs together.
package yjmyzz.mr.job.link;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;

public class Avg2 {

    private static final Text TEXT_SUM = new Text("SUM");
    private static final Text TEXT_COUNT = new Text("COUNT");
    private static final Text TEXT_AVG = new Text("AVG");

    // Job 1: compute the sum of all input values
    public static class SumMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        private long sum = 0;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Accumulate in a field and emit once in cleanup (in-mapper combining)
            sum += Long.parseLong(value.toString());
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            context.write(TEXT_SUM, new LongWritable(sum));
        }
    }

    public static class SumReducer
            extends Reducer<Text, LongWritable, Text, LongWritable> {

        private long sum = 0;

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            for (LongWritable v : values) {
                sum += v.get();
            }
            context.write(TEXT_SUM, new LongWritable(sum));
        }
    }

    // Job 2: count the number of input records
    public static class CountMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        private long count = 0;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            count += 1;
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            context.write(TEXT_COUNT, new LongWritable(count));
        }
    }

    public static class CountReducer
            extends Reducer<Text, LongWritable, Text, LongWritable> {

        private long count = 0;

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            for (LongWritable v : values) {
                count += v.get();
            }
            context.write(TEXT_COUNT, new LongWritable(count));
        }
    }

    // Job 3: compute the average from the SUM and COUNT outputs
    public static class AvgMapper
            extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

        private long count = 0;
        private long sum = 0;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input lines look like "SUM\t10" or "COUNT\t4"
            // (TextOutputFormat's default key/value separator is a tab)
            String[] v = value.toString().split("\t");
            if (v[0].equals("COUNT")) {
                count = Long.parseLong(v[1]);
            } else if (v[0].equals("SUM")) {
                sum = Long.parseLong(v[1]);
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            context.write(new LongWritable(sum), new LongWritable(count));
        }
    }

    public static class AvgReducer
            extends Reducer<LongWritable, LongWritable, Text, DoubleWritable> {

        private long sum = 0;
        private long count = 0;

        @Override
        public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            sum += key.get();
            for (LongWritable v : values) {
                count += v.get();
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            context.write(TEXT_AVG, new DoubleWritable((double) sum / count));
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        String inputPath = "/input/duplicate.txt";
        String maxOutputPath = "/output/max/";      // output of the Sum job
        String countOutputPath = "/output/count/";  // output of the Count job
        String avgOutputPath = "/output/avg/";      // final output of the Avg job

        // Delete the output directories first (optional, but avoids
        // "output directory already exists" errors on repeated runs)
        HDFSUtil.deleteFile(conf, maxOutputPath);
        HDFSUtil.deleteFile(conf, countOutputPath);
        HDFSUtil.deleteFile(conf, avgOutputPath);

        Job job1 = Job.getInstance(conf, "Sum");
        job1.setJarByClass(Avg2.class);
        job1.setMapperClass(SumMapper.class);
        job1.setCombinerClass(SumReducer.class);
        job1.setReducerClass(SumReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job1, new Path(inputPath));
        FileOutputFormat.setOutputPath(job1, new Path(maxOutputPath));

        Job job2 = Job.getInstance(conf, "Count");
        job2.setJarByClass(Avg2.class);
        job2.setMapperClass(CountMapper.class);
        job2.setCombinerClass(CountReducer.class);
        job2.setReducerClass(CountReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job2, new Path(inputPath));
        FileOutputFormat.setOutputPath(job2, new Path(countOutputPath));

        Job job3 = Job.getInstance(conf, "Average");
        job3.setJarByClass(Avg2.class);
        job3.setMapperClass(AvgMapper.class);
        job3.setReducerClass(AvgReducer.class);
        job3.setMapOutputKeyClass(LongWritable.class);
        job3.setMapOutputValueClass(LongWritable.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(DoubleWritable.class);

        // Use job1's and job2's outputs as job3's input
        FileInputFormat.addInputPath(job3, new Path(maxOutputPath));
        FileInputFormat.addInputPath(job3, new Path(countOutputPath));
        FileOutputFormat.setOutputPath(job3, new Path(avgOutputPath));

        // Run job1 and job2 and wait for them to complete,
        // then run job3 only if both succeeded
        if (job1.waitForCompletion(true) && job2.waitForCompletion(true)) {
            System.exit(job3.waitForCompletion(true) ? 0 : 1);
        }
    }
}
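Note that in main() above, job1 and job2 run strictly one after the other even though they are independent of each other. As a variation, the two independent jobs could be submitted concurrently with the non-blocking Job.submit() and polled until both finish. A minimal sketch, reusing the job1/job2/job3 objects configured above (the 1-second polling interval is an arbitrary choice):

// Replaces the final waitForCompletion block in main() above.
job1.submit();   // non-blocking: returns as soon as the job is submitted
job2.submit();

// Poll until both independent jobs have finished
while (!job1.isComplete() || !job2.isComplete()) {
    Thread.sleep(1000);
}

// Start the dependent job only if both of its inputs were produced successfully
if (job1.isSuccessful() && job2.isSuccessful()) {
    System.exit(job3.waitForCompletion(true) ? 0 : 1);
}

With only two small jobs the saving is minor, but the pattern pays off once the independent stages are expensive.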
The input text is the same one used in the earlier example. The main ideas behind the Avg2 program:
1. Sum and Count both read the same input, /input/duplicate.txt, and write their results to /output/max/ and /output/count/ respectively.
2. Avg reads both /output/max/ and /output/count/ as input, uses the key of each record (SUM or COUNT) to pick up the sum and the count, and finally computes the average and writes it to /output/avg/.
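For longer chains, wiring up waitForCompletion() calls by hand gets brittle. Hadoop also ships a dependency-management helper in org.apache.hadoop.mapreduce.lib.jobcontrol; below is a minimal sketch, reusing the job1/job2/job3 objects from main() above with error handling omitted, of how the same three-job graph could be expressed with ControlledJob and JobControl:

import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

// Wrap each Job so that dependencies can be declared on it
ControlledJob cJob1 = new ControlledJob(job1, null);
ControlledJob cJob2 = new ControlledJob(job2, null);
ControlledJob cJob3 = new ControlledJob(job3, null);

// job3 may only start after job1 and job2 have both succeeded
cJob3.addDependingJob(cJob1);
cJob3.addDependingJob(cJob2);

JobControl control = new JobControl("avg-chain");
control.addJob(cJob1);
control.addJob(cJob2);
control.addJob(cJob3);

// JobControl is a Runnable: run it on its own thread and poll for completion
Thread t = new Thread(control);
t.setDaemon(true);
t.start();
while (!control.allFinished()) {
    Thread.sleep(1000);
}
control.stop();

JobControl starts each job as soon as its dependencies are satisfied, so job1 and job2 run in parallel automatically; in a real run you would also inspect control.getFailedJobList() before treating the chain as successful.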