在线服务

介绍

../_images/online_serving.png

除了回测之外,测试模型有效性的一种方法是在真实市场条件下进行预测,甚至基于这些预测进行真实交易。 ``Online Serving``是一组使用最新数据的在线模型模块, 包括`在线管理器 <#Online Manager>`_、在线策略在线工具 <#Online Tool>`_和`更新器

这里 有几个参考示例,展示了``Online Serving``的不同功能。 如果您有许多模型或`任务`需要管理,请考虑使用`任务管理 <../advanced/task_management.html>`_。 这些`示例 <https://github.com/ssvip9527/qlib/tree/main/examples/online_srv>`_基于`任务管理 <../advanced/task_management.html>`_中的一些组件,如``TrainerRM``或``Collector``。

注意:用户应保持数据源更新以支持在线服务。例如,Qlib提供了`一组脚本 <https://github.com/ssvip9527/qlib/blob/main-cn/scripts/data_collector/yahoo/README.md#automatic-update-of-daily-frequency-datafrom-yahoo-finance>`_来帮助用户更新Yahoo的每日数据。

当前已知限制 - 目前支持对下一个交易日的每日更新预测。但由于`公开数据的限制 <https://github.com/ssvip9527/qlib/issues/215#issuecomment-766293563>_`,不支持为下一个交易日生成订单。

在线管理器

OnlineManager可以管理一组`Online Strategy <#Online Strategy>`_并动态运行它们。

随着时间的推移,决策模型也会发生变化。在本模块中,我们将这些贡献模型称为`online`模型。 在每个例行程序(如每天或每分钟)中,`online`模型可能会发生变化,需要更新它们的预测。 因此本模块提供了一系列方法来控制这个过程。

本模块还提供了一种在历史中模拟`Online Strategy <#Online Strategy>`_的方法。 这意味着您可以验证您的策略或找到更好的策略。

在不同情况下使用不同训练器共有4种情况:

以下是一些伪代码,展示了每种情况的工作流程

为简单起见
  • 策略中只使用一个策略

  • `update_online_pred`仅在在线模式下调用并被忽略

  1. 在线+训练器

tasks = first_train()
models = trainer.train(tasks)
trainer.end_train(models)
for day in online_trading_days:
    # OnlineManager.routine
    models = trainer.train(strategy.prepare_tasks())  # 对每个策略
    strategy.prepare_online_models(models)  # 对每个策略

    trainer.end_train(models)
    prepare_signals()  # 每日准备交易信号

在线+延迟训练器: 工作流程与`在线+训练器`相同。

  1. 模拟+延迟训练器

# 模拟
tasks = first_train()
models = trainer.train(tasks)
for day in historical_calendars:
    # OnlineManager.routine
    models = trainer.train(strategy.prepare_tasks())  # 对每个策略
    strategy.prepare_online_models(models)  # 对每个策略
# delay_prepare()
# FIXME: 目前delay_prepare没有以正确的方式实现。
trainer.end_train(<for all previous models>)
prepare_signals()

# 我们可以简化当前的工作流程吗?

  • 可以减少任务的状态数量吗?

    • 对于每个任务,我们有三个阶段(即任务、部分训练的任务、最终训练的任务)

class qlib.workflow.online.manager.OnlineManager(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer = None, begin_time: str | Timestamp = None, freq='day')

OnlineManager可以通过`Online Strategy <#Online Strategy>`_管理在线模型。 它还提供了哪些模型在什么时间在线的历史记录。

__init__(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer = None, begin_time: str | Timestamp = None, freq='day')

初始化OnlineManager。 一个OnlineManager必须至少有一个OnlineStrategy。

参数:

strategies (Union[OnlineStrategy, List[OnlineStrategy]]): OnlineStrategy实例或OnlineStrategy列表 begin_time (Union[str,pd.Timestamp], 可选): OnlineManager将在此时间开始。默认为None表示使用最新日期。 trainer (qlib.model.trainer.Trainer): 用于训练任务的训练器。None表示使用TrainerR。 freq (str, 可选): 数据频率。默认为"day"。

first_train(strategies: List[OnlineStrategy] = None, model_kwargs: dict = {})

从每个策略的first_tasks方法获取任务并训练它们。 如果使用DelayTrainer,它可以在每个策略的first_tasks之后一起完成所有训练。

参数:

strategies (List[OnlineStrategy]): 策略列表(添加策略时需要此参数)。None表示使用默认策略。 model_kwargs (dict): `prepare_online_models`的参数

routine(cur_time: str | Timestamp = None, task_kwargs: dict = {}, model_kwargs: dict = {}, signal_kwargs: dict = {})

每个策略的典型更新过程并记录在线历史。

