Flink应用的逻辑执行图

这里以最基础的wordcount程序说明flink应用的逻辑执行图,程序代码如下:

object SetParalWC {
  def main(args: Array[String]): Unit = {
    //创建流处理执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //    env.setParallelism(8) 设置所有算子任务并行度为8,同时也可以每个算子单独设置该参数


    // 从程序运行参数中读取hostname和port
    val params: ParameterTool = ParameterTool.fromArgs(args)
    val hostname: String = params.get("host")
    val port: Int = params.getInt("port")
    // 接受socket文本流,nc -lk 8888
    val inputDataStream: DataStream[String] = env.socketTextStream(hostname, port)

    // 定义转换操作,word count
    val resultDataStream: DataStream[(String, Int)] = inputDataStream
      .flatMap(_.split(" "))   // 以空格分词,打散得到所有的word
      .filter(_.nonEmpty)
      .map( (_, 1) )  // 转换成(word, count)二元组
      .keyBy(0)  // 按照第一个元素分组
      .sum(1)  // 按照第二个元素求和,这里就能体现出有状态的流处理,结果是累计滚动输出的

    resultDataStream.print()
    // 如果在本地运行任务没有设定并行度,那么并行度默认与核数一致
    /**
     * 3> (world,1)
     * 2> (hello,1)
     * 4> (flink,1)
     * 2> (hello,2)
     * 1> (spark,1)
     * 2> (hello,3)
     * 3> (future,1)
     * 4> (stream,1)
     * 4> (is,1)
     * 4> (why,1)
     * 这里的数字代表的就是数据keyBy后的子任务序号
     */
    // resultDataStream.print().setParallelism(1)
    env.execute("stream word count job")
  }
}

该应用通过WebUI查看应用逻辑执行图如下:

Flink应用的逻辑执行图

可以看到程序通过source算子获取数据的并行度是1,但后面任务算子的任务并行度为2,所以数据需要经过rebalance才能分配到后续任务中,同时注意到任务链的存在。

keyBy算子在逻辑执行图中并没有体现出来,在任务链之后通过HASH的方式直接将数据传输到了aggregation操作中,而aggregation操作对应的应该是sum()算子,这说明keyBy不是计算方式,而是一种定义数据传输方式的算子。

上一篇:flink的DataStreamAPI


下一篇:keyby算子源码