Doris数据更新及删除

1.数据更新

Doris中存储的数据都是以追加(Append)的方式进入系统,这意味着所有已写入的数据是不可变更的。

所以 Doris采用标记的方式来实现数据更新的目的。即在一批更新数据中,将之前的数据标记为删除,并写入新的数据。

在读取过程中,Doris会自动处理这些标记数据(Merge-on-Read),保证用户读取到的是最新的数据。同时,Doris后台的数据合并(Compaction)线程也会不断的对数据进行合并,消除标记数据,以减少在读取过程中需要进行的合并操作,加速查询。

大部分对数据修改的场景仅适用于 Unique Key 数据模型,因为只有该模型可以保证主键的唯一性,从而支持按主键对数据进行更新。

本文档主要介绍如何使用 Unique Key 数据模型来进行数据更新操作。

数据更新

关于 UNIQUE KEY 的说明,请参阅相关文档。这里不再赘述。下面仅举例说明。

  1. 创建一张 UNIQUE KEY 模型的表

     CREATE TABLE order_table
     (
         order_id BIGINT,
         order_type VARCHAR(8),
         order_status VARCHAR(32)
     )
     UNIQUE KEY(order_id)
     DISTRIBUTED BY HASH(order_id) BUCKETS 8;
  2. 导入第一批数据

     1000, TYPE#1, PAID
     1001, TYPE#2, PENDING
     1002, TYPE#3, PAID
  3. 将 ID 为 1001 的订单条目的 order_status 字段修改为 PAID,则需导入以下数据:

     1001, TYPE#2, PAID

    Doris会根据主键 1001,则读取或者后台合并过程中,将这个条目的 order_status 字段替换为 PAID。

更新部分字段

在上一个例子中,我们仅需要更新 order_status 字段,但是导入的数据中却需要包含 order_type 字段。

在某些场景下,用户无法获取全列数据,仅知道主键和部分要更新的字段的值。在这种情况下,我们可以通过 REPLACE_IF_NOT_NULL 这种聚合方式来实现。

UNIQUE_KEY 模型本质上是聚合模型的一种,其在非主键列上的聚合类型默认为 REPLACE。而 REPLACE_IF_NOT_NULL 的则表示,当遇到 null 值则不更新。举例如下:

  1. 创建一张 UNIQUE KEY 模型的表,使用 REPLACE_IF_NOT_NULL 方式。

     CREATE TABLE order_table
     (
         order_id BIGINT,
         order_type VARCHAR(8),
         order_status VARCHAR(32)
     )
     UNIQUE KEY(order_id)
     DISTRIBUTED BY HASH(order_id) BUCKETS 8
     PROPERTIES
     (
         "replace_if_not_null" = "true"
     );
  2. 导入第一批数据

     1000, TYPE#1, PAID
     1001, TYPE#2, PENDING
     1002, TYPE#3, PAID
  3. 将 ID 为 1001 的订单条目的 order_status 字段修改为 PAID,则需导入以下数据:

     1001, \N, PAID

    原始数据中的 \N 即表示 null。Doris会根据主键 1001,则读取或者后台合并过程中,将这个条目的 order_status 字段替换为 PAID。并且因为 order_type 字段为 null,所以该字段不会被替换。

更新顺序

Doris内部仅能够保证两个批次的导入数据中,后一批次的数据覆盖更新前一批次的数据。但是如果在同一批次数据中,如果出现主键相同的多行记录,Doris是无法识别哪一条才是最终生效数据的。

假设某一批次的导入数据如下:

 1000, TYPE#1, PENDING
 1001, TYPE#2, PENDING
 1000, TYPE#3, PAID

注意第一行和第三行主键相同,因此无法确定哪一条会生效,用户最终查询 1000 这个订单的状态,可能为 PENDING,也可能为 PAID

要解决这个问题,需要业务侧保证在同一批次数据中,没有主键相同的行。或者需参考 Sequence Column 对数据进行适配。

数据删除

