Flink+ClickHouse 玩转企业级实时大数据开发

Download: Flink+ClickHouse 玩转企业级实时大数据开发

Flink,ClickHouse介绍

Apache Flink是一个在*和有界数据流上进行有状态计算的框架。由于许多流应用程序被设计为在最短的停机时间内连续运行,流处理器必须提供出色的故障恢复,以及在应用程序运行时监视和维护应用程序的工具。
Apache Flink非常关注流处理的操作方面。在这里,我们将解释Flink的故障恢复机制,并介绍其用于管理和监督运行中的应用程序的特性。
Flink+ClickHouse主要特点:

  1. 指标实现支持 sql 描述,以前的方案使用是 storm 的程序,通过 stormsql 实现,包括 flinksql,这些内容对于 UDF 支持相对有限,但是现在这套 Flink+ClickHouse 基本上可以把分析师提的指标通过 sql 实现。

  2. 指标的上下线互不影响,这个主要是解决上边提到的关于 Flink 任务消费了 topic 以后,假如用户提出新的指标的时候,是启动新任务还是要不断修改的问题。

  3. 数据可回溯,方便异常排查,这个就类似上边提到的假如我的日活掉了,需要知道哪些指标的口径的逻辑掉了、哪个上报的数据掉了,如 cmd 掉了还是数据流 kafka 掉了还是用户上报的时候指标没有上报导致的日活掉了。假如单纯的 flink 的话,只是会计算出那个指标掉了,是没办法回溯的。

  4. 计算快,一个周期内完成所有的指标计算,现在的 horizon 曲线可能是几百上千,需要在五分钟之内或者十分钟之内,把所有分时、累时、以及维度下降的指标全部计算出来。

  5. 支持实时流,分部署部署,运维简单。

如何学习?通过Flink+ClickHouse 玩转企业级实时大数据开发课程你将很好掌握以上特点。

Flink+ClickHouse 如何开发企业级实时大数据?

Flink安装使用

Step 1 Flink安装
要能够运行Flink,唯一的要求是安装一个可工作的Java 8或11。你可以通过发出以下命令来检查Java的正确安装:

java -version

$ tar -xzf flink-1.13.0-bin-scala_2.11.tgz
$ cd flink-1.13.0-bin-scala_2.11

Step 2: Start a Cluster
Flink ships with a single bash script to start a local cluster.

$ ./bin/start-cluster.sh

启动集群。
在主机上启动独立进程。
启动主机上的任务执行程序守护进程。

Step 3: Submit a Job

Flink的发布附带了一些示例Jobs。您可以快速地将其中一个应用程序部署到正在运行的集群中。

$ ./bin/flink run examples/streaming/WordCount.jar
$ tail log/flink-*-taskexecutor-*.out
  (to,1)
  (be,1)
  (or,1)
  (not,1)
  (to,2)
  (be,2)

此外,您可以检查Flink的Web UI来监视Cluster和运行Job的状态。

Step 4: Stop the Cluster #
完成后,可以快速停止集群和所有正在运行的组件。

$ ./bin/stop-cluster.sh

Flink基础 API

Flink 程序是实现了分布式集合转换(例如过滤、映射、更新状态、join、分组、定义窗口、聚合)的规范化程序。集合初始创建自 source(例如读取文件、kafka 主题,或本地内存中的集合)。结果通过 sink 返回,例如,它可以将数据写入(分布式)文件,或标准输出(例如命令行终端)。Flink 程序可以在多种环境中运行,独立运行或嵌入到其他程序中。可以在本地 JVM 中执行,也可以在多台机器的集群上执行。

针对有界和*两种数据 source 类型,你可以使用 DataSet API 来编写批处理程序或使用 DataStream API 来编写流处理程序。本篇指南将介绍这两种 API 通用的基本概念

Flink+ClickHouse 玩转企业级实时大数据开发项目实战

/**
 * clickhouse方言
 */
public class ClickHouseJDBCDialect implements JDBCDialect {

    private static final long serialVersionUID = 1L;

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:clickhouse:");
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "`" + identifier + "`";
    }

    @Override
    public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return Optional.of(getInsertIntoStatement(tableName, fieldNames));
    }

    @Override
    public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
        return null;
    }

}

。。。

上一篇:clickhouse 


下一篇:ClickHouse单机安装部署