Connecting Flink to Kafka

Starting Kafka and Flink

1. Go to the bin directory of ZooKeeper and start ZooKeeper

./zkServer.sh start

2. Go to the bin directory of Kafka and start Kafka

./kafka-server-start.sh -daemon /opt/module/kafka-0.11/config/server.properties

3. Go to the bin directory of Flink and start Flink

./start-cluster.sh 

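Starting a Kafka producer

The Kafka topic is sensor. From the Kafka installation directory, start a console producer on that topic:

./bin/kafka-console-producer.sh --broker-list 192.168.153.202:9092 --topic sensor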
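
The value passed to --broker-list is a host:port pair, and a TCP port must lie between 1 and 65535 (Kafka's default listener port is 9092). As a minimal sketch, the check below rejects an address whose port is missing or out of range before it is handed to a producer or consumer. The class name BrokerAddress and the method isValid are hypothetical, introduced only for illustration, and the check assumes a plain hostname or IPv4 address rather than a bracketed IPv6 literal.

```java
// Hypothetical helper for illustration; not part of Kafka or Flink.
final class BrokerAddress {
    private BrokerAddress() {}

    /** Returns true if the address looks like host:port with a port in 1..65535. */
    static boolean isValid(String address) {
        int colon = address.lastIndexOf(':');
        // Require a non-empty host before the colon and a non-empty port after it.
        if (colon <= 0 || colon == address.length() - 1) {
            return false;
        }
        try {
            int port = Integer.parseInt(address.substring(colon + 1));
            return port >= 1 && port <= 65535;
        } catch (NumberFormatException e) {
            // The port part is not a number.
            return false;
        }
    }

    public static void main(String[] args) {
        String address = args.length > 0 ? args[0] : "192.168.153.202:9092";
        System.out.println(address + " valid: " + isValid(address));
    }
}
```

The same check can be applied to the bootstrap.servers value used by the consumer, since both settings name the same broker.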

Adding the pom dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

Running the job

The Java code is as follows:

package com.test.apitest.souceTest;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;

public class SourceTest02_kafka {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.153.202:9092");
        properties.setProperty("group.id", "consumer-group");
        // Kafka client deserializers; Flink decodes record values with the
        // SimpleStringSchema passed to the consumer below
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // With no committed offset for this group, start from the newest records
        properties.setProperty("auto.offset.reset", "latest");

        // Read data from the Kafka topic "sensor"
        DataStreamSource<String> sensor = env.addSource(
                new FlinkKafkaConsumer011<>("sensor", new SimpleStringSchema(), properties));

        // Print each record to standard output
        sensor.print();

        // Execute the job
        env.execute();
    }
}
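
The broker address and consumer group are hard-coded in the listing above. As a minimal sketch, the same settings could be built by a small helper so that the address can be supplied at launch time. The class name KafkaConsumerProps and the method build are hypothetical and not part of the original code; the helper uses only java.util.Properties and sets the same keys as the listing.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the original listing.
final class KafkaConsumerProps {
    private KafkaConsumerProps() {}

    /** Builds the consumer settings used above for the given broker and group. */
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        // Take the broker address from the first argument, else use the default.
        String servers = args.length > 0 ? args[0] : "192.168.153.202:9092";
        Properties props = build(servers, "consumer-group");
        System.out.println("bootstrap.servers = " + props.getProperty("bootstrap.servers"));
        System.out.println("group.id = " + props.getProperty("group.id"));
    }
}
```

The returned Properties object can be passed as the third constructor argument of FlinkKafkaConsumer011 in place of the properties variable.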

[Screenshot: Kafka producing data]

[Screenshot: Flink consuming data]
