Flink编程基本步骤:
1.创建流执行环境 StreamExecutionEnviroment.getExecutionEnviroment() 获取流环境。
2.加载数据源 Source
3.转换操作 Transformation
4.输出出去Sink,落地到其它的数据仓库,直接打印输出.
关于Flink 数据的基本操作 —— 四种分类
-
单条数据的操作 map filter
-
多条数据的操作 window
-
多个流合并成一个流操作 connect union join
-
将一个流拆分成多个流操作 ,(split 过期),测输出流(OutputTag)output
Flink输入数据源 source
自带预定义Source
-
基于本地集合Source
-
应用场景,当程序写完之后,测试当前功能是否可用,开发测试用。
-
分类
-
从元素 fromElements
-
从集合 fromCollection
-
基于Sequence的generateSequence
-
基于开始和结束的DataStream ,fromSequence
-
-
-
并行度设置
-
并行度设置方式
1.设置配置文件 flink-conf.yaml parallelism.default: 1 2.在client端提交任务设置并行度 flink run -p 1 3.在程序中设置全局并行度 env.setParallelism(1) 4.算子级别的并行度设置 算子.setParallelism(1) 优先级: 算子优先级 > 程序中全局并行度 > client提交作业并行度 > 配置文件中的并行度
-
全局并行度设置
env.setParallelism(1);
-
算子并行度设置
source.print().setParallelism(2);
-
-
基于文件的Source
-
批的方式读取文件,只读取一次——readTextFile
-
流的方式读取文件,实时根据指定周期去监控文件—— readFile
监控方式 watchType 两种
-
FileProcessingMode.PROCESS_CONTINUOUSLY 主要用于修改删除操作比较多场景,根据周期持续读取整个文件(破坏仅一次语义特性)
-
FileProcessingMode.Process_Once 主要用于仅读取一次,读完就 exit
-
-
-
基于Socket的Source
env.socketTextStream("node1", 9999);
自定义Source
-
主要用于指定特定格式的数据源,生成有界或*数据流
-
有界数据流 - 读取 mysql ,读取 文件
-
*数据流 - for 循环生成数据
-
-
需求 - 通过用户自定义的方式生成数据源。
-
需求案例
-
自定义实现SourceFunction接口案例 只需要重写 run方法和 cancel方法
-
package cn.itcast.flink.api;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* Author itcast
* Date 2021/11/29 14:56
* Desc - 每1秒钟随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
* public class Order
* String oid;
* int uid;
* double money;
* long timestamp;
* String datetime;
* 每一秒钟生成一条数据
* 打印输出每条数据
* 执行流环境
*/
public class OrderSource {
public static void main(String[] args) throws Exception {
//1.创建流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.设置并行度
env.setParallelism(1);
//3.获取自定义数据源
//实现方式
DataStreamSource<Order> source = env.addSource(new OrderEmitSource());
//4.打印输出
source.printToErr();
//5.执行流环境
env.execute();
}
public static class OrderEmitSource implements SourceFunction<Order> {
//定义一个标记,用于标识当前持续生成数据
private volatile boolean isRunning = true;
/**
* 实现数据的生成,并将生成的数据输出 ctx 输出
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<Order> ctx) throws Exception {
//定义随机数
Random rm = new Random();
//时间转换格式工具
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//死循环,一直生成数据
while (isRunning) {
//随机数
String oid = UUID.randomUUID().toString();
//用户id ,随机 0~5 之间值
int uid = rm.nextInt(6);
//money 0~100之间的
double money = rm.nextDouble()*100;
//时间戳
long timestamp = System.currentTimeMillis();
//当前时间
String datetime = sdf.format(timestamp);
Order order = new Order(
oid,
uid,
money,
timestamp,
datetime
);
//收集数据
ctx.collect(order);
//程序休眠一秒接着执行
TimeUnit.SECONDS.sleep(1);
}
}
/**
* 用户取消生成的时候,取消生成
*/
@Override
public void cancel() {
isRunning = false;
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order {
private String oid;
private int uid;
private double money;
private long timestamp;
private String datetime;
}
}
实现ParallelSourceFunction 接口案例
并行化生成数据,算子上设置并行度 setParallelism(n)
package cn.itcast.flink.api;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* Author itcast
* Date 2021/11/29 14:56
* Desc - 每1秒钟随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
* public class Order
* String oid;
* int uid;
* double money;
* long timestamp;
* String datetime;
* 每一秒钟生成一条数据
* 打印输出每条数据
* 执行流环境
*/
public class OrderParallelismSource {
public static void main(String[] args) throws Exception {
//1.创建流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.设置并行度
env.setParallelism(1);
//3.获取自定义数据源
//实现方式
DataStreamSource<Order> source = env.addSource(new OrderEmitSource()).setParallelism(6);
//4.打印输出
source.printToErr();
//5.执行流环境
env.execute();
}
public static class OrderEmitSource implements ParallelSourceFunction<Order> {
//定义一个标记,用于标识当前持续生成数据
private volatile boolean isRunning = true;
/**
* 实现数据的生成,并将生成的数据输出 ctx 输出
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<Order> ctx) throws Exception {
//定义随机数
Random rm = new Random();
//时间转换格式工具
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//死循环,一直生成数据
while (isRunning) {
//随机数
String oid = UUID.randomUUID().toString();
//用户id ,随机 0~5 之间值
int uid = rm.nextInt(6);
//money 0~100之间的
double money = rm.nextDouble()*100;
//时间戳
long timestamp = System.currentTimeMillis();
//当前时间
String datetime = sdf.format(timestamp);
Order order = new Order(
oid,
uid,
money,
timestamp,
datetime
);
//收集数据
ctx.collect(order);
//程序休眠一秒接着执行
TimeUnit.SECONDS.sleep(5);
}
}
/**
* 用户取消生成的时候,取消生成
*/
@Override
public void cancel() {
isRunning = false;
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order {
private String oid;
private int uid;
private double money;
private long timestamp;
private String datetime;
}
}
实现RichParallelSourceFunction案例
-
Rich 是富函数继承了 AbstractRichFunciton,实现了
-
生命周期的 open 和 close 方法
-
open 方法,用于实现当前生成的初始化条件
-
close 方法,用于生成数据结束的收尾工作
-
getRuntimeContext 方法,用于获取当前的程序的上下文对象(参数、环境变量、状态、累加器等)
-
-
案例 - 从数据库中读取数据
-
1:初始化工作 —— 创建数据库和数据表
# 创建数据库
create database test;
# 使用数据库
use test;
# 创建表和导入数据
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`id` int(11) NOT NULL,
`username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', '大壮');
INSERT INTO `user` VALUES (11, 'erya', '123456', '二丫');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', '三胖');
SET FOREIGN_KEY_CHECKS = 1;
2:Flink读取MySQL的数据源
package cn.itcast.flink.source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
/**
* Author itcast
* Date 2022/1/11 16:17
* Desc 读取mysql数据表并打印输出
* 开发步骤:
* 1.创建和准备数据库和数据表 flink
* 2.获取流执行环境
* 3.设置并行度
* 4.添加自定义数据源,从mysql中读取数据,实现 RichSourceFunction ,rich 增强富功能 open close getRuntimeContext
* 4.1. open 初始化动作,创建连接,创建 statement ,获取变量
* 4.2. run方法 读取数据表中数据并封装成对象
* 4.3. close方法 关闭statement和连接
* 5. 打印结果输出
* 6. 执行流环境
*/
public class UserSource {
public static void main(String[] args) throws Exception {
//2.获取流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//3.设置并行度
env.setParallelism(1);
//4.添加自定义数据源,从mysql中读取数据,实现 RichSourceFunction ,rich 增强富功能 open close getRuntimeContext
DataStreamSource<User> source = env.addSource(new RichSourceFunction<User>() {
Connection conn = null;
Statement statement = null;
//标记
boolean isRunning = true;
/**
* 在所有执行source,首先要做的初始化工作
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
//1.设置 driver 驱动
Class.forName("com.mysql.jdbc.Driver");
//2.获取连接 设置 url 用户名 密码
conn = DriverManager.getConnection(
"jdbc:mysql://node1:3306/flink?useSSL=false",
"root",
"123456"
);
//3.创建 statement 基于 sql
statement = conn.createStatement();
}
/**
* 所有的元素都在这里执行
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<User> ctx) throws Exception {
String sql = "select id,username,password,name from user";
while (isRunning) {
//1.读取数据 statement.executeQuery 得到 ResultSet 结果集
ResultSet rs = statement.executeQuery(sql);
//2.遍历 ResultSet 是否有数据 hasNext() = true
while (rs.next()) {
User user = new User();
//3.将每条数据 赋值 对象 User
int id = rs.getInt("id");
String username = rs.getString("username");
String password = rs.getString("password");
String name = rs.getString("name");
user.setId(id);
user.setUsername(username);
user.setPassword(password);
user.setName(name);
//4.将 User 收集 ctx.collect(user)
ctx.collect(user);
}
TimeUnit.MINUTES.sleep(5);
}
}
@Override
public void cancel() {
//将flag置为 false
isRunning = false;
}
/**
* 所有的元素执行完毕的收尾工作
* @throws Exception
*/
@Override
public void close() throws Exception {
//关闭 statement
if (!statement.isClosed()) {
statement.close();
}
//关闭 connection
if (!conn.isClosed()) {
conn.close();
}
}
});
//4.1. open 初始化动作,创建连接,创建 statement ,获取变量
//4.2. run方法 读取数据表中数据并封装成对象
//4.3. close方法 关闭statement和连接
//5. 打印结果输出
source.printToErr();
//6. 执行流环境
env.execute();
}
public static class User {
// id
private int id;
// username
private String username;
// password
private String password;
// name
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", username='" + username + '\'' +
", password='" + password + '\'' +
", name='" + name + '\'' +
'}';
}
}
}