Citus Distributed Real-Time Analytics Database: Data Insertion Performance Optimization, Part 2

The previous post, "Citus Distributed Real-Time Analytics Database: Data Insertion Performance Optimization," noted that generating execution plans on the Citus master is fairly time-consuming. Below, we try to bypass plan generation on the master by modifying the source code.

Environment

Hardware and Software Configuration

  • CentOS release 6.5 x64 physical machines (16C/128G/300GB SSD)
    • CPU: 2 × 8-core (16 cores / 32 threads), Intel(R) Xeon(R) CPU E5-2630 v3 @ 2.40GHz
  • PostgreSQL 9.6.2
  • citus 6.1.0
  • sysbench-1.0.3

Machine List

  • master

    • 192.168.0.177
  • worker (8 nodes)

    • 192.168.0.181~192.168.0.188

Software installation is straightforward; refer to the official documentation. Details are omitted here.

postgresql.conf Configuration

listen_addresses = '*'
port = 5432
max_connections = 1000
shared_buffers = 32GB
effective_cache_size = 96GB
work_mem = 16MB
maintenance_work_mem = 2GB
min_wal_size = 4GB
max_wal_size = 32GB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 100
shared_preload_libraries = 'citus'
checkpoint_timeout = 60min
wal_level = replica
wal_compression = on
wal_log_hints = on
synchronous_commit = on 

Note: unlike the previous test, synchronous_commit is set to on.

Test Scenario

We use oltp_insert.lua from sysbench-1.0.3 as the test case. An example of the SQL it executes:

INSERT INTO sbtest1 (id, k, c, pad) VALUES (525449452, 5005, '28491622445-08162085385-16839726209-31171823540-28539137588-93842246002-13643098812-68836434394-95216556185-07917709646', '49165640733-86514010343-02300194630-37380434155-24438915047') 

However, oltp_insert.lua in sysbench-1.0.3 has a bug that must be fixed first: sysbench.rand.unique() returns values in the unsigned 32-bit range, which can overflow the table's signed 32-bit integer id column, so the value is shifted down into the signed range:

i = sysbench.rand.unique() 

==>

i = sysbench.rand.unique() - 2147483648 

Single-Node Test

Create the Table

CREATE TABLE sbtest1
(
  id integer NOT NULL,
  k integer NOT NULL DEFAULT 0,
  c character(120) NOT NULL DEFAULT ''::bpchar,
  pad character(60) NOT NULL DEFAULT ''::bpchar,
  PRIMARY KEY (id)
);

CREATE INDEX k_1 ON sbtest1(k); 

Insert Data

src/sysbench --test=src/lua/oltp_insert.lua \
--db-driver=pgsql \
--pgsql-host=127.0.0.1 \
--pgsql-port=5432 \
--pgsql-user=postgres  \
--pgsql-db=dbone  \
--auto_inc=0  \
--time=10 \
--threads=128  \
--report-interval=1 \
run 

Test Results

The TPS is 122809.

-bash-4.1$ src/sysbench --test=src/lua/oltp_insert.lua --db-driver=pgsql --pgsql-host=127.0.0.1 --pgsql-port=5432 --pgsql-user=postgres  --pgsql-db=dbone  --auto_inc=0  --time=10 --threads=128  --report-interval=1 run
WARNING: the --test option is deprecated. You can pass a script name or path on the command line without any options.
sysbench 1.0.3 (using bundled LuaJIT 2.1.0-beta2)

Running the test with following options:
Number of threads: 128
Report intermediate results every 1 second(s)
Initializing random number generator from current time


Initializing worker threads...

Threads started!

[ 1s ] thds: 128 tps: 124474.46 qps: 124474.46 (r/w/o: 0.00/124474.46/0.00) lat (ms,95%): 1.93 err/s: 0.00 reconn/s: 0.00
[ 2s ] thds: 128 tps: 124674.70 qps: 124674.70 (r/w/o: 0.00/124674.70/0.00) lat (ms,95%): 1.93 err/s: 0.00 reconn/s: 0.00
[ 3s ] thds: 128 tps: 125700.72 qps: 125700.72 (r/w/o: 0.00/125700.72/0.00) lat (ms,95%): 1.93 err/s: 0.00 reconn/s: 0.00
[ 4s ] thds: 128 tps: 125316.67 qps: 125316.67 (r/w/o: 0.00/125316.67/0.00) lat (ms,95%): 1.93 err/s: 0.00 reconn/s: 0.00
[ 5s ] thds: 128 tps: 114303.50 qps: 114303.50 (r/w/o: 0.00/114303.50/0.00) lat (ms,95%): 2.22 err/s: 0.00 reconn/s: 0.00
[ 6s ] thds: 128 tps: 124781.26 qps: 124781.26 (r/w/o: 0.00/124781.26/0.00) lat (ms,95%): 1.93 err/s: 0.00 reconn/s: 0.00
[ 7s ] thds: 128 tps: 124819.42 qps: 124819.42 (r/w/o: 0.00/124819.42/0.00) lat (ms,95%): 1.93 err/s: 0.00 reconn/s: 0.00
[ 8s ] thds: 128 tps: 125309.88 qps: 125309.88 (r/w/o: 0.00/125309.88/0.00) lat (ms,95%): 1.93 err/s: 0.00 reconn/s: 0.00
[ 9s ] thds: 128 tps: 125674.52 qps: 125674.52 (r/w/o: 0.00/125674.52/0.00) lat (ms,95%): 1.89 err/s: 0.00 reconn/s: 0.00
[ 10s ] thds: 128 tps: 116230.44 qps: 116230.44 (r/w/o: 0.00/116230.44/0.00) lat (ms,95%): 2.07 err/s: 0.00 reconn/s: 0.00
SQL statistics:
    queries performed:
        read:                            0
        write:                           1232576
        other:                           0
        total:                           1232576
    transactions:                        1232576 (122809.76 per sec.)
    queries:                             1232576 (122809.76 per sec.)
    ignored errors:                      0      (0.00 per sec.)
    reconnects:                          0      (0.00 per sec.)

General statistics:
    total time:                          10.0345s
    total number of events:              1232576

Latency (ms):
         min:                                  0.15
         avg:                                  1.04
         max:                                 24.45
         95th percentile:                      1.96
         sum:                            1276394.81

Threads fairness:
    events (avg/stddev):           9629.5000/65.84
    execution time (avg/stddev):   9.9718/0.01 

Resource Consumption

At this point CPU utilization is 84%, already close to the bottleneck.

-bash-4.1$ iostat sdc -xk 5
...
avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          60.32    0.00   22.88    0.16    0.00   16.64

Device:         rrqm/s   wrqm/s     r/s     w/s    rkB/s    wkB/s avgrq-sz avgqu-sz   await  svctm  %util
sdc               0.00 13649.00   18.00 11011.00    72.00 98632.00    17.90     0.45    0.04   0.03  32.10 

Citus Cluster Test

Create the Table

CREATE TABLE sbtest1
(
  id integer NOT NULL,
  k integer NOT NULL DEFAULT 0,
  c character(120) NOT NULL DEFAULT ''::bpchar,
  pad character(60) NOT NULL DEFAULT ''::bpchar,
  PRIMARY KEY (id)
);

CREATE INDEX k_1 ON sbtest1(k);

set citus.shard_count = 128;
set citus.shard_replication_factor = 1;
select create_distributed_table('sbtest1','id'); 
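
With citus.shard_count = 128 spread over 8 workers, each worker should end up holding 16 shard placements. As a quick sanity check (a sketch that relies on the standard Citus metadata tables pg_dist_shard and pg_dist_shard_placement), the distribution can be inspected on the master:

SELECT nodename, count(*) AS shard_count
FROM pg_dist_shard_placement
JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'sbtest1'::regclass
GROUP BY nodename
ORDER BY nodename; 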

Insert Data

/bak/soft/sysbench-1.0.3/src/sysbench --test=/bak/soft/sysbench-1.0.3/src/lua/oltp_insert.lua \
--db-driver=pgsql \
--pgsql-host=127.0.0.1 \
--pgsql-port=5432 \
--pgsql-user=postgres  \
--pgsql-db=dbcitus  \
--auto_inc=0  \
--time=10 \
--threads=64  \
--report-interval=1 \
run 

Execution Results

The previous test measured 44637 TPS, but pgbouncer was deployed on the master at the time and consumed a fair amount of CPU. After stopping pgbouncer, the retest came in at 55717.

-bash-4.1$ /bak/soft/sysbench-1.0.3/src/sysbench /bak/soft/sysbench-1.0.3/src/lua/oltp_insert.lua --db-driver=pgsql --pgsql-host=127.0.0.1 --pgsql-port=5432 --pgsql-user=postgres  --pgsql-db=dbone  --auto_inc=0  --time=5 --threads=64  --report-interval=1 run
sysbench 1.0.3 (using bundled LuaJIT 2.1.0-beta2)

Running the test with following options:
Number of threads: 64
Report intermediate results every 1 second(s)
Initializing random number generator from current time


Initializing worker threads...

Threads started!

[ 1s ] thds: 64 tps: 52903.73 qps: 52903.73 (r/w/o: 0.00/52903.73/0.00) lat (ms,95%): 3.25 err/s: 0.00 reconn/s: 0.00
[ 2s ] thds: 64 tps: 56548.81 qps: 56548.81 (r/w/o: 0.00/56548.81/0.00) lat (ms,95%): 3.13 err/s: 0.00 reconn/s: 0.00
[ 3s ] thds: 64 tps: 56492.06 qps: 56492.06 (r/w/o: 0.00/56492.06/0.00) lat (ms,95%): 3.13 err/s: 0.00 reconn/s: 0.00
[ 4s ] thds: 64 tps: 56470.25 qps: 56470.25 (r/w/o: 0.00/56470.25/0.00) lat (ms,95%): 3.13 err/s: 0.00 reconn/s: 0.00
[ 5s ] thds: 64 tps: 56627.38 qps: 56627.38 (r/w/o: 0.00/56627.38/0.00) lat (ms,95%): 3.13 err/s: 0.00 reconn/s: 0.00
SQL statistics:
    queries performed:
        read:                            0
        write:                           279214
        other:                           0
        total:                           279214
    transactions:                        279214 (55717.02 per sec.)
    queries:                             279214 (55717.02 per sec.)
    ignored errors:                      0      (0.00 per sec.)
    reconnects:                          0      (0.00 per sec.)

General statistics:
    total time:                          5.0093s
    total number of events:              279214

Latency (ms):
         min:                                  0.45
         avg:                                  1.14
         max:                                 36.80
         95th percentile:                      3.13
         sum:                             319193.98

Threads fairness:
    events (avg/stddev):           4362.7188/79.40
    execution time (avg/stddev):   4.9874/0.00 

Resource Consumption

The performance bottleneck is the master's CPU: generating execution plans on the master consumes a large amount of CPU.

The master's CPU utilization reaches 82%.

[root@node1 ~]# iostat sdc -xk 5
Linux 2.6.32-431.el6.x86_64 (node1)     2017年03月13日     _x86_64_    (32 CPU)

...
avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          66.15    0.00   15.00    0.00    0.00   18.85

Device:         rrqm/s   wrqm/s     r/s     w/s    rkB/s    wkB/s avgrq-sz avgqu-sz   await  svctm  %util
sdc               0.00     0.00    0.00    0.00     0.00     0.00     0.00     0.00    0.00   0.00   0.00 

Optimization: Bypassing SQL Parsing on the Master

Defining a SQL-Dispatching Function

Define the following function, which dispatches SQL to shards.

CREATE FUNCTION pg_catalog.master_distribute_dml(table_name regclass,distribution_column_value anyelement,dml_sql text)
    RETURNS integer
    LANGUAGE C STRICT
    AS 'citus.so', $$master_distribute_dml$$;
COMMENT ON FUNCTION master_distribute_dml(regclass,anyelement,text)
    IS 'distribute delete insert and update query to appropriate shard'; 

The parameters are as follows:

  • table_name: the target table
  • distribution_column_value: the value of the distribution column
  • dml_sql: the DML SQL statement, with the table name written as "%s"

The function uses the given distribution column value to determine the owning shard, then dispatches the SQL directly to that shard; the only rewriting it does is substituting the actual shard table name for the "%s" in the SQL. See the appendix for the function's implementation.
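
For illustration, the test INSERT from the scenario above can be issued through the function as follows (a minimal sketch with the long c and pad strings abbreviated; the shard name that replaces %s, for example a hypothetical sbtest1_102008, is resolved by the function itself):

SELECT master_distribute_dml('sbtest1', 525449452,
    $$INSERT INTO %s (id, k, c, pad) VALUES (525449452, 5005, 'c-value', 'pad-value')$$); 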

Modifying the Test Script

Create oltp_insert2.lua based on oltp_insert.lua:

cp ./src/lua/oltp_insert.lua ./src/lua/oltp_insert2.lua
vi ./src/lua/oltp_insert2.lua 

The change is as follows:

con:query(string.format("INSERT INTO %s (id, k, c, pad) VALUES " ..
                         "(%d, %d, '%s', '%s')",
                      table_name, i, k_val, c_val, pad_val)) 

==>

con:query(string.format("select master_distribute_dml('%s', %d, $$" ..
                        "INSERT INTO %%s (id, k, c, pad) VALUES " ..
                         "(%d, %d, '%s', '%s')$$)",
                      table_name, i, i, k_val, c_val, pad_val)) 
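
Two details of this rewrite are worth noting: the inner INSERT is wrapped in dollar quoting ($$...$$) so that the single quotes around c_val and pad_val need no extra escaping, and the placeholder is written %%s because string.format collapses %% into a literal %, leaving %s in the SQL for master_distribute_dml to fill in. The value i is passed twice: once as the distribution_column_value argument used for shard pruning, and once as the id inside the INSERT itself.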

Test

After the modification, TPS rises to 75973. (sysbench now classifies the operations as reads rather than writes, because each insert is issued as a SELECT of master_distribute_dml.)

-bash-4.1$ /bak/soft/sysbench-1.0.3/src/sysbench /bak/soft/sysbench-1.0.3/src/lua/oltp_insert.lua --db-driver=pgsql --pgsql-host=127.0.0.1 --pgsql-port=5432 --pgsql-user=postgres  --pgsql-db=dbcitus  --auto_inc=0  --time=5 --threads=64  --report-interval=1 run
sysbench 1.0.3 (using bundled LuaJIT 2.1.0-beta2)

Running the test with following options:
Number of threads: 64
Report intermediate results every 1 second(s)
Initializing random number generator from current time


Initializing worker threads...

Threads started!

[ 1s ] thds: 64 tps: 73760.99 qps: 73761.98 (r/w/o: 73761.98/0.00/0.00) lat (ms,95%): 2.52 err/s: 0.00 reconn/s: 0.00
[ 2s ] thds: 64 tps: 76409.47 qps: 76409.47 (r/w/o: 76409.47/0.00/0.00) lat (ms,95%): 2.48 err/s: 0.00 reconn/s: 0.00
[ 3s ] thds: 64 tps: 76669.99 qps: 76668.99 (r/w/o: 76668.99/0.00/0.00) lat (ms,95%): 2.57 err/s: 0.00 reconn/s: 0.00
[ 4s ] thds: 64 tps: 76587.58 qps: 76587.58 (r/w/o: 76587.58/0.00/0.00) lat (ms,95%): 2.57 err/s: 0.00 reconn/s: 0.00
[ 5s ] thds: 64 tps: 76996.79 qps: 76996.79 (r/w/o: 76996.79/0.00/0.00) lat (ms,95%): 2.52 err/s: 0.00 reconn/s: 0.00
SQL statistics:
    queries performed:
        read:                            380635
        write:                           0
        other:                           0
        total:                           380635
    transactions:                        380635 (75973.81 per sec.)
    queries:                             380635 (75973.81 per sec.)
    ignored errors:                      0      (0.00 per sec.)
    reconnects:                          0      (0.00 per sec.)

General statistics:
    total time:                          5.0081s
    total number of events:              380635

Latency (ms):
         min:                                  0.28
         avg:                                  0.84
         max:                                 11.70
         95th percentile:                      2.52
         sum:                             318897.47

Threads fairness:
    events (avg/stddev):           5947.4219/101.63
    execution time (avg/stddev):   4.9828/0.00 

The master's CPU utilization drops to 46%.

[root@node1 ~]# iostat sdc -xk 5
Linux 2.6.32-431.el6.x86_64 (node1)     2017年03月13日     _x86_64_    (32 CPU)

...

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          25.10    0.00   20.81    0.00    0.00   54.08

Device:         rrqm/s   wrqm/s     r/s     w/s    rkB/s    wkB/s avgrq-sz avgqu-sz   await  svctm  %util
sdc               0.00     0.00    0.00    0.00     0.00     0.00     0.00     0.00    0.00   0.00   0.00 

Summary

  1. By modifying the source code to bypass SQL parsing on the master, a single master's SQL throughput rises from 56k/s to 76k/s while its CPU consumption drops from 82% to 46%; overall, the master's processing efficiency improves to about 2.4 times the original (see the arithmetic below this list).
  2. Combined with the masterless optimization, a Citus cluster of 8 workers should be able to sustain roughly 400k real-time inserts per second.
  3. The longer and more complex the SQL, the larger the gain from this method. In another scenario of ours, an extremely long UPDATE statement touching 384 columns went from 715 TPS to 8173 TPS with this approach, while the master's CPU utilization dropped from 98% to 36%, improving the master's processing efficiency to about 31 times the original.
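
Both efficiency figures follow from throughput per unit of CPU: (75973 / 0.46) ÷ (55717 / 0.82) ≈ 2.4 for the INSERT test, and (8173 / 0.36) ÷ (715 / 0.98) ≈ 31 for the long UPDATE.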

Appendix: Prototype Implementation of master_distribute_dml

src/backend/distributed/master/master_distribute_dml.c:

#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"

#include "access/htup_details.h"
#include "catalog/pg_type.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "commands/dbcommands.h"
#include "commands/event_trigger.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_protocol.h"
#include "optimizer/clauses.h"
#include "optimizer/predtest.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/var.h"
#include "nodes/makefuncs.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/catcache.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/rel.h"
#include "utils/typcache.h"


extern int64
ExecuteSingleModifyTask2(Task *task, bool expectResults);

static Task *
ModifySingleShardTask(char *query, ShardInterval *shardInterval);

static char *
generate_shard_relation_name(Oid relid, int64 shardid);

static void
generate_shard_query(char *query, Oid distrelid, int64 shardid, StringInfo buffer);

PG_FUNCTION_INFO_V1(master_distribute_dml);



/*
 * master_distribute_dml takes a DML query string containing a "%s"
 * placeholder for the table name, determines the shard that the given
 * distribution column value belongs to, replaces the placeholder with the
 * shard relation name and sends the resulting query to that shard. This
 * bypasses SQL parsing and planning on the master.
 */
Datum
master_distribute_dml(PG_FUNCTION_ARGS)
{
    Oid relationId = PG_GETARG_OID(0);
    Datum partitionValue = PG_GETARG_DATUM(1);
    text *queryText = PG_GETARG_TEXT_P(2);
    char *queryString = text_to_cstring(queryText);

    DistTableCacheEntry *cacheEntry = NULL;
    char partitionMethod;
    ShardInterval *shardInterval = NULL;
    Task *task = NULL;
    int32 affectedTupleCount = 0;

    /* simplified permission check to avoid SQL parsing */
    EnsureTablePermissions(relationId, ACL_INSERT | ACL_UPDATE | ACL_DELETE);

    CheckDistributedTable(relationId);

    cacheEntry = DistributedTableCacheEntry(relationId);
    partitionMethod = cacheEntry->partitionMethod;

    /* fast shard pruning is only supported for hash and range partitioned tables */
    if (partitionMethod != DISTRIBUTE_BY_HASH && partitionMethod != DISTRIBUTE_BY_RANGE)
    {
        ereport(ERROR, (errmsg("only hash and range distributed table are supported")));
    }

    shardInterval = FastShardPruning(relationId, partitionValue);
    if (shardInterval == NULL)
    {
        ereport(ERROR, (errmsg("could not find appropriate shard of relation \"%s\" for partition value \"%s\" ",
                        get_rel_name(relationId), TextDatumGetCString(partitionValue))));
    }

    CHECK_FOR_INTERRUPTS();

    task = ModifySingleShardTask(queryString, shardInterval);

    affectedTupleCount = ExecuteSingleModifyTask2(task, false);

    PG_RETURN_INT32(affectedTupleCount);
}


/*
 * ModifySingleShardTask builds a Task that executes the given query on the
 * single shard described by shardInterval, with the distributed table name
 * replaced by the shard relation name.
 */
static Task *
ModifySingleShardTask(char *query, ShardInterval *shardInterval)
{
    uint64 jobId = INVALID_JOB_ID;
    int taskId = 1;
    Oid relationId = shardInterval->relationId;
    uint64 shardId = shardInterval->shardId;
    StringInfo shardQueryString = makeStringInfo();
    Task *task = NULL;

    /* lock metadata before getting placement lists */
    LockShardDistributionMetadata(shardId, ShareLock);

    generate_shard_query(query, relationId, shardId, shardQueryString);

    task = CitusMakeNode(Task);
    task->jobId = jobId;
    task->taskId = taskId;
    task->taskType = SQL_TASK;
    task->queryString = shardQueryString->data;
    task->dependedTaskList = NULL;
    task->anchorShardId = shardId;
    task->taskPlacementList = FinalizedShardPlacementList(shardId);

    return task;
}

/*
 * generate_shard_relation_name
 *      Compute the name to use for a shard of the given relation.
 *
 * If shardid is positive, the shard ID is appended to the relation name and
 * the result is schema-qualified; otherwise the bare relation name is
 * returned.
 */
static char *
generate_shard_relation_name(Oid relid, int64 shardid)
{
    char *relname = NULL;

    relname = get_rel_name(relid);
    if (!relname)
        elog(ERROR, "cache lookup failed for relation %u", relid);

    if (shardid > 0)
    {
        Oid schemaOid = get_rel_namespace(relid);
        char *schemaName = get_namespace_name(schemaOid);

        AppendShardIdToName(&relname, shardid);

        relname = quote_qualified_identifier(schemaName, relname);
    }

    return relname;
}


/*
 * generate_shard_query
 *      Substitute the shard relation name for the "%s" placeholder in query.
 *
 * Note that appendStringInfo() treats the query as a printf-style format
 * string, so the query must not contain '%' characters other than the
 * single "%s" placeholder.
 */
static void
generate_shard_query(char *query, Oid distrelid, int64 shardid, StringInfo buffer)
{
    appendStringInfo(buffer, query,
                     generate_shard_relation_name(distrelid, shardid));
} 

src/backend/distributed/executor/multi_router_executor.c:

Add the following function:

int64
ExecuteSingleModifyTask2(Task *task, bool expectResults)
{
    /*
     * Build a minimal fake QueryDesc/EState on the stack so that the existing
     * router executor routine ExecuteSingleModifyTask can be reused without a
     * planned statement.
     */
    QueryDesc qdesc;
    EState executorState;

    executorState.es_processed = 0;
    qdesc.estate = &executorState;
    qdesc.operation = CMD_UPDATE;   /* generic DML; the actual SQL is carried by the task */
    qdesc.tupDesc = NULL;
    qdesc.planstate = NULL;
    qdesc.params = NULL;

    ExecuteSingleModifyTask(&qdesc, task, expectResults);

    /* the router executor records the number of affected rows in es_processed */
    return executorState.es_processed;
} 
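
Once both changes are compiled into the Citus source tree, the CREATE FUNCTION statement shown earlier binds master_distribute_dml in citus.so to the C implementation above.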

References

  • Scaling Out Data Ingestion

    • Real-time Inserts: 0-50k/s
    • Real-time Updates: 0-50k/s
    • Bulk Copy: 100-200k/s
    • Masterless Citus: 50k/s-500k/s