Flink本地idea运行环境配置webui
public class FlinkWithLocalWebui {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setString(RestOptions.BIND_PORT, "8081"); // 设置WebUI端口为8081
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); // 创建带有WebUI的本地流执行环境
env.setParallelism(1); // 设置并行度为1
DataStream<Map<String, String>> stream = env.addSource(new SourceFunction<Map<String, String>>() {
@Override
public void run(SourceContext<Map<String, String>> ctx) throws Exception {
while (true) {
HashMap<String, String> hashMap = new HashMap<>();
hashMap.put("ID", new Random().nextInt(3) + 1 + ""); // 随机生成ID
hashMap.put("AMT", "1"); // 设置AMT为1
System.out.println("生产数据:" + hashMap); // 打印生产的数据
ctx.collect(hashMap); // 发射数据
Thread.sleep(1000); // 每隔1秒发送一次数据
}
}
@Override
public void cancel() {
}
})
// 按照ID字段进行分区
.keyBy(new KeySelector<Map<String, String>, String>() {
@Override
public String getKey(Map<String, String> value) throws Exception {
return value.get("ID");
}
})
// 对AMT字段进行累加
.reduce(new ReduceFunction<Map<String, String>>() {
@Override
public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) throws Exception {
HashMap<String, String> hashMap = new HashMap<>();
hashMap.put("ID", value1.get("ID"));
hashMap.put("AMT", Integer.valueOf(value1.get("AMT")) + Integer.valueOf(value2.get("AMT")) + "");
return hashMap;
}
});
// 输出数据流
stream.print();
// 执行作业并指定作业名称
env.execute("job-" + FlinkWithLocalWebui.class.getSimpleName());
}
}
//这段代码是一个基于Apache Flink的实时数据处理程序。
//程序创建了一个带有WebUI的本地流执行环境,设置了并行度为1。
//通过自定义的SourceFunction生成随机数据流,数据包含ID和AMT字段,每秒发送一次数据。
//然后对数据流按照ID字段进行分区,并对AMT字段进行累加操作。
//最后,将处理后的数据流打印输出,并执行作业。
//整体流程是一个简单的实时数据处理流水线,用于生成、处理和输出数据流。