Doris中的数据删除有以下几种方式:

  • TRUNCATE,该命令用于直接清空表或分区,但不会删除对应的元数据。操作代价较低,再有清空数据需求时,建议使用。

  • DROP,删除表或分区,会同时删除数据和元数据。

  • DELETE,Delete 语句用于按条件删除数据,具体说明见本文档按条件删除一节。

  • MARK DELETE,Mark Delete 功能对数据进行按行删除,具体说明见本文档标记删除一节。

本文档主要介绍 DELETE 和 BATCH DELETE 两种方式,其他方式可参阅对应的命令文档。

按条件删除

使用 DELETE 命令可以按条件删除数据。具体说明请参阅 DELETE 命令文档。下面通过示例说明:

 DELETE FROM example_table WHERE event_day < 20201001 AND event_key != 1000;
 ​
 DELETE FROM example_table PARTITION p202010 WHERE event_key in (1000, 1001, 1002, 1003); 

DELETE 命令是一个同步命令,返回成功即代表删除成功。

用户可以通过以下命令查看历史的 DELETE 操作记录:

mysql> SHOW DELETE FROM example_db;
+-----------+---------------+---------------------+-----------------+----------+
| TableName | PartitionName | CreateTime          | DeleteCondition | State    |
+-----------+---------------+---------------------+-----------------+----------+
| empty_tbl | p3            | 2020-04-15 23:09:35 | k1 EQ "1"       | FINISHED |
| test_tbl  | p4            | 2020-04-15 23:09:53 | k1 GT "80"      | FINISHED |
+-----------+---------------+---------------------+-----------------+----------+
2 rows in set (0.00 sec)

具体说明,可参阅 SHOW DELETE 命令文档。

注意事项

  • DELETE 命令不适用于高频的删除操作,比如短时间内发送大量的 DELETE 命令,会严重影响底层数据合并效率以及查询效率。因为DELETE 操作本质上是存储了一个删除条件,在查询时会对每一行记录应用这个删除条件做过滤,因此当有大量删除条件时,查询效率就会降低。

  • 尽量避免使用 DELETE-LOAD-DELETE-LOAD 这种交替执行的使用模式,这种模式对底层的数据合并策略非常不友好,可能会导致大量的数据未合并,导致积压。

标记删除

标记删除功能主要用于解决一些需要实时更新同步的场景。比如 同步MySQL的Binlog 数据。该方式只能应用于 UNIQUE KEY 模型的表上。具体说明,请参阅 MARK DELETE 文档。

2.标记删除

标记删除功能是对 DELETE 语句删除功能的一种补充。使用 DELETE 语句对数据删除,无法支持高频操作场景。

另外,类似于CDC(Change Data Capture)场景中,INSERT 和 DELETE 一般是穿插出现的。

标记删除功能就是为了支持以上两种场景而做的功能。

实现原理

标记删除功能仅适用于 UNIQUE KEY 模型的表。其原理是在表中增加了一个隐藏列 __DELETE_SIGN__。该列值为 true,则表示该行为一个删除操作,如果为 false,则表示该行是一个插入操作。该隐藏列的只需要用户在导入数据中添加。

假设表结构如下:

列名 主键
order_id Yes
order_type No
order_status No

我们插入以下三行数据:

1000, TYPE#1, PENDING, false
1001, TYPE#2, PENDING, false
1002, TYPE#3, PENDING, false

其中第四列均为false,即表示这三行都是插入操作。执行后,查询结果为:

order_id order_type order_status
1000 TYPE#1 PENDING
1001 TYPE#2 PENDING
1002 TYPE#3 PENDING

接下来我们再导入两行数据:

1001, TYPE#2, PENDING, true
1002, TYPE#3, PAID, false

