flink学习笔记-各种Time

说明:本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程:

Flink大数据项目实战:http://t.cn/EJtKhaz

flink学习笔记-各种Time

从上图可以看出Flink 中的Time大致分为以下三类:

1.Event Time:Event 真正产生的时间,我们称之为Event Time。

2.Ingestion Time:Event 事件被Source拿到,进入Flink处理引擎的时间,我们称之为Ingestion Time。

3.Window Processing Time:Event事件被Flink 处理(比如做windows操作)时的时间,我们称之为Window Processing Time。

4. Stateful Operations

flink学习笔记-各种Time

什么是状态?

state一般指一个具体的task/operator的状态,比如当前处理那些数据,数据处理的进度等等。

Flink state操作状态分为两类:

1.Operator State

Operator State跟一个特定operator的一个并发实例绑定,整个operator只对应一个state。

2.Keyed State

基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,可能都对应一个state。

Flink 每个操作状态又分为两类:

Keyed State和Operator State可以以两种形式存在:原始状态和托管状态( Flink框架管理的状态)。

1.原始状态:比如一个字符串或者数组,它需要序列化,保存到内存或磁盘,或者外部存储中,这就是它的原始状态。

2.托管状态:比如数据放在Hash表中,或者放在HDFS中,或者放在rocksdb中,这种就是托管状态。当需要处理数据的时候,从托管状态中读取出来,还原成原始状态,甚至变量和集合,然后再进行处理。

5.Checkpoints(备份)

flink学习笔记-各种Time

什么是checkpoint?

所谓checkpoint,就是在某一时刻,将所有task的状态做一个快照(snapshot),然后存储到State Backend(比如hdfs)。checkpoint拥有轻量级容错机制,可以保证exactly-once 语义,用于内部失败的恢复(比如当应用挂了,它可以自动恢复从上次的进度接着执行)。

checkpoint基本原理:通过往source 注入barrier(可以理解为特殊的Event),barrier作为checkpoint的标志,它会自动做checkpoint无需人工干预。

6.Savepoint

savepoint是流处理过程中的状态历史版本,它具有可以replay的功能。用于外部恢复,当Flink应用重启和升级,它会做一个先做一个savepoint,下次应用启动可以接着上次进度执行。

savepoint两种触发方式:

1.Cancel with savepoint

2.手动主动触发

savepoint可以理解为是一种特殊的checkpoint,savepoint就是指向checkpoint的一个指针,需要手动触发,而且不会过期,不会被覆盖,除非手动删除。正常情况下的线上环境是不需要设置savepoint的。除非对job或集群做出重大改动的时候,需要进行测试运行。

(4)Flink Runtime

1. Flink运行时架构

1.1Flink架构

Flink 运行时架构主要包含几个部分:Client、JobManager(master节点)和TaskManger(slave节点)。

flink学习笔记-各种Time

Client:Flink 作业在哪台机器上面提交,那么当前机器称之为Client。用户开发的Program 代码,它会构建出DataFlow graph,然后通过Client提交给JobManager。

JobManager:是主(master)节点,相当于YARN里面的REsourceManager,生成环境中一般可以做HA 高可用。JobManager会将任务进行拆分,调度到TaskManager上面执行。

TaskManager:是从节点(slave),TaskManager才是真正实现task的部分。

Client提交作业到JobManager,就需要跟JobManager进行通信,它使用Akka框架或者库进行通信,另外Client与JobManager进行数据交互,使用的是Netty框架。Akka通信基于Actor System,Client可以向JobManager发送指令,比如Submit job或者Cancel /update job。JobManager也可以反馈信息给Client,比如status updates,Statistics和results。

Client提交给JobManager的是一个Job,然后JobManager将Job拆分成task,提交给TaskManager(worker)。JobManager与TaskManager也是基于Akka进行通信,JobManager发送指令,比如Deploy/Stop/Cancel Tasks或者触发Checkpoint,反过来TaskManager也会跟JobManager通信返回Task Status,Heartbeat(心跳),Statistics等。另外TaskManager之间的数据通过网络进行传输,比如Data Stream做一些算子的操作,数据往往需要在TaskManager之间做数据传输。

1.2. TaskManger Slot

flink学习笔记-各种Time

