Flink实例(四十二): Operators(三)FILTER

filter

  filter转换算子通过在每个输入事件上对一个布尔条件进行求值来过滤掉一些元素,然后将剩下的元素继续发送。一个true的求值结果将会把输入事件保留下来并发送到输出,而如果求值结果为false,则输入事件会被抛弃掉。我们通过调用DataStream.filter()来指定流的filter算子,filter操作将产生一条新的流,其类型和输入流中的事件类型是一样的。图5-2展示了只产生白色方框的filter操作。

 

Flink实例(四十二): Operators(三)FILTER

布尔条件可以使用函数、FilterFunction接口或者匿名函数来实现。FilterFunction中的泛型是输入事件的类型。定义的filter()方法会作用在每一个输入元素上面,并返回一个布尔值。

// T: the type of elements
FilterFunction[T]
    > filter(T): Boolean

实例一:

下面的例子展示了如何使用filter来从传感器数据中过滤掉温度值小于25华氏温度的读数。

scala version

val filteredReadings = readings.filter(r => r.temperature >= 25)

java version

DataStream<SensorReading> filteredReadings = readings.filter(r -> r.temperature >= 25);

实例二:

我们可以使用Lambda表达式过滤掉小于等于0的元素:

val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, 0, 5, -9, 8)
// 使用 => 构造Lambda表达式
val lambda = dataStream.filter ( input => input > 0 )
// 使用 _ 构造Lambda表达式
val lambda2 = dataStream.map { _ > 0 }

也可以继承FilterFunctionRichFilterFunction,然后重写filter方法,我们还可以将参数传递给继承后的类。比如,MyFilterFunction增加一个构造函数参数limit,并在filter方法中使用这个参数。

// 继承RichFilterFunction
// limit参数可以从外部传入
class MyFilterFunction(limit: Int) extends RichFilterFunction[Int] {
  override def filter(input: Int): Boolean = {
    if (input > limit) {
      true
    } else {
      false
    }
  }
  
}
val richFunctionDataStream = dataStream.filter(new MyFilterFunction(2))

 

 

上一篇:Flink实例(五十七):维表join(一)Apache Flink 维表关联实战


下一篇:flink 读取 CSV 文件,并将 DataStream 转 Table 对象