Kafka-HBase-MapReduce-MySQL in Practice: Call Records

1. Project overview

This project works with call-record data. A record such as 张三 李四 2021-4-23 12:32:13 2942 means that 张三 (Zhang San) called 李四 (Li Si) at 2021-4-23 12:32:13 and the call lasted 2942 seconds.
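A minimal sketch of parsing one such record into typed fields, assuming the five fields are separated by single spaces and that names never contain spaces. CallRecord and its members are illustrative names, not part of the project, and the sample names are romanized so the source stays ASCII.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical value class for one record: "caller callee date time duration"
final class CallRecord {
    // Single-letter fields accept both "2021-4-23 9:05:07" and zero-padded "2021-04-23 09:05:07"
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-M-d H:m:s");

    final String caller;
    final String callee;
    final LocalDateTime startTime;
    final long durationSeconds;

    CallRecord(String caller, String callee, LocalDateTime startTime, long durationSeconds) {
        this.caller = caller;
        this.callee = callee;
        this.startTime = startTime;
        this.durationSeconds = durationSeconds;
    }

    // Parses e.g. "zhangsan lisi 2021-4-23 12:32:13 2942"
    static CallRecord parse(String line) {
        String[] parts = line.trim().split(" ");
        if (parts.length != 5) {
            throw new IllegalArgumentException("expected 5 fields but got " + parts.length + ": " + line);
        }
        LocalDateTime start = LocalDateTime.parse(parts[2] + " " + parts[3], FORMAT);
        return new CallRecord(parts[0], parts[1], start, Long.parseLong(parts[4]));
    }

    // When the call ended: start time plus duration
    LocalDateTime endTime() {
        return startTime.plusSeconds(durationSeconds);
    }
}
```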

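  1. Data source: the program simulates the call records itself and hands them to a Kafka producer.
  2. A Kafka consumer writes the records into the HBase database.
  3. MapReduce reads the data from HBase and analyzes it.
  4. The analysis results are loaded into MySQL.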

2. Class overview

Produce module

  • DataProduce: generates the data
  • Main: entry point
  • testAPI: functional tests
  • KafkaUtils: sends the data to the topic

Consumer module

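  • Main: entry point
  • HBaseConsumer: the consumer that pulls records from Kafka
  • HBaseDao: HBase client object; creates the table and writes the data
  • HBaseUtils: builds the RowKey, plus helpers for creating tables and namespaces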

Analysis module

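  • HashUtils: stores the data of each Cell in a HashMap
  • MysqlUtils: MySQL connection handling
  • CountMap: counts the number of calls between each pair of users
  • DBWrite: implements Writable and DBWritable, for serialization and for writing rows to the database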

3. Project modules

The project consists of three modules: produce, consumer and analysis.

  • produce: generates the data
  • consumer: writes the data from Kafka into HBase
  • analysis: analyzes the data with MapReduce and loads the results into MySQL

2.1 produce

2.1.1 entry

public class Main {

    public static void main(String[] args) throws ParseException, InterruptedException {
        // Generate data and send it to Kafka
        KafkaUtils.writeDataIntoKafka();
    }
}

2.1.2 dataProduce

public String produce(String startTime, String endTime) throws ParseException {
        // Output format: 张三 李四 2021-02-03 13:43:25 1204
        // Load the phone numbers and names (telePhone, phoneToName)
        initMetaData();

        // Pick a random index for the caller
        int callerIndex = (int) (Math.random() * telePhone.size());
        String callerName = phoneToName.get(telePhone.get(callerIndex));

        // Pick a different random index for the callee
        int calleeIndex;
        do {
            calleeIndex = (int) (Math.random() * telePhone.size());
        } while (callerIndex == calleeIndex);
        String calleeName = phoneToName.get(telePhone.get(calleeIndex));

        // Parser/formatter for the date part (yyyy-MM-dd)
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

        // Parse the start and end dates
        Date startDate = sdf.parse(startTime);
        Date endDate = sdf.parse(endTime);

        // Random timestamp in [startDate, endDate) for the call
        long randomTs = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());

        // Keep only the date part of the random timestamp
        String resultTimeString = sdf.format(new Date(randomTs));

        // Random hour, minute and second
        int hour = (int) (Math.random() * 24);
        int minute = (int) (Math.random() * 60);
        int second = (int) (Math.random() * 60);

        // Time of day as HH:mm:ss
        String specificTime = String.format("%02d:%02d:%02d", hour, minute, second);

        // Call duration in seconds, in [0, 3600)
        int duration = (int) (Math.random() * 3600);

        // Join the fields: 张三 李四 2021-02-03 13:43:25 1204
        return callerName + " " + calleeName + " " + resultTimeString + " " + specificTime + " " + duration;
    }
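An alternative sketch of the same random-call logic with java.time and an injected Random, so that a fixed seed makes the output reproducible in tests. Unlike produce(), it draws one random second in [start, end) rather than a random date plus a separate random time of day, and it picks the callee without a retry loop. CallGenerator and the sample names are hypothetical.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Random;

// Hypothetical generator; "names" stands in for the project's telePhone/phoneToName data
final class CallGenerator {
    private static final DateTimeFormatter OUT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private final List<String> names;
    private final Random random;

    CallGenerator(List<String> names, Random random) {
        if (names.size() < 2) {
            throw new IllegalArgumentException("need at least two names");
        }
        this.names = names;
        this.random = random;
    }

    // Returns "caller callee yyyy-MM-dd HH:mm:ss duration", with the call starting in [start, end)
    String produce(LocalDate start, LocalDate end) {
        int callerIndex = random.nextInt(names.size());
        // Draw among the other n-1 names and shift past the caller: callee != caller without retrying
        int calleeIndex = random.nextInt(names.size() - 1);
        if (calleeIndex >= callerIndex) {
            calleeIndex++;
        }

        // Uniformly random second between start (inclusive) and end (exclusive)
        LocalDateTime from = start.atStartOfDay();
        long spanSeconds = ChronoUnit.SECONDS.between(from, end.atStartOfDay());
        if (spanSeconds <= 0) {
            throw new IllegalArgumentException("end must be after start");
        }
        LocalDateTime callTime = from.plusSeconds(random.nextInt(Math.toIntExact(spanSeconds)));

        // Call duration in seconds, in [0, 3600)
        int duration = random.nextInt(3600);

        return names.get(callerIndex) + " " + names.get(calleeIndex) + " " + OUT.format(callTime) + " " + duration;
    }
}
```

Passing new Random(), or ThreadLocalRandom.current() (a Random subclass), gives non-repeating data outside of tests.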

2.1.3 kafkaUtils

public static void writeDataIntoKafka() throws ParseException, InterruptedException {
        // Producer configuration
        Properties properties = new Properties();

        // Kafka broker address
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

        // Serializers for the record key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Kafka topic
        String topic = "telecom";

        // Typed producer; try-with-resources closes it (waiting for pending sends) at the end
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // Send 1000 records to Kafka
            for (int i = 0; i < 1000; i++) {
                // Generate a record within the given date range
                String value = dataProduce.produce("2021-1-1", "2021-5-1");

                // Sleep for 1 second between records
                Thread.sleep(1000);

                // Send the record (value only, no key)
                producer.send(new ProducerRecord<>(topic, value));
            }
        }
    }

2.2 consumer

2.2.1 entry

public class Main {
    public static void main(String[] args) throws IOException, InterruptedException, ParseException {
        // Create the consumer that writes into HBase
        HBaseConsumer hBaseConsumer = new HBaseConsumer();

        // Pull records from Kafka and store them in HBase
        hBaseConsumer.getDataFromKafka();
    }
}

2.2.2 hbase

2.2.2.1 HBaseConsumer

public class HBaseConsumer {
    public void getDataFromKafka() throws InterruptedException, IOException, ParseException {
        // Consumer configuration
        Properties properties = new Properties();

        // Kafka broker address
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

        // Commit offsets automatically
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        // Auto-commit interval in milliseconds
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);

        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");

        // Deserializers for the record key and value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Subscribe to the topic
        consumer.subscribe(Arrays.asList("telecom"));

        // DAO that writes the records into HBase
        HBaseDao hBaseDao = new HBaseDao();

        // Pull records from Kafka indefinitely
        while (true) {
            // Wait up to 100 ms for new records (poll(Duration) exists since Kafka 2.0; poll(long) is deprecated)
            ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(100));

            // Write each record into HBase. No per-record sleep: one poll can return up to
            // max.poll.records (default 500) records, and sleeping 1 s for each could exceed
            // max.poll.interval.ms (default 300000 ms) and make the consumer leave the group.
            for (ConsumerRecord<String, String> record : records) {
                String value = record.value();
                System.out.println(value);

                hBaseDao.put(value);
            }
        }
    }
}

2.2.2.2 HBaseDao

public class HBaseDao {
    // Namespace
    private String nameSpace;
    // Table name
    private String tableName;
    // HBase configuration
    public static Configuration conf;
    // Table handle
    private Table table;
    // HBase connection
    private Connection connection;
    // Parses "2021-03-21 12:23:04"; the pattern must include the time, otherwise parse() stops after the date and drops it
    private SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    // Formats the time as "20210321122304" for the row key
    private SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMddHHmmss");

    // Initialize the configuration (reads hbase-site.xml from the classpath)
    static {
        conf = HBaseConfiguration.create();
    }

    public HBaseDao() throws IOException {
        nameSpace = "telecom";
        // No "telecom:" prefix, so the table is created in the default namespace;
        // the analysis jobs read it as "teleRecord"
        tableName = "teleRecord";

        connection = ConnectionFactory.createConnection(conf);

        // On the first run, create the namespace and the table with column families f1 and f2
        if (!HBaseUtils.isExistTable(conf, tableName)) {
            HBaseUtils.initNamespace(conf, nameSpace);
            HBaseUtils.createTable(conf, tableName, "f1", "f2");
        }

        table = connection.getTable(TableName.valueOf(tableName));
    }

    // Write one record into HBase
    public void put(String value) throws ParseException, IOException {
        // Split the Kafka record: caller callee date time duration
        String[] splitValues = value.split(" ");

        String caller = splitValues[0];
        String callee = splitValues[1];
        String buildTime = splitValues[2];
        String specificTime = splitValues[3];
        String duration = splitValues[4];

        // 2021-03-21 12:23:04
        buildTime = buildTime + " " + specificTime;
        Date buildDate = sdf1.parse(buildTime);

        // 20210321122304, used in the row key
        String buildTimeReplace = sdf2.format(buildDate);

        // Epoch milliseconds
        String buildTimeTs = String.valueOf(buildDate.getTime());

        // Row key
        String rowKey = HBaseUtils.createRowKey(caller, callee, buildTimeReplace, "1", duration);

        // Put for this row
        Put put = new Put(Bytes.toBytes(rowKey));

        // Columns in family f1
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time"), Bytes.toBytes(buildTime));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time_ts"), Bytes.toBytes(buildTimeTs));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("flag"), Bytes.toBytes("1"));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("duration"), Bytes.toBytes(duration));

        // Write the row
        table.put(put);
    }
}
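The row key and the build_time_ts column both depend on parsing the full "yyyy-MM-dd HH:mm:ss" string. A sketch of the same two conversions with java.time, whose formatters are immutable and thread-safe, unlike SimpleDateFormat. BuildTimes is a hypothetical helper, and the explicit ZoneId parameter is an assumption: the original code uses the JVM's default time zone.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper mirroring the time conversions in HBaseDao.put
final class BuildTimes {
    private static final DateTimeFormatter INPUT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter ROW_KEY = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    private BuildTimes() {}

    // "2021-03-21 12:23:04" -> "20210321122304"
    static String toRowKeyTime(String buildTime) {
        return ROW_KEY.format(LocalDateTime.parse(buildTime, INPUT));
    }

    // "2021-03-21 12:23:04" -> epoch milliseconds, interpreting the time in the given zone
    static long toEpochMillis(String buildTime, ZoneId zone) {
        return LocalDateTime.parse(buildTime, INPUT).atZone(zone).toInstant().toEpochMilli();
    }
}
```

Because the epoch value depends on the zone, build_time_ts is only unambiguous if every component that writes or reads it agrees on one zone.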

2.2.3 hbaseUtils

public class HBaseUtils {
    // Check whether a table exists
    public static boolean isExistTable(Configuration conf, String tableName) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            return admin.tableExists(TableName.valueOf(tableName));
        }
    }

    // Check whether a namespace exists by listing all namespaces
    public static boolean isExistTableSpace(Configuration conf, String nameSpace) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            for (NamespaceDescriptor descriptor : admin.listNamespaceDescriptors()) {
                if (descriptor.getName().equals(nameSpace)) {
                    return true;
                }
            }
            return false;
        }
    }

    // Create a namespace; do nothing if it already exists
    public static void initNamespace(Configuration conf, String nameSpace) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            admin.createNamespace(NamespaceDescriptor.create(nameSpace).build());
        } catch (NamespaceExistException e) {
            // The namespace already exists
        }
    }

    // Create a table with the given column families
    // (HTableDescriptor/HColumnDescriptor are deprecated in HBase 2.x in favour of TableDescriptorBuilder)
    public static void createTable(Configuration conf, String tableName, String... cfs) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));

            for (String cf : cfs) {
                hTableDescriptor.addFamily(new HColumnDescriptor(cf));
            }

            admin.createTable(hTableDescriptor);
        }
    }

    // Build the row key: caller_buildTime_callee_flag_duration
    public static String createRowKey(String caller, String callee, String buildTime, String flag, String duration) {
        return String.join("_", caller, buildTime, callee, flag, duration);
    }
}
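Because the row key starts with the caller followed by a yyyyMMddHHmmss time, all calls made by one person sit next to each other in time order, so a time range for one caller maps to one contiguous key range (a scan with a start row and a stop row). The sketch below imitates that with a TreeMap; HBase compares raw bytes, which matches String ordering here only because the sample keys are ASCII. RowKeyRanges and the sample rows are hypothetical.

```java
import java.util.NavigableMap;

// Hypothetical helpers imitating HBase row-key ordering with a sorted map
final class RowKeyRanges {
    private RowKeyRanges() {}

    // Same layout as HBaseUtils.createRowKey: caller_buildTime_callee_flag_duration
    static String rowKey(String caller, String callee, String buildTime, String flag, String duration) {
        return String.join("_", caller, buildTime, callee, flag, duration);
    }

    // Rows of one caller with buildTime in [fromTime, toTime), both formatted as yyyyMMddHHmmss
    static NavigableMap<String, String> callsBetween(NavigableMap<String, String> table,
                                                     String caller, String fromTime, String toTime) {
        String startRow = caller + "_" + fromTime; // inclusive, like a scan's start row
        String stopRow = caller + "_" + toTime;    // exclusive, like a scan's stop row
        return table.subMap(startRow, true, stopRow, false);
    }
}
```

The same property also means heavy callers concentrate on a few regions; that is a trade-off of putting the caller first, not something the sketch addresses.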

2.3 analysis

2.3.1 hashUtils

public class HashUtils {
    public static void putValue(Cell cell, HashMap<String, String> hashMap) {
        // Column qualifier of the cell, e.g. "caller"
        String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));

        // Value of the cell
        String value = Bytes.toString(CellUtil.cloneValue(cell));

        // Store qualifier -> value in the map
        hashMap.put(qualifier, value);
    }
}

2.3.2 mysqlUtils

public class MysqlUtils {
    public static Connection connection;
    public static String userName = "root";
    public static String passwd = "123456";

    // Load the MySQL JDBC driver and open a shared connection
    static {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        try {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb" +
                            "?useSSL=false" +
                            "&allowPublicKeyRetrieval=true" +
                            "&serverTimezone=UTC",
                    userName,
                    passwd);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    // Delete all rows of the given table
    public static void deleteData(String tableName) throws SQLException {
        String sql = String.format("delete from %s", tableName);
        // try-with-resources closes the statement after use
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.executeUpdate();
        }
    }
}
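deleteData formats the table name straight into the SQL because JDBC can bind only values, not identifiers, to ? placeholders. If the name could ever come from outside the code, a whitelist check is a cheap guard. SqlIdentifiers is a hypothetical helper, and the accepted pattern (a letter or underscore followed by letters, digits or underscores, at most 64 characters) is an assumption that happens to cover callcounts, callerdurations and daydurations.

```java
import java.util.regex.Pattern;

// Hypothetical guard for table names that must be formatted into SQL text
final class SqlIdentifiers {
    private static final Pattern SIMPLE_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]{0,63}");

    private SqlIdentifiers() {}

    // Returns the name unchanged if it is a plain identifier, otherwise throws
    static String requireSimpleName(String name) {
        if (name == null || !SIMPLE_NAME.matcher(name).matches()) {
            throw new IllegalArgumentException("unsafe table name: " + name);
        }
        return name;
    }

    // Same statement as MysqlUtils.deleteData builds, after validating the name
    static String deleteAllSql(String tableName) {
        return String.format("delete from %s", requireSimpleName(tableName));
    }
}
```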

2.3.3 hbaseToMR

2.3.3.1 callCount

2.3.3.1.1 Map
public class CountMap extends TableMapper<Text, IntWritable> {
    // Output: 张三-李四    1 (one pair per call record)
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        HashMap<String, String> hashMap = new HashMap<>();

        for (Cell cell : value.rawCells()) {
            HashUtils.putValue(cell, hashMap);
        }

        String caller = hashMap.get("caller");
        String callee = hashMap.get("callee");

        context.write(new Text(caller + "-" + callee), new IntWritable(1));
    }
}
2.3.3.1.2 Reduce
public class CountReduce extends Reducer<Text, IntWritable, DBWrite, NullWritable> {
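    // Output: 张三-李四    23 (number of calls from 张三 to 李四)
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;

        // Sum the values rather than counting them, so the result stays correct if a combiner pre-aggregates
        for (IntWritable value : values) {
            count += value.get();
        }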

        context.write(new DBWrite(key.toString(), count), NullWritable.get());
    }
}
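To check what CountMap and CountReduce produce without a cluster, the same map, shuffle and reduce steps can be imitated in memory. This is only a local sketch: LocalCallCount is a hypothetical class, the input lines stand in for HBase rows, and grouping into a TreeMap plays the role of the shuffle's sort-by-key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory imitation of the callCount job
final class LocalCallCount {
    private LocalCallCount() {}

    // records: "caller callee date time duration"; result: "caller-callee" -> number of calls
    static Map<String, Integer> run(List<String> records) {
        // Map + shuffle: emit ("caller-callee", 1) per record and group the 1s by key
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String record : records) {
            String[] f = record.split(" ");
            grouped.computeIfAbsent(f[0] + "-" + f[1], k -> new ArrayList<>()).add(1);
        }

        // Reduce: sum the values of each key
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int count = 0;
            for (int v : e.getValue()) {
                count += v;
            }
            counts.put(e.getKey(), count);
        }
        return counts;
    }
}
```

The key is directional: a call from 张三 to 李四 and one from 李四 to 张三 are counted under different keys.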
2.3.3.1.3 Driver
public class CountDriver implements Tool {

    // Job configuration (set by ToolRunner through setConf)
    public static Configuration conf = null;

    // Target MySQL table
    public static String mysqlTableName = "callcounts";

    // Column names in the MySQL table, in the order DBWrite binds them
    public static String[] fieldNames = {"callercallee", "counts"};

    // MySQL JDBC driver class
    public static String driverClass = "com.mysql.cj.jdbc.Driver";

    // MySQL JDBC URL
    public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
            "?useSSL=false" +
            "&allowPublicKeyRetrieval=true" +
            "&serverTimezone=UTC";

    // MySQL user name
    public static String userName = "root";

    // MySQL password
    public static String passwd = "123456";

    @Override
    public int run(String[] strings) throws Exception {
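        // Store the MySQL connection settings in the configuration before the Job copies it
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
        // Empty the target table so that a rerun does not append duplicate rows
        MysqlUtils.deleteData(mysqlTableName);

        // Create the job
        Job job = Job.getInstance(conf);

        // Locate the job jar through the driver class
        job.setJarByClass(CountDriver.class);

        // Mapper: full scan of the HBase table teleRecord, emitting (Text, IntWritable)
        TableMapReduceUtil.initTableMapperJob("teleRecord",
                new Scan(),
                CountMap.class,
                Text.class,
                IntWritable.class,
                job);

        // Reducer and the job's output types
        job.setReducerClass(CountReduce.class);
        job.setOutputKeyClass(DBWrite.class);
        job.setOutputValueClass(NullWritable.class);

        // Write the reducer output to MySQL
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);

        // Submit the job and wait for it to finish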
        boolean result = job.waitForCompletion(true);

        return result ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        conf = configuration;
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    public static void main(String[] args) {
        // HBaseConfiguration.create() also loads hbase-default.xml and hbase-site.xml
        Configuration conf = HBaseConfiguration.create();
        try {
            int run = ToolRunner.run(conf, new CountDriver(), args);

            System.exit(run);

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
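DBOutputFormat turns the table name and fieldNames into a parameterized INSERT, and DBWrite.write(PreparedStatement) fills the placeholders by position, so the order of fieldNames must match the parameter indices used in DBWrite. The sketch below builds a statement of that shape; InsertSql is hypothetical and written from our reading of Hadoop's DBOutputFormat.constructQuery, so treat the exact formatting as an assumption. Note that the MySQL table (here callcounts with columns callercallee and counts) has to exist beforehand; the job does not create it.

```java
import java.util.StringJoiner;

// Hypothetical builder for the INSERT that DBOutputFormat prepares
final class InsertSql {
    private InsertSql() {}

    // INSERT INTO table (c1,c2,...) VALUES (?,?,...);  one "?" per column, bound by position
    static String build(String table, String... fieldNames) {
        StringJoiner columns = new StringJoiner(",", " (", ")");
        StringJoiner marks = new StringJoiner(",", " VALUES (", ");");
        for (String field : fieldNames) {
            columns.add(field);
            marks.add("?");
        }
        return "INSERT INTO " + table + columns + marks;
    }
}
```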
2.3.3.1.4 DBWrite
public class DBWrite implements Writable, DBWritable {
    // First column: the grouping key ("张三-李四", a caller, or a date)
    String caller_callee = "";
    // Second column: a long, so the duration jobs can pass their long sums to the same class
    long count = 0;

    // No-arg constructor required by Hadoop, which creates Writables reflectively
    public DBWrite(){}

    public DBWrite(String caller_callee, long count){
        this.caller_callee = caller_callee;
        this.count = count;
    }

    // Hadoop serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(caller_callee);
        out.writeLong(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.caller_callee = in.readUTF();
        this.count = in.readLong();
    }

    // Binds the two "?" placeholders of the INSERT built by DBOutputFormat, in fieldNames order
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, caller_callee);
        preparedStatement.setLong(2, count);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.caller_callee = resultSet.getString(1);
        this.count = resultSet.getLong(2);
    }
}
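When DBWrite is only a reducer output, DBOutputFormat calls write(PreparedStatement) directly and the Writable methods are typically not exercised, but they still have to be symmetric: readFields must read the fields in exactly the order write produced them. A pure-JDK stand-in (the hypothetical KeyValueRecord, holding a String key and a long value like DBWrite's two fields) makes that round trip easy to check without Hadoop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Stand-in for DBWrite: same field layout and Writable-style methods, without the Hadoop interfaces
final class KeyValueRecord {
    String key = "";
    long value = 0;

    // Hadoop creates Writables reflectively, so a no-arg constructor is required
    KeyValueRecord() {}

    KeyValueRecord(String key, long value) {
        this.key = key;
        this.value = value;
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(key);    // 2-byte length followed by modified UTF-8
        out.writeLong(value); // 8 bytes, big-endian
    }

    // Must read the fields in exactly the order write() wrote them
    void readFields(DataInput in) throws IOException {
        key = in.readUTF();
        value = in.readLong();
    }

    // Serialize to bytes and read them back into a fresh object
    static KeyValueRecord roundTrip(KeyValueRecord record) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                record.write(out);
            }
            KeyValueRecord copy = new KeyValueRecord();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```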

2.3.3.2 callerDuration

2.3.3.2.1 Map
public class DurationMap extends TableMapper<Text, LongWritable> {
    // Output: 张三    2041 (caller, duration of one call in seconds)
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Collect the row's cells as qualifier -> value
        HashMap<String, String> hashMap = new HashMap<>();

        // Iterate over every cell of this row
        for (Cell cell : value.rawCells()) {
            HashUtils.putValue(cell, hashMap);
        }

        // Caller
        String caller = hashMap.get("caller");

        // Duration of this call in seconds
        String duration = hashMap.get("duration");

        // Emit (caller, duration)
        context.write(new Text(caller), new LongWritable(Long.parseLong(duration)));
    }
}
2.3.3.2.2 Reduce
public class DurationReduce extends Reducer<Text, LongWritable, DBWrite, NullWritable> {
    // Output: 张三    4204 (total call duration of the caller in seconds)
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Total duration of this caller's calls
        long sum = 0;

        // Add up the individual durations
        for (LongWritable value : values) {
            sum += value.get();
        }

        // Write the result; passing the long itself needs a DBWrite(String, long) constructor
        context.write(new DBWrite(key.toString(), sum), NullWritable.get());
    }
}
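A quick local cross-check for DurationMap and DurationReduce: group raw records by caller and sum the durations with the Streams API. This is a sketch for small samples only, since it holds everything in memory; LocalCallerDuration and the sample lines are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical local cross-check for the callerDuration job
final class LocalCallerDuration {
    private LocalCallerDuration() {}

    // records: "caller callee date time duration"; result: caller -> total seconds, sorted by caller
    static Map<String, Long> run(List<String> records) {
        return records.stream()
                .map(line -> line.split(" "))
                .collect(Collectors.groupingBy(
                        (String[] f) -> f[0],                                        // key: the caller, as in DurationMap
                        TreeMap::new,
                        Collectors.summingLong((String[] f) -> Long.parseLong(f[4])))); // summed durations, as in DurationReduce
    }
}
```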
2.3.3.2.3 Driver
public class DurationDriver implements Tool {

    // Job configuration (set by ToolRunner through setConf)
    public static Configuration conf = null;

    // Target MySQL table
    public static String mysqlTableName = "callerdurations";

    // Column names in the MySQL table, in the order DBWrite binds them
    public static String[] fieldNames = {"caller", "durations"};

    // MySQL JDBC driver class
    public static String driverClass = "com.mysql.cj.jdbc.Driver";

    // MySQL JDBC URL
    public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
            "?useSSL=false" +
            "&allowPublicKeyRetrieval=true" +
            "&serverTimezone=UTC";

    // MySQL user name
    public static String userName = "root";

    // MySQL password
    public static String passwd = "123456";

    @Override
    public int run(String[] strings) throws Exception {
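        // Store the MySQL connection settings in the configuration before the Job copies it
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
        // Empty the target table so that a rerun does not append duplicate rows
        MysqlUtils.deleteData(mysqlTableName);

        // Create the job
        Job job = Job.getInstance(conf);

        // Locate the job jar through the driver class
        job.setJarByClass(DurationDriver.class);

        // Mapper: full scan of the HBase table teleRecord, emitting (Text, LongWritable)
        TableMapReduceUtil.initTableMapperJob("teleRecord",
                new Scan(),
                DurationMap.class,
                Text.class,
                LongWritable.class,
                job);

        // Reducer and the job's output types
        job.setReducerClass(DurationReduce.class);
        job.setOutputKeyClass(DBWrite.class);
        job.setOutputValueClass(NullWritable.class);

        // Write the reducer output to MySQL
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);

        // Submit the job and wait for it to finish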
        boolean result = job.waitForCompletion(true);

        return result ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        conf = configuration;
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    public static void main(String[] args) {
        // HBaseConfiguration.create() also loads hbase-default.xml and hbase-site.xml
        Configuration conf = HBaseConfiguration.create();
        try {
            int run = ToolRunner.run(conf, new DurationDriver(), args);

            System.exit(run);

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

2.3.3.3 dayCountDuration

2.3.3.3.1 Map
public class dayCountDurationMap extends TableMapper<Text, LongWritable> {
    // Output: 2021-01-13    3042 (date, duration of one call in seconds)
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        HashMap<String, String> hashmap = new HashMap<>();

        for (Cell cell : value.rawCells()) {
            HashUtils.putValue(cell, hashmap);
        }

        // build_time is "yyyy-MM-dd HH:mm:ss", so its first 10 characters are the date
        String date = hashmap.get("build_time").substring(0, 10);

        String duration = hashmap.get("duration");

        context.write(new Text(date), new LongWritable(Long.parseLong(duration)));
    }
}
2.3.3.3.2 Reduce
public class dayCountDurationReduce extends Reducer<Text, LongWritable, DBWrite, NullWritable> {
    // Output: 2021-01-13    2042 (total call duration of that day in seconds)
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long durations = 0;

        for (LongWritable value : values) {
            durations += value.get();
        }

        // Passing the long sum needs a DBWrite(String, long) constructor
        context.write(new DBWrite(key.toString(), durations), NullWritable.get());
    }
}
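The day key comes from build_time.substring(0, 10), which works because the producer writes zero-padded yyyy-MM-dd dates. A local sketch of the same per-day aggregation that parses the date instead of cutting the string, so a malformed value fails loudly rather than producing a wrong key; LocalDayDuration and the sample lines are hypothetical.

```java
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local cross-check for the dayCountDuration job
final class LocalDayDuration {
    private LocalDayDuration() {}

    // records: "caller callee date time duration"; result: day -> total seconds, in date order
    static Map<LocalDate, Long> run(List<String> records) {
        Map<LocalDate, Long> totals = new TreeMap<>();
        for (String record : records) {
            String[] f = record.split(" ");
            // ISO yyyy-MM-dd; throws DateTimeParseException for a malformed or unpadded date
            LocalDate day = LocalDate.parse(f[2]);
            totals.merge(day, Long.parseLong(f[4]), Long::sum);
        }
        return totals;
    }
}
```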
2.3.3.3.3 Driver
public class dayCountDurationDriver implements Tool {

    // Job configuration (set by ToolRunner through setConf)
    public static Configuration conf = null;

    // Target MySQL table
    public static String mysqlTableName = "daydurations";

    // Column names in the MySQL table, in the order DBWrite binds them
    public static String[] fieldNames = {"date", "durations"};

    // MySQL JDBC driver class
    public static String driverClass = "com.mysql.cj.jdbc.Driver";

    // MySQL JDBC URL
    public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
            "?useSSL=false" +
            "&allowPublicKeyRetrieval=true" +
            "&serverTimezone=UTC";

    // MySQL user name
    public static String userName = "root";

    // MySQL password
    public static String passwd = "123456";

    @Override
    public int run(String[] strings) throws Exception {
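        // Store the MySQL connection settings in the configuration before the Job copies it
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
        // Empty the target table so that a rerun does not append duplicate rows
        MysqlUtils.deleteData(mysqlTableName);

        // Create the job
        Job job = Job.getInstance(conf);

        // Locate the job jar through the driver class
        job.setJarByClass(dayCountDurationDriver.class);

        // Mapper: full scan of the HBase table teleRecord, emitting (Text, LongWritable)
        TableMapReduceUtil.initTableMapperJob("teleRecord",
                new Scan(),
                dayCountDurationMap.class,
                Text.class,
                LongWritable.class,
                job);

        // Reducer and the job's output types
        job.setReducerClass(dayCountDurationReduce.class);
        job.setOutputKeyClass(DBWrite.class);
        job.setOutputValueClass(NullWritable.class);

        // Write the reducer output to MySQL
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);

        // Submit the job and wait for it to finish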
        boolean result = job.waitForCompletion(true);

        return result ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        conf = configuration;
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    public static void main(String[] args) {
        // HBaseConfiguration.create() also loads hbase-default.xml and hbase-site.xml
        Configuration conf = HBaseConfiguration.create();
        try {
            int run = ToolRunner.run(conf, new dayCountDurationDriver(), args);

            System.exit(run);

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

4. Project source code

GitHub repository
