MongoDB CDC to Doris

Goal

MongoDB --(Flink CDC)--> Kafka --> Doris

I. Environment setup

1. Versions

  • Ubuntu 20.04
  • MongoDB 4.2
  • Doris 0.14
  • Flink 1.13.3
  • Kafka 2.4.1
  • flink-sql-connector-mongodb-cdc-2.1.0.jar

Contents of the Flink lib directory:

ll ./lib
-rw-r--r-- 1 hadoop hadoop    249567 Oct 12 17:23 flink-connector-jdbc_2.11-1.13.3.jar
-rw-r--r-- 1 hadoop hadoop     92313 Oct 13 00:15 flink-csv-1.13.3.jar
-rw-r--r-- 1 hadoop hadoop 115418686 Oct 13 00:29 flink-dist_2.11-1.13.3.jar
-rw-r--r-- 1 hadoop hadoop    148127 Oct 13 00:13 flink-json-1.13.3.jar
-rwxrwxrwx 1 hadoop hadoop   7709740 Jun  8 21:13 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 hadoop hadoop   3674114 Oct 12 17:35 flink-sql-connector-kafka_2.11-1.13.3.jar
-rw-r--r-- 1 hadoop hadoop  15296786 Nov 15 14:30 flink-sql-connector-mongodb-cdc-2.1.0.jar
-rw-r--r-- 1 hadoop hadoop  19648146 Nov 15 14:30 flink-sql-connector-mysql-cdc-2.1.0.jar
-rw-r--r-- 1 hadoop hadoop  36453353 Oct 13 00:25 flink-table_2.11-1.13.3.jar
-rw-r--r-- 1 hadoop hadoop  41061738 Oct 13 00:26 flink-table-blink_2.11-1.13.3.jar
-rwxrwxrwx 1 hadoop hadoop     67114 Mar 31  2021 log4j-1.2-api-2.12.1.jar
-rwxrwxrwx 1 hadoop hadoop    276771 Mar 31  2021 log4j-api-2.12.1.jar
-rwxrwxrwx 1 hadoop hadoop   1674433 Mar 31  2021 log4j-core-2.12.1.jar
-rwxrwxrwx 1 hadoop hadoop     23518 Mar 31  2021 log4j-slf4j-impl-2.12.1.jar
-rw-r--r-- 1 hadoop hadoop   2475087 Sep 29 02:17 mysql-connector-java-8.0.27.jar

II. Writing MongoDB data to Kafka

1. Prepare initial data in MongoDB

db.checkresult.insertMany([
    {
        "_id": ObjectId("7159a93265b6c375acfa2790"),
        "userid": NumberLong(112480487),
        "mid": "14029647849143177307",
        "tag": [
            "mnt_seven"
        ],
        "checkcontent": [
            {
                "name": "14029647849143177303_112480480_7",
                "type": 1,
                "data": "http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg"
            }
        ],
        "createat": NumberLong(1633265977)
    },
    {
        "_id": ObjectId("8159a93265b6c375acfa2710"),
        "userid": NumberLong(112480488),
        "mid": "14029647849143177303",
        "tag": [
            "mnt_eight"
        ],
        "checkcontent": [
            {
                "name": "14029647849143177303_112480480_8",
                "type": 2,
                "data": "http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92bb.jpg"
            }
        ],
        "createat": NumberLong(1633265978)
    },
    {
        "_id": ObjectId("9059a93265b6c375acfa2790"),
        "userid": NumberLong(112480489),
        "mid": "14029647849143177309",
        "tag": [
            "mnt_nigh"
        ],
        "checkcontent": [
            {
                "name": "14029647849143177303_112480480_9",
                "type": 3,
                "data": "http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92cc.jpg"
            }
        ],
        "createat": NumberLong(1633265979)
    }
]);
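
ObjectId(...) only accepts a 24-character hexadecimal string, so hand-written test IDs are easy to get wrong. Below is a minimal Python sketch for checking IDs before running insertMany. The helper name is_valid_object_id is hypothetical and not part of any MongoDB driver.

```python
import re

# An ObjectId is 12 bytes, written as exactly 24 hexadecimal characters.
OBJECT_ID_RE = re.compile(r"[0-9a-fA-F]{24}")


def is_valid_object_id(text: str) -> bool:
    """Return True if `text` is a well-formed ObjectId hex string."""
    return OBJECT_ID_RE.fullmatch(text) is not None


samples = [
    "7159a93265b6c375acfa2790",   # 24 hex characters
    "7159a93265b6c375acfa279h",   # 'h' is not a hex digit
    "8159a93265b6c375acfa2710h",  # 25 characters, one too many
]
for sample in samples:
    print(sample, is_valid_object_id(sample))
```

Only the first sample passes. The other two would be rejected by the mongo shell before any document is inserted.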

2. Flink SQL

# create the MongoDB source table

CREATE TABLE mongo_checkresult (
   _id STRING,                      -- must be declared
   userid BIGINT,
   mid STRING,
   tag ARRAY<STRING>,
   checkcontent ARRAY<ROW<name STRING, type INT, data STRING>>, -- embedded documents
   createat BIGINT,
   PRIMARY KEY (_id) NOT ENFORCED
 ) WITH (
   'connector' = 'mongodb-cdc',
   'hosts' = '192.168.3.121:27017',
   'username' = 'mongouser',
   'password' = 'mongodbpwd',
   'database' = 'xxx',
   'collection' = 'checkresult'
 );


# create the Kafka sink table; nested fields are flattened into top-level columns
## the connector must be upsert-kafka, because the CDC source emits an upsert changelog

CREATE TABLE kafka_checkresult (
   _id STRING,
   userid BIGINT,
   mid STRING,
   tag ARRAY<STRING>,
   check_name STRING,
   check_type INT,
   check_data STRING,
   createat BIGINT,
   PRIMARY KEY (_id) NOT ENFORCED
) WITH (
   'connector' = 'upsert-kafka',
   'topic' = 'checkresult',
   'properties.bootstrap.servers' = '192.168.3.124:9092',
   'key.format' = 'json',
   'value.format' = 'json'
);

# execute sink task

-- Flink SQL arrays are 1-based: checkcontent[1] is the first embedded document
insert into kafka_checkresult
select
  _id,
  userid,
  mid,
  tag,
  checkcontent[1].name,
  checkcontent[1].type,
  checkcontent[1].data,
  createat
from mongo_checkresult;
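
The statement above copies only the first element of checkcontent into the flat check_* columns, so any further elements of the array are dropped. Here is a rough Python imitation of that projection for one document. The function name flatten_checkresult is hypothetical, and the handling of an empty array (NULL columns) is an assumption about Flink's behavior rather than something tested in this post.

```python
from typing import Any, Dict


def flatten_checkresult(doc: Dict[str, Any]) -> Dict[str, Any]:
    """Mimic the INSERT ... SELECT projection for a single source document."""
    items = doc.get("checkcontent") or []
    # checkcontent[1] in Flink SQL corresponds to items[0] in Python.
    # Assumption: a missing first element yields NULL (None) columns.
    first = items[0] if items else {}
    return {
        "_id": doc["_id"],
        "userid": doc["userid"],
        "mid": doc["mid"],
        "tag": doc["tag"],
        "check_name": first.get("name"),
        "check_type": first.get("type"),
        "check_data": first.get("data"),
        "createat": doc["createat"],
    }


doc = {
    "_id": "7159a93265b6c375acfa2790",
    "userid": 112480487,
    "mid": "14029647849143177307",
    "tag": ["mnt_seven"],
    "checkcontent": [
        {"name": "14029647849143177303_112480480_7", "type": 1,
         "data": "http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg"},
    ],
    "createat": 1633265977,
}
print(flatten_checkresult(doc))
```

If every element of checkcontent needs to reach Kafka, one row per element would be required instead, which also changes what the primary key has to be.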

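3. Test result: the data has been written to Kafka

# Check whether updates also produce messages. upsert-kafka interprets messages as upserts and deletes, so select * from kafka_checkresult shows data consistent with the source.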

kafka-console-consumer.sh --bootstrap-server 192.168.3.124:9092 --topic checkresult --from-beginning
{"_id":"7159a93265b6c375acfa2790","userid":112480487,"mid":"14029647849143177307","tag":["mnt_seven"],"check_name":"14029647849143177303_112480480_7","check_type":1,"check_data":"http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg","createat":1633265977}
{"_id":"9059a93265b6c375acfa2790","userid":112480489,"mid":"14029647849143177309","tag":["mnt_nigh"],"check_name":"14029647849143177303_112480480_9","check_type":3,"check_data":"http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92cc.jpg","createat":1633265979}
{"_id":"8159a93265b6c375acfa2710","userid":112480488,"mid":"14029647849143177303","tag":["mnt_eight"],"check_name":"14029647849143177303_112480480_8","check_type":2,"check_data":"http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92bb.jpg","createat":1633265978}
# Update mid for the document whose userid is 112480487
db.checkresult.updateOne(
  { userid: 112480487},
  { $set: {mid: "14029647849143177344"} }
);

{"_id":"7159a93265b6c375acfa2790","userid":112480487,"mid":"14029647849143177344","tag":["mnt_seven"],"check_name":"14029647849143177303_112480480_7","check_type":1,"check_data":"http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg","createat":1633265977}
# Delete: a record with a null value is produced
As the Upsert Kafka connector documentation puts it, "a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE”".
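
These semantics can be sketched in a few lines of Python: replaying the topic from the beginning rebuilds the table, and a record whose value is null removes the row for its key. This is only an illustration of the rule quoted above, not the connector's implementation. The function name materialize and the shortened records are hypothetical.

```python
import json
from typing import Dict, List, Optional, Tuple


def materialize(records: List[Tuple[str, Optional[str]]]) -> Dict[str, dict]:
    """Replay (key, value) Kafka records into the table they describe."""
    table: Dict[str, dict] = {}
    for key, value in records:
        row_id = json.loads(key)["_id"]  # key.format = json: the key carries the primary key
        if value is None:
            table.pop(row_id, None)      # null value (tombstone) means DELETE
        else:
            table[row_id] = json.loads(value)  # INSERT or UPDATE: overwrite by key
    return table


# Shortened, hypothetical records: an insert followed by an update of mid.
key = '{"_id":"7159a93265b6c375acfa2790"}'
records = [
    (key, '{"_id":"7159a93265b6c375acfa2790","mid":"14029647849143177307"}'),
    (key, '{"_id":"7159a93265b6c375acfa2790","mid":"14029647849143177344"}'),
]
print(materialize(records))                  # one row, carrying the updated mid
print(materialize(records + [(key, None)]))  # the tombstone removes the row
```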


III. Consuming Kafka messages into Doris with Routine Load

1. Create the target table in Doris


CREATE TABLE ods_safety_checkresult (
   _id STRING,
   userid BIGINT,
   mid STRING,
   tag STRING,
   check_name STRING,
   check_type INT,
   check_data STRING,
   createat BIGINT
) ENGINE=OLAP 
UNIQUE KEY(_id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(_id) BUCKETS 8 
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "V2"
);

2. Create the Routine Load job

create routine load etl_ods_safety_checkresult on ods_safety_checkresult
columns (
   _id,
   userid ,
   mid ,
   tag ,
   check_name ,
   check_type ,
   check_data ,
   createat
) 
PROPERTIES (
  "format"="json",
  "desired_concurrent_number"="1",
  "max_error_number"="1000",
  "max_batch_interval"="5" 
)  
FROM KAFKA  
(
"kafka_broker_list"= "192.168.3.121:9092",
"kafka_topic" = "checkresult",
"kafka_partitions" = "0",
"kafka_offsets" = "0"
);
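
Note that the Kafka messages carry tag as a JSON array while the Doris table declares it as STRING. The query results later in this post show the array stored as its JSON text. The Python sketch below imitates that column mapping. It is inferred from the output shown in this post, not from the Doris documentation, and the names COLUMNS and to_doris_row are hypothetical.

```python
import json
from typing import Any, Tuple

# Column order taken from the `columns (...)` clause of the job above.
COLUMNS = ["_id", "userid", "mid", "tag",
           "check_name", "check_type", "check_data", "createat"]


def to_doris_row(message: str) -> Tuple[Any, ...]:
    """Map one JSON message to a row, matching top-level keys to column names."""
    record = json.loads(message)
    row = []
    for column in COLUMNS:
        value = record.get(column)  # a missing key becomes NULL (None)
        if isinstance(value, (list, dict)):
            # Assumption: nested values land in a STRING column as compact JSON text.
            value = json.dumps(value, separators=(",", ":"))
        row.append(value)
    return tuple(row)


message = ('{"_id":"7159a93265b6c375acfa2790","userid":112480487,'
           '"mid":"14029647849143177307","tag":["mnt_seven"],'
           '"check_name":"14029647849143177303_112480480_7","check_type":1,'
           '"check_data":"http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg",'
           '"createat":1633265977}')
print(to_doris_row(message))
```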

3. Result: the data is loaded into Doris correctly


mysql> show routine load for etl_ods_safety_checkresult \G
*************************** 1. row ***************************
                  Id: 22055
                Name: etl_ods_safety_checkresult
          CreateTime: 2021-11-20 15:32:18
           PauseTime: NULL
             EndTime: NULL
              DbName: default_cluster:mongotest
           TableName: ods_safety_checkresult
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"_id,userid,mid,tag,check_name,check_type,check_data,createat","maxBatchIntervalS":"5","whereExpr":"*","dataFormat":"json","timezone":"Asia/Shanghai","send_batch_parallelism":"1","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","maxBatchSizeBytes":"104857600","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","deleteCondition":"*","desireTaskConcurrentNum":"1","maxErrorNum":"1000","strip_outer_array":"false","currentTaskConcurrentNum":"1","execMemLimit":"2147483648","num_as_string":"false","fuzzy_parse":"false","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"checkresult","currentKafkaPartitions":"0","brokerList":"192.168.3.121:9092"}
    CustomProperties: {"group.id":"etl_ods_safety_checkresult_d1504eb7-6dc7-4f5b-801e-09ade020ee0b"}
           Statistic: {"receivedBytes":1313,"errorRows":0,"committedTaskNum":1,"loadedRows":5,"loadRowsRate":0,"abortedTaskNum":2,"errorRowsAfterResumed":0,"totalRows":5,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":5015}
            Progress: {"0":"4"}
ReasonOfStateChanged: 
        ErrorLogUrls: 
            OtherMsg: 
1 row in set (0.00 sec)

mysql> select * from ods_safety_checkresult;
+--------------------------+-----------+----------------------+---------------+----------------------------------+------------+--------------------------------------------------------------+------------+
| _id                      | userid    | mid                  | tag           | check_name                       | check_type | check_data                                                   | createat   |
+--------------------------+-----------+----------------------+---------------+----------------------------------+------------+--------------------------------------------------------------+------------+
| 7159a93265b6c375acfa2790 | 112480487 | 14029647849143177344 | ["mnt_seven"] | 14029647849143177303_112480480_7 |          1 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg | 1633265977 |
| 9059a93265b6c375acfa2790 | 112480499 | 14029647849143177309 | ["mnt_nigh"]  | 14029647849143177303_112480480_9 |          3 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92cc.jpg | 1633265979 |
| 8159a93265b6c375acfa2710 | 112480488 | 14029647849143177303 | ["mnt_eight"] | 14029647849143177303_112480480_8 |          2 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92bb.jpg | 1633265978 |
+--------------------------+-----------+----------------------+---------------+----------------------------------+------------+--------------------------------------------------------------+------------+
3 rows in set (0.00 sec)

// Test inserting and updating data

mysql> select * from ods_safety_checkresult;
+--------------------------+-----------+----------------------+---------------+-----------------------------------+------------+-----------------------------------------------------------------+------------+
| _id                      | userid    | mid                  | tag           | check_name                        | check_type | check_data                                                      | createat   |
+--------------------------+-----------+----------------------+---------------+-----------------------------------+------------+-----------------------------------------------------------------+------------+
| 7159a93265b6c375acfa2790 | 112480487 | 14029647849143177344 | ["mnt_seven"] | 14029647849143177303_112480480_7  |          1 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg    | 1633265977 |
| 1059a93265b6c375acfa2790 | 112480489 | 14029647849143177309 | ["mnt_nigh"]  | 14029647849143177303_112480480_10 |          4 | http://sz.aliyuncs.com/db/9591ec01a358b29efdafd9634837e92cc.jpg | 1633268979 |
| 9059a93265b6c375acfa2790 | 112480499 | 14029647849143177399 | ["mnt_nigh"]  | 14029647849143177303_112480480_9  |          3 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92cc.jpg    | 1633265979 |
| 8159a93265b6c375acfa2710 | 112480488 | 14029647849143177303 | ["mnt_eight"] | 14029647849143177303_112480480_8  |          2 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92bb.jpg    | 1633265978 |
+--------------------------+-----------+----------------------+---------------+-----------------------------------+------------+-----------------------------------------------------------------+------------+
4 rows in set (0.01 sec)

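// What about deletes? Not supported: the Unique model only replaces rows by key, so deleted rows stay in the table.

Summary

The test shows that mongodb-cdc, through Flink, can deliver data to a Doris table in near real time, but deletes are not handled in this setup. If the sink is a database that supports upserts, such as Elasticsearch, Kudu, or MySQL, both upsert and delete operations are fully supported.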
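
The gap can be pictured with a small Python model of a replace-only sink. It is not Doris code. It assumes that a Kafka message with a null value gives the load job no row to act on, which matches the result observed in this test. The function name load_replace_only and the shortened messages are hypothetical.

```python
import json
from typing import Dict, List, Optional


def load_replace_only(messages: List[Optional[str]]) -> Dict[str, dict]:
    """Apply Kafka message values to a table keyed by _id, replacing on conflict."""
    table: Dict[str, dict] = {}
    for value in messages:
        if value is None:
            # Assumption: a tombstone has no JSON body, hence no row and no key
            # column to load, so nothing is removed from the table.
            continue
        row = json.loads(value)
        table[row["_id"]] = row  # same key: the newer row replaces the older one
    return table


messages = [
    '{"_id":"a","mid":"1"}',  # insert
    '{"_id":"b","mid":"2"}',  # insert
    '{"_id":"a","mid":"9"}',  # update of row a
    None,                     # delete of row b at the source
]
print(load_replace_only(messages))  # row b is still present after the delete
```

The insert and the update arrive as expected, while row b survives its deletion at the source, which is the divergence seen in this test.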

References

MongoDB CDC Connector

Upsert Kafka SQL Connector

Doris Routine Load