TaskManager是进程,他下面运行的task(整个Flink应用是Job,Job可以拆分成很多个task)是线程,每个task/subtask(线程)下可运行一个或者多个operator,即OperatorChain。Task是class,抽象的,subtask是Object(类比学习),具体的。

一个TaskManager通过Slot(任务槽)来控制它上面可以接受多少个task,比如一个TaskManager划分了3个Task Slot(仅限内存托管,目前CPU未做隔离),它只能接受3个task。Slot均分TaskManager所托管的内存,比如一个TaskManager有6G内存,那么每个Slot分配2G。

同一个TaskManager中的task共享TCP连接(通过多路复用)和心跳消息。它们还可以共享数据集和数据结构,从而减少每个任务的开销。一个TaskManager有N个槽位只能接受N个Task吗?不是,后面会讲共享槽位。

1.3. OperatorChain && Task

为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。以wordcount为例,解析不同视图下的数据流,如下图所示。

flink学习笔记-各种Time

数据流(逻辑视图)

创建Source(并行度设置为1)读取数据源,数据经过FlatMap(并行度设置为2)做转换操作,然后数据经过Key Agg(并行度设置为2)做聚合操作,最后数据经过Sink(并行度设置为2)将数据输出。

数据流(并行化视图)

并行度为1的Source读取数据源,然后FlatMap并行度为2读取数据源进行转化操作,然后数据经过Shuffle交给并行度为2的Key Agg进行聚合操作,然后并行度为2的Sink将数据输出,未优化前的task总和为7。

数据流(优化后视图)

并行度为1的Source读取数据源,然后FlatMap并行度为2读取数据源进行转化操作,然后数据经过Shuffle交给Key Agg进行聚合操作,此时Key Agg和Sink操作合并为一个task(注意:将KeyAgg和Sink两个operator进行了合并,因为这两个合并后并不会改变整体的拓扑结构),它们一起的并行度为2,数据经过Key Agg和Sink之后将数据输出,优化后的task总和为5.

1.4. OperatorChain的优点和组成条件

OperatorChain的优点

1.减少线程切换

2.减少序列化与反序列化

3.减少数据在缓冲区的交换

4.减少延迟并且提高吞吐能力

OperatorChain 组成条件

1.没有禁用Chain

2.上下游算子并行度一致 。

3.下游算子的入度为1(也就是说下游节点没有来自其他节点的输入)。

4.上下游算子在同一个slot group(后面紧跟着就会讲如何通过slot group先分配到同一个solt,然后才能chain) 。

5.下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)。

6.上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)。

7.上下游算子之间没有数据shuffle (数据分区方式是 forward)。

1.5. 编程改变OperatorChain行为

Operator chain的行为可以通过编程API中进行指定,可以通过在DataStream的operator后面(如someStream.map(..))调用startNewChain()来指示从该operator开始一个新的chain(与前面截断,不会被chain到前面)。可以调用disableChaining()来指示该operator不参与chaining(不会与前后的operator chain一起)。可以通过调用StreamExecutionEnvironment.disableOperatorChaining()来全局禁用chaining。可以设置Slot group,例如someStream.filter(...).slotSharingGroup(“name”)。可以通过调整并行度,来调整Operator chain。

2. Slot分配与共享

2.1共享Slot

flink学习笔记-各种Time

默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。

允许slot共享有以下两点好处:

1.Flink集群需要的任务槽与作业中使用的最高并行度正好相同(前提,保持默认SlotSharingGroup)。也就是说我们不需要再去计算一个程序总共会起多少个task了。

2.更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有slot共享,将task的2个并行度增加到6个,能充分利用slot资源,同时保证每个TaskManager能平均分配到重的subtasks。

2.2共享Slot实例

flink学习笔记-各种Time

将 WordCount 的并行度从之前的2个增加到6个(Source并行度仍为1),并开启slot共享(所有operator都在default共享组),将得到如上图所示的slot分布图。

首先,我们不用去计算这个job会其多少个task,总之该任务最终会占用6个slots(最高并行度为6)。其次,我们可以看到密集型操作 keyAggregation/sink 被平均地分配到各个 TaskManager。

2.3 SlotSharingGroup(soft)

