任务管理

简介

工作流 部分介绍了如何以松耦合方式运行研究工作流。但使用 qrun 时只能执行一个 任务。 为了自动生成和执行不同的任务,任务管理 提供了包括 任务生成任务存储任务训练任务收集 的完整流程。 通过此模块,用户可以在不同时间段、使用不同损失函数甚至不同模型自动运行其 任务。任务生成、模型训练以及数据合并和收集的流程如下图所示。

../_images/Task-Gen-Recorder-Collector.svg

整个流程可用于 在线服务

完整流程的示例见 此处

任务生成

一个 任务模型数据集记录器 或用户添加的任何内容组成。 具体的任务模板可在 任务部分 中查看。 尽管任务模板是固定的,用户仍可以自定义 TaskGen``(任务生成器),通过任务模板生成不同的 ``任务

以下是 TaskGen 的基类:

class qlib.workflow.task.gen.TaskGen

生成不同任务的基类

示例1:

输入: 一个特定的任务模板和滚动步骤

输出: 任务的滚动版本

示例2:

输入: 一个特定的任务模板和损失列表

输出: 一组具有不同损失的任务

abstractmethod generate(task: dict) List[dict]

Generate different tasks based on a task template

参数:

task (dict) -- a task template

返回:

A list of tasks

返回类型:

List[dict]

Qlib 提供了一个 RollingGen 类,用于生成数据集在不同日期段的 任务 列表。 此类允许用户在一个实验中验证不同时期数据对模型的影响。更多信息见 此处

任务存储

为了提高效率并支持集群操作,任务管理器 会将所有任务存储在 MongoDB 中。 ``TaskManager``(任务管理器)可以自动获取未完成的任务,并通过错误处理管理一组任务的生命周期。 使用此模块时,用户 必须 完成 MongoDB 的配置。

用户需要在 初始化 中提供 MongoDB URL 和数据库名称以使用 TaskManager,或进行如下配置:

from qlib.config import C
C["mongo"] = {
    "task_url" : "mongodb://localhost:27017/", # 你的 MongoDB URL
    "task_db_name" : "rolling_db" # 数据库名称
}
class qlib.workflow.task.manage.TaskManager(task_pool: str)

以下是TaskManager创建的任务示例:

{
    'def': 'pickle序列化的任务定义,使用pickle更方便',
    'filter': '类JSON数据,用于过滤任务',
    'status': 'waiting' | 'running' | 'done',
    'res': 'pickle序列化的任务结果'
}

任务管理器假设您只会更新已获取的任务。 MongoDB的获取和更新操作确保数据更新安全。

此类可作为命令行工具使用。以下是几个示例: 查看manage模块帮助的命令: python -m qlib.workflow.task.manage -h # 显示manage模块CLI手册 python -m qlib.workflow.task.manage wait -h # 显示wait命令手册

python -m qlib.workflow.task.manage -t <pool_name> wait
python -m qlib.workflow.task.manage -t <pool_name> task_stat

备注

假设:MongoDB中的数据会被编码,取出的数据会被解码

四种状态说明:

STATUS_WAITING: 等待训练

STATUS_RUNNING: 训练中

STATUS_PART_DONE: 已完成部分步骤,等待下一步

STATUS_DONE: 全部工作完成

__init__(task_pool: str)

Init Task Manager, remember to make the statement of MongoDB url and database name firstly. A TaskManager instance serves a specific task pool. The static method of this module serves the whole MongoDB.

参数:

task_pool (str) -- the name of Collection in MongoDB

static list() list

列出数据库中所有集合(任务池)。

返回:

list

replace_task(task, new_task)

Use a new task to replace a old one

参数:
  • task -- old task

  • new_task -- new task

insert_task(task)

Insert a task.

参数:

task -- the task waiting for insert

返回:

pymongo.results.InsertOneResult

insert_task_def(task_def)

Insert a task to task_pool

参数:

task_def (dict) -- the task definition

返回类型:

pymongo.results.InsertOneResult

create_task(task_def_l, dry_run=False, print_nt=False) List[str]

如果task_def_l中的任务是新的,则插入新任务到任务池并记录inserted_id。 如果任务已存在,则只查询其_id。

参数

task_def_l: list

任务列表

dry_run: bool

是否实际插入新任务到任务池

print_nt: bool

是否打印新任务

返回

List[str]

task_def_l中各任务的_id列表

fetch_task(query={}, status='waiting') dict

使用查询获取任务。

参数:

query (dict, optional): 查询字典,默认为{} status (str, optional): 任务状态,默认为STATUS_WAITING

返回:

dict: 解码后的任务(集合中的文档)

safe_fetch_task(query={}, status='waiting')

使用contextmanager从任务池中获取任务

参数

query: dict

查询字典

返回

dict: 解码后的任务(集合中的文档)

query(query={}, decode=True)

查询集合中的任务。 如果迭代生成器耗时过长,此函数可能抛出异常`pymongo.errors.CursorNotFound: cursor id not found`

示例:

python -m qlib.workflow.task.manage -t <your task pool> query '{"_id": "615498be837d0053acbc5d58"}'

参数

query: dict

查询字典

decode: bool

是否解码结果

返回

dict: 解码后的任务(集合中的文档)

re_query(_id) dict

使用_id查询任务。

参数:

_id (str): 文档的_id

返回:

dict: 解码后的任务(集合中的文档)

commit_task_res(task, res, status='done')

提交结果到task['res']。

参数:

task ([type]): 任务 res (object): 要保存的结果 status (str, optional): STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为STATUS_DONE。

return_task(task, status='waiting')

Return a task to status. Always using in error handling.

