Citus Distributed Real-Time Analytics Database: Insert Performance Optimization, Part 2
In the previous post on Citus insert performance optimization, we found that generating the execution plan on the master is fairly time-consuming. Below we try to bypass the master's plan generation by modifying the source code.
Environment
Hardware and software
- CentOS release 6.5 x64 physical machines (16C/128G/300GB SSD)
- CPU: 2 × 8-core (16 cores, 32 threads), Intel(R) Xeon(R) CPU E5-2630 v3 @ 2.40GHz
- PostgreSQL 9.6.2
- citus 6.1.0
- sysbench-1.0.3
Machine list
- master: 192.168.0.177
- workers (8): 192.168.0.181~192.168.0.188
Installation of the software is straightforward; refer to the official documentation, so it is omitted here.
postgresql.conf settings
listen_addresses = '*'
port = 5432
max_connections = 1000
shared_buffers = 32GB
effective_cache_size = 96GB
work_mem = 16MB
maintenance_work_mem = 2GB
min_wal_size = 4GB
max_wal_size = 32GB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 100
shared_preload_libraries = 'citus'
checkpoint_timeout = 60min
wal_level = replica
wal_compression = on
wal_log_hints = on
synchronous_commit = on
Note: unlike the previous test, synchronous_commit is set to on.
Test scenario
The oltp_insert.lua script from sysbench-1.0.3 is used as the test case. An example of the SQL it executes:
INSERT INTO sbtest1 (id, k, c, pad) VALUES (525449452, 5005, '28491622445-08162085385-16839726209-31171823540-28539137588-93842246002-13643098812-68836434394-95216556185-07917709646', '49165640733-86514010343-02300194630-37380434155-24438915047')
However, oltp_insert.lua in sysbench-1.0.3 has a bug that must be fixed first:
i = sysbench.rand.unique()
==>
i = sysbench.rand.unique() - 2147483648
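The reason for the fix, as I read it (not stated in sysbench's documentation, so treat this as an interpretation): sysbench.rand.unique() produces values across the full unsigned 32-bit range, but the id column is a signed 32-bit PostgreSQL integer, so unshifted values above 2^31 - 1 overflow. Subtracting 2147483648 (2^31) shifts the whole range into bounds:

```python
# sysbench.rand.unique() yields values in [0, 2^32), but a PostgreSQL
# "integer" column is signed 32-bit, with maximum 2^31 - 1.  Subtracting
# 2^31 maps the whole range into [-2^31, 2^31), so every value fits.

INT32_MIN, INT32_MAX = -2**31, 2**31 - 1

def fits_int32(v):
    """True if v fits in a signed 32-bit integer column."""
    return INT32_MIN <= v <= INT32_MAX

raw = 4294967295                      # largest value unique() can produce
print(fits_int32(raw))                # False: overflows PostgreSQL integer
print(fits_int32(raw - 2147483648))   # True after the fix
```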
Single-node test
Create the table
CREATE TABLE sbtest1 (
    id integer NOT NULL,
    k integer NOT NULL DEFAULT 0,
    c character(120) NOT NULL DEFAULT ''::bpchar,
    pad character(60) NOT NULL DEFAULT ''::bpchar,
    PRIMARY KEY (id)
);
CREATE INDEX k_1 ON sbtest1(k);
Insert data
src/sysbench --test=src/lua/oltp_insert.lua \
    --db-driver=pgsql \
    --pgsql-host=127.0.0.1 \
    --pgsql-port=5432 \
    --pgsql-user=postgres \
    --pgsql-db=dbone \
    --auto_inc=0 \
    --time=10 \
    --threads=128 \
    --report-interval=1 \
    run
Test results
The TPS is 122809.
-bash-4.1$ src/sysbench --test=src/lua/oltp_insert.lua --db-driver=pgsql --pgsql-host=127.0.0.1 --pgsql-port=5432 --pgsql-user=postgres --pgsql-db=dbone --auto_inc=0 --time=10 --threads=128 --report-interval=1 run
WARNING: the --test option is deprecated. You can pass a script name or path on the command line without any options.
sysbench 1.0.3 (using bundled LuaJIT 2.1.0-beta2)

Running the test with following options:
Number of threads: 128
Report intermediate results every 1 second(s)
Initializing random number generator from current time

Initializing worker threads...

Threads started!

[ 1s ] thds: 128 tps: 124474.46 qps: 124474.46 (r/w/o: 0.00/124474.46/0.00) lat (ms,95%): 1.93 err/s: 0.00 reconn/s: 0.00
[ 2s ] thds: 128 tps: 124674.70 qps: 124674.70 (r/w/o: 0.00/124674.70/0.00) lat (ms,95%): 1.93 err/s: 0.00 reconn/s: 0.00
[ 3s ] thds: 128 tps: 125700.72 qps: 125700.72 (r/w/o: 0.00/125700.72/0.00) lat (ms,95%): 1.93 err/s: 0.00 reconn/s: 0.00
[ 4s ] thds: 128 tps: 125316.67 qps: 125316.67 (r/w/o: 0.00/125316.67/0.00) lat (ms,95%): 1.93 err/s: 0.00 reconn/s: 0.00
[ 5s ] thds: 128 tps: 114303.50 qps: 114303.50 (r/w/o: 0.00/114303.50/0.00) lat (ms,95%): 2.22 err/s: 0.00 reconn/s: 0.00
[ 6s ] thds: 128 tps: 124781.26 qps: 124781.26 (r/w/o: 0.00/124781.26/0.00) lat (ms,95%): 1.93 err/s: 0.00 reconn/s: 0.00
[ 7s ] thds: 128 tps: 124819.42 qps: 124819.42 (r/w/o: 0.00/124819.42/0.00) lat (ms,95%): 1.93 err/s: 0.00 reconn/s: 0.00
[ 8s ] thds: 128 tps: 125309.88 qps: 125309.88 (r/w/o: 0.00/125309.88/0.00) lat (ms,95%): 1.93 err/s: 0.00 reconn/s: 0.00
[ 9s ] thds: 128 tps: 125674.52 qps: 125674.52 (r/w/o: 0.00/125674.52/0.00) lat (ms,95%): 1.89 err/s: 0.00 reconn/s: 0.00
[ 10s ] thds: 128 tps: 116230.44 qps: 116230.44 (r/w/o: 0.00/116230.44/0.00) lat (ms,95%): 2.07 err/s: 0.00 reconn/s: 0.00

SQL statistics:
    queries performed:
        read:                            0
        write:                           1232576
        other:                           0
        total:                           1232576
    transactions:                        1232576 (122809.76 per sec.)
    queries:                             1232576 (122809.76 per sec.)
    ignored errors:                      0       (0.00 per sec.)
    reconnects:                          0       (0.00 per sec.)

General statistics:
    total time:                          10.0345s
    total number of events:              1232576

Latency (ms):
         min:                                  0.15
         avg:                                  1.04
         max:                                 24.45
         95th percentile:                      1.96
         sum:                            1276394.81

Threads fairness:
    events (avg/stddev):           9629.5000/65.84
    execution time (avg/stddev):   9.9718/0.01
Resource usage
At this point CPU utilization is about 84%, already close to the bottleneck.
-bash-4.1$ iostat sdc -xk 5
...
avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          60.32    0.00   22.88    0.16    0.00   16.64

Device: rrqm/s   wrqm/s    r/s      w/s    rkB/s    wkB/s avgrq-sz avgqu-sz  await  svctm  %util
sdc       0.00 13649.00  18.00 11011.00    72.00 98632.00    17.90     0.45   0.04   0.03  32.10
Citus cluster test
Create the table
CREATE TABLE sbtest1 (
    id integer NOT NULL,
    k integer NOT NULL DEFAULT 0,
    c character(120) NOT NULL DEFAULT ''::bpchar,
    pad character(60) NOT NULL DEFAULT ''::bpchar,
    PRIMARY KEY (id)
);
CREATE INDEX k_1 ON sbtest1(k);

set citus.shard_count = 128;
set citus.shard_replication_factor = 1;
select create_distributed_table('sbtest1', 'id');
Insert data
/bak/soft/sysbench-1.0.3/src/sysbench --test=/bak/soft/sysbench-1.0.3/src/lua/oltp_insert.lua \
    --db-driver=pgsql \
    --pgsql-host=127.0.0.1 \
    --pgsql-port=5432 \
    --pgsql-user=postgres \
    --pgsql-db=dbcitus \
    --auto_inc=0 \
    --time=10 \
    --threads=64 \
    --report-interval=1 \
    run
Results
The TPS in the previous test was 44637, but pgbouncer was deployed on the master at the time and consumed a fair amount of CPU. After stopping pgbouncer, the re-measured TPS is 55717.
-bash-4.1$ /bak/soft/sysbench-1.0.3/src/sysbench /bak/soft/sysbench-1.0.3/src/lua/oltp_insert.lua --db-driver=pgsql --pgsql-host=127.0.0.1 --pgsql-port=5432 --pgsql-user=postgres --pgsql-db=dbone --auto_inc=0 --time=5 --threads=64 --report-interval=1 run
sysbench 1.0.3 (using bundled LuaJIT 2.1.0-beta2)

Running the test with following options:
Number of threads: 64
Report intermediate results every 1 second(s)
Initializing random number generator from current time

Initializing worker threads...

Threads started!

[ 1s ] thds: 64 tps: 52903.73 qps: 52903.73 (r/w/o: 0.00/52903.73/0.00) lat (ms,95%): 3.25 err/s: 0.00 reconn/s: 0.00
[ 2s ] thds: 64 tps: 56548.81 qps: 56548.81 (r/w/o: 0.00/56548.81/0.00) lat (ms,95%): 3.13 err/s: 0.00 reconn/s: 0.00
[ 3s ] thds: 64 tps: 56492.06 qps: 56492.06 (r/w/o: 0.00/56492.06/0.00) lat (ms,95%): 3.13 err/s: 0.00 reconn/s: 0.00
[ 4s ] thds: 64 tps: 56470.25 qps: 56470.25 (r/w/o: 0.00/56470.25/0.00) lat (ms,95%): 3.13 err/s: 0.00 reconn/s: 0.00
[ 5s ] thds: 64 tps: 56627.38 qps: 56627.38 (r/w/o: 0.00/56627.38/0.00) lat (ms,95%): 3.13 err/s: 0.00 reconn/s: 0.00

SQL statistics:
    queries performed:
        read:                            0
        write:                           279214
        other:                           0
        total:                           279214
    transactions:                        279214 (55717.02 per sec.)
    queries:                             279214 (55717.02 per sec.)
    ignored errors:                      0      (0.00 per sec.)
    reconnects:                          0      (0.00 per sec.)

General statistics:
    total time:                          5.0093s
    total number of events:              279214

Latency (ms):
         min:                                  0.45
         avg:                                  1.14
         max:                                 36.80
         95th percentile:                      3.13
         sum:                             319193.98

Threads fairness:
    events (avg/stddev):           4362.7188/79.40
    execution time (avg/stddev):   4.9874/0.00
Resource usage
The bottleneck is the master's CPU: generating execution plans on the master consumes a large amount of CPU.
The master's CPU utilization reaches 82%.
[root@node1 ~]# iostat sdc -xk 5
Linux 2.6.32-431.el6.x86_64 (node1)     2017-03-13      _x86_64_        (32 CPU)
...
avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          66.15    0.00   15.00    0.00    0.00   18.85

Device: rrqm/s wrqm/s    r/s    w/s   rkB/s   wkB/s avgrq-sz avgqu-sz  await  svctm  %util
sdc       0.00   0.00   0.00   0.00    0.00    0.00     0.00     0.00   0.00   0.00   0.00
Optimization: bypassing SQL parsing on the master
Define a SQL-dispatch function
Define the following function for dispatching SQL:
CREATE FUNCTION pg_catalog.master_distribute_dml(table_name regclass,
                                                 distribution_column_value anyelement,
                                                 dml_sql text)
    RETURNS integer
    LANGUAGE C STRICT
    AS 'citus.so', $$master_distribute_dml$$;
COMMENT ON FUNCTION master_distribute_dml(regclass, anyelement, text)
    IS 'distribute delete insert and update query to appropriate shard';
The parameters are as follows:
- table_name: name of the distributed table
- distribution_column_value: value of the distribution column
- dml_sql: the DML SQL statement, with the table name written as "%s"

The function determines the target shard from the given distribution column value, then sends the SQL directly to that shard, doing nothing more than substituting the actual shard table name for the "%s" placeholder. See the appendix for the function's implementation.
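The routing logic can be sketched as follows. This is a minimal illustration, not Citus's actual code: Citus hashes with PostgreSQL's internal hash function and reads shard hash ranges from pg_dist_shard, and the starting shard id 102008 here is purely hypothetical.

```python
# Hypothetical sketch of hash-based shard routing: each of SHARD_COUNT
# shards owns a contiguous slice of the signed 32-bit hash space; the
# distribution value's hash selects a shard, and "%s" in the DML string
# is replaced with that shard's relation name.

SHARD_COUNT = 128
HASH_RANGE = 2**32  # signed 32-bit hash values span [-2^31, 2^31)

def shard_for(hash_value):
    """Map a signed 32-bit hash value to a shard index in [0, SHARD_COUNT)."""
    return (hash_value + 2**31) * SHARD_COUNT // HASH_RANGE

def route_dml(table_name, partition_hash, dml_sql, first_shard_id=102008):
    """Substitute the owning shard's relation name into the '%s' placeholder."""
    shard_id = first_shard_id + shard_for(partition_hash)
    return dml_sql % ("%s_%d" % (table_name, shard_id))

sql = route_dml("sbtest1", 0, "INSERT INTO %s (id, k) VALUES (1, 2)")
print(sql)  # INSERT INTO sbtest1_102072 (id, k) VALUES (1, 2)
```

Because only a hash computation and a string substitution happen on the master, no SQL parsing or planning is needed there.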
Modify the test script
Create oltp_insert2.lua based on oltp_insert.lua:
cp ./src/lua/oltp_insert.lua ./src/lua/oltp_insert2.lua
vi ./src/lua/oltp_insert2.lua

The change is as follows:
con:query(string.format("INSERT INTO %s (id, k, c, pad) VALUES " ..
                        "(%d, %d, '%s', '%s')",
                        table_name, i, k_val, c_val, pad_val))
==>
con:query(string.format("select master_distribute_dml('%s', %d, $$" ..
                        "INSERT INTO %%s (id, k, c, pad) VALUES " ..
                        "(%d, %d, '%s', '%s')$$)",
                        table_name, i, i, k_val, c_val, pad_val))
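With this change, the master receives a single function call instead of a raw INSERT. Mimicking the Lua string.format in Python (the values are illustrative, shortened from the example earlier in the post):

```python
# Reproduce the statement the modified Lua script builds, to show what
# the master now receives: a master_distribute_dml() call whose third
# argument is a dollar-quoted INSERT containing a %s placeholder for
# the shard name.  Values are illustrative.

table_name, i, k_val = "sbtest1", 525449452, 5005
c_val, pad_val = "28491622445-08162085385", "49165640733-86514010343"

sql = ("select master_distribute_dml('%s', %d, $$"
       "INSERT INTO %%s (id, k, c, pad) VALUES "
       "(%d, %d, '%s', '%s')$$)") % (table_name, i, i, k_val, c_val, pad_val)

print(sql)
```

Note that the distribution column value i appears twice: once as the second argument for shard pruning, and once inside the dollar-quoted INSERT itself.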
Test
After the change, the TPS increases to 75973.
-bash-4.1$ /bak/soft/sysbench-1.0.3/src/sysbench /bak/soft/sysbench-1.0.3/src/lua/oltp_insert.lua --db-driver=pgsql --pgsql-host=127.0.0.1 --pgsql-port=5432 --pgsql-user=postgres --pgsql-db=dbcitus --auto_inc=0 --time=5 --threads=64 --report-interval=1 run
sysbench 1.0.3 (using bundled LuaJIT 2.1.0-beta2)

Running the test with following options:
Number of threads: 64
Report intermediate results every 1 second(s)
Initializing random number generator from current time

Initializing worker threads...

Threads started!

[ 1s ] thds: 64 tps: 73760.99 qps: 73761.98 (r/w/o: 73761.98/0.00/0.00) lat (ms,95%): 2.52 err/s: 0.00 reconn/s: 0.00
[ 2s ] thds: 64 tps: 76409.47 qps: 76409.47 (r/w/o: 76409.47/0.00/0.00) lat (ms,95%): 2.48 err/s: 0.00 reconn/s: 0.00
[ 3s ] thds: 64 tps: 76669.99 qps: 76668.99 (r/w/o: 76668.99/0.00/0.00) lat (ms,95%): 2.57 err/s: 0.00 reconn/s: 0.00
[ 4s ] thds: 64 tps: 76587.58 qps: 76587.58 (r/w/o: 76587.58/0.00/0.00) lat (ms,95%): 2.57 err/s: 0.00 reconn/s: 0.00
[ 5s ] thds: 64 tps: 76996.79 qps: 76996.79 (r/w/o: 76996.79/0.00/0.00) lat (ms,95%): 2.52 err/s: 0.00 reconn/s: 0.00

SQL statistics:
    queries performed:
        read:                            380635
        write:                           0
        other:                           0
        total:                           380635
    transactions:                        380635 (75973.81 per sec.)
    queries:                             380635 (75973.81 per sec.)
    ignored errors:                      0      (0.00 per sec.)
    reconnects:                          0      (0.00 per sec.)

General statistics:
    total time:                          5.0081s
    total number of events:              380635

Latency (ms):
         min:                                  0.28
         avg:                                  0.84
         max:                                 11.70
         95th percentile:                      2.52
         sum:                             318897.47

Threads fairness:
    events (avg/stddev):           5947.4219/101.63
    execution time (avg/stddev):   4.9828/0.00
The master's CPU utilization drops to 46%.
[root@node1 ~]# iostat sdc -xk 5
Linux 2.6.32-431.el6.x86_64 (node1)     2017-03-13      _x86_64_        (32 CPU)
...
avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          25.10    0.00   20.81    0.00    0.00   54.08

Device: rrqm/s wrqm/s    r/s    w/s   rkB/s   wkB/s avgrq-sz avgqu-sz  await  svctm  %util
sdc       0.00   0.00   0.00   0.00    0.00    0.00     0.00     0.00   0.00   0.00   0.00
Summary
- By modifying the source code to bypass SQL parsing on the master, a single master's SQL throughput rises from 56k/s to 76k/s while its CPU consumption actually drops from 82% to 46%. Overall, the master's processing efficiency improves to about 2.4× the original.
- Combined with the masterless optimization, a Citus cluster of 8 workers should reach roughly 400k real-time inserts per second.
- The longer and more complex the SQL, the greater the benefit of this approach. In another of our scenarios, an extremely long UPDATE statement touching 384 columns went from 715 TPS to 8173 TPS this way, while the master's CPU utilization fell from 98% to 36%: about 31× the original processing efficiency.
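The "2.4×" and "31×" figures combine the throughput gain with the CPU saving: efficiency here means TPS per unit of master CPU, so the ratio is (new TPS / old TPS) × (old CPU / new CPU). Checking both against the measurements above:

```python
# Efficiency = TPS per unit of master CPU; the improvement ratio is
# (new_tps / new_cpu) / (old_tps / old_cpu).

def efficiency_gain(old_tps, old_cpu, new_tps, new_cpu):
    """Ratio of TPS-per-CPU after vs. before the optimization."""
    return (new_tps / new_cpu) / (old_tps / old_cpu)

print(round(efficiency_gain(55717, 0.82, 75973, 0.46), 1))  # → 2.4
print(round(efficiency_gain(715, 0.98, 8173, 0.36), 1))     # → 31.1
```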
Appendix: prototype implementation of master_distribute_dml
src/backend/distributed/master/master_distribute_dml.c:
#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/event_trigger.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_protocol.h"
#include "optimizer/clauses.h"
#include "optimizer/predtest.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/var.h"
#include "nodes/makefuncs.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/typcache.h"

extern int64 ExecuteSingleModifyTask2(Task *task, bool expectResults);

static Task * ModifySingleShardTask(char *query, ShardInterval *shardInterval);
static char * generate_shard_relation_name(Oid relid, int64 shardid);
static void generate_shard_query(char *query, Oid distrelid, int64 shardid,
								 StringInfo buffer);

PG_FUNCTION_INFO_V1(master_distribute_dml);


/*
 * master_distribute_dml takes a distributed table, a distribution column
 * value and a DML (INSERT/UPDATE/DELETE) query string in which the table
 * name is written as "%s".  It prunes to the single shard owning the
 * distribution column value, substitutes the shard relation name into the
 * query string, and sends the query to that shard directly, bypassing
 * SQL parsing and planning on the master.
 */
Datum
master_distribute_dml(PG_FUNCTION_ARGS)
{
	Oid relationId = PG_GETARG_OID(0);
	Datum partitionValue = PG_GETARG_DATUM(1);
	text *queryText = PG_GETARG_TEXT_P(2);
	char *queryString = text_to_cstring(queryText);
	DistTableCacheEntry *cacheEntry = NULL;
	char partitionMethod;
	ShardInterval *shardInterval = NULL;
	Task *task = NULL;
	int32 affectedTupleCount = 0;

	/* simplified permission check that avoids parsing the SQL */
	EnsureTablePermissions(relationId, ACL_INSERT | ACL_UPDATE | ACL_DELETE);

	CheckDistributedTable(relationId);

	cacheEntry = DistributedTableCacheEntry(relationId);
	partitionMethod = cacheEntry->partitionMethod;

	/* fast shard pruning is only supported for hash and range partitioned tables */
	if (partitionMethod != DISTRIBUTE_BY_HASH &&
		partitionMethod != DISTRIBUTE_BY_RANGE)
	{
		ereport(ERROR,
				(errmsg("only hash and range distributed tables are supported")));
	}

	shardInterval = FastShardPruning(relationId, partitionValue);
	if (shardInterval == NULL)
	{
		ereport(ERROR,
				(errmsg("could not find appropriate shard of relation \"%s\" "
						"for partition value \"%s\"",
						get_rel_name(relationId),
						TextDatumGetCString(partitionValue))));
	}

	CHECK_FOR_INTERRUPTS();

	task = ModifySingleShardTask(queryString, shardInterval);
	affectedTupleCount = ExecuteSingleModifyTask2(task, false);

	PG_RETURN_INT32(affectedTupleCount);
}


/*
 * ModifySingleShardTask builds a single task that executes the given query
 * on the given shard, with the shard relation name substituted in.
 */
static Task *
ModifySingleShardTask(char *query, ShardInterval *shardInterval)
{
	uint64 jobId = INVALID_JOB_ID;
	int taskId = 1;
	Oid relationId = shardInterval->relationId;
	uint64 shardId = shardInterval->shardId;
	StringInfo shardQueryString = makeStringInfo();
	Task *task = NULL;

	/* lock metadata before getting placement lists */
	LockShardDistributionMetadata(shardId, ShareLock);

	generate_shard_query(query, relationId, shardId, shardQueryString);

	task = CitusMakeNode(Task);
	task->jobId = jobId;
	task->taskId = taskId;
	task->taskType = SQL_TASK;
	task->queryString = shardQueryString->data;
	task->dependedTaskList = NULL;
	task->anchorShardId = shardId;
	task->taskPlacementList = FinalizedShardPlacementList(shardId);

	return task;
}


/*
 * generate_shard_relation_name returns the schema-qualified, shard-extended
 * relation name for the given relation and shard id.
 */
static char *
generate_shard_relation_name(Oid relid, int64 shardid)
{
	char *relname = get_rel_name(relid);

	if (!relname)
		elog(ERROR, "cache lookup failed for relation %u", relid);

	if (shardid > 0)
	{
		Oid schemaOid = get_rel_namespace(relid);
		char *schemaName = get_namespace_name(schemaOid);

		AppendShardIdToName(&relname, shardid);
		relname = quote_qualified_identifier(schemaName, relname);
	}

	return relname;
}


/*
 * generate_shard_query substitutes the shard relation name for the "%s"
 * placeholder in the query template.
 */
static void
generate_shard_query(char *query, Oid distrelid, int64 shardid,
					 StringInfo buffer)
{
	appendStringInfo(buffer, query,
					 generate_shard_relation_name(distrelid, shardid));
}
src/backend/distributed/executor/multi_router_executor.c:
Add the following function:
/*
 * ExecuteSingleModifyTask2 wraps ExecuteSingleModifyTask with a minimal
 * QueryDesc/EState, so a modify task can be executed without a planned query.
 */
int64
ExecuteSingleModifyTask2(Task *task, bool expectResults)
{
	QueryDesc qdesc;
	EState executorState;

	qdesc.estate = &executorState;
	qdesc.operation = CMD_UPDATE;
	qdesc.tupDesc = NULL;
	qdesc.planstate = NULL;
	qdesc.params = NULL;

	ExecuteSingleModifyTask(&qdesc, task, expectResults);

	return executorState.es_processed;
}
References
- Real-time Inserts: 0-50k/s
- Real-time Updates: 0-50k/s
- Bulk Copy: 100-200k/s
- Masterless Citus: 50k/s-500k/s