任务管理
简介
工作流 部分介绍了如何以松耦合方式运行研究工作流。但使用 qrun
时只能执行一个 任务
。
为了自动生成和执行不同的任务,任务管理
提供了包括 任务生成、任务存储、任务训练 和 任务收集 的完整流程。
通过此模块,用户可以在不同时间段、使用不同损失函数甚至不同模型自动运行其 任务
。任务生成、模型训练以及数据合并和收集的流程如下图所示。
整个流程可用于 在线服务。
完整流程的示例见 此处。
任务生成
一个 任务
由 模型、数据集、记录器 或用户添加的任何内容组成。
具体的任务模板可在
任务部分 中查看。
尽管任务模板是固定的,用户仍可以自定义 TaskGen``(任务生成器),通过任务模板生成不同的 ``任务
。
以下是 TaskGen
的基类:
- class qlib.workflow.task.gen.TaskGen
生成不同任务的基类
示例1:
输入: 一个特定的任务模板和滚动步骤
输出: 任务的滚动版本
示例2:
输入: 一个特定的任务模板和损失列表
输出: 一组具有不同损失的任务
- abstractmethod generate(task: dict) List[dict]
Generate different tasks based on a task template
- 参数:
task (dict) -- a task template
- 返回:
A list of tasks
- 返回类型:
List[dict]
Qlib
提供了一个 RollingGen 类,用于生成数据集在不同日期段的 任务
列表。
此类允许用户在一个实验中验证不同时期数据对模型的影响。更多信息见 此处。
任务存储
为了提高效率并支持集群操作,任务管理器
会将所有任务存储在 MongoDB 中。
``TaskManager``(任务管理器)可以自动获取未完成的任务,并通过错误处理管理一组任务的生命周期。
使用此模块时,用户 必须 完成 MongoDB 的配置。
用户需要在 初始化 中提供 MongoDB URL 和数据库名称以使用 TaskManager
,或进行如下配置:
from qlib.config import C C["mongo"] = { "task_url" : "mongodb://localhost:27017/", # 你的 MongoDB URL "task_db_name" : "rolling_db" # 数据库名称 }
- class qlib.workflow.task.manage.TaskManager(task_pool: str)
以下是TaskManager创建的任务示例:
{ 'def': 'pickle序列化的任务定义,使用pickle更方便', 'filter': '类JSON数据,用于过滤任务', 'status': 'waiting' | 'running' | 'done', 'res': 'pickle序列化的任务结果' }
任务管理器假设您只会更新已获取的任务。 MongoDB的获取和更新操作确保数据更新安全。
此类可作为命令行工具使用。以下是几个示例: 查看manage模块帮助的命令: python -m qlib.workflow.task.manage -h # 显示manage模块CLI手册 python -m qlib.workflow.task.manage wait -h # 显示wait命令手册
python -m qlib.workflow.task.manage -t <pool_name> wait python -m qlib.workflow.task.manage -t <pool_name> task_stat
备注
假设:MongoDB中的数据会被编码,取出的数据会被解码
四种状态说明:
STATUS_WAITING: 等待训练
STATUS_RUNNING: 训练中
STATUS_PART_DONE: 已完成部分步骤,等待下一步
STATUS_DONE: 全部工作完成
- __init__(task_pool: str)
Init Task Manager, remember to make the statement of MongoDB url and database name firstly. A TaskManager instance serves a specific task pool. The static method of this module serves the whole MongoDB.
- 参数:
task_pool (str) -- the name of Collection in MongoDB
- static list() list
列出数据库中所有集合(任务池)。
- 返回:
list
- replace_task(task, new_task)
Use a new task to replace a old one
- 参数:
task -- old task
new_task -- new task
- insert_task(task)
Insert a task.
- 参数:
task -- the task waiting for insert
- 返回:
pymongo.results.InsertOneResult
- insert_task_def(task_def)
Insert a task to task_pool
- 参数:
task_def (dict) -- the task definition
- 返回类型:
pymongo.results.InsertOneResult
- create_task(task_def_l, dry_run=False, print_nt=False) List[str]
如果task_def_l中的任务是新的,则插入新任务到任务池并记录inserted_id。 如果任务已存在,则只查询其_id。
参数
- task_def_l: list
任务列表
- dry_run: bool
是否实际插入新任务到任务池
- print_nt: bool
是否打印新任务
返回
- List[str]
task_def_l中各任务的_id列表
- fetch_task(query={}, status='waiting') dict
使用查询获取任务。
- 参数:
query (dict, optional): 查询字典,默认为{} status (str, optional): 任务状态,默认为STATUS_WAITING
- 返回:
dict: 解码后的任务(集合中的文档)
- safe_fetch_task(query={}, status='waiting')
使用contextmanager从任务池中获取任务
参数
- query: dict
查询字典
返回
dict: 解码后的任务(集合中的文档)
- query(query={}, decode=True)
查询集合中的任务。 如果迭代生成器耗时过长,此函数可能抛出异常`pymongo.errors.CursorNotFound: cursor id not found`
- 示例:
python -m qlib.workflow.task.manage -t <your task pool> query '{"_id": "615498be837d0053acbc5d58"}'
参数
- query: dict
查询字典
- decode: bool
是否解码结果
返回
dict: 解码后的任务(集合中的文档)
- re_query(_id) dict
使用_id查询任务。
- 参数:
_id (str): 文档的_id
- 返回:
dict: 解码后的任务(集合中的文档)
- commit_task_res(task, res, status='done')
提交结果到task['res']。
- 参数:
task ([type]): 任务 res (object): 要保存的结果 status (str, optional): STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为STATUS_DONE。
- return_task(task, status='waiting')
Return a task to status. Always using in error handling.
- 参数:
task ([type]) -- [description]
status (str, optional) -- STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE. Defaults to STATUS_WAITING.
- remove(query={})
Remove the task using query
- 参数:
query (dict) -- the dict of query
- task_stat(query={}) dict
Count the tasks in every status.
- 参数:
query (dict, optional) -- the query dict. Defaults to {}.
- 返回:
dict
- reset_waiting(query={})
将所有运行中的任务重置为等待状态。可用于某些任务意外退出的情况。
- 参数:
query (dict, optional): 查询字典,默认为{}
- prioritize(task, priority: int)
Set priority for task
- 参数:
task (dict) -- The task query from the database
priority (int) -- the target priority
- wait(query={})
在多进程环境下,主进程可能因为仍有任务在运行而无法从TaskManager获取任务。 因此主进程应等待其他进程或机器完成所有任务。
- 参数:
query (dict, optional): 查询字典,默认为{}
任务管理器
的更多信息见 此处。
任务训练
生成并存储这些 任务
后,就可以运行处于 WAITING*(等待)状态的 ``任务`` 了。
``Qlib`` 提供了 ``run_task`` 方法来运行任务池中的 ``任务``,不过用户也可以自定义任务的执行方式。
获取 ``task_func``(任务函数)的简单方法是直接使用 ``qlib.model.trainer.task_train``。
它将运行由 ``任务`` 定义的整个工作流,包括 *模型、数据集、记录器。
- qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)
当任务池不为空(有WAITING状态任务)时,使用task_func获取并运行任务池中的任务
运行此方法后,有以下4种情况(before_status -> after_status):
STATUS_WAITING -> STATUS_DONE: 使用task["def"]作为`task_func`参数,表示任务尚未开始
STATUS_WAITING -> STATUS_PART_DONE: 使用task["def"]作为`task_func`参数
STATUS_PART_DONE -> STATUS_PART_DONE: use task["res"] as task_func param, it means that the task has been started but not completed
STATUS_PART_DONE -> STATUS_DONE: use task["res"] as task_func param
- 参数:
task_func (Callable) --
def (task_def, **kwargs) -> <res which will be committed>
the function to run the task
task_pool (str) -- the name of the task pool (Collection in MongoDB)
query (dict) -- will use this dict to query task_pool when fetching task
force_release (bool) -- will the program force to release the resource
before_status (str:) -- the tasks in before_status will be fetched and trained. Can be STATUS_WAITING, STATUS_PART_DONE.
after_status (str:) -- the tasks after trained will become after_status. Can be STATUS_WAITING, STATUS_PART_DONE.
kwargs -- the params for task_func
同时,Qlib
提供了一个名为 ``Trainer``(训练器)的模块。
- class qlib.model.trainer.Trainer
训练器用于训练模型列表 Trainer和DelayTrainer的区别在于完成实际训练的时机不同
- __init__()
- train(tasks: list, *args, **kwargs) list
给定任务定义列表,开始训练并返回模型。
对于Trainer,此方法完成实际训练。 对于DelayTrainer,此方法仅做准备工作。
- 参数:
tasks (list): 任务定义列表
- 返回:
list: 模型列表
- 注意:
对于`Trainer`,此方法将直接训练模型
对于`DelayTrainer`,此方法仅做训练准备
- end_train(models: list, *args, **kwargs) list
给定模型列表,在训练结束时完成必要操作 模型可能是记录器、文本文件、数据库等
对于Trainer,该方法做一些收尾工作 对于DelayTrainer,该方法完成实际训练
- 参数:
models: 模型列表
- 返回:
list: 模型列表
- is_delay() bool
判断训练器是否会延迟完成`end_train`
- 返回:
bool: 是否为DelayTrainer
- has_worker() bool
判断是否启用了并行训练的后台工作器
返回
- bool:
工作器是否启用
- worker()
启动工作进程
- 异常:
NotImplementedError: 如果不支持工作进程
Trainer
会训练一系列任务并返回一系列模型记录器。
Qlib
提供两种训练器:TrainerR 是最简单的方式,而 TrainerRM 基于 TaskManager,可帮助自动管理任务生命周期。
如果不想使用 任务管理器
管理任务,使用 TrainerR 训练由 TaskGen
生成的任务列表即可。
不同 Trainer
的详细信息见 此处。
任务收集
收集模型训练结果前,需要使用 qlib.init
指定 mlruns 的路径。
为了收集训练后的 任务
结果,Qlib
提供了 Collector(收集器)、Group(分组器) 和 Ensemble(集成器),以可读、可扩展且松耦合的方式收集结果。
Collector 可以从任何地方收集对象并对其进行合并、分组、平均等处理。它包含两个步骤:``collect``(将任何内容收集到字典中)和 ``process_collect``(处理收集到的字典)。
Group 也有两个步骤:group``(可基于 `group_func` 对一组对象进行分组并转换为字典)和 ``reduce``(可基于某些规则将字典转换为集成结果)。
例如:{(A,B,C1): 对象, (A,B,C2): 对象} ---``group
---> {(A,B): {C1: 对象, C2: 对象}} ---reduce
---> {(A,B): 对象}
Ensemble 可以合并集成中的对象。
例如:{C1: 对象, C2: 对象} ---Ensemble
---> 对象。
你可以在 Collector
的 process_list 中设置所需的集成器。
常见的集成器包括 ``AverageEnsemble``(平均集成器)和 ``RollingEnsemble``(滚动集成器)。平均集成器用于集成同一时间段内不同模型的结果,滚动集成器用于集成同一时间段内不同模型的结果。
因此,层次结构为:Collector
的第二步对应 Group
,而 Group
的第二步对应 Ensemble
。