例行程序(如逐日或逐月)后的典型更新过程。 过程是: 更新预测 -> 准备任务 -> 准备在线模型 -> 准备信号。

如果使用DelayTrainer,它可以在每个策略的prepare_tasks之后一起完成所有训练。

参数:

cur_time (Union[str,pd.Timestamp], 可选): 在此时间运行routine方法。默认为None。 task_kwargs (dict): `prepare_tasks`的参数 model_kwargs (dict): `prepare_online_models`的参数 signal_kwargs (dict): `prepare_signals`的参数

get_collector(**kwargs) MergeCollector

获取`Collector <../advanced/task_management.html#Task Collecting>`_实例以收集每个策略的结果。 此收集器可以作为信号准备的基础。

参数:

**kwargs: get_collector的参数。

返回:

MergeCollector: 用于合并其他收集器的收集器。

add_strategy(strategies: OnlineStrategy | List[OnlineStrategy])

向OnlineManager添加一些新策略。

参数:

strategy (Union[OnlineStrategy, List[OnlineStrategy]]): OnlineStrategy列表

prepare_signals(prepare_func: ~typing.Callable = <qlib.model.ens.ensemble.AverageEnsemble object>, over_write=False)

在准备完最后一个例行程序(箱线图中的一个框)的数据后,这意味着例行程序的结束,我们可以为下一个例行程序准备交易信号。

注意: 给定一组预测,这些预测结束时间之前的所有信号都将准备好。

即使最新的信号已经存在,最新的计算结果也将被覆盖。

备注

给定某个时间的预测,此时间之前的所有信号都将准备好。

参数:

prepare_func (Callable, 可选): 从收集后的字典中获取信号。默认为AverageEnsemble(),由MergeCollector收集的结果必须是{xxx:pred}。 over_write (bool, 可选): 如果为True,新信号将覆盖。如果为False,新信号将附加到信号末尾。默认为False。

返回:

pd.DataFrame: 信号。

get_signals() Series | DataFrame

获取准备好的在线信号。

返回:

Union[pd.Series, pd.DataFrame]: pd.Series表示每个日期时间只有一个信号。 pd.DataFrame表示多个信号,例如买卖操作使用不同的交易信号。

simulate(end_time=None, frequency='day', task_kwargs={}, model_kwargs={}, signal_kwargs={}) Series | DataFrame

从当前时间开始,此方法将模拟OnlineManager中的每个例行程序,直到结束时间。

考虑到并行训练,模型和信号可以在所有例行程序模拟后准备。

延迟训练方式可以是``DelayTrainer``,延迟准备信号方式可以是``delay_prepare``。

参数:

end_time: 模拟结束的时间 frequency: 日历频率 task_kwargs (dict): `prepare_tasks`的参数 model_kwargs (dict): `prepare_online_models`的参数 signal_kwargs (dict): `prepare_signals`的参数

返回:

Union[pd.Series, pd.DataFrame]: pd.Series表示每个日期时间只有一个信号。 pd.DataFrame表示多个信号,例如买卖操作使用不同的交易信号。

delay_prepare(model_kwargs={}, signal_kwargs={})

如果有任何内容等待准备,则准备所有模型和信号。

参数:

model_kwargs: `end_train`的参数 signal_kwargs: `prepare_signals`的参数

在线策略

OnlineStrategy模块是在线服务的一个组件。

class qlib.workflow.online.strategy.OnlineStrategy(name_id: str)

OnlineStrategy与`Online Manager <#Online Manager>`_配合使用,负责处理任务生成、模型更新和信号准备的方式。

__init__(name_id: str)

初始化OnlineStrategy。 此模块**必须**使用`Trainer <../reference/api.html#qlib.model.trainer.Trainer>`_来完成模型训练。

参数:

name_id (str): 唯一的名称或ID。 trainer (qlib.model.trainer.Trainer, 可选): Trainer的实例。默认为None。

prepare_tasks(cur_time, **kwargs) List[dict]

在例行程序结束后,根据当前时间(None表示最新)检查是否需要准备和训练一些新任务。 返回等待训练的新任务。

您可以通过OnlineTool.online_models找到最后的在线模型。

prepare_online_models(trained_models, cur_time=None) List[object]

从训练好的模型中选择一些模型并将它们设置为在线模型。 这是一个将所有训练好的模型设为在线的典型实现,您可以重写它来实现更复杂的方法。 如果仍需要,可以通过OnlineTool.online_models找到最后的在线模型。

注意:将所有在线模型重置为训练好的模型。如果没有训练好的模型,则不执行任何操作。

注意:

当前实现非常简单。以下是一个更接近实际场景的复杂情况: 1. 在`test_start`前一天(时间戳`T`)训练新模型 2. 在`test_start`时(通常是时间戳`T + 1`)切换模型

参数:

