如何提升AnalyticDB实时写入性能

从AnalyticDB写入原理分析,可以从三个方面提升AnalyticDB的写入能力:降低网络传输开销、减少与硬件设备io操作和尽量少消耗cpu资源。针对这三个特性本文将介绍如何对写入sql进行改造以达到最优性能。

  • 采用批量写入(batch insert)模式,即每次在VALUES部分添加多行数据,一般建议每次批量写入数据量大约为16KB,以提高网络和磁盘吞吐。如下
INSERT INTO db_name.table_name (col1, col2, col3) VALUES ('xxx', 111, 'xxx'), ('xxx', 222, 'xxx'), ('xxx', 333, 'xxx');
  • 如果对一行的所有列都进行插入,则去除col_name并保证values顺序与表结构中的col_name顺序一致,以降低网络带宽耗用。如下
INSERT INTO db_name.table_name VALUES ('xxx', 111, 'xxx'), ('xxx', 222, 'xxx'), ('xxx', 333, 'xxx');
  • 保持主键相对有序。AnalyticDB的insert语句要求必须提供主键,且主键可以为复合主键。当确定复合主键时,根据业务含义调整复合主键中各个列的次序,从业务层面保证插入时主键是严格递增或近似递增的,也可以提升实时写入速度。
  • 增加ignore关键字。执行不带ignore关键字的insert sql,当主键冲突时,后续数据会覆盖之前插入的数据;带上ignore关键字,则主键冲突时,会保留之前插入的数据而自动忽略新数据。如果业务层没有数据覆盖的语义要求,则建议所有insert sql都加上ignore关键字,以减小覆盖数据带来的性能开销。
  • AnalyticDB需要对数据进行分区存储,当一次Batch insert中含有属于不同分区的多行数据时,将会耗费大量CPU资源进行分区号计算。因此建议在写入程序中提前计算好每行数据的分区号,并且将属于同一分区的多行数据组成一个批次,一次性插入。

实现聚合写入目前主要有两种途径:

  • 用户自行实现该聚合方法,对分区号的计算规则为:partition_num = CRC32(hash_partition_column_value) mod m,其中hash_partition_column_value是分区列的值,m是分区总数。如下代码
public class HashInsert extends AbstractJavaSamplerClient{
    private static Logger log = Logger.getLogger(HashInsert1M.class.getName());
    private static AtomicLong idGen = new AtomicLong();
    private int bufferSize =2000 
    private int batchSize = 20;
    private int partitionCnt = 100;

    public SampleResult runTest(JavaSamplerContext arg0) {
        ..........
        ..........
        String sqls[] = new String[bufferSize];
        int partNo[] = new int [bufferSize];
        int sortedSqlIndex[] = new int [bufferSize];
        int end = 100;
        for(int i = 0; i < bufferSize; i++) {
            long id = idGen.getAndIncrement();
            boolean boolean_id = DataUtil.getBoolean_test(id);
            int byte_id = DataUtil.getByte_test(id);
            int short_id = DataUtil.getShort_test(id);
            long user_id = DataUtil.getInt_test(id);
            long seller_id = id;
            float float_id = DataUtil.getFloat_test(id);
            double double_id = DataUtil.getDouble_test(id);
            String follow_id = DataUtil.getString_test(id);
            String time_id = DataUtil.getTime_test(id);
            String date_id = DataUtil.getDate_test(id);
            String timestamp_id = DataUtil.getTimestamp_test(id);
            String interest_flag = DataUtil.getMutilValue(id);
               StringBuffer sb = new StringBuffer();
            sb.append("(").append(boolean_id).append(",").append(byte_id).append(",").append(short_id).append(",").append(user_id)
            .append(",").append(seller_id).append(",").append(float_id).append(",").append(double_id).append(",'").append(follow_id)
            .append("','").append(time_id).append("','").append(date_id).append("','").append(timestamp_id).append("','").append(interest_flag)
            .append("',");
            for(int j=0;j<end-1;j++){
                sb.append("'").append(follow_id).append("',");
            }
            sb.append("'").append(follow_id).append("')");
            sqls[i]  = sb.toString();

            partNo[i] = getHashPartition("" + user_id, partitionCnt);
            sortedSqlIndex[i] = i;
        }

        for(int i = 0; i < bufferSize - 1; i++) {
            for(int j = i + 1; j < bufferSize; j++) {
                if (partNo[sortedSqlIndex[i]] > partNo[sortedSqlIndex[j]]) {
                    int tmp = sortedSqlIndex[i];
                    sortedSqlIndex[i] = sortedSqlIndex[j];
                    sortedSqlIndex[j] = tmp;
                }
            }
        }

        batchSize =  Integer.valueOf(AdsUtil.getBatchNum());
        try {
            .........
            .........
            String dbName = AdsUtil.getDBName();
            String tableName = AdsUtil.getTableName();
            String sql = "insert into " + dbName + "." + tableName  + " values ";
            for(int i = 0; i < bufferSize  - batchSize; i+= batchSize) {
                StringBuffer sb = new StringBuffer(sql);
                for(int j = 0 ; j < batchSize; j++) {
                    if (j != 0)
                        sb.append(",");
                    sb.append(sqls[sortedSqlIndex[i + j]]);                
                }
                ..............
                ..............
            }
            res = true;
        } catch (Exception e) {
            ...........
            ...........
        } finally {
            ...........
            ...........
        }

        return ...;
    }

    public static int getHashPartition(String value, int totalHashPartitionNum) {
        long crc32 = (value == null ? getCRC32("-1") : getCRC32(value));
        return (int) (crc32 % totalHashPartitionNum);
    }

    private static long getCRC32(String value) {
        Checksum checksum = new CRC32();
        byte[] bytes = value.getBytes();
        checksum.update(bytes, 0, bytes.length);
        return checksum.getValue();
    }
}
  • 采用AnalyticDB搭配的同步工具”数据集成”进行实时数据同步。一般建议采用第二种方法。
上一篇:聊聊高并发(二十九)解析java.util.concurrent各个组件(十一) 再看看ReentrantReadWriteLock可重入读-写锁


下一篇:写烦了CRUD的代码,要不要来学点AI放松一下?