基于SLS+Blink的实时计算最佳实践

日志服务简介

阿里云的日志服务(SLS)是针对日志类数据的一站式服务,无需开发就能快捷完成海量日志数据的采集、消费、投递以及查询分析等功能,提升运维、运营效率。在采集端支持30多种写入方式,包括自研的客户端Logtail,开源软件如Logstash、Fluent,Flume,Beats等,各种语言的SDK/Producer,无论是嵌入式设备、网页、服务器、程序等都能轻松接入。在消费端,支持与Storm、Spark Streaming、Flink/Blink等大数据系统无缝对接。
基于SLS+Blink的实时计算最佳实践

阿里云实时计算(Blink)

阿里云实时计算是基于Apache Flink构建的一站式、高性能实时大数据处理平台,广泛适用于流式数据处理、离线数据处理等场景。阿里云实时计算提供了如下两种数据处理API:

  • Flink SQL:通过DDL的方式定义Source和Sink,用SQL来实现数据的处理。
  • Flink Datastream: 在程序中使用各个Source和Sink的SDK,通过提交jar的方式运行托管的Flink Job。

通过这两种API,既可以把SLS作为数据源(Source),实现日志端到端的实时采集与处理,也可以把SLS作为结果的输出目标(Sink),在SLS对结果实时查询分析,配置可视化报表等。
基于SLS+Blink的实时计算最佳实践

Flink SQL

Flink SQL是阿里云实时计算为了简化计算模型、降低用户使用实时计算门槛而设计的一套符合标准SQL语义的开发语言。

创建SLS源表

SLS源表对应SLS中的Logstore,表中的字段与Logstore中日志字段一一映射,像执行SQL一样流式处理SLS中的数据。除了Logstore所属的Region对应的Endpoint和Project之外,还需要具有读SLS Logstore权限的Access Key以及消费数据起始位置对应的时间点。

create table sls_stream(
  a INT,
  b INT,
  c VARCHAR
) with (
  type ='sls',
  endPoint ='http://cn-hangzhou-share.log.aliyuncs.com',
  accessId ='<yourAccessId>',
  accessKey ='<yourAccessKey>',
  startTime = '2017-07-05 00:00:00',
  project ='<yourProjectName>',
  logStore ='<yourLogStoreName>',
  consumerGroup ='<yourConsumerGroupName>'
);

Checkpoint

SLS的底层存储Loghub,是一个类似Kafka的Append Only的存储系统,覆盖Kafka 100%的功能。与Kafka的partition类型,Logstore中的数据存储在每个Shard中。每个Shard都可以通过cursor或者时间戳,确定日志在Shard中的存储位置(对应Kafka中的Offset)。在消费过程中,为了支持程序重启时尽可能少的数据重复,需要将最新消费位置记录下来,用于进程重启之后继续消费。这个位置就是我们所说的checkpoint。
基于SLS+Blink的实时计算最佳实践
目前,Flink SQL任务checkpoint保存在Flink的State中,如果任务Failover 或者暂停之后恢复,会从State中恢复消费位置继续消费。然而如果任务重启或者其他原因造成的State丢失,任务将从启动时指定的时间点开始消费。创建源表时建议指定参数consumerGroup,Blink将自动同步消费位置到SLS服务端,以用于在SLS控制台监控消费进度。注意:任务重启时不会使用SLS服务端的消费位置恢复。

创建SLS结果表

通过定义结果表,通过INSERT语句,把从源表中处理之后的数据写入到SLS的Logstore中。

create table sls_output(
 `name` VARCHAR,
 age BIGINT,
 birthday BIGINT
)with(
 type='sls',
 endPoint='http://cn-hangzhou-corp.sls.aliyuncs.com',
 accessId='<yourAccessId>',
 accessKey='<yourAccessKey>',
 project='<yourProjectName>',
 logstore='<yourLogstoreName>'
);

INSERT INTO sls_output SELECT age, birthday FROM source_table;

Flink Datastream

