vnpy指数合成

vnpy源码没有提供指数合约相关的功能,需要自行开发,于是参考了vnpy论坛上的方案(https://www.vnpy.com/forum/topic/5242-gua-he-suo-you-gatewaylei-xing-de-zhi-shu-he-cheng-fang-an)在此基础上进行完善。
思路:在tick的event处理事件中,注册合成指数的handle事件即可。
流程:创建IndexGenerator类,负责实现指数合约的订阅以及合成,将指数合成的处理方法注册到事件引擎中。vnpy执行行情订阅,触发tick的event事件,经过IndexGenerator产生主力合约tick,推送tick。

class IndexGenerator(ABC):

    def __init__(self, main_engine, event_engine: EventEngine):
        self.main_engine = main_engine
        self.event_engine = event_engine

        self.subscribe_index_symbol: Set[str] = set()  # 保存已订阅的指数编号
        self.subscribe_index_contract: Dict[str, ContractData] = {}  # 指数合约
        self.subscribe_sec_id: Set[str] = set()  # 保存已经订阅的sec编号
        self.symbol_tick_dict: Dict[str, dict] = {}  # 保存每个指数的每个合约的最新tick
        self.symbol_last_tick: Dict[str, TickData] = {}  # 保存每个指数的下的最后一个tick
        self.old_symbol_tick_dict: Dict[str, dict] = {}  # 推送上一个指数tick时每个指数的每个合约的当时tick记录


        self.register_event()

    def register_event(self):
        self.event_engine.register(EVENT_TICK, self.process_tick_event)

    def subscribe(self, req: SubscribeRequest):
        index_symbol_id = vt_symbol_to_index_symbol(req.vt_symbol)
        self.subscribe_index_symbol.add(index_symbol_id)
        sec_id = extract_sec_id(req.vt_symbol)
        self.subscribe_sec_id.add(sec_id)
        self.subscribe_index_contract[sec_id] = self.main_engine.get_index_contract(req.vt_symbol)

    def process_tick_event(self, event: Event):
        tick_data = event.data
        # print(tick_data.vt_symbol,tick_data.datetime,tick_data.volume)
        vt_symbol = tick_data.vt_symbol
        # 过滤掉指数数据
        if vt_symbol == vt_symbol_to_index_symbol(vt_symbol):
            return
        sec_id = extract_sec_id(vt_symbol)
        if sec_id not in self.subscribe_sec_id:
            return
        if tick_data.bid_price_1 > 9999999 or tick_data.ask_price_1 > 9999999:
            return

        # 下面合成最新的指数tick:每秒合成1个
        symbol_tick_dict = self.symbol_tick_dict.setdefault(sec_id, {})
        old_symbol_tick_dict = self.old_symbol_tick_dict.setdefault(sec_id, {})
        symbol_last_tick = self.symbol_last_tick.get(sec_id)


        # 非主力合约有时候会延迟推送 比如15秒的时候推送一个14秒的tick 导致多生成1个指数tick,在合成bar的时候会过滤延迟的tick 造成volume异常
        if symbol_last_tick and tick_data.datetime > symbol_last_tick.datetime and symbol_tick_dict:
            index_tick = TickData(
                symbol=f"{sec_id}99",
                exchange=tick_data.exchange,
                datetime=tick_data.datetime,
                gateway_name=tick_data.gateway_name,
                name=self.subscribe_index_contract[sec_id].name
            )
            for tick in symbol_tick_dict.values():
                index_tick.open_interest += tick.open_interest
                index_tick.volume += tick.volume

            if index_tick.open_interest:
                for tick in symbol_tick_dict.values():
                    #tick_weight = float(tick.open_interest) / index_tick.open_interest

                    #参考TB的合成指标  持仓量约占90%的权重,成交量约10%
                    open_interest_weight = float(tick.open_interest) / index_tick.open_interest * 0.9
                    volume_weight = float(tick.volume) / index_tick.volume * 0.1
                    tick_weight = open_interest_weight + volume_weight


                    index_tick.last_price += tick.last_price * tick_weight
                    #index_tick.volume += tick.volume


                    #index_tick.last_volume += tick.last_volume
                    # 若上一个index_tick中不包含这个合约tick,则将该tick的volume增加入last_volume,用于index_bar正确计算volume
                    if tick.vt_symbol not in old_symbol_tick_dict:
                        index_tick.last_volume += tick.volume

                    index_tick.limit_up += tick.limit_up * tick_weight
                    index_tick.limit_down += tick.limit_down * tick_weight

                    index_tick.open_price += tick.open_price * tick_weight
                    index_tick.high_price += tick.high_price * tick_weight
                    index_tick.low_price += tick.low_price * tick_weight
                    index_tick.pre_close += tick.pre_close * tick_weight

                    index_tick.bid_price_1 += tick.bid_price_1 * tick_weight
                    index_tick.ask_price_1 += tick.ask_price_1 * tick_weight
                    index_tick.bid_volume_1 += tick.bid_volume_1
                    index_tick.ask_volume_1 += tick.ask_volume_1

                    # 5档有需要再加进来吧,省点计算资源
                    # tick_data.ask_price_2 += tick.ask_price_2 * tick_weight
                    # tick_data.ask_price_3 += tick.ask_price_3 * tick_weight
                    # tick_data.ask_price_4 += tick.ask_price_4 * tick_weight
                    # tick_data.ask_price_5 += tick.ask_price_5 * tick_weight
                    # tick_data.bid_price_2 += tick.bid_price_2 * tick_weight
                    # tick_data.bid_price_3 += tick.bid_price_3 * tick_weight
                    # tick_data.bid_price_4 += tick.bid_price_4 * tick_weight
                    # tick_data.bid_price_5 += tick.bid_price_5 * tick_weight
                    # tick_data.bid_volume_2 += tick.bid_volume_2 * tick_weight
                    # tick_data.bid_volume_3 += tick.bid_volume_3 * tick_weight
                    # tick_data.bid_volume_4 += tick.bid_volume_4 * tick_weight
                    # tick_data.bid_volume_5 += tick.bid_volume_5 * tick_weight
                    # tick_data.ask_volume_2 += tick.ask_volume_2 * tick_weight
                    # tick_data.ask_volume_3 += tick.ask_volume_3 * tick_weight
                    # tick_data.ask_volume_4 += tick.ask_volume_4 * tick_weight
                    # tick_data.ask_volume_5 += tick.ask_volume_5 * tick_weight
                # 价格取整到最小价位变动
                price_tick = self.subscribe_index_contract[sec_id].pricetick
                index_tick.last_price = round_to(index_tick.last_price, price_tick)

                index_tick.bid_price_1 = round_to(index_tick.bid_price_1, price_tick)
                index_tick.ask_price_1 = round_to(index_tick.ask_price_1, price_tick)
                index_tick.limit_up = round_to(index_tick.limit_up, price_tick)
                index_tick.limit_down = round_to(index_tick.limit_down, price_tick)
                index_tick.open_price = round_to(index_tick.open_price, price_tick)
                index_tick.high_price = round_to(index_tick.high_price, price_tick)
                index_tick.low_price = round_to(index_tick.low_price, price_tick)
                index_tick.pre_close = round_to(index_tick.pre_close, price_tick)


                self.old_symbol_tick_dict[sec_id] = deepcopy(symbol_tick_dict)
                event = Event(EVENT_TICK, index_tick)
                self.event_engine.put(event)

                # 当前tick虽然合成完毕 但是清空的话会影响持仓量计算 需要确定没成交量的时候 tick是否推送
                # symbol_tick_dict.clear()

        symbol_tick_dict[vt_symbol] = tick_data
        if not symbol_last_tick or tick_data.datetime >= symbol_last_tick.datetime:
            self.symbol_last_tick[sec_id] = tick_data

完成了指数合成逻辑后,现在考虑将配置到整个框架中

1.MainEngine挂载IndexGenerator

   def __init__(self, event_engine: EventEngine = None):
        """"""
        if event_engine:
            self.event_engine: EventEngine = event_engine
        else:
            self.event_engine = EventEngine()
        self.event_engine.start()

        self.gateways: Dict[str, BaseGateway] = {}
        self.engines: Dict[str, BaseEngine] = {}
        self.apps: Dict[str, BaseApp] = {}
        self.exchanges: List[Exchange] = []

        # 挂载指数数据生成器
        self.index_generator: IndexGenerator = IndexGenerator(self, self.event_engine)

        os.chdir(TRADER_DIR)    # Change working directory
        self.init_engines()     # Initialize function engines

2.MainEngine实现指数订阅

    def subscribe(self, req: SubscribeRequest, gateway_name: str) -> None:
        """
        Subscribe tick data update of a specific gateway.
        """
        gateway = self.get_gateway(gateway_name)
        # 同类账户全部订阅
        if req.vt_symbol == vt_symbol_to_index_symbol(req.vt_symbol):
            # 指数订阅
            contract_list = self.get_all_index_trade_contract(req.vt_symbol)
            for contract in contract_list:
                symbol, exchange = extract_vt_symbol(contract.vt_symbol)
                contract_req = SubscribeRequest(symbol, exchange)
                gateway.subscribe(contract_req)
            self.index_generator.subscribe(req)
        else:
            gateway.subscribe(req)

3.MainEngine关于合约的部分功能由OmsEngine实现

 def add_function(self) -> None:
        """Add query function to main engine."""
        self.main_engine.get_tick = self.get_tick
        self.main_engine.get_order = self.get_order
        self.main_engine.get_trade = self.get_trade
        self.main_engine.get_position = self.get_position
        self.main_engine.get_account = self.get_account
        self.main_engine.get_contract = self.get_contract
        self.main_engine.get_all_ticks = self.get_all_ticks
        self.main_engine.get_all_orders = self.get_all_orders
        self.main_engine.get_all_trades = self.get_all_trades
        self.main_engine.get_all_positions = self.get_all_positions
        self.main_engine.get_all_accounts = self.get_all_accounts
        self.main_engine.get_all_contracts = self.get_all_contracts
        self.main_engine.get_all_active_orders = self.get_all_active_orders
        #增加的新方法
        self.main_engine.get_all_index_trade_contract = self.get_all_index_trade_contract
        self.main_engine.get_index_contract = self.get_index_contract


    def get_all_index_trade_contract(self, vt_symbol):
        # 查询该合约对应品种的所有在市合约
        contract_list = []
        target_sec_id = extract_sec_id(vt_symbol)
        contracts = self.contracts
        for vt_symbol, contract_data in contracts.items():
            sec_id = extract_sec_id(vt_symbol)
            if target_sec_id == sec_id and not contract_data.is_index_contract and contract_data.product == Product.FUTURES:
                contract_list.append(contract_data)
        return contract_list

    def get_index_contract(self, vt_symbol):
        contract_id = vt_symbol_to_index_symbol(vt_symbol)
        return self.get_contract(contract_id)

4.由于增加了指数合约,需要OmsEngine处理合约事件时,自动生成指数合约到合约字典中。

    def process_contract_event(self, event: Event) -> None:
        """"""
        contract = event.data
        if contract.vt_symbol not in self.contracts.keys():
            self.contracts[contract.vt_symbol] = contract
            # 插入指数合约contract
            sec_id = extract_sec_id(contract.vt_symbol)
            index_id = f"{sec_id}99"
            index_symbol_id = f"{index_id}.{contract.exchange.value}"
            if index_symbol_id not in self.contracts.keys():
                index_contract = ContractData(
                    symbol=index_id,
                    exchange=contract.exchange,
                    name=f"{index_id}指数合约",
                    product=contract.product,
                    size=contract.size,
                    pricetick=contract.pricetick,
                    # margin_ratio=contract.margin_ratio,
                    # open_date="19990101",
                    # expire_date="20990101",
                    gateway_name=contract.gateway_name
                )
                index_contract.is_index_contract = True
                self.contracts[index_symbol_id] = index_contract

5.utility.py增加新方法

def extract_sec_id(vt_symbol: str) -> str:
    """
    return sec_id
    """
    return vt_symbol[:2] if vt_symbol[1].isalpha() else vt_symbol[0]


def vt_symbol_to_index_symbol(vt_symbol):
    symbol_id, exchange_value = vt_symbol.split(".")
    sec_id = extract_sec_id(vt_symbol)
    index_id = f"{sec_id}99"
    return f"{index_id}.{exchange_value}"

6.Object的ContractData增加一个字段表示该合约是否有指数合约

is_index_contract: bool = False     # 是否是指数合约,默认为否

到这一步,基本实现指数合同需要的功能,但是使用起来有部分不影响使用的小bug,在这里对涉及的模块进行修改。

  • 若加载了app下data_recorder的RecorderEngine,会发现要是启动后 先点击了data_recorder,再点击ctp链接,会发现指数合约不在搜索框中,并不能订阅指数合约。
    原因是CTP链接前初始化data_recorder,他会通过OmsEngine去查询当前获取的合约,但是由于CTP未链接,所以合约字典为空。随后CTP链接之后,触发data_recorder的process_contract_event方法,该方法没有增加相应的指数合约生成代码,所以无法订阅。而链接CTP之后再初始化data_recorder,OmsEngine的合约字典中直接包含了指数合约,所以能订阅成功。

修改RecorderManager代码 使搜索框中出现指数合约

    def process_contract_event(self, event: Event):
        """"""
        contract = event.data
        self.vt_symbols.append(contract.vt_symbol)
        vt_symbol = contract.vt_symbol

        sec_id = vt_symbol[:2] if vt_symbol[1].isalpha() else vt_symbol[0]
        index_id = f"{sec_id}99"
        index_symbol_id = f"{index_id}.{contract.exchange.value}"

        if index_symbol_id not in self.vt_symbols:
            self.vt_symbols.append(index_symbol_id)
        model = self.symbol_completer.model()
        model.setStringList(self.vt_symbols)

修改RecorderEngine代码 实现指数合约订阅

    def process_contract_event(self, event: Event):
        """"""
        contract = event.data
        vt_symbol = contract.vt_symbol

        # 订阅具体合约
        if (vt_symbol in self.tick_recordings or vt_symbol in self.bar_recordings):
            self.subscribe(contract)

        # 订阅指数合约
        sec_id = vt_symbol[:2] if vt_symbol[1].isalpha() else vt_symbol[0]
        index_id = f"{sec_id}99"
        vt_index_symbol = f"{index_id}.{contract.exchange.value}"
        if (vt_index_symbol in self.tick_recordings or vt_index_symbol in self.bar_recordings):
            index_contract = copy(contract)
            index_contract.symbol = index_id

            self.subscribe(index_contract)
  • tick合成逻辑修改
    按照论坛的方法生成的tick,还是有些问题。
    行情推送时
    A.一些不活跃合约的tick会有延迟,比如15秒的时候推送一个14秒的tick。
    B.合成上一个tick的时候,只有5个合约参与,合成当前tick的时候,有6个tick参与(来了个不活跃合约tick)
    以上情况合成的指数tick会出现成交量异常(把某个合约的当日交易量总和单做这个tick的成交量)

部分代码在IndexGenerator中已经修改,继续修改utility.py的BarGenerator

    def update_tick(self, tick: TickData) -> None:
        """
        Update new tick data into generator.
        """
        new_minute = False

        # Filter tick data with 0 last price
        if not tick.last_price:
            return

        # Filter tick data with older timestamp
        if self.last_tick and tick.datetime < self.last_tick.datetime:
            return

        if not self.bar:
            new_minute = True
        elif (
            (self.bar.datetime.minute != tick.datetime.minute)
            or (self.bar.datetime.hour != tick.datetime.hour)
        ):
            self.bar.datetime = self.bar.datetime.replace(
                second=0, microsecond=0
            )
            self.on_bar(self.bar)

            new_minute = True

        if new_minute:
            self.bar = BarData(
                symbol=tick.symbol,
                exchange=tick.exchange,
                interval=Interval.MINUTE,
                datetime=tick.datetime,
                gateway_name=tick.gateway_name,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price,
                open_interest=tick.open_interest
            )
        else:
            self.bar.high_price = max(self.bar.high_price, tick.last_price)
            if tick.high_price > self.last_tick.high_price:
                self.bar.high_price = max(self.bar.high_price, tick.high_price)

            self.bar.low_price = min(self.bar.low_price, tick.last_price)
            if tick.low_price < self.last_tick.low_price:
                self.bar.low_price = min(self.bar.low_price, tick.low_price)

            self.bar.close_price = tick.last_price
            self.bar.open_interest = tick.open_interest
            self.bar.datetime = tick.datetime

        if self.last_tick:
            # 这样计算丢失 行情连接时第一个tick的volume
            # 当tick为主力合约时,第一个tick可能缺失非主力合约数据  若下一个tick包含上次缺失的非主力tick 会把非主力tick的当日volume增加到指数tick中
            # volume_change = tick.volume - self.last_tick.volume
            volume_change = tick.volume - self.last_tick.volume - tick.last_volume
            # print("-------------------------------------")
            # print("now_tick-->", tick.datetime,"------>",tick.volume,"------>",tick.last_volume)
            # print("last_tick-->", self.last_tick.datetime,"------>",self.last_tick.volume)
            # print("index_volume-->",max(volume_change, 0))
            # print("-------------------------------------")
            self.bar.volume += max(volume_change, 0)

        # volume_change = tick.volume - tick.last_volume #last_volume原始数据这个字段没数据
        # self.bar.volume += max(volume_change, 0)

        self.last_tick = tick

上诉方法还是有不足之处,后续找到更加合理的方案后再做更新。

vnpy指数合成

上一篇:LightKV-高性能key-value存储组件


下一篇:Firebird的自增字段