SlotSharingGroup是Flink中用来实现slot共享的类,它尽可能地让subtasks共享一个slot。

保证同一个group的并行度相同的sub-tasks 共享同一个slots。算子的默认group为default(即默认一个job下的subtask都可以共享一个slot)

为了防止不合理的共享,用户也能通过API来强制指定operator的共享组,比如:someStream.filter(...).slotSharingGroup("group1");就强制指定了filter的slot共享组为group1。怎么确定一个未做SlotSharingGroup设置算子的SlotSharingGroup什么呢(根据上游算子的group 和自身是否设置group共同确定)。适当设置可以减少每个slot运行的线程数,从而整体上减少机器的负载。

flink学习笔记-各种Time

2.4 CoLocationGroup(强制)

CoLocationGroup可以保证所有的并行度相同的sub-tasks运行在同一个slot,主要用于迭代流(训练机器学习模型)。

3. Slot & parallelism的关系

3.1 Slots && parallelism

flink学习笔记-各种Time

如上图所示,有两个TaskManager,每个TaskManager有3个槽位。假设source操作并行度为3,map操作的并行度为4,sink的并行度为4,所需的task slots数与job中task的最高并行度一致,最高并行度为4,那么使用的Slot也为4。

3.2如何计算Slot

如何计算一个应用需要多少slot?

flink学习笔记-各种Time

如果不设置SlotSharingGroup,那么需要的Slot数为应用的最大并行度数。如果设置了SlotSharingGroup,那么需要的Slot数为所有SlotSharingGroup中的最大并行度之和。比如已经强制指定了map的slot共享组为test,那么map和map下游的组为test,map的上游source的组为默认的default,此时default组中最大并行度为10,test组中最大并行度为20,那么需要的Slot=10+20=30。

4.Flink部署模式

4.1 Local 本地部署

Flink 可以运行在 Linux、Mac OS X 和 Windows 上。本地模式的安装唯一需要的只是 Java 1.7.x或更高版本,本地运行会启动Single JVM,主要用于测试调试代码。

4.2 Standalone Cluster集群部署

软件需求

1.安装Java1.8或者更高版本

2.集群各个节点需要ssh免密登录

Flink Standalone 运行流程前面已经讲过,这里就不在赘叙。

4.3Flink ON YARN

flink学习笔记-各种Time

Flink ON YARN工作流程如下所示:

首先提交job给YARN,就需要有一个Flink YARN Client。

第一步:Client将Flink 应用jar包和配置文件上传到HDFS。

第二步:Client向REsourceManager注册resources和请求APPMaster  Container

第三步:REsourceManager就会给某一个Worker节点分配一个Container来启动APPMaster,JobManager会在APPMaster中启动。

第四步:APPMaster为Flink的TaskManagers分配容器并启动TaskManager,TaskManager内部会划分很多个Slot,它会自动从HDFS下载jar文件和修改后的配置,然后运行相应的task。TaskManager也会与APPMaster中的JobManager进行交互,维持心跳等。

5.Flink Standalone集群部署

安装Flink之前需要提前安装好JDK,这里我们安装的是JDK1.8版本。

flink学习笔记-各种Time

5.1下载

可以到官网:https://archive.apache.org/dist/flink/ 将Flink1.6.2版本下载到本地。

flink学习笔记-各种Time

5.2解压

将下载的flink-1.6.2-bin-hadoop26-scala_2.11.tgz上传至主节点

flink学习笔记-各种Time

使用tar -zxvf flink-1.6.2-bin-hadoop26-scala_2.11.tgz命令解压flink安装包

flink学习笔记-各种Time

方便后期flink多版本的使用,可以创建flink软连接

ln -s flink-1.6.2 flink

flink学习笔记-各种Time

5.3配置环境变量

vi ~/.bashrc

export FLINK_HOME=/home/hadoop/app/flink

export PATH=$FLINK_HOME/bin:$PATH

使配置文件生效

source ~/.bashrc

查看flink版本

flink -v

5.4修改配置文件

1.修改flink-conf.yaml配置文件

vi flink-conf.yaml

#JobManager地址

jobmanager.rpc.address: cdh01

#槽位配置为3

taskmanager.numberOfTaskSlots: 3

#设置并行度为3

parallelism.default: 3

2.修改masters配置

