Flink: reading data from Kafka and writing it back to Kafka

The job below consumes JSON messages from the Kafka topic "example", drops records whose "type" field is "ALTER", extracts the "data" array from each remaining record, and writes it back to the Kafka topic "sink".

package Consumer;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        // Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.186.174:9092");
        properties.setProperty("group.id", "test");
        // Source: Kafka consumer reading from the "example" topic
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("example", new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest(); // start reading from the earliest offset
        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        // Data processing
        /* Alternative: the same logic split into separate filter and map steps
        DataStream<String> filterStream = stream.filter((FilterFunction<String>) s -> {
            JSONObject object = JSONObject.parseObject(s);
            System.out.println(object.getString("type"));
            return !"ALTER".equalsIgnoreCase(object.getString("type"));
        });
        SingleOutputStreamOperator<String> mapStream = filterStream.map((MapFunction<String, String>) s -> {
            JSONObject object = JSONObject.parseObject(s);
            return object.getJSONArray("data").toJSONString();
        });
        */
        SingleOutputStreamOperator<String> mapStream = stream.filter((FilterFunction<String>) s -> {
            // drop records whose "type" is ALTER (DDL changes)
            JSONObject object = JSONObject.parseObject(s);
            System.out.println(object.getString("type"));
            return !"ALTER".equalsIgnoreCase(object.getString("type"));
        }).map((MapFunction<String, String>) s -> {
            // forward only the "data" array as the outgoing payload
            JSONObject object = JSONObject.parseObject(s);
            return object.getJSONArray("data").toJSONString();
        });
        // Sink: Kafka producer writing to the "sink" topic
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("sink", new SimpleStringSchema(), properties);
        mapStream.addSink(producer);
        mapStream.print();
        env.execute();
    }
}
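For context, the filter and map above assume that each message is a JSON object carrying a "type" field and a "data" array (binlog-style change records, e.g. as produced by Canal); the concrete field values below are purely illustrative:

    input  (topic "example"): {"type":"INSERT","data":[{"id":"1","name":"flink"}]}
    dropped:                  any message whose "type" is "ALTER"
    output (topic "sink"):    [{"id":"1","name":"flink"}]

Maven dependencies (pom.xml):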
    <dependencies>
        <!-- Flink Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.14.3</version>
        </dependency>
        <!-- Flink core, streaming API, and client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.14.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.14.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.14.3</version>
        </dependency>
        <!-- Avro format (not used by this example) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>1.14.3</version>
        </dependency>
        <!-- fastjson, used to parse the JSON messages -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.59</version>
        </dependency>
    </dependencies>
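Note that FlinkKafkaConsumer and FlinkKafkaProducer are deprecated as of Flink 1.14 in favor of the unified KafkaSource / KafkaSink API. A minimal sketch of the equivalent source and sink wiring, assuming the same broker address, topics, and group id as above:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Source: replaces FlinkKafkaConsumer
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("192.168.186.174:9092")
        .setTopics("example")
        .setGroupId("test")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");

// Sink: replaces FlinkKafkaProducer
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("192.168.186.174:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("sink")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();
mapStream.sinkTo(sink);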

 
