Flink -没写完更新中

文章目录

前言

提示:这里可以添加本文要记录的大概内容,例如:我是一个帅哥,你懂吧?

文献
《Flink入门与实战》 - 徐葳
/

一、概述

1 Flink是什么

​ Apache Flink,内部是用Java及Scala编写的分布式流数据计算引擎,可以支持以批处理或流处理的方式处理数据,在2014年这个项目被Apache孵化器所接受后,Flink迅速成为ASF(ApacheSoftware Foundation)的*项目之一,在2019年1月,阿里巴巴集团收购了Flink创始公司(DataArtisans),打造了阿里云商业化的实时计算Flink产品。

它有如下几个特点

  1. 低延迟
  2. 高吞吐
  3. 支持有界数据/*数据的处理,数据流式计算
  4. 支持集群,支持HA,可靠性强

什么是有界数据/*数据?

  • 有界数据:数据是有限的,一条SELECT查询下的数据不会是源源不断的
  • *数据:数据源源不断,不知道为什么时候结束,例如监控下的告警

2 架构分层

名称 描述
Deploy 部署方式 本地/集群/云服务部署。
Core 分布式流处理模型 计算核心实现,为API层提供基础服务。
API 调用接口 提供面向*数据的流处理API及有界数据的批处理API,其中流处理对应DataStream API,批处理对应DataSet API
Library 应用层 提供应用计算框架,面向流处理支持CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作),面向批处理支持FlinkML(机器学习库)、Gelly(图处理)、Table 操作。

3 基本组件

一个Flink任务 = DataSource + Transformation + DataSink

DataSource :数据源

Transformation :数据处理

DataSink:计算结果输出

而Flink在网络传输中通过缓存块承载数据,可以通过设置缓存块的超时时间,变相的决定了数据在网络中的处理方式。

4 其他流式计算框架+

1

1

1

1

1

二、入门与使用

1 Flink基本安装

1.1 Linux

下载链接
Index of /dist/flink/flink-1.14.3 (apache.org)

首先去apache官网下载部署的软件包,下载完成之后进行解压

## 解压
tar -zxvf flink-1.14.3-bin-scala_2.12.tgz 
## 进入bin目录 启动
./start-cluster.sh
## Flink提供的WebUI的端口是8081 此时可以去看看是否启动完成
netstat -anp |grep 8081

接着通过页面访问8081端口来个初体验

Flink -没写完更新中

关于Linux下的Flink Shell终端的使用

文章目录
flink~使用shell终端_cai_and_luo的博客-CSDN博客

1.2 Java

文章目录
Flink入门之Flink程序开发步骤(java语言)_胖虎儿的博客-CSDN博客

导入依赖

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.14.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.14.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.14.3</version>
</dependency>

入门Demo

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DemoApplication {
    public static void main(String[] args) throws Exception {

        /**
         * 大致的流程就分为
         * 1.环境准备
         * 设置运行模式
         * 2.加载数据源
         * 3.数据转换
         * 4.数据输出
         * 5.执行程序
         */

        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置运行模式
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2.加载数据源
        DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",
                "java,scala,php", "java,scala", "java");
        // 3.数据转换
        DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String element, Collector<String> out) throws Exception {
                String[] wordArr = element.split(",");
                for (String word : wordArr) {
                    out.collect(word);
                }
            }
        });
        // DataStream 下边为DataStream子类
        SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        });
        // 4.数据输出
        source.print();
        // 5.执行程序
        env.execute();
    }
}

关于在设置运行模式的代码上,有三种选择

/**
 * Runtime execution mode of DataStream programs. Among other things, this controls task scheduling,
 * network shuffle behavior, and time semantics. Some operations will also change their record
 * emission behaviour based on the configured execution mode.
 *
 * @see <a
 *     href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API">
 *     https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API</a>
 */
@PublicEvolving
public enum RuntimeExecutionMode {

    /**
     * The Pipeline will be executed with Streaming Semantics. All tasks will be deployed before
     * execution starts, checkpoints will be enabled, and both processing and event time will be
     * fully supported.
     */
     /** 流处理模式 */
    STREAMING,

    /**
     * The Pipeline will be executed with Batch Semantics. Tasks will be scheduled gradually based
     * on the scheduling region they belong, shuffles between regions will be blocking, watermarks
     * are assumed to be "perfect" i.e. no late data, and processing time is assumed to not advance
     * during execution.
     */
     /** 批处理模式 */
    BATCH,

