本文主要分享下Spring Boot和Spring Kafka如何配置整合,实现发送和接收来自Spring Kafka的消息。
项目结构
pom依赖包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.java.zy</groupId>
    <artifactId>base-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Spring Boot parent: supplies dependency management and plugin defaults -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/>
    </parent>

    <dependencies>
        <!-- JSON serialization of the message payload.
             1.2.47 is affected by publicly documented autoType deserialization
             vulnerabilities; 1.2.83 is the patched release of the same 1.x line. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring integration for Apache Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.8.RELEASE</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>2.2.8.RELEASE</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
Springboot+kafka对应表
JAVA代码
定义数据传输对象
package com.java.common.entity;

import org.springframework.stereotype.Component;

/**
 * Data-transfer object carrying one user-log record: a user name, a user id
 * and a state flag, all held as plain strings.
 *
 * <p>NOTE(review): the class is registered as a Spring bean via
 * {@code @Component}, which is unusual for a mutable DTO that callers create
 * with {@code new}. The annotation is kept so existing wiring is unaffected —
 * confirm whether anything actually injects it.
 */
@Component
public class UserLog {

    private String username;
    private String userid;
    private String state;

    /** @return the user name, or {@code null} if it was never set */
    public String getUsername() {
        return username;
    }

    /** @param username the user name to store */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the user id, or {@code null} if it was never set */
    public String getUserid() {
        return userid;
    }

    /** @param userid the user id to store */
    public void setUserid(String userid) {
        this.userid = userid;
    }

    /** @return the state flag, or {@code null} if it was never set */
    public String getState() {
        return state;
    }

    /** @param state the state flag to store */
    public void setState(String state) {
        this.state = state;
    }

    /**
     * Renders all three fields so that string concatenation and logging show
     * the record's content instead of the default {@code ClassName@hash} form.
     *
     * @return a human-readable description of this record
     */
    @Override
    public String toString() {
        return "UserLog{username='" + username
                + "', userid='" + userid
                + "', state='" + state + "'}";
    }
}
定义消息生产者(消息的发送直接使用KafkaTemplate模板即可,发送逻辑都已封装好,直接调用)
package com.java.common.controller;

import com.alibaba.fastjson.JSON;
import com.java.common.entity.UserLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Publishes {@link UserLog} records, serialized as JSON strings, to the
 * {@code user-log} topic.
 */
@Component
public class UserLogProducer {

    // Typed as <String, String> because the payload passed to send() is a JSON
    // string; the raw type would produce an unchecked call.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Builds a sample user-log record for the given id and sends it.
     *
     * <p>The user name is fixed to {@code "jhp"} and the state to {@code "0"}.
     *
     * @param userid the user id to put into the record
     */
    public void sendLog(String userid) {
        UserLog userLog = new UserLog();
        userLog.setUsername("jhp");
        userLog.setUserid(userid);
        userLog.setState("0");

        // Serialize once and log exactly what goes on the wire; concatenating
        // the object itself would depend on UserLog's toString().
        String payload = JSON.toJSONString(userLog);
        System.err.println("发送用户日志数据:" + payload);
        kafkaTemplate.send("user-log", payload);
    }
}
获取消息的消费者实体
注意:消费机制是通过监听器实现的,直接使用@KafkaListener(topics = {"user-log"})注解,根据指定的条件进行消息的监听
package com.java.common.controller;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Listens on the {@code user-log} topic and prints every record value it
 * receives.
 */
@Component
public class UserLogConsumer {

    /**
     * Handles one record from the {@code user-log} topic.
     *
     * <p>The (possibly empty) wrapped value is always printed to standard
     * output; a non-null value is additionally printed to standard error.
     *
     * @param consumerRecord the record delivered by the listener
     */
    @KafkaListener(topics = {"user-log"})
    public void consumer(ConsumerRecord<?, ?> consumerRecord) {
        // Wrap the value so a null payload is represented as an empty Optional.
        Optional<?> payload = Optional.ofNullable(consumerRecord.value());
        System.out.println(">>>>>>>>>> record = " + payload);

        // Only report a consumed message when a value is actually present.
        payload.ifPresent(message -> System.err.println("消费消息:" + message));
    }
}
Springboot应用启动类(方便测试,在容器初始化的时候就开始模拟消息的发送)
package com.java.common; import com.java.common.controller.UserLogProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import javax.annotation.PostConstruct; @SpringBootApplication public class KafkaApplication { @Autowired private UserLogProducer kafkaSender; @PostConstruct public void init(){ for (int i = 0; i < 10; i++) { //调用消息发送类中的消息发送方法 kafkaSender.sendLog(String.valueOf(i)); } } public static void main(String[] args) { SpringApplication.run(KafkaApplication.class,args); } }
对应的application.properties配置文件
spring.application.name=base-kafka
server.port=8080

#============== kafka ===================
# Kafka broker address(es); more than one may be given, comma-separated
spring.kafka.bootstrap-servers=localhost:9092,192.168.100.1:9093

#=============== provider =======================
spring.kafka.producer.retries=0
# Producer batch size.
# NOTE(review): the original comment described this as the number of messages
# per batch; Kafka's batch.size is documented in bytes - confirm the intended value.
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# Serializers for the message key and the message value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================
# Default consumer group id
spring.kafka.consumer.group-id=user-log-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# Deserializers for the message key and the message value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
对应的application.yml配置文件
server:
  port: 8080

spring:
  application:
    name: base-kafka

  #============== kafka ===================
  kafka:
    # Kafka broker address(es); more than one may be given, comma-separated
    bootstrap-servers: 192.168.200.27:19092

    #=============== provider =======================
    producer:
      retries: 0
      # Producer batch size.
      # NOTE(review): Kafka's batch.size is documented in bytes, not as a
      # message count - confirm the intended value.
      batch-size: 16384
      buffer-memory: 33554432
      # Serializers for the message key and the message value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    #=============== consumer =======================
    consumer:
      # Default consumer group id
      group-id: user-log
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      # Deserializers for the message key and the message value.
      # The consumer side binds key-deserializer / value-deserializer; the
      # serializer keys belong to the producer section only.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
测试结果
启动项目,观察控制台打印的消息
和
springboot和kafka成功整合!
文章转载自:https://blog.csdn.net/qq_18603599/article/details/81169488