需求1:统计手机号耗费的总上行流量、下行流量、总流量(序列化)
统计每一个手机号耗费的总上行流量、下行流量、总流量
数据准备
原始数据格式:时间戳、电话号码、基站的物理地址、访问网址的ip、网站域名、数据包、接包数、上行/传流量、下行/载流量、响应码输出数据格式:1356·0436666 1116954 2070手机号码上行流量 下行流量总流量分析
基本思路
Map阶段:
读取一行数据,切分字段 抽取手机号、上行流量、下行流量 以手机号为key,bean对象为value输出,即context.write(手机号,bean);Reduce阶段:
累加上行流量和下行流量得到总流量。 实现自定义的bean来封装流量信息,并将bean作为map输出的key来传输 MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable。然后重写key的compareTo方法。
编写mapreduce程序
编写流量统计的bean对象 import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;// 1 实现writable接口public class FlowBean implements Writable{ private long upFlow ;private long downFlow;private long sumFlow;//2 反序列化时,需要反射调用空参构造函数,所以必须有public FlowBean() { super();}public FlowBean(long upFlow, long downFlow) { super();this.upFlow = upFlow;this.downFlow = downFlow;this.sumFlow = upFlow + downFlow;}//3 写序列化方法@Overridepublic void write(DataOutput out) throws IOException { out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}//4 反序列化方法//5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致@Overridepublic void readFields(DataInput in) throws IOException { this.upFlow = in.readLong();this.downFlow = in.readLong();this.sumFlow = in.readLong();}// 6 编写toString方法,方便后续打印到文本@Overridepublic String toString() { return upFlow + " " + downFlow + " " + sumFlow;}public long getUpFlow() { return upFlow;}public void setUpFlow(long upFlow) { this.upFlow = upFlow;}public long getDownFlow() { return downFlow;}public void setDownFlow(long downFlow) { this.downFlow = downFlow;}public long getSumFlow() { return sumFlow;}public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow;}}编写mapper
import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class FlowCountMapper extends Mapper{ FlowBean v = new FlowBean();Text k = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { // 1 获取一行String line = value.toString();// 2 切割字段String[] fields = line.split(" ");// 3 封装对象// 取出手机号码String phoneNum = fields[1];// 取出上行流量和下行流量long upFlow = Long.parseLong(fields[fields.length - 3]);long downFlow = Long.parseLong(fields[fields.length - 2]);v.set(downFlow, upFlow);// 4 写出context.write(new Text(phoneNum), new FlowBean(upFlow, downFlow));}}编写reducer
import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class FlowCountReducer extends Reducer { @Overrideprotected void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException { long sum_upFlow = 0;long sum_downFlow = 0;// 1 遍历所用bean,将其中的上行流量,下行流量分别累加for (FlowBean flowBean : values) { sum_upFlow += flowBean.getSumFlow();sum_downFlow += flowBean.getDownFlow();}// 2 封装对象FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);// 3 写出context.write(key, resultBean);}}编写驱动类
import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowsumDriver { public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { // 1 获取配置信息,或者job对象实例Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 6 指定本程序的jar包所在的本地路径job.setJarByClass(FlowsumDriver.class);// 2 指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);// 3 指定mapper输出数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);// 4 指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);// 5 指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}}需求2:将统计结果按照手机归属地不同省份输出到不同文件中(Partitioner)
需求:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
数据准备 分析 Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask。默认的分发规则为:根据key的hashcode%reducetask数来分发如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner自定义一个CustomPartitioner继承抽象类:Partitioner
在job驱动中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class)在需求1的基础上,增加一个分区类
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;public class ProvincePartitioner extends Partitioner { @Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) { // 1 获取电话号码的前三位String preNum = key.toString().substring(0, 3);//注:如果设置的分区数小于下面的分区数,如3、则最后一个分区混数据分区 //注:如何设置的分区数大于下面的分区数,如5,则报错int partition = 4;// 2 判断是哪个省if ("136".equals(preNum)) { partition = 0;}else if ("137".equals(preNum)) { partition = 1;}else if ("138".equals(preNum)) { partition = 2;}else if ("139".equals(preNum)) { partition = 3;}return partition;}}在驱动函数中增加自定义数据分区设置和reduce task设置
import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowsumDriver { public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { // 1 获取配置信息,或者job对象实例Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 6 指定本程序的jar包所在的本地路径job.setJarByClass(FlowsumDriver.class);// 2 指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);// 3 指定mapper输出数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);// 4 指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);// 8 指定自定义数据分区job.setPartitionerClass(ProvincePartitioner.class);// 9 同时指定相应数量的reduce taskjob.setNumReduceTasks(5);// 5 指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}}需求3:将统计结果按照总流量倒序排序(全排序)
根据需求1产生的结果再次对总流量进行排序。
数据准备 分析 把程序分两步走,第一步正常统计总流量,第二步再把结果进行排序 context.write(总流量,手机号) FlowBean实现WritableComparable接口重写compareTo方法 @Overridepublic int compareTo(FlowBean o) { // 倒序排列,从大到小return this.sumFlow > o.getSumFlow() ? -1 : 1;}代码实现
FlowBean对象在在需求1基础上增加了比较功能 import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class FlowBean implements WritableComparable { private long upFlow;private long downFlow;private long sumFlow;// 反序列化时,需要反射调用空参构造函数,所以必须有public FlowBean() { super();}public FlowBean(long upFlow, long downFlow) { super();this.upFlow = upFlow;this.downFlow = downFlow;this.sumFlow = upFlow + downFlow;}public void set(long upFlow, long downFlow) { this.upFlow = upFlow;this.downFlow = downFlow;this.sumFlow = upFlow + downFlow;}public long getSumFlow() { return sumFlow;}public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow;}public long getUpFlow() { return upFlow;}public void setUpFlow(long upFlow) { this.upFlow = upFlow;}public long getDownFlow() { return downFlow;}public void setDownFlow(long downFlow) { this.downFlow = downFlow;}/** * 序列化方法 * @param out * @throws IOException */@Overridepublic void write(DataOutput out) throws IOException { out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}/** * 反序列化方法 注意反序列化的顺序和序列化的顺序完全一致 * @param in * @throws IOException */@Overridepublic void readFields(DataInput in) throws IOException { upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();}@Overridepublic String toString() { return upFlow + " " + downFlow + " " + sumFlow;}@Overridepublic int compareTo(FlowBean o) { // 倒序排列,从大到小return this.sumFlow > o.getSumFlow() ? -1 : 1;}}编写mapper
import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class FlowCountSortMapper extends Mapper{ FlowBean k= new FlowBean();Text v = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { // 1 获取一行String line = value.toString();// 2 截取String[] fields = line.split(" ");// 3 封装对象String phoneNbr = fields[0];long upFlow = Long.parseLong(fields[1]);long downFlow = Long.parseLong(fields[2]);k.set(upFlow, downFlow);v.set(phoneNbr);// 4 输出context.write(k, v);}}编写reduce
import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class FlowCountSortReducer extends Reducer{ @Overrideprotected void reduce(FlowBean key, Iterable values, Context context)throws IOException, InterruptedException { // 循环输出,避免总流量相同情况for (Text text : values) { context.write(text, key);}}}编写driver
import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowCountSortDriver { public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException { // 1 获取配置信息,或者job对象实例Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 6 指定本程序的jar包所在的本地路径job.setJarByClass(FlowCountSortDriver.class);// 2 指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(FlowCountSortMapper.class);job.setReducerClass(FlowCountSortReducer.class);// 3 指定mapper输出数据的kv类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);// 4 指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);// 5 指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}}需求4:不同省份输出文件内部排序(部分排序)
要求每个省份手机号输出的文件中按照总流量内部排序。
分析:基于需求3,增加自定义分区类即可。 案例实操 增加自定义分区类 import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;public class ProvincePartitioner extends Partitioner { @Overridepublic int getPartition(FlowBean key, Text value, int numPartitions) { // 1 获取手机号码前三位String preNum = value.toString().substring(0, 3);int partition = 4;// 2 根据手机号归属地设置分区if ("136".equals(preNum)) { partition = 0;}else if ("137".equals(preNum)) { partition = 1;}else if ("138".equals(preNum)) { partition = 2;}else if ("139".equals(preNum)) { partition = 3;}return partition;}}在驱动类中添加分区类
// 加载自定义分区类job.setPartitionerClass(FlowSortPartitioner.class);// 设置Reducetask个数job.setNumReduceTasks(5);