Flume Collection Configuration

Installation

Installed via CDH.

Problems encountered

  1. Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.

    Increase the HDFS call timeout: tier1.sinks.ods_hdfs_sink.hdfs.callTimeout
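
    For example, raise it to three minutes, the value used in the full configuration below:

    tier1.sinks.ods_hdfs_sink.hdfs.callTimeout=180000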

  2. ChannelFullException: Space for commit to queue couldn’t be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight

    Make the batchSize on the source side match the batchSize on the sink side:

    tier1.sources.ods_kafka_source.batchSize=10000
    tier1.sinks.ods_hdfs_sink.hdfs.batchSize=10000
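
    The channel's transactionCapacity must also be at least as large as these batch sizes; the channel configuration below keeps it at the same value:

    tier1.channels.ods_file_channel.transactionCapacity=10000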
    
  3. Buffer OOM

    When the channel is memory-backed, avoid having many directories, and therefore many files, pending write at the same time; this will cause an OOM, because each open file claims a memory buffer as large as the configured file roll size.
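
    For example, with the configuration below (9 Kafka topics and rollSize=1310720000 bytes, roughly 1.25 GB), nine files open at once across the topic/date directories would, by this reasoning, claim on the order of 11 GB of buffer space.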

  4. The HDFS small-file problem

#Roll to a new file after the current file has been open for this many seconds (0 disables time-based rolling)
tier1.sinks.ods_hdfs_sink.hdfs.rollInterval=3600
#Roll to a new file once the current file reaches this many bytes (0 disables size-based rolling); roughly 10 x 128 MB
tier1.sinks.ods_hdfs_sink.hdfs.rollSize=1310720000
#Roll to a new file once this many events have been written to it (0 disables event-count-based rolling)
tier1.sinks.ods_hdfs_sink.hdfs.rollCount=0

My approach is to roll once every hour, and additionally roll once the file reaches the configured rollSize. This means that at midnight there can still be data sitting in a file that has not rolled yet, so the downstream job has to start at 01:00.

Writing an interceptor

AbstractInterceptor

package com.tbl.flume.interceptor;

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Base class for custom Flume interceptors; subclasses only implement doIntercept(Event).
 */
public abstract class AbstractInterceptor implements Interceptor {
    protected static final Logger logger = LoggerFactory.getLogger(AbstractInterceptor.class);
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        doIntercept(event);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {

    }

    /** Mutates the event's body and/or headers in place. */
    protected abstract void doIntercept(Event event);

}

TimeJsonObjectEventInterceptor

package com.tbl.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;

import static com.tbl.flume.conf.CommonConf.*;

/**
 * Parses the event body as a JSON object, copies the date and full timestamp
 * from the configured time field into the event headers, and derives the
 * target table name from the Kafka topic by stripping the configured prefix.
 */
public class TimeJsonObjectEventInterceptor extends AbstractInterceptor {
    private String timeField;
    private String prefix;

    public TimeJsonObjectEventInterceptor(Context context) {
        timeField = context.getString(TIME_FIELD);
        prefix = context.getString(TOPIC_PREFIX);
    }

    @Override
    protected void doIntercept(Event event) {
        // The body is a JSON object; pull the timestamp out of the configured field,
        // e.g. "2021-01-28 12:34:56" yields the date part "2021-01-28".
        JSONObject jsonObject = JSON.parseObject(new String(event.getBody(), StandardCharsets.UTF_8));
        String dateTime = jsonObject.getString(timeField);
        String[] ts = dateTime.split(" ");
        event.getHeaders().put(CONSUME_DATE, ts[0]);
        event.getHeaders().put(CONSUME_DATE_TIME, dateTime);
        // Derive the table name from the topic, e.g. "topic_wa_dw_0001" with
        // prefix "topic_" yields "wa_dw_0001".
        String topic = event.getHeaders().get(TOPIC);
        String[] topics = topic.split(prefix);
        event.getHeaders().put(TABLE, topics[topics.length - 1]);
    }

    public static class Builder implements Interceptor.Builder {
        private Context context;

        @Override
        public Interceptor build() {
            return new TimeJsonObjectEventInterceptor(context);
        }

        @Override
        public void configure(Context context) {
            this.context = context;
        }
    }
}
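
CommonConf

The interceptor statically imports its constant names from com.tbl.flume.conf.CommonConf, which is not shown in this post. A minimal sketch of that class, assuming the configuration keys and header names implied by the interceptor and the configuration file below (time_field, topic_prefix, consume_date, consume_date_time, topic, table):

package com.tbl.flume.conf;

/**
 * Shared constant names used by the interceptor; reconstructed here for illustration.
 * The values must match the interceptor keys in the Flume configuration and the
 * header names referenced in hdfs.path (%{topic}, %{consume_date}).
 */
public class CommonConf {
    // interceptor configuration keys
    public static final String TIME_FIELD = "time_field";
    public static final String TOPIC_PREFIX = "topic_prefix";

    // event header names written by TimeJsonObjectEventInterceptor
    public static final String CONSUME_DATE = "consume_date";
    public static final String CONSUME_DATE_TIME = "consume_date_time";
    public static final String TABLE = "table";

    // header set by the Kafka source (topicHeader=topic in the configuration)
    public static final String TOPIC = "topic";

    private CommonConf() {
    }
}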

Configuration file

#sources sinks channels
tier1.sources=ods_kafka_source
tier1.sinks=ods_hdfs_sink ods_clickhouse_sink
tier1.channels=ods_file_channel

#bind sources and sinks to the channel
tier1.sources.ods_kafka_source.channels=ods_file_channel
tier1.sinks.ods_hdfs_sink.channel=ods_file_channel
tier1.sinks.ods_clickhouse_sink.channel=ods_file_channel

#configure ods_kafka_source
tier1.sources.ods_kafka_source.type=org.apache.flume.source.kafka.KafkaSource
tier1.sources.ods_kafka_source.kafka.bootstrap.servers=cdh210:9092,cdh211:9092,cdh212:9092
tier1.sources.ods_kafka_source.kafka.consumer.group.id=flume
tier1.sources.ods_kafka_source.kafka.topics=topic_wa_dw_0001,topic_wa_wb_0001,topic_wa_source_fj_0001,topic_wa_source_fj_0002,topic_wa_source_fj_1002,topic_wa_source_fj_1001,topic_ga_viid_0001,topic_ga_viid_0002,topic_ga_viid_0003
tier1.sources.ods_kafka_source.batchSize=10000
tier1.sources.ods_kafka_source.batchDurationMillis=5000
tier1.sources.ods_kafka_source.setTopicHeader=true
tier1.sources.ods_kafka_source.topicHeader=topic
tier1.sources.ods_kafka_source.interceptors=timeInterceptor
tier1.sources.ods_kafka_source.interceptors.timeInterceptor.type=com.tbl.flume.interceptor.TimeJsonObjectEventInterceptor$Builder
tier1.sources.ods_kafka_source.interceptors.timeInterceptor.time_field=startTime
tier1.sources.ods_kafka_source.interceptors.timeInterceptor.topic_prefix=topic_


tier1.channels.ods_file_channel.type=memory
tier1.channels.ods_file_channel.capacity=100000
tier1.channels.ods_file_channel.transactionCapacity=10000

#ods_hdfs_sink
tier1.sinks.ods_hdfs_sink.type=hdfs
tier1.sinks.ods_hdfs_sink.hdfs.useLocalTimeStamp=true
tier1.sinks.ods_hdfs_sink.hdfs.path=/flume/capture/%{topic}/%{consume_date}/
tier1.sinks.ods_hdfs_sink.hdfs.filePrefix=ods_
tier1.sinks.ods_hdfs_sink.hdfs.round=false
#Roll to a new file after the current file has been open for this many seconds (0 disables time-based rolling)
tier1.sinks.ods_hdfs_sink.hdfs.rollInterval=3600
tier1.sinks.ods_hdfs_sink.hdfs.threadsPoolSize=10
#Roll to a new file once the current file reaches this many bytes (0 disables size-based rolling); roughly 10 x 128 MB
tier1.sinks.ods_hdfs_sink.hdfs.rollSize=1310720000
tier1.sinks.ods_hdfs_sink.hdfs.minBlockReplicas=1
#Roll to a new file once this many events have been written to it (0 disables event-count-based rolling)
tier1.sinks.ods_hdfs_sink.hdfs.rollCount=0
tier1.sinks.ods_hdfs_sink.hdfs.writeFormat=Text
tier1.sinks.ods_hdfs_sink.hdfs.codeC=snappy
tier1.sinks.ods_hdfs_sink.hdfs.fileType=CompressedStream
tier1.sinks.ods_hdfs_sink.hdfs.batchSize=10000
tier1.sinks.ods_hdfs_sink.hdfs.callTimeout=180000

tier1.sinks.ods_clickhouse_sink.type=com.tbl.flume.sink.JsonObjectClickhouseSink
tier1.sinks.ods_clickhouse_sink.servers=192.168.2.182:8123
tier1.sinks.ods_clickhouse_sink.user=root
tier1.sinks.ods_clickhouse_sink.password=tbl_db_543
tier1.sinks.ods_clickhouse_sink.database=yth
tier1.sinks.ods_clickhouse_sink.table_prefix=dwd_
tier1.sinks.ods_clickhouse_sink.batchSize=2000
tier1.sinks.ods_clickhouse_sink.max_waite_time=10000
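
The ClickHouse sink class com.tbl.flume.sink.JsonObjectClickhouseSink referenced above is custom and not shown in this post. As a rough sketch of how such a sink can read the parameters above and drain the channel in batches (everything beyond the configuration key names is an assumption, and the actual ClickHouse write is omitted):

package com.tbl.flume.sink;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import java.util.ArrayList;
import java.util.List;

/** Illustrative skeleton of a batching ClickHouse sink; not the original implementation. */
public class JsonObjectClickhouseSink extends AbstractSink implements Configurable {
    private String servers;
    private String user;
    private String password;
    private String database;
    private String tablePrefix;
    private int batchSize;
    private long maxWaitTime;

    @Override
    public void configure(Context context) {
        // Keys match the tier1.sinks.ods_clickhouse_sink.* properties above.
        servers = context.getString("servers");
        user = context.getString("user");
        password = context.getString("password");
        database = context.getString("database");
        tablePrefix = context.getString("table_prefix");
        batchSize = context.getInteger("batchSize", 2000);
        maxWaitTime = context.getLong("max_waite_time", 10000L);
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            // Take up to batchSize events from the channel in one transaction.
            List<Event> batch = new ArrayList<>(batchSize);
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    break;
                }
                batch.add(event);
            }
            if (!batch.isEmpty()) {
                // Group events by their "table" header and insert each group into
                // database.tablePrefix + table via the ClickHouse JDBC driver (omitted).
            }
            tx.commit();
            return batch.isEmpty() ? Status.BACKOFF : Status.READY;
        } catch (Throwable t) {
            tx.rollback();
            if (t instanceof Error) {
                throw (Error) t;
            }
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }
}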