Flink SQL 的不足

  1. Flink SQL已经能够实现许多场景下的数据处理需求,但是受限于SQL的表达能力,对于比较复杂的业务场景,SQL实现起来比较复杂。
  2. 对于DDL 的定义,需要指定固定的字段列表,对于日志场景而言,日志字段不固定的情况非常普遍,这就意味着很难提前定义好全部的字段。
  3. 在日志中,很多时候单个字段是JSON或者其他复杂的形式,如果要在SQL里面解析和处理这类字段,不如在程序中之间处理起来灵活。

对于这些场景,可以考虑使用Data Stream API,通过自定义程序,实现更复杂的业务逻辑。

Maven 依赖

SLS开发了一个与开源Flink集成的SDK,同样适用于Blink。

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>flink-log-connector</artifactId>
    <version>0.1.19.1</version>
</dependency>

Github源码:https://github.com/aliyun/aliyun-log-flink-connector

消费SLS示例

public class ConsumerSample {
    private static final String SLS_ENDPOINT = "";
    private static final String ACCESS_KEY_ID = "";
    private static final String ACCESS_KEY_SECRET = "";
    private static final String SLS_PROJECT = "";
    private static final String SLS_LOGSTORE = "";

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend("your checkpoint dir"));
        Properties configProps = new Properties();
        configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT);
        configProps.put(ConfigConstants.LOG_ACCESSSKEYID, ACCESS_KEY_ID);
        configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY_SECRET);
        configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10");
        configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
        configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "23_ots_sla_etl_product1");
        configProps.put(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name());
        configProps.put(ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS, "10000");

        FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
        DataStream<FastLogGroupList> stream = env.addSource(
                new FlinkLogConsumer<>(SLS_PROJECT, SLS_LOGSTORE, deserializer, configProps));

        stream.flatMap((FlatMapFunction<FastLogGroupList, String>) (value, out) -> {
            for (FastLogGroup logGroup : value.getLogGroups()) {
                int logCount = logGroup.getLogsCount();
                for (int i = 0; i < logCount; i++) {
                    FastLog log = logGroup.getLogs(i);
                    // processing log
                }
            }
        });
        stream.writeAsText("log-" + System.nanoTime());
        env.execute("Flink consumer");
    }
}

Checkpoint

与SQL类似,SDK也会把checkpoint保存到Flink State中,此外,还支持把Checkpoint同步到SLS服务端,这样当Flink本地的State无法恢复时,还能从服务端获取checkpoint,从而保证即便任务重启甚至重建,只要服务端的消费组没有删除,checkpoint就不会丢失。SDK同步checkpoint支持如下策略:

  • 与Flink snapshotState同步,即在Flink调用snapshotState时,同步到服务端。默认是这种方式。
  • 自动提交checkpoint,即定时提交checkpoint到服务端。好处是当Flink下游任务不支持exactly once时最大程度上避免数据重复。
  • 不同步到SLS服务端。

处理结果写入SLS示例

