springboot集成kafka

1.前置配置

  • pom

    <properties>
    	<maven.compiler.source>8</maven.compiler.source>
    	<maven.compiler.target>8</maven.compiler.target>
    </properties>
    
    <parent>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-parent</artifactId>
    	<version>2.1.5.RELEASE</version>
    </parent>
    
    <dependencies>
    	<dependency>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter</artifactId>
    	</dependency>
    	<dependency>
    		<groupId>org.springframework.kafka</groupId>
    		<artifactId>spring-kafka</artifactId>
    	</dependency>
    	<dependency>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-test</artifactId>
    		<scope>test</scope>
    	</dependency>
    </dependencies>
    
  • application.properties

    spring.kafka.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
    
    spring.kafka.producer.retries=5
    spring.kafka.producer.acks=all
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    spring.kafka.producer.transaction-id-prefix=transaction-id-
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.properties.enable.idempotence=true
    
    spring.kafka.consumer.group-id=group1
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=100
    spring.kafka.consumer.properties.isolation.level=read_committed
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    
  • 日志

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%p %d{yyyy-MM-dd HH:mm:ss} - %m%n</pattern>
                <charset>UTF-8</charset>
            </encoder>
        </appender>
    
        <!-- 控制台输出日志级别 -->
        <root level="ERROR">
            <appender-ref ref="STDOUT" />
        </root>
    
        <logger name="org.springframework.kafka" level="INFO"  additivity="false">
            <appender-ref ref="STDOUT" />
        </logger>
    
        <!--事务控制-->
        <logger name="org.springframework.kafka.transaction" level="debug"  additivity="false">
            <appender-ref ref="STDOUT" />
        </logger>
    
    </configuration>
    

2.代码

  • 启动类
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.KafkaListeners;
import org.springframework.messaging.handler.annotation.SendTo;
import java.io.IOException;

/**
 * 程序启动入口
 */
@SpringBootApplication
@EnableKafka
public class KafkaSpringBootApplication {
    public static void main(String[] args) throws IOException {
        SpringApplication.run(KafkaSpringBootApplication.class, args);
        System.in.read();
    }

    /**
     * 消费者topic04,同时将消息发送给topic05
     * @param cr
     * @return
     */
    @KafkaListeners(value = {@KafkaListener(topics = {"topic04"})})
    @SendTo(value = {"topic05"})
    public String listenner(ConsumerRecord<?, ?> cr) {
        System.out.println("消费到数据:"+cr.value());
        return cr.value() + "demo";
    }
}
  • service类

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    /**
     * 通过KafkaTemplate发送消息
     * Transactional可以通过这个配置使用kafka的相关事务
     */
    @Transactional
    @Service
    public class OrderService {
    
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        public void saveOrder(String id, Object message) {
            kafkaTemplate.send(new ProducerRecord("topic04", id, message));
        }
    }
    
  • 测试类

    import com.baizhi.KafkaSpringBootApplication;
    import com.baizhi.OrderService;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.KafkaOperations;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * 通过单元测试类来发送消息
     */
    @SpringBootTest(classes = {KafkaSpringBootApplication.class})
    @RunWith(SpringRunner.class)
    public class KafkaTempolateTests {
    
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        @Autowired
        private OrderService orderService;
    
        @Test
        public void testOrderService() {
            orderService.saveOrder("001", "诗和远方");
        }
    
        @Test
        public void testKafkaTemplate() {
            kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
                @Override
                public Object doInOperations(KafkaOperations kafkaOperations) {
                    return kafkaOperations.send(new ProducerRecord("topic04", "002", "this is a demo"));
                }
            });
        }
    }
    
上一篇:集成MyBatis


下一篇:非常简易的SpringBoot后台项目