    /**
     * Flink will set the execution mode to {@link RuntimeExecutionMode#BATCH} if all sources are
     * bounded, or {@link RuntimeExecutionMode#STREAMING} if there is at least one source which is
     * unbounded.
     */
    /** 自动模式 */
    AUTOMATIC
}

1.3 Scala+

与Java一样都在IDEA编译器上做,此时引入依赖

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.14.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.14.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.14.3</version>
</dependency>

// …

待定 …

// …

2 常用API

第一次学时,光看上面的Demo例子比较难以理解,所以通过书下面的API内容对照上面的Demo来进行理解,先来了解Flink四种层次的API详情

层级 描述信息 备注
底层 API 偏底层,易用性比较差,提供时间/状态的细粒度控制 Stateful Stream Processing
核心 API 对有界/*数据提供处理方法 DataStream(流处理) / DataSet(批处理)
Table API / 声明式DSL
SQL / 高级语言

Flink -没写完更新中

2.1 DataStream 流处理

主要分为三个流程

  1. DataSource 数据输入:addSource(sourceFunction)为程序添加一个数据源。
  2. Transformation 数据处理:对一个或多个数据源进行操作。
  3. Sink 数据输出:通过Transformation 处理后的数据输出到指定的位置。

Flink -没写完更新中

DataSource

看看他们的API

DataSource API 描述
readTextFile(文件路径) 逐行读取文本文件的数据
socketTextStream(地址信息) 从socket中读取数据
fromCollection(集合数据) 从集合内获取数据
其他第三方输入数据…或者自定义数据源 通过Flink提供的内置连接器去链接其它数据源

如果是自定义数据源,有两种实现方式

  1. 实现SourceFunction接口(并行度为1 = 无并行度)
  2. 实现ParallelSourceFunction接口 / 继承RichParallelSourceFunction

什么是并行度?

​ 一个Flink程序由多个任务(Source、Transformation和Sink)组成。一个任务由多个并行实例(线程)来执行,一个任务的并行实例(线程)数目被称为该任务的并行度。

Transformation

接下来是Transformation数据处理,Flink针对DataStream提供了大量的已经实现的算子。

Flink -没写完更新中

DataStream API 描述
Map 输入一个元素,然后返回一个元素,中间可以进行清洗转换等操作
FlatMap 输入一个元素,可以返回零个、一个或者多个元素
Filter 过滤函数,对传入的数据进行判断,符合条件的数据会被留下
KeyBy 根据指定的Key进行分组,Key相同的数据会进入同一个分区,典型用法如下:1、DataStream.keyBy(“someKey”) 指定对象中的someKey段作为分组Key。2、DataStream.keyBy(0) 指定Tuple中的第一个元素作为分组Key。
Reduce 对数据进行聚合操作,结合当前元素和上一次Reduce返回的值进行聚合操作,然后返回一个新的值
Aggregations sum()、min()、max()等
Union 合并多个流,新的流会包含所有流中的数据,但是Union有一个限制,就是所有合并的流类型必须是一致的
Connect 和Union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法
coMap和coFlatMap 在ConnectedStream中需要使用这种函数,类似于Map和flatMap
Split 根据规则把一个数据流切分为多个流
Select 和Split配合使用,选择切分后的流

关于Flink针对DataStream提供的一些数据分区规则

分区规则 描述
DataStream.shuffle() 随机分区
DataStream.rebalance() 对数据集进行再平衡、重分区和消除数据倾斜
DataStream.rescale() 重新调节
DataStream.partitionCustom(partitioner,0) 或者 DataStream.partitionCustom(partitioner,“smeKey”) 自定义分区

Sink

数据处理后的输出

Sink API 描述
writeAsText() 将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
print() / printToErr() 打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
自定义输出 addSink可以实现把数据输出到第三方存储介质中。系统提供了一批内置的Connector,它们会提供对应的Sink支持

示例一:自定义数据源(SourceFunction)

第一步,继承SourceFunction接口,实现自定义数据源类

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;

/**
 * 自定义数据源
 * @author 李家民
 */
public class DemoTransactionSource implements SourceFunction<String> {
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (true) {
            // 发射元素
            ctx.collect(String.valueOf(new Random().nextInt(50)
            ));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
    }
}

