Flink (2): Flink Sources

1. Flink sources: reading data from collections

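// Imports needed by this snippet
import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// 1. env: create the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. source
// 2.1 env.fromElements(varargs)
DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
// 2.2 env.fromCollection(any java.util.Collection)
DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
// 2.3 env.generateSequence(from, to), both bounds inclusive
//     (deprecated since Flink 1.12 in favor of env.fromSequence(from, to))
DataStream<Long> ds3 = env.generateSequence(1, 10);

// 3. transformation (none in this example)

// 4. sink: print each stream to stdout
ds1.print();
ds2.print();
ds3.print();

// 5. execute: nothing runs until the job is submitted
env.execute();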

2. Flink sources: reading data from files

// 1. env: create the Flink streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. source
// env.readTextFile(path) accepts a local file, a directory, an HDFS path, or a compressed file
// Local file (absolute Windows path)
DataStream<String> ds1 = env.readTextFile("D:\\Project\\IDEA\\bigdata-study-tutorial\\flink-tutorial-java\\src\\main\\data\\input\\words.txt");
// Directory: every file inside it is read
DataStream<String> ds2 = env.readTextFile("data/input/dir");
// HDFS file
DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020/wordcount/input/words.txt");
// Compressed file: decompressed according to its extension
DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz");

// 3. transformation (none in this example)

// 4. sink: print each stream to stdout
ds1.print();
ds2.print();
ds3.print();
ds4.print();

// 5. execute
env.execute();

3. Flink: custom sources

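// Imports for this listing; the method and nested classes below are assumed
// to sit inside one enclosing class.
import java.util.ArrayList;
import java.util.Random;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public static void main(String[] args) throws Exception {
    // Create the Flink streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    // env.setParallelism(4);
    // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Add the custom source: 1 sensor, one reading every 1000 ms
    DataStreamSource<SensorReading> dataStream = env.addSource(new SensorReadingSource(1, 1000L));

    // Print the readings
    dataStream.print();

    // Submit and run the job
    env.execute("SourceDemo02_UDF");
}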

/**
 * Bean class for a temperature sensor reading.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;
}

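/**
 * Custom source that emits SensorReading records.
 */
public static class SensorReadingSource extends RichSourceFunction<SensorReading> {

    /**
     * Run flag. Declared volatile because cancel() is called from a
     * different thread than the one executing run().
     */
    private volatile boolean running = true;

    private Integer num = 5;
    private Long interval = 1000L;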

    public SensorReadingSource() {
    }

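    /**
     * Creates a custom source with the given sensor count and emit interval.
     *
     * @param num      number of sensors to simulate
     * @param interval pause between two rounds of readings, in milliseconds
     */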
    public SensorReadingSource(Integer num, Long interval) {
        this.num = num;
        this.interval = interval;
    }

    @Override
    public void run(SourceContext<SensorReading> ctx) throws Exception {

        // Random number generator for the simulated temperatures
        Random random = new Random();

        // Initialize num sensors, each starting around 60 degrees
        ArrayList<SensorReading> sensorReadings = new ArrayList<>();
        for (int i = 1; i <= num; i++) {
            String id = "sensor_" + i;
            long timestamp = System.currentTimeMillis();
            double temperature = 60 + random.nextGaussian() * 10;
            sensorReadings.add(new SensorReading(id, timestamp, temperature));
        }

        // Keep emitting readings while running is true
        while (running) {
            long timestamp = System.currentTimeMillis();
            for (SensorReading sensorReading : sensorReadings) {
                sensorReading.setTimestamp(timestamp);
                sensorReading.setTemperature(sensorReading.getTemperature() + random.nextGaussian());
                ctx.collect(sensorReading);
            }
            Thread.sleep(interval);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
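
The run()/cancel() contract used by the custom source above can be tried out without a Flink cluster. The following is a minimal sketch in plain Java, not Flink code: `CancellableLoopSketch`, `runBriefly`, and the `Consumer<Double>` that stands in for `SourceContext` are hypothetical names introduced only for illustration. It shows the same pattern of a loop guarded by a flag that another thread clears, which is why the flag should be `volatile`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.Consumer;

/** Plain-Java stand-in for the run()/cancel() pattern; not a Flink class. */
final class CancellableLoopSketch {

    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;
    private final long intervalMillis;

    CancellableLoopSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    /** Emits one simulated temperature per interval until cancel() is called. */
    void run(Consumer<Double> out) throws InterruptedException {
        Random random = new Random();
        double temperature = 60 + random.nextGaussian() * 10;
        while (running) {
            temperature += random.nextGaussian(); // small random walk
            out.accept(temperature);
            Thread.sleep(intervalMillis);
        }
    }

    /** Asks the loop in run() to stop after its current iteration. */
    void cancel() {
        running = false;
    }

    /** Runs the loop on a worker thread, cancels it after a pause, and returns what was emitted. */
    static List<Double> runBriefly(long intervalMillis, long runForMillis) {
        CancellableLoopSketch source = new CancellableLoopSketch(intervalMillis);
        List<Double> emitted = Collections.synchronizedList(new ArrayList<>());
        Thread worker = new Thread(() -> {
            try {
                source.run(emitted::add);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            worker.start();
            Thread.sleep(runForMillis);
            source.cancel();   // flips the flag from the calling thread
            worker.join();     // returns once the loop has observed the flag
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Double> emitted = runBriefly(10L, 100L);
        System.out.println("emitted " + emitted.size() + " readings before cancel");
    }
}
```

The number of readings printed depends on thread scheduling, so it varies from run to run. In the Flink version the framework plays the role of `runBriefly`: it invokes `run()` on the task thread and calls `cancel()` from another thread when the job is stopped.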

 
