##########保存至文件##############
1、处理主类
package sink import com.yangwj.api.SensorReading import org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala._ /** * @author yangwj * @date 2021/1/6 21:17 * @version 1.0 */ object FileSink { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt" val input: DataStream[String] = env.readTextFile(inputFile) val dataStream = input.map(data => { val arr: Array[String] = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) }) //1、过时方法 val savePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensorToFile" dataStream.writeAsCsv(savePath) //2、分布式方法 val saveDistributePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\saveDistributePath" dataStream.addSink(StreamingFileSink.forRowFormat( new Path(saveDistributePath), new SimpleStringEncoder[SensorReading]()) .build() ) env.execute("FileSink Test") } }
##########保存至Es##############
1、依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>1.10.1</version>
</dependency>
2、处理主类
package sink import java.util import com.yangwj.api.SensorReading import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer} import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink import org.apache.http.HttpHost import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.client.Requests /** * @author yangwj * @date 2021/1/6 22:05 * @version 1.0 */ object EsSink { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt" val input: DataStream[String] = env.readTextFile(inputFile) val dataStream = input.map(data => { val arr: Array[String] = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) }) val httpHosts = new util.ArrayList[HttpHost]() httpHosts.add(new HttpHost("localhost",9200)) //自定义写入Es的function val myEsSinkFunc: ElasticsearchSinkFunction[SensorReading] = new ElasticsearchSinkFunction[SensorReading] { //process 数据处理方法,并发送至Es override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { //包装一个Map作为data source val dataSource = new util.HashMap[String, String]() dataSource.put("id",t.id) dataSource.put("temperature",t.temperature.toString) dataSource.put("ts",t.timestamp.toString) //创建indexRequest,用于发送http请求 val request: IndexRequest = Requests.indexRequest().index("sensor").`type`("_doc").source(dataSource) requestIndexer.add(request) } } dataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHosts,myEsSinkFunc).build()) env.execute("Es Sink Test") } }
##########保存至Kafka##############
1、依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
2、处理主类
package sink import java.util.Properties import com.yangwj.api.SensorReading import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer010, FlinkKafkaProducer011} /** * @author yangwj * @date 2021/1/6 21:32 * @version 1.0 */ object KafkaSink { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 从kafka读取数据 val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("group.id", "consumer-group") val stream = env.addSource( new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties) ) // 先转换成样例类类型(简单转换操作) val dataStream = stream .map( data => { val arr = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString } ) dataStream.addSink( new FlinkKafkaProducer011[String]("localhost:9092", "sinktest", new SimpleStringSchema()) ) env.execute("kafka sink test") } }
##########保存至Mysql##############
1、依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
2、处理主类
package sink import java.sql.{Connection, DriverManager, PreparedStatement} import com.yangwj.api.SensorReading import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala._ /** * @author yangwj * @date 2021/1/6 22:27 * @version 1.0 * 由于官网没有提供,无法保证数据一致性 */ object MysqlSink { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt" val input: DataStream[String] = env.readTextFile(inputFile) val dataStream = input.map(data => { val arr: Array[String] = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) }) dataStream.addSink( new MyJdbcSinkFunc() ) env.execute("mysql Sink Test") } } class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading]{ // 定义连接、预编译语句 var conn: Connection = _ var insertStmt: PreparedStatement = _ var updateStmt: PreparedStatement = _ override def open(parameters: Configuration): Unit = { conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456") insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)") updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?") } override def invoke(value: SensorReading): Unit = { // 先执行更新操作,查到就更新 updateStmt.setDouble(1, value.temperature) updateStmt.setString(2, value.id) updateStmt.execute() // 如果更新没有查到数据,那么就插入 if( updateStmt.getUpdateCount == 0 ){ insertStmt.setString(1, value.id) insertStmt.setDouble(2, value.temperature) insertStmt.execute() } } override def close(): Unit = { insertStmt.close() updateStmt.close() conn.close() } }
##########保存至Redis##############
1、依赖
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
2、处理主类
package sink import com.yangwj.api.SensorReading import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.redis.RedisSink import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper} /** * @author yangwj * @date 2021/1/6 21:48 * @version 1.0 */ object RedisSink { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt" val input: DataStream[String] = env.readTextFile(inputFile) val dataStream = input.map(data => { val arr: Array[String] = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) }) //1、redis配置 val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("localhost") .setPort(6379) .build() dataStream.addSink(new RedisSink[SensorReading](config , new MyRedisMapper())) } } // 2、定义一个RedisMapper class MyRedisMapper extends RedisMapper[SensorReading]{ // 定义保存数据写入redis的命令,HSET 表名 key value override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.HSET, "sensor_temp") } // 将温度值指定为value override def getValueFromData(data: SensorReading): String = data.temperature.toString // 将id指定为key override def getKeyFromData(data: SensorReading): String = data.id }