Project location: https://github.com/wty19920422/mydesign
The code can be customized to fit specific needs; below is a brief walkthrough of part of the flow.
When Flume is used to receive TCP data, product requirements sometimes call for changing the data format on the fly, for example adding company information, data-steward information, or other custom fields. The data arrive in real time, so format changes have to take effect promptly. To achieve this, multiple Flume servers watch the corresponding znode in ZooKeeper; whenever the znode changes, the format of the data Flume writes changes immediately.
1. Architecture overview
-- Flume runs on multiple servers to receive data
-- The Flume source uses the built-in syslog source to receive TCP data
-- The Flume sink is a custom output class, so that multiple destinations are supported and ZooKeeper configuration changes take effect promptly
-- ZooKeeper is watched for changes to the /storedata znode, which changes the output format in real time (a zkCli sketch follows this list)
-- Storage is multi-target: depending on configuration, data can be written to HDFS, local files, Kafka, MongoDB, Redis, MySQL, and so on
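For example, the common-data znode can be created and updated from the ZooKeeper CLI (the host is a placeholder; the path must match the sink's zookeeperpath setting, /zkdata in the configuration below):
bin/zkCli.sh -server zk-host:2181
create /zkdata "company=XXX"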
2. Implementation overview
1) Flume source
Configuration:
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 6666
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
2) Flume sink
A custom sink is used; the configuration is shown first, and the implementation follows for reference.
a1.sinks.k1.type = cn.zkjt.data.transmit.flume_ng_zk_datatransmit.ZKDataTransmitSink
# sendtype selects the destination: one of logger, file, hdfs, kafka, mongo, redis
a1.sinks.k1.sendtype = logger
a1.sinks.k1.zookeeperlist = 0.0.0.0
a1.sinks.k1.zookeeperpath = /zkdata
a1.sinks.k1.channel = c1
# file part
a1.sinks.k1.filewritepath = /data/filewrite
# hdfs part
a1.sinks.k1.hdfslist = hdfs://192.168.20.18:9000
a1.sinks.k1.hdfsuser = hadoop
a1.sinks.k1.hdfswritepath = /writedata
# kafka part
a1.sinks.k1.kafkabrokerlist = 192.168.10.7:9092
a1.sinks.k1.kafkatopic = zktest
a1.sinks.k1.partionnum = 2
# mongo part
a1.sinks.k1.mongohost = 192.168.20.35
a1.sinks.k1.mongoport = 27017
a1.sinks.k1.mongouser = flume
a1.sinks.k1.mongopassword = CskAMpk=
a1.sinks.k1.mongodb = zzzsj
a1.sinks.k1.mongocollection = test
# redis part
a1.sinks.k1.redishost = 192.168.20.20
a1.sinks.k1.redisport = 6379
a1.sinks.k1.redispassword = 123456
a1.sinks.k1.redisdb = 7
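For the agent to start, the components also need to be declared and the channel c1 defined; a minimal sketch assuming a memory channel (capacities are illustrative):
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
The sink implementation: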
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Throwables;
import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.process.ConstantData;
import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.process.ZKThread;
import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.writer.DataWriter;
import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.writer.DataWriterFactory;
/**
 * Custom sink: drains events from the channel and writes each one through a pluggable
 * DataWriter, tagging it with the common data currently stored in ZooKeeper
 * (kept up to date by a background ZKThread).
 */
public class ZKDataTransmitSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(ZKDataTransmitSink.class);
private int batchSize; // maximum number of events drained per transaction
private SinkCounter sinkCounter; // standard Flume sink metrics
private String type; // sendtype: logger, file, hdfs, kafka, mongo or redis
private DataWriter dataWriter; // destination-specific writer
private Context context;
private Thread zkThread; // watches the configured znode
// Drain up to batchSize events from the channel in a single transaction and hand each one
// to the configured DataWriter together with the current common data from ZooKeeper.
public Status process() throws EventDeliveryException {
Status status = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
try {
transaction.begin();
long count = 0;
for (count = 0; count < batchSize; ++count) {
Event event = channel.take();
if (event == null) {
break;
}
// commonData is refreshed by ZKThread whenever the watched znode changes
String commonData = ConstantData.commonData;
logger.info("common data : {}", commonData);
dataWriter.write(event, commonData);
}
if (count <= 0) {
sinkCounter.incrementBatchEmptyCount();
status = Status.BACKOFF;
} else {
if (count < batchSize) {
sinkCounter.incrementBatchUnderflowCount();
status = Status.BACKOFF;
} else {
sinkCounter.incrementBatchCompleteCount();
}
sinkCounter.addToEventDrainAttemptCount(count);
}
transaction.commit();
sinkCounter.addToEventDrainSuccessCount(count);
} catch (Throwable t) {
try {
transaction.rollback();
} catch (Exception e) {
logger.error("Exception during transaction rollback.", e);
}
logger.error("Failed to commit transaction. Transaction rolled back.", t);
if (t instanceof Error || t instanceof RuntimeException) {
Throwables.propagate(t);
} else {
throw new EventDeliveryException("Failed to commit transaction. Transaction rolled back.", t);
}
} finally {
if (transaction != null) {
transaction.close();
}
}
return status;
}
@Override
public synchronized void start() {
logger.info("Starting sink");
dataWriter = new DataWriterFactory().createWriter(type);
dataWriter.setContext(context);
dataWriter.start();
// Background thread that watches the configured znode and refreshes the shared common data
zkThread = new Thread(new ZKThread(context));
zkThread.start();
sinkCounter.start();
try {
sinkCounter.incrementConnectionCreatedCount();
} catch (Exception e) {
logger.error("Exception while connecting to Redis", e);
sinkCounter.incrementConnectionFailedCount();
}
super.start();
logger.info("Sink started");
}
@Override
public synchronized void stop() {
logger.info("Stopping sink");
dataWriter.stop();
sinkCounter.incrementConnectionClosedCount();
sinkCounter.stop();
super.stop();
logger.info("Sink stopped");
}
// Called by Flume with the sink's configuration (sendtype, batch size, ZooKeeper settings, ...).
public void configure(Context context) {
this.context = context;
type = context.getString(ZKDataTransmitSinkConstants.SENDTYPE, ZKDataTransmitSinkConstants.DEFAULT_SENDTYPE);
batchSize = context.getInteger(ZKDataTransmitSinkConstants.BATCH_SIZE, ZKDataTransmitSinkConstants.DEFAULT_BATCH_SIZE);
if (sinkCounter == null) {
sinkCounter = new SinkCounter(getName());
}
}
}
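The sink programs against a DataWriter abstraction. Its exact definition lives in the repository; judging from the calls above, it is roughly the following interface (a sketch, not the repository code):
import org.apache.flume.Context;
import org.apache.flume.Event;

// Sketch of the DataWriter contract inferred from how the sink uses it:
// one implementation per sendtype (logger, file, hdfs, kafka, mongo, redis).
public interface DataWriter {
    void setContext(Context context);     // receive the sink configuration
    void start();                         // open connections / resources
    void write(Event event, String flag); // write one event; flag is the common data read from ZooKeeper
    void stop();                          // release resources
}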
The factory that creates the corresponding writer:
import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.process.TypeMap;
public class DataWriterFactory implements DataWriterFactoryInterface{
private String classPath;
public DataWriterFactory() {
// Writer implementations live in a sibling "datawriter" package, e.g.
// cn.zkjt.data.transmit.flume_ng_zk_datatransmit.datawriter.kafka.KafkaDataWriter
String packageName = getClass().getPackage().getName();
classPath = packageName.replace("writer", "datawriter");
}
// Build the fully qualified class name for the requested sendtype and instantiate it reflectively.
public DataWriter createWriter(String typeName) {
DataWriter dataWriter = null;
TypeMap typeMap = new TypeMap();
String className = typeMap.getClassName(typeName);
String factoryClassName = classPath + "." + typeName + "." + className;
try {
Class<?> clazz = Class.forName(factoryClassName);
dataWriter = (DataWriter) clazz.newInstance();
} catch (Exception e) {
// Reflection failure: unknown sendtype or writer class missing from the classpath
e.printStackTrace();
}
return dataWriter;
}
}
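The factory resolves class names through TypeMap, which maps a sendtype to the simple class name of its writer. The real mapping is in the repository; a minimal sketch of the idea (class names other than KafkaDataWriter are assumed):
import java.util.HashMap;
import java.util.Map;

// Sketch: sendtype -> simple writer class name, loaded by DataWriterFactory as
// cn.zkjt.data.transmit.flume_ng_zk_datatransmit.datawriter.<type>.<ClassName>.
public class TypeMap {
    private final Map<String, String> map = new HashMap<String, String>();

    public TypeMap() {
        map.put("kafka", "KafkaDataWriter");
        map.put("file", "FileDataWriter");     // assumed name
        map.put("hdfs", "HdfsDataWriter");     // assumed name
        map.put("mongo", "MongoDataWriter");   // assumed name
        map.put("redis", "RedisDataWriter");   // assumed name
        map.put("logger", "LoggerDataWriter"); // assumed name
    }

    public String getClassName(String typeName) {
        return map.get(typeName);
    }
}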
The Kafka writer:
import java.util.Properties;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.configure.kafka.KafkaWriteConstants;
import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.writer.DataWriter;
public class KafkaDataWriter implements DataWriter {
private String brokerList;
private String topic;
private int partitionNum;
private KafkaProducer<String, byte[]> producer;
private ProducerRecord<String, byte[]> record;
public void write(Event event, String flag) {
// Choose a partition by hashing the event body so the load is spread across partitions.
int code = Math.abs(new String(event.getBody()).hashCode());
int partitionID = code % partitionNum;
// This writer sends the raw body; the flag argument carries the common data from ZooKeeper
// and could be merged into the payload if the target format requires it.
record = new ProducerRecord<String, byte[]>(topic, partitionID, "data", event.getBody());
producer.send(record);
}
public void setContext(Context context) {
brokerList = context.getString(KafkaWriteConstants.KAFKA_BROKERLIST);
topic = context.getString(KafkaWriteConstants.KAFKA_TOPIC);
partitionNum = context.getInteger(KafkaWriteConstants.KAFKA_PARTITION_NUM, KafkaWriteConstants.DEFAULT_KAFKA_PARTITION_NUM);
}
public void start() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo"); // client id for this producer
producer = new KafkaProducer<String, byte[]>(properties);
}
public void stop() {
producer.close();
}
}
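The Kafka writer above forwards the raw event body; a writer that applies the dynamic format would merge the common data into the output. A hedged sketch of a simple file writer along those lines, using the filewritepath key from the configuration (the class and file naming are illustrative, not the repository code):
import java.io.FileWriter;
import java.io.IOException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.writer.DataWriter;

// Illustrative file writer: prefixes every line with the common data read from ZooKeeper.
public class SimpleFileDataWriter implements DataWriter {
    private String path;
    private FileWriter out;

    public void setContext(Context context) {
        path = context.getString("filewritepath", "/data/filewrite");
    }

    public void start() {
        try {
            out = new FileWriter(path + "/data.log", true); // append mode
        } catch (IOException e) {
            throw new RuntimeException("Cannot open output file", e);
        }
    }

    public void write(Event event, String flag) {
        try {
            // flag holds whatever is currently stored in the watched znode
            out.write(flag + "|" + new String(event.getBody()) + "\n");
            out.flush();
        } catch (IOException e) {
            throw new RuntimeException("Write failed", e);
        }
    }

    public void stop() {
        try {
            out.close();
        } catch (IOException e) {
            // ignore on shutdown
        }
    }
}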
3. ZooKeeper part
import java.io.IOException;
import org.apache.flume.Context;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.ZKDataTransmitSinkConstants;
import org.apache.zookeeper.ZooDefs.Ids;
// Background thread: connects to ZooKeeper, ensures the configured znode exists,
// and keeps ConstantData.commonData in sync with the znode's contents.
public class ZKThread implements Runnable {
private String connectString;
private int sessionTimeout = 2000;
private String storePath;
private ZooKeeper zk;
private Logger logger = LoggerFactory.getLogger(getClass());
public ZKThread(Context context) {
connectString = context.getString(ZKDataTransmitSinkConstants.ZOOKEEPER_LIST);
storePath = context.getString(ZKDataTransmitSinkConstants.ZOOKEEPER_PATH, ZKDataTransmitSinkConstants.DEFAULT_ZOOKEEPER_PATH);
}
@Override
public void run() {
try {
// The watcher fires on the initial connection event and then on every change of the
// watched znode; each callback re-reads the data and re-registers the watch.
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
try {
logger.info("path:{}, list:{}", storePath, connectString);
Stat exists = zk.exists(storePath, false);
if(exists == null) {
// Create the znode on first use so there is always something to watch
zk.create(storePath, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// Read with watch=true so the next change of the znode triggers this watcher again
byte[] data = zk.getData(storePath, true, null);
ConstantData.commonData = new String(data);
}catch (Exception e) {
e.printStackTrace();
}
}
});
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
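ConstantData is the shared holder that the watcher writes and the sink reads; a sketch consistent with how it is used above (volatile is added here because the field crosses threads; the repository may declare it differently):
// Shared holder: ZKThread's watcher writes it, the sink thread reads it on every event.
public class ConstantData {
    public static volatile String commonData = "";
}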
4. Storage part
The destination-specific writers (file, HDFS, Kafka, MongoDB, Redis, MySQL) follow the same DataWriter pattern; see the repository for the remaining implementations.
5. Demonstration
Start Flume:
bin/flume-ng agent -c conf/ -f conf/zk.conf -n a1 -Dflume.root.logger=INFO,console
Start a netcat client and send data to Flume:
nc 0.0.0.0 6666
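Any line typed into that session is delivered to the syslog source; alternatively, a one-shot message can be piped in (the syslog header here is illustrative):
echo "<13>Feb  5 17:32:18 host01 app: hello flume" | nc 127.0.0.1 6666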
Now modify the common-data configuration stored in ZooKeeper.
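For example, from the ZooKeeper CLI (the value is illustrative):
set /zkdata "company=XXX,inspector=YYY"
Every running agent picks up the new value through its watch, so Flume does not need to be restarted.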
Check the output again: newly received data now carries the updated format.