Canal的数据结构
网上对Canal的介绍已经够多了,这里不再赘述,但是有一点必须要强调,就是Canal 是怎么对数据进行的封装,只有明白了这点,才可以去消费其中的数据
Canal的安装及配置
配置Mysql主服务器的my.cnf文件(位于/etc目录下,没有就新建)
#主服务器的id server-id=1 #启用二进制日志 log-bin=mysql-bin #设置不复制的数据库(选配) binlog-ignore-db=mysql #设置要复制的数据库(选配) binlog-do-db=需要复制的主数据库名字(设置一个之前没有的数据库) #设置logbin的格式 binlog_format=row
logbin格式有三种
statement 存储的是涉及到数据变化的sql语句,文件比较小,但是如果sql语句中有些特殊语句(比如随即值),就会导致数据不一致
row 记录的是数据改变后的数据,能保证数据严格一致,但是会使文件比较大
mixed 会动态调整使用statement和row
这里应用场景是对数据进行监控,所以使用row
重启Mysql服务
service mysql restart
进入Mysql,检查binlog是否生效
mysql> show variables like ‘log_%‘;
配置conf/canal.properties
基本所有配置都可以保持默认,但需要注意默认端口是11111,后续连接会用上
配置实例配置conf/example/instance.properties
#//这个id不能跟mysql中配置的id相同!!! canal.instance.mysql.slaveId=100 //mysql地址 canal.instance.master.address=hadoop102:3306 #连接数据库所需要的用户名和密码 canal.instance.dbUsername=root canal.instance.dbPassword=123 canal.instance.connectionCharset = UTF-8 canal.instance.defaultDatabaseName = # table regex //要监听的数据库,用正则表达式,这里表示gmall数据库中的所有表 canal.instance.filter.regex=gmall\\..* # table black regex //黑名单 canal.instance.filter.black.regex=
启动服务
bin/startup.sh
bin/stop.sh
从Canal中消费数据到kafka
添加依赖
<dependencies> <!--canal 客户端, 从 canal 服务器读取数据--> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <!-- kafka 客户端 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency> </dependencies>
代码实现
import java.net.InetSocketAddress import java.util import com.alibaba.fastjson.JSONObject import com.alibaba.otter.canal.client.{CanalConnector, CanalConnectors} import com.alibaba.otter.canal.protocol.CanalEntry.{EntryType, EventType, RowChange} import com.alibaba.otter.canal.protocol.{CanalEntry, Message} import com.atguigu.gmall.common.Constant import com.google.protobuf.ByteString /** * Author atguigu * Date 2020/5/30 15:29 */ object CanalClient { // 真正的处理数据 def parseData(rowDataList: util.List[CanalEntry.RowData], tableName: String, eventType: CanalEntry.EventType): Unit = { // 计算订单总额 ,每在order_info表中插入一条数据就发送给kafka if(tableName == "order_info" && eventType == EventType.INSERT && rowDataList != null && rowDataList.size() > 0){ import scala.collection.JavaConversions._ for(rowData <- rowDataList){ val result: JSONObject = new JSONObject() // 一个rowData表示一行数据, 所有列组成一个json对象, 写入到Kafka中 val columnList: util.List[CanalEntry.Column] = rowData.getAfterColumnsList for(column <- columnList){ // column 列 val key: String = column.getName // 列名 val value: String = column.getValue // 列值 result.put(key, value) } // 把数据写入到kafka中. 用一个生产者 MykafkaUtil.send(Constant.ORDER_INFO_TOPIC, result.toJSONString) } } } def main(args: Array[String]): Unit = { // 1. 连接到canal服务器 // 1.1 canal服务器的地址 canal服务器的端口号 val address: InetSocketAddress = new InetSocketAddress("hadoop102", 11111) val connector: CanalConnector = { CanalConnectors.newSingleConnector(address, destination="example", username="", password="") } // 1.2 连接到canal connector.connect() // 2. 订阅你要处理的具体表 gmall1128下所有的表 connector.subscribe("gmall.*") // 3. 读取数据, 解析 while (true) { // 一致监听mysql数据变化, 所以这个地方不挺 // 100表示最多一次拉取由于100条sql导致的数据的变化 val msg: Message = connector.get(100) val entries: util.List[CanalEntry.Entry] = msg.getEntries if (entries != null && entries.size() > 0) { // 遍历拿到每个entry import scala.collection.JavaConversions._ for (entry <- entries) { // 处理的EntryType应该时刻RowData if (entry != null && entry.hasEntryType && entry.getEntryType == EntryType.ROWDATA) { // 获取storeValue. 每个entry一个 val storeValue: ByteString = entry.getStoreValue // 每个storeVales一个RowChange val rowChange: RowChange = RowChange.parseFrom(storeValue) // 每个rowChange中多个RowData. 一个RowData就表示一行数据 val rowDataList: util.List[CanalEntry.RowData] = rowChange.getRowDatasList //调用处理数据的方法,在这里对每行的数据进行真正的处理 parseData(rowDataList, entry.getHeader.getTableName, rowChange.getEventType) } } } else { println("没有拉倒数据, 2s之后继续拉....") Thread.sleep(2000) } } } }