相关文章链接
Flink之API的使用(2):Transform算子的使用
具体代码实现如下所示:
1、main函数中代码实现:
// 创建执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(2) // 1、从文件中读取数据 val fileStream: DataStream[String] = env.readTextFile("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt") // 2、从kafka中读取数据 // 2.1、创建kafka的properties配置信息对象 val prop: Properties = new Properties() prop.setProperty("bootstrap.servers", "cdh1:9092,cdh2:9092,cdh3:9092") prop.setProperty("group.id", "flink-consumer-group") prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") prop.setProperty("auto.offset.reset", "latest") // 2.2、添加kafka的source源 val kafkaStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("flinkTestTopic", new SimpleStringSchema(), prop)) // 3、自定义source源(自定义源需要创建一个自定义源类,并继承SourceFunction) val mySensorStream: DataStream[SensorReading] = env.addSource(new MySensorSource(2)) // 打印数据 mySensorStream.print() // 启动执行环境,运行任务 env.execute("SourceDemo")
2、自定义source源代码实现:
/** * 自定义一个生成 SensorReading(温度传感器) 的源 */ class MySensorSource(sensorNum: Int) extends SourceFunction[SensorReading] { /** * flag: 表示数据源是否还在正常运行 */ var running: Boolean = true /** * 当启动数据源时,会在此方法中生成数据,并通过ctx(环境上下文)输出 * * @param ctx 环境上下文 */ override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = { // 初始化一个随机数发生器 val rand: Random = new Random() // 初始化sensorNum个传感器(包括初始化名称,时间戳,温度) var curTemp: immutable.Seq[SensorReading] = 1.to(sensorNum).map( i => SensorReading("sensor_" + i, System.currentTimeMillis(), 65 + rand.nextGaussian() * 20) ) // 每1000毫秒更新一次传感器中的温度和时间戳,并通过ctx将数据输出 while (running) { val curTime: Long = System.currentTimeMillis() curTemp.foreach(sensorReading => { sensorReading.timestamp = curTime sensorReading.temperature = sensorReading.temperature + rand.nextGaussian() ctx.collect(sensorReading) }) Thread.sleep(1000) } } /** * 停止此源(将flag修改为false) */ override def cancel(): Unit = { running = false } }