文章目录
前言
提示:这里可以添加本文要记录的大概内容,例如:我是一个帅哥,你懂吧?
文献 |
---|
《Flink入门与实战》 - 徐葳 |
/ |
一、概述
1 Flink是什么
Apache Flink,内部是用Java及Scala编写的分布式流数据计算引擎,可以支持以批处理或流处理的方式处理数据,在2014年这个项目被Apache孵化器所接受后,Flink迅速成为ASF(ApacheSoftware Foundation)的*项目之一,在2019年1月,阿里巴巴集团收购了Flink创始公司(DataArtisans),打造了阿里云商业化的实时计算Flink产品。
它有如下几个特点
- 低延迟
- 高吞吐
- 支持有界数据/*数据的处理,数据流式计算
- 支持集群,支持HA,可靠性强
什么是有界数据/*数据?
- 有界数据:数据是有限的,一条SELECT查询下的数据不会是源源不断的
- *数据:数据源源不断,不知道为什么时候结束,例如监控下的告警
2 架构分层
名称 | 描述 |
---|---|
Deploy 部署方式 | 本地/集群/云服务部署。 |
Core 分布式流处理模型 | 计算核心实现,为API层提供基础服务。 |
API 调用接口 | 提供面向*数据 的流处理API及有界数据 的批处理API,其中流处理对应DataStream API ,批处理对应DataSet API 。 |
Library 应用层 | 提供应用计算框架,面向流处理支持CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作),面向批处理支持FlinkML(机器学习库)、Gelly(图处理)、Table 操作。 |
3 基本组件
一个Flink任务 = DataSource + Transformation + DataSink
DataSource :数据源
Transformation :数据处理
DataSink:计算结果输出
而Flink在网络传输中通过缓存块承载数据,可以通过设置缓存块的超时时间
,变相的决定了数据在网络中的处理方式。
4 其他流式计算框架+
1
1
1
1
1
二、入门与使用
1 Flink基本安装
1.1 Linux
下载链接 |
---|
Index of /dist/flink/flink-1.14.3 (apache.org) |
首先去apache官网下载部署的软件包,下载完成之后进行解压
## 解压
tar -zxvf flink-1.14.3-bin-scala_2.12.tgz
## 进入bin目录 启动
./start-cluster.sh
## Flink提供的WebUI的端口是8081 此时可以去看看是否启动完成
netstat -anp |grep 8081
接着通过页面访问8081端口来个初体验
关于Linux下的Flink Shell终端的使用
文章目录 |
---|
flink~使用shell终端_cai_and_luo的博客-CSDN博客 |
1.2 Java
文章目录 |
---|
Flink入门之Flink程序开发步骤(java语言)_胖虎儿的博客-CSDN博客 |
导入依赖
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.14.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.14.3</version>
</dependency>
入门Demo
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class DemoApplication {
public static void main(String[] args) throws Exception {
/**
* 大致的流程就分为
* 1.环境准备
* 设置运行模式
* 2.加载数据源
* 3.数据转换
* 4.数据输出
* 5.执行程序
*/
// 1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置运行模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2.加载数据源
DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",
"java,scala,php", "java,scala", "java");
// 3.数据转换
DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String element, Collector<String> out) throws Exception {
String[] wordArr = element.split(",");
for (String word : wordArr) {
out.collect(word);
}
}
});
// DataStream 下边为DataStream子类
SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value.toUpperCase();
}
});
// 4.数据输出
source.print();
// 5.执行程序
env.execute();
}
}
关于在设置运行模式的代码上,有三种选择
/**
* Runtime execution mode of DataStream programs. Among other things, this controls task scheduling,
* network shuffle behavior, and time semantics. Some operations will also change their record
* emission behaviour based on the configured execution mode.
*
* @see <a
* href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API">
* https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API</a>
*/
@PublicEvolving
public enum RuntimeExecutionMode {
/**
* The Pipeline will be executed with Streaming Semantics. All tasks will be deployed before
* execution starts, checkpoints will be enabled, and both processing and event time will be
* fully supported.
*/
/** 流处理模式 */
STREAMING,
/**
* The Pipeline will be executed with Batch Semantics. Tasks will be scheduled gradually based
* on the scheduling region they belong, shuffles between regions will be blocking, watermarks
* are assumed to be "perfect" i.e. no late data, and processing time is assumed to not advance
* during execution.
*/
/** 批处理模式 */
BATCH,
/**
* Flink will set the execution mode to {@link RuntimeExecutionMode#BATCH} if all sources are
* bounded, or {@link RuntimeExecutionMode#STREAMING} if there is at least one source which is
* unbounded.
*/
/** 自动模式 */
AUTOMATIC
}
1.3 Scala+
与Java一样都在IDEA编译器上做,此时引入依赖
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.14.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.14.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.14.3</version>
</dependency>
// …
待定 …
// …
2 常用API
第一次学时,光看上面的Demo例子比较难以理解,所以通过书下面的API内容对照上面的Demo来进行理解,先来了解Flink四种层次的API详情
层级 | 描述信息 | 备注 |
---|---|---|
底层 API | 偏底层,易用性比较差,提供时间/状态的细粒度控制 | Stateful Stream Processing |
核心 API | 对有界/*数据提供处理方法 | DataStream(流处理) / DataSet(批处理) |
Table API | / | 声明式DSL |
SQL | / | 高级语言 |
2.1 DataStream 流处理
主要分为三个流程
- DataSource 数据输入:addSource(sourceFunction)为程序添加一个数据源。
- Transformation 数据处理:对一个或多个数据源进行操作。
- Sink 数据输出:通过Transformation 处理后的数据输出到指定的位置。
DataSource
看看他们的API
DataSource API | 描述 |
---|---|
readTextFile(文件路径) | 逐行读取文本文件的数据 |
socketTextStream(地址信息) | 从socket中读取数据 |
fromCollection(集合数据) | 从集合内获取数据 |
其他第三方输入数据…或者自定义数据源 | 通过Flink提供的内置连接器去链接其它数据源 |
如果是自定义数据源,有两种实现方式
- 实现SourceFunction接口(并行度为1 = 无并行度)
- 实现ParallelSourceFunction接口 / 继承RichParallelSourceFunction
什么是并行度?
一个Flink程序由多个任务(Source、Transformation和Sink)组成。一个任务由多个并行实例(线程)来执行,一个任务的并行实例(线程)数目被称为该任务的并行度。
Transformation
接下来是Transformation
数据处理,Flink针对DataStream提供了大量的已经实现的算子。
DataStream API | 描述 |
---|---|
Map | 输入一个元素,然后返回一个元素,中间可以进行清洗转换等操作 |
FlatMap | 输入一个元素,可以返回零个、一个或者多个元素 |
Filter | 过滤函数,对传入的数据进行判断,符合条件的数据会被留下 |
KeyBy | 根据指定的Key进行分组,Key相同的数据会进入同一个分区,典型用法如下:1、DataStream.keyBy(“someKey”) 指定对象中的someKey段作为分组Key。2、DataStream.keyBy(0) 指定Tuple中的第一个元素作为分组Key。 |
Reduce | 对数据进行聚合操作,结合当前元素和上一次Reduce返回的值进行聚合操作,然后返回一个新的值 |
Aggregations | sum()、min()、max()等 |
Union | 合并多个流,新的流会包含所有流中的数据,但是Union有一个限制,就是所有合并的流类型必须是一致的 |
Connect | 和Union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法 |
coMap和coFlatMap | 在ConnectedStream中需要使用这种函数,类似于Map和flatMap |
Split | 根据规则把一个数据流切分为多个流 |
Select | 和Split配合使用,选择切分后的流 |
关于Flink针对DataStream提供的一些数据分区规则
分区规则 | 描述 |
---|---|
DataStream.shuffle() | 随机分区 |
DataStream.rebalance() | 对数据集进行再平衡、重分区和消除数据倾斜 |
DataStream.rescale() | 重新调节 |
DataStream.partitionCustom(partitioner,0) 或者 DataStream.partitionCustom(partitioner,“smeKey”) | 自定义分区 |
Sink
数据处理后的输出
Sink API | 描述 |
---|---|
writeAsText() | 将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取 |
print() / printToErr() | 打印每个元素的toString()方法的值到标准输出或者标准错误输出流中 |
自定义输出 | addSink可以实现把数据输出到第三方存储介质中。系统提供了一批内置的Connector,它们会提供对应的Sink支持 |
示例一:自定义数据源(SourceFunction)
第一步,继承SourceFunction接口,实现自定义数据源类
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
/**
* 自定义数据源
* @author 李家民
*/
public class DemoTransactionSource implements SourceFunction<String> {
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (true) {
// 发射元素
ctx.collect(String.valueOf(new Random().nextInt(50)
));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
}
}
第二步,在Flink代码中引入这个数据源
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author 李家民
*/
@Component
public class FlinkInitialize {
@PostConstruct
public void starter() throws Exception {
// 1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置运行模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2.设置自定义数据源
DataStreamSource<String> stringDataStreamSource = env.addSource(new DemoTransactionSource(), "测试用的数据源");
// 3.数据处理
SingleOutputStreamOperator<String> stringSingleOutputStreamOperator = stringDataStreamSource.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value;
}
});
// 4.数据输出
stringSingleOutputStreamOperator.print();
// 5.执行程序
env.execute();
}
}
此时执行代码,就可以把引入的数据进行打印
SourceFunction定义了run和cancel两个方法和SourceContext内部接口。
- run(SourceContex):实现数据获取逻辑,并可以通过传入的参数ctx进行向下游节点的数据转发。
- cancel():用来取消数据源,一般在run方法中,会存在一个循环来持续产生数据,cancel方法则可以使该循环终止。
- SourceContext:source函数用于发出元素和可能的watermark的接口,返回source生成的元素的类型。
示例二:自定义分区
数据源沿用上述案例的代码,自定义分区是通过实现Partitioner接口去做处理
首先看看自定义分区的实现类
/**
* 自定义分区
* @author 李家民
*/
public class DemoPartitioner implements Partitioner<String> {
@Override
public int partition(String key, int numPartitions) {
System.out.println("目前分区总数=" + numPartitions + " 当前值=" + key + " 通过最左边的值看分区号");
if (new Integer(key) > 20) {
return 1;
} else {
return 2;
}
}
}
然后在Flink的代码中体现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class FlinkInitialize {
@PostConstruct
public void starter() throws Exception {
// 1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置运行模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2.设置自定义数据源
DataStreamSource<String> stringDataStreamSource = env.addSource(new DemoTransactionSource(), "测试用的数据源");
// 3.数据处理
DataStream<String> dataStream = stringDataStreamSource.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value;
}
}).partitionCustom(new DemoPartitioner(), new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value;
}
});
// 4.数据输出
dataStream.print();
// 5.执行程序
env.execute();
}
}
输出后的结果如下
示例三:NettyServer作为数据源
第一步:搭建数据来源,这里选择了Netty服务端作为本次示例
import com.sun.org.slf4j.internal.Logger;
import com.sun.org.slf4j.internal.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author 李家民
*/
@Component
public class NettyServerInitialize {
private static Logger log = LoggerFactory.getLogger(NettyServerInitialize.class);
public static EventLoopGroup bossGroup;
public static EventLoopGroup workerGroup;
public static ServerBootstrap serverBootstrap;
public static ChannelFuture channelFuture;
public static Boolean isRunning = false;
static {
// Server初始化
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup(2);
serverBootstrap =
new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("客户端进入:" + ch.remoteAddress().getAddress());
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new NettyServerHandler());
pipeline.addLast(new StringEncoder());
}
});
}
@PostConstruct
public void starter() throws InterruptedException {
try {
if (!isRunning) {
channelFuture = serverBootstrap.bind(16668)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("监听端口 16668 成功");
isRunning = true;
} else {
log.error("监听端口 16668 失败");
}
}
}).channel().closeFuture().sync();
}
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
第二步:XXXX
示例四:RabbitMQ作为数据源
1
示例五:待定
1
2.2 DataSet 批处理
1
2.3 Table API/SQL
1
1
2.4 关于序列化
1
111111
3 集群模式
1
1
1
三、高阶功能使用
1
四、原理解析
1
总结
提示:这里对文章进行总结:
例如:以上就是今天要讲的内容,本文仅仅简单介绍了pandas的使用,而pandas提供了大量能使我们快速便捷地处理数据的函数和方法。