来源:https://blog.csdn.net/zhuzuwei/article/details/107142494
1. 安装nc
yum -y install nmap-ncat
2. 启动 nc 监听(8888 是端口号,需与代码中 socketTextStream 使用的端口一致)
nc -lk 8888
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class AddSinkTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lines = env.socketTextStream("10.66.31.133", 8888); SingleOutputStreamOperator<Tuple2<String, Integer>> words = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = s.split(","); for (int i = 0; i < words.length; i++) { collector.collect(Tuple2.of(words[i], 1)); } } }); SingleOutputStreamOperator<Tuple2<String, Integer>> summed = words.keyBy(0).sum(1); summed.print(); summed.writeAsText("C:\\Users\\admin\\Desktop\\flinkTest\\sinkout1.txt", FileSystem.WriteMode.OVERWRITE); SingleOutputStreamOperator<Tuple3<String, String, Integer>> words2 = lines.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() { @Override public void flatMap(String s, Collector<Tuple3<String, String, Integer>> collector) throws Exception { String[] words = s.split(","); for (int i = 0; i < words.length; i++) { collector.collect(Tuple3.of("wordscount", words[i], 1)); } } }); SingleOutputStreamOperator<Tuple3<String, String, Integer>> summed2 = words2.keyBy(1).sum(2); String configPath = "C:\\Users\\admin\\Desktop\\flinkTest\\config.txt"; ParameterTool parameters = ParameterTool.fromPropertiesFile(configPath); //设置全局参数 
env.getConfig().setGlobalJobParameters(parameters); summed2.addSink(new MyRedisSinkFunction()); env.execute("AddSinkTest"); } }
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
/**
 * Flink sink that stores each {@code (hashKey, field, count)} tuple in a Redis hash
 * through a single Jedis connection.
 *
 * <p>Connection settings are read from the job's global parameters, which must be a
 * {@link ParameterTool}:
 * <ul>
 *   <li>{@code redis.host} - required</li>
 *   <li>{@code redis.password} - optional, defaults to empty (no authentication)</li>
 *   <li>{@code redis.port} - optional, defaults to 6379</li>
 *   <li>{@code redis.timeout} - optional, defaults to 5000; passed to the Jedis constructor</li>
 *   <li>{@code redis.db} - optional, defaults to 0</li>
 * </ul>
 */
public class MyRedisSinkFunction extends RichSinkFunction<Tuple3<String, String, Integer>> {

    /** Connection created in {@link #open}; transient so it is not serialized with the function. */
    private transient Jedis jedis;

    /**
     * Reads the Redis settings from the global job parameters and opens the connection.
     *
     * @param config the function configuration; not used
     * @throws RuntimeException if {@code redis.host} is missing (thrown by
     *     {@code ParameterTool.getRequired})
     * @throws ClassCastException if the global job parameters are not a {@link ParameterTool}
     */
    @Override
    public void open(Configuration config) {
        ParameterTool parameters =
                (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String host = parameters.getRequired("redis.host");
        String password = parameters.get("redis.password", "");
        int port = parameters.getInt("redis.port", 6379);
        int timeout = parameters.getInt("redis.timeout", 5000);
        int db = parameters.getInt("redis.db", 0);

        jedis = new Jedis(host, port, timeout);
        // Authenticate only when a password is configured: a server without a password
        // rejects AUTH, so sending the empty default would make open() fail.
        if (!password.isEmpty()) {
            jedis.auth(password);
        }
        jedis.select(db);
    }

    /**
     * Writes one record as {@code HSET value.f0 value.f1 String.valueOf(value.f2)}.
     *
     * @param value the tuple to store: hash key, hash field, and count
     * @param context the sink context; not used
     * @throws Exception if the Redis command fails
     */
    @Override
    public void invoke(Tuple3<String, String, Integer> value, Context context) throws Exception {
        // Re-establish the connection if it was dropped since the last record.
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        // Save the count under the hash key / field.
        jedis.hset(value.f0, value.f1, String.valueOf(value.f2));
    }

    /**
     * Closes the Redis connection if one was opened.
     *
     * <p>The null check keeps this safe when {@link #open} failed before the connection was
     * created, so the original failure is not hidden behind a {@code NullPointerException}.
     *
     * @throws Exception if closing the connection fails
     */
    @Override
    public void close() throws Exception {
        if (jedis != null) {
            jedis.close();
            jedis = null;
        }
    }
}