博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
针对微信的一篇推送附有的数据链接进行MapReduce统计
阅读量:6453 次
发布时间:2019-06-23

本文共 5673 字,大约阅读时间需要 18 分钟。

原推送引用:https://mp.weixin.qq.com/s/3qQqN6qzQ3a8_Au2qfZnVg

版权归原作者所有,如有侵权请及时联系本人,见谅!

 

原文采用Excel进行统计数据,这里采用刚学习的工具进行练习。

 

1 import java.io.IOException;  2   3 import org.apache.hadoop.conf.Configuration;  4 import org.apache.hadoop.conf.Configured;  5 import org.apache.hadoop.fs.Path;  6 import org.apache.hadoop.io.IntWritable;  7 import org.apache.hadoop.io.LongWritable;  8 import org.apache.hadoop.io.Text;  9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 14 import org.apache.hadoop.util.Tool; 15 import org.apache.hadoop.util.ToolRunner; 16  17 /** 18  * https://mp.weixin.qq.com/s/3qQqN6qzQ3a8_Au2qfZnVg 19  * 针对[新兴生态系统:Python和R语言,谁更适用于大数据Spark/Hadoop和深度学习?] 20  * 的全球数据进行一系列统计 21  */ 22 public class wechat  extends Configured implements Tool { 23  24     /** 25      * Map方法 26      */ 27     private static class ModuleMapper extends Mapper
{ 28 private static final IntWritable mapOutputValue = new IntWritable(1) ; 29 private Text mapOutputKey = new Text() ; 30 @Override 31 public void map(LongWritable key, Text value, Context context) 32 throws IOException, InterruptedException { 33 34 String input = value.toString(); 35 if(input.split(",").length<16) { 36 return; 37 } 38 String[] arrStr = input.split(","); 39 //Python-大数据计数器输出 40 if("1".equals(arrStr[2])&&"1".equals(arrStr[14])) { 41 context.getCounter("WECHAT_MAPPER_COUNTERS", "Python_BigData").increment(1L); 42 } 43 //Python-Deep计数器输出 44 if("1".equals(arrStr[2])&&"1".equals(arrStr[13])) { 45 context.getCounter("WECHAT_MAPPER_COUNTERS", "Python_Deep-Learning").increment(1L); 46 } 47 //R-大数据计数器输出 48 if("1".equals(arrStr[3])&&"1".equals(arrStr[14])) { 49 context.getCounter("WECHAT_MAPPER_COUNTERS", "R_BigData").increment(1L); 50 } 51 //R-深度计数器输出 52 if("1".equals(arrStr[3])&&"1".equals(arrStr[13])) { 53 context.getCounter("WECHAT_MAPPER_COUNTERS", "R_Deep-Learning").increment(1L); 54 } 55 56 arrStr = input.split(",")[16].split(";"); 57 //遍历 58 for(String tool: arrStr){ 59 // 设置key 60 mapOutputKey.set(tool); 61 // 输出 62 context.write(mapOutputKey, mapOutputValue) ; 63 } 64 } 65 } 66 67 /** 68 * Reduce聚合结果 69 */ 70 private static class ModuleReducer extends Reducer
{ 71 private IntWritable outputValue = new IntWritable() ; 72 @Override 73 protected void reduce(Text key, Iterable
values, Context context) 74 throws IOException, InterruptedException { 75 76 // 定义临时变量,用于累加 77 int sum = 0 ; 78 79 // 遍历 80 for(IntWritable value: values){ 81 sum += value.get() ; 82 } 83 84 if(sum < 500){ 85 // 定义500以上的筛选 86 return ; 87 } 88 // 设置 89 outputValue.set(sum) ; 90 // 输出 91 context.write(key, outputValue) ; 92 93 } 94 } 95 96 /** 97 * 驱动创建Job并提交运行 返回状态码 98 */ 99 100 public int run(String[] args) throws Exception {101 // 创建一个Job102 Job job = Job.getInstance(103 this.getConf() , wechat.class.getSimpleName()104 ) ;105 // 设置job运行的class106 job.setJarByClass(wechat.class);107 108 // 设置Job109 // 1. 设置 input,从哪里读取数据110 Path inputPath = new Path(args[0]) ;111 FileInputFormat.addInputPath(job, inputPath);112 113 // 2. 设置 mapper类114 job.setMapperClass(ModuleMapper.class);115 // 设置map 输出的key和value的数据类型116 job.setMapOutputKeyClass(Text.class);117 job.setMapOutputValueClass(IntWritable.class);118 119 // 3. 设置 reducer 类120 job.setReducerClass(ModuleReducer.class);121 // 设置 reducer 输出的key和value的数据类型122 job.setOutputKeyClass(Text.class);123 job.setOutputValueClass(IntWritable.class);124 // 设置ReduceTask个数125 // job.setNumReduceTasks(2);126 127 // 4. 设置 处理结果保存的路径128 Path outputPath = new Path(args[1]) ;129 FileOutputFormat.setOutputPath(job, outputPath);130 131 // 提交job运行132 boolean isSuccess = job.waitForCompletion(true) ;133 134 // 返回状态135 return isSuccess ? 0 : 1;136 }137 138 /**139 * 140 * @param args141 * @throws Exception142 */143 public static void main(String[] args) throws Exception {144 if(2 > args.length){145 System.out.println("Usage: " + wechat.class.getSimpleName() +"
");146 return ;147 }148 149 // 读取HADOOP中配置文件, core-*.xml hdfs-*.xml yarn-*.xml mapred-*.xml150 Configuration conf = new Configuration() ;151 152 // 运行Job153 int status = ToolRunner.run(conf, new wechat(), args) ;154 155 // exit program156 System.exit(status);157 }158 159 }

 

转载于:https://www.cnblogs.com/hxiuz2014/p/7481821.html

你可能感兴趣的文章
switch函数——Gevent源码分析
查看>>
Spring MVC简单原理
查看>>
DynamoDB Local for Desktop Development
查看>>
ANDROID的SENSOR相关信息
查看>>
laravel 使用QQ邮箱发送邮件
查看>>
用javascript验证哥德巴赫猜想
查看>>
Shell编程-环境变量配置文件
查看>>
thymeleaf 中文乱码问题
查看>>
(转)CSS浮动(float,clear)通俗讲解
查看>>
os.walk函数
查看>>
[Unity3d]DrawCall优化手记
查看>>
细数.NET 中那些ORM框架 —— 谈谈这些天的收获之一
查看>>
SQL Serever学习7——数据表2
查看>>
洛谷——P2404 自然数的拆分问题
查看>>
(转)Mac 下设置android NDK的环境
查看>>
[struts]s:action 的使用方法
查看>>
dubbo问题总结
查看>>
20165320 第三周学习总结
查看>>
Struts2和Spring MVC的区别
查看>>
angular-bootstrap ui-date组件问题总结
查看>>