MapReduce Order and Product pid Replacement (Reduce Join): Basic Implementation

Approach

In the Map stage, records from both tables are wrapped in the same custom Bean, tagged with the table they came from and keyed by product id (pid). In the Reduce stage, all records sharing a pid arrive together, so the product name from the pd record can be filled into each order record before it is written out (a concrete example follows the input data below).

Input Data

order.txt, order table data (tab-separated)

order id    product id    quantity

1001	01	1
1002	02	2
1003	03	3
1004	01	4
1005	02	5
1006	03	6

pd.txt, product table data (tab-separated)

product id    product name

01	小米
02	华为
03	红米
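
With this input, the join works as follows: both order rows with pid 01 and the single pd row for pid 01 arrive in the same reduce call (the order of values within a group is not guaranteed), and the reducer emits the order rows with the pid resolved to a product name. Roughly:

key    = "01"
values = [ (id=1001, pid=01, amount=1, flag=order),
           (id=1004, pid=01, amount=4, flag=order),
           (pid=01, pname=小米, flag=pd) ]

emitted: 1001	小米	1
         1004	小米	4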

Maven and log4j.properties Configuration

For the Maven dependencies and log4j.properties, use the same configuration as in the MapReduce traffic statistics example (MapReduce统计流量案例).
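
If that article is not at hand, here is a minimal sketch. The hadoop-client version (3.1.3) is an assumption; match it to your cluster. commons-beanutils is listed explicitly because TableReducer uses BeanUtils.copyProperties; with many Hadoop builds it already arrives transitively.

pom.xml (dependencies only):

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>commons-beanutils</groupId>
        <artifactId>commons-beanutils</artifactId>
        <version>1.9.4</version>
    </dependency>
</dependencies>

log4j.properties:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n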

Custom Writable Class (TableBean)

package com.test.mapreduce.reducejoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {

    private String id;     // order id
    private String pid;    // product id
    private Integer amount;// product quantity
    private String pname;  // product name
    private String flag;   // marks which table the record came from

    /**
     * No-arg constructor, required by the Writable deserialization mechanism
     */
    public TableBean() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public Integer getAmount() {
        return amount;
    }

    public void setAmount(Integer amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    /**
     * Override write: serialize the fields in a fixed order
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(pname);
        dataOutput.writeUTF(flag);
    }

    /**
     * Override readFields: deserialize the fields in the same order they were written
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id     = dataInput.readUTF();
        this.pid    = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.pname  = dataInput.readUTF();
        this.flag   = dataInput.readUTF();
    }

    /**
     * Override toString to define the output record format
     */
    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }
}

Custom Mapper Class (TableMapper)

package com.test.mapreduce.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    // Reusable output key/value objects
    private Text k = new Text();
    private TableBean v = new TableBean();

    // Name of the file this split was read from
    private String filename;

    /**
     * setup() runs once per input split;
     * capture the name of the file this split was read from
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the input split for this mapper
        FileSplit split = (FileSplit) context.getInputSplit();
        // Extract the file name from the split's path
        filename = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Convert the line to a String
        String line = value.toString();
        // 2. Split the line on tabs
        String[] split = line.split("\t");
        // 3. Decide which table this line belongs to
        if (filename.contains("order")) { // order table
            // Fill in the key (pid) and the value bean
            k.set(split[1]);
            v.setId(split[0]);
            v.setPid(split[1]);
            v.setAmount(Integer.parseInt(split[2]));
            v.setPname("");
            v.setFlag("order");
        } else {                         // product table
            // Fill in the key (pid) and the value bean
            k.set(split[0]);
            v.setId("");
            v.setPid(split[0]);
            v.setAmount(0);
            v.setPname(split[1]);
            v.setFlag("pd");
        }
        // 4. Emit
        context.write(k, v);
    }
}

Custom Reducer Class (TableReducer)

package com.test.mapreduce.reducejoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        // 1. List to hold the order records (there may be several per pid)
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        // 2. Bean to hold the pd record (only one per pid)
        TableBean pdBean = new TableBean();

        // 3. Iterate over all values and sort them into the variables defined above
        for (TableBean value : values) {
            // Check which table the value came from
            if ("order".equals(value.getFlag())) { // order table
                // Hadoop reuses the value object across iterations, so it must not be
                // added to the collection directly; copy it first.
                // Create a temporary TableBean to receive the value
                TableBean tmpOrderBean = new TableBean();

                // Copy the order data into the temporary object
                try {
                    BeanUtils.copyProperties(tmpOrderBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }

                // Add the temporary object to the list
                orderBeans.add(tmpOrderBean);
            } else {                               // product table
                // Copy the pd data into pdBean
                try {
                    BeanUtils.copyProperties(pdBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // 4. Iterate over the order records, fill in the product name, and write out
        for (TableBean orderBean : orderBeans) {
            // Replace: set pname from the pd record
            orderBean.setPname(pdBean.getPname());
            // Emit
            context.write(orderBean, NullWritable.get());
        }

    }
}
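
BeanUtils.copyProperties works here, but it relies on reflection and an extra library. A minimal alternative sketch, copying the fields by hand with the existing setters (copyOf is a hypothetical helper, not part of the original code):

    // Hypothetical helper; could live in TableReducer, or as a copy constructor in TableBean
    private static TableBean copyOf(TableBean src) {
        TableBean copy = new TableBean();
        copy.setId(src.getId());
        copy.setPid(src.getPid());
        copy.setAmount(src.getAmount());
        copy.setPname(src.getPname());
        copy.setFlag(src.getFlag());
        return copy;
    }

Inside the loop, orderBeans.add(copyOf(value)); would then replace the whole BeanUtils try/catch block.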

Driver Class (TableDriver)

package com.test.mapreduce.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Create the Configuration and get a Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar that contains this driver
        job.setJarByClass(TableDriver.class);

        // 3. Set the Mapper and Reducer classes
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        // 4. Set the Mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        // 5. Set the final output key/value types
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\output"));

        // 7. Submit the job and wait for it to finish
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
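
Running the driver against the sample input should produce something like the following in D:\output\part-r-00000 (one possible row order; the two orders for the same product may swap places, since value order within a reduce group is not guaranteed). Note that the output directory must not already exist, otherwise FileOutputFormat throws an exception at submission.

1004	小米	4
1001	小米	1
1005	华为	5
1002	华为	2
1006	红米	6
1003	红米	3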
