Flume2Kafka2HBase功能

Flume2Kafka2HBase功能块

  最近学习尚硅谷电信客服项目-大数据项目,将以前学习的Hadoop,HBase,Flume,Kafka使用起来,一方面是学习各技术之间的项目使用,二方面是通过写博客,加强学习印象,查漏补缺。
电信客服项目有数据模拟生成,数据消费,数据分析,数据展示这四个模块,我将数据消费模块剥离出来,按照项目重新敲了3遍,将关键的代码全部整合在一起,独立成一个项目。


前言

  数据消费模块是将通过Flume采集文件数据,Kafka消费,写入HBase的过程,跑通该模块需要搭建启动好Hadoop,Zookeeper,Flume,Kafka,HBase环境。Flume负责采集数据;Kafka负责拉去Flume数据,起到异步,削峰,解耦的作用;Zookeeper为HBase提供稳定服务和failover机制,HDFS为HBase提供底层存储支持,MapReduce为HBase提供高性能的计算能力。所以上面几个技术不能少,通过这个项目例子,大概能了解这些技术点之间的实际使用,方便后面的深入研究,从用到懂的升华。

一. 为什么要集成Flume和Kafka

  一般使用Flume+Kafka架构的都是希望完成实时流式的日志处理,后面再连接上Flink/Storm/Spark Streaming等流式实时处理技术,从而完成日志实时解析的目标。第一、如果Flume直接对接实时计算框架,当数据采集速度大于数据处理速度,很容易发生数据堆积或者数据丢失,而kafka可以当做一个消息缓存队列,从广义上理解,把它当做一个数据库,可以存放一段时间的数据。第二、Kafka属于中间件,一个明显的优势就是使各层解耦,使得出错时不会干扰其他组件。

二. Kafka介绍和实践

  Kafka是一个分布式发布订阅消息系统,能够将消息从一个端点传递到另一端点。Kafka消息保存在磁盘上并在集群内复制以防止数据丢失。主要掌握三个原理部分:producer,topic,consumer:
  Producer:生产者。向Kafka的一个topic发布消息的生产过程。本例中Flume作为Kafka的生产者
  Topic:主题。对消息的逻辑分类,起到资源隔离的作用
  Consumer:消费者。订阅topic并处理其发布的消息的消费过程

  在发布订阅系统中,不同的消息保存在不同的主题中,消费者可以订阅一个或多个主题,并使用该主题的所有信息。比如广播电台(Producer),它发布了体育,电影,音乐等不同的频道(Topic),任何用户(Consumer)可以订阅自己喜欢的频道,并从中获取内容

结合电信客服项目,启动Kafka部分:
为了保证Flume采集的数据能够写入Kafka指定的topic内,先启动Kafka。
先进入各个集群节点的Kafka安装路径,启动Kafka,并创建topic:

 ./bin/kafka-server-start.sh  config/server.properties
 bin/kafka-topics.sh --zookeeper master:2181 --topic ct --create --replication-factor 1 --partitions 3

三. Flume介绍和实践

  Flume是分布式高可用的海量日志聚合系统,支持在系统中监控各类数据用于收集数据;同时Flume提供数据写入各种数据接收方用于转发数据;Flume的易用性在于通过读取配置文件,自动收集日志文件,在大数据处理各种复杂情况下,经常被用来作为数据处理的工具。
  Flume内部有一个或多个Agent,用于数据的接收和输出,传递的数据格式是Event事件对象。Agent主要由source,channel,sink三个组件组成。
  Source:数据源,收集的方式多种多样,有检测文件夹变化的Spool Source,检测端口信息的Netcat Source,监控各类文件新增内容的Exec Source等等。本例中使用的Exec Source
  Channel:数据通道,是一种短暂的存储容器,将source接收的数据缓存起来,直到Sink消费掉。实现了数据的事务性传输,保证数据的安全传输。主要有两种缓存方式:Memory Channel:内存存储事务,吞吐率极高,但存在丢数据风险。 File Channel:本地磁盘的事务存储,容量大且数据死掉可恢复,但速度慢。本例中使用Memory Channel
  Sink:数据接收。将事件从Channel中移除,并将事件放置外部数据介质上。例如将数据放置到HDFS,HBase,或者放置到下一个Flume的Source,等待下一个Flume处理。本例使用HBase

