如何通过几行代码优化您的数据科学工作流程



动机
作为一名数据科学家,您为什么要关心优化您的数据科学工作流程?让我们从一个基础数据科学项目的例子开始。
想象一下,您正在使用 Iris 数据集。您从构建函数开始处理数据。
from typing import Any, Dict, Listimport pandas as pddef load_data(path: str) -> pd.DataFrame:    ...def get_classes(data: pd.DataFrame, target_col: str) -> List[str]:    """Task for getting the classes from the Iris data set."""    ...def encode_categorical_columns(data: pd.DataFrame, target_col: str) -> pd.DataFrame:    """Task for encoding the categorical columns in the Iris data set."""    ...def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:    """Task for splitting the classical Iris data set into training and test    sets, each split into features and labels.    """    ...
定义函数后,执行它们。
# Define parameterstarget_col = 'species'test_data_ratio = 0.2# Run functionsdata = load_data(path="data/raw/iris.csv")categorical_columns = encode_categorical_columns(data=data, target_col=target_col)classes = get_classes(data=data, target_col=target_col)train_test_dict = split_data(data=categorical_columns,                            test_data_ratio=test_data_ratio,                            classes=classes)
您的代码运行良好,并且您发现输出没有任何问题,因此您认为工作流程已经足够好。但是,像上面这样的线性工作流程可能存在许多缺点。

缺点是:
如果函数get_classes出现错误,函数 encode_categorical_columns 产生的输出将会丢失,工作流需要从头开始。如果执行encode_categorical_columns需要很长时间,这可能会令人沮丧 。

由于函数 encode_categorical_columns 和 get_classes 不相互依赖,因此可以同时执行以节省时间:

以这种方式运行函数还可以防止在不起作用的函数上浪费不必要的时间。如果函数get_classes中出现错误,工作流将立即重新启动,而无需等待函数 encode_categorical_columns 完成。

现在,您可能同意我的观点,即优化不同功能的工作流程很重要。但是,手动管理工作流程可能需要做很多工作。
有没有一种方法可以通过仅添加几行代码来 自动优化工作流程? 这时候 Prefect 就派上用场了。
什么是 Prefect?
Prefect[1] 是一个开源框架,用于在 Python 中构建工作流。Prefect 可以轻松地大规模构建、运行和监控数据管道。
要安装 Prefect,请键入:
pip install prefect
使用 Prefect 构建您的工作流程
要了解 Prefect 的工作原理,让我们用 Prefect 封装文章开头的工作流程。
第一步——创建任务
Task 是 Prefect 流中的离散动作。首先使用装饰器将上面定义的函数转换为任务 prefect.task:
# 文件名:data-engineer.pyfrom prefect import taskfrom typing import Any, Dict, Listimport pandas as pd@taskdef load_data(path: str) -> pd.DataFrame:    ...@taskdef get_classes(data: pd.DataFrame, target_col: str) -> List[str]:    """Task for getting the classes from the Iris data set."""    ...@taskdef encode_categorical_columns(data: pd.DataFrame, target_col: str) -> pd.DataFrame:    """Task for encoding the categorical columns in the Iris data set."""    ...@taskdef split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:    """Task for splitting the classical Iris data set into training and test    sets, each split into features and labels.    """    ...
第二步——创建流程
Flow 通过管理任务之间的依赖关系来表示整个工作流程。要创建流,只需在 with Flow(...) 上下文管理器中插入代码以运行您的函数。
from prefect import task, Flowwith Flow("data-engineer") as flow:    # Define parameters    target_col = 'species'    test_data_ratio = 0.2    # Define tasks    data = load_data(path="data/raw/iris.csv")    classes = get_classes(data=data, target_col=target_col)    categorical_columns = encode_categorical_columns(data=data, target_col=target_col)    train_test_dict = split_data(data=categorical_columns,    test_data_ratio=test_data_ratio, classes=classes)
请注意,在运行上面的代码时,这些任务都不会执行。Prefect 允许您立即运行流程或安排稍后运行。
让我们尝试使用以下命令立即执行流程 flow.run():
with Flow("data-engineer") as flow:  # Define your flow here  ...flow.run()
运行上面的代码会给你类似这样的输出:
└── 15:49:46 | INFO    | Beginning Flow run for 'data-engineer'└── 15:49:46 | INFO    | Task 'target_col': Starting task run...└── 15:49:46 | INFO    | Task 'target_col': Finished task run for task with final state: 'Success'└── 15:49:46 | INFO    | Task 'test_data_ratio': Starting task run...└── 15:49:47 | INFO    | Task 'test_data_ratio': Finished task run for task with final state: 'Success'└── 15:49:47 | INFO    | Task 'load_data': Starting task run...└── 15:49:47 | INFO    | Task 'load_data': Finished task run for task with final state: 'Success'└── 15:49:47 | INFO    | Task 'encode_categorical_columns': Starting task run...└── 15:49:47 | INFO    | Task 'encode_categorical_columns': Finished task run for task with final state: 'Success'└── 15:49:47 | INFO    | Task 'get_classes': Starting task run...└── 15:49:47 | INFO    | Task 'get_classes': Finished task run for task with final state: 'Success'└── 15:49:47 | INFO    | Task 'split_data': Starting task run...└── 15:49:47 | INFO    | Task 'split_data': Finished task run for task with final state: 'Success'└── 15:49:47 | INFO    | Flow run SUCCESS: all reference tasks succeededFlow run succeeded!
要了解 Prefect 创建的工作流程,让我们可视化整个工作流程。
从安装开始 prefect[viz]:
pip install prefect[viz]
然后将方法添加 visualize 到代码中:
flow.visualize()
您应该会看到 data-engineer 如下所示的工作流程的可视化!