models (list): 模型列表。 cur_time (pd.Dataframe): 来自OnlineManger的当前时间。None表示最新。

返回:

List[object]: 在线模型列表。

first_tasks() List[dict]

首先生成一系列任务并返回它们。

get_collector() Collector

获取`Collector <../advanced/task_management.html#Task Collecting>`_实例以收集此策略的不同结果。

例如:
  1. 在Recorder中收集预测

  2. 在txt文件中收集信号

返回:

Collector

class qlib.workflow.online.strategy.RollingStrategy(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)

此示例策略始终使用最新的滚动模型作为在线模型。

__init__(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)

初始化RollingStrategy。

假设:name_id的字符串、实验名称和训练器的实验名称相同。

参数:

name_id (str): 唯一的名称或ID。也将作为实验的名称。 task_template (Union[dict, List[dict]]): 任务模板列表或单个模板,将用于通过rolling_gen生成多个任务。 rolling_gen (RollingGen): RollingGen的实例

get_collector(process_list=[<qlib.model.ens.group.RollingGroup object>], rec_key_func=None, rec_filter_func=None, artifacts_key=None)

获取`Collector <../advanced/task_management.html#Task Collecting>`_实例以收集结果。返回的收集器必须能够区分不同模型的结果。

假设:可以根据模型名称和滚动测试段来区分模型。 如果不希望此假设,请实现您自己的方法或使用其他rec_key_func。

参数:

rec_key_func (Callable): 获取记录器键的函数。如果为None,则使用记录器ID。 rec_filter_func (Callable, 可选): 通过返回True或False来过滤记录器。默认为None。 artifacts_key (List[str], 可选): 要获取的工件键。如果为None,则获取所有工件。

first_tasks() List[dict]

使用rolling_gen基于task_template生成不同的任务

返回:

List[dict]: 任务列表

prepare_tasks(cur_time) List[dict]

根据当前时间(最新为None)准备新任务

可以通过OnlineToolR.online_models查找最新的在线模型

返回:

List[dict]: 新任务列表

在线工具

OnlineTool是一个用于设置和取消设置一系列`online`模型的模块。 `online`模型是在某些时间点的决定性模型,可以随时间变化而改变。 这使我们能够使用高效的子模型来适应市场风格的变化。

class qlib.workflow.online.utils.OnlineTool

OnlineTool将管理包含模型记录器的实验中的`online`模型。

__init__()

初始化OnlineTool。

set_online_tag(tag, recorder: list | object)

设置模型的`tag`标记其是否为在线状态。

参数:

tag (str): `ONLINE_TAG`或`OFFLINE_TAG`中的标签 recorder (Union[list,object]): 模型的记录器

get_online_tag(recorder: object) str

给定模型记录器,返回其在线标签。

参数:

recorder (Object): 模型的记录器

返回:

str: 在线标签

reset_online_tag(recorder: list | object)

将所有模型下线并将指定记录器设置为'online'。

参数:
recorder (Union[list,object]):

要重置为'online'的记录器

online_models() list

获取当前`online`模型

返回:

list: `online`模型列表

update_online_pred(to_date=None)

将`online`模型的预测更新到to_date。

参数:

to_date (pd.Timestamp): 更新此日期之前的预测。None表示更新到最新。

class qlib.workflow.online.utils.OnlineToolR(default_exp_name: str = None)

基于记录器(R)的OnlineTool实现。

__init__(default_exp_name: str = None)

初始化OnlineToolR。

参数:

default_exp_name (str): 默认实验名称

set_online_tag(tag, recorder: Recorder | List)

设置模型记录器的`tag`标记其是否为在线状态。

参数:

tag (str): ONLINE_TAG`NEXT_ONLINE_TAG`或`OFFLINE_TAG`中的标签 recorder (Union[Recorder, List]): 记录器列表或单个记录器实例

get_online_tag(recorder: Recorder) str

给定模型记录器,返回其在线标签。

参数:

recorder (Recorder): 记录器实例

返回:

str: 在线标签

reset_online_tag(recorder: Recorder | List, exp_name: str = None)

将所有模型下线并将指定记录器设置为'online'。

参数:
recorder (Union[Recorder, List]):

要重置为'online'的记录器

exp_name (str): 实验名称。如果为None则使用default_exp_name

online_models(exp_name: str = None) list

获取当前`online`模型

参数:

exp_name (str): 实验名称。如果为None则使用default_exp_name

返回:

list: `online`模型列表

update_online_pred(to_date=None, from_date=None, exp_name: str = None)

将在线模型的预测更新到to_date。

参数:

to_date (pd.Timestamp): 更新此日期之前的预测。None表示更新到日历中的最新时间 exp_name (str): 实验名称。如果为None则使用default_exp_name

更新器

