二、基本操作
1、入门案例
(1)批处理wordcount--DataSet
val env = ExecutionEnvironment.getExecutionEnvironment // 从文件中读取数据 val inputPath = "D:\\Projects\\BigData\\TestWC1\\src\\main\\resources\\hello.txt" val inputDS: DataSet[String] = env.readTextFile(inputPath) // 分词之后,对单词进行groupby分组,然后用sum进行聚合 val wordCountDS: AggregateDataSet[(String, Int)] = inputDS.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1) // 打印输出 wordCountDS.print() |
(2)流处理wordcount--DataStream
object StreamWordCount { def main(args: Array[String]): Unit = { // 从外部命令中获取参数 val params: ParameterTool = ParameterTool.fromArgs(args) val host: String = params.get("host") val port: Int = params.getInt("port") // 创建流处理环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 接收socket文本流 val textDstream: DataStream[String] = env.socketTextStream(host, port) // flatMap和Map需要引用的隐式转换 import org.apache.flink.api.scala._ val dataStream: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s")).filter(_.nonEmpty).map((_, 1)).keyBy(0).sum(1) dataStream.print().setParallelism(1) // 启动executor,执行任务 env.execute("Socket stream word count") } } |
2、Environment创建
getExecutionEnvironment,客户端,当前执行程序的上下文
createLocalEnvironment:返回本地执行环境
createRemoteEnvironment:集群执行环境,需要指定ip、端口及jar包
3、Source读取
(1)从集合读取数据:env.fromCollection(List(SensorReading("sensor_1 ",15477181 99,35.8),SensorReading("sensor_6",1547718201,15.4))
(2)从文件读取数据:valstream2=env.readTextFile("YOUR_FILE_PATH")
(3)以kafka消息队列的数据作为来源:valstream3=env.addSource(newFlinkKafkaConsumer 011[String]("sensor",newSimpleStringSchema(),properties))
(4)自定义Source:valstream4=env.addSource(newMySensorSource()
4、Transform算子
Map:DataStream → DataStream,输入一个参数产生一个参数,map的功能是对输入的参数进行转换操作。
flatMap:flatMap(List(1,2,3))(i⇒List(i,i))变成112233自动加逗号
Filter:过滤掉指定条件的数据。
KeyBy:按照指定的key进行分组,流拆分成不相交的分区。
Reduce:合并当前的元素和上次聚合的结果,用来进行结果汇总合并。
Window:窗口函数,根据某些特性将每个key的数据进行分组(例如:在5s内到达的数据)
滚动聚合算子(RollingAggregation):sum()、min()、max()、minBy()、maxBy()针对每个支流聚合Split和Select:拆分和获取指定的流
Connect(放在同一个流中)和CoMap(组合成一个流)
Union:产生一个包含所有DataStream元素的新DataStream
Connect只能操作两个流,Union可以操作多个。
5、常见的数据类型
env.fromElements(XXXX)
(1)基础数据类型
(2)Java和Scala元组(Tuples)
(3)Scala样例类(caseclasses)
(4)Java简单对象(POJOs)
(5)其它(Arrays,Lists,Maps,Enums,等等)
6、UDF-更细粒度的控制流
函数类(Function Classes):实现MapFunction,FilterFunction,ProcessFunction接口
匿名函数(Lambda Functions)
富函数(Rich Functions):函数类的接口,所有Flink函数类都有其Rich版本,自带一系列生命周期方法(开关、得到上下文),可以实现复杂功能
7、sink操作
(1)使用
没有spark中的forEach方法,需要通过stream.addSink(newMySink(xxxx))完成任务最终输出
(2)举例
kafka:union.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "test", new Simple StringSchema()))
redis:dataStream.addSink(newRedisSink[SensorReading](conf,newMyRedisMapper))
Elasticsearch:dataStream.addSink( esSinkBuilder.build() )
自定义sink:dataStream.addSink(newMyJdbcSink())