上一期讲到了通过canal订阅mysql的binlog日志并且转换为对象,那么这一次我们将订阅来的对象通过RocketMQ发送消息,接收方接受消息之后同时存储到其他类型的数据源当中,完成一个简单的数据异构的过程。
什么是Java消息服务?
两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持JAVA应用程序开发。
在J2EE中,当两个应用程序使用JMS进行通信时,它们之间并不是直接相连的,而是通过一个共同的消息收发服务连接起来,可以达到解耦的效果,我们将会在接下来的教程中详细介绍。
jms的消息传送模型
常见的消息传送模型有以下两种:
点对点消息传送模型
在点对点消息传送模型中,应用程序由消息队列,发送者,接收者组成。每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息(除了被接收者消费掉的和过期的消息)。如下图所示:
发布订阅消息传送模型
在发布订阅模型中,消费者需要订阅相关的topic才能接收到生产者的信息。生产者会将信息传输到topic中,然后消费者只需要从topic中获取数据即可。如下图所示:
RocketMQ消息队列使用
这次使用的消息中间件为RocketMQ的使用场景。RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,并于2016年11月成为 Apache 孵化项目。
RocketMQ在使用之前,需要我们引入相关的依赖配置:
<!-- 整合RocketMq --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>${rocketmq.version}</version> </dependency>
关于RocketMQ的安装在这里就不做过多的讲解了。
通过mq的方式来进行数据异构通常是比较简单的方案,首先我们需要在项目里面独立一个模块专门用于监听mysql的binlog日志,这个模块我暂且称之为datahandle-core模块
整个工程采用了springboot的结构来构建,主要的核心也是在core工程中。
首先是监听canal的日志状态模块了,采用了上一节中讲解到的客户端代码进行数据监听,并且将其转换为对象然后发送往mq中:
package com.sise.datahandle.core; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import static com.sise.datahandle.constants.CanalConstants.*; /** * @author idea * @date 2019/10/20 */ @Component @Slf4j public class CanalListener implements CommandLineRunner { @Autowired private CanalClient canalClient; @Override public void run(String... args) throws Exception { log.info("=============canal监听器开启==============="); CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress(SERVER_ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD); canalConnector.connect(); canalConnector.subscribe(".*\\..*"); canalConnector.rollback(); for (; ; ) { Message message = canalConnector.getWithoutAck(100); long batchId = message.getId(); if (batchId != -1) { canalClient.entityHandle(message.getEntries()); } } } }
ps:这里面的CanalClient代码主要来自上一篇的canal客户端代码,文末会有完整项目代码链接,需要的读者可以前往查看。
在CanalClient里面,有一个函数是专门用于处理将订阅的数据发送到mq消息队列中:
package com.sise.datahandle.core; import com.alibaba.fastjson.JSON; import com.alibaba.otter.canal.protocol.CanalEntry; import com.google.protobuf.InvalidProtocolBufferException; import com.sise.datahandle.handler.CanalDataHandler; import com.sise.datahandle.model.TypeDTO; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; /** * canal监听客户端变化 * * @author idea * @date 2019/10/12 */ @Slf4j @Service public class CanalClient { @Autowired private DefaultMQProducer rocketMqProducer; /** * 处理binlog日志的监听 * * @param entries */ public void entityHandle(List<CanalEntry.Entry> entries) { for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) { continue; } try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { switch (rowChange.getEventType()) { case INSERT: String tableName = entry.getHeader().getTableName(); //测试选用t_type这张表进行映射处理 if ("t_type".equals(tableName)) { TypeDTO typeDTO = CanalDataHandler.convertToBean(rowData.getAfterColumnsList(), TypeDTO.class); org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message(); message.setTopic("canal-test-topic"); message.setTags("canal-test-tag"); String json = JSON.toJSONString(typeDTO); message.setBody(json.getBytes()); SendResult sendResult = rocketMqProducer.send(message); log.info("[mq消息发送结果]----" + sendResult); } break; default: break; } } } catch (InvalidProtocolBufferException e) { log.error("[CanalClient]监听数据过程出现异常,异常信息为{}", e); } catch (InterruptedException | RemotingException | MQClientException | MQBrokerException e) { log.error("[CanalClient] mq发送信息出现异常:{}", e); } } } }
这里面主要是监听binlog记录为插入数据事件的时候做发送mq操作。
接下来便是常见的mq配置了,本工程主要是一个模拟的简单案例,因此我将consumer和producer都放在了一起方便测试。
通过springboot自身的properties文件对mq进行参数初始化配置之后便可以构建一个基本的consumer和producer了。这里我们拿一个TypeDto类来进行树异构的测试,consumer端的核心代码为:
package com.sise.datahandle.mq.rocketmq.consumer; import com.sise.datahandle.model.TypeDTO; import com.sise.datahandle.mq.rocketmq.producer.RocketMqMsgHandle; import com.sise.datahandle.redis.RedisService; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.List; /** * @author idea * @date 2019/10/20 */ @Component @Slf4j public class RocketMqConsumeMsgListenerProcessor implements MessageListenerConcurrently { @Autowired private RedisService redisService; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { if(CollectionUtils.isEmpty(msgs)){ log.info("接受到的消息为空,不处理,直接返回成功"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } MessageExt messageExt = msgs.get(0); System.out.println("接受到的消息为:"+messageExt.toString()); if("canal-test-topic".equals(messageExt.getTopic())){ if("canal-test-tag".equals(messageExt.getTags())){ int reconsume = messageExt.getReconsumeTimes(); if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } TypeDTO typeDTO = RocketMqMsgHandle.parseMessage(messageExt,TypeDTO.class); //存储进入redis中 redisService.setObject("typeDTO-"+System.currentTimeMillis(),typeDTO); } } // 如果没有return success ,consumer会重新消费该消息,直到return success return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
通过订阅mq的信息,读取相关的数据再次写入到redis里面,完成一个简单过程的数据异构。
整个迷你工程写下来,比较核心的地方就在于对binlog日志的解析器部分,如何将日志订阅之后转换为相应的对象进行处理。
通常采用mq的方式进行数据异构会相对简单,实际上是在监听binlog为写DB的同时去写一次MQ,但是这种方式不能够保证数据一致性,就是不能保证跨资源的事务。注:调用第三方远程RPC的操作一定不要放到事务中。
完整案例的代码链接如下(点击阅读原文直达):