上篇:第 13 节 DataStream之partition(java)
1、Sink部分详解
DataStream API之Data Sink
- writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
- print() / printToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
- 自定义输出addSink【kafka、redis】
2、内置Connectors
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache ActiveMQ (source/sink)
- Redis (sink)
3、Sink 容错性保证
Sink | 语义保证 | 备注 |
---|---|---|
hdfs | exactly once | |
elasticsearch | at least once | |
kafka produce | at least once/exactly once | Kafka 0.9 and 0.10提供at least onceKafka 0.11提供exactly once |
redis | at least once |
4、实际操作
(1)先启动redis服务:
[root@flink102 module]# service redisd start
Starting Redis server...
1769:C 08 Mar 15:31:56.554 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
1769:C 08 Mar 15:31:56.554 # Redis version=4.0.6, bits=64, commit=00000000, modified=0, pid=1769, just started
1769:C 08 Mar 15:31:56.554 # Configuration loaded
(2)启动客服端服务
[root@flink102 src]# redis-cli
127.0.0.1:6379>
//查看当前库的数据
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379>
(3)pom文件需要引入的依赖:
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
pom文件的完整依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example.flink01</groupId>
<artifactId>flink01</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.1</version>
<!-- // <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass>xuwei.streaming.SocketWindowWordCountJava</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
(4)具体代码实现:
package xuwei.sink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
* 接收socket数据,把数据保存到redis中
*
* List
*
* lpush list_key value
*/
public class StreamingDemoToRedis {
public static void main(String[] args)throws Exception {
//获取flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//指定数据源的端口
DataStreamSource<String> text = env.socketTextStream("flink102", 9000, "\n");
//lpush 1_words word
//对数据进行组装,把String转化为Tuple2<String,String>
DataStream<Tuple2<String, String>> wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
return new Tuple2<>("1_words", value);
}
});
//把数据存储到redis
//创建redis的配置
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("flink102").setPort(6379).build();
//创建redis sink
RedisSink<Tuple2<String, String>> redisink = new RedisSink<>(conf, new MyRedisMapper());
wordsData.addSink(redisink);
env.execute("StreamingDemoToRedis");
}
public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>>{
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.LPUSH);
}
//表示从接收的数据中 获取需要操作的redis key
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
//表示从接收的数据中 获取需要操作的redis Value
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
}
}
(5)连接上flink102机器,执行nc -l 9000
[root@flink102 ~]# nc -l 9000
(6)启动代码程序,控制台打印信息,发现错误:
(7)排查问题:
//发现连接不上
C:\Users\HP>telnet flink102 6379
正在连接flink102...无法打开到主机的连接。 在端口 6379: 连接失败
当telnet 已经通了,再次运行程序,没报错
我们就可以在redis数据库,查看
[root@flink102 redis-4.0.6]# redis-cli -p 6379
127.0.0.1:6379> keys *
1) "1_words" //数据已经进来了
查看数据
127.0.0.1:6379> lrange 1_words 0 -1
1) "gg"
查看数据数量
127.0.0.1:6379> llen 1_words
(integer) 1
数据状态
127.0.0.1:6379> monitor
OK
//输入数据:
[root@flink102 redis]# nc -l 9000
gg
hadoop
flink
kill
flume
//接收数据
1583691934.079993 [0 192.168.219.1:58607] "LPUSH" "1_words" "hadoop"
1583691938.604767 [0 192.168.219.1:58609] "LPUSH" "1_words" "flink"
1583691941.721202 [0 192.168.219.1:58611] "LPUSH" "1_words" "kill"
1583691945.638629 [0 192.168.219.1:58045] "LPUSH" "1_words" "flume"