vnpy源码没有提供指数合约相关的功能,需要自行开发,于是参考了vnpy论坛上的方案(https://www.vnpy.com/forum/topic/5242-gua-he-suo-you-gatewaylei-xing-de-zhi-shu-he-cheng-fang-an)在此基础上进行完善。
思路:在tick的event处理事件中,注册合成指数的handle事件即可。
流程:创建IndexGenerator类,负责实现指数合约的订阅以及合成,将指数合成的处理方法注册到事件引擎中。vnpy执行行情订阅,触发tick的event事件,经过IndexGenerator产生主力合约tick,推送tick。
class IndexGenerator(ABC):
def __init__(self, main_engine, event_engine: EventEngine):
self.main_engine = main_engine
self.event_engine = event_engine
self.subscribe_index_symbol: Set[str] = set() # 保存已订阅的指数编号
self.subscribe_index_contract: Dict[str, ContractData] = {} # 指数合约
self.subscribe_sec_id: Set[str] = set() # 保存已经订阅的sec编号
self.symbol_tick_dict: Dict[str, dict] = {} # 保存每个指数的每个合约的最新tick
self.symbol_last_tick: Dict[str, TickData] = {} # 保存每个指数的下的最后一个tick
self.old_symbol_tick_dict: Dict[str, dict] = {} # 推送上一个指数tick时每个指数的每个合约的当时tick记录
self.register_event()
def register_event(self):
self.event_engine.register(EVENT_TICK, self.process_tick_event)
def subscribe(self, req: SubscribeRequest):
index_symbol_id = vt_symbol_to_index_symbol(req.vt_symbol)
self.subscribe_index_symbol.add(index_symbol_id)
sec_id = extract_sec_id(req.vt_symbol)
self.subscribe_sec_id.add(sec_id)
self.subscribe_index_contract[sec_id] = self.main_engine.get_index_contract(req.vt_symbol)
def process_tick_event(self, event: Event):
tick_data = event.data
# print(tick_data.vt_symbol,tick_data.datetime,tick_data.volume)
vt_symbol = tick_data.vt_symbol
# 过滤掉指数数据
if vt_symbol == vt_symbol_to_index_symbol(vt_symbol):
return
sec_id = extract_sec_id(vt_symbol)
if sec_id not in self.subscribe_sec_id:
return
if tick_data.bid_price_1 > 9999999 or tick_data.ask_price_1 > 9999999:
return
# 下面合成最新的指数tick:每秒合成1个
symbol_tick_dict = self.symbol_tick_dict.setdefault(sec_id, {})
old_symbol_tick_dict = self.old_symbol_tick_dict.setdefault(sec_id, {})
symbol_last_tick = self.symbol_last_tick.get(sec_id)
# 非主力合约有时候会延迟推送 比如15秒的时候推送一个14秒的tick 导致多生成1个指数tick,在合成bar的时候会过滤延迟的tick 造成volume异常
if symbol_last_tick and tick_data.datetime > symbol_last_tick.datetime and symbol_tick_dict:
index_tick = TickData(
symbol=f"{sec_id}99",
exchange=tick_data.exchange,
datetime=tick_data.datetime,
gateway_name=tick_data.gateway_name,
name=self.subscribe_index_contract[sec_id].name
)
for tick in symbol_tick_dict.values():
index_tick.open_interest += tick.open_interest
index_tick.volume += tick.volume
if index_tick.open_interest:
for tick in symbol_tick_dict.values():
#tick_weight = float(tick.open_interest) / index_tick.open_interest
#参考TB的合成指标 持仓量约占90%的权重,成交量约10%
open_interest_weight = float(tick.open_interest) / index_tick.open_interest * 0.9
volume_weight = float(tick.volume) / index_tick.volume * 0.1
tick_weight = open_interest_weight + volume_weight
index_tick.last_price += tick.last_price * tick_weight
#index_tick.volume += tick.volume
#index_tick.last_volume += tick.last_volume
# 若上一个index_tick中不包含这个合约tick,则将该tick的volume增加入last_volume,用于index_bar正确计算volume
if tick.vt_symbol not in old_symbol_tick_dict:
index_tick.last_volume += tick.volume
index_tick.limit_up += tick.limit_up * tick_weight
index_tick.limit_down += tick.limit_down * tick_weight
index_tick.open_price += tick.open_price * tick_weight
index_tick.high_price += tick.high_price * tick_weight
index_tick.low_price += tick.low_price * tick_weight
index_tick.pre_close += tick.pre_close * tick_weight
index_tick.bid_price_1 += tick.bid_price_1 * tick_weight
index_tick.ask_price_1 += tick.ask_price_1 * tick_weight
index_tick.bid_volume_1 += tick.bid_volume_1
index_tick.ask_volume_1 += tick.ask_volume_1
# 5档有需要再加进来吧,省点计算资源
# tick_data.ask_price_2 += tick.ask_price_2 * tick_weight
# tick_data.ask_price_3 += tick.ask_price_3 * tick_weight
# tick_data.ask_price_4 += tick.ask_price_4 * tick_weight
# tick_data.ask_price_5 += tick.ask_price_5 * tick_weight
# tick_data.bid_price_2 += tick.bid_price_2 * tick_weight
# tick_data.bid_price_3 += tick.bid_price_3 * tick_weight
# tick_data.bid_price_4 += tick.bid_price_4 * tick_weight
# tick_data.bid_price_5 += tick.bid_price_5 * tick_weight
# tick_data.bid_volume_2 += tick.bid_volume_2 * tick_weight
# tick_data.bid_volume_3 += tick.bid_volume_3 * tick_weight
# tick_data.bid_volume_4 += tick.bid_volume_4 * tick_weight
# tick_data.bid_volume_5 += tick.bid_volume_5 * tick_weight
# tick_data.ask_volume_2 += tick.ask_volume_2 * tick_weight
# tick_data.ask_volume_3 += tick.ask_volume_3 * tick_weight
# tick_data.ask_volume_4 += tick.ask_volume_4 * tick_weight
# tick_data.ask_volume_5 += tick.ask_volume_5 * tick_weight
# 价格取整到最小价位变动
price_tick = self.subscribe_index_contract[sec_id].pricetick
index_tick.last_price = round_to(index_tick.last_price, price_tick)
index_tick.bid_price_1 = round_to(index_tick.bid_price_1, price_tick)
index_tick.ask_price_1 = round_to(index_tick.ask_price_1, price_tick)
index_tick.limit_up = round_to(index_tick.limit_up, price_tick)
index_tick.limit_down = round_to(index_tick.limit_down, price_tick)
index_tick.open_price = round_to(index_tick.open_price, price_tick)
index_tick.high_price = round_to(index_tick.high_price, price_tick)
index_tick.low_price = round_to(index_tick.low_price, price_tick)
index_tick.pre_close = round_to(index_tick.pre_close, price_tick)
self.old_symbol_tick_dict[sec_id] = deepcopy(symbol_tick_dict)
event = Event(EVENT_TICK, index_tick)
self.event_engine.put(event)
# 当前tick虽然合成完毕 但是清空的话会影响持仓量计算 需要确定没成交量的时候 tick是否推送
# symbol_tick_dict.clear()
symbol_tick_dict[vt_symbol] = tick_data
if not symbol_last_tick or tick_data.datetime >= symbol_last_tick.datetime:
self.symbol_last_tick[sec_id] = tick_data
完成了指数合成逻辑后,现在考虑将配置到整个框架中
1.MainEngine挂载IndexGenerator
def __init__(self, event_engine: EventEngine = None):
""""""
if event_engine:
self.event_engine: EventEngine = event_engine
else:
self.event_engine = EventEngine()
self.event_engine.start()
self.gateways: Dict[str, BaseGateway] = {}
self.engines: Dict[str, BaseEngine] = {}
self.apps: Dict[str, BaseApp] = {}
self.exchanges: List[Exchange] = []
# 挂载指数数据生成器
self.index_generator: IndexGenerator = IndexGenerator(self, self.event_engine)
os.chdir(TRADER_DIR) # Change working directory
self.init_engines() # Initialize function engines
2.MainEngine实现指数订阅
def subscribe(self, req: SubscribeRequest, gateway_name: str) -> None:
"""
Subscribe tick data update of a specific gateway.
"""
gateway = self.get_gateway(gateway_name)
# 同类账户全部订阅
if req.vt_symbol == vt_symbol_to_index_symbol(req.vt_symbol):
# 指数订阅
contract_list = self.get_all_index_trade_contract(req.vt_symbol)
for contract in contract_list:
symbol, exchange = extract_vt_symbol(contract.vt_symbol)
contract_req = SubscribeRequest(symbol, exchange)
gateway.subscribe(contract_req)
self.index_generator.subscribe(req)
else:
gateway.subscribe(req)
3.MainEngine关于合约的部分功能由OmsEngine实现
def add_function(self) -> None:
"""Add query function to main engine."""
self.main_engine.get_tick = self.get_tick
self.main_engine.get_order = self.get_order
self.main_engine.get_trade = self.get_trade
self.main_engine.get_position = self.get_position
self.main_engine.get_account = self.get_account
self.main_engine.get_contract = self.get_contract
self.main_engine.get_all_ticks = self.get_all_ticks
self.main_engine.get_all_orders = self.get_all_orders
self.main_engine.get_all_trades = self.get_all_trades
self.main_engine.get_all_positions = self.get_all_positions
self.main_engine.get_all_accounts = self.get_all_accounts
self.main_engine.get_all_contracts = self.get_all_contracts
self.main_engine.get_all_active_orders = self.get_all_active_orders
#增加的新方法
self.main_engine.get_all_index_trade_contract = self.get_all_index_trade_contract
self.main_engine.get_index_contract = self.get_index_contract
def get_all_index_trade_contract(self, vt_symbol):
# 查询该合约对应品种的所有在市合约
contract_list = []
target_sec_id = extract_sec_id(vt_symbol)
contracts = self.contracts
for vt_symbol, contract_data in contracts.items():
sec_id = extract_sec_id(vt_symbol)
if target_sec_id == sec_id and not contract_data.is_index_contract and contract_data.product == Product.FUTURES:
contract_list.append(contract_data)
return contract_list
def get_index_contract(self, vt_symbol):
contract_id = vt_symbol_to_index_symbol(vt_symbol)
return self.get_contract(contract_id)
4.由于增加了指数合约,需要OmsEngine处理合约事件时,自动生成指数合约到合约字典中。
def process_contract_event(self, event: Event) -> None:
""""""
contract = event.data
if contract.vt_symbol not in self.contracts.keys():
self.contracts[contract.vt_symbol] = contract
# 插入指数合约contract
sec_id = extract_sec_id(contract.vt_symbol)
index_id = f"{sec_id}99"
index_symbol_id = f"{index_id}.{contract.exchange.value}"
if index_symbol_id not in self.contracts.keys():
index_contract = ContractData(
symbol=index_id,
exchange=contract.exchange,
name=f"{index_id}指数合约",
product=contract.product,
size=contract.size,
pricetick=contract.pricetick,
# margin_ratio=contract.margin_ratio,
# open_date="19990101",
# expire_date="20990101",
gateway_name=contract.gateway_name
)
index_contract.is_index_contract = True
self.contracts[index_symbol_id] = index_contract
5.utility.py增加新方法
def extract_sec_id(vt_symbol: str) -> str:
"""
return sec_id
"""
return vt_symbol[:2] if vt_symbol[1].isalpha() else vt_symbol[0]
def vt_symbol_to_index_symbol(vt_symbol):
symbol_id, exchange_value = vt_symbol.split(".")
sec_id = extract_sec_id(vt_symbol)
index_id = f"{sec_id}99"
return f"{index_id}.{exchange_value}"
6.Object的ContractData增加一个字段表示该合约是否有指数合约
is_index_contract: bool = False # 是否是指数合约,默认为否
到这一步,基本实现指数合同需要的功能,但是使用起来有部分不影响使用的小bug,在这里对涉及的模块进行修改。
- 若加载了app下data_recorder的RecorderEngine,会发现要是启动后 先点击了data_recorder,再点击ctp链接,会发现指数合约不在搜索框中,并不能订阅指数合约。
原因是CTP链接前初始化data_recorder,他会通过OmsEngine去查询当前获取的合约,但是由于CTP未链接,所以合约字典为空。随后CTP链接之后,触发data_recorder的process_contract_event方法,该方法没有增加相应的指数合约生成代码,所以无法订阅。而链接CTP之后再初始化data_recorder,OmsEngine的合约字典中直接包含了指数合约,所以能订阅成功。
修改RecorderManager代码 使搜索框中出现指数合约
def process_contract_event(self, event: Event):
""""""
contract = event.data
self.vt_symbols.append(contract.vt_symbol)
vt_symbol = contract.vt_symbol
sec_id = vt_symbol[:2] if vt_symbol[1].isalpha() else vt_symbol[0]
index_id = f"{sec_id}99"
index_symbol_id = f"{index_id}.{contract.exchange.value}"
if index_symbol_id not in self.vt_symbols:
self.vt_symbols.append(index_symbol_id)
model = self.symbol_completer.model()
model.setStringList(self.vt_symbols)
修改RecorderEngine代码 实现指数合约订阅
def process_contract_event(self, event: Event):
""""""
contract = event.data
vt_symbol = contract.vt_symbol
# 订阅具体合约
if (vt_symbol in self.tick_recordings or vt_symbol in self.bar_recordings):
self.subscribe(contract)
# 订阅指数合约
sec_id = vt_symbol[:2] if vt_symbol[1].isalpha() else vt_symbol[0]
index_id = f"{sec_id}99"
vt_index_symbol = f"{index_id}.{contract.exchange.value}"
if (vt_index_symbol in self.tick_recordings or vt_index_symbol in self.bar_recordings):
index_contract = copy(contract)
index_contract.symbol = index_id
self.subscribe(index_contract)
- tick合成逻辑修改
按照论坛的方法生成的tick,还是有些问题。
行情推送时
A.一些不活跃合约的tick会有延迟,比如15秒的时候推送一个14秒的tick。
B.合成上一个tick的时候,只有5个合约参与,合成当前tick的时候,有6个tick参与(来了个不活跃合约tick)
以上情况合成的指数tick会出现成交量异常(把某个合约的当日交易量总和单做这个tick的成交量)
部分代码在IndexGenerator中已经修改,继续修改utility.py的BarGenerator
def update_tick(self, tick: TickData) -> None:
"""
Update new tick data into generator.
"""
new_minute = False
# Filter tick data with 0 last price
if not tick.last_price:
return
# Filter tick data with older timestamp
if self.last_tick and tick.datetime < self.last_tick.datetime:
return
if not self.bar:
new_minute = True
elif (
(self.bar.datetime.minute != tick.datetime.minute)
or (self.bar.datetime.hour != tick.datetime.hour)
):
self.bar.datetime = self.bar.datetime.replace(
second=0, microsecond=0
)
self.on_bar(self.bar)
new_minute = True
if new_minute:
self.bar = BarData(
symbol=tick.symbol,
exchange=tick.exchange,
interval=Interval.MINUTE,
datetime=tick.datetime,
gateway_name=tick.gateway_name,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price,
open_interest=tick.open_interest
)
else:
self.bar.high_price = max(self.bar.high_price, tick.last_price)
if tick.high_price > self.last_tick.high_price:
self.bar.high_price = max(self.bar.high_price, tick.high_price)
self.bar.low_price = min(self.bar.low_price, tick.last_price)
if tick.low_price < self.last_tick.low_price:
self.bar.low_price = min(self.bar.low_price, tick.low_price)
self.bar.close_price = tick.last_price
self.bar.open_interest = tick.open_interest
self.bar.datetime = tick.datetime
if self.last_tick:
# 这样计算丢失 行情连接时第一个tick的volume
# 当tick为主力合约时,第一个tick可能缺失非主力合约数据 若下一个tick包含上次缺失的非主力tick 会把非主力tick的当日volume增加到指数tick中
# volume_change = tick.volume - self.last_tick.volume
volume_change = tick.volume - self.last_tick.volume - tick.last_volume
# print("-------------------------------------")
# print("now_tick-->", tick.datetime,"------>",tick.volume,"------>",tick.last_volume)
# print("last_tick-->", self.last_tick.datetime,"------>",self.last_tick.volume)
# print("index_volume-->",max(volume_change, 0))
# print("-------------------------------------")
self.bar.volume += max(volume_change, 0)
# volume_change = tick.volume - tick.last_volume #last_volume原始数据这个字段没数据
# self.bar.volume += max(volume_change, 0)
self.last_tick = tick
上诉方法还是有不足之处,后续找到更加合理的方案后再做更新。