结合电信客服项目,启动Flume部分:
在/root/projects/dianxing/下新建call.log文件,进入Flume安装目录下,启动Flume:

bin/flume-ng agent -c conf/ -n a1 -f /root/projects/dianxing/flume2kafka.conf

采集的数据格式,call.log文件内容:

15305526350	17885275338	20180408121330	1507
19313925217	14930423697	20180419104617	1057
19154926260	14397114174	20181018172955	1681
14930423697	13319935953	20180211121928	1064
17336673697	16574556259	20180625162151	1878
18101213362	16569963779	20180730224651	0440

Flume配置文件:

# 指定Agent的组件名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 指定Flume 要监听文件的目录
a1.sources.r1.type = exec
#循环读取指定文件的内容
a1.sources.r1.command = tail -F -c +0 /root/projects/dianxing/call.log
a1.sources.r1.shell = /bin/bash -c

# 指定Flume channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# KafkaSink首字母大写
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.k1.topic = ct
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# 绑定source和sink到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

四.Flume+Kafka测试

Kafka和Flume都正常运作后,使用Kafka消费者测试是否采集到数据:

./bin/kafka-console-consumer.sh --zookeeper master:2181 -topic ct --from-beginning

接下来使用java端操作Flume和Kafka实现读取数据:

@Override
public void consume() {

    try {
        // a: kafka消费flume数据
        // 前提要求,flume启动,各节点的zookeeper和kafka启动
        // kafka消费者测试是否采到数据:bin/kafka-console-consumer.sh --zookeeper master:2181 --topic ct
        // a1. 获取kafka配置对象,加载配置信息
        Properties prop = new Properties();
        prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));
        // a2. 获取kafka消费者对象,设置订阅的topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));

        while (true) {
            // a3. 每100ms拉去数据保存在ConsumerRecords内。这步就是kafka获取了从flume采集的数据
            ConsumerRecords<String,String> consumerRecords = consumer.poll(100);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 采集的数据格式:19342117869	18101213362	20181119225554	2830
                System.out.println(consumerRecord.value());
            }
        }

    } catch (Exception e) {
        e.printStackTrace();
    }
}

consumer.properties的配置如下:

bootstrap.servers=master:9092,slave1:9092,slave2:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=com.hbase
enable.auto.commit=true
auto.commit.interval.ms=1000

控制台能够查看到数据在刷新,说明flume采集的数据,Kafka已经在消费了。接下来就是将采集到的数据保存到HBase内

五. HBase介绍和写入HBase实践

  HBase是一个面向列的分布式数据库,利用HDFS作为其文件存储系统,利用MapReduce来处理HBase中的海量数据,利用Zookeeper作为其分布式协同服务。主要存储非结构化和半结构化的松散数据。
简单介绍下HBase的结构体系:
Flume2Kafka2HBase功能

  HMaster:HBase的管理者,负责把HRegion分配给HRegionServer,
  HRegionServer:维护HMaster分配给它的HRegion,处理HRegion的IO请求,切分过大的HRegion
  HRegion:一个数据表会被划分为1…n个Region,通过startkey和endKey区分一个Region维护的范围,读写数据时,如果rowkey落在某个start-endkey范围内,就会定位到目标region下进行读取。在看看HRegion的结构:
Flume2Kafka2HBase功能

  每个HRegion由多个Store构成,每个Store保存一个列族(Columns Family),表有几个列族,就有几个Store,每个Store由一个MemStore和多个StoreFile组成,MemStore是Store在内存的内容,写到文件后就是StoreFile。StoreFile底层是以HFile格式保存
  HLog: 每个HRegion内都有一个HLog,用来做灾难恢复,它记录这数据的变更,在RegionServer宕机,就可以从log中回滚还没有持久化的数据
  HFile:HBase的数据最终以HFile形式存储在HDFS中

