Flume 采集配置
安装
使用CDH安装
存在的问题
-
Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
增加HDFS调用的超时时间 tier1.sinks.ods_hdfs_sink.hdfs.callTimeout(单位:毫秒,默认10000,下面的配置中设为180000)。
-
ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
source端的batchSize和sink端的batchSize要一致,并且都不能超过channel的transactionCapacity(下面的配置中为10000)。
tier1.sources.ods_kafka_source.batchSize=10000 tier1.sinks.ods_hdfs_sink.hdfs.batchSize=10000
-
buffer oom
channel为内存模式(memory channel)时,要避免同时存在多个目录,即同时存在多个待写入的文件,否则会引起OOM。每个文件都会申请你所设置的文件滚动大小的内存空间。
-
hdfs小文件的问题
#当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
tier1.sinks.ods_hdfs_sink.hdfs.rollInterval=3600
#当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节 128M 10倍
tier1.sinks.ods_hdfs_sink.hdfs.rollSize=1310720000
#当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
tier1.sinks.ods_hdfs_sink.hdfs.rollCount=0
我的解决思路是每一个小时滚动一次,同时文件大小达到约128M后也滚动一次(rollSize统计的是压缩前的event字节数,所以配置为128M的10倍,用来估算压缩后的文件大小)。这样在0点的时候还会存在前一天未滚动的数据,需要在1点的时候再开始作业。
编写interceptor
AbstractInterceptor
package com.tbl.flume.interceptor;
import com.tbl.flume.conf.CommonConf;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
 * Base class for custom Flume interceptors that modify events in place.
 *
 * <p>Subclasses implement {@link #doIntercept(Event)}. This class supplies no-op lifecycle
 * methods and the single-event and batch {@code intercept} methods required by
 * {@link Interceptor}.
 */
public abstract class AbstractInterceptor implements Interceptor {
    /** Shared logger for subclasses; it is named after this base class, not the concrete subclass. */
    protected static final Logger logger = LoggerFactory.getLogger(AbstractInterceptor.class);

    /** No initialization is needed by default. */
    @Override
    public void initialize() {
    }

    /**
     * Applies {@link #doIntercept(Event)} to a single event.
     *
     * @param event the event to process
     * @return the same event instance, as modified in place by {@code doIntercept}
     */
    @Override
    public Event intercept(Event event) {
        doIntercept(event);
        return event;
    }

    /**
     * Applies {@link #intercept(Event)} to every event of a batch.
     *
     * <p>The result is built from the per-event return values. A subclass that overrides
     * {@link #intercept(Event)} to return a replacement event, or {@code null} to drop an event,
     * therefore behaves the same in batch mode as in single-event mode. The input list itself is
     * not structurally modified.
     *
     * @param events the batch to process
     * @return a new list with the non-null results, in input order
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> intercepted = new ArrayList<>(events.size());
        for (Event event : events) {
            Event result = intercept(event);
            // A null result means "drop this event".
            if (result != null) {
                intercepted.add(result);
            }
        }
        return intercepted;
    }

    /** No resources are held by default. */
    @Override
    public void close() {
    }

    /**
     * Modifies {@code event} in place, e.g. by adding headers.
     *
     * @param event the event to process
     */
    protected abstract void doIntercept(Event event);
}
TimeJsonObjectEventInterceptor
package com.tbl.flume.interceptor;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tbl.flume.conf.CommonConf;
import com.tbl.flume.sink.AbstractClickhouseSink;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.InterceptorBuilderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import static com.tbl.flume.conf.CommonConf.*;
/**
 * Interceptor that derives routing headers from a JSON event body and the Kafka topic header.
 *
 * <p>For each event it:
 * <ul>
 *   <li>parses the UTF-8 body as a JSON object and reads the field named by the
 *       {@code TIME_FIELD} context key;</li>
 *   <li>stores that value under the {@code CONSUME_DATE_TIME} header. It also stores the text
 *       before its first space under {@code CONSUME_DATE}, e.g. {@code "2021-01-01"} from
 *       {@code "2021-01-01 12:00:00"};</li>
 *   <li>stores, under {@code TABLE}, the {@code TOPIC} header with everything up to and including
 *       the last occurrence of the {@code TOPIC_PREFIX} context value removed.</li>
 * </ul>
 *
 * <p>If the body parses to {@code null}, or the time field is absent, the date headers are not set
 * and a warning is logged. Likewise, if the topic header is absent, {@code TABLE} is not set.
 * Exceptions thrown by the JSON parser itself are not caught here.
 */
public class TimeJsonObjectEventInterceptor extends AbstractInterceptor {
    /** Name of the JSON field holding the event time; may be null if not configured. */
    private final String timeField;
    /** Prefix removed from the topic name to obtain the table name; may be null if not configured. */
    private final String prefix;

    public TimeJsonObjectEventInterceptor(Context context) {
        timeField = context.getString(TIME_FIELD);
        prefix = context.getString(TOPIC_PREFIX);
    }

    @Override
    protected void doIntercept(Event event) {
        JSONObject jsonObject = JSON.parseObject(new String(event.getBody(), StandardCharsets.UTF_8));
        String dateTime = jsonObject == null ? null : jsonObject.getString(timeField);
        if (dateTime == null) {
            // Throwing here would fail the whole batch being intercepted, so a single bad
            // record only loses its date headers instead.
            logger.warn("Event has no '{}' field; date headers not set", timeField);
        } else {
            int firstSpace = dateTime.indexOf(' ');
            String date = firstSpace < 0 ? dateTime : dateTime.substring(0, firstSpace);
            event.getHeaders().put(CONSUME_DATE, date);
            event.getHeaders().put(CONSUME_DATE_TIME, dateTime);
        }

        String topic = event.getHeaders().get(TOPIC);
        if (topic == null) {
            logger.warn("Event has no '{}' header; '{}' header not set", TOPIC, TABLE);
        } else {
            event.getHeaders().put(TABLE, stripPrefix(topic, prefix));
        }
    }

    /**
     * Returns the part of {@code topic} after the last occurrence of {@code prefix}.
     *
     * <p>The prefix is matched literally, not as a regular expression. The topic is returned
     * unchanged when the prefix is null, empty, or not present.
     */
    private static String stripPrefix(String topic, String prefix) {
        if (prefix == null || prefix.isEmpty()) {
            return topic;
        }
        int idx = topic.lastIndexOf(prefix);
        return idx < 0 ? topic : topic.substring(idx + prefix.length());
    }

    /** Flume builder: keeps the context passed to {@code configure} and builds interceptors from it. */
    public static class Builder implements Interceptor.Builder {
        private Context context;

        @Override
        public Interceptor build() {
            return new TimeJsonObjectEventInterceptor(context);
        }

        @Override
        public void configure(Context context) {
            this.context = context;
        }
    }
}
配置文件
# Components of agent "tier1": sources, sinks, channels
tier1.sources=ods_kafka_source
tier1.sinks=ods_hdfs_sink ods_clickhouse_sink
# One channel per sink. Sinks attached to the same channel compete for its events (each event is
# taken by exactly one sink). Sharing one channel would therefore split the data between HDFS and
# ClickHouse instead of delivering all of it to both.
tier1.channels=ods_file_channel ods_clickhouse_channel
# Wiring: the source copies every event into both channels (replicating selector);
# each sink drains its own channel.
tier1.sources.ods_kafka_source.channels=ods_file_channel ods_clickhouse_channel
tier1.sources.ods_kafka_source.selector.type=replicating
tier1.sinks.ods_hdfs_sink.channel=ods_file_channel
tier1.sinks.ods_clickhouse_sink.channel=ods_clickhouse_channel
# Kafka source
tier1.sources.ods_kafka_source.type=org.apache.flume.source.kafka.KafkaSource
tier1.sources.ods_kafka_source.kafka.bootstrap.servers=cdh210:9092,cdh211:9092,cdh212:9092
tier1.sources.ods_kafka_source.kafka.consumer.group.id=flume
tier1.sources.ods_kafka_source.kafka.topics=topic_wa_dw_0001,topic_wa_wb_0001,topic_wa_source_fj_0001,topic_wa_source_fj_0002,topic_wa_source_fj_1002,topic_wa_source_fj_1001,topic_ga_viid_0001,topic_ga_viid_0002,topic_ga_viid_0003
# Events per batch; must not exceed the transactionCapacity of each channel (10000 below).
tier1.sources.ods_kafka_source.batchSize=10000
# Maximum time (ms) before a partial batch is written to the channels.
tier1.sources.ods_kafka_source.batchDurationMillis=5000
# Store the Kafka topic name in the "topic" header (used by %{topic} in the HDFS path).
tier1.sources.ods_kafka_source.setTopicHeader=true
tier1.sources.ods_kafka_source.topicHeader=topic
# Custom interceptor: sets date headers from the JSON field named by time_field, and a
# table-name header from the topic with topic_prefix removed.
tier1.sources.ods_kafka_source.interceptors=timeInterceptor
tier1.sources.ods_kafka_source.interceptors.timeInterceptor.type=com.tbl.flume.interceptor.TimeJsonObjectEventInterceptor$Builder
tier1.sources.ods_kafka_source.interceptors.timeInterceptor.time_field=startTime
tier1.sources.ods_kafka_source.interceptors.timeInterceptor.topic_prefix=topic_
# Memory channels ("ods_file_channel" is a memory channel despite its name). Events are held on
# the JVM heap and are lost if the agent stops; two channels double the worst-case heap usage.
tier1.channels.ods_file_channel.type=memory
tier1.channels.ods_file_channel.capacity=100000
tier1.channels.ods_file_channel.transactionCapacity=10000
tier1.channels.ods_clickhouse_channel.type=memory
tier1.channels.ods_clickhouse_channel.capacity=100000
tier1.channels.ods_clickhouse_channel.transactionCapacity=10000
# HDFS sink
tier1.sinks.ods_hdfs_sink.type=hdfs
tier1.sinks.ods_hdfs_sink.hdfs.useLocalTimeStamp=true
# One directory per topic and consume date, both taken from event headers.
tier1.sinks.ods_hdfs_sink.hdfs.path=/flume/capture/%{topic}/%{consume_date}/
tier1.sinks.ods_hdfs_sink.hdfs.filePrefix=ods_
tier1.sinks.ods_hdfs_sink.hdfs.round=false
# Roll (close and start a new file) after this many seconds of writing; 0 = never roll by time.
tier1.sinks.ods_hdfs_sink.hdfs.rollInterval=3600
tier1.sinks.ods_hdfs_sink.hdfs.threadsPoolSize=10
# Roll once this many bytes have been written; 0 = never roll by size. The value is 10 x 131072000
# bytes ("128M x 10"), presumably to allow for snappy compression of the output - confirm.
tier1.sinks.ods_hdfs_sink.hdfs.rollSize=1310720000
# Minimum replicas per HDFS block. The key must be "hdfs.minBlockReplicas"; a doubled
# "hdfs.hdfs." prefix is not read by the sink.
tier1.sinks.ods_hdfs_sink.hdfs.minBlockReplicas=1
# Roll after this many events; 0 = never roll by event count.
tier1.sinks.ods_hdfs_sink.hdfs.rollCount=0
# Snappy-compressed text output.
tier1.sinks.ods_hdfs_sink.hdfs.writeFormat=Text
tier1.sinks.ods_hdfs_sink.hdfs.codeC=snappy
tier1.sinks.ods_hdfs_sink.hdfs.fileType=CompressedStream
# Events taken per transaction; must not exceed the channel's transactionCapacity.
tier1.sinks.ods_hdfs_sink.hdfs.batchSize=10000
# Milliseconds allowed for each HDFS call (raised from the 10000 default).
tier1.sinks.ods_hdfs_sink.hdfs.callTimeout=180000
# ClickHouse sink (custom)
tier1.sinks.ods_clickhouse_sink.type=com.tbl.flume.sink.JsonObjectClickhouseSink
tier1.sinks.ods_clickhouse_sink.servers=192.168.2.182:8123
tier1.sinks.ods_clickhouse_sink.user=root
# NOTE(review): credentials are stored in plain text here - consider moving them out of this file.
tier1.sinks.ods_clickhouse_sink.password=tbl_db_543
tier1.sinks.ods_clickhouse_sink.database=yth
tier1.sinks.ods_clickhouse_sink.table_prefix=dwd_
tier1.sinks.ods_clickhouse_sink.batchSize=2000
# NOTE(review): "max_waite_time" looks misspelled, but it must match the key the sink reads - confirm before renaming.
tier1.sinks.ods_clickhouse_sink.max_waite_time=10000