Apache Kafka是开源分布式高并发消息中间件,支持每秒百万级消息并发,在互联网高并发架构:双11、电商秒杀抢购、网络直播、IOT大数据采集、聊天App、导航等高并发架构中大量使用。本节课程一起Java Spring Boot2.0实战Kafka并深入其架构原理。
Linux Ubuntu 18.04系统上安装最新的Kafka 2.12版本可以参考文章https://yq.aliyun.com/articles/690818
1、Kafka百万级高并发消息中间件
Kafka是LinkedIn公司开源的分布式消息平台。
使用Scala语言开发, 2011开源,现在属于Apache基金会。基于Pull模式处理消息,也支持消息推送,追求高吞吐量,百万级高并发
不支持AMQP协议,可以选择性支持事务。
可以和大数据中间件ES、Cloudera、Storm、Spark都支持与Kafka集成。
开始设计的目的就是用于日志收集和传输,利用了磁盘顺序读写。
从0.8版本开始支持主从复制Replication,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网场景。
http://kafka.apache.org/
2、Kafka分布式消息架构
Kafka原始支持分布式架构,消息支持分区模式存储,可以使用JSON、二进制等多种序列化格式。
一个Topic可以包含多个分区,一个分区可以对应多个消息接受者。
3、Spring for Apache Kafka
Spring for Apache Kafka(spring-kafka)。可以简化开发Java Kafka消息中间件开发。
他提供“KafkaTemplate”高级抽象模板类。 封装对于Kafka的消息驱动的POJO操作接口。 此外为了方便处理消息还提供@KafkaListener注解和“侦听器容器”方便处理消息,将核心Spring风格扩展到Kafka消息中间件开发。 简化自动化配置、依赖注入和声明的使用。Spring JMS和Spring AMQP的RabbitMQ接口统一风格。Spring for Apache Kafka 基于java kafka-clients jar封装
允许我们自定义扩展Kafka配置。
4、Spring for Apache Kafka新特性
Spring for Apache Kafka提供KafkaTemplate模板类。
此外还提供KafkaMessageListenerContainer容器。
为了方便处理消息,提供@KafkaListener注解。
此外事务支持也很方便,提供KafkaTransactionManager事务管理器。也可以方便对Kafka做单元测试,spring-kafka-test jar with embedded kafka server
5、Linux安装Kafka
使用JavaSpring Boot 2.1.3进行Kafka开发,注意驱动和Kafka的版本关系。
另外要提前安装配置好Kafka,可以选择Linux环境安装Kafka。我们使用的是2.12版本,下载地址是:
http://kafka.apache.org/downloads.html
6、Spring Boot 2.0 实战Kafka
使用JavaSpring Boot 2.1.3进行Kafka开发,新建Spring Boot项目,
然后修改POM文件添加依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
接下来编写Kafka的Java发送器类KafkaMessageSender ,负责发送消息。使用的核心对象就是KafkaTemplate。
public class KafkaMessageSender {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
kafkaTemplate.send(topic, payload);
System.out.println("发送到主题:" + topic + "消息:"+payload);
}
}
接下来编写消息接收者类KafkaMessageReceiver
public class KafkaMessageReceiver {
@KafkaListener(topics = "Java")
public void receiveTopic1(ConsumerRecord<?, ?> consumerRecord) {
System.out.println("接收java主题消息: "+consumerRecord.toString());
}
@KafkaListener(topics = "frankxulei-test")
public void receiveTopic2(ConsumerRecord<?, ?> consumerRecord) {
System.out.println("接收frankxulei-test主题消息: "+consumerRecord.toString());
}
}
测试控制器的代码,调用发送者发送100万消息
@RequestMapping("/kafka")
public class KafkaController {
@Autowired
KafkaMessageSender kafkaMessageSender;
private String topicName ="Java";
private String message = "Java Spring Boot 2.0 Kafka Alibaba:";
@RequestMapping("/send")
public String sendMessageToKafkaTopic() {
System.out.println("Java Spring Boot 2.0 发送100万消息到Kafka!");
for (int i = 0; i < 1000000; i++) {
kafkaMessageSender.send(topicName, message+i);
}
System.out.println("成功 发送100万消息!");
return "发送100万消息到Kafka Sussessfully sent Message to Kafka";
}
}
启动Spring Boot,输入接口,发送100万消息,观察日志接收消息信息
7、视频课程
视频地址:https://yq.aliyun.com/live/868
PPT地址:https://yq.aliyun.com/live/868
8、阿里巴巴Java群超过2900人
直播地址:Java技术进阶群
进群方式:钉钉扫码入群
阿里巴巴MongoDB群