如何理解TopK算法及其实现

今天就跟大家聊聊有关如何理解TopK算法及其实现,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

1、问题描述

在大数据规模中,经常遇到一类需要求出现频率最高的K个数,这类问题称为“TOPK”问题!例如:统计歌曲中最热门的前10首歌曲,统计访问流量最高的前5个网站等。

2、例如统计访问流量最高的前5个网站:

数据test.data文件:

如何理解TopK算法及其实现

数据格式解释:域名    上行流量    下行流量

思路:

1、Mapper每解析一行内容,按照"\t"获取各个字段

2、因为URL有很多重复记录,所以将URL放到key通过分析MapReduce原理),流量放在value

3、在reduce统计总流量,通过TreeMap进行对数据进行缓存,最后一并输出值得注意的是要一次性输出必须要用到Reduce类的cleanup方法)

程序如下:

Mapper类:

package com.itheima.hadoop.mapreduce.mapper;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Counter;
import com.itheima.hadoop.mapreduce.bean.FlowBean;
public class TopKURLMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    /**
     * @param key
     *            : 每一行偏移量
     * @param value
     *            : 每一行的内容
     * @param context
     *            : 环境上下文
     */
    @Override
    public void mapLongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        /**
         * 该计数器是org.apache.hadoop.mapreduce.Counter
         */
        Counter counter = context
                .getCounter"ExistProblem", "ExistProblemLine"); // 自定义存在问题的行错误计数器
        String line = value.toString); // 读取一行数据
        String[] fields = line.split"\t"); // 获取各个字段,按照\t划分
        try {
            String url = fields[0]; // 获取URL字段
            long upFlow = Long.parseLongfields[1]); // 获取上行流量upFlow)字段
            long downFlow = Long.parseLongfields[2]); // 获取下行流量downFlow)字段

            FlowBean bean = new FlowBeanupFlow, downFlow); // 将上行流量和下行流量封装到bean中
            Text tUrl = new Texturl); // 将java数据类型转换hadoop数据类型
            context.writetUrl, bean); // 传递的数据较多,封装到bean进行传输tips:bean传输时需要注意序列化问题)
        } catch Exception e) {
            e.printStackTrace);
            counter.increment1); // 记录错误行数
        }
    }
}

Reduce类:

package com.itheima.hadoop.mapreduce.reducer;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.itheima.hadoop.mapreduce.bean.FlowBean;

public class TopKURLReducer extends Reducer<Text, FlowBean, FlowBean, Text> {
    private TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean, Text>);
    /**
     * @param key
     *            : 每一行相同URL
     * @param values
     *            : 总流量bean
     */
    @Override
    public void reduceText key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long countUpFlow = 0;
        long countDownFlow = 0;
        /*
         * 1、取出每个bean的总流量 2、统计多个bean的总流量 3、缓存到treeMap中
         */
        for FlowBean bean : values) {
            countUpFlow += bean.getUpFlow); // 统计上行流量
            countDownFlow += bean.getDownFlow); // 统计下行总流量
        }
        // 封装统计的流量
        FlowBean bean = new FlowBeancountUpFlow, countDownFlow);
        treeMap.putbean, new Textkey)); // 缓存到treeMap中
    }
    @Override
    public void cleanupContext context) throws IOException,
            InterruptedException {
        //遍历缓存
        for Entry<FlowBean,Text> entry : treeMap.entrySet)) {
            context.writeentry.getKey), entry.getValue));
        }
        super.cleanupcontext); // 不能动原本的销毁操作
    }
}

FlowBean类:

package com.itheima.hadoop.mapreduce.bean;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class FlowBean implements Writable, Comparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long maxFlow;
    @Override
    public String toString) {
        return upFlow + "\t" + downFlow + "\t" + maxFlow;
    }
    /**
     * 1、序列化注意的问题,序列化需要默认的构造方法反射) 2、在readFields)和write)方法中,应该遵循按照顺序写出和读入
     */
    public FlowBean) {
    }
    public FlowBeanlong upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.maxFlow = upFlow + downFlow;
    }
    public long getUpFlow) {
        return upFlow;
    }
    public void setUpFlowlong upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow) {
        return downFlow;
    }
    public void setDownFlowlong downFlow) {
        this.downFlow = downFlow;
    }
    public long getMaxFlow) {
        return maxFlow;
    }
    public void setMaxFlowlong maxFlow) {
        this.maxFlow = maxFlow;
    }
    @Override
    public void readFieldsDataInput dataIn) throws IOException {
        upFlow = dataIn.readLong);
        downFlow = dataIn.readLong);
        maxFlow = dataIn.readLong);
    }
    @Override
    public void writeDataOutput dataOut) throws IOException {
        dataOut.writeLongupFlow);
        dataOut.writeLongdownFlow);
        dataOut.writeLongmaxFlow);
    }
    @Override
    public int compareToFlowBean o) {
        return this.maxFlow > o.maxFlow ? -1
                : this.maxFlow < o.maxFlow ? 1 : 0;
    }
}

驱动类:

package com.itheima.hadoop.drivers;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

import com.itheima.hadoop.mapreduce.bean.FlowBean;
import com.itheima.hadoop.mapreduce.mapper.TopKURLMapper;
import com.itheima.hadoop.mapreduce.reducer.TopKURLReducer;

public class TopKURLDriver extends Configured implements Tool{

    @Override
    public int runString[] args) throws Exception {
        
        /**
         * 1、创建job作业
         * 2、设置job提交的Class
         * 3、设置MapperClass,设置ReduceClass
         * 4、设置Mapper和Reduce各自的OutputKey和OutputValue类型
         * 5、设置处理文件的路径,输出结果的路径
         * 6、提交job
         */
        Configuration conf = new Configuration);
        Job job = Job.getInstanceconf);
        
        job.setJarByClassTopKURLRunner.class);
        
        job.setMapperClassTopKURLMapper.class);
        job.setReducerClassTopKURLReducer.class);
        
        job.setMapOutputKeyClassText.class);
        job.setMapOutputValueClassFlowBean.class);
        job.setOutputKeyClassFlowBean.class);
        job.setOutputValueClassText.class);
        
        FileInputFormat.setInputPathsjob, new Pathargs[0]));
        FileOutputFormat.setOutputPathjob,new Pathargs[1]));
        
        //参数true为打印进度
        return job.waitForCompletiontrue)?0:1;
    }

}
package com.itheima.hadoop.runner;

import org.apache.hadoop.util.ToolRunner;

import com.itheima.hadoop.runner.TopKURLRunner;

public class TopKURLRunner {

    public static void mainString[] args) throws Exception {
        int res = ToolRunner.runnew TopKURLRunner), args);
        System.exitres);
    }
}

运行命令:hadoop jar topkurl.jar com.itheima.hadoop.drives.TopKURLDriver /test/inputData /test/outputData

运行结果:

如何理解TopK算法及其实现

Published by

风君子

独自遨游何稽首 揭天掀地慰生平

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注