Section 15: DataStream source (Scala)

Previous post: Section 14: DataStream sink (Java)


1. Custom sink

  1. To implement a custom sink:
     implement the SinkFunction interface,
     or extend RichSinkFunction (a sketch follows after this list).
  2. For a reference implementation, see org.apache.flink.streaming.connectors.redis.RedisSink.
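
Below is a minimal sketch of such a custom sink, assuming the same Scala DataStream API used in the examples later in this post. The class name MyPrintSinkScala and the println body are illustrative placeholders, not the RedisSink implementation: it extends RichSinkFunction so that open/close can manage external resources, and invoke is called once per record.

MyPrintSinkScala.scala (illustrative)

package xuwei.streaming

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

/**
 * Minimal sketch of a custom sink (illustrative only).
 *
 * Extends RichSinkFunction so that open/close can manage
 * external resources such as a database connection.
 */
class MyPrintSinkScala extends RichSinkFunction[Long] {

  override def open(parameters: Configuration): Unit = {
    //acquire external resources here (e.g. open a connection)
  }

  //invoke is called once for every record that reaches the sink
  override def invoke(value: Long): Unit = {
    println("sink received: " + value)
  }

  override def close(): Unit = {
    //release external resources here
  }
}

It would be attached like any other sink, e.g. text.addSink(new MyPrintSinkScala).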

2. A simple Scala warm-up test:

Add 1 to each number in a collection.

Implementation:

package xuwei.streaming

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object StreamingFromCollectionScala {
  def main(args: Array[String]): Unit = {
    //get the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //import the implicit conversions required by the Scala API
    import org.apache.flink.api.scala._

    val data = List(10, 15, 20)

    val text = env.fromCollection(data)

    //add 1 to every element in the map operator
    val num = text.map(_ + 1)

    num.print().setParallelism(1)

    env.execute("StreamingFromCollectionScala")

  }
}

Console output (each input incremented by 1):

11
16
21


3. Creating a custom source

Goal: produce increasing numbers starting from 1, one value per second.

There are three ways to do this:

Approach 1 (non-parallel, so the source's parallelism is fixed at 1):
extend SourceFunction[Long].

Implementation:

MyNoParallelSource.scala

package xuwei.streaming

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * A custom non-parallel source: its parallelism is fixed at 1.
 *
 * Produces increasing numbers starting from 1, one per second.
 */
class MyNoParallelSource extends SourceFunction[Long]{
  var count = 1L
  var isRunning = true

  override def run(ctx: SourceContext[Long]): Unit = {
    while(isRunning){
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

StreamingDemoWithMyNoParallelSource.scala

package xuwei.streaming
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object StreamingDemoWithMyNoParallelSource {
  def main(args: Array[String]): Unit = {
    //get the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //implicit conversions for the Scala API
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyNoParallelSource)

    val mapData = text.map(line => {
      println("received: " + line)
      line
    })

    //sum the values that fall into each tumbling 2-second window
    val sum = mapData.timeWindowAll(Time.seconds(2)).sum(0)

    sum.print().setParallelism(1)

    env.execute("StreamingDemoWithMyNoParallelSource")

  }
}

Console output (keeps printing): each value received by map is logged as "received: <n>", and every 2 seconds the sum of the values in that window is printed.

Approach 2:
extend ParallelSourceFunction[Long].
(The source now runs with the configured parallelism, so each parallel instance emits its own copy of the sequence and the values are duplicated.)

Implementation:

MyParallelSourceScala.scala

package xuwei.streaming

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * A custom parallel source: every parallel instance of this source
 * produces increasing numbers starting from 1, one per second.
 */
class MyParallelSourceScala extends ParallelSourceFunction[Long]{
  var count = 1L
  var isRunning = true

  override def run(ctx: SourceContext[Long]): Unit = {
    while(isRunning){
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

StreamingDemoWithMyParallelSourceScala.scala

package xuwei.streaming

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object StreamingDemoWithMyParallelSourceScala {
  def main(args: Array[String]): Unit = {
    //get the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //implicit conversions for the Scala API
    import org.apache.flink.api.scala._

    //run the parallel source with 2 parallel instances
    val text = env.addSource(new MyParallelSourceScala).setParallelism(2)

    val mapData = text.map(line => {
      println("received: " + line)
      line
    })

    //sum the values that fall into each tumbling 2-second window
    val sum = mapData.timeWindowAll(Time.seconds(2)).sum(0)

    sum.print().setParallelism(1)

    env.execute("StreamingDemoWithMyParallelSourceScala")

  }
}

Console output (keeps printing): with setParallelism(2) there are two source instances, so each number appears twice in the "received:" lines, and the windowed sums combine the output of both instances.

Approach 3:

  1. Extend RichParallelSourceFunction[Long].
  2. Override the open method (and close, if resources need to be released).

Implementation:

MyRichParallelSourceScala.scala

package xuwei.streaming

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * A custom rich parallel source: every parallel instance produces
 * increasing numbers starting from 1, one per second.
 *
 * Being a "rich" function, it additionally gets the open/close
 * lifecycle methods and access to the runtime context.
 */
class MyRichParallelSourceScala extends RichParallelSourceFunction[Long]{
  var count = 1L
  var isRunning = true

  override def run(ctx: SourceContext[Long]): Unit = {
    while(isRunning){
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }

  //open is called once per parallel instance before run (see the note after this class)
  override def open(parameters: Configuration): Unit = super.open(parameters)

  //close is called when the source shuts down; release resources here
  override def close(): Unit = super.close()
}
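
In the listing above, open only delegates to super.open. As a hedged illustration (not part of the original code), open is typically where per-instance setup goes, for example inspecting the runtime context that rich functions provide; the open method above could be replaced with the following fragment (all required imports are already in the listing):

  //illustrative variant of open: log which parallel subtask is starting
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val subtaskIndex = getRuntimeContext.getIndexOfThisSubtask
    println("source subtask " + subtaskIndex + " opened")
  }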

StreamingDemoWithMyRichParallelSourceScala.scala

package xuwei.streaming

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object StreamingDemoWithMyRichParallelSourceScala {
  def main(args: Array[String]): Unit = {
    //get the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //implicit conversions for the Scala API
    import org.apache.flink.api.scala._

    //run the rich parallel source with 2 parallel instances
    val text = env.addSource(new MyRichParallelSourceScala).setParallelism(2)

    val mapData = text.map(line => {
      println("received: " + line)
      line
    })

    //sum the values that fall into each tumbling 2-second window
    val sum = mapData.timeWindowAll(Time.seconds(2)).sum(0)

    sum.print().setParallelism(1)

    env.execute("StreamingDemoWithMyRichParallelSourceScala")

  }
}

Console output (keeps printing): as in approach 2, two parallel source instances run and their values are combined, so each number appears twice before the windowed sums are printed.
