Flume2Kafka2HBase功能块
最近学习尚硅谷电信客服项目-大数据项目,将以前学习的Hadoop,HBase,Flume,Kafka使用起来,一方面是学习各技术之间的项目使用,二方面是通过写博客,加强学习印象,查漏补缺。
电信客服项目有数据模拟生成,数据消费,数据分析,数据展示这四个模块,我将数据消费模块剥离出来,按照项目重新敲了3遍,将关键的代码全部整合在一起,独立成一个项目。
前言
数据消费模块是将通过Flume采集文件数据,Kafka消费,写入HBase的过程,跑通该模块需要搭建启动好Hadoop,Zookeeper,Flume,Kafka,HBase环境。Flume负责采集数据;Kafka负责拉去Flume数据,起到异步,削峰,解耦的作用;Zookeeper为HBase提供稳定服务和failover机制,HDFS为HBase提供底层存储支持,MapReduce为HBase提供高性能的计算能力。所以上面几个技术不能少,通过这个项目例子,大概能了解这些技术点之间的实际使用,方便后面的深入研究,从用到懂的升华。
一. 为什么要集成Flume和Kafka
一般使用Flume+Kafka架构的都是希望完成实时流式的日志处理,后面再连接上Flink/Storm/Spark Streaming等流式实时处理技术,从而完成日志实时解析的目标。第一、如果Flume直接对接实时计算框架,当数据采集速度大于数据处理速度,很容易发生数据堆积或者数据丢失,而kafka可以当做一个消息缓存队列,从广义上理解,把它当做一个数据库,可以存放一段时间的数据。第二、Kafka属于中间件,一个明显的优势就是使各层解耦,使得出错时不会干扰其他组件。
二. Kafka介绍和实践
Kafka是一个分布式发布订阅消息系统,能够将消息从一个端点传递到另一端点。Kafka消息保存在磁盘上并在集群内复制以防止数据丢失。主要掌握三个原理部分:producer,topic,consumer:
Producer:生产者。向Kafka的一个topic发布消息的生产过程。本例中Flume作为Kafka的生产者
Topic:主题。对消息的逻辑分类,起到资源隔离的作用
Consumer:消费者。订阅topic并处理其发布的消息的消费过程
在发布订阅系统中,不同的消息保存在不同的主题中,消费者可以订阅一个或多个主题,并使用该主题的所有信息。比如广播电台(Producer),它发布了体育,电影,音乐等不同的频道(Topic),任何用户(Consumer)可以订阅自己喜欢的频道,并从中获取内容
结合电信客服项目,启动Kafka部分:
为了保证Flume采集的数据能够写入Kafka指定的topic内,先启动Kafka。
先进入各个集群节点的Kafka安装路径,启动Kafka,并创建topic:
./bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --zookeeper master:2181 --topic ct --create --replication-factor 1 --partitions 3
三. Flume介绍和实践
Flume是分布式高可用的海量日志聚合系统,支持在系统中监控各类数据用于收集数据;同时Flume提供数据写入各种数据接收方用于转发数据;Flume的易用性在于通过读取配置文件,自动收集日志文件,在大数据处理各种复杂情况下,经常被用来作为数据处理的工具。
Flume内部有一个或多个Agent,用于数据的接收和输出,传递的数据格式是Event事件对象。Agent主要由source,channel,sink三个组件组成。
Source:数据源,收集的方式多种多样,有检测文件夹变化的Spool Source,检测端口信息的Netcat Source,监控各类文件新增内容的Exec Source等等。本例中使用的Exec Source
Channel:数据通道,是一种短暂的存储容器,将source接收的数据缓存起来,直到Sink消费掉。实现了数据的事务性传输,保证数据的安全传输。主要有两种缓存方式:Memory Channel:内存存储事务,吞吐率极高,但存在丢数据风险。 File Channel:本地磁盘的事务存储,容量大且数据死掉可恢复,但速度慢。本例中使用Memory Channel
Sink:数据接收。将事件从Channel中移除,并将事件放置外部数据介质上。例如将数据放置到HDFS,HBase,或者放置到下一个Flume的Source,等待下一个Flume处理。本例使用HBase
结合电信客服项目,启动Flume部分:
在/root/projects/dianxing/下新建call.log文件,进入Flume安装目录下,启动Flume:
bin/flume-ng agent -c conf/ -n a1 -f /root/projects/dianxing/flume2kafka.conf
采集的数据格式,call.log文件内容:
15305526350 17885275338 20180408121330 1507
19313925217 14930423697 20180419104617 1057
19154926260 14397114174 20181018172955 1681
14930423697 13319935953 20180211121928 1064
17336673697 16574556259 20180625162151 1878
18101213362 16569963779 20180730224651 0440
Flume配置文件:
# 指定Agent的组件名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 指定Flume 要监听文件的目录
a1.sources.r1.type = exec
#循环读取指定文件的内容
a1.sources.r1.command = tail -F -c +0 /root/projects/dianxing/call.log
a1.sources.r1.shell = /bin/bash -c
# 指定Flume channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# KafkaSink首字母大写
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.k1.topic = ct
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 绑定source和sink到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
四.Flume+Kafka测试
Kafka和Flume都正常运作后,使用Kafka消费者测试是否采集到数据:
./bin/kafka-console-consumer.sh --zookeeper master:2181 -topic ct --from-beginning
接下来使用java端操作Flume和Kafka实现读取数据:
@Override
public void consume() {
try {
// a: kafka消费flume数据
// 前提要求,flume启动,各节点的zookeeper和kafka启动
// kafka消费者测试是否采到数据:bin/kafka-console-consumer.sh --zookeeper master:2181 --topic ct
// a1. 获取kafka配置对象,加载配置信息
Properties prop = new Properties();
prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));
// a2. 获取kafka消费者对象,设置订阅的topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));
while (true) {
// a3. 每100ms拉去数据保存在ConsumerRecords内。这步就是kafka获取了从flume采集的数据
ConsumerRecords<String,String> consumerRecords = consumer.poll(100);
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// 采集的数据格式:19342117869 18101213362 20181119225554 2830
System.out.println(consumerRecord.value());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
consumer.properties的配置如下:
bootstrap.servers=master:9092,slave1:9092,slave2:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=com.hbase
enable.auto.commit=true
auto.commit.interval.ms=1000
控制台能够查看到数据在刷新,说明flume采集的数据,Kafka已经在消费了。接下来就是将采集到的数据保存到HBase内
五. HBase介绍和写入HBase实践
HBase是一个面向列的分布式数据库,利用HDFS作为其文件存储系统,利用MapReduce来处理HBase中的海量数据,利用Zookeeper作为其分布式协同服务。主要存储非结构化和半结构化的松散数据。
简单介绍下HBase的结构体系:
HMaster:HBase的管理者,负责把HRegion分配给HRegionServer,
HRegionServer:维护HMaster分配给它的HRegion,处理HRegion的IO请求,切分过大的HRegion
HRegion:一个数据表会被划分为1…n个Region,通过startkey和endKey区分一个Region维护的范围,读写数据时,如果rowkey落在某个start-endkey范围内,就会定位到目标region下进行读取。在看看HRegion的结构:
每个HRegion由多个Store构成,每个Store保存一个列族(Columns Family),表有几个列族,就有几个Store,每个Store由一个MemStore和多个StoreFile组成,MemStore是Store在内存的内容,写到文件后就是StoreFile。StoreFile底层是以HFile格式保存
HLog: 每个HRegion内都有一个HLog,用来做灾难恢复,它记录这数据的变更,在RegionServer宕机,就可以从log中回滚还没有持久化的数据
HFile:HBase的数据最终以HFile形式存储在HDFS中
结合电信客服项目,完成写入HBase部分:
创建命名空间:
// 命名空间类似与关系型数据库中的数据库database,对表的逻辑分组
protected void createNameSpace(String namespace) throws IOException {
Admin admin = getAdmin();
try {
admin.getNamespaceDescriptor(namespace);
} catch (NamespaceNotFoundException e) {
NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();
admin.createNamespace(namespaceDescriptor);
}
}
创建表:
protected void createTable(String name, String coprocessorClass, Integer regionCount, String... families) throws IOException {
Admin admin = getAdmin();
TableName tableName = TableName.valueOf(name);
if (admin.tableExists(tableName)) {
deleteTable(name);
}
// 添加列族
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
if (families == null || families.length ==0) {
families = new String[1];
families[0] = Names.CF_INFO.getValue();
}
for (String family : families) {
HColumnDescriptor columnDescriptor = new HColumnDescriptor(family);
tableDescriptor.addFamily(columnDescriptor);
}
// 添加协处理器
if (coprocessorClass != null && "".equals(coprocessorClass)) {
tableDescriptor.addCoprocessor(coprocessorClass);
}
// 添加分区
if (regionCount == null || regionCount <= 1) {
admin.createTable(tableDescriptor);
} else {
// 增加预分区,减少split带来的资源消耗,提高hbase性能。
// 如果不设置分区,数据只会写入一个region内,当region过大(10G),表会进行split分裂两个分区,这个过程会耗费大量资源
byte[][] splitKeys = getSplitKeys(regionCount);
admin.createTable(tableDescriptor, splitKeys);
}
}
往HBase插入数据:
public void insertData(Calllog log) throws Exception {
log.setRowkey(genRegionNum(log.getcall1(), log.getcalltime()) + "_" + log.getcall1() + "_" + log.getcalltime() + "_" + log.getcall2() + "_" + log.getduration());
putData(log);
}
// 根据注解反射操作对象,动态获取对象的字段和字段值
// 相当于自定义的get/set方法,通过getAnnotation获取对象的字段名和对应的值
// 相当方便的动态添加表名,rowkey,列族和相应值
protected void putData(Object obj) throws Exception {
Class clazz = obj.getClass();
TableRef tableRef = (TableRef) clazz.getAnnotation(TableRef.class);
String tablename = tableRef.value();
Field[] fs = clazz.getDeclaredFields();
String strRowkey = "";
for (Field f : fs) {
Rowkey rowkey = f.getAnnotation(Rowkey.class);
if (rowkey != null) {
f.setAccessible(true);
strRowkey = (String) f.get(obj);
break;
}
}
Connection conn = getConnection();
Table table = conn.getTable(TableName.valueOf(tablename));
Put put = new Put(Bytes.toBytes(strRowkey));
for (Field f : fs) {
Column column = f.getAnnotation(Column.class);
if (column != null) {
String family = column.family();
String colName = column.column();
if (colName != null || "".equals(colName)) {
colName = f.getName();
}
f.setAccessible(true);
String value = (String) f.get(obj);
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(colName), Bytes.toBytes(value));
}
}
table.put(put);
table.close();
}
六.总结
本文章介绍了Flume,Kafka和HBase的基本原理和使用,想听详细课程可以去B站电信客服项目,该课程从数据生产,存储到分析都有详细介绍.
我最后将存储模块分离独立成一个项目,欢迎下载flume+kafka+hbase实战