量化交易中,数据系统主要需要支撑两个场景。第一个是策略回测,面对的是过去10年的历史数据,数据量在10TB级别。目标是当我们脑袋里面有一个自认为绝好的策略思路时,能够快速的进行验证其是不是有效。从技术实现的角度来说,回测就是一遍一遍的轮询大量的历史数据进行计算。这里面的历史数据是不会更改的,要求读取的速度特别快。
另一个是实时交易,面对的是每秒10k+的交易数据流入,能够即时通过原始数据计算出量化因子,做出交易决策。还有风险控制,在出现事先没有预期到的风险时,要能够迅速把持仓退出来。这都要求延迟尽可能地小,控制滑点成本。
金融时间序列数据的特点
- 数据量比较大。以目前A股的level1 tick数据为例,每支股票每3s就会生成一条数据,3k+支股票每天交易4个小时,总计生成接近1500万条原始记录,加入基于原始记录生成的各类因子,数据量要翻N倍。使用level2逐笔成交数据的话,数据量要更大。
- 数据是分块的。依赖于交易所的交易日界定,每个交易日都是独立的,所以可以将每支股票每天的数据作为一块互不相关的数据块。每个数据块大约1M大小。
- 全部是数值型,没有文本。对数据的压缩很有效。
- 数据稳定增长,不会出现访问峰值。这对于系统的承压能力要求相对较低。
- 一次写入多次读取,不会修改已经写入的数据,数据写入压力小。
- 不需要支持事务。
- 对时效性和准确性要求很高。如果出现比较大的延迟或者数据错误,那策略的表现变得不可控,无法执行。
数据库的选择
- MySQL:以上文所提到的A股level1 tick数据量,MySQL是无法支撑的。对于历史数据来说数据量太大了,MySQL的数据压缩效率不高,存储和效率都无法满足需求;对于实时交易来说延迟会比较大。
如果数据频率时分钟K线,那用MySQL是可以解决的。使用MyISAM存储引擎,因为MyISAM可以对数据压缩,节约存储空间,读性能也要比InnoDB要好。
-
MongoDB:一个不错的选择,目前有很多量化团队在使用MongoDB作数据存储。对于中低频策略应该完全没问题。Mongoing中文社区 也有一系列相关的文章:
- InfluxDB:无论是面对历史回测或者实时交易的场景,InfluxDB都是很好的选择。具体在下文讨论。
- HDF5:非常高效的二进制文件,用来存储静态数据,特别是面对科学计算问题。具体在下文讨论。
- Kdb+:商用软件,性能很强大,但是q查询语言学习曲线很陡峭,而且license很贵。
- DolphinDB:比较新的时序数据库,也是商用软件, 官方宣称其性能可以替代kdb+。
InfluxDB
为什么选择InfluxDB
InfluxDB是目前最受欢迎的时序数据库,而且社区活跃度增长非常快。一图胜千言,我们看下面两个图就可以了解时序数据库的现状。
Ranking of Time Series DBMS (from DB-Engines)
Trend of InfluxDB Popularity (from DB-Engines)
与其它数据库对比
MongoDB vs InfluxDB | InfluxData Time Series Workloads
- InfluxDB outperformed MongoDB by 2.4x when it came to data ingestion
- InfluxDB outperformed MongoDB by delivering 20x better compression
- InfluxDB outperformed MongoDB by delivering 5.7x better query performance
InfluxDB vs OpenTSDB | Time Series Database Comparison
InfluxDB和OpenTSDB是目前最受欢迎两个时序数据库。
易用性:
- 在单机上,InfluxDB就是一个独立安装包,安装配置都很简单。
- 在集群系统中,OpenTSDB使用HBase存储数据,比较成熟。InfluxDB的集群解决方案是商业化的。
性能: - InfluxDB outperformed OpenTSDB by 9x when it came to data ingestion
- InfluxDB outperformed OpenTSDB by delivering 8x better compression
- InfluxDB outperformed OpenTSDB by delivering a minimum of 7x better query throughput
InfluxDB硬件配置建议
Load | Field writes per second | Moderate queries per second | Unique series |
---|---|---|---|
Low | < 5 thousand | < 5 | < 100 thousand |
Moderate | < 250 thousand | < 25 | < 1 million |
High | > 250 thousand | > 25 | > 1 million |
Probably infeasible | > 750 thousand | > 100 | > 10 million |
- Low - CPU: 2-4 cores, RAM: 2-4 GB, IOPS: 500
- Moderate - CPU: 4-6 cores, RAM: 8-32 GB, IOPS: 500-1000
- High - CPU: 8+ cores, RAM: 32+ GB, IOPS: 1000+
- Probably infeasible load - cluster solution
根据上文的推算结果,这里的load介于Moderate与High之间,使用单机InfluxDB就够了。
目前很多量化团队用的都是单机架构,主要在提高单机性能。那为什么不用分布式系统,比如Hive/HBase?因为学习和维护成本高,对于中小团队不现实。另一个原因就是这里数据并不是高并发的场景,性能较好的单机就可以解决。
InfluxDB存储交易数据
InfluxDB使用细节不在这里展开。学习资料:
在我们的系统中,每支股票用一个独立的 measurement 存储,类似于MySQL里面的table。如上文所说,每支股票每天的交易tick被当作一个独立数据块,在InfluxDB里面存储为一个series,通过添加tag记录交易日来区分。还加入另一个tag来记录数据源,因为我们可能会有多个数据源,这个tag可用来做数据源可靠性分析。
数据(line protocol)示例,其中date
和source
就是数据的tag集:
000001,source=XYZ,date=20190103 Price=123.45,Volume=6789,Amount=10111213 1546480800000
检索示例,查询出某支股票一整天的交易数据,InfluxQL跟SQL使用基本一样:
SELECT * FROM "000001" WHERE date='20190103'
使用技巧
- InfluxDB是不支持事务的,所以在读/写操作同时进行的场景中,有可能一条记录只有一半被写入,就被读出来了,这就是脏数据。为了判断读出来的是不是脏数据,需要对取出来的数据进行检查,如果某个不可能为空的字段是空值,那么求需要重新取一次。
-
复制measurement:
SELECT * INTO measurement_new FROM measurement_old GROUP BY *
HDF5
对于实时交易的场景,用InfluxDB提供数据管理系统,使用方便,也可以解耦合数据模块、计算模块和交易模块。
但是在面对历史数据回测的场景中,我们会预先通过原始数据计算出因子数据,在整个回测过程中只会对数据进行读取,不会做任何更新。如果这里依旧使用InfluxDB,就会在数据库连接和网络传输上产生额外的时间开销,这是没有必要的。这种情况下,本地文件存储就是一个很好的选择。高效而且简单易用的HDF5就是首选,可参考Python和HDF5大数据应用。
HDF5中有一个dataset的概念,就是一个相关数据组成的一个数据集,在我们的问题里面,前文所说的数据块就很好的符合这个概念,每个股票每天的数据作为一个数据集存储。
API接口
使用技巧
- 不建议用pandas中的
Dataframe.to_hdf5
直接存储,而是使用h5py存储Dataframe
内部的numpy.ndarray
,读取时再将其组装为Dataframe
。因为pandas会存入很多冗余信息,存储大小是后者的5倍以上。 -
使用压缩功能对数据进行压缩,节约存储空间。
# save: ticks is an instance of pandas.Dataframe with h5py.File('data.h5', mode='w') as f: f.create_dataset('/20190101/000001', data=ticks, compression='gzip', compression_opts=6, chunks=ticks.shape) # load: read dataset and pack it to a pandas.Dataframe columns = ['Price', 'Volume', 'Amount'] with h5py.File('data.h5') as f: dset = f.get('/20190101/000001') values = dset.value ticks = pandas.Dataframe(data=numpy.array(values), columns=columns)