flink -

Flink DataStream 模板

package org.example.scala

import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

/**
 * date: 2021-03-23 14:43
 * author: jichun.yang
 * remark: 测试flink读取文件内容,并将文件中的内容进行处理后输出
 * Date  : 2022/2/7 11:10
 * Author: jichun.yang
 */
object Flink_2021_0323_1443 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // 设置并行度

    doPlan02(env)
    env.execute("Flink_2021_0323_1443")
  }

  /**
   * Author: jichun.yang
   * Date  : 2022/2/7 11:28
   * Remark: 读取文件
   *
   * @param env
   */
  def doPlan01(env: StreamExecutionEnvironment): Unit = {
    val file = "G:\\workspace01\\flink\\src\\main\\resources\\test.txt"
    val dataStream: DataStream[String] = env.readTextFile(file)

    val splitStream: DataStream[(String, Int)] = dataStream
      .flatMap(_.toUpperCase.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    splitStream.print()

  }

  /**
   * Author: jichun.yang
   * Date  : 2022/2/7 15:46
   * Remark:
   *
   * @param env
   */
  def doPlan02(env: StreamExecutionEnvironment): Unit = {
    val socketStream: DataStream[String] = env.socketTextStream("192.168.195.178", 9999)

    val filterStream:DataStream[String]=socketStream.filter(_.contains('a')) // 过滤条件
    filterStream.print()
  }

}

声明:本文档仅是自己学习总结,其中有些知识点可能存在错误,若是学友偶然搜到参考,望斟酌后再使用,以免给您带来困扰,若是发现错误也希望您指出更正,在此提前感谢!! 总结过程中要是有些地方借鉴了各路大神成果,您觉得侵犯了您的知识产权,对您有所冒犯,烦请通知鄙人,鄙人将会尽快修正! 邮箱地址:390835164@qq.com

上一篇:Xshell连接虚拟机


下一篇:2022-02-01打卡