Flink DataStream 模板
package org.example.scala
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
/**
 * Flink DataStream demo containing two alternative job plans:
 *
 *  - [[doPlan01]] reads a text file, upper-cases and tokenizes each line, and
 *    prints per-word counts;
 *  - [[doPlan02]] reads lines from a TCP socket and prints only the lines that
 *    contain the character 'a'.
 *
 * `main` currently wires up [[doPlan02]].
 *
 * Created 2021-03-23, revised 2022-02-07. Author: jichun.yang
 */
object Flink_2021_0323_1443 {

  /** Input file read by [[doPlan01]] when the caller does not supply one. */
  private val DefaultInputFile: String =
    "G:\\workspace01\\flink\\src\\main\\resources\\test.txt"

  /** Socket endpoint read by [[doPlan02]] when the caller does not supply one. */
  private val DefaultSocketHost: String = "192.168.195.178"
  private val DefaultSocketPort: Int = 9999

  /**
   * Entry point: builds the execution environment, registers the socket
   * filtering plan and submits the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // set the job parallelism
    doPlan02(env)
    // The plans above only declare the pipeline; execute() actually submits it.
    env.execute("Flink_2021_0323_1443")
  }

  /**
   * Word-count plan: reads a text file, upper-cases each line, splits it into
   * words and prints a `(word, count)` pair per word.
   *
   * NOTE(review): `sum` on a keyed stream is a rolling aggregate, so in the
   * default streaming runtime mode an updated count is presumably printed for
   * every incoming word rather than one final total — confirm against the
   * runtime mode in use.
   *
   * @param env  execution environment the plan is registered on
   * @param file path of the text file to read; defaults to the original
   *             hard-coded sample file
   */
  def doPlan01(env: StreamExecutionEnvironment, file: String = DefaultInputFile): Unit = {
    val lines: DataStream[String] = env.readTextFile(file)
    val wordCounts: DataStream[(String, Int)] = lines
      // Split on any run of whitespace and drop empty tokens, so blank lines,
      // leading spaces and repeated spaces are not counted as an empty "word".
      .flatMap(line => line.toUpperCase.split("\\s+").filter(_.nonEmpty))
      .map((_, 1))
      // Key-selector form; the positional keyBy(0) variant is deprecated.
      .keyBy(_._1)
      .sum(1)
    wordCounts.print()
  }

  /**
   * Socket filter plan: reads text lines from a TCP socket and prints only
   * those that contain the character 'a'.
   *
   * @param env  execution environment the plan is registered on
   * @param host host name or IP of the socket server; defaults to the original
   *             hard-coded address
   * @param port TCP port of the socket server; defaults to 9999
   */
  def doPlan02(
      env: StreamExecutionEnvironment,
      host: String = DefaultSocketHost,
      port: Int = DefaultSocketPort
  ): Unit = {
    val socketLines: DataStream[String] = env.socketTextStream(host, port)
    val matchingLines: DataStream[String] = socketLines.filter(_.contains('a')) // filter condition
    matchingLines.print()
  }
}
声明:本文档仅为个人学习总结,其中部分知识点可能存在错误。若学友偶然搜到并用作参考,望斟酌后再使用,以免给您带来困扰;若发现错误,也希望您指出更正,在此提前感谢!总结过程中若有借鉴各路大神成果之处,而您认为侵犯了您的知识产权或对您有所冒犯,烦请通知鄙人,鄙人将尽快修正!邮箱地址:390835164@qq.com