需要先启动kafka和HBase:
后台启动Kafka:kafka-server-start.sh -daemon /opt/server.properties
启动HBase:start-hbase
hbase shell
一.创建三个接口
IParseRecord 接口:
public interface IParseRecord {
public List<Put> parse (ConsumerRecords<String, String> records);
}
IParseRecord 接口:
public interface IWorker {
public void fillData();
}
IWriter 接口:
public interface IWriter {
public int write(ConsumerRecords<String, String> records, String tableName) throws IOException;
}
二.创建Worker和Writer类
(1)ParentWorker
package nj.zb.kb09.kafkaToHBaseGJ;
/**
* @Author Jalrs
* @Date 2021/1/11
* @Description
*/
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.util.Properties;
public abstract class ParentWorker implements IWorker{
protected Properties prop;
public ParentWorker( String groupName) throws IOException {
prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.134.104:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
}
(2)HBaseWorker
package nj.zb.kb09.kafkaToHBaseGJ;
/**
* @Author Jalrs
* @Date 2021/1/11
* @Description
*/
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class HBaseWorker extends ParentWorker {
private IWriter writer;
private String topic;
private String target;
public HBaseWorker(IWriter writer, String topic, String targetTable) throws IOException {
this(writer, "myGroupDefault", topic, targetTable);
}
public HBaseWorker(IWriter writer, String groupName, String topic, String targetTable) throws IOException {
super(groupName);
this.topic = topic;
this.writer = writer;
this.target = targetTable;
}
@Override
public void fillData() {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton(this.topic));
try {
while (true) {
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
int rowNum = writer.write(poll, this.target);
System.out.println("行数:" + rowNum);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
(3)HBaseWrite
package nj.zb.kb09.kafkaToHBaseGJ;
/**
* @Author Jalrs
* @Date 2021/1/8
* @Description
*/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class HBaseWrite implements IWriter {
private Connection connection;
private IParseRecord parsedRecord;
public IParseRecord getParsedRecord() {
return parsedRecord;
}
public void setParsedRecord(IParseRecord parsedRecord) {
this.parsedRecord = parsedRecord;
}
public HBaseWrite(IParseRecord parsedRecord) throws IOException {
this.parsedRecord = parsedRecord;
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.rootdir", "hdfs://hadoop004:9000/hbase");
conf.set("hbase.zookeeper.quorum", "hadoop004");
conf.set("hbase.zookeeper.property.clientPort", "2181");
connection = ConnectionFactory.createConnection(conf);
}
@Override
public int write(ConsumerRecords<String, String> records, String tableName) throws IOException {
Table userFriendTable =
connection.getTable(TableName.valueOf(tableName));
List<Put> datas = parsedRecord.parse(records);
userFriendTable.put(datas);
return datas.size();
}
}
三.根据需求创建对应的Handler
(1)EventAttendHandler :
package nj.zb.kb09.kafkaToHBaseGJ;
/**
* @Author Jalrs
* @Date 2021/1/11
* @Description
*/
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;
public class EventAttendHandler implements IParseRecord{
@Override
public List<Put> parse(ConsumerRecords<String, String> records) {
List<Put> datas = new ArrayList<>();
for (ConsumerRecord<String, String> p : records) {
System.out.println(p.value());
String[] split = p.value().split(",");
Put put = new Put(Bytes.toBytes((split[0] + split[1] + split[2]).hashCode()));
put.addColumn("euat".getBytes(), "eventid".getBytes(), split[0].getBytes());
put.addColumn("euat".getBytes(), "userid".getBytes(), split[1].getBytes());
put.addColumn("euat".getBytes(), "state".getBytes(), split[2].getBytes());
datas.add(put);
}
return datas;
}
}
(2)EventsHandler :
package nj.zb.kb09.kafkaToHBaseGJ;
/**
* @Author Jalrs
* @Date 2021/1/12
* @Description
*/
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;
public class EventsHandler implements IParseRecord {
@Override
public List<Put> parse(ConsumerRecords<String, String> records) {
List<Put> datas = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
String[] split = record.value().split(",");
Put put = new Put(Bytes.toBytes(split[0].hashCode()));
put.addColumn("creator".getBytes(), "user_id".getBytes(), split[1].getBytes());
put.addColumn("schedule".getBytes(), "start_time".getBytes(), split[2].getBytes());
put.addColumn("location".getBytes(), "city".getBytes(), split[3].getBytes());
put.addColumn("location".getBytes(), "state".getBytes(), split[4].getBytes());
put.addColumn("location".getBytes(), "zip".getBytes(), split[5].getBytes());
put.addColumn("location".getBytes(), "country".getBytes(), split[6].getBytes());
put.addColumn("location".getBytes(), "lat".getBytes(), split[7].getBytes());
put.addColumn("location".getBytes(), "lng".getBytes(), split[8].getBytes());
put.addColumn("remark".getBytes(), "common_words".getBytes(), split[9].getBytes());
datas.add(put);
}
return datas;
}
}
(3)TrainHandler
package nj.zb.kb09.kafkaToHBaseGJ;/**
* @Author Jalrs
* @Date 2021/1/12
* @Description
*/
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;
public class TrainHandler implements IParseRecord {
@Override
public List<Put> parse(ConsumerRecords<String, String> records) {
List<Put> datas = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
String[] split = record.value().split(",");
Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
put.addColumn("eu".getBytes(), "user".getBytes(), split[0].getBytes());
put.addColumn("eu".getBytes(), "event".getBytes(), split[1].getBytes());
put.addColumn("eu".getBytes(), "invited".getBytes(), split[2].getBytes());
put.addColumn("eu".getBytes(), "timestamp".getBytes(), split[3].getBytes());
put.addColumn("eu".getBytes(), "interested".getBytes(), split[4].getBytes());
put.addColumn("eu".getBytes(), "not_interested".getBytes(), split[5].getBytes());
datas.add(put);
}
return datas;
}
}
(4)UserFriendHandler
package nj.zb.kb09.kafkaToHBaseGJ;
/**
* @Author Jalrs
* @Date 2021/1/11
* @Description
*/
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;
public class UserFriendHandler implements IParseRecord {
@Override
public List<Put> parse(ConsumerRecords<String, String> records) {
List<Put> datas = new ArrayList<>();
for (ConsumerRecord<String, String> p : records) {
System.out.println(p.value());
String[] split = p.value().split(",");
Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
put.addColumn("uf".getBytes(), "userid".getBytes(), split[0].getBytes());
put.addColumn("uf".getBytes(), "friendid".getBytes(), split[1].getBytes());
datas.add(put);
}
return datas;
}
}
(5)UsersHandler
package nj.zb.kb09.kafkaToHBaseGJ;/**
* @Author Jalrs
* @Date 2021/1/12
* @Description
*/
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;
public class UsersHandler implements IParseRecord {
@Override
public List<Put> parse(ConsumerRecords<String, String> records) {
List<Put> datas = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
String[] split = record.value().split(",");
if (split[0].trim().length() == 0) {
continue;
}
System.out.println(record);
Put put = new Put(Bytes.toBytes(split[0].hashCode()));
put.addColumn("profile".getBytes(), "locale".getBytes(), split[1].getBytes());
put.addColumn("profile".getBytes(), "birthyear".getBytes(), split[2].getBytes());
put.addColumn("region".getBytes(), "gender".getBytes(), split[3].getBytes());
if (split.length > 4) {
put.addColumn("registration".getBytes(), "joinedAt".getBytes(), split[4].getBytes());
if (split.length > 5) {
put.addColumn("region".getBytes(), "location".getBytes(), split[5].getBytes());
if (split.length > 6) {
put.addColumn("region".getBytes(), "timezone".getBytes(), split[6].getBytes());
}
}
}
datas.add(put);
}
return datas;
}
}
四.创建Driver类
package nj.zb.kb09.kafkaToHBaseGJ;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
/**
* @Author Jalrs
* @Date 2021/1/7
* @Description 将Kafka user_friends中的数据消费到HBase events_db:user_friend中
*/
public class Driver {
public static void main(String[] args) throws IOException {
IParseRecord record = new UserFriendHandler();
IWriter writer = new HBaseWrite(record);
// IWorker worker = new HBaseWorker(writer,
// "userFriend2",
// "user_friends",
// "events_db:user_friend");
// worker.fillData();
// new HBaseWorker(new HBaseWrite(new EventAttendHandler()),
// "event_attendees",
// "events_db:event_attendee")
// .fillData();
// new HBaseWorker(new HBaseWrite(new UsersHandler()),
// "users1",
// "users",
// "events_db:users")
// .fillData();
new HBaseWorker(new HBaseWrite(new EventsHandler()),
"events1",
"events",
"events_db:events")
.fillData();
// new HBaseWorker(new HBaseWrite(new TrainHandler()),
// "train1",
// "train",
// "events_db:train")
// .fillData();
}
}