更新器模块,用于在股票数据更新时更新预测等artifact。

class qlib.workflow.online.update.RMDLoader(rec: Recorder)

Recorder Model Dataset Loader

__init__(rec: Recorder)
get_dataset(start_time, end_time, segments=None, unprepared_dataset: DatasetH | None = None) DatasetH

加载、配置和设置数据集

该数据集用于推理

参数:
start_time :

基础数据的开始时间

end_time :

基础数据的结束时间

segmentsdict

数据集的分段配置 对于时间序列数据集(TSDatasetH),测试段可能与开始时间和结束时间不同

unprepared_dataset: Optional[DatasetH]

如果用户不想从记录器加载数据集,请指定用户的数据集

返回:

DatasetH: DatasetH实例

class qlib.workflow.online.update.RecordUpdater(record: Recorder, *args, **kwargs)

Update a specific recorders

__init__(record: Recorder, *args, **kwargs)
abstractmethod update(*args, **kwargs)

Update info for specific recorder

class qlib.workflow.online.update.DSBasedUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

基于数据集的更新器

  • 提供基于Qlib数据集更新数据的功能

假设条件

  • 基于Qlib数据集

  • 要更新的数据是多级索引的pd.DataFrame,例如标签、预测

                             LABEL0
    datetime   instrument
    2021-05-10 SH600000    0.006965
               SH600004    0.003407
    ...                         ...
    2021-05-28 SZ300498    0.015748
               SZ300676   -0.001321
    
__init__(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

初始化预测更新器

在以下情况下的预期行为:

  • 如果`to_date`大于日历中的最大日期,数据将更新到最新日期

  • 如果有数据在`from_date`之前或`to_date`之后,只有`from_date`和`to_date`之间的数据会受到影响

参数:
recordRecorder

记录器

to_date :

更新预测到`to_date`

如果to_date为None:

数据将更新到最新日期

from_date :

更新将从`from_date`开始

如果from_date为None:

更新将在历史数据中最新数据的下一个时间点进行

hist_refint

有时数据集会有历史依赖 将历史依赖长度的问题留给用户设置 如果用户不指定此参数,更新器将尝试加载数据集自动确定hist_ref

备注

start_time不包含在`hist_ref`中;因此`hist_ref`在大多数情况下会是`step_len - 1`

loader_clstype

加载模型和数据集的类

prepare_data(unprepared_dataset: DatasetH | None = None) DatasetH

加载数据集 - 如果指定了unprepared_dataset,则直接准备数据集 - 否则

分离此函数将使重用数据集更容易

返回:

DatasetH: DatasetH实例

update(dataset: DatasetH = None, write: bool = True, ret_new: bool = False) object | None

参数

datasetDatasetH

DatasetH实例。None表示需要重新准备

writebool

是否执行写入操作

ret_newbool

是否返回更新后的数据

返回

Optional[object]

更新后的数据集

abstractmethod get_update_data(dataset: Dataset) DataFrame

基于给定数据集返回更新后的数据

`get_update_data`和`update`的区别 - `update_date`只包含一些数据特定的功能 - `update`包含一些常规步骤(例如准备数据集、检查)

class qlib.workflow.online.update.PredUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

更新记录器中的预测

get_update_data(dataset: Dataset) DataFrame

基于给定数据集返回更新后的数据

`get_update_data`和`update`的区别 - `update_date`只包含一些数据特定的功能 - `update`包含一些常规步骤(例如准备数据集、检查)

class qlib.workflow.online.update.LabelUpdater(record: Recorder, to_date=None, **kwargs)

更新记录器中的标签

假设条件 - 标签由record_temp.SignalRecord生成

__init__(record: Recorder, to_date=None, **kwargs)

初始化预测更新器

在以下情况下的预期行为:

  • 如果`to_date`大于日历中的最大日期,数据将更新到最新日期

  • 如果有数据在`from_date`之前或`to_date`之后,只有`from_date`和`to_date`之间的数据会受到影响

参数:
recordRecorder

记录器

to_date :

更新预测到`to_date`

如果to_date为None:

数据将更新到最新日期

from_date :

更新将从`from_date`开始

如果from_date为None:

更新将在历史数据中最新数据的下一个时间点进行

hist_refint

有时数据集会有历史依赖 将历史依赖长度的问题留给用户设置 如果用户不指定此参数,更新器将尝试加载数据集自动确定hist_ref

备注

start_time不包含在`hist_ref`中;因此`hist_ref`在大多数情况下会是`step_len - 1`

loader_clstype

加载模型和数据集的类

get_update_data(dataset: Dataset) DataFrame

基于给定数据集返回更新后的数据

`get_update_data`和`update`的区别 - `update_date`只包含一些数据特定的功能 - `update`包含一些常规步骤(例如准备数据集、检查)