Flink java作为消费者连接虚拟机中的kafka/或本地的kafka,并解决java.net.UnknownHostException报错

kafka的安装与配置请参考:https://blog.csdn.net/weixin_35757704/article/details/120488287

  1. 首先在kafka中创建一个topic,名称叫mytesttopic,进入到kafka的目录下,运行:
./bin/kafka-topics.sh --create --topic mytesttopic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9092

然后启动生产者:

./bin/kafka-console-producer.sh --topic mytesttopic --broker-list localhost:9092
  1. 首先配置pom.xml
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId><!--这个是kafka的版本-->
            <version>1.13.2</version><!--这个是flink的版本-->
        </dependency>
  1. java代码如下:
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaStreamWordCount {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.99.5:9092");// 这里是kafka的主机地址,可以是 域名:端口,也可是 ip:端口
        properties.setProperty("group.id", "test");//第1个参数是固定值 group.id,第2个参数是自定义的组ID,这个可以自己指定
        DeserializationSchema<String> deserializationSchema = new SimpleStringSchema();
        String topic = "mytesttopic";// 哇!这里不要写错啊,这个是作为消费者接收的kafka对应的topic名称
        DataStream<String> text = env.addSource(new FlinkKafkaConsumer<String>(topic, deserializationSchema, properties));
        text.print();
        env.execute("Flink-Kafka demo");
    }
}
  1. 修改host文件(如果是本机的flink与kafka是不需要配置的)

比如我的虚拟机主机名是:ubuntu,ip是:192.168.99.5,就在host里添加:

192.168.99.5	ubuntu

注意即便properties.setProperty("bootstrap.servers", "192.168.99.5:9092");这样使用ip:端口配置也需要添加host!

  1. 运行java程序,然后在kafka的生产者中输入任何想要输入的内容,就可以在flink里查看了
xq@ubuntu:~/Desktop/software/kafka_2.12-2.7.1$ ./bin/kafka-console-producer.sh --topic myte--broker-list localhost:9092
>hello kafka
>hello flink
>

在flink中显示:

.........
16:52:44,055 INFO  org.apache.kafka.clients.Metadata                            [] - [Consumer clientId=consumer-test-26, groupId=test] Cluster ID: HDij23gxR_edwXhIDqE9ng
16:52:44,056 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-test-26, groupId=test] Discovered group coordinator ubuntu:9092 (id: 2147483647 rack: null)
16:52:44,075 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-test-26, groupId=test] Setting offset for partition mytesttopic-0 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=ubuntu:9092 (id: 0 rack: null), epoch=0}}
16> hello kafka
16> hello flink
上一篇:各组件命令


下一篇:Flink tableapi数据写入ES