相关文章链接
1、Flink的Source源之从集合中获取数据
1 //1.env 2 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 3 4 //2.source 5 // * 1.env.fromElements(可变参数); 6 DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink"); 7 // * 2.env.fromColletion(各种集合); 8 DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink")); 9 // * 3.env.generateSequence(开始,结束); 10 DataStream<Long> ds3 = env.generateSequence(1, 10); 11 12 //3.Transformation 13 14 //4.sink 15 ds1.print(); 16 ds2.print(); 17 ds3.print(); 18 19 //5.execute 20 env.execute();
2、Flink的Source源之从文件中获取数据
//创建Flink流的执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.source // * 1.env.readTextFile(本地文件/HDFS文件);//压缩文件也可以 DataStream<String> ds1 = env.readTextFile("D:\\Project\\IDEA\\bigdata-study-tutorial\\flink-tutorial-java\\src\\main\\data\\input\\words.txt"); DataStream<String> ds2 = env.readTextFile("data/input/dir"); DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020//wordcount/input/words.txt"); DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz"); //3.Transformation //4.sink ds1.print(); ds2.print(); ds3.print(); ds4.print(); //5.execute env.execute();
3、Flink之自定义Source源
public static void main(String[] args) throws Exception { //##### 创建Flink流的执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // env.setParallelism(4); // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 添加自定义source源 DataStreamSource<SensorReading> dataStream = env.addSource(new SensorReadingSource(1, 1000L)); // 打印数据 dataStream.print(); //##### 启动执行环境 env.execute("SourceDemo02_UDF"); } /** * 温度传感器的bean类 */ @Data @AllArgsConstructor @NoArgsConstructor public static class SensorReading { private String id; private Long timestamp; private Double temperature; } /** * 自定义SensorReading源 */ public static class SensorReadingSource extends RichSourceFunction<SensorReading> { /** * 设置标识 */ private boolean running = true; private Integer num = 5; private Long interval = 1000L; public SensorReadingSource() { } /** * 根据传入的 数量 和 时间间隔 创建自定义源 * * @param num 要创建的传感器数量 * @param interval 创建的传感器时间间隔 */ public SensorReadingSource(Integer num, Long interval) { this.num = num; this.interval = interval; } @Override public void run(SourceContext<SensorReading> ctx) throws Exception { // 定义一个随机对象 Random random = new Random(); // 初始化num个传感器 ArrayList<SensorReading> sensorReadings = new ArrayList<>(); for (int i = 1; i <= num; i++) { String id = "sensor_" + i; long timestamp = System.currentTimeMillis(); double temperature = 60 + random.nextGaussian() * 10; sensorReadings.add(new SensorReading(id, timestamp, temperature)); } // 当running为true时,生成数据 while (running) { long timestamp = System.currentTimeMillis(); for (SensorReading sensorReading : sensorReadings) { sensorReading.setTimestamp(timestamp); sensorReading.setTemperature(sensorReading.getTemperature() + random.nextGaussian()); ctx.collect(sensorReading); } Thread.sleep(interval); } } @Override public void cancel() { running = false; } }