参数:
  • task ([type]) -- [description]

  • status (str, optional) -- STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE. Defaults to STATUS_WAITING.

remove(query={})

Remove the task using query

参数:

query (dict) -- the dict of query

task_stat(query={}) dict

Count the tasks in every status.

参数:

query (dict, optional) -- the query dict. Defaults to {}.

返回:

dict

reset_waiting(query={})

将所有运行中的任务重置为等待状态。可用于某些任务意外退出的情况。

参数:

query (dict, optional): 查询字典,默认为{}

prioritize(task, priority: int)

Set priority for task

参数:
  • task (dict) -- The task query from the database

  • priority (int) -- the target priority

wait(query={})

在多进程环境下,主进程可能因为仍有任务在运行而无法从TaskManager获取任务。 因此主进程应等待其他进程或机器完成所有任务。

参数:

query (dict, optional): 查询字典,默认为{}

任务管理器 的更多信息见 此处

任务训练

生成并存储这些 任务 后,就可以运行处于 WAITING*(等待)状态的 ``任务`` 了。 ``Qlib`` 提供了 ``run_task`` 方法来运行任务池中的 ``任务``,不过用户也可以自定义任务的执行方式。 获取 ``task_func``(任务函数)的简单方法是直接使用 ``qlib.model.trainer.task_train``。 它将运行由 ``任务`` 定义的整个工作流,包括 *模型数据集记录器

qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)

当任务池不为空(有WAITING状态任务)时,使用task_func获取并运行任务池中的任务

运行此方法后,有以下4种情况(before_status -> after_status):

STATUS_WAITING -> STATUS_DONE: 使用task["def"]作为`task_func`参数,表示任务尚未开始

STATUS_WAITING -> STATUS_PART_DONE: 使用task["def"]作为`task_func`参数

STATUS_PART_DONE -> STATUS_PART_DONE: use task["res"] as task_func param, it means that the task has been started but not completed

STATUS_PART_DONE -> STATUS_DONE: use task["res"] as task_func param

参数:
  • task_func (Callable) --

    def (task_def, **kwargs) -> <res which will be committed>

    the function to run the task

  • task_pool (str) -- the name of the task pool (Collection in MongoDB)

  • query (dict) -- will use this dict to query task_pool when fetching task

  • force_release (bool) -- will the program force to release the resource

  • before_status (str:) -- the tasks in before_status will be fetched and trained. Can be STATUS_WAITING, STATUS_PART_DONE.

  • after_status (str:) -- the tasks after trained will become after_status. Can be STATUS_WAITING, STATUS_PART_DONE.

  • kwargs -- the params for task_func

同时,Qlib 提供了一个名为 ``Trainer``(训练器)的模块。

class qlib.model.trainer.Trainer

训练器用于训练模型列表 Trainer和DelayTrainer的区别在于完成实际训练的时机不同

__init__()
train(tasks: list, *args, **kwargs) list

给定任务定义列表,开始训练并返回模型。

对于Trainer,此方法完成实际训练。 对于DelayTrainer,此方法仅做准备工作。

参数:

tasks (list): 任务定义列表

返回:

list: 模型列表

注意:
  • 对于`Trainer`,此方法将直接训练模型

  • 对于`DelayTrainer`,此方法仅做训练准备

end_train(models: list, *args, **kwargs) list

给定模型列表,在训练结束时完成必要操作 模型可能是记录器、文本文件、数据库等

对于Trainer,该方法做一些收尾工作 对于DelayTrainer,该方法完成实际训练

参数:

models: 模型列表

返回:

list: 模型列表

is_delay() bool

判断训练器是否会延迟完成`end_train`

返回:

bool: 是否为DelayTrainer

has_worker() bool

判断是否启用了并行训练的后台工作器

返回

bool:

工作器是否启用

worker()

启动工作进程

异常:

NotImplementedError: 如果不支持工作进程

Trainer 会训练一系列任务并返回一系列模型记录器。 Qlib 提供两种训练器:TrainerR 是最简单的方式,而 TrainerRM 基于 TaskManager,可帮助自动管理任务生命周期。 如果不想使用 任务管理器 管理任务,使用 TrainerR 训练由 TaskGen 生成的任务列表即可。 不同 Trainer 的详细信息见 此处

任务收集

收集模型训练结果前,需要使用 qlib.init 指定 mlruns 的路径。

为了收集训练后的 任务 结果,Qlib 提供了 Collector(收集器)Group(分组器)Ensemble(集成器),以可读、可扩展且松耦合的方式收集结果。

Collector 可以从任何地方收集对象并对其进行合并、分组、平均等处理。它包含两个步骤:``collect``(将任何内容收集到字典中)和 ``process_collect``(处理收集到的字典)。

Group 也有两个步骤:group``(可基于 `group_func` 对一组对象进行分组并转换为字典)和 ``reduce``(可基于某些规则将字典转换为集成结果)。 例如:{(A,B,C1): 对象, (A,B,C2): 对象} ---``group---> {(A,B): {C1: 对象, C2: 对象}} ---reduce---> {(A,B): 对象}

Ensemble 可以合并集成中的对象。 例如:{C1: 对象, C2: 对象} ---Ensemble---> 对象。 你可以在 Collector 的 process_list 中设置所需的集成器。 常见的集成器包括 ``AverageEnsemble``(平均集成器)和 ``RollingEnsemble``(滚动集成器)。平均集成器用于集成同一时间段内不同模型的结果,滚动集成器用于集成同一时间段内不同模型的结果。

因此,层次结构为:Collector 的第二步对应 Group,而 Group 的第二步对应 Ensemble

更多信息,请参见 CollectorGroupEnsemble,或 示例