关于MainEngine的代码阅读
在入口文件中,我们看到了除了窗体界面的产生,还有关于MainEngine
和EventEngin
部分。今天来学习下MainEngine
的代码。
首先在run代码中,我们看到以下的代码
main_engine.add_gateway(DeribitGateway)
main_engine.add_app(OptionMasterApp)
从上述代码可以基本猜测所有的网管,设置,甚至策略引擎行情,都跟MainEngine有关系,MainEngine应该是一个主线,把所有的组件穿插起来了。想必MainEngine一定是一条重要的大鱼。下面我们进入MainEngin开始学习 位置:\vnpy\trader\engine.py
MainEngine的大概脉络
class MainEngine:
#初始化
def __init__(self, event_engine: EventEngine = None):
pass
#添加引擎
def add_engine(self, engine_class: Any):
pass
#添加网管
def add_gateway(self, gateway_class: Type[BaseGateway]):
pass
#添加app
def add_app(self, app_class: Type[BaseApp]):
pass
#初始化引擎
def init_engines(self):
pass
#写入日志
def write_log(self, msg: str, source: str = ""):
pass
#获得引擎
def get_engine(self, engine_name: str):
pass
#获得默认设置
def get_default_setting(self, gateway_name: str):
pass
#获得所有引擎的名字
def get_all_gateway_names(self):
pass
#获得所有的APP
def get_all_apps(self):
pass
#获得所有的交易所
def get_all_exchanges(self):
pass
#连接到行情
def connect(self, setting: dict, gateway_name: str):
pass
#合约订阅
def subscribe(self, req: SubscribeRequest, gateway_name: str):
pass
#下单
def send_order(self, req: OrderRequest, gateway_name: str):
pass
#取消订单
def cancel_order(self, req: CancelRequest, gateway_name: str):
pass
#批量下单
def send_orders(self, reqs: Sequence[OrderRequest], gateway_name: str):
pass
#批量取消订单
def cancel_orders(self, reqs: Sequence[CancelRequest], gateway_name: str):
pass
#历史查询
def query_history(self, req: HistoryRequest, gateway_name: str):
pass
#关闭
def close(self):
pass
我把所有的方法体都去掉,仅仅保留方法名称和参数,从上面的结构基本上能看出来MainEngine
几乎是一个把各个组件(GateWay, App, Engin)都连接在一起,同时提供了跟交易相关的连接、订阅、下单、撤单、历史记录等都相关的柔和在一起的大杂烩。接下来我们就一个一个方法的阅读学习。
__init__
def __init__(self, event_engine: EventEngine = None):
""""""
if event_engine:
self.event_engine = event_engine
else:
self.event_engine = EventEngine()
self.event_engine.start()
self.gateways = {}
self.engines = {}
self.apps = {}
self.exchanges = []
os.chdir(TRADER_DIR) # Change working directory
self.init_engines() # Initialize function engines
初始化的代码基本上就是实例化事件引擎(EventEngin),然后启动,同时为getways
,engines
,apps
,exchange
建立一个字典,同时调用init_engines()的方法。
各种add方法,AddEngine, AddGateWay, AddApp
def add_engine(self, engine_class: Any):
"""
Add function engine.
"""
engine = engine_class(self, self.event_engine)
self.engines[engine.engine_name] = engine
return engine
def add_gateway(self, gateway_class: Type[BaseGateway]):
"""
Add gateway.
"""
gateway = gateway_class(self.event_engine)
self.gateways[gateway.gateway_name] = gateway
# Add gateway supported exchanges into engine
for exchange in gateway.exchanges:
if exchange not in self.exchanges:
self.exchanges.append(exchange)
return gateway
def add_app(self, app_class: Type[BaseApp]):
"""
Add app.
"""
app = app_class()
self.apps[app.app_name] = app
engine = self.add_engine(app.engine_class)
return engine
上述3个Add方法比较简单,基本上都是把添加各种APP、Engine、Gateway的话,都把它们的实例化和name形成键值对放入map中去。唯一我们可以得到的线索是gateway 和Exchanges之间应该是有对应关系的话,话一句话说,gateway就是交易场所的API对接网管。所以每次添加一个gateway,就添加了一个交易所名称。
get方法
def get_gateway(self, gateway_name: str):
"""
Return gateway object by name.
"""
gateway = self.gateways.get(gateway_name, None)
if not gateway:
self.write_log(f"找不到底层接口:{gateway_name}")
return gateway
def get_engine(self, engine_name: str):
"""
Return engine object by name.
"""
engine = self.engines.get(engine_name, None)
if not engine:
self.write_log(f"找不到引擎:{engine_name}")
return engine
def get_default_setting(self, gateway_name: str):
"""
Get default setting dict of a specific gateway.
"""
gateway = self.get_gateway(gateway_name)
if gateway:
return gateway.get_default_setting()
return None
基本上是从map中根据指定的名字获得对象的方法。一目了然。get_default_setting
方法应该属于get_gateway的一个下属方法,获得gateway的setting.
get_all方法
def get_all_apps(self):
"""
Get all app objects.
"""
return list(self.apps.values())
def get_all_exchanges(self):
"""
Get all exchanges.
"""
return self.exchanges
也是对add进入的对象进行全局获取的方法。
connect
def connect(self, setting: dict, gateway_name: str):
"""
Start connection of a specific gateway.
"""
gateway = self.get_gateway(gateway_name)
if gateway:
gateway.connect(setting)
基本上可以理解connect是对gateway的一种装饰方法。通过add可以插入gateway,然后调用connect方法的话,可以通过gateway_name获得接口,然后再调用gateway的connect方法。不出意料下面的订阅行情、下单、撤单、批量撤单都是类似的作用。
gateway的其他类似方法
def subscribe(self, req: SubscribeRequest, gateway_name: str):
"""
Subscribe tick data update of a specific gateway.
"""
gateway = self.get_gateway(gateway_name)
if gateway:
gateway.subscribe(req)
def send_order(self, req: OrderRequest, gateway_name: str):
"""
Send new order request to a specific gateway.
"""
gateway = self.get_gateway(gateway_name)
if gateway:
return gateway.send_order(req)
else:
return ""
def cancel_order(self, req: CancelRequest, gateway_name: str):
"""
Send cancel order request to a specific gateway.
"""
gateway = self.get_gateway(gateway_name)
if gateway:
gateway.cancel_order(req)
def send_orders(self, reqs: Sequence[OrderRequest], gateway_name: str):
"""
"""
gateway = self.get_gateway(gateway_name)
if gateway:
return gateway.send_orders(reqs)
else:
return ["" for req in reqs]
def cancel_orders(self, reqs: Sequence[CancelRequest], gateway_name: str):
"""
"""
gateway = self.get_gateway(gateway_name)
if gateway:
gateway.cancel_orders(reqs)
def query_history(self, req: HistoryRequest, gateway_name: str):
"""
Send cancel order request to a specific gateway.
"""
gateway = self.get_gateway(gateway_name)
if gateway:
return gateway.query_history(req)
else:
return None
init_engine
def init_engines(self):
"""
Init all engines.
"""
self.add_engine(LogEngine)
self.add_engine(OmsEngine)
self.add_engine(EmailEngine)
把日志, OMSEngin, EmialEngin装配进来
close 方法
def close(self):
"""
Make sure every gateway and app is closed properly before
programme exit.
"""
# Stop event engine first to prevent new timer event.
self.event_engine.stop()
for engine in self.engines.values():
engine.close()
for gateway in self.gateways.values():
gateway.close()
基本上是程序退出以后的善后工作罢了。
总结
通过对MainEngin代码的梳理,我们看到其实MainEngine本身没有什么深奥之处。就是一个适配器的模式,把所有的操作抽象成了APP,Gateway, Engine。并且提供了一个统一的操作入口,这样可以方便实现扩展。