7 window API

一 时间窗口

我们可以用window函数来创建一个窗口,需要提供窗口分配器.
但 Flink 也提供了 timeWindow、countWindow 等更方便的方法来创建常用的窗口.
开窗之前必须要先使用keyBy函数进行分区.

1. 滚动时间窗口

timeWindow(Time.seconds(5))

2. 滑动时间窗口

timeWindow(Time.seconds(15),Time.seconds(5))

3.会话窗口

window(EventTimeSessionWindows.withGap(Time.minutes(10)))

4.滚动计数窗口

countWindow(5)

5.滑动计数窗口

countWindow(10,2)

二 窗口函数(需要在开窗后使用)

1.增量聚合函数

ReduceFunction 和 AggregateFunction
每来一条数据就进行计算,保持一个简单的状态

package test3


import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import test2.SensorSource

/** Example job: prints the average temperature per reading id for each
  * 5-second time window, computed incrementally with an `AggregateFunction`.
  */
object AvgTempPerWindow {
  /** Builds the pipeline (source -> (id, temperature) -> keyBy id -> 5s window
    * -> incremental average -> print) and runs it with parallelism 1.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env
      .addSource(new SensorSource)

    stream
      .map(r => (r.id, r.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .aggregate(new AvgTempFunction)
      .print()

    env.execute()
  }

  /** Incremental average: average temperature = sum of temperatures / number of readings.
    *
    * Input is `(id, temperature)`, the accumulator is `(id, sum, count)` and the
    * result is `(id, average)`.
    */
  class AvgTempFunction extends AggregateFunction[(String, Double), (String, Double, Long), (String, Double)] {
    /** Creates an empty accumulator: placeholder id, zero sum, zero count. */
    override def createAccumulator(): (String, Double, Long) = ("", 0.0, 0L)

    /** Folds one reading into the accumulator: adopts its id, adds its temperature, bumps the count. */
    override def add(value: (String, Double), accumulator: (String, Double, Long)): (String, Double, Long) = {
      (value._1, accumulator._2 + value._2, accumulator._3 + 1)
    }

    /** Returns `(id, sum / count)`; an accumulator that never received a reading yields NaN. */
    override def getResult(accumulator: (String, Double, Long)): (String, Double) = {
      (accumulator._1, accumulator._2 / accumulator._3)
    }

    /** Combines two partial accumulators by adding their sums and counts.
      *
      * The id is taken from whichever side has actually seen a reading, so that
      * merging an untouched accumulator (id `""`) on the left does not drop the key.
      */
    override def merge(a: (String, Double, Long), b: (String, Double, Long)): (String, Double, Long) = {
      val id = if (a._1.nonEmpty) a._1 else b._1
      (id, a._2 + b._2, a._3 + b._3)
    }
  }
}

2 全窗口函数

先把窗口所有数据收集起来,等到计算的时候会遍历所有数据
ProcessWindowFunction

package test3

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import test2.SensorSource


/** Example job: prints the average temperature per reading id for each
  * 5-second time window, computed by a full-window `ProcessWindowFunction`.
  */
object AvgTempPerWindowByProcessWindowFunction {
  /** Wires up and runs the pipeline with parallelism 1.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Project every reading to (id, temperature) and partition the stream by id.
    val keyedTemps = env
      .addSource(new SensorSource)
      .map(reading => (reading.id, reading.temperature))
      .keyBy(pair => pair._1)

    keyedTemps
      .timeWindow(Time.seconds(5))
      .process(new AvgTempFunc)
      .print()

    env.execute()
  }

  /** Full-window function: receives all `(id, temperature)` pairs of one window
    * for one key and emits a single `(key, average temperature)` pair.
    */
  class AvgTempFunc extends ProcessWindowFunction[(String, Double), (String, Double), String, TimeWindow] {
    /** Averages the temperatures in `elements` and emits `(key, average)` through `out`. */
    override def process(key: String, context: Context, elements: Iterable[(String, Double)], out: Collector[(String, Double)]): Unit = {
      val count = elements.size
      // Left-to-right accumulation starting from 0.0.
      val total = elements.foldLeft(0.0) { case (acc, (_, temp)) => acc + temp }
      out.collect((key, total / count))
    }
  }
}

3 两者可以结合起来使用

来一条数据先增量聚合,等到窗口关闭的时候再用全窗口聚合

package test3

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import test2.{SensorReading, SensorSource}


/** Example job: for every key and 5-second time window, prints the minimum and
  * maximum temperature together with the window end timestamp. An incremental
  * `AggregateFunction` is combined with a `ProcessWindowFunction`.
  */
object MinMaxTempByAggregateAndProcess {
  /** Per-window result.
    *
    * @param id    key of the window
    * @param min   smallest temperature seen in the window
    * @param max   largest temperature seen in the window
    * @param endTs window end, as returned by `TimeWindow.getEnd`
    */
  case class MinMaxTemp(id: String,
                        min: Double,
                        max: Double,
                        endTs: Long)

  /** Wires up and runs the pipeline with parallelism 1.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new SensorSource)

    stream
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      // First argument: incremental aggregation; second argument: full-window function.
      .aggregate(new Agg, new WindowResult)
      .print()

    env.execute()
  }

  /** Wraps the pre-aggregated `(id, min, max)` of a window into a [[MinMaxTemp]],
    * attaching the window key and the window end timestamp.
    */
  class WindowResult extends ProcessWindowFunction[(String, Double, Double),
    MinMaxTemp, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[(String, Double, Double)], out: Collector[MinMaxTemp]): Unit = {
      // The iterable holds a single value: the result sent by the incremental aggregate function.
      val minMax = elements.head
      out.collect(MinMaxTemp(key, minMax._2, minMax._3, context.window.getEnd))
    }
  }

  /** Incrementally tracks `(id, min temperature, max temperature)` over the readings of a window. */
  class Agg extends AggregateFunction[SensorReading, (String, Double, Double), (String, Double, Double)] {
    /** Empty accumulator: placeholder id, min seeded with the largest Double and
      * max with the most negative Double, so the first reading replaces both.
      */
    override def createAccumulator(): (String, Double, Double) = {
      ("", Double.MaxValue, Double.MinValue)
    }

    /** Folds one reading in: adopts its id and tightens the min/max bounds. */
    override def add(value: SensorReading, accumulator: (String, Double, Double)): (String, Double, Double) = {
      (value.id, value.temperature.min(accumulator._2), value.temperature.max(accumulator._3))
    }

    /** The accumulator already has the result shape, so it is returned as is. */
    override def getResult(accumulator: (String, Double, Double)): (String, Double, Double) = accumulator

    /** Combines two partial accumulators.
      *
      * The id is taken from whichever side has actually seen a reading, so that
      * merging an untouched accumulator (id `""`) on the left does not drop it.
      */
    override def merge(a: (String, Double, Double), b: (String, Double, Double)): (String, Double, Double) = {
      val id = if (a._1.nonEmpty) a._1 else b._1
      (id, a._2.min(b._2), a._3.max(b._3))
    }
  }
}

7 window API

上一篇:Window常用操作(一)


下一篇:解决Windows 10 1809 使用管理员权限运行的程序无法浏览网络驱动器的问题