标签:shel 信息 发送 连接 控制 count() dad == set
转并修改于:用python的交易员
【前言】中层引擎在设计上主要是为了进一步封装底层接口所暴露出的API函数,使得其更容易被上层的GUI和策略组件调用。本篇的内容会相对简单,主要以LTS接口DEMO为例介绍一些设计方面的思路。
1 ######################################################################## 2 class MainEngine: 3 """主引擎,负责对API的调度""" 4 5 #---------------------------------------------------------------------- 6 def __init__(self): 7 """Constructor""" 8 self.ee = EventEngine() # 创建事件驱动引擎 9 10 self.md = DemoMdApi(self.ee) # 创建API接口 11 #self.md = DemoL2Api(self.ee) # 如果使用L2行情就改为这行 12 self.td = DemoTdApi(self.ee) 13 14 self.ee.start() # 启动事件驱动引擎 15 16 # 循环查询持仓和账户相关 17 self.countGet = 0 # 查询延时计数 18 self.lastGet = ‘Account‘ # 上次查询的性质 19 self.ee.register(EVENT_TDLOGIN, self.initGet) # 登录成功后开始初始化查询 20 21 # 合约储存相关 22 self.dictInstrument = {} # 字典(保存合约查询数据) 23 self.ee.register(EVENT_INSTRUMENT, self.insertInstrument)
首先以主引擎成员变量的形式创建事件驱动引擎ee、行情接口md和交易接口td的对象,两个接口对象创建时传入事件驱动引擎ee对象作为构造函数的参数。
在创建以上三个对象后,立即启动事件驱动引擎,此后当用户调用接口的连接、登录等功能收到事件推送后,ee可以立即推送到监听这些事件的组件进行处理。
LTS和CTP接口的持仓情况和账户情况并不会通过回调函数主动推送,只有在投资者调用查询函数时才会返回,我们的DEMO作为一个手动交易终端,选择使用循环查询的模式不断获取持仓和账户情况的更新。这里的每次查询都会占用网络带宽导致系统延时的增加,对于某些运行全自动策略的程序,可以选择不进行查询,而通过对成交、行情等数据的统计来自行计算持仓和账户情况(甚至对于某些策略可以直接忽略该步骤,进一步降低延时水平)。
LTS和CTP发送主动查询指令时,存在流量控制(通常限制在1秒一次查询),因此选择间隔发送查询账户和查询持仓的指令。
在登录成功后,我们需要查询柜台上所有可交易的合约信息,将这些信息保存到字典dictInstrument中,方便后面需要时进行查询。这里很好的体现出了Python的方便,笔者在设计封装API的回调函数时特别选择使用Python字典的形式推送信息,包含合约信息的字典收到后,可以直接插入到dictInstrument字典进行保存(字典嵌字典),查询时直接使用合约代码即可。而用C++语言开发时通常还需要用到Sqlite之类的内存数据库进行保存,麻烦了不少。
#---------------------------------------------------------------------- def login(self, userid, mdPassword, tdPassword, brokerid, mdAddress, tdAddress): """登陆""" self.md.login(mdAddress, userid, mdPassword, brokerid) self.td.login(tdAddress, userid, tdPassword, brokerid) #---------------------------------------------------------------------- def subscribe(self, instrumentid, exchangeid): """订阅合约""" self.md.subscribe(instrumentid, exchangeid) #---------------------------------------------------------------------- def getAccount(self): """查询账户""" self.td.getAccount() #---------------------------------------------------------------------- def getInvestor(self): """查询投资者""" self.td.getInvestor() #---------------------------------------------------------------------- def getPosition(self): """查询持仓""" self.td.getPosition() #---------------------------------------------------------------------- def getInstrument(self): """获取合约""" event = Event(type_=EVENT_LOG) log = u‘查询合约信息‘ event.dict_[‘log‘] = log self.ee.put(event) self.td.getInstrument() #---------------------------------------------------------------------- def sendOrder(self, instrumentid, exchangeid, price, pricetype, volume, direction, offset): """发单""" self.td.sendOrder(instrumentid, exchangeid, price, pricetype, volume, direction, offset) #---------------------------------------------------------------------- def cancelOrder(self, instrumentid, exchangeid, orderref, frontid, sessionid): """撤单""" self.td.cancelOrder(instrumentid, exchangeid, orderref, frontid, sessionid) #---------------------------------------------------------------------- def getAccountPosition(self, event): """循环查询账户和持仓""" self.countGet = self.countGet + 1 # 每5秒发一次查询 if self.countGet > 5: self.countGet = 0 # 清空计数 if self.lastGet == ‘Account‘: self.getPosition() self.lastGet = ‘Position‘ else: self.getAccount() self.lastGet = ‘Account‘ #---------------------------------------------------------------------- def initGet(self, event): """在交易服务器登录成功后,开始初始化查询""" # 打开设定文件setting.vn f = shelve.open(‘setting.vn‘) # 尝试读取设定字典,若该字典不存在,则发出查询请求 try: d = f[‘instrument‘] # 如果本地保存的合约数据是今日的,则载入,否则发出查询请求 today = date.today() if d[‘date‘] == today: self.dictInstrument = d[‘dictInstrument‘] event = Event(type_=EVENT_LOG) log = u‘合约信息读取完成‘ event.dict_[‘log‘] = log self.ee.put(event) self.getInvestor() # 开始循环查询 self.ee.register(EVENT_TIMER, self.getAccountPosition) else: self.getInstrument() except KeyError: self.getInstrument() f.close() #---------------------------------------------------------------------- def insertInstrument(self, event): """插入合约对象""" data = event.dict_[‘data‘] last = event.dict_[‘last‘] self.dictInstrument[data[‘InstrumentID‘]] = data # 合约对象查询完成后,查询投资者信息并开始循环查询 if last: # 将查询完成的合约信息保存到本地文件,今日登录可直接使用不再查询 self.saveInstrument() event = Event(type_=EVENT_LOG) log = u‘合约信息查询完成‘ event.dict_[‘log‘] = log self.ee.put(event) self.getInvestor() # 开始循环查询 self.ee.register(EVENT_TIMER, self.getAccountPosition) #---------------------------------------------------------------------- def selectInstrument(self, instrumentid): """获取合约信息对象""" try: instrument = self.dictInstrument[instrumentid] except KeyError: instrument = None return instrument #---------------------------------------------------------------------- def exit(self): """退出""" # 销毁API对象 self.td = None self.md = None # 停止事件驱动引擎 self.ee.stop() #---------------------------------------------------------------------- def saveInstrument(self): """保存合约属性数据""" f = shelve.open(‘setting.vn‘) d = {} d[‘dictInstrument‘] = self.dictInstrument d[‘date‘] = date.today() f[‘instrument‘] = d f.close()
LTS和CTP的行情和交易账户通常是由经纪商一起提供的,为了简洁起见,把两个接口的登录一起封装在了login函数中。
subscribe, getAccount, getPosition, getInvestor, getInstrument, sendOrder, cancelOrder都只是直接调用行情和交易接口的功能,做这种设计的目的是为了让用户在写上层组件代码时,可以直接写self.mainEngine.subscribe(‘IF1506‘, ‘CFFEX‘),而不必去写self.mainEngine.md.subscribe(‘IF1506‘, ‘CFFEX‘),很小的区别,但是相信我,程序规模大了后会让你烦到崩溃。
getAccountPosition用于实现循环查询账户和持仓,这里我设为了每5秒发一次,基本够用就好。
initGet和insertInstrument两个函数实现了登录后初始化查询相关的功能。首先登录完成后,程序用shelve模块打开本地保存的文件setting.vn,检查其中的合约信息数据,如果该数据的更新日期为今日,则直接读取使用,无需再次查询可交易合约信息(一般该信息每天日内保持不变)。否则调用getInstrument函数查询合约信息,insertInstrument函数负责将收到的合约信息插入到dictInstrument字典中,当所有合约信息都插入完成后,通过saveInstrument函数将该字典保存到setting.vn中,即可实现日内再次登录无需查询直接读取。
合约信息查询完成后,通常会再查询一次投资者姓名getInvestor,主要用于输出在程序的标题栏上(方便用户检查,防止登录错账户等),并将循环查询函数getAccountPosition注册到每秒触发的定时器事件监听上,开始循环查询。
当用户退出程序时,需要调用主引擎的exit函数,释放行情和交易API接口对象(在C++环境中会自动析构),并停止事件驱动引擎的工作线程(否则可能报线程退出错误等)。
中层引擎的设计思路主要是封装底层接口,从而让用户在开发顶层GUI和策略组件时无需直接调用底层接口的主动函数,降低开发和维护的复杂度。
DEMO中的这个中层引擎非常简单,功能上只实现了个循环查询(其实也可以在接口层中实现,视乎需求)。而在实际开发中,中层引擎通常会包含用户最常用的一些功能模块,比如做期权策略的会加入期权定价引擎,做期现套利的会加入持仓组合的敞口监控对冲引擎等等。
标签:shel 信息 发送 连接 控制 count() dad == set
原文地址:https://www.cnblogs.com/huangfuyuan/p/9565852.html