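Integrating Kafka with Spring Boot and Avro

This guide is based on Spring Boot 1.5.9.

1. Add the Maven dependencies (kafka-avro-serializer is published in the Confluent Maven repository, https://packages.confluent.io/maven/, rather than Maven Central, so that repository must be declared in the POM)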

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.3.9.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.9.0</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.2.1</version>
        </dependency>

 

2. Add the Avro Maven plugin

            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.9.0</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>path to the .avsc schema files</sourceDirectory>
                            <outputDirectory>output path for the generated Avro classes</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
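
The plugin generates Java classes from the .avsc files it finds in sourceDirectory during the generate-sources phase. As a rough sketch, a schema that would produce the AvroRecord class used by the producer in step 4 might look like the following. The namespace and both fields are assumptions for illustration only; the original schema is not shown in this post.

```json
{
  "type": "record",
  "name": "AvroRecord",
  "namespace": "com.example.avro",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}
```

With a schema like this saved as AvroRecord.avsc, running mvn generate-sources should write AvroRecord.java under the configured outputDirectory, in the package given by namespace.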

3. Add the Kafka configuration to application.yml

spring:
    kafka:
        bootstrap-servers: xxx
        producer:
            key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        consumer:
            group-id: test
            key-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        properties:
            schema.registry.url: xxx
            security.protocol: SASL_PLAINTEXT
            sasl:
                mechanism: PLAIN
                jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";
        template:
            default-topic: xxx
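
For context on why schema.registry.url is needed: the Confluent serializers configured above do not embed the schema in each message. They write a 5-byte header (a magic byte 0 followed by a 4-byte big-endian schema ID) and then the Avro binary payload, and the deserializer looks the schema up in the registry by that ID. The following is a minimal sketch that only inspects such a header using the Java standard library. The class name ConfluentWireFormat and the sample bytes are hypothetical, and this is not the Confluent implementation.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    /** Returns the schema ID stored big-endian in bytes 1..4 of a serialized message. */
    public static int schemaId(byte[] message) {
        if (message == null || message.length < HEADER_SIZE) {
            throw new IllegalArgumentException("message is shorter than the 5-byte header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buffer.getInt();
    }

    /** Returns the Avro binary payload that follows the header. */
    public static byte[] payload(byte[] message) {
        schemaId(message); // validates the header first
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    public static void main(String[] args) {
        // Hypothetical message: magic byte, schema ID 42, then two payload bytes.
        byte[] message = {0, 0, 0, 0, 42, 0x02, 0x06};
        System.out.println("schema id = " + schemaId(message));
        System.out.println("payload length = " + payload(message).length);
    }
}
```

A helper like this can be handy when debugging raw bytes from a topic, for example to confirm that a message was really produced by KafkaAvroSerializer before suspecting the registry configuration.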

4. Java Producer

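import java.util.Objects;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, AvroRecord> kafkaTemplate;

    // AvroRecord is the class generated by avro-maven-plugin from the .avsc schema.
    public void send(AvroRecord record) {
        if (Objects.isNull(record)) {
            return; // nothing to send
        }
        LOGGER.info(record.toString());
        // Sends to the topic configured as spring.kafka.template.default-topic.
        kafkaTemplate.sendDefault("key", record);
    }
}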

 