vi masters

cdh01:8081

3.修改slaves配置

vi slaves

cdh01

cdh02

cdh03

5.5主节点安装目录同步到从节点

通过deploy.sh脚本将flink安装目录同步到其他节点。

deploy.sh flink-1.6.2 /home/hadoop/app/ slave

在从节点分别创建flink软连接

ln -s flink-1.6.2 flink

5.6启动服务

进入flink bin目录执行启动集群脚本start-cluster.sh

bin/start-cluster.sh

flink学习笔记-各种Time

通过web查看flink集群,查看相关集群信息。

http://cdh01:8081

5.7测试运行

查看官网案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/

1.启动nc服务

nc -l 9000

2.提交flink作业

bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

3.输入测试数据

5.7测试运行

查看官网案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/

1.启动nc服务

nc -l 9000

2.提交flink作业

bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

3.输入测试数据

flink学习笔记-各种Time

4.查看运行结果

在TaskManager界面查看Flink运行结果

flink学习笔记-各种Time

(5)Flink开发环境搭建

1. 创建Flink项目及依赖管理

1.1创建Flink项目

官网创建Flink项目有两种方式:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/java_api_quickstart.html

方式一:

mvn archetype:generate \

-DarchetypeGroupId=org.apache.flink \

-DarchetypeArtifactId=flink-quickstart-java \

-DarchetypeVersion=1.6.2

方式二

$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.6.2

这里我们仍然使用第一种方式创建Flink项目。

打开终端,切换到对应的目录,通过maven创建flink项目

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java  -DarchetypeVersion=1.6.2

flink学习笔记-各种Time

项目构建过程中需要输入groupId,artifactId,version和package

flink学习笔记-各种Time

Flink项目创建成功

flink学习笔记-各种Time

打开IDEA工具,点击open。

flink学习笔记-各种Time

选择刚刚创建的flink项目

flink学习笔记-各种Time

Flink项目已经成功导入IDEA开发工具

flink学习笔记-各种Time

通过maven打包测试运行

mvn clean package

flink学习笔记-各种Time

刷新target目录可以看到刚刚打包的flink项目

flink学习笔记-各种Time

1.2. Flink依赖

Core Dependencies(核心依赖):

1.核心依赖打包在flink-dist*.jar里

2.包含coordination, networking, checkpoints, failover, APIs, operations (such as windowing), resource management等必须的依赖

注意:核心依赖不会随着应用打包(<scope>provided</scope>)

3.核心依赖项尽可能小,并避免依赖项冲突

Pom文件中添加核心依赖

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-java</artifactId>

<version>1.6.2</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-java_2.11</artifactId>

<version>1.6.2</version>

<scope>provided</scope>

</dependency>

注意:不会随着应用打包。

User Application Dependencies(应用依赖):

connectors, formats, or libraries(CEP, SQL, ML)、

注意:应用依赖会随着应用打包(scope保持默认值就好)

Pom文件中添加应用依赖

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka-0.10_2.11</artifactId>

<version>1.6.2</version>

</dependency>

注意:应用依赖按需选择,会随着应用打包,可以通过Maven Shade插件进行打包。

1.3. 关于Scala版本

Scala各版本之间是不兼容的(你基于Scala2.12开发Flink应用就不能依赖Scala2.11的依赖包)。

只使用Java的开发人员可以选择任何Scala版本,Scala开发人员需要选择与他们的应用程序的Scala版本匹配的Scala版本。

1.4. Hadoop依赖

不要把Hadoop依赖直接添加到Flink application,而是:

export HADOOP_CLASSPATH=`hadoop classpath`

Flink组件启动时会使用该环境变量的

特殊情况:如果在Flink application中需要用到Hadoop的input-/output format,只需引入Hadoop兼容包即可(Hadoop compatibility wrappers)

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-hadoop-compatibility_2.11</artifactId>

<version>1.6.2</version>

</dependency>

1.5 Flink项目打包

Flink 可以使用maven-shade-plugin对Flink maven项目进行打包,具体打包命令为mvn clean

package。

2. 自己编译Flink

2.1安装maven

1.下载

到maven官网下载安装包,这里我们可以选择使用apache-maven-3.3.9-bin.tar.gz。

2.解压

