• 开始收录
    • 订阅行情
    • 将订阅信息保存到json文件
    • 推送行情记录事件
    • 注册行情记录事件
    • 执行记录行情任务

    开始收录

    • 在“本地代码”选择输入需要订阅的行情,如rb1905.SHFE;
    • 然后点击后边“K线记录”或者“Tick记录”中的“添加”选项,会把记录特定品种任务添加到data_recorder_setting.json上,并且显示到“K线记录列表”或者“Tick记录列表”中,如图。
    • 通过queue.put()与queue.get()异步完成收录行情信息任务。https://vnpy-community.oss-cn-shanghai.aliyuncs.com/forum_experience/yazhang/data_recoder/start.png

    下面介绍行情收录的具体原理:若无合约记录的历史,用户需要先添加行情记录任务,如连接CTP接口后记录rb1905.SHFE的tick数据,然后调用add_tick_recording()函数执行下面工作:

    • 先创建tick_recordings字典;
    • 调用接口的suscribe()函数订阅行情;3 )保存该tick_recordings字典到json文件上;
    • 推送行情记录事件。
    1. def add_tick_recording(self, vt_symbol: str):
    2. """"""
    3. if vt_symbol in self.tick_recordings:
    4. self.write_log(f"已在Tick记录列表中:{vt_symbol}")
    5. return
    6.  
    7. contract = self.main_engine.get_contract(vt_symbol)
    8. if not contract:
    9. self.write_log(f"找不到合约:{vt_symbol}")
    10. return
    11.  
    12. self.tick_recordings[vt_symbol] = {}
    13. "symbol": contract.symbol,
    14. "exchange": contract.exchange.value,
    15. "gateway_name": contract.gateway_name
    16. }
    17.  
    18. self.subscribe(contract)
    19. self.save_setting()
    20. self.put_event()
    21.  
    22. self.write_log(f"添加Tick记录成功:{vt_symbol}")

    下面对add_tick_recording()函数里面调用的子函数进行扩展:

    订阅行情

    调用main_engine的suscribe()函数来订阅行情,需要填入的信息为symbol、exchange、gateway_name

    1. def subscribe(self, contract: ContractData):
    2. """"""
    3. req = SubscribeRequest(
    4. symbol=contract.symbol,
    5. exchange=contract.exchange
    6. )
    7. self.main_engine.subscribe(req, contract.gateway_name)

    将订阅信息保存到json文件

    • 主要把tick_recordings或者bar_recordings通过save_json()函数保存到C:\Users\Administrator.vntrader文件夹内的data_recorder_setting.json上。
    • 该json文件用于存放行情记录的任务,当每次启动行情模块后,会调用load_setting()函数来得到tick_recordings和bar_recordings字典,进而开始记录的任务。
    1. setting_filename = "data_recorder_setting.json"
    2. def save_setting(self):
    3. """"""
    4. setting = {
    5. "tick": self.tick_recordings,
    6. "bar": self.bar_recordings
    7. }
    8. save_json(self.setting_filename, setting)
    9.  
    10. def load_setting(self):
    11. """"""
    12. setting = load_json(self.setting_filename)
    13. self.tick_recordings = setting.get("tick", {})
    14. self.bar_recordings = setting.get("bar", {})

    推送行情记录事件

    • 创建行情记录列表tick_symbols和bar_symbols,并且缓存在data字典里;
    • 创建evnte对象,其类型是EVENT_RECORDER_UPDATE, 内容是data字典;
    • 调用event_engine的put()函数推送event事件。
    1. def put_event(self):
    2. """"""
    3. tick_symbols = list(self.tick_recordings.keys())
    4. tick_symbols.sort()
    5.  
    6. bar_symbols = list(self.bar_recordings.keys())
    7. bar_symbols.sort()
    8.  
    9. data = {
    10. "tick": tick_symbols,
    11. "bar": bar_symbols
    12. }
    13.  
    14. event = Event(
    15. EVENT_RECORDER_UPDATE,
    16. data
    17. )
    18. self.event_engine.put(event)

    注册行情记录事件

    register_event()函数分别注册2种事件:EVENT_CONTRACT、EVENT_TICK

    • EVENT_CONTRACT事件,调用的是process_contract_event()函数: 从tick_recordings和bar_recordings字典获取需要订阅的合约品种;然后使用subscribe()函数进行订阅行情。
    • EVENT_TICK事件,调用的是process_tick_event()函数:从tick_recordings和bar_recordings字典获取需要订阅的合约品种;然后使用record_tick()和record_bar()函数,把行情记录任务推送到queue队列中等待执行。
    1. def register_event(self):
    2. """"""
    3. self.event_engine.register(EVENT_TICK, self.process_tick_event)
    4. self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
    5.  
    6. def process_tick_event(self, event: Event):
    7. """"""
    8. tick = event.data
    9.  
    10. if tick.vt_symbol in self.tick_recordings:
    11. self.record_tick(tick)
    12.  
    13. if tick.vt_symbol in self.bar_recordings:
    14. bg = self.get_bar_generator(tick.vt_symbol)
    15. bg.update_tick(tick)
    16.  
    17. def process_contract_event(self, event: Event):
    18. """"""
    19. contract = event.data
    20. vt_symbol = contract.vt_symbol
    21.  
    22. if (vt_symbol in self.tick_recordings or vt_symbol in self.bar_recordings):
    23. self.subscribe(contract)
    24.  
    25. def record_tick(self, tick: TickData):
    26. """"""
    27. task = ("tick", copy(tick))
    28. self.queue.put(task)
    29.  
    30. def record_bar(self, bar: BarData):
    31. """"""
    32. task = ("bar", copy(bar))
    33. self.queue.put(task)
    34.  
    35. def get_bar_generator(self, vt_symbol: str):
    36. """"""
    37. bg = self.bar_generators.get(vt_symbol, None)
    38.  
    39. if not bg:
    40. bg = BarGenerator(self.record_bar)
    41. self.bar_generators[vt_symbol] = bg
    42.  
    43. return bg

    执行记录行情任务

    在while循环中,从queue队列读取任务,调用save_tick_data()或者save_bar_data()函数来记录数据,并且载入到数据库中。

    1. def run(self):
    2. """"""
    3. while self.active:
    4. try:
    5. task = self.queue.get(timeout=1)
    6. task_type, data = task
    7.  
    8. if task_type == "tick":
    9. database_manager.save_tick_data([data])
    10. elif task_type == "bar":
    11. database_manager.save_bar_data([data])
    12.  
    13. except Empty:
    14. continue