原文推送链接:https://mp.weixin.qq.com/s/3qQqN6qzQ3a8_Au2qfZnVg
版权归原作者所有。如有侵权,请及时联系本人,敬请见谅!
原文使用 Excel 对数据进行统计,这里改用刚学习的 Hadoop MapReduce 进行练习。
1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.conf.Configured; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.io.IntWritable; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 14 import org.apache.hadoop.util.Tool; 15 import org.apache.hadoop.util.ToolRunner; 16 17 /** 18 * https://mp.weixin.qq.com/s/3qQqN6qzQ3a8_Au2qfZnVg 19 * 针对[新兴生态系统:Python和R语言,谁更适用于大数据Spark/Hadoop和深度学习?] 20 * 的全球数据进行一系列统计 21 */ 22 public class wechat extends Configured implements Tool { 23 24 /** 25 * Map方法 26 */ 27 private static class ModuleMapper extends Mapper{ 28 private static final IntWritable mapOutputValue = new IntWritable(1) ; 29 private Text mapOutputKey = new Text() ; 30 @Override 31 public void map(LongWritable key, Text value, Context context) 32 throws IOException, InterruptedException { 33 34 String input = value.toString(); 35 if(input.split(",").length<16) { 36 return; 37 } 38 String[] arrStr = input.split(","); 39 //Python-大数据计数器输出 40 if("1".equals(arrStr[2])&&"1".equals(arrStr[14])) { 41 context.getCounter("WECHAT_MAPPER_COUNTERS", "Python_BigData").increment(1L); 42 } 43 //Python-Deep计数器输出 44 if("1".equals(arrStr[2])&&"1".equals(arrStr[13])) { 45 context.getCounter("WECHAT_MAPPER_COUNTERS", "Python_Deep-Learning").increment(1L); 46 } 47 //R-大数据计数器输出 48 if("1".equals(arrStr[3])&&"1".equals(arrStr[14])) { 49 context.getCounter("WECHAT_MAPPER_COUNTERS", "R_BigData").increment(1L); 50 } 51 //R-深度计数器输出 52 if("1".equals(arrStr[3])&&"1".equals(arrStr[13])) { 53 context.getCounter("WECHAT_MAPPER_COUNTERS", "R_Deep-Learning").increment(1L); 54 } 55 56 arrStr = input.split(",")[16].split(";"); 57 //遍历 58 for(String 
tool: arrStr){ 59 // 设置key 60 mapOutputKey.set(tool); 61 // 输出 62 context.write(mapOutputKey, mapOutputValue) ; 63 } 64 } 65 } 66 67 /** 68 * Reduce聚合结果 69 */ 70 private static class ModuleReducer extends Reducer { 71 private IntWritable outputValue = new IntWritable() ; 72 @Override 73 protected void reduce(Text key, Iterable values, Context context) 74 throws IOException, InterruptedException { 75 76 // 定义临时变量,用于累加 77 int sum = 0 ; 78 79 // 遍历 80 for(IntWritable value: values){ 81 sum += value.get() ; 82 } 83 84 if(sum < 500){ 85 // 定义500以上的筛选 86 return ; 87 } 88 // 设置 89 outputValue.set(sum) ; 90 // 输出 91 context.write(key, outputValue) ; 92 93 } 94 } 95 96 /** 97 * 驱动创建Job并提交运行 返回状态码 98 */ 99 100 public int run(String[] args) throws Exception {101 // 创建一个Job102 Job job = Job.getInstance(103 this.getConf() , wechat.class.getSimpleName()104 ) ;105 // 设置job运行的class106 job.setJarByClass(wechat.class);107 108 // 设置Job109 // 1. 设置 input,从哪里读取数据110 Path inputPath = new Path(args[0]) ;111 FileInputFormat.addInputPath(job, inputPath);112 113 // 2. 设置 mapper类114 job.setMapperClass(ModuleMapper.class);115 // 设置map 输出的key和value的数据类型116 job.setMapOutputKeyClass(Text.class);117 job.setMapOutputValueClass(IntWritable.class);118 119 // 3. 设置 reducer 类120 job.setReducerClass(ModuleReducer.class);121 // 设置 reducer 输出的key和value的数据类型122 job.setOutputKeyClass(Text.class);123 job.setOutputValueClass(IntWritable.class);124 // 设置ReduceTask个数125 // job.setNumReduceTasks(2);126 127 // 4. 设置 处理结果保存的路径128 Path outputPath = new Path(args[1]) ;129 FileOutputFormat.setOutputPath(job, outputPath);130 131 // 提交job运行132 boolean isSuccess = job.waitForCompletion(true) ;133 134 // 返回状态135 return isSuccess ? 
0 : 1;136 }137 138 /**139 * 140 * @param args141 * @throws Exception142 */143 public static void main(String[] args) throws Exception {144 if(2 > args.length){145 System.out.println("Usage: " + wechat.class.getSimpleName() +" ");146 return ;147 }148 149 // 读取HADOOP中配置文件, core-*.xml hdfs-*.xml yarn-*.xml mapred-*.xml150 Configuration conf = new Configuration() ;151 152 // 运行Job153 int status = ToolRunner.run(conf, new wechat(), args) ;154 155 // exit program156 System.exit(status);157 }158 159 }