SpringCloud-Stream实战入门(二,缓存架构技术

比如下面的代码中实现了一个只能转换User对象的MessageConverter底层使用的是FastJson,在进行发送消息时重置了user的name属性,加上了t-前缀。

SpringCloud-Stream实战入门(二,缓存架构技术

然后为了使它生效,我们需要把它定义为一个bean,并标注@StreamMessageConverter,比如下面这样。SpringCloud-Stream实战入门(二,缓存架构技术

如果在转换为JSON时不希望使用默认的Jackson实现,而希望使用Alibaba的FastJson也是可以的

FastJson已经提供了MessageConverter的实现类com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter。

所以如果希望使用FastJson的实现,只需要进行类似如下这样的定义。SpringCloud-Stream实战入门(二,缓存架构技术

异常处理

在接收消息时,如果消息处理失败,Spring Cloud会把失败的消息转到名为..errors的Channel,并可通过@ServiceActivator方法进行接收。比如有如下Binding定义。

spring.cloud.stream.bindings.input1.destination=test-topic1

spring.cloud.stream.bindings.input1.group=test-group1

当消息消费失败时将转发包装了失败原因的消息到名为test-topic1.test-group1.errors的Channel,我们可以通过在某个bean中定义一个@ServiceActivator方法处理相应的异常。

SpringCloud-Stream实战入门(二,缓存架构技术

上面介绍的方法是处理某个特定Binding的消息消费异常的,如果你的消息消费异常的处理方式都是一样的,你可能希望有一个统一的入口来处理所有的消息消费异常,而不用管当前的Binding是什么。

Spring Cloud Stream也考虑到了这一点,它提供了一个名为errorChannel的Binding,所有的消息消费异常都会转发到该Binding,所以如果我们想有一个统一的处理所有的消息消费异常的入口则可以定义一个Binding名为errorChannel的@StreamListener方法。SpringCloud-Stream实战入门(二,缓存架构技术

重试机制

Spring Cloud Stream在进行消息的接收处理时也是利用Spring Retry进行了包装的。当消息消费失败时默认会最多试3次(加上第一次),使用的是Spring Retry的RetryTemplate的默认配置。

如果默认的重试逻辑不能满足你的需求,你也可以定义自己的RetryTemplate,但是需要使用@StreamRetryTemplate进行标注(StreamRetryTemplate上标注了@Bean)。SpringCloud-Stream实战入门(二,缓存架构技术

上面的代码中就应用了自定义的RetryTemplate,指定最多尝试5次的消息消费尝试5次后仍然失败将走前面介绍的异常处理逻辑,即投递消息到相应的异常处理的Channel。

也可以通过配置的方式,最多尝试次数通过Binding的consumer.maxAttempts参数进行指定,如果需要指定名为input1的Binding在消费某条消息时最多允许尝试5次,则可以进行如下定义。如果将该属性值定义为1,则表示不允许进行重试。

spring.cloud.stream.bindings.input1.consumer.maxAttempts=5

定制消费者线程数

默认情况下,每个Binding对应的消费者线程数是1,可以通过spring.cloud.stream.bindings..consumer.concurrency属性进行指定,比如下面的配置就指定了名称为input1的Binding的消费者线程是3,**即Spring Cloud Stream将同时启动3个线程用于从名为input1的Bi

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

nding进行消费**。

spring.cloud.stream.bindings.input1.consumer.concurrency=3

边接收边发送

所谓的边接收边发送是指接收到消息经过处理后可以产生新的消息,然后允许通过配置指定新的消息的发送目的地。比如下面的代码就定义了从名为input的Binding接收消息,经过处理后再返回,然后经过方法上的**@SendTo指定返回的内容将发送到名为output的Binding**。

SpringCloud-Stream实战入门(二,缓存架构技术

RocketMQ的Tag特性

RocketMQ建议我们一个应用就使用一个Topic不同的消息类型通过Tag来区分。我们发送的消息都在header里面加入了消息对应的Tag

如果我们的某个Binding只希望接收某些Tag的消息,则可以通过spring.cloud.stream.rocketmq.bindings..consumer.tags属性来指定期望接收的Tag,多个Tag之间通过双竖线分隔

spring.cloud.stream.rocketmq.bindings.input1.consumer.tags=tag0||tag1

上面配置就指定了名为input1的Binding期望接收的消息的Tag是tag0或tag1

指定RocketMQ特性配置的属性前缀是spring.cloud.stream.rocketmq,如果是Binder的配置则后面可以接binder,如果是Binding的配置则后面接binding。

RocketMq广播方式

RocketMQ的消息消费有两种方式,CLUSTERING(集群)和BROADCASTING(广播)默认是CLUSTERINIG

CLUSTERING的意思是同一消费组的多个消费者共享同一消息队列,彼此分担压力

比如消息队列中有100条消息,当同时有3个相同消费者组的消费者按照CLUSTERING方式进行消息消费时,它们总的消息的消费数量是100,但是分摊到每个消费者的数量可能是40、30、30

BROADCASTING的意思是广播即可以理解为每个消费者都有唯一的消息队列与之对应。当消息队列中有100条消息时,如果有相同消费者组的3个消费者时,每个消费者都将完整的消费这100条消息

通过spring.cloud.stream.rocketmq.bindings..consumer.broadcasting=true指定该消费者将通过广播的方式进行消费。

spring.cloud.stream.rocketmq.bindings.input1.consumer.broadcasting=true

总结

关于RocketMq的特性还有顺序消息、死信队列以及4.x发布的事务消息;老顾之前的文章已经介绍了RocketMq事务;小伙伴们可以自行查看,有机会再给大家介绍顺序消息等。谢谢!!!

如果你的技术提升遇到瓶颈了,或者缺高级Android进阶视频学习提升自己,这有大量大厂面试题为你面试做准备!
点击Android 学习,面试文档,视频收集大整理获取

你的赞和关注是我继续创作的动力~

颈了,或者缺高级Android进阶视频学习提升自己,这有大量大厂面试题为你面试做准备!
点击Android 学习,面试文档,视频收集大整理获取

你的赞和关注是我继续创作的动力~

上一篇:解决org.apache.ibatis.binding.BindingException: Invalid bound statement (not found)问题


下一篇:Prometheus之blackbox exporter实现URL监控