FlinK 1.12批流一体
flink 1.13.0 流处理应用更加简单高效
flink 的引入
- 第1代——Hadoop MapReduce
首先第一代的计算引擎,无疑就是 Hadoop 承载的 MapReduce。它将计算分为两个阶段,分别为 Map 和 Reduce。对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。
n 批处理
n Mapper、Reducer
- 第2代——DAG框架(Tez) + MapReduce
由于这样的弊端,催生了支持 DAG 框架的产生。因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里我们不去细究各种 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来说,大多还是批处理的任务。
n 批处理
n 1个Tez = MR(1) + MR(2) + ... + MR(n)
n 相比MR效率有所提升
- 第3代——Spark
接下来就是以 Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及强调的实时计算。在这里,很多人也会认为第三代计算引擎也能够很好的运行批处理的 Job。
n 批处理、流处理、SQL高层API支持
n 自带DAG
内存迭代计算、性能较之前大幅提升
- 第4代——Flink
随着第三代计算引擎的出现,促进了上层应用快速发展,例如各种迭代计算的性能以及对流计算和 SQ- 等的支持。Flink 的诞生就被归在了第四代。这应该主要表现在 Flink 对流计算的支持,以及更一步的实时性上面。当然 Flink 也可以支持 Batch 的任务,以及 DAG 的运算。
n 批处理、流处理、SQL高层API支持
n 自带DAG
n 流式计算性能更高、可靠性更高
应用场景
-
事件驱动
-
数据分析
-
数据管道
架构
- JobManager处理器:
也称之为Master,用于协调分布式执行,它们用来调度task,协调检查点,协调失败时恢复等。Flink运行时至少存在一个master处理器,如果配置高可用模式则会存在多个master处理器,它们其中有一个是leader,而其他的都是standby。
- TaskManager处理器:
也称之为Worker,用于执行一个dataflow的task(或者特殊的subtask)、数据缓冲和data stream的交换,Flink运行时至少会存在一个worker处理器。
- Slot 任务执行槽位:
物理概念,一个TM(TaskManager)内会划分出多个Slot,1个Slot内最多可以运行1个Task(Subtask)或一组由Task(Subtask)组成的任务链。
多个Slot之间会共享平分当前TM的内存空间
Slot是对一个TM的资源进行固定分配的工具,每个Slot在TM启动后,可以获得固定的资源
比如1个TM是一个JVM进程,如果有6个Slot,那么这6个Slot平分这一个JVM进程的资源
但是因为在同一个进程内,所以线程之间共享TCP连接、内存数据等,效率更高(Slot之间交流方便)
- Task:
任务,每一个Flink的Job会根据情况(并行度、算子类型)将一个整体的Job划分为多个Task
- Subtask:
子任务,一个Task可以由多个Subtask组成,一个Task有多少个Subtask取决于这个Task的并行度
也就是,每一个Subtask就是当前Task任务并行的一个线程
如,当前Task并行度为8,那么这个Task会有8个Subtask(8个线程并行执行这个Task)
- 并行度:
并行度就是一个Task可以分成多少个Subtask并行执行的一个参数
这个参数是动态的,可以在任务执行前进行分配,而非Slot分配,TM启动就固定了
一个Task可以获得的最大并行度取决于整个Flink环境的可用Slot数量,也就是如果有8个Slot,那么最大并行度也就是8,设置的再大也没有意义
如下图:
- 一个Job分为了3个Task来运行,分别是TaskA TaskB TaskC
- 其中TaskA设置为了6个并行度,也就是TaskA可以有6个Subtask,如图可见,TaskA的6个Subtask各自在一个Slot内执行
- 其中在Slot的时候说过,Slot可以运行由Task(或Subtask)组成的任务链,如图可见,最左边的Slot运行了TaskA TaskB TaskC 3个Task各自的1个Subtask组成的一个Subtask执行链
Slot是物理的概念,是静态的概念,一旦flink启动以后,tm就制定了slot数量,不能改变
parallelism是动态的概念,可以设置并行度的优先级,可以设置算子级别的或者应用程序全局的并行度、递交作业时设置并行度、使用flink部署环境默认配置文件中指定的并行度
优先级从前往后,越来越低
并行度的设置
并行度是一个动态的概念,可以在多个地方设置并行度:
- 配置文件默认并行度:conf/flink-conf.yaml的parallelism.default
- 启动Flink任务,动态提交参数:比如:bin/flink run -p 3
- 在代码中设置全局并行度:env.setParallelism(3);
- 针对每个算子进行单独设置:sum(1).setParallelism(3)
优先级:算子 > 代码全局 > 命令行参数 > 配置文件
fink 编程模型
-
最顶层:SQL/Table API 提供了操作关系表、执行SQL语句分析的API库,供我们方便的开发SQL相关程序
-
中层:流和批处理API层,提供了一系列流和批处理的API和算子供我们对数据进行处理和分析
-
最底层:运行时层,提供了对Flink底层关键技术的操纵,如对Event、state、time、window等进行精细化控制的操作API
Libraries支持
-
支持机器学习(FlinkML)
-
支持图分析(Gelly)
-
支持关系数据处理(Table)
-
支持复杂事件处理(CEP) 风控领域用得特别多
构建工程
-
maven
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.itcast</groupId> <artifactId>itcast_flinkbase51</artifactId> <version>1.0-SNAPSHOT</version> <!-- 指定仓库位置,依次为aliyun、apache和cloudera仓库 --> <repositories> <!-- <repository>--> <!-- <id>aliyun</id>--> <!-- <url>http://maven.aliyun.com/nexus/content/groups/public/</url>--> <!-- </repository>--> <repository> <id>apache</id> <url>https://repository.apache.org/content/repositories/snapshots/</url> </repository> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <properties> <encoding>UTF-8</encoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <java.version>1.8</java.version> <scala.version>2.11</scala.version> <flink.version>1.13.1</flink.version> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <!--依赖Scala语言--> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.12.11</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.12</artifactId> <version>${flink.version}</version> </dependency> <!-- web ui的依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.12</artifactId> <version>${flink.version}</version> </dependency> <!-- Apache Flink 的依赖 --> <!-- 这些依赖项,不应该打包到JAR文件中. --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink.version}</version> </dependency> <!-- 用于通过自定义功能,格式等扩展表生态系统的通用模块--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.12</artifactId> <version>${flink.version}</version> </dependency> <!-- blink执行计划,1.11+默认的--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <!--<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_2.12</artifactId> <version>${flink.version}</version> </dependency>--> <!-- flink连接器--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-kafka_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <!-- <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.12</artifactId> <version>${flink.version}</version> </dependency>--> <!--<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.12</artifactId> <version>${flink.version}</version> </dependency>--> <!--<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-parquet_2.12</artifactId> <version>${flink.version}</version> </dependency>--> <!--<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.9.2</version> </dependency> <dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-avro</artifactId> <version>1.10.0</version> </dependency>--> <dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> <exclusions> <exclusion> <artifactId>flink-streaming-java_2.11</artifactId> <groupId>org.apache.flink</groupId> </exclusion> <exclusion> <artifactId>flink-runtime_2.11</artifactId> <groupId>org.apache.flink</groupId> </exclusion> <exclusion> <artifactId>flink-core</artifactId> <groupId>org.apache.flink</groupId> </exclusion> <exclusion> <artifactId>flink-java</artifactId> <groupId>org.apache.flink</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-metastore</artifactId> <version>2.1.0</version> <exclusions> <exclusion> <artifactId>hadoop-hdfs</artifactId> <groupId>org.apache.hadoop</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop-2-uber</artifactId> <version>2.7.5-10.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> <!--<version>8.0.20</version>--> </dependency> <!-- 高性能异步组件:Vertx--> <dependency> <groupId>io.vertx</groupId> <artifactId>vertx-core</artifactId> <version>3.9.0</version> </dependency> <dependency> <groupId>io.vertx</groupId> <artifactId>vertx-jdbc-client</artifactId> <version>3.9.0</version> </dependency> <dependency> <groupId>io.vertx</groupId> <artifactId>vertx-redis-client</artifactId> <version>3.9.0</version> </dependency> <!-- 日志 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> <scope>runtime</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> <scope>runtime</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.44</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.2</version> <scope>provided</scope> </dependency> <!-- 参考:https://blog.****.net/f641385712/article/details/84109098--> <!--<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-collections4</artifactId> <version>4.4</version> </dependency>--> <!--<dependency> <groupId>org.apache.thrift</groupId> <artifactId>libfb303</artifactId> <version>0.9.3</version> <type>pom</type> <scope>provided</scope> </dependency>--> <!--<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>28.2-jre</version> </dependency>--> </dependencies> <build> <sourceDirectory>src/main/java</sourceDirectory> <plugins> <!-- 编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> <!--<encoding>${project.build.sourceEncoding}</encoding>--> </configuration> </plugin> <!-- 打包插件(会包含所有依赖) --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF --> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <!-- 设置jar包的入口类(可选) --> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
单词统计
单词计数-(1.12之前的写法)流处理
package cn.itcast.day01;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* todo flink1.12之前的写法
* 编写Flink程序,接收socket的单词数据,并以空格进行单词拆分打印。
*/
public class StreamWordCount {
public static void main(String[] args) throws Exception {
/**
* 实现步骤:
* 1)获取flink流处理的运行环境
* 2)接入数据源,读取文件获取数据
* 3)数据处理
* 3.1:使用flatMap对单词进行拆分
* 3.2:对拆分后的单词进行记一次数
* 3.3:使用分组算子对key进行分组
* 3.4:对分组后的key进行聚合操作
* 4)构建sink,输出结果
*/
//todo 1)获取flink流处理的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//todo 2)接入数据源,读取文件获取数据
DataStreamSource<String> lines = env.socketTextStream("node01", 9999);
//todo 3)数据处理
// 3.1:使用flatMap对单词进行拆分
SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
String[] words = line.split(" ");
//返回数据
for (String word : words) {
out.collect(word);
}
}
});
// 3.2:对拆分后的单词进行记一次数
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
// 3.3:使用分组算子对key进行分组
//wordAndOne.keyBy(0);
// KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
// @Override
// public String getKey(Tuple2<String, Integer> value) throws Exception {
// return value.f0;
// }
// });
KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
// 3.4:对分组后的key进行聚合操作
SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = grouped.sum(1);
//todo 4)构建sink,输出结果
sumed.print();
//todo 5)启动运行
env.execute();
}
}
单词计数-(1.12之前的写法) 批处理
package cn.itcast.day01;
import groovy.lang.Tuple;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* todo flink1.12之前的写法
* 编写Flink程序,读取文件中的字符串,并以空格进行单词拆分打印。
*/
public class BatchWordCount {
public static void main(String[] args) throws Exception {
/**
* 实现步骤:
* 1)获取flink批处理的运行环境
* 2)接入数据源,读取文件获取数据
* 3)数据处理
* 3.1:使用flatMap对单词进行拆分
* 3.2:对拆分后的单词进行记一次数
* 3.3:使用分组算子对key进行分组
* 3.4:对分组后的key进行聚合操作
* 4)构建sink,输出结果
*/
//todo 1)获取flink批处理的运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//todo 2)接入数据源,读取文件获取数据
DataSource<String> lines = env.readTextFile("./data/input/wordcount.txt");
//todo 3)数据处理
// 3.1:使用flatMap对单词进行拆分
FlatMapOperator<String, String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
String[] words = line.split(" ");
//返回数据
for (String word : words) {
out.collect(word);
}
}
});
// 3.2:对拆分后的单词进行记一次数
MapOperator<String, Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
// 3.3:使用分组算子对key进行分组
UnsortedGrouping<Tuple2<String, Integer>> grouped = wordAndOne.groupBy(0);
// 3.4:对分组后的key进行聚合操作
AggregateOperator<Tuple2<String, Integer>> sumed = grouped.sum(1);
//todo 4)构建sink,输出结果
sumed.print();
}
}
单词统计(1.12之后)批流一体
package cn.itcast.day01;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* todo flink1.12以后的实现方案
* todo 使用批流一体API编程模型实现单词计数
*
* 在flink中批是流的一个特例,也就意味着不管实现批还是流处理,肯定按照流的api实现批处理
* DataStream
* StreamExecutionEnvironment
*/
public class UnifyWordCount {
public static void main(String[] args) throws Exception {
/**
* 实现步骤:
* 1)获取flink流处理的运行环境
* 2)接入数据源,读取文件获取数据
* 3)数据处理
* 3.1:使用flatMap对单词进行拆分
* 3.2:对拆分后的单词进行记一次数
* 3.3:使用分组算子对key进行分组
* 3.4:对分组后的key进行聚合操作
* 4)构建sink,输出结果
*/
//todo 1)获取flink流处理的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH); //使用dataStream实现批处理
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING); //使用dataStream实现流处理(如果数据源是一个有界数据流则依然是一个批处理)
//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//使用DataStream的时候根据数据源自动选择是批还是流
//todo 2)接入数据源,读取文件获取数据
//DataStreamSource<String> lines = env.readTextFile("./data/input/wordcount.txt");
DataStreamSource<String> lines = env.socketTextStream("node01", 9999);
//todo 3)数据处理
// 3.1:使用flatMap对单词进行拆分
SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
String[] words = line.split(" ");
//返回数据
for (String word : words) {
out.collect(word);
}
}
});
// 3.2:对拆分后的单词进行记一次数
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
// 3.3:使用分组算子对key进行分组
//wordAndOne.keyBy(0);
// KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
// @Override
// public String getKey(Tuple2<String, Integer> value) throws Exception {
// return value.f0;
// }
// });
KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
// 3.4:对分组后的key进行聚合操作
SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = grouped.sum(1);
//todo 4)构建sink,输出结果
sumed.print();
//todo 5)启动运行
env.execute();
}
}
提交部署
1上传作业jar包
2指定递交参数
3查看任务运行概述
4查看任务运行结果