MongoDB CDC to Doris
目标:
mongodb --flink cdc--> kafka --> doris
一、环境准备
1. 版本
- ubuntu 20.04
- mongodb 4.2
- doris 0.14
- flink 1.13.3
- kafka 2.4.1
- flink-sql-connector-mongodb-cdc-2.1.0.jar
2. 把 mongodb-cdc 和 kafka 的 connector jar 包放入 flink 的 lib 目录
ll ./lib
-rw-r--r-- 1 hadoop hadoop 249567 Oct 12 17:23 flink-connector-jdbc_2.11-1.13.3.jar
-rw-r--r-- 1 hadoop hadoop 92313 Oct 13 00:15 flink-csv-1.13.3.jar
-rw-r--r-- 1 hadoop hadoop 115418686 Oct 13 00:29 flink-dist_2.11-1.13.3.jar
-rw-r--r-- 1 hadoop hadoop 148127 Oct 13 00:13 flink-json-1.13.3.jar
-rwxrwxrwx 1 hadoop hadoop 7709740 Jun 8 21:13 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 hadoop hadoop 3674114 Oct 12 17:35 flink-sql-connector-kafka_2.11-1.13.3.jar
-rw-r--r-- 1 hadoop hadoop 15296786 Nov 15 14:30 flink-sql-connector-mongodb-cdc-2.1.0.jar
-rw-r--r-- 1 hadoop hadoop 19648146 Nov 15 14:30 flink-sql-connector-mysql-cdc-2.1.0.jar
-rw-r--r-- 1 hadoop hadoop 36453353 Oct 13 00:25 flink-table_2.11-1.13.3.jar
-rw-r--r-- 1 hadoop hadoop 41061738 Oct 13 00:26 flink-table-blink_2.11-1.13.3.jar
-rwxrwxrwx 1 hadoop hadoop 67114 Mar 31 2021 log4j-1.2-api-2.12.1.jar
-rwxrwxrwx 1 hadoop hadoop 276771 Mar 31 2021 log4j-api-2.12.1.jar
-rwxrwxrwx 1 hadoop hadoop 1674433 Mar 31 2021 log4j-core-2.12.1.jar
-rwxrwxrwx 1 hadoop hadoop 23518 Mar 31 2021 log4j-slf4j-impl-2.12.1.jar
-rw-r--r-- 1 hadoop hadoop 2475087 Sep 29 02:17 mysql-connector-java-8.0.27.jar
二、 mongo 数据写入kafka
1. mongodb 准备初始数据
// Seed three documents into the `checkresult` collection.
// ObjectId() only accepts exactly 24 hexadecimal characters, so the _id values
// below are the ones that later show up in the Kafka topic and the Doris table.
db.checkresult.insertMany([
    {
        "_id": ObjectId("7159a93265b6c375acfa2790"),
        "userid": NumberLong(112480487),
        "mid": "14029647849143177307",
        "tag": [
            "mnt_seven"
        ],
        // embedded document array; the Flink job only reads the first element
        "checkcontent": [
            {
                "name": "14029647849143177303_112480480_7",
                "type": 1,
                "data": "http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg"
            }
        ],
        "createat": NumberLong(1633265977)
    },
    {
        "_id": ObjectId("8159a93265b6c375acfa2710"),
        "userid": NumberLong(112480488),
        "mid": "14029647849143177303",
        "tag": [
            "mnt_eight"
        ],
        "checkcontent": [
            {
                "name": "14029647849143177303_112480480_8",
                "type": 2,
                "data": "http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92bb.jpg"
            }
        ],
        "createat": NumberLong(1633265978)
    },
    {
        "_id": ObjectId("9059a93265b6c375acfa2790"),
        "userid": NumberLong(112480489),
        "mid": "14029647849143177309",
        "tag": [
            "mnt_nigh"
        ],
        "checkcontent": [
            {
                "name": "14029647849143177303_112480480_9",
                "type": 3,
                "data": "http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92cc.jpg"
            }
        ],
        "createat": NumberLong(1633265979)
    }
]);
2. flink sql-client 建立相关表,并把数据sink 到 kafka
-- Flink SQL (run in sql-client)
-- Source table: reads the `checkresult` collection through the mongodb-cdc connector.
-- Flink SQL only accepts `--` and `/* */` comments; `//` or `#` inside a statement is a parse error.
CREATE TABLE mongo_checkresult (
    _id STRING,  -- must be declared
    userid BIGINT,
    mid STRING,
    tag ARRAY<STRING>,
    -- embedded document array; field names are backtick-quoted because
    -- NAME / TYPE / DATA appear in Flink's SQL keyword list
    checkcontent ARRAY<ROW<`name` STRING, `type` INT, `data` STRING>>,
    createat BIGINT,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = '192.168.3.121:27017',
    'username' = 'mongouser',
    'password' = 'mongodbpwd',
    'database' = 'xxx',
    'collection' = 'checkresult'
);
-- Sink table: Kafka topic `checkresult`; the nested checkcontent fields are
-- flattened into top-level columns (check_name / check_type / check_data).
-- The connector must be upsert-kafka because the CDC source produces upsert
-- (changelog) data keyed by _id.
CREATE TABLE kafka_checkresult (
    _id STRING,
    userid BIGINT,
    mid STRING,
    tag ARRAY<STRING>,
    check_name STRING,
    check_type INT,
    check_data STRING,
    createat BIGINT,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'checkresult',
    'properties.bootstrap.servers' = '192.168.3.124:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);
-- Submit the streaming job that copies MongoDB changes into Kafka.
-- Flink arrays are 1-based: checkcontent[1] is the first embedded document,
-- and any further elements of the array are not copied.
INSERT INTO kafka_checkresult
SELECT
    _id,
    userid,
    mid,
    tag,
    checkcontent[1].`name` AS check_name,
    checkcontent[1].`type` AS check_type,
    checkcontent[1].`data` AS check_data,
    createat
FROM mongo_checkresult;
3. 测试结果:数据已写入 kafka
# Tail the topic from the beginning to check that updates also produce messages.
# upsert-kafka applies upserts and deletes, so `select * from kafka_checkresult`
# shows the same rows as the source collection.
kafka-console-consumer.sh \
    --bootstrap-server 192.168.3.124:9092 \
    --topic checkresult \
    --from-beginning
{"_id":"7159a93265b6c375acfa2790","userid":112480487,"mid":"14029647849143177307","tag":["mnt_seven"],"check_name":"14029647849143177303_112480480_7","check_type":1,"check_data":"http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg","createat":1633265977}
{"_id":"9059a93265b6c375acfa2790","userid":112480489,"mid":"14029647849143177309","tag":["mnt_nigh"],"check_name":"14029647849143177303_112480480_9","check_type":3,"check_data":"http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92cc.jpg","createat":1633265979}
{"_id":"8159a93265b6c375acfa2710","userid":112480488,"mid":"14029647849143177303","tag":["mnt_eight"],"check_name":"14029647849143177303_112480480_8","check_type":2,"check_data":"http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92bb.jpg","createat":1633265978}
// Update `mid` (not userid) on the document whose userid is 112480487.
// The change shows up in Kafka as a new message for the same _id.
db.checkresult.updateOne(
    { userid: 112480487 },
    { $set: { mid: "14029647849143177344" } }
);
{"_id":"7159a93265b6c375acfa2790","userid":112480487,"mid":"14029647849143177344","tag":["mnt_seven"],"check_name":"14029647849143177303_112480480_7","check_type":1,"check_data":"http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg","createat":1633265977}
# 删除:删除 mongo 文档时,upsert-kafka 会向 topic 写入一条 value 为 null 的记录(tombstone)。Flink upsert-kafka 官方文档对此的说明如下:
A data record in a changelog stream is interpreted as an UPSERT (aka INSERT/UPDATE) because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a "DELETE".
三、消费 kafka 消息,通过routine load 消费到doris
1. doris 建好对应表
-- Doris target table. The UNIQUE KEY model keeps one row per _id, so a later
-- load for the same _id replaces the earlier row (this is how updates land).
-- _id is VARCHAR rather than STRING: Doris does not allow STRING as a key or
-- bucketing column. MongoDB ObjectId hex strings are 24 characters.
-- NOTE(review): STRING itself requires Doris >= 0.15; on 0.14 the value columns
-- would also need VARCHAR — confirm the target version.
CREATE TABLE ods_safety_checkresult (
    _id VARCHAR(64),
    userid BIGINT,
    mid STRING,
    tag STRING,
    check_name STRING,
    check_type INT,
    check_data STRING,
    createat BIGINT
) ENGINE=OLAP
UNIQUE KEY(_id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(_id) BUCKETS 8
PROPERTIES (
    "replication_num" = "3",
    "in_memory" = "false",
    "storage_format" = "V2"
);
2. 加上routine load 任务
-- Routine load job: continuously pulls JSON messages from Kafka topic
-- `checkresult` (partition 0, starting at offset 0) into ods_safety_checkresult.
-- With no jsonpaths configured, top-level JSON keys are matched to the listed
-- columns by name. max_batch_interval is in seconds.
-- NOTE(review): this broker (192.168.3.121) differs from the one the Flink sink
-- writes to (192.168.3.124); presumably both are in the same Kafka cluster — confirm.
CREATE ROUTINE LOAD etl_ods_safety_checkresult ON ods_safety_checkresult
COLUMNS (
    _id,
    userid,
    mid,
    tag,
    check_name,
    check_type,
    check_data,
    createat
)
PROPERTIES (
    "format" = "json",
    "desired_concurrent_number" = "1",
    "max_error_number" = "1000",
    "max_batch_interval" = "5"
)
FROM KAFKA (
    "kafka_broker_list" = "192.168.3.121:9092",
    "kafka_topic" = "checkresult",
    "kafka_partitions" = "0",
    "kafka_offsets" = "0"
);
3. 结果,数据能正常写入doris
mysql> show routine load for etl_ods_safety_checkresult \G
*************************** 1. row ***************************
Id: 22055
Name: etl_ods_safety_checkresult
CreateTime: 2021-11-20 15:32:18
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:mongotest
TableName: ods_safety_checkresult
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"_id,userid,mid,tag,check_name,check_type,check_data,createat","maxBatchIntervalS":"5","whereExpr":"*","dataFormat":"json","timezone":"Asia/Shanghai","send_batch_parallelism":"1","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","maxBatchSizeBytes":"104857600","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","deleteCondition":"*","desireTaskConcurrentNum":"1","maxErrorNum":"1000","strip_outer_array":"false","currentTaskConcurrentNum":"1","execMemLimit":"2147483648","num_as_string":"false","fuzzy_parse":"false","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"checkresult","currentKafkaPartitions":"0","brokerList":"192.168.3.121:9092"}
CustomProperties: {"group.id":"etl_ods_safety_checkresult_d1504eb7-6dc7-4f5b-801e-09ade020ee0b"}
Statistic: {"receivedBytes":1313,"errorRows":0,"committedTaskNum":1,"loadedRows":5,"loadRowsRate":0,"abortedTaskNum":2,"errorRowsAfterResumed":0,"totalRows":5,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":5015}
Progress: {"0":"4"}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
1 row in set (0.00 sec)
mysql> select * from ods_safety_checkresult;
+--------------------------+-----------+----------------------+---------------+----------------------------------+------------+--------------------------------------------------------------+------------+
| _id | userid | mid | tag | check_name | check_type | check_data | createat |
+--------------------------+-----------+----------------------+---------------+----------------------------------+------------+--------------------------------------------------------------+------------+
| 7159a93265b6c375acfa2790 | 112480487 | 14029647849143177344 | ["mnt_seven"] | 14029647849143177303_112480480_7 | 1 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg | 1633265977 |
| 9059a93265b6c375acfa2790 | 112480499 | 14029647849143177309 | ["mnt_nigh"] | 14029647849143177303_112480480_9 | 3 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92cc.jpg | 1633265979 |
| 8159a93265b6c375acfa2710 | 112480488 | 14029647849143177303 | ["mnt_eight"] | 14029647849143177303_112480480_8 | 2 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92bb.jpg | 1633265978 |
+--------------------------+-----------+----------------------+---------------+----------------------------------+------------+--------------------------------------------------------------+------------+
3 rows in set (0.00 sec)
// 插入新数据、更新已有数据后再次查询
mysql> select * from ods_safety_checkresult;
+--------------------------+-----------+----------------------+---------------+-----------------------------------+------------+-----------------------------------------------------------------+------------+
| _id | userid | mid | tag | check_name | check_type | check_data | createat |
+--------------------------+-----------+----------------------+---------------+-----------------------------------+------------+-----------------------------------------------------------------+------------+
| 7159a93265b6c375acfa2790 | 112480487 | 14029647849143177344 | ["mnt_seven"] | 14029647849143177303_112480480_7 | 1 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg | 1633265977 |
| 1059a93265b6c375acfa2790 | 112480489 | 14029647849143177309 | ["mnt_nigh"] | 14029647849143177303_112480480_10 | 4 | http://sz.aliyuncs.com/db/9591ec01a358b29efdafd9634837e92cc.jpg | 1633268979 |
| 9059a93265b6c375acfa2790 | 112480499 | 14029647849143177399 | ["mnt_nigh"] | 14029647849143177303_112480480_9 | 3 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92cc.jpg | 1633265979 |
| 8159a93265b6c375acfa2710 | 112480488 | 14029647849143177303 | ["mnt_eight"] | 14029647849143177303_112480480_8 | 2 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92bb.jpg | 1633265978 |
+--------------------------+-----------+----------------------+---------------+-----------------------------------+------------+-----------------------------------------------------------------+------------+
4 rows in set (0.01 sec)
// 删除呢?不能同步!upsert-kafka 对删除写入的是 value 为 null 的 tombstone 消息,routine load 无法据此删除行;而 UNIQUE 模型按 REPLACE 语义导入,只会覆盖、不会删除。
总结
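测验结果证明,利用 mongodb-cdc 能够通过 flink 准实时地把数据传输到 doris 表,但是 delete 不能处理:upsert-kafka 用 value 为 null 的 tombstone 表示删除,routine load 无法把它转换为删除操作。若要在 Doris 中同步删除,可以考虑让 Kafka 消息携带操作类型字段(例如改用 canal-json / debezium-json 等 changelog 格式,思路待验证),再配合 Doris UNIQUE 表的批量删除功能(routine load 的 MERGE 类型加 DELETE ON 条件;上面 show routine load 输出中的 mergeType / deleteCondition 即对应该功能)。如果 sink 端用支持 upsert 的数据库,如 ES、Kudu、MySQL,则能支持完整的 upsert 及 delete 操作。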
参考