kafka整合springMVC(消费者)

kafka整合springMVC(消费者)

由于公司项目是mvc的,所以不得不用mvc进行整合,mvc的xml配置属实头疼,头疼也得开始搞。

开始之前,需要先在虚拟机安装环境,网上教程很多,我就不介绍了,但是有几个配置文件需要注意,如下:

kafka的配置文件server.properties,大约三十多行的地方,两个地址,需要填你自己的ip地址(我的虚拟机地址)
kafka整合springMVC(消费者)
还有一个consumer.properties中的,一个consumer group.id
kafka整合springMVC(消费者)
还有一个topic信息,网上大多使用的是如下命令查看所有的topic信息,我的不知道为啥不行,我在zookeeper中进行查看

./kafka-topics.sh --list --zookeeper 192.168.153.134:2181

进入zookeeper安装bin目录,执行如下命令

./zkCli.sh

之后进入命令行

ls /brokers/topics

其中的test,test1是我自己新建的topic
kafka整合springMVC(消费者)

项 目部分

首先是pom文件
添加如下依赖

		<!-- kafka客户端支持包 -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.11.0.1</version>
		</dependency>
		<!-- spring支持kafka -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>2.0.4.RELEASE</version>
		</dependency>

新建配置文件,application-kafka.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.1.xsd
        http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-2.1.xsd
        ">

    <!--kafka配置参数-->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!--Kafka服务地址,集群环境时地址使用","隔开-->
                <!--就是上边让填写的listen地址-->
                <entry key="bootstrap.servers" value="192.168.153.134:9092" />
                <!--kafka组id-->
                <entry key="group.id" value="test-consumer-group"></entry>
                <entry key="enable.auto.commit" value="true" />
                <entry key="session.timeout.ms" value="15000 " />
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>

    <!--消费者消费kafka信息的java类-->
    <bean id="messageListernerConsumerService" class="你的类实现类路径" />
    <!--第二监听类,用于监听多个topic-->
    <bean id="messageListernerConsumerService2" class="你的类实现类路径" />
    
    <!-- 创建consumerFactory bean,加载上边的配置信息-->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>
    <!-- test是指定的第一个topic,和相应的监听类进行绑定-->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="test"/>
        <property name="messageListener" ref="messageListernerConsumerService"/>
    </bean>
     <!-- test1是指定的第二个topic,和相应的监听类进行绑定-->
    <bean id="containerProperties2" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="test1"/>
        <property name="messageListener" ref="messageListernerConsumerService2"/>
    </bean>
     <!-- 绑定consumerFactory和containerProperties-->
    <bean id="messageListenerContainer1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
    </bean>
    <bean id="messageListenerContainer2" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties2"/>
    </bean>
</beans>

上边配置文件中指定了两个监听tpoic的监听类

	<!--消费者消费kafka信息的java类-->
    <bean id="messageListernerConsumerService" class="XXX.XXX.XXX.KafkaConsumerListener" />
    <!--第二监听类,用于监听多个topic-->
    <bean id="messageListernerConsumerService2" class="XXX.XXX.XXX.KafkaConsumerListener2" />

监听类如下:
继承了MessageListener,需要实现其中的onMessage方法,只实现一个,如果有第二个服务订阅需求可以直接使用

public class KafkaConsumerListener implements MessageListener<Integer, String> {
    @Override
    public void onMessage(ConsumerRecord<Integer, String> integerStringConsumerRecord) {
        System.err.println("=============");
        System.err.println(integerStringConsumerRecord.value());
    }
}

我本地的是Linux环境,启动zookeeper,启动kafka
打开kafka消息提供者控制台,准备推送消息
这个ip地址就是你在配置文件中填的地址,–topic后边就是kafka的topic

./kafka-console-producer.sh --broker-list 192.168.153.134:9092 --topic test

kafka整合springMVC(消费者)
输入你好,回车后返回控制台可以查看到信息
kafka整合springMVC(消费者)
基础整合完毕

上一篇:jquery遍历数组的方式


下一篇:【Vue3】app.config.globalProperties如何创建一个储存全局变量的文件