Without further ado, straight to the code.
We run traffic statistics over the raw traffic logs and write each province's user totals to a separate output file.
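The exact layout of HTTP_20130313143750.dat is not reproduced here; all the job below assumes is that each input line is tab-separated, with the phone number at field index 1 and the upstream/downstream byte counts at indices 7 and 8. A made-up line in that shape (purely illustrative, not taken from the real file) might look like:

1363157985066	13726230503	00-FD-07-A4-72-B8	120.196.100.82	example.com	4	0	2481	24681	200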
Code
package zhouls.bigdata.myMapReduce.areapartition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;

    // Deserialization instantiates the bean via reflection, which needs a
    // no-arg constructor, so one is defined explicitly
    public FlowBean() {}

    // Convenience constructor for initializing the object's fields
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public void setUp_flow(long up_flow) {
        this.up_flow = up_flow;
    }

    public long getD_flow() {
        return d_flow;
    }

    public void setD_flow(long d_flow) {
        this.d_flow = d_flow;
    }

    public long getS_flow() {
        return s_flow;
    }

    public void setS_flow(long s_flow) {
        this.s_flow = s_flow;
    }

    // Serialize the object's fields to the output stream
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    // Deserialize the object's fields from the input stream;
    // fields must be read back in the same order they were written
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
        s_flow = in.readLong();
    }

    @Override
    public String toString() {
        return "" + up_flow + "\t" + d_flow + "\t" + s_flow;
    }

    // Sort by total flow in descending order
    public int compareTo(FlowBean o) {
        return s_flow > o.getS_flow() ? -1 : 1;
    }
}
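Because readFields() has to read fields back in exactly the order write() emitted them, a quick round-trip through a byte stream is a handy way to sanity-check the bean outside of Hadoop. The following is a minimal sketch of that idea, not part of the original code; the class name RoundTripCheck and the sample values are made up:

package zhouls.bigdata.myMapReduce.areapartition;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

// Hypothetical helper, not part of the original post: serializes a FlowBean
// and reads it back, mimicking what Hadoop does between map and reduce.
public class RoundTripCheck {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean("13726230503", 2481, 24681);

        // Serialize exactly as the framework would, via write(DataOutput)
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh bean created through the no-arg constructor
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // toString() prints up_flow, d_flow and s_flow separated by tabs
        System.out.println(original + "  ==  " + copy);
    }
}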
package zhouls.bigdata.myMapReduce.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String, Integer> areaMap = new HashMap<>();

    static {
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Take the phone number from the key and look its prefix up in the area
        // dictionary; each province gets its own partition number, and unknown
        // prefixes fall into partition 5
        int areaCoder = areaMap.get(key.toString().substring(0, 3)) == null
                ? 5
                : areaMap.get(key.toString().substring(0, 3));
        return areaCoder;
    }
}
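The partition number returned by getPartition decides which reduce task, and therefore which output file, a record ends up in: prefixes 135 through 139 map to partitions 0 through 4, and everything else falls into partition 5. Below is a minimal sketch for poking at that mapping from a plain main() method; it is not part of the original post and the phone numbers are made up:

package zhouls.bigdata.myMapReduce.areapartition;

import org.apache.hadoop.io.Text;

// Hypothetical check, not part of the original post: prints the partition
// number that AreaPartitioner assigns to a few sample phone numbers.
public class AreaPartitionerCheck {
    public static void main(String[] args) {
        AreaPartitioner<Text, FlowBean> partitioner = new AreaPartitioner<>();
        String[] phones = { "13502468823", "13602846565", "13925057413", "15013685858" };
        for (String phone : phones) {
            // numPartitions (6 here) is supplied by the framework; this partitioner ignores it
            int partition = partitioner.getPartition(new Text(phone), null, 6);
            System.out.println(phone + " -> partition " + partition);
        }
    }
}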
package zhouls.bigdata.myMapReduce.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import zhouls.bigdata.myMapReduce.areapartition.FlowBean;

/**
 * Run traffic statistics over the raw traffic logs and write each province's
 * user totals to a separate output file.
 * Two mechanisms need to be customized:
 * 1. the partitioning logic, via a custom Partitioner
 * 2. the number of concurrent reduce tasks
 */
public class FlowSumArea implements Tool {

    public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Take one line of input
            String line = value.toString();
            // Split it into fields
            String[] fields = StringUtils.split(line, "\t");
            // Pick out the fields we need
            String phoneNB = fields[1];
            long u_flow = Long.parseLong(fields[7]);
            long d_flow = Long.parseLong(fields[8]);
            // Wrap the data as a key/value pair and emit it
            context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));
        }
    }

    public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long up_flow_counter = 0;
            long d_flow_counter = 0;
            for (FlowBean bean : values) {
                up_flow_counter += bean.getUp_flow();
                d_flow_counter += bean.getD_flow();
            }
            context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
        }
    }

    public int run(String[] arg0) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumArea.class);

        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);

        // Plug in our custom partitioning logic
        job.setPartitionerClass(AreaPartitioner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // The number of reduce tasks should match the number of partitions:
        // five known prefixes plus one catch-all group
        job.setNumReduceTasks(6);

        FileInputFormat.addInputPath(job, new Path(arg0[0]));    // input path
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));  // output path

        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // Cluster paths
        // String[] args0 = { "hdfs://HadoopMaster:9000/flowSumArea/HTTP_20130313143750.dat",
        //         "hdfs://HadoopMaster:9000/out/flowSumArea"};

        // Local paths
        String[] args0 = { "./data/flowSumArea/HTTP_20130313143750.dat",
                "./out/flowSumArea/" };

        int ec = ToolRunner.run(new Configuration(), new FlowSumArea(), args0);
        System.exit(ec);
    }

    @Override
    public Configuration getConf() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void setConf(Configuration arg0) {
        // TODO Auto-generated method stub
    }
}
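With six reduce tasks the output directory ends up with one file per partition, part-r-00000 through part-r-00005, the last one holding the phone numbers whose prefix is not in the dictionary. If the reduce-task count were smaller than the largest partition number (but greater than one), the job would fail at runtime with an "Illegal partition" error, so it is worth keeping the two numbers in sync. One way is to let the partitioner expose its own group count and have the driver read it; this is only a sketch of that idea, and the getGroupCount method is hypothetical, not part of the original code:

// Hypothetical addition to AreaPartitioner (not in the original post): expose
// how many partitions the dictionary produces, including the catch-all group.
public static int getGroupCount() {
    return areaMap.size() + 1; // 5 known prefixes + 1 "other" group = 6
}

The driver could then call job.setNumReduceTasks(AreaPartitioner.getGroupCount()) instead of hard-coding the number.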