请注意,Prefect 会自动管理任务之间的执行顺序,以便优化工作流程。这对于一些额外的代码来说非常酷!
第三步——添加参数
如果您发现自己经常尝试使用一个变量的不同值,最好将该变量转换为 Parameter.
test_data_ratio = 0.2train_test_dict = split_data(data=categorical_columns,                            test_data_ratio=test_data_ratio,                            classes=classes)
您可以将 Parameter 视为 Task,但它可以在流程运行时接收用户输入。要将变量转换为参数,只需使用 task.Parameter.
from prefect import task, Flow, Parametertest_data_ratio = Parameter("test_data_ratio", default=0.2)train_test_dict = split_data(data=categorical_columns,                            test_data_ratio=test_data_ratio,                            classes=classes)
的第一个参数 Parameter 指定参数的名称。default 是一个可选参数,它指定参数的默认值。
再次运行 flow.visualize 将为我们提供如下输出:

您可以通过以下方式覆盖每次运行的默认参数:
将参数添加 parameters 到 flow.run():
flow.run(parameters={'test_data_ratio': 0.3})
或使用 Prefect CLI:
$ prefect run -p data_engineering.py --param test_data_ratio=0.2
或使用 JSON 文件:
$ prefect run -p data_engineering.py --param-file='params.json'
您的 JSON 文件应如下所示:
{"test_data_ratio": 0.3}
您还可以使用 Prefect Cloud 更改每次运行的参数,这将在下一节中介绍。
监控您的工作流程
概述
Prefect 还允许您在 Prefect Cloud 中监控您的工作流程。按照此说明[2]安装 Prefect Cloud 的相关依赖项。
在安装和设置所有依赖项之后,首先通过运行在 Prefect 上创建一个项目:
$ prefect create project "Iris Project"
接下来,启动本地代理以在单台机器上本地部署我们的流程:
$ prefect agent local start
然后加:
flow.register(project_name="Iris Project")
在文件的末尾。
运行该文件后,您应该会看到类似于以下内容的内容:
Flow URL: https://cloud.prefect.io/khuyentran1476-gmail-com-s-account/flow/dba26bea-8827-4db4-9988-3289f6cb662f └── ID: 2944dc36-cdac-4079-8497-be4ec5594785 └── Project: Iris Project └── Labels: ['khuyen-Precision-7740']
单击输出中的 URL,您将被重定向到概览页面。概述页面显示流程的版本、创建时间、流程的运行历史记录及其运行摘要。

您还可以查看其他运行的摘要、执行时间及其配置。

Perfect 自动跟踪这些重要信息的方式非常酷!
使用默认参数运行工作流
请注意,工作流已注册到 Prefect Cloud,但尚未执行。要使用默认参数执行工作流,请单击右上角的快速运行。

单击创建的运行。现在您将能够实时查看新流程运行的活动!

使用自定义参数运行工作流
要使用自定义参数运行工作流,请单击运行选项卡,然后更改输入下的参数。

当您对参数感到满意时,只需单击“运行”按钮即可开始运行。
查看工作流图
单击 Schematic 将为您提供整个工作流程的图表。

其他特性
除了上面提到的一些基本功能外,Prefect 还提供了一些其他很酷的功能,可以显着提高工作流程的效率。
输入缓存
还记得我们在文章开头提到的问题吗?通常,如果函数 get_classes 失败,函数encode_categorical_columns创建的数据将被丢弃,整个工作流程需要从头开始。
但是,使用 Prefect, 会存储 encode_categorical_columns 的输出。下次重新运行工作流时,下一个任务将使用encode_categorical_columns 的输出,而无需重新运行 encode_categorical_columns。