第一行是一个删除操作,是将 1001 这个订单id对应的数据删除(TYPE#2, PENDING 两个值在这行数据中无意义,可以随意填写,但必须填写以保证列数匹配)。

第二行是一个插入操作,相当于将 1002 这个订单的状态更新为 PAID

执行后,查询结果为:

order_id order_type order_status
1000 TYPE#1 PENDING
1002 TYPE#3 PAID

由以上示例可以看出,配合 UNIQUE KEY 模型,可以实现按主键的更新或删除操作。当需要删除时,需要将对应的隐藏列设为 true。而当隐藏列是false时,则操作类似于 UPSERT 操作,即有则更新,无则插入

启用标记删除功能

标记删除功能是 Doris3.10 版本之后引入的新功能。

在新版本中创建的 UNIQUE KEY 表默认都包含隐藏列。而在之前版本中创建的 UNIQUE KEY 表并不具有隐藏列。对于老版本的表,可以使用以下方式增加隐藏列:

ALTER TABLE tablename ENABLE FEATURE "BATCH_DELETE";

这个操作本质上是一个 Schema Change 操作,执行后,需通过 SHOW ALTER TABLE COLUMN查看作业执行进度。

如果想确定一个表是否已开启标记删除功能,可以通过 设置一个变量来显示隐藏列

SET show_hidden_columns=true`

之后使用 DESC tablename,如果输出中有 __DELETE_SIGN__ 列,则表示该表已开启标记删除功能。

在导入中使用标记删除功能

在不同的数据导入方式中使用标记删除的方式略有不同。标记删除目前支持以下数据导入方式:

  • STREAM LOAD

  • BROKER LOAD

  • ROUTINE LOAD

在这些导入中使用标记删除功能,都需要添加以下两个属性:

  1. Merge Type

    Merge Type 分为3种,APPEND、DELETE 和 MERGE。默认为 APPEND。

    APPEND 方式和正常导入无异。

    DELETE 方式则表示这一批数据中的每一行都是删除操作。在这种方式下,无需再指定 Delete Label 列。

    MERGE 方式表示这一批数据中混合了插入和删除操作。此时,需要通过指定 Delete Label 列来标识每一行的操作类型。

  2. Delete Label

    Delete Label 用于在 MERGE 方式下,指定数据中的某一列是删除标记列。用户可以指定当这一列为何值时,表示DELETE操作。

具体使用语法请参阅各自的文档,这里仅对不同导入方式进行简单的示例说明。假设原始导入数据如下:

1000,TYPE#1,PENDING,0
1001,TYPE#2,PENDING,0
1002,TYPE#3,PENDING,0
1003,TYPE#2,PENDING,1
1004,TYPE#3,PAID,1

Stream Load

Stream Load 请求如下:

curl --location-trusted -u root \
-H "columns: order_id, order_type, order_status, delete_label" \
-H "merge_type: MERGE" \
-H "delete: delete_label=1" \
-T data.txt http://host:port/api/testDb/testTbl/_stream_load

这个示例表示第四列为 Delete Label 列,并且当值为1时,表示对应行为删除操作。

Broker Load

LOAD LABEL example_db.my_label
(
    MERGE DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
    INTO TABLE example_tbl
    COLUMNS TERMINATED BY ","
    (order_id, order_type, order_status, delete_label)
    DELETE ON delete_label=1
)
WITH BROKER 'bos'
(
    ...
);

这个示例表示第四列为 Delete Label 列,并且当值为1时,表示对应行为删除操作。

Routine Load

CREATE ROUTINE LOAD example_db.job1 ON example_tbl
WITH DELETE
COLUMNS(order_id, order_type, order_status, delete_label)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false"
)
FROM KAFKA
(
    ...
);

注意这里我们使用的 Merge Type 是 DELETE,因此无需指定 Delete Label,第四列的值不会被使用,所有数据均为删除操作。

注意事项

  1. 因为 Doris无法保证一批次导入数据内部的顺序性,所以在诸如 CDC 等场景下,如需确保数据前后顺序,需配合 Sequence Column功能一起使用。

3.Sequence-Column

UNIQUE KEY模型下,Doris会根据主键自动的进行数据的更新。但是当同一批次导入数据中出现相同主键的行时,Doris无法判断其先后顺序,则可能出现更新行为不一致的问题。

而在某些数据同步场景下,需要保证数据能够按顺序更新,而 Sequence Column 功能就是为了解决这一问题。

实现原理

Sequence Column 仅支持 UNQIUE KEY 模型的表。其原理是在表中增加一个隐藏列 __DORIS_SEQUENCE_COL__ 。该列的类型由用户在建表时指定。

在导入的源数据中,用户需额外增加一个顺序列,其类型为建表时指定的 __DORIS_SEQUENCE_COL__ 的类型。Doris内部会根据这个顺序列的值,决定数据的前后顺序,进行数据更新。

启用 Sequence Column 功能

该功能是 Doris3.10 版本之后引入的新功能。

  1. 创建新表

    在创建表时,我们可以通过如下方式设置 Sequence Column:

    CREATE TABLE order_table
    (
        order_id BIGINT,
        order_type VARCHAR(8),
        order_status VARCHAR(32)
    )
    UNIQUE KEY(order_id)
    DISTRIBUTED BY HASH(order_id) BUCKETS 8
    PROPERTIES
    (
        "function_column.sequence_type" = 'Date'
    );

    这里我们在 PROPERTIES 中指定了 Sequence Column 的类型为 Date,即开启了该功能。更多说明,可参阅 CREATE TABLE 命令手册。

  2. 为旧表开启该功能

    对于 3.10 之前版本创建的 UNIQUE KEY 的表,可以通过以下命令开启该功能:

    ALTER TABLE order_table ENABLE FEATURE "SEQUENCE_LOAD"
    WITH PROPERTIES ("function_column.sequence_type" = "Date")

    这个操作本质上是一个 Schema Change 操作,执行后,需通过 SHOW ALTER TABLE COLUMN查看作业执行进度。

如果想确定一个表是否已开启标记删除功能,可以通过 设置一个变量来显示隐藏列

SET show_hidden_columns=true`

之后使用 DESC tablename,如果输出中有 __DORIS_SEQUENCE_COL__ 列,则表示该表已开启该功能。

在导入中使用顺序列功能

在不同的数据导入方式中使用的方式略有不同。该功能目前支持以下数据导入方式:

  • STREAM LOAD

  • BROKER LOAD

  • ROUTINE LOAD

具体使用语法请参阅各自的文档,这里仅对不同导入方式进行简单的示例说明。假设原始导入数据如下:

1000,TYPE#1,PENDING,2020-10-01
1001,TYPE#2,PAID,2020-10-02
1002,TYPE#3,PENDING,2020-10-03
1001,TYPE#2,PENDING,2020-10-01
1004,TYPE#3,PAID,2020-10-03

Stream Load

curl --location-trusted -u root \
-H "columns: order_id, order_type, order_status, source_sequence"
-H "function_column.sequence_col: source_sequence" \
-T data.txt http://host:port/api/example_db/order/_stream_load

我们在 Header 的 columns 属性中将第四列命名为 source_sequence,之后在 function_column.sequence_col 属性中将该列设置的顺序列。

这样,源数据中的 1001 这个订单的最终状态将会是 PAID.

Broker Load

LOAD LABEL example_db.label1
(
    DATA INFILE("hdfs://host:port/user/data/*/test.txt")
    INTO TABLE `order`
    COLUMNS TERMINATED BY ","
    (order_id, order_type, order_status, source_sequence)
    ORDER BY source_sequence
)
WITH BROKER 'bos'
(
    ...
);

通过 ORDER BY 子句配置顺序列。

routine load

CREATE ROUTINE LOAD example_db.job_name ON order 
COLUMNS(order_id, order_type, order_status, source_sequence),
ORDER BY source_sequence
PROPERTIES
(
    ...
)
FROM KAFKA
(
    ...
);

通过 ORDER BY 子句配置顺序列。

使用示例

下面以 Stream Load 为例,通过一个实际的示例展示顺序列的使用方式和效果。

  1. 创建支持 Sequence Column 的表

    CREATE TABLE test_table
    (
        user_id     BIGINT,
        date        DATE,
        group_id    BIGINT,
        keyword     VARCHAR(128)
    )
    UNIQUE KEY(user_id, date, group_id)
    DISTRIBUTED BY HASH(user_id, date) BUCKETS 10
    PROPERTIES
    (
        "function_column.sequence_type" = 'Date'
    )

    之后我们可以查看到隐藏列:

    mysql> set show_hidden_columns=true;
    Query OK, 0 rows affected (0.00 sec)
    mysql> desc test_table;
    +------------------------+--------------+------+-------+---------+---------+
    | Field                  | Type         | Null | Key   | Default | Extra   |
    +------------------------+--------------+------+-------+---------+---------+
    | user_id                | BIGINT       | Yes  | true  | NULL    |         |
    | date                   | DATE         | Yes  | true  | NULL    |         |
    | group_id               | BIGINT       | Yes  | true  | NULL    |         |
    | keyword                | VARCHAR(128) | Yes  | false | NULL    | REPLACE |
    | __DORIS_SEQUENCE_COL__ | DATE         | Yes  | false | NULL    | REPLACE |
    +------------------------+--------------+------+-------+---------+---------+
    5 rows in set (0.00 sec)
  2. 正常导入数据

    导入如下数据:

    1,2020-02-22,1,2020-02-22,a
    1,2020-02-22,1,2020-02-22,b
    1,2020-02-22,1,2020-03-05,c
    1,2020-02-22,1,2020-02-26,d
    1,2020-02-22,1,2020-02-22,e
    1,2020-02-22,1,2020-02-22,b

    将 Sequence Column 映射为源数据中的第4列,modify_date 列。

    curl --location-trusted -u root: \
    -H "column_separator: ," \
    -H "columns: user_id, date, group_id, modify_date, keyword" \
    -H "function_column.sequence_col: modify_date" \
    -T testData http://host:port/api/test/test_table/_stream_load

    结果为

    mysql> select * from test_table;
    +---------+------------+----------+---------+
    | user_id | date       | group_id | keyword |
    +---------+------------+----------+---------+
    |       1 | 2020-02-22 |        1 | c       |
    +---------+------------+----------+---------+

    我们也可以查看隐藏列的值:

    mysql> set show_hidden_columns=true;
    Query OK, 0 rows affected (0.01 sec)
    
    mysql> select * from test_table;
    +---------+------------+----------+---------+------------------------+
    | user_id | date       | group_id | keyword | __DORIS_SEQUENCE_COL__ |
    +---------+------------+----------+---------+------------------------+
    |       1 | 2020-02-22 |        1 | c       | 2020-03-05             |
    +---------+------------+----------+---------+------------------------+

    在这次导入中,因 Sequence Column 的值(也就是 modify_date 中的值)中 2020-03-05 为最大值,所以 keyword 列中最终保留了 c

  3. 替换顺序的保证

    上述步骤完成后,接着导入如下数据:

    1,2020-02-22,1,2020-02-22,a
    1,2020-02-22,1,2020-02-23,b

    查询数据

    MySQL [test]> select * from test_table;
    +---------+------------+----------+---------+
    | user_id | date       | group_id | keyword |
    +---------+------------+----------+---------+
    |       1 | 2020-02-22 |        1 | c       |
    +---------+------------+----------+---------+

    由于新导入的数据的 Sequence Column 都小于表中已有的值,则没有替换发生。

    再尝试导入如下数据:

    1,2020-02-22,1,2020-02-22,a
    1,2020-02-22,1,2020-03-23,w

    查询数据:

    MySQL [test]> select * from test_table;
    +---------+------------+----------+---------+
    | user_id | date       | group_id | keyword |
    +---------+------------+----------+---------+
    |       1 | 2020-02-22 |        1 | w       |
    +---------+------------+----------+---------+

    由于新导入的数据的 Sequence Column 值大于表中的值,所以数据被替换。

上一篇:c# – 在每天同步的不同机器上有两个SQL Server数据库的好方法吗?


下一篇:七、Doris Colocation Join