之前写过一篇文章,是为VNPY1.9.2 版本加入聚宽的数据源,这个算是后续版本,为VNPY2版本加入聚宽数据源。
VNPY论坛里面有个篇文章也是讲加入聚宽数据源的,不过他是相当于集成到VNPY库代码中去,和VNPY自带的米筐数据源一样。我这篇其实也是借鉴他的,当然他说他借鉴之前我的1.92那篇。反正互相借鉴。
之前VNPY 1版本中,我的个人代码很多是直接在VNPY库代码直接修改或者增加的。每次VNPY升级就是非常头疼,要做代码对比,在一些可能被更新覆盖的地方再次维护测试。而且因为更新的地方很乱,造成后面生产版本一致停留在VNPY1.92。
还是要慢慢迁移到VNPY2版本,毕竟很多吸引的特性。这次准备不在VNPY的库文件代码上修改,而是像引用NUMPY或者Pandas这样,采用引用继承的方式,把自己的代码和VNPY的库代码隔离;这样即使VNPY升级,个人代码不用太担心,只要简单测试,保证继承引用VNPY的类或方法正常工作就可以了。
这里只要安装好了VNPY2.1以后版本,只要在任意地方拷贝下面代码,维护聚宽登录信息运行就可以下载聚宽数据。
其中config.json是保存聚宽登录账户,和需要下载的品种。品种信息是按照VNPY的品种和交易所信息组合字符串,如Symbol.Exchang。
而JQDataload.py是运行下载的,类JQDataService初始化是连接聚宽信息源,方法to_jq_symbol是把VNPY品种名称转换为聚宽名称,方法query_history是按照时间段下载分钟线要指定凭证,方法downloadAllMinuteBar是按照指定最近天数,下载config.json所以的品种数据。
最好去我的Github下载,比较方便
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# encoding: UTF-8
import json
import time
from datetime import datetime, timedelta
from typing import List
import jqdatasdk as jq
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import database_manager
from vnpy.trader. object import (
BarData
)
class JQDataService:
"""
Service for download market data from Joinquant
"""
def __init__( self ):
# 加载配置
config = open ( 'config.json' )
self .setting = json.load(config)
USERNAME = self .setting[ 'jqdata.Username' ]
PASSWORD = self .setting[ 'jqdata.Password' ]
try :
jq.auth(USERNAME, PASSWORD)
except Exception as ex:
print ( "jq auth fail:" + repr (ex))
def to_jq_symbol( self , symbol: str , exchange: Exchange):
"""
CZCE product of RQData has symbol like "TA1905" while
vt symbol is "TA905.CZCE" so need to add "1" in symbol.
"""
if exchange in [Exchange.SSE, Exchange.SZSE]:
if exchange = = Exchange.SSE:
jq_symbol = f "{symbol}.XSHG" # 上海证券交易所
else :
jq_symbol = f "{symbol}.XSHE" # 深圳证券交易所
elif exchange = = Exchange.SHFE:
jq_symbol = f "{symbol}.XSGE" # 上期所
elif exchange = = Exchange.CFFEX:
jq_symbol = f "{symbol}.CCFX" # 中金所
elif exchange = = Exchange.DCE:
jq_symbol = f "{symbol}.XDCE" # 大商所
elif exchange = = Exchange.INE:
jq_symbol = f "{symbol}.XINE" # 上海国际能源期货交易所
elif exchange = = Exchange.CZCE:
# 郑商所 的合约代码年份只有三位 需要特殊处理
for count, word in enumerate (symbol):
if word.isdigit():
break
# Check for index symbol
time_str = symbol[count:]
if time_str in [ "88" , "888" , "99" , "8888" ]:
return f "{symbol}.XZCE"
# noinspection PyUnboundLocalVariable
product = symbol[:count]
year = symbol[count]
month = symbol[count + 1 :]
if year = = "9" :
year = "1" + year
else :
year = "2" + year
jq_symbol = f "{product}{year}{month}.XZCE"
return jq_symbol.upper()
def query_history( self , symbol, exchange, start, end, interval = '1m' ):
"""
Query history bar data from JQData and update Database.
"""
jq_symbol = self .to_jq_symbol(symbol, exchange)
# if jq_symbol not in self.symbols:
# return None
# For querying night trading period data
# end += timedelta(1)
now = datetime.now()
if end > = now:
end = now
elif end.year = = now.year and end.month = = now.month and end.day = = now.day:
end = now
df = jq.get_price(
jq_symbol,
frequency = interval,
fields = [ "open" , "high" , "low" , "close" , "volume" ],
start_date = start,
end_date = end,
skip_paused = True
)
data: List [BarData] = []
if df is not None :
for ix, row in df.iterrows():
bar = BarData(
symbol = symbol,
exchange = exchange,
interval = Interval.MINUTE,
datetime = row.name.to_pydatetime() - timedelta(minutes = 1 ),
open_price = row[ "open" ],
high_price = row[ "high" ],
low_price = row[ "low" ],
close_price = row[ "close" ],
volume = row[ "volume" ],
gateway_name = "JQ"
)
data.append(bar)
database_manager.save_bar_data(data)
return data
def downloadAllMinuteBar( self , days = 1 ):
"""下载所有配置中的合约的分钟线数据"""
if days ! = 0 :
startDt = datetime.today() - days * timedelta( 1 )
enddt = datetime.today()
else :
startDt = datetime.today() - 10 * timedelta( 1 )
enddt = datetime.today()
print ( '-' * 50 )
print (u '开始下载合约分钟线数据' )
print ( '-' * 50 )
if 'Bar.Min' in self .setting:
l = self .setting[ "Bar.Min" ]
for VNSymbol in l:
dt0 = time.process_time()
symbol = VNSymbol.split( "." )[ 0 ]
exchange = Exchange(VNSymbol.split( "." )[ 1 ])
self .query_history(symbol, exchange, startDt, enddt, interval = '1m' )
cost = (time.process_time() - dt0)
print (u '合约%s的分钟K线数据下载完成%s - %s,耗时%s秒' % (symbol, startDt, enddt, cost))
print (jq.get_query_count())
print ( '-' * 50 )
print
u '合约分钟线数据下载完成'
print ( '-' * 50 )
return None
if __name__ = = '__main__' :
JQdata = JQDataService()
JQdata.downloadAllMinuteBar(days = 30 )
|
config.json
1 2 3 4 5 6 7 8 9 10 11 |
{
"jqdata.Username" : "",
"jqdata.Password" : "",
"Bar.Min" :
[
"MA8888.CZCE" ,
"MA009.CZCE" ,
"SR8888.CZCE" ,
"RM8888.CZCE" ,
]
}
|