Hadoop MapReduce Programming: API Primer Series — Web Traffic, Version 1 (Part 21)
Published: 2019-06-28


Without further ado, straight to the code.

We run traffic statistics over the raw traffic logs and write the per-user results for different provinces to different output files.
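The post never shows the log layout itself, so the line below is only an illustrative sketch of the tab-separated format the mapper assumes: the phone number in field 1 and the upstream/downstream byte counts in fields 7 and 8 (indices counted after StringUtils.split collapses empty fields):

1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200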

Code

package zhouls.bigdata.myMapReduce.areapartition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean>{

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;

    // Deserialization instantiates the bean through reflection, which needs
    // a no-arg constructor, so one is defined explicitly.
    public FlowBean(){}

    // Convenience constructor for initializing the bean's data.
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public void setUp_flow(long up_flow) {
        this.up_flow = up_flow;
    }

    public long getD_flow() {
        return d_flow;
    }

    public void setD_flow(long d_flow) {
        this.d_flow = d_flow;
    }

    public long getS_flow() {
        return s_flow;
    }

    public void setS_flow(long s_flow) {
        this.s_flow = s_flow;
    }

    // Serialize the bean's data into the output stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    // Deserialize the bean's data from the input stream.
    // Fields must be read back in exactly the order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
        s_flow = in.readLong();
    }

    @Override
    public String toString() {
        return "" + up_flow + "\t" + d_flow + "\t" + s_flow;
    }

    // Sort by total flow in descending order.
    @Override
    public int compareTo(FlowBean o) {
        return s_flow > o.getS_flow() ? -1 : 1;
    }

}
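As a quick sanity check on the Writable contract, here is a minimal sketch (not part of the original post; the phone number and byte counts are illustrative) that serializes a FlowBean to a byte buffer and reads it back:

package zhouls.bigdata.myMapReduce.areapartition;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        // Illustrative values; any phone number and flow counts would do.
        FlowBean original = new FlowBean("13726230503", 2481, 24681);

        // Serialize, exactly as Hadoop does during the shuffle.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buf));

        // Deserialize into a fresh bean; fields come back in write order.
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(copy); // 2481	24681	27162
    }
}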

package zhouls.bigdata.myMapReduce.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{

    private static HashMap<String, Integer> areaMap = new HashMap<>();

    static {
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }

    // Take the phone-number prefix from the key and look it up in the
    // home-location dictionary; each province gets its own group number,
    // and unknown prefixes all fall into the default group 5.
    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        Integer areaCoder = areaMap.get(key.toString().substring(0, 3));
        return areaCoder == null ? 5 : areaCoder;
    }

}
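To see the partitioner in isolation, here is a small sketch (not from the original post; the phone numbers are made up) that feeds it one key with a known prefix and one without:

package zhouls.bigdata.myMapReduce.areapartition;

import org.apache.hadoop.io.Text;

public class AreaPartitionerDemo {
    public static void main(String[] args) {
        AreaPartitioner<Text, Object> partitioner = new AreaPartitioner<>();
        // Prefix "139" is in the dictionary, so this key lands in partition 4.
        System.out.println(partitioner.getPartition(new Text("13926435656"), null, 6)); // 4
        // Prefix "150" is unknown, so the key falls into the default partition 5.
        System.out.println(partitioner.getPartition(new Text("15013685858"), null, 6)); // 5
    }
}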

package zhouls.bigdata.myMapReduce.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Runs traffic statistics over the raw traffic logs and writes the results
 * for users in different provinces to different output files.
 * Two mechanisms need to be customized:
 * 1. The partitioning logic, via a custom Partitioner.
 * 2. The number of concurrent reduce tasks.
 */
public class FlowSumArea implements Tool {

    private Configuration conf;

    public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Take one line of input.
            String line = value.toString();
            // Split it into fields.
            String[] fields = StringUtils.split(line, "\t");
            // Pick out the fields we need.
            String phoneNB = fields[1];
            long u_flow = Long.parseLong(fields[7]);
            long d_flow = Long.parseLong(fields[8]);
            // Wrap the data as a key/value pair and emit it.
            context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));
        }

    }

    public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean>{

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long up_flow_counter = 0;
            long d_flow_counter = 0;
            for (FlowBean bean : values) {
                up_flow_counter += bean.getUp_flow();
                d_flow_counter += bean.getD_flow();
            }
            context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
        }
    }

    public int run(String[] arg0) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSumArea.class);
        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);
        // Plug in our custom partitioning logic.
        job.setPartitionerClass(AreaPartitioner.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // The number of concurrent reduce tasks should match the number of
        // partitions: five known prefixes plus the default group, so six.
        job.setNumReduceTasks(6);
        FileInputFormat.addInputPath(job, new Path(arg0[0]));   // input path
        FileOutputFormat.setOutputPath(job, new Path(arg0[1])); // output path
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Cluster (HDFS) paths:
        // String[] args0 = { "hdfs://HadoopMaster:9000/flowSumArea/HTTP_20130313143750.dat",
        //         "hdfs://HadoopMaster:9000/out/flowSumArea"};
        // Local paths:
        String[] args0 = { "./data/flowSumArea/HTTP_20130313143750.dat",
                "./out/flowSumArea/"};
        int ec = ToolRunner.run(new Configuration(), new FlowSumArea(), args0);
        System.exit(ec);
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }
}
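With six reduce tasks, the output directory should contain one file per partition, part-r-00000 through part-r-00005. Each line is the key followed by FlowBean.toString(): the phone number, upstream flow, downstream flow, and total, separated by tabs. The sample below is only illustrative, not actual job output:

part-r-00004  (prefix "139")
13926435656	132	1512	1644
13926251106	240	0	240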


Reposted from: http://zpzol.baihongyu.com/
