本文介绍如何理解TopK算法及其在MapReduce中的实现。很多人可能对此还不太了解,因此小编总结了以下内容,希望大家读完这篇文章后能有所收获。
1、问题描述
在大规模数据处理中,经常会遇到一类需要求出现频率(或某项指标)最高的K个元素的问题,这类问题称为“Top K”问题。例如:统计最热门的前10首歌曲、统计访问流量最高的前5个网站等。
2、例如统计访问流量最高的前5个网站:
数据test.data文件:
数据格式说明:每行依次为 域名、上行流量、下行流量,字段之间以制表符(\t)分隔。
思路:
1、Mapper每解析一行内容,按照"\t"获取各个字段
2、因为URL有很多重复记录,所以将URL作为key(根据MapReduce的原理,相同key的记录会被汇集到同一次reduce调用中),将流量放在value中。
3、在reduce中统计每个URL的总流量,并通过TreeMap对结果进行缓存(按总流量排序),最后一并输出。值得注意的是,要在所有key处理完之后一次性输出,必须用到Reducer类的cleanup方法;另外,要得到全局的Top K,作业只能使用一个Reduce任务。
程序如下:
Mapper类:
package com.itheima.hadoop.mapreduce.mapper; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Counter; import com.itheima.hadoop.mapreduce.bean.FlowBean; public class TopKURLMapper extends Mapper<LongWritable, Text, Text, FlowBean> { /** * @param key * : 每一行偏移量 * @param value * : 每一行的内容 * @param context * : 环境上下文 */ @Override public void mapLongWritable key, Text value, Context context) throws IOException, InterruptedException { /** * 该计数器是org.apache.hadoop.mapreduce.Counter */ Counter counter = context .getCounter"ExistProblem", "ExistProblemLine"); // 自定义存在问题的行错误计数器 String line = value.toString); // 读取一行数据 String[] fields = line.split"\t"); // 获取各个字段,按照\t划分 try { String url = fields[0]; // 获取URL字段 long upFlow = Long.parseLongfields[1]); // 获取上行流量upFlow)字段 long downFlow = Long.parseLongfields[2]); // 获取下行流量downFlow)字段 FlowBean bean = new FlowBeanupFlow, downFlow); // 将上行流量和下行流量封装到bean中 Text tUrl = new Texturl); // 将java数据类型转换hadoop数据类型 context.writetUrl, bean); // 传递的数据较多,封装到bean进行传输tips:bean传输时需要注意序列化问题) } catch Exception e) { e.printStackTrace); counter.increment1); // 记录错误行数 } } }
Reduce类:
package com.itheima.hadoop.mapreduce.reducer; import java.io.IOException; import java.util.Map.Entry; import java.util.TreeMap; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import com.itheima.hadoop.mapreduce.bean.FlowBean; public class TopKURLReducer extends Reducer<Text, FlowBean, FlowBean, Text> { private TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean, Text>); /** * @param key * : 每一行相同URL * @param values * : 总流量bean */ @Override public void reduceText key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long countUpFlow = 0; long countDownFlow = 0; /* * 1、取出每个bean的总流量 2、统计多个bean的总流量 3、缓存到treeMap中 */ for FlowBean bean : values) { countUpFlow += bean.getUpFlow); // 统计上行流量 countDownFlow += bean.getDownFlow); // 统计下行总流量 } // 封装统计的流量 FlowBean bean = new FlowBeancountUpFlow, countDownFlow); treeMap.putbean, new Textkey)); // 缓存到treeMap中 } @Override public void cleanupContext context) throws IOException, InterruptedException { //遍历缓存 for Entry<FlowBean,Text> entry : treeMap.entrySet)) { context.writeentry.getKey), entry.getValue)); } super.cleanupcontext); // 不能动原本的销毁操作 } }
FlowBean类:
package com.itheima.hadoop.mapreduce.bean; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; public class FlowBean implements Writable, Comparable<FlowBean> { private long upFlow; private long downFlow; private long maxFlow; @Override public String toString) { return upFlow + "\t" + downFlow + "\t" + maxFlow; } /** * 1、序列化注意的问题,序列化需要默认的构造方法反射) 2、在readFields)和write)方法中,应该遵循按照顺序写出和读入 */ public FlowBean) { } public FlowBeanlong upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.maxFlow = upFlow + downFlow; } public long getUpFlow) { return upFlow; } public void setUpFlowlong upFlow) { this.upFlow = upFlow; } public long getDownFlow) { return downFlow; } public void setDownFlowlong downFlow) { this.downFlow = downFlow; } public long getMaxFlow) { return maxFlow; } public void setMaxFlowlong maxFlow) { this.maxFlow = maxFlow; } @Override public void readFieldsDataInput dataIn) throws IOException { upFlow = dataIn.readLong); downFlow = dataIn.readLong); maxFlow = dataIn.readLong); } @Override public void writeDataOutput dataOut) throws IOException { dataOut.writeLongupFlow); dataOut.writeLongdownFlow); dataOut.writeLongmaxFlow); } @Override public int compareToFlowBean o) { return this.maxFlow > o.maxFlow ? -1 : this.maxFlow < o.maxFlow ? 1 : 0; } }
驱动类:
package com.itheima.hadoop.drivers; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import com.itheima.hadoop.mapreduce.bean.FlowBean; import com.itheima.hadoop.mapreduce.mapper.TopKURLMapper; import com.itheima.hadoop.mapreduce.reducer.TopKURLReducer; public class TopKURLDriver extends Configured implements Tool{ @Override public int runString[] args) throws Exception { /** * 1、创建job作业 * 2、设置job提交的Class * 3、设置MapperClass,设置ReduceClass * 4、设置Mapper和Reduce各自的OutputKey和OutputValue类型 * 5、设置处理文件的路径,输出结果的路径 * 6、提交job */ Configuration conf = new Configuration); Job job = Job.getInstanceconf); job.setJarByClassTopKURLRunner.class); job.setMapperClassTopKURLMapper.class); job.setReducerClassTopKURLReducer.class); job.setMapOutputKeyClassText.class); job.setMapOutputValueClassFlowBean.class); job.setOutputKeyClassFlowBean.class); job.setOutputValueClassText.class); FileInputFormat.setInputPathsjob, new Pathargs[0])); FileOutputFormat.setOutputPathjob,new Pathargs[1])); //参数true为打印进度 return job.waitForCompletiontrue)?0:1; } }
package com.itheima.hadoop.runner; import org.apache.hadoop.util.ToolRunner; import com.itheima.hadoop.runner.TopKURLRunner; public class TopKURLRunner { public static void mainString[] args) throws Exception { int res = ToolRunner.runnew TopKURLRunner), args); System.exitres); } }
运行命令:hadoop jar topkurl.jar com.itheima.hadoop.runner.TopKURLRunner /test/inputData /test/outputData
运行结果: