相关文章链接
1、批处理的WordCount案例
// 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 获取数据 DataSource<String> dataSource = env.fromElements("flink spark hadoop", "hadoop spark", "flink flink"); // 转换数据 AggregateOperator<Tuple2<String, Integer>> result = dataSource .flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String s, Collector<String> collector) throws Exception { for (String field : s.split(" ")) { collector.collect(field); } } }) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { return Tuple2.of(s, 1); } }) .groupBy(0) .sum(1); // 输出数据 result.print();
2、流处理的WordCount案例
// 执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //env.setRuntimeMode(RuntimeExecutionMode.BATCH); //env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // source数据源 DataStreamSource<String> lines = env.socketTextStream("localhost", 9999); // 数据转换 SingleOutputStreamOperator<Tuple2<String, Integer>> result = lines .flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String s, Collector<String> collector) throws Exception { for (String word : s.split(" ")) { collector.collect(word); } } }) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { return Tuple2.of(s, 1); } }) .keyBy(t -> t.f0) .sum(1); // sink result.print(); env.execute();
3、流处理的基于Lambda表达式的WordCount案例
// 执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 获取数据 DataStreamSource<String> dataStreamSource = env.fromElements("abc abc abc"); // 数据转换 SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStreamSource .flatMap((String value, Collector<String> out) -> { Arrays.stream(value.split(" ")).forEach(out::collect); }).returns(Types.STRING) .map((String value) -> Tuple2.of(value, 1), TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {} )) .keyBy(t -> t.f0) .sum(1); // 数据输出 result.print(); // 执行程序 env.execute();