将apache-maven-3.3.9-bin.tar.gz安装包上传至主节点的,然后使用tar命令进行解压

tar -zxvf apache-maven-3.3.9-bin.tar.gz

3.创建软连接

ln -s apache-maven-3.3.9 maven

4.配置环境变量

vi ~/.bashrc

export MAVEN_HOME=/home/hadoop/app/maven

export PATH=$MAVEN_HOME/bin:$PATH

5.生效环境变量

source ~/.bashrc

6.查看maven版本

mvn –version

7. settings.xml配置阿里镜像

添加阿里镜像

<mirror>

<id>nexus-osc</id>

<mirrorOf>*</mirrorOf>

<name>Nexus osc</name>

<url>http://maven.aliyun.com/nexus/content/repositories/central</url>

</mirror>

2.2安装jdk

编译flink要求jdk8或者以上版本,这里已经提前安装好jdk1.8,具体安装配置不再赘叙,查看版本如下:

[hadoop@cdh01 conf]$ java -version

java version "1.8.0_51"

Java(TM) SE Runtime Environment (build 1.8.0_51-b16)

Java HotSpot(TM) 64-Bit Server VM (build 25.51-b03, mixed mode)

2.3下载源码

登录github:https://github.com/apache/flink,获取flink下载地址:https://github.com/apache/flink.git

打开Flink主节点终端,进入/home/hadoop/opensource目录,通过git clone下载flink源码:

git clone https://github.com/apache/flink.git

错误1:如果Linux没有安装git,会报如下错误:

bash: git: command not found

解决:git安装步骤如下所示:

1.安装编译git时需要的包(注意需要在root用户下安装)

yum install curl-devel expat-devel gettext-devel openssl-devel zlib-devel

yum install  gcc perl-ExtUtils-MakeMaker

2.删除已有的git

yum remove git

3.下载git源码

先安装wget

yum -y install wget

使用wget下载git源码

wget https://www.kernel.org/pub/software/scm/git/git-2.0.5.tar.gz

解压git

tar xzf git-2.0.5.tar.gz

编译安装git

cd git-2.0.5

make prefix=/usr/local/git all

sudo make prefix=/usr/local/git install

echo "export PATH=$PATH:/usr/local/git/bin" >> ~/.bashrc

source ~/.bashrc

查看git版本

git –version

错误2:git clone https://github.com/apache/flink.git

Cloning into 'flink'...

fatal: unable to access 'https://github.com/apache/flink.git/': SSL connect error

解决:

升级 nss 版本:yum update nss

2.4切换对应flink版本

使用如下命令查看flink版本分支

git tag

切换到flink对应版本(这里我们使用flink1.6.2)

git checkout release-1.6.2

2.5编译flink

进入flink 源码根目录:/home/hadoop/opensource/flink,通过maven编译flink

mvn clean install -DskipTests -Dhadoop.version=2.6.0

报错:

[INFO] BUILD FAILURE

[INFO] ------------------------------------------------------------------------

[INFO] Total time: 06:58 min

[INFO] Finished at: 2019-01-18T22:11:54-05:00

[INFO] Final Memory: 106M/454M

[INFO] ------------------------------------------------------------------------

