Kafka生产消费应用

Kafka

消息中间件产生背景

在客户端与服务器进行通讯时.客户端调用后,必须等待服务对象完成处理返回结果才能继续执行。

这样会引发很多的问题:

  1. 客户与服务器对象的生命周期紧密耦合,客户进程和服务对象进程都都必须正常运行;

  2. 如果由于服务对象崩溃或者网络故障导致用户的请求不可达,客户会受到异常。

为了解决这样的问题,消息中间件技术应运而生。

面向消息的中间件(MessageOrlented MiddlewareMOM)较好的解决了以上问题。发送者将消息发送给消息服务器,消息服务器将消感存放在若千队列中,在合适的时候再将消息转发给接收者。

这种模式下,好处有很多:

  1. 发送和接收是异步的,发送者无需等待;

  2. 二者的生命周期未必相同: 发送消息的时候接收者不一定运行,接收消息的时候发送者也不一定运行;

  3. 一对多通信: 对于一个消息可以有多个接收者。

使用

引入基本jar
`<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.2.0</version>
        </dependency>
</dependencies>
相关配置
kafka:
    bootstrap-servers: bl-kafka-001:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 0
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: latest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false
消息队列服务接口
package com.fawvw.ftb3.order.infrastructure.integration.kafka;

/**
 * @program: order-service2
 * @description: 消息队列服务接口
 * @author: wig
 * @create: 2021-06-29 09:50
 **/
public interface MessageQueueService {

    /**
     * 发送消息到消息队列
     *
     * @param message 消息
     * @param topic topic
     */
    void sendText(String message, String topic);
}
实现消息队列服务接口
package com.fawvw.ftb3.order.infrastructure.integration.kafka.impl;

import com.fawvw.ftb3.order.infrastructure.integration.kafka.MessageQueueService;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

/**
 * @program: order-service
 * @description: 实现消息队列服务接口
 * @author: chenglin.zhou
 * @create: 2021-06-29 10:23
 **/
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageQueueServiceImpl implements MessageQueueService {

    private static final Integer KAFKA_TIME = 10;

    private final KafkaTemplate<String, String> template;

    @Override
    public void sendText(String message, String topic) {
        try {
            SendResult<String, String> result = template.send(topic, message).get(KAFKA_TIME, TimeUnit.SECONDS);
            log.info("send kafka success topic:{} result:{}", topic, result);
        } catch (Exception e) {
            log.error("send kafka failed message:{} topic:{}", message, topic, e);
            throw new RuntimeException(e);
        }
    }
}
kafka处理逻辑
package com.fawvw.ftb3.payment.facade.impl.bus.kafka;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;

/**
 * @author wig
 * @Type KafkaMessageListener
 * @Desc kafka处理逻辑
 * @date 2021/07/23 11:41
 */

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaMessageListener {

    private final KafkaTopicHandlerManager kafkaTopicHandlerService;

    private final ThreadPoolTaskExecutor threadPoolTaskExecutor = ThreadPoolGenerator.getDefaultThreadPoolTaskExecutor();

    @KafkaListener(topics = "#{'${kafka.topic}'.split(',')}")
    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
        acknowledgment.acknowledge();
        if (CollectionUtils.isNotEmpty(records)) {
            records.forEach(record -> {
                String topic = record.topic();
                KafkaTopicHandler handler = kafkaTopicHandlerService.getHandlerByTopic(topic);
                if (Objects.nonNull(handler)) {
                    try {
                        String message = new String(record.value().getBytes(), StandardCharsets.UTF_8.name());
                        threadPoolTaskExecutor.submit(() -> execute(message, handler));
                    } catch (IOException exception) {
                        log.error("kafka message transfer to String failed. ", exception);
                    }
                } else {
                    log.error("undefined handler topic:{}", topic);
                }
            });
        }
    }

    private void execute(String message, KafkaTopicHandler handler) {
        try {
            handler.execute(message);
        } catch (Exception e) {
            log.error("handler topic:{} hand message:{} failed", handler.getTopic(), message, e);
        }
    }
}
根据不同的topic处理消息
package com.fawvw.ftb3.payment.facade.impl.bus.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * @author wig
 * @date 2020-11-19 10:59
 */
@Service
public class KafkaTopicHandlerManager {

    private final Map<String, KafkaTopicHandler> handlerMap;

    @Autowired
    public KafkaTopicHandlerManager(List<KafkaTopicHandler> kafkaTopicHandlers) {
        handlerMap = kafkaTopicHandlers.stream()
            .collect(Collectors.toMap(KafkaTopicHandler::getTopic,
                Function.identity()));
    }

    public KafkaTopicHandler getHandlerByTopic(String topic) {
        return handlerMap.get(topic);
    }
}
topic接受配置
package com.fawvw.ftb3.payment.infrastructure.config.topicconfig;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

/**
 * @program: payment-service
 * @description: 订单分发部分逻辑配置
 * @author: wig
 * @create: 2021-07-23 10:23
 **/
@Component
@Data
@ConfigurationProperties(prefix = "kafka.topics")
@Slf4j
@RefreshScope
public class PaymentKafkaTopicConfig {

    /**
     * 支付结果回调异步处理MQ domain->payment 或者payment->payment(支付宝、微信、一汽财司)
     */
    private String paymentResultCallbackTopic;

    /**
     * 支付结果通知MQ payment->order
     */
    private String paymentResultNotifyTopic;

    /**
     * 退款结果回调处理MQ domain->payment
     */
    private String payRefundResultCallbackTopic;

    /**
     * 退款结果通知回调MQ payment->order
     */
    private String payRefundNotifyTopic;

    /**
     * 申请退款成功通知MQ  payment-> order
     */
    private String payRefundingNotifyTopic;

}
使用
package com.fawvw.ftb3.order.biz.service.impl.bus.kafka.topic;


/**
 * @author yi.luo
 * @date 2020-11-19 10:59
 */
public interface KafkaTopicHandler {

    /**
     * Handler处理的topic
     *
     * @return String topic
     */
    String getTopic();

    /**
     * 执行处理逻辑
     *
     * @param message 待处理消息
     */
    void execute(String message);

}
handle实现
package com.fawvw.ftb3.order.biz.service.impl.bus.kafka.topic.impl;

import cn.hutool.json.JSONUtil;
import com.fawvw.ftb3.order.biz.service.OrderStateMachineService;
import com.fawvw.ftb3.order.biz.service.impl.bus.kafka.topic.KafkaTopicHandler;
import com.fawvw.ftb3.order.biz.service.impl.config.OrderKafkaTopicConfig;
import com.fawvw.ftb3.order.biz.service.model.PayRefundingNotifyMsg;
import com.fawvw.ftb3.order.domain.service.BaseOrderService;
import com.fawvw.ftb3.order.infrastructure.dao.model.OrderMaster;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

import java.util.Objects;

/**
 * 退款通知处理器
 *
 * @author wenLong.wu 2021年8月27日16:09:34
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class PayRefundingNotifyTopicHandler implements KafkaTopicHandler {

    private final OrderKafkaTopicConfig orderKafkaTopicConfig;
    private final OrderStateMachineService orderStateMachineService;
    private final BaseOrderService baseOrderService;

    @Override
    public String getTopic() {
        return orderKafkaTopicConfig.getPayRefundingNoticeTopic();
    }

    @Override
    public void execute(String message) {
        PayRefundingNotifyMsg msg = JSONUtil.toBean(message, PayRefundingNotifyMsg.class);
        if (!checkMessage(msg)) {
            log.error("[Refunding-Notice] message check failed. {}", message);
            return;
        }
        OrderMaster orderMaster = baseOrderService.queryOrderByOrderNo(msg.getOrderNo());
        if (Objects.isNull(orderMaster)) {
            log.error("[Refunding-Notice] order not found. orderNo:{}", msg.getOrderNo());
            return;
        }
        orderMaster.setRefundReason(msg.getRefundReason());
        orderStateMachineService.userRefund(orderMaster);
    }

    private boolean checkMessage(PayRefundingNotifyMsg msg) {
        if (StringUtils.isBlank(msg.getOrderNo())) {
            log.error("[Refunding-Notice] orderNo is blank. message:{}", msg);
            return false;
        }
        return true;
    }
}

原始案例

生产kafka
  • package com.kafkaTwo;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    import java.util.Properties;
    
    
    /**
    
     * kafka下载和安装  http://mirrors.hust.edu.cn/apache/kafka/0.9.0.0/
    
     * window 启动:
        bin\windows\zookeeper-server-start.bat config\zookeeper.properties
        bin\windows\kafka-server-start.bat config\server.properties
        */
       public class KafkaProducer
       {
       private final Producer<String, String> producer;
       public final static String TOPIC = "TEST-TOPIC";
    
       private KafkaProducer(){
           Properties props = new Properties();
           //此处配置的是kafka的端口
           props.put("metadata.broker.list", "127.0.0.1:9092");
    
       //配置value的序列化类
       props.put("serializer.class", "kafka.serializer.StringEncoder");
       //配置key的序列化类
       props.put("key.serializer.class", "kafka.serializer.StringEncoder");
    
       props.put("request.required.acks","-1");
    
       producer = new Producer<String, String>(new ProducerConfig(props));
    
       }
    
       void produce() {
           int messageNo = 100;
           final int COUNT = 10000000;
    
       while (messageNo < COUNT) {
           String key = String.valueOf(messageNo);
           String data = "hello kafka message " + key;
           producer.send(new KeyedMessage<String, String>(TOPIC, key ,data));
           System.out.println(data);
           messageNo ++;
       }
    
       }
      
       public static void main( String[] args )
       {
           new KafkaProducer().produce();
       }
       }
    
    消费kafka
package com.kafkaTwo;


import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**

 * kafka下载和安装  http://mirrors.hust.edu.cn/apache/kafka/0.9.0.0/

 * http://czj4451.iteye.com/blog/2041096

 * window 启动:
    bin\windows\zookeeper-server-start.bat config\zookeeper.properties
    bin\windows\kafka-server-start.bat config\server.properties
    */
   public class KafkaConsumer {

   private final ConsumerConnector consumer;

   private KafkaConsumer() {
       Properties props = new Properties();
       //zookeeper 配置
       props.put("zookeeper.connect", "127.0.0.1:2181");

   //group 代表一个消费组
   props.put("group.id", "jd-group");

   //zk连接超时
   props.put("zookeeper.session.timeout.ms", "4000");
   props.put("zookeeper.sync.time.ms", "200");
   props.put("auto.commit.interval.ms", "1000");
   props.put("auto.offset.reset", "smallest");
   //序列化类
   props.put("serializer.class", "kafka.serializer.StringEncoder");

   ConsumerConfig config = new ConsumerConfig(props);

   consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);

   }

   void consume() {
       Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
       topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));

   StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
   StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

   Map<String, List<KafkaStream<String, String>>> consumerMap =
           consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
   KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);
   ConsumerIterator<String, String> it = stream.iterator();
   while (it.hasNext())
   {
       System.out.println(it.next().message());
   }
   System.out.println("finish");

}

//http://www.open-open.com/lib/view/open1412991579999.html
public static void main(String[] args) {
    new KafkaConsumer().consume();
}

}
上一篇:dubbo工作原理


下一篇:Kafka复习(一):基本概念、生产者、消费者