这也许会大大减少运行工作流所需的时间。
持续输出
有时,您可能希望将任务数据导出到外部位置。这可以通过向任务函数插入保存数据的代码来完成。
def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:  X_train, X_test, y_train, y_test = ...  import pickle  pickle.save(...)
但是,这样做会使测试功能变得困难。
Prefect 可以通过以下方式轻松保存每次运行的任务输出:
将检查点设置为 True
$ export PREFECT__FLOWS__CHECKPOINTING=true
并添加 result = LocalResult(dir=...)) 到装饰器 @task 中。
from prefect.engine.results import LocalResult@task(result = LocalResult(dir='data/processed'))def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:    """Task for splitting the classical Iris data set into training and test    sets, each split into features and labels.    """    X_train, X_test, y_train, y_test = ...    return dict(        train_x=X_train,        train_y=y_train,        test_x=X_test,        test_y=y_test,
现在任务split_data的输出将被保存到data/processed目录中!该名称将类似于以下内容:
prefect-result-2021-11-06t15-37-29-605869-00-00
如果要自定义文件名,可以将参数添加 target 到 @task:
from prefect.engine.results import LocalResult@task(target="{date:%a_%b_%d_%Y_%H:%M:%S}/{task_name}_output",      result = LocalResult(dir='data/processed'))def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:    """Task for splitting the classical Iris data set into training and test    sets, each split into features and labels.    """    ...
Prefect 还提供了其他 Result 类,例如 GCSResult 和 S3Result。您可以在此处[3]查看 API 文档以获取结果。
将另一个流的输出用于当前流
如果您正在使用多个流,例如 data-engineer 流和 data-science 流,您可能希望将流的输出 data-engineer 用于 data-science 流。

将流的输出保存 data-engineer 为文件后,您可以使用以下 read 方法读取该文件:
from prefect.engine.results import LocalResulttrain_test_dict = LocalResult(dir=...).read(location=...).value
连接依赖流
想象一下这种情况:您创建了两个相互依赖的流。流程 data-engineer 需要在流程 data-science 之前执行 
看过您的工作流程的人不了解这两个流程之间的关系。结果他们同时执行流程 data-science 和流程 data-engineer,遇到了错误!

为了防止这种情况发生,我们应该指定流之间的关系。幸运的是,Prefect 让我们更容易做到这一点。
首先使用 StartFlowRun.  添加 wait=True 到参数,以便仅在上游流程完成执行后执行下游流程。
from prefect import Flowfrom prefect.tasks.prefect import StartFlowRundata_engineering_flow = StartFlowRun(flow_name="data-engineer",                                    project_name='Iris Project',                                    wait=True)data_science_flow = StartFlowRun(flow_name="data-science",                                project_name='Iris Project',                                wait=True)
接下来,data_science_flow 在 with Flow(...) 上下文管理器下调用。upstream_tasks用于指定将在执行 data-science 流程之前执行的任务/流程。
with Flow("main-flow") as flow:    result = data_science_flow(upstream_tasks=[data_engineering_flow])flow.run()
现在两个流连接如下:

很酷!
安排您的流程
Prefect 还可以无缝地在某个时间或某个时间间隔执行流程。
例如,要每 1 分钟运行一次流,您可以启动类 IntervalSchedule 并添加 schedule 到 with Flow(...) 上下文管理器:
from prefect.schedules import IntervalScheduleschedule = IntervalSchedule(    start_date=datetime.utcnow() + timedelta(seconds=1),    interval=timedelta(minutes=1),)data_engineering_flow = ...data_science_flow = ...with Flow("main-flow", schedule=schedule) as flow:    data_science = data_science_flow(upstream_tasks=[data_engineering_flow])
现在您的流程将每 1 分钟重新运行一次!
在此处[4]了解有关安排流程的不同方法的更多信息。
日志记录
log_stdout=True 您可以通过简单地添加以下内容来记录任务中的打印语句 @task:
@task(log_stdout=True)def report_accuracy(predictions: np.ndarray, test_y: pd.DataFrame) -> None:  target = ...  accuracy = ...  print(f"Model accuracy on test set: {round(accuracy * 100, 2)}")
执行任务时,您应该会看到如下输出:
[2021-11-06 11:41:16-0500] 信息 - prefect.TaskRunner | 测试集上的模型精度:93.33
结论
恭喜!您刚刚了解了 Prefect 如何通过几行 Python 代码优化您的数据科学工作流程。从长远来看,代码中的小幅优化可以大大提高效率。
原文:https://towardsdatascience.com/orchestrate-a-data-science-project-in-python-with-prefect-e69c61a49074
完整的源代码:https://github.com/khuyentran1401/Data-science/tree/master/data_science_tools/prefect_example
参考资料
[1]
Prefect: https://www.prefect.io/[2]
此说明: https://docs.prefect.io/orchestration/getting-started/set-up.html#server-or-cloud[3]
您可以在此处: https://docs.prefect.io/api/latest/engine/results.html[4]
在此处: https://docs.prefect.io/core/concepts/schedules.html#overview
到顶部