和其他所有的计算框架一样,flink也有一些基础的开发步骤以及基础,核心的API,从开发步骤的角度来讲,主要分为四大部分
1.Environment
Flink Job在提交执行计算时,需要首先建立和Flink框架之间的联系,也就指的是当前的flink运行环境,只有获取了环境信息,才能将task调度到不同的taskManager执行。而这个环境对象的获取方式相对比较简单
// 批处理环境 val env = ExecutionEnvironment.getExecutionEnvironment // 流式数据处理环境 val env = StreamExecutionEnvironment.getExecutionEnvironment
2.Source
Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源.
2.1.从集合读取数据
一般情况下,可以将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用。这里的数据结构采用集合类型是比较普遍的
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:从集合读取数据 */ object SourceList { def main(args: Array[String]): Unit = { //1.创建执行的环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.从集合中读取数据 val sensorDS: DataStream[WaterSensor] = env.fromCollection( // List(1,2,3,4,5) List( WaterSensor("ws_001", 1577844001, 45.0), WaterSensor("ws_002", 1577844015, 43.0), WaterSensor("ws_003", 1577844020, 42.0) ) ) //3.打印 sensorDS.print() //4.执行 env.execute("sensor") } /** * 定义样例类:水位传感器:用于接收空高数据 * * @param id 传感器编号 * @param ts 时间戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
2.2从文件中读取数据
通常情况下,我们会从存储介质中获取数据,比较常见的就是将日志文件作为数据源
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:从文件读取数据 */ object SourceFile { def main(args: Array[String]): Unit = { //1.创建执行的环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.从指定路径获取数据 val fileDS: DataStream[String] = env.readTextFile("input/data.log") //3.打印 fileDS.print() //4.执行 env.execute("sensor") } } /** * 在读取文件时,文件路径可以是目录也可以是单一文件。如果采用相对文件路径,会从当前系统参数user.dir中获取路径 * System.getProperty("user.dir") */ /** * 如果在IDEA中执行代码,那么系统参数user.dir自动指向项目根目录, * 如果是standalone集群环境, 默认为集群节点根目录,当然除了相对路径以外, * 也可以将路径设置为分布式文件系统路径,如HDFS val fileDS: DataStream[String] = env.readTextFile( "hdfs://hadoop02:9000/test/1.txt") */
如果是standalone集群环境, 默认为集群节点根目录,当然除了相对路径以外,也可以将路径设置为分布式文件系统路径,如HDFS
val fileDS: DataStream[String] = env.readTextFile( "hdfs://hadoop02:9000/test/1.txt")
默认读取时,flink的依赖关系中是不包含Hadoop依赖关系的,所以执行上面代码时,会出现错误。
解决方法就是增加相关依赖jar包就可以了
2.3 kafka读取数据
Kafka作为消息传输队列,是一个分布式的,高吞吐量,易于扩展地基于主题发布/订阅的消息系统。在现今企业级开发中,Kafka 和 Flink成为构建一个实时的数据处理系统的首选
2.3.1 引入kafka连接器的依赖
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>1.10.0</version> </dependency>
2.3.2 代码实现参考
import java.util.Properties import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011 import org.apache.flink.streaming.util.serialization.SimpleStringSchema /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:从kafka读取数据 */ object SourceKafka { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() properties.setProperty("bootstrap.servers", "hadoop02:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest") val kafkaDS: DataStream[String] = env.addSource( new FlinkKafkaConsumer011[String]( "sensor", new SimpleStringSchema(), properties) ) kafkaDS.print() env.execute("sensor") } }
2.4 自定义数据源
大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式
2.4.1 创建自定义数据源
import com.atyang.day01.Source.SourceList.WaterSensor import org.apache.flink.streaming.api.functions.source.SourceFunction import scala.util.Random /** * description: ss * date: 2020/8/28 20:36 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:自定义数据源 */ class MySensorSource extends SourceFunction[WaterSensor] { var flg = true override def run(ctx: SourceFunction.SourceContext[WaterSensor]): Unit = { while ( flg ) { // 采集数据 ctx.collect( WaterSensor( "sensor_" +new Random().nextInt(3), 1577844001, new Random().nextInt(5)+40 ) ) Thread.sleep(100) } } override def cancel(): Unit = { flg = false; } }
3.Transform
在Spark中,算子分为转换算子和行动算子,转换算子的作用可以通过算子方法的调用将一个RDD转换另外一个RDD,Flink中也存在同样的操作,可以将一个数据流转换为其他的数据流。
转换过程中,数据流的类型也会发生变化,那么到底Flink支持什么样的数据类型呢,其实我们常用的数据类型,Flink都是支持的。比如:Long, String, Integer, Int, 元组,样例类,List, Map等。
3.1 map
- 映射:将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素
- 参数:Scala匿名函数或MapFunction
- 返回:DataStream
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:从集合读取数据 */ object Transfrom_map { def main(args: Array[String]): Unit = { //1.创建执行的环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.从集合中读取数据 val sensorDS: DataStream[WaterSensor] = env.fromCollection( // List(1,2,3,4,5) List( WaterSensor("ws_001", 1577844001, 45.0), WaterSensor("ws_002", 1577844015, 43.0), WaterSensor("ws_003", 1577844020, 42.0) ) ) val sensorDSMap = sensorDS.map(x => (x.id+"_1",x.ts+"_1",x.vc + 1)) //3.打印 sensorDSMap.print() //4.执行 env.execute("sensor") } /** * 定义样例类:水位传感器:用于接收空高数据 * * @param id 传感器编号 * @param ts 时间戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
3.1.1 MapFunction
Flink为每一个算子的参数都至少提供了Scala匿名函数和函数类两种的方式,其中如果使用函数类作为参数的话,需要让自定义函数继承指定的父类或实现特定的接口。例如:MapFunction
sensor-data.log 文件数据
sensor_1,1549044122,10 sensor_1,1549044123,20 sensor_1,1549044124,30 sensor_2,1549044125,40 sensor_1,1549044126,50 sensor_2,1549044127,60 sensor_1,1549044128,70 sensor_3,1549044129,80 sensor_3,1549044130,90 sensor_3,1549044130,100
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:从文件读取数据 */ object SourceFileMap { def main(args: Array[String]): Unit = { //1.创建执行的环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.从指定路径获取数据 val fileDS: DataStream[String] = env.readTextFile("input/sensor-data.log") val MapDS = fileDS.map( lines => { //更加逗号切割 获取每个元素 val datas: Array[String] = lines.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) } ) //3.打印 MapDS.print() //4.执行 env.execute("map") } /** * 定义样例类:水位传感器:用于接收空高数据 * * @param id 传感器编号 * @param ts 时间戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:从文件读取数据 */ object Transform_MapFunction { def main(args: Array[String]): Unit = { //1.创建执行的环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.从指定路径获取数据 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") sensorDS.map() //3.打印 // MapDS.print() //4.执行 env.execute("map") } /** * 自定义继承 MapFunction * MapFunction[T,O] * 自定义输入和输出 * */ class MyMapFunction extends MapFunction[String,WaterSensor]{ override def map(t: String): WaterSensor = { val datas: Array[String] = t.split(",") WaterSensor(datas(0),datas(1).toLong,datas(2).toInt) } } /** * 定义样例类:水位传感器:用于接收空高数据 * * @param id 传感器编号 * @param ts 时间戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }