Flink本地idea运行环境配置webui

public class FlinkWithLocalWebui { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.setString(RestOptions.BIND_PORT, "8081"); // 设置WebUI端口为8081 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); // 创建带有WebUI的本地流执行环境 env.setParallelism(1); // 设置并行度为1 DataStream<Map<String, String>> stream = env.addSource(new SourceFunction<Map<String, String>>() { @Override public void run(SourceContext<Map<String, String>> ctx) throws Exception { while (true) { HashMap<String, String> hashMap = new HashMap<>(); hashMap.put("ID", new Random().nextInt(3) + 1 + ""); // 随机生成ID hashMap.put("AMT", "1"); // 设置AMT为1 System.out.println("生产数据:" + hashMap); // 打印生产的数据 ctx.collect(hashMap); // 发射数据 Thread.sleep(1000); // 每隔1秒发送一次数据 } } @Override public void cancel() { } }) // 按照ID字段进行分区 .keyBy(new KeySelector<Map<String, String>, String>() { @Override public String getKey(Map<String, String> value) throws Exception { return value.get("ID"); } }) // 对AMT字段进行累加 .reduce(new ReduceFunction<Map<String, String>>() { @Override public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) throws Exception { HashMap<String, String> hashMap = new HashMap<>(); hashMap.put("ID", value1.get("ID")); hashMap.put("AMT", Integer.valueOf(value1.get("AMT")) + Integer.valueOf(value2.get("AMT")) + ""); return hashMap; } }); // 输出数据流 stream.print(); // 执行作业并指定作业名称 env.execute("job-" + FlinkWithLocalWebui.class.getSimpleName()); } } //这段代码是一个基于Apache Flink的实时数据处理程序。 //程序创建了一个带有WebUI的本地流执行环境,设置了并行度为1。 //通过自定义的SourceFunction生成随机数据流,数据包含ID和AMT字段,每秒发送一次数据。 //然后对数据流按照ID字段进行分区,并对AMT字段进行累加操作。 //最后,将处理后的数据流打印输出,并执行作业。 //整体流程是一个简单的实时数据处理流水线,用于生成、处理和输出数据流。
上一篇:HBase


下一篇:git上新down下来的项目,前端启动报错npm ERR! code 1 npm ERR! path E:codevuehr ode_modul