流程
- kafka配置
- 创建消费者
- 关注主题ct
- 获取数据
- 将数据写入HBase
consumer.properties是kafka集群的配置信息,calllog是数据封装对象。
package com.csw.ct.consumer.bean;
import com.csw.ct.common.bean.Consumer;
import com.csw.ct.common.constant.Names;
import com.csw.ct.consumer.dao.HBaseDao;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
/**
* 通话日志消费者对象
*/
public class CalllogConsumer implements Consumer {
/**
* 消费数据
*/
public void consume() {
try {
//创建配置对象
Properties prop = new Properties ();
prop.load (Thread.currentThread ().getContextClassLoader ().getResourceAsStream ( "consumer.properties" ));
//获取flume采集的数据
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String> (prop);
//关注主题
consumer.subscribe( Arrays.asList( Names.TOPIC.getValue ()));
//HBase数据访问对象
HBaseDao dao = new HBaseDao ();
//初始化
dao.init();
//消费数据
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll ( 100 );
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println (consumerRecord.value());
//插入数据
//dao.insertDate(consumerRecord.value());
Calllog log = new Calllog(consumerRecord.value());
dao.insertDate (log);
}
}
} catch (Exception e) {
e.printStackTrace ();
}
}
public void close() throws IOException {
}
}
写入HBase具体代码
https://www.cnblogs.com/chenshaowei/p/12736522.html