Flink接入apollo简单示例

1、创建配置文件

apollo官网示例显示java api使用配置文件方式如下:

Flink接入apollo简单示例

配置文件内容:

     app.id=java-apollo-test-20190724
     apollo.meta=http://apollo-server:10080
     apollo.cacheDir=./cache

2、示例代码

public class StreamingJob2 {

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
		env
				.socketTextStream("localhost", 9999)
				.map(new MapFunction<String, Integer>() {
					@Override
					public Integer map(String value) throws Exception {
						return Integer.parseInt(value);
					}
				})
				.process(new ProcessFunction<Integer, Integer>() {

					private int maxNum;

					@Override
					public void open(Configuration parameters) throws Exception {
						super.open(parameters);
						//config instance is singleton for each namespace and is never null
						Config config = ConfigService.getAppConfig();
						String someKey = "max.num";
						String someDefaultValue = "3";
                        maxNum = Integer.parseInt(config.getProperty(someKey, someDefaultValue));
						System.out.println("apollo maxNum: " + maxNum);

                        config.addChangeListener(new ConfigChangeListener() {
                            @Override
                            public void onChange(ConfigChangeEvent changeEvent) {
                                for (String key : changeEvent.changedKeys()) {
                                    ConfigChange change = changeEvent.getChange(key);
                                    if (someKey.equalsIgnoreCase(change.getPropertyName())) {
                                        maxNum = Integer.parseInt(change.getNewValue());
                                    }
                                }
                                System.out.println("maxNum is: " + maxNum);
                            }
                        });
					}

					@Override
					public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
					    if (value > maxNum) {
					        out.collect(value);
                        }
					}
				})
				.print("--->");
		env.execute(StreamingJob2.class.getCanonicalName());
	}
}

 

上一篇:Apollo服务搭建


下一篇:阿里熔断限流Sentinel研究