import time
import pandas
import re
from vnpy.trader.tqz_extern.tqz_model import TQZMonitorTimeModel
from vnpy.trader.tqz_extern.tools.position_operator.position_operator import TQZJsonOperator
from vnpy.trader.tqz_extern.tools.symbol_operator.symbol_operator import TQZSymbolOperator
from vnpy.app.tqz_main_contracts_app.tqz_load_main_contracts import (
TQZTqClient,
TQZMainContractsChangeFilePath
)
from vnpy.trader.tqz_extern.tqz_constant import (
TQZMainContractsSheetType,
TQZMainContractsColumnType,
TQZMainContractsStrategyPathType,
TQZMainContractsKeyType,
TQZWeekDayType
)
class TQZMainContractsEngine:
def __init__(self, tq_id, tq_password):
self.tq_id = tq_id
self.tq_password = tq_password
self.__today_main_contracts_has_load = False
self.__today_main_contracts_has_change = False
self.__current_day = -1
def tqz_start(self):
while True:
# init when new day
self.__init_today()
if self.__current_day in [TQZWeekDayType.SATURDAY.value, TQZWeekDayType.SUNDAY.value]:
if time.localtime().tm_sec is 0 and time.localtime().tm_min is 0:
print("today is weekend, no operator to change main contracts")
pass
else:
if self.__is_refresh_time(now_time=time.localtime().tm_sec, interval_time=60):
print(time.strftime("%Y/%m/%d %H:%M:%S", time.localtime()))
# load main contracts from tq
if TQZMonitorTimeModel.is_load_main_contracts_time() is True:
self.__load_main_contracts_from_tq(
tq_id=self.tq_id,
tq_password=self.tq_password
)
# change main contract to per path
if TQZMonitorTimeModel.is_change_main_contracts_time() is True:
self.__change_main_contracts()
time.sleep(1)
...