[ERROR] Failed to execute goal on project flink-mapr-fs: Could not resolve dependencies for project org.apache.flink:flink-mapr-fs:jar:1.6.2: Could not find artifact com.mapr.hadoop:maprfs:jar:5.2.1-mapr in nexus-osc (http://maven.aliyun.com/nexus/content/repositories/central) -> [Help 1]

[ERROR]

[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.

[ERROR] Re-run Maven using the -X switch to enable full debug logging.

[ERROR]

[ERROR] For more information about the errors and possible solutions, please read the following articles:

[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException

[ERROR]

[ERROR] After correcting the problems, you can resume the build with the command

[ERROR]   mvn <goals> -rf :flink-mapr-fs

报错缺失flink-mapr-fs,需要手动下载安装。

解决:

1.下载maprfs jar包

通过手动下载maprfs-5.2.1-mapr.jar包,下载地址地址:https://repository.mapr.com/nexus/content/groups/mapr-public/com/mapr/hadoop/maprfs/5.2.1-mapr/

2.上传至主节点

将下载的maprfs-5.2.1-mapr.jar包上传至主节点的/home/hadoop/downloads目录下。

3.手动安装

手动安装缺少的包到本地仓库

mvn install:install-file -DgroupId=com.mapr.hadoop -DartifactId=maprfs -Dversion=5.2.1-mapr -Dpackaging=jar  -Dfile=/home/hadoop/downloads/maprfs-5.2.1-mapr.jar

4.继续编译

使用maven继续编译flink(可以排除刚刚已经安装的包)

mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3  -rf :flink-mapr-fs

报错:

[INFO] BUILD FAILURE

[INFO] ------------------------------------------------------------------------

[INFO] Total time: 05:51 min

[INFO] Finished at: 2019-01-18T22:39:20-05:00

[INFO] Final Memory: 108M/480M

[INFO] ------------------------------------------------------------------------

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project flink-mapr-fs: Compilation failure: Compilation failure:

[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[70,44] package org.apache.hadoop.fs does not exist

[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,45] cannot find symbol

[ERROR] symbol:   class Configuration

[ERROR] location: package org.apache.hadoop.conf

[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/

runtime/fs/maprfs/MapRFileSystem.java:[73,93] cannot find symbol

[ERROR] symbol:   class Configuration

缺失org.apache.hadoop.fs包,报错找不到。

解决:

flink-mapr-fs模块的pom文件中添加如下依赖:

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-common</artifactId>

<version>${hadoop.version}</version>

</dependency>

继续往后编译:

mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3  -rf :flink-mapr-fs

又报错:

[ERROR] Failed to execute goal on project flink-avro-confluent-registry: Could not resolve dependencies for project org.apache.flink:flink-avro-confluent-registry:jar:1.6.2: Could not find artifact io.confluent:kafka-schema-registry-client:jar:3.3.1 in nexus-osc (http://maven.aliyun.com/nexus/content/repositories/central) -> [Help 1]

[ERROR]

报错缺少kafka-schema-registry-client-3.3.1.jar 包

解决:

手动下载kafka-schema-registry-client-3.3.1.jar包,下载地址如下:

http://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/3.3.1/kafka-schema-registry-client-3.3.1.jar

将下载的kafka-schema-registry-client-3.3.1.jar上传至主节点的目录下/home/hadoop/downloads

手动安装缺少的kafka-schema-registry-client-3.3.1.jar包

mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=3.3.1 -Dpackaging=jar  -Dfile=/home/hadoop/downloads/kafka-schema-registry-client-3.3.1.jar

继续往后编译

mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3  -rf :flink-mapr-fs

flink学习笔记-各种Time

(6)Flink API 通用基本概念

1. 继续侃Flink编程基本套路

1.1 DataSet and DataStream

DataSet and DataStream表示Flink app中的分布式数据集。它们包含重复的、不可变数据集。DataSet有界数据集,用在Flink批处理。DataStream可以是*,用在Flink流处理。它们可以从数据源创建,也可以通过各种转换操作创建。

1.2共同的编程套路

DataSet and DataStream 这里以WordCount为例,共同的编程套路如下所示:

1.获取执行环境(execution environment)

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2.加载/创建初始数据集

// 读取输入数据

DataStream<String> text;

if (params.has("input")) {

// 读取text文件

text = env.readTextFile(params.get("input"));

} else {

System.out.println("Executing WordCount example with default input data set.");

System.out.println("Use --input to specify file input.");

// 读取默认测试数据集

text = env.fromElements(WordCountData.WORDS);

}

3.对数据集进行各种转换操作(生成新的数据集)

// 切分每行单词

text.flatMap(new Tokenizer())

//对每个单词分组统计词频数

.keyBy(0).sum(1);

4.指定将计算的结果放到何处去

// 输出统计结果

if (params.has("output")) {

//写入文件地址

counts.writeAsText(params.get("output"));

} else {

System.out.println("Printing result to stdout. Use --output to specify output path.");

//数据打印控制台

counts.print();

}

5.触发APP执行

// 执行flink 程序

env.execute("Streaming WordCount");

1.3惰性计算

Flink APP都是延迟执行的,只有当execute()被显示调用时才会真正执行,本地执行还是在集群上执行取决于执行环境的类型。好处:用户可以根据业务构建复杂的应用,Flink可以整体进优化并生成执行计划。

2. 指定键(Specifying Keys)

2.1谁需要指定键

哪些操作需要指定key呢?常见的操作如join, coGroup, keyBy, groupBy,Reduce, GroupReduce, Aggregate, Windows等。

Flink编程模型的key是虚拟的,不需要你创建键值对,可以在具体算子通过参数指定,如下代码所示:

DataSet<...> input = // [...]

DataSet<...> reduced = input

.groupBy(/*define key here*/)

.reduceGroup(/*do something*/);

2.2为Tuple定义键

Tuple定义键的方式有很多种,接下来我们一起看几个示例:

按照指定属性分组

DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)

注意:此时表示使用Tuple3三元组的第一个成员作为keyBy

按照组合键进行分组

DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)

注意:此时表示使用Tuple3三元组的前两个元素一起作为keyBy

特殊情况:嵌套Tuple

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> input = // [...]

KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)

注意:这里使用KeyBy(0)指定键,系统将会使用整个Tuple2作为键(整型和浮点型的)。如果想使用Tuple2内部字段作为键,你可以使用字段来表示键,这种方法会在后面阐述。

2.3使用字段表达式定义键

基于字符串的字段表达式可以用来引用嵌套字段(例如Tuple,POJO)

public class WC {

public String word;

public User user;

public int count;

}

public class User{

public int age;

public String zip;

}

示例:通过word字段进行分组

DataStream<WC> words = // [...]

DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

语法:

1.直接使用字段名选择POJO字段

例如 user 表示 一个POJO的user字段

2.Tuple通过offset来选择

"_1"和"5"分别代表第一和第六个Scala Tuple字段

“f0” and “f5”分别代表第一和第六个Java Tuple字段

3.选择POJO和Tuples的嵌套属性

user.zip

在scala里你可以"_2.user.zip"或"user._4.1.zip”

在java里你可以“2.user.zip”或者" user.f0.1.zip ”

4.使用通配符表达式选择所有属性,java为“*”,scala为 "_"。不是POJO或者Tuple的类型也适用。

2.4字段表达式实例-Java

以下定义两个Java类:

public static class WC {

public ComplexNestedClass complex;

private int count;

public int getCount() {

return count;

}

public void setCount(int c) {

this.count = c;

}

}

public static class ComplexNestedClass {

public Integer someNumber;

public float someFloat;

public Tuple3<Long, Long, String> word;

public IntWritable hadoopCitizen;

}

我们一起看看如下key字段如何理解:

1."count": wc 类的count字段

2."complex":递归的选取ComplexNestedClass的所有字段

3."complex.word.f2": ComplexNestedClass类中的tuple word的第三个字段;

4."complex.hadoopCitizen":选择Hadoop IntWritable类型。

2.5字段表达式实例-Scala

以下定义两个Scala类:

"_1"和"5"分别代表第一和第六个Scala Tuple字段

“f0” and “f5”分别代表第一和第六个Java Tuple字段

3.选择POJO和Tuples的嵌套属性

user.zip

在scala里你可以"_2.user.zip"或"user._4.1.zip”

在java里你可以“2.user.zip”或者" user.f0.1.zip ”

4.使用通配符表达式选择所有属性,java为“*”,scala为 "_"。不是POJO或者Tuple的类型也适用。

2.4字段表达式实例-Java

以下定义两个Java类:

public static class WC {

public ComplexNestedClass complex;

private int count;

public int getCount() {

return count;

}

public void setCount(int c) {

this.count = c;

}

}

public static class ComplexNestedClass {

public Integer someNumber;

public float someFloat;

public Tuple3<Long, Long, String> word;

public IntWritable hadoopCitizen;

}

我们一起看看如下key字段如何理解:

1."count": wc 类的count字段

2."complex":递归的选取ComplexNestedClass的所有字段

3."complex.word.f2": ComplexNestedClass类中的tuple word的第三个字段;

4."complex.hadoopCitizen":选择Hadoop IntWritable类型。

2.5字段表达式实例-Scala

以下定义两个Scala类:

flink学习笔记-各种Time

上一篇:Flink学习笔记:Flink Runtime


下一篇:Flink学习笔记:Connectors概述