文章目录
一. Stream load概述
Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。
Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。
1.1 基本原理
下图展示了 Stream load 的主要流程,省略了一些导入细节。
^ +
| |
| | 1A. User submit load to FE
| |
| +--v-----------+
| | FE |
5. Return result to user | +--+-----------+
| |
| | 2. Redirect to BE
| |
| +--v-----------+
+---+Coordinator BE| 1B. User submit load to BE
+-+-----+----+-+
| | |
+-----+ | +-----+
| | | 3. Distrbute data
| | |
+-v-+ +-v-+ +-v-+
|BE | |BE | |BE |
+---+ +---+ +---+
Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。
用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。
导入的最终结果由 Coordinator BE 返回给用户。
1.2 支持数据格式
目前 Stream Load 支持两个数据格式:CSV(文本) 和 JSON
1.3 基本操作
1.3.1 创建导入
Stream load 通过 HTTP 协议提交和传输数据。这里通过 curl 命令展示如何提交导入。
用户也可以通过其他 HTTP client 进行操作。
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
Header 中支持属性见下面的 ‘导入任务参数’ 说明
格式为: -H "key1:value1"
示例:
curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load
1.3.1.1 签名参数
user/passwd
Stream load 由于创建导入的协议使用的是 HTTP 协议,通过 Basic access authentication 进行签名。Doris 系统会根据签名验证用户身份和导入权限。
1.3.1.2 导入任务参数
Stream load 由于使用的是 HTTP 协议,所以所有导入任务有关的参数均设置在 Header 中。下面主要介绍了 Stream load 导入任务参数的部分参数意义。
-
label
导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。
label 的另一个作用,是防止用户重复导入相同的数据。强烈推荐用户同一批次数据使用相同的 label。这样同一批次数据的重复请求只会被接受一次,保证了 At-Most-Once
当 label 对应的导入作业状态为 CANCELLED 时,该 label 可以再次被使用。 -
column_separator
用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。
如hive文件的分隔符\x01,需要指定为-H “column_separator:\x01”。
可以使用多个字符的组合作为列分隔符。 -
line_delimiter
用于指定导入文件中的换行符,默认为\n。
可以使用做多个字符的组合作为换行符。 -
max_filter_ratio
导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。
如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。
计算公式为:
(dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio
dpp.abnorm.ALL 表示数据质量不合格的行数。如类型不匹配,列数不匹配,长度不匹配等等。
dpp.norm.ALL 指的是导入过程中正确数据的条数。可以通过 SHOW LOAD 命令查询导入任务的正确数据量。
原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL -
where
导入任务指定的过滤条件。Stream load 支持对原始数据指定 where 语句进行过滤。被过滤的数据将不会被导入,也不会参与 filter ratio 的计算,但会被计入num_rows_unselected。 -
partition
待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 dpp.abnorm.ALL -
columns
待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。
列顺序变换例子:原始数据有三列(src_c1,src_c2,src_c3), 目前doris表也有三列(dst_c1,dst_c2,dst_c3)
如果原始表的src_c1列对应目标表dst_c1列,原始表的src_c2列对应目标表dst_c2列,原始表的src_c3列对应目标表dst_c3列,则写法如下:
columns: dst_c1, dst_c2, dst_c3
如果原始表的src_c1列对应目标表dst_c2列,原始表的src_c2列对应目标表dst_c3列,原始表的src_c3列对应目标表dst_c1列,则写法如下:
columns: dst_c2, dst_c3, dst_c1
表达式变换例子:原始文件有两列,目标表也有两列(c1,c2)但是原始文件的两列均需要经过函数变换才能对应目标表的两列,则写法如下:
columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2)
其中 tmp_*是一个占位符,代表的是原始文件中的两个原始列。
-
exec_mem_limit
导入内存限制。默认为 2GB,单位为字节。 -
strict_mode
Stream load 导入可以开启 strict mode 模式。开启方式为在 HEADER 中声明 strict_mode=true 。默认的 strict mode 为关闭。
strict mode 模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:
9.1) 对于列类型转换来说,如果 strict mode 为true,则错误的数据将被 filter。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。
9.2) 对于导入的某列由函数变换生成时,strict mode 对其不产生影响。
9.3) 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode 对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。 -
merge_type
数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete 条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理
1.3.1.3 strict mode 与 source data 的导入关系
这里以列类型为 TinyInt 来举例:
(注:当表中的列允许导入空值时)
这里以列类型为 Decimal(1,0) 举例:
(注:当表中的列允许导入空值时)
注意:
10 虽然是一个超过范围的值,但是因为其类型符合 decimal的要求,所以 strict mode对其不产生影响。10 最后会在其他 ETL 处理流程中被过滤。但不会被 strict mode 过滤。
1.3.2 返回结果
由于 Stream load 是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。
示例:
{
"TxnId": 1003,
"Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
"Status": "Success",
"ExistingJobStatus": "FINISHED", // optional
"Message": "OK",
"NumberTotalRows": 1000000,
"NumberLoadedRows": 1000000,
"NumberFilteredRows": 1,
"NumberUnselectedRows": 0,
"LoadBytes": 40888898,
"LoadTimeMs": 2144,
"BeginTxnTimeMs": 1,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 325,
"WriteDataTimeMs": 1933,
"CommitAndPublishTimeMs": 106,
"ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
}
下面主要解释了 Stream load 导入结果参数:
-
TxnId
导入的事务ID。用户可不感知。 -
Label
导入 Label。由用户指定或系统自动生成。 -
Status
导入完成状态:
3.1) “Success”:表示导入成功。
3.2) “Publish Timeout”:该状态也表示导入已经完成,只是数据可能会延迟可见,无需重试。
3.3) “Label Already Exists”:Label 重复,需更换 Label。
3.4) “Fail”:导入失败。 -
ExistingJobStatus
已存在的 Label 对应的导入作业的状态。
这个字段只有在当 Status 为 “Label Already Exists” 时才会显示。用户可以通过这个状态,知晓已存在 Label 对应的导入作业的状态。“RUNNING” 表示作业还在执行,“FINISHED” 表示作业成功。 -
Message
导入错误信息。 -
NumberTotalRows
导入总处理的行数。 -
NumberLoadedRows
成功导入的行数。 -
NumberFilteredRows
数据质量不合格的行数。 -
NumberUnselectedRows
被 where 条件过滤的行数。 -
LoadBytes
导入的字节数。 -
LoadTimeMs
导入完成时间。单位毫秒。 -
BeginTxnTimeMs
向Fe请求开始一个事务所花费的时间,单位毫秒。 -
StreamLoadPutTimeMs
向Fe请求获取导入数据执行计划所花费的时间,单位毫秒。 -
ReadDataTimeMs
读取数据所花费的时间,单位毫秒。 -
WriteDataTimeMs
执行写入数据操作所花费的时间,单位毫秒。 -
CommitAndPublishTimeMs
向Fe请求提交并且发布事务所花费的时间,单位毫秒。 -
ErrorURL
如果有数据质量问题,通过访问这个 URL 查看具体错误行。
1.3.3 取消导入
用户无法手动取消 Stream load,Stream load 在超时或者导入错误后会被系统自动取消。
1.4 相关系统配置
1.4.1 FE 配置
stream_load_default_timeout_second
导入任务的超时时间(以秒为单位),导入任务在设定的 timeout 时间内未完成则会被系统取消,变成 CANCELLED。
默认的 timeout 时间为 600 秒。如果导入的源文件无法在规定时间内完成导入,用户可以在 stream load 请求中设置单独的超时时间。
或者调整 FE 的参数stream_load_default_timeout_second 来设置全局的默认超时时间。
1.4.2 BE 配置
streaming_load_max_mb
Stream load 的最大导入大小,默认为 10G,单位是 MB。如果用户的原始文件超过这个值,则需要调整 BE 的参数 streaming_load_max_mb。
二. 导入实例
示例1:
以 “table1_20211207” 为 Label,使用本地文件 table1_data 导入 table1 表。
-- FE_HOST 是任一 FE 所在节点 IP,8030 为 fe.conf 中的 http_port。
-- 可以使用任一 BE 的 IP,以及 be.conf 中的 webserver_port 进行导入。如:BE_HOST:8040
curl --location-trusted -u test:test_passwd -H "label:table1_20211207" -H "column_separator:," -T table1_data http://FE_HOST:8030/api/example_db/table1/_stream_load
本地文件 table1_data 以 , 作为数据之间的分隔,具体内容如下:
1,1,jim,2
2,1,grace,2
3,2,tom,2
4,3,bush,3
5,3,helen,3
最终导入命令:
curl --location-trusted -u doris_user:abc123 -H "label:table1_20211207" -H "column_separator:," -T /tmp/table1_data http://10.31.1.119:8030/api/example_db/table1/_stream_load
示例2:
以 “table2_20211207” 为 Label,使用本地文件 table2_data 导入 table2 表。
curl --location-trusted -u doris_user:abc123 -H "label:table2_20211207" -H "column_separator:|" -T /tmp/table2_data http://127.0.0.1:8030/api/example_db/table2/_stream_load
本地文件 table2_data 以 | 作为数据之间的分隔,具体内容如下:
2017-07-03|1|1|jim|2
2017-07-05|2|1|grace|2
2017-07-12|3|2|tom|2
2017-07-15|4|3|bush|3
2017-07-12|5|3|helen|3
参考:
- https://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html