结合电信客服项目,完成写入HBase部分:

创建命名空间:

// 命名空间类似与关系型数据库中的数据库database,对表的逻辑分组
protected void createNameSpace(String namespace) throws IOException {
    Admin admin = getAdmin();
    try {
        admin.getNamespaceDescriptor(namespace);
    } catch (NamespaceNotFoundException e) {
        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();
        admin.createNamespace(namespaceDescriptor);
    }
}

创建表:

protected void createTable(String name, String coprocessorClass, Integer regionCount, String... families) throws IOException {
    Admin admin = getAdmin();
    TableName tableName = TableName.valueOf(name);
    if (admin.tableExists(tableName)) {
        deleteTable(name);
    }

    // 添加列族
    HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
    if (families == null || families.length ==0) {
        families = new String[1];
        families[0] = Names.CF_INFO.getValue();
    }
    for (String family : families) {
        HColumnDescriptor columnDescriptor = new HColumnDescriptor(family);
        tableDescriptor.addFamily(columnDescriptor);
    }

    // 添加协处理器
    if (coprocessorClass != null && "".equals(coprocessorClass)) {
        tableDescriptor.addCoprocessor(coprocessorClass);
    }

    // 添加分区
    if (regionCount == null || regionCount <= 1) {
        admin.createTable(tableDescriptor);
    } else {
        // 增加预分区,减少split带来的资源消耗,提高hbase性能。
        // 如果不设置分区,数据只会写入一个region内,当region过大(10G),表会进行split分裂两个分区,这个过程会耗费大量资源
        byte[][] splitKeys = getSplitKeys(regionCount);
        admin.createTable(tableDescriptor, splitKeys);
    }
}

往HBase插入数据:

public void insertData(Calllog log) throws Exception {
    log.setRowkey(genRegionNum(log.getcall1(), log.getcalltime()) + "_" + log.getcall1() + "_" + log.getcalltime() + "_" + log.getcall2() + "_" + log.getduration());
    putData(log);
}

// 根据注解反射操作对象,动态获取对象的字段和字段值
// 相当于自定义的get/set方法,通过getAnnotation获取对象的字段名和对应的值
// 相当方便的动态添加表名,rowkey,列族和相应值
protected void putData(Object obj) throws Exception {
    Class clazz = obj.getClass();
    TableRef tableRef = (TableRef) clazz.getAnnotation(TableRef.class);
    String tablename = tableRef.value();

    Field[] fs = clazz.getDeclaredFields();
    String strRowkey = "";
    for (Field f : fs) {
        Rowkey rowkey = f.getAnnotation(Rowkey.class);
        if (rowkey != null) {
            f.setAccessible(true);
            strRowkey = (String) f.get(obj);
            break;
        }
    }

    Connection conn = getConnection();
    Table table = conn.getTable(TableName.valueOf(tablename));
    Put put = new Put(Bytes.toBytes(strRowkey));
    for (Field f : fs) {
        Column column = f.getAnnotation(Column.class);
        if (column != null) {
            String family = column.family();
            String colName = column.column();
            if (colName != null || "".equals(colName)) {
                colName = f.getName();
            }
            f.setAccessible(true);
            String value = (String) f.get(obj);
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(colName), Bytes.toBytes(value));
        }
    }

    table.put(put);
    table.close();
}

六.总结

本文章介绍了Flume,Kafka和HBase的基本原理和使用,想听详细课程可以去B站电信客服项目,该课程从数据生产,存储到分析都有详细介绍.
我最后将存储模块分离独立成一个项目,欢迎下载flume+kafka+hbase实战

上一篇:[从源码学设计] Flume 之 memory channel


下一篇:记一次 Centos7.x Hadoop3.x集群安装部署 Pig 0.17.0