第二步,在Flink代码中引入这个数据源

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

/**
 * @author 李家民
 */
@Component
public class FlinkInitialize {

    @PostConstruct
    public void starter() throws Exception {
        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置运行模式
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2.设置自定义数据源
        DataStreamSource<String> stringDataStreamSource = env.addSource(new DemoTransactionSource(), "测试用的数据源");

        // 3.数据处理
        SingleOutputStreamOperator<String> stringSingleOutputStreamOperator = stringDataStreamSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value;
            }
        });

        // 4.数据输出
        stringSingleOutputStreamOperator.print();

        // 5.执行程序
        env.execute();
    }
}

此时执行代码,就可以把引入的数据进行打印

SourceFunction定义了run和cancel两个方法和SourceContext内部接口。

  • run(SourceContex):实现数据获取逻辑,并可以通过传入的参数ctx进行向下游节点的数据转发。
  • cancel():用来取消数据源,一般在run方法中,会存在一个循环来持续产生数据,cancel方法则可以使该循环终止。
  • SourceContext:source函数用于发出元素和可能的watermark的接口,返回source生成的元素的类型。

示例二:自定义分区

数据源沿用上述案例的代码,自定义分区是通过实现Partitioner接口去做处理

首先看看自定义分区的实现类

/**
 * 自定义分区
 * @author 李家民
 */
public class DemoPartitioner implements Partitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        System.out.println("目前分区总数=" + numPartitions + "  当前值=" + key + "  通过最左边的值看分区号");

        if (new Integer(key) > 20) {
            return 1;
        } else {
            return 2;
        }
    }
}

然后在Flink的代码中体现

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

@Component
public class FlinkInitialize {
    @PostConstruct
    public void starter() throws Exception {
        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置运行模式
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2.设置自定义数据源
        DataStreamSource<String> stringDataStreamSource = env.addSource(new DemoTransactionSource(), "测试用的数据源");
        // 3.数据处理
        DataStream<String> dataStream = stringDataStreamSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value;
            }
        }).partitionCustom(new DemoPartitioner(), new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        });
        // 4.数据输出
        dataStream.print();
        // 5.执行程序
        env.execute();
    }
}

输出后的结果如下

Flink -没写完更新中

示例三:NettyServer作为数据源

第一步:搭建数据来源,这里选择了Netty服务端作为本次示例

import com.sun.org.slf4j.internal.Logger;
import com.sun.org.slf4j.internal.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

/**
 * @author 李家民
 */
@Component
public class NettyServerInitialize {

    private static Logger log = LoggerFactory.getLogger(NettyServerInitialize.class);
    public static EventLoopGroup bossGroup;
    public static EventLoopGroup workerGroup;
    public static ServerBootstrap serverBootstrap;
    public static ChannelFuture channelFuture;
    public static Boolean isRunning = false;

    static {
        // Server初始化
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup(2);
        serverBootstrap =
                new ServerBootstrap().group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                System.out.println("客户端进入:" + ch.remoteAddress().getAddress());
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new NettyServerHandler());
                                pipeline.addLast(new StringEncoder());
                            }
                        });
    }

    @PostConstruct
    public void starter() throws InterruptedException {
        try {
            if (!isRunning) {
                channelFuture = serverBootstrap.bind(16668)
                        .addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                if (future.isSuccess()) {
                                    System.out.println("监听端口 16668 成功");
                                    isRunning = true;
                                } else {
                                    log.error("监听端口 16668 失败");
                                }
                            }
                        }).channel().closeFuture().sync();
            }
        } catch (Exception e) {
            e.printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

第二步:XXXX

示例四:RabbitMQ作为数据源

1

示例五:待定

1

2.2 DataSet 批处理

1

2.3 Table API/SQL

1

1

2.4 关于序列化

1

111111

3 集群模式

1

1

1

三、高阶功能使用

1

四、原理解析

1

总结

提示:这里对文章进行总结:
例如:以上就是今天要讲的内容,本文仅仅简单介绍了pandas的使用,而pandas提供了大量能使我们快速便捷地处理数据的函数和方法。

上一篇:使用 Jupyter Notebook 运行 Delta Lake 入门教程


下一篇:YARN NodeLabel功能以及在EMR弹性伸缩中的应用