For Kafka installation and configuration, see: https://blog.csdn.net/weixin_35757704/article/details/120488287
- First, create a Kafka topic named mytesttopic. From the Kafka installation directory, run:
./bin/kafka-topics.sh --create --topic mytesttopic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9092
Then start the console producer:
./bin/kafka-console-producer.sh --topic mytesttopic --broker-list localhost:9092
- Next, add the Kafka connector dependency to pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId><!-- 2.12 is the Scala version the connector is built for, not the Kafka version -->
    <version>1.13.2</version><!-- this is the Flink version -->
</dependency>
- The Java code is as follows:
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaStreamWordCount {
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        // Address of the Kafka broker: either domain:port or ip:port
        properties.setProperty("bootstrap.servers", "192.168.99.5:9092");
        // The key "group.id" is fixed; the value is the consumer group ID, which you can choose freely
        properties.setProperty("group.id", "test");

        // Deserialize each Kafka record value as a plain string
        DeserializationSchema<String> deserializationSchema = new SimpleStringSchema();
        // Must match the name of the Kafka topic to consume from exactly
        String topic = "mytesttopic";

        DataStream<String> text = env.addSource(new FlinkKafkaConsumer<>(topic, deserializationSchema, properties));
        // Print every received record to stdout
        text.print();

        env.execute("Flink-Kafka demo");
    }
}
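The broker address and group ID above are hard-coded. As a minimal sketch, the properties could be built by a small helper instead. The class name `KafkaConsumerProps` and the `build` method are hypothetical names introduced here, and setting `auto.offset.reset` to `earliest` is an assumption, not something the original code does; it only matters when the consumer group has no committed offset yet.

```java
import java.util.Properties;

public class KafkaConsumerProps {
    // Builds Kafka consumer properties for the given broker address and consumer group.
    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Assumption: read from the earliest record when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Fall back to the values used in this post when no arguments are given.
        String servers = args.length > 0 ? args[0] : "192.168.99.5:9092";
        String group = args.length > 1 ? args[1] : "test";
        Properties props = build(servers, group);
        System.out.println("bootstrap.servers = " + props.getProperty("bootstrap.servers"));
        System.out.println("group.id = " + props.getProperty("group.id"));
    }
}
```

The returned `Properties` object could then be passed to the `FlinkKafkaConsumer` constructor in place of the inline `properties`.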
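- Edit the hosts file (not needed when Flink and Kafka run on the same machine).
For example, my virtual machine's hostname is ubuntu and its IP is 192.168.99.5, so I add this line to hosts: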
192.168.99.5 ubuntu
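Note: the hosts entry is required even when bootstrap.servers is given as ip:port, as in properties.setProperty("bootstrap.servers", "192.168.99.5:9092");. The broker reports itself to clients by hostname (ubuntu:9092 in the log output further down), so the machine running Flink must be able to resolve that name.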
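To confirm the hosts entry took effect before starting the Flink job, a quick resolution check can help. This is a minimal sketch using only the JDK; the class name `HostCheck` is hypothetical, and the default hostname `ubuntu` is the one from this post.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostCheck {
    // Returns the resolved IP address as text, or null if the name cannot be resolved.
    public static String resolve(String hostname) {
        try {
            return InetAddress.getByName(hostname).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Hostname to check; defaults to the VM hostname used in this post.
        String host = args.length > 0 ? args[0] : "ubuntu";
        String ip = resolve(host);
        if (ip == null) {
            System.out.println(host + " does not resolve; check the hosts file");
        } else {
            System.out.println(host + " -> " + ip);
        }
    }
}
```

If the entry is in place, running it on the machine that hosts Flink should report the IP that was added to hosts.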
- Run the Java program, then type anything into the Kafka console producer; the messages show up in Flink's output.
xq@ubuntu:~/Desktop/software/kafka_2.12-2.7.1$ ./bin/kafka-console-producer.sh --topic mytesttopic --broker-list localhost:9092
>hello kafka
>hello flink
>
Flink prints:
.........
16:52:44,055 INFO org.apache.kafka.clients.Metadata [] - [Consumer clientId=consumer-test-26, groupId=test] Cluster ID: HDij23gxR_edwXhIDqE9ng
16:52:44,056 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-test-26, groupId=test] Discovered group coordinator ubuntu:9092 (id: 2147483647 rack: null)
16:52:44,075 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-test-26, groupId=test] Setting offset for partition mytesttopic-0 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=ubuntu:9092 (id: 0 rack: null), epoch=0}}
16> hello kafka
16> hello flink
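The class is named KafkaStreamWordCount, but the job above only prints each line as it arrives. The counting step itself could look roughly like the plain-Java sketch below. The class name `LineWordCount` is hypothetical; in a Flink job, equivalent logic would normally be expressed with stream operators rather than a loop over a list, and that wiring is not shown here.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LineWordCount {
    // Counts whitespace-separated words across all lines, keeping first-seen order.
    public static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.trim().split("\\s+")) {
                // Splitting an empty line yields one empty token; skip it.
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The two messages typed into the console producer in this post.
        System.out.println(count(Arrays.asList("hello kafka", "hello flink")));
        // prints {hello=2, kafka=1, flink=1}
    }
}
```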