8、Bulk API
可以把多个index
或delete
操作放在单个bulk API
中执行。这样可以极大地提高索引速度。
/_bulk
API使用如下的JSON结构:
action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n
注意,最后一行数据必须要以\n
结尾。发送请求时,Content-Type 标头应设置为 application /x-ndjson。
action
可以是index
,create
,delete
和update
操作。index
和create
操作需要在下一行指定一个文档数据。index
操作相当于 index API
,create
操作相当于op_type=create
的index
操作。delete
操作不要指定文档参数,参考delete API
。update
操作仅需文档的部分参数,doc
、upsert
,script
,或其他一些可选项。
如果使用curl
导入文本作为参数,你必须使用--data-binary
标记而不是-d
参数。使用--data-binary
的json文本数据不能换行:
$ cat requests
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }
$ curl -s -H "Content-Type: application/x-ndjson" -XPOST localhost:9200/_bulk --data-binary "@requests"; echo
{"took":7, "errors": false, "items":[{"index":{"_index":"test","_type":"_doc","_id":"1","_version":1,"result":"created","forced_refresh":false}}]}
因为使用\n
作为分割符,所以请确保json数据不是格式化的(需要在同一行)。例如:
POST _bulk
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
返回结果:
{
"took": 30,
"errors": false,
"items": [
{
"index": {
"_index": "test",
"_type": "_doc",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 201,
"_seq_no" : 0,
"_primary_term": 1
}
},
{
"delete": {
"_index": "test",
"_type": "_doc",
"_id": "2",
"_version": 1,
"result": "not_found",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 404,
"_seq_no" : 1,
"_primary_term" : 2
}
},
{
"create": {
"_index": "test",
"_type": "_doc",
"_id": "3",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 201,
"_seq_no" : 2,
"_primary_term" : 3
}
},
{
"update": {
"_index": "test",
"_type": "_doc",
"_id": "1",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 200,
"_seq_no" : 3,
"_primary_term" : 4
}
}
]
}
可以使用 /_bulk
, /{index}/_bulk
, {index}/{type}/_bulk
,在url中指定index
和type
参数。
每个操作都会分配到对应的分片执行,仅仅action_meta_data
数据在所请求的分片中解析(因为需要预先找出该操作在那个分片上执行)。
客户端应尽量使用{index}/{type}/_bulk
减少数据传输。
返回结果会保存每个操作的返回结果,并且即使某个操作失败了也不会影响剩下的操作。
bulk
api 没有规定每个bulk
中应该包含多少个操作,具体操作数量应该和你操作的工作量相关。(例如工作量少可以包含多点,工作量大的包含少点)
如果使用HTTP API
,确保客户端不要发送 HTTP chunks
,因为这样会降低速度。
8.1 乐观锁( Optimistic Concurrency Control)
批量API调用中的每个索引和删除操作可以在其各自的操作和元数据行中包括if_seq_no和if_primary_term参数。if_seq_no和if_primary_term参数根据对现有文档的最后一次修改来设置。有关更多详细信息,请参阅乐观并发控制。
8.2 版本号(Versioning)
每个 bulk
项 都可指定 _version
或version
字段。它的行为和index
或delete
操作一样,具体还要基于映射中_version
设置。它同样也支持 version_type
查阅versioning
8.3 路由(Routing)
bulk
中的每一项都可以指定_routing
或routing
字段。它会根据_routing
自动跟踪索引 / 删除操作的行为。
8.4 等待活跃分片(Wait For Active Shards)
在执行bulk
之前,可以指定最小活跃分片数。有关更多详细信息和用法示例,查阅这里
8.5 刷新(refresh)
设置bulk
操作所做的更改到搜索可见的时间。查阅 refresh
只有收到批量请求的分片才会受到刷新的影响。想象一下这样的请求
_bulk?refresh = wait_for
,其中包含三个文档,这些文档恰好被路由到具有五个分片的索引,且请求打落在其中3个不同分片上。那么请求将会等待这三个分片刷新。其他两个分片不参与_bulk请求。
8.6 更新
使用更新操作时,retry_on_conflict
可用作操作本身的字段(不在额外的有效负载行中),以指定在版本冲突的情况下应重试更新的次数。
更新操作支持的参数有:doc(部分文档)
、upsert
, doc_as_upsert
,script
, params (用于脚本)
, lang (用于脚本)
和_source
。
更新操作的示例:
POST _bulk
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"} }
{ "update" : { "_id" : "0", "_type" : "_doc", "_index" : "index1", "retry_on_conflict" : 3} }
{ "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}}
{ "update" : {"_id" : "2", "_type" : "_doc", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"}, "doc_as_upsert" : true }
{ "update" : {"_id" : "3", "_type" : "_doc", "_index" : "index1", "_source" : true} }
{ "doc" : {"field" : "value"} }
{ "update" : {"_id" : "4", "_type" : "_doc", "_index" : "index1"} }
{ "doc" : {"field" : "value"}, "_source": true}