Partitioning Flink Output to HDFS by Event Time: A Solution

0x1 Summary

In an offline Hive data warehouse, almost every table is partitioned for efficient querying and analysis, most commonly by day. Flink writes data into HDFS with a configuration like the following:

BucketingSink<Object> sink = new BucketingSink<>(path);
//bucket by date so data is partitioned by day
sink.setBucketer(new DateTimeBucketer<>("yyyy/MM/dd"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 256L);           //roll the part file once it reaches 256 MB
sink.setBatchRolloverInterval(30 * 60 * 1000L);  //or once it has been open for 30 minutes
sink.setInactiveBucketThreshold(3 * 60 * 1000L); //close buckets with no writes for 3 minutes
sink.setInactiveBucketCheckInterval(30 * 1000L); //check for inactive buckets every 30 seconds
sink.setInProgressSuffix(".in-progress");
sink.setPendingSuffix(".pending");

0x2 The Problem

To guarantee that every record lands in exactly the right partition, the bucketing must be driven by event time. Let's first look at how DateTimeBucketer is implemented:

public class DateTimeBucketer<T> implements Bucketer<T> {
 private static final long serialVersionUID = 1L;
 private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
 private final String formatString;
 private final ZoneId zoneId;
 private transient DateTimeFormatter dateTimeFormatter;

 /**
  * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"} using JVM's default timezone.
  */
 public DateTimeBucketer() {
  this(DEFAULT_FORMAT_STRING);
 }

 /**
  * Creates a new {@code DateTimeBucketer} with the given date/time format string using JVM's default timezone.
  *
  * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
  * the bucket path.
  */
 public DateTimeBucketer(String formatString) {
  this(formatString, ZoneId.systemDefault());
 }

 /**
  * Creates a new {@code DateTimeBucketer} with the given date/time format string using the given timezone.
  *
  * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
  * the bucket path.
  * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket path.
  */
 public DateTimeBucketer(String formatString, ZoneId zoneId) {
  this.formatString = Preconditions.checkNotNull(formatString);
  this.zoneId = Preconditions.checkNotNull(zoneId);

  this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(zoneId);
 }

 @Override
 public Path getBucketPath(Clock clock, Path basePath, T element) {
  //the key bucketing logic: take the current timestamp from clock and format it
  String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(clock.currentTimeMillis()));
  return new Path(basePath + "/" + newDateTimeString);
 }
}

The clock instance used above is created in BucketingSink#open, as follows:

this.clock = new Clock() {
 @Override
 public long currentTimeMillis() {
  //simply returns the current processing time
  return processingTimeService.getCurrentProcessingTime();
 }
};

Putting the source together: DateTimeBucketer buckets records by the current processing time, which inevitably differs from their event time, so records can spill into the wrong HDFS partition across day boundaries. For example, suppose a record has event time 2019-09-29 23:59:58; it belongs in the 2019/09/29 partition. But if it arrives 3 seconds late, the processing time is already 2019-09-30 00:00:01, so the record is written to the 2019/09/30 partition instead. For important data this result is unacceptable.
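
To see the skew concretely, here is a small standalone sketch (illustrative only, not part of the job; it assumes an Asia/Shanghai, i.e. +08:00, timezone) that formats the example record's event time and its arrival processing time the way DateTimeBucketer's formatter would:

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class BucketSkewDemo {
    public static void main(String[] args) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd")
                .withZone(ZoneId.of("Asia/Shanghai"));

        // Event time carried by the record: 2019-09-29 23:59:58 (+08:00)
        long eventTime = Instant.parse("2019-09-29T15:59:58Z").toEpochMilli();
        // Processing time when the record is actually handled: 2019-09-30 00:00:01 (+08:00)
        long processingTime = Instant.parse("2019-09-29T16:00:01Z").toEpochMilli();

        // The partition the record should land in, and the one DateTimeBucketer puts it in
        System.out.println(fmt.format(Instant.ofEpochMilli(eventTime)));      // 2019/09/29
        System.out.println(fmt.format(Instant.ofEpochMilli(processingTime))); // 2019/09/30
    }
}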

0x3 Solution

As the source walk-through in the previous section shows, the crux is how getBucketPath obtains its time: we simply need to replace processing time with event time. Conveniently, the method's third parameter is element, i.e. the record itself, so as long as each record carries an event-time field we can read the time from there. Since the stock implementation isn't easy to patch, we can implement our own EventTimeBucketer against the Bucketer interface:

import java.io.IOException;
import java.io.ObjectInputStream;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;

public class EventTimeBucketer implements Bucketer<BaseCountVO> {
    private static final long serialVersionUID = 1L;
    private static final String DEFAULT_FORMAT_STRING = "yyyy/MM/dd";

    private final String formatString;

    private final ZoneId zoneId;
    private transient DateTimeFormatter dateTimeFormatter;

    public EventTimeBucketer() {
        this(DEFAULT_FORMAT_STRING);
    }

    public EventTimeBucketer(String formatString) {
        this(formatString, ZoneId.systemDefault());
    }

    public EventTimeBucketer(ZoneId zoneId) {
        this(DEFAULT_FORMAT_STRING, zoneId);
    }

    public EventTimeBucketer(String formatString, ZoneId zoneId) {
        this.formatString = formatString;
        this.zoneId = zoneId;
        this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(this.zoneId);
    }

    //Note: this method is required. Without it, dateTimeFormatter would be null after deserialization:
    //readObject is invoked during deserialization and re-creates the formatter.
    //"But doesn't the constructor initialize it?" Constructors are not invoked during deserialization.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();

        this.dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    }

    @Override
    public Path getBucketPath(Clock clock, Path basePath, BaseCountVO element) {
        String newDateTimeString = dateTimeFormatter.format(Instant.ofEpochMilli(element.getTimestamp()));
        return new Path(basePath + "/" + newDateTimeString);
    }
}
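
BaseCountVO is this article's own entity class, not a Flink type; all the bucketer needs from it is an event-time timestamp in epoch milliseconds. A minimal hypothetical version could look like this:

import java.io.Serializable;

public class BaseCountVO implements Serializable {
    private static final long serialVersionUID = 1L;

    // Event time of the record, in epoch milliseconds
    private long timestamp;
    // ... other business fields ...

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}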

In a real project, just replace BaseCountVO with your own entity class. To use the bucketer, only the setBucketer argument needs to change:

BucketingSink<BaseCountVO> sink = new BucketingSink<>(path);
//bucket by event time so each record lands in its correct daily partition
//(EventTimeBucketer is not generic, unlike DateTimeBucketer, so no <> here)
sink.setBucketer(new EventTimeBucketer("yyyy/MM/dd"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 256L);
sink.setBatchRolloverInterval(30 * 60 * 1000L);
sink.setInactiveBucketThreshold(3 * 60 * 1000L);
sink.setInactiveBucketCheckInterval(30 * 1000L);
sink.setInProgressSuffix(".in-progress");
sink.setPendingSuffix(".pending");
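
A quick way to sanity-check the bucketer, including that dateTimeFormatter is re-created after deserialization, is to round-trip it through Java serialization (as Flink does when shipping the sink to task managers) and ask for a bucket path. This is just an illustrative check; it assumes the BaseCountVO sketch above and a +08:00 default JVM timezone:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.flink.core.fs.Path;

public class EventTimeBucketerCheck {
    public static void main(String[] args) throws Exception {
        EventTimeBucketer bucketer = new EventTimeBucketer("yyyy/MM/dd");

        // Round-trip through Java serialization
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(bucketer);
        oos.flush();
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        EventTimeBucketer copy = (EventTimeBucketer) in.readObject();

        BaseCountVO record = new BaseCountVO();
        record.setTimestamp(1569772798000L); // 2019-09-29 23:59:58 (+08:00)

        // EventTimeBucketer ignores the Clock argument, so null is fine here
        System.out.println(copy.getBucketPath(null, new Path("/data/logs"), record));
        // Expected: /data/logs/2019/09/29
    }
}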