public class ProducerSample {
    private static final String SLS_ENDPOINT = "cn-hangzhou.log.aliyuncs.com";
    private static final String ACCESS_KEY_ID = "";
    private static final String ACCESS_KEY = "";
    private static final String SLS_PROJECT = "";
    private static final String SLS_LOGSTORE = "test-flink-producer";

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(3);
        DataStream<String> stream = env.addSource(new EventsGenerator());
        Properties configProps = new Properties();
        configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT);
        configProps.put(ConfigConstants.LOG_ACCESSSKEYID, ACCESS_KEY_ID);
        configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY);
        configProps.put(ConfigConstants.LOG_PROJECT, SLS_PROJECT);
        configProps.put(ConfigConstants.LOG_LOGSTORE, SLS_LOGSTORE);

        FlinkLogProducer<String> logProducer = new FlinkLogProducer<>(new SimpleLogSerializer(), configProps);
        logProducer.setCustomPartitioner(new LogPartitioner<String>() {
            @Override
            public String getHashKey(String element) {
                String hash = "";
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    md.update(element.getBytes());
                    hash = new BigInteger(1, md.digest()).toString(16);
                } catch (NoSuchAlgorithmException ignore) {
                }
                StringBuilder builder = new StringBuilder();
                while (builder.length() < 32 - hash.length()) {
                    builder.append("0");
                }
                builder.append(hash);
                return builder.toString();
            }
        });
        stream.addSink(logProducer);
        env.execute("Flink producer");
    }

    public static class EventsGenerator implements SourceFunction<String> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            long seq = 0;
            while (running) {
                Thread.sleep(10);
                ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

任务监控

Metric监控

对于Flink SQL任务,Blink控制台提供了非常完备的监控报表,可以通过这些报表观察任务运行的状态,如延迟,内存状态,Failover等。
基于SLS+Blink的实时计算最佳实践
Metric解释可以参考Blink作业运维相关文档:https://help.aliyun.com/document_detail/62482.html
注意:SLS Datastream SDK暂时还没有实现Metric上报。

消费进度与监控报警

在SLS查看消费组对应的消费进度。
基于SLS+Blink的实时计算最佳实践
SLS对于每个消费组会定时输出延迟日志,根据这个延迟日志结合SLS的告警,可以用于监控消费延迟。参考如何开通消费组日志:https://help.aliyun.com/document_detail/85663.html
基于SLS+Blink的实时计算最佳实践

任务日志采集

Blink支持把作业日志存储到SLS中,在作业编辑页面右侧的作业参数页面,配置Log4j appender:
基于SLS+Blink的实时计算最佳实践

log4j.logger.org.apache.hadoop=OFF
log4j.appender.loghub = com.alibaba.blink.log.loghub.BlinkLogHubAppender
log4j.appender.loghub.Threshold = ERROR
log4j.appender.loghub.projectName = <your SLS Project>
log4j.appender.loghub.logstore = <your SLS Logstore>
log4j.appender.loghub.endpoint = <your SLS Endpoint>
log4j.appender.loghub.accessKeyId = <your Access Key ID>
log4j.appender.loghub.accessKey = <your Access Key Secret>

常见问题

部分task没有读到数据

从SLS消费数据的本质就是从每个shard的指定位置开始消费,直到最新位置。然后单个shard不支持并发消费,也就是说一个shard最多被一个task消费到。而不管是SQL还是Data Stream,Shard分配的方式都是对shard本身的信息做hash,然后对task总数取模来分配的。假设某个shard hashcode是x,task 个数为y,当前task的id为z,那么仅当 x%y=z 时,当前task会消费这个shard。因此可能存在某个task没有分配到任何shard的情况。

消费太慢导致数据堆积

数据堆积的根本原因是写入速度超过了消费速度,而如何提高消费速度取决于具体的场景。常见的原因有:

  • 下游处理节点慢导致source节点反压。这种情况可以通过观察Blink的反压状态是否是HIGH来确认,通过优化下游任务节点速度来解决。
  • Shard个数太少。当shard数量少于task个数时,无疑会造成部分task空跑的现象,此时增加task个数已经对总体的并发没有任何影响,此时可以尝试在SLS侧分裂shard个数解决。
消费组在多个作业之间没有产生分配shard的效果

Blink消费SLS的数据并没有使用消费组来实现均衡消费的效果,提供的消费组名称仅仅是用于在SLS服务端保存消费位点。如果需要类似Kafka 消费组的功能,应使用SLS的Consumer Library。参考文档 https://help.aliyun.com/document_detail/28998.html

Flink Datastream任务没有同步checkpoint到SLS服务端

检查Blink任务作业参数中的 blink.checkpoint.interval.ms 是否设置过大。

更多资料

Blink开发Datastream作业:https://help.aliyun.com/document_detail/111876.html
定义SLS源表:https://help.aliyun.com/knowledge_detail/62521.html
定义SLS结果表:https://help.aliyun.com/knowledge_detail/62529.html




上一篇:GRYZ20211104 Simulation problem solving report


下一篇:Wavebee SDK solving compatibility problem with ubuntu20