摘要:在数据分析和数据处理领域中,高效的工作流编排管理至关重要。随着数据规模的不断扩大,和数据上下游依赖的不断复杂化,包括数据分析任务的有序执行、资源的合理利用,以及错误的及时处理在内的问题,成为了开发者们面临的挑战。Prefect 作为一款 Python 工作流编
在数据分析和数据处理领域中,高效的工作流编排管理至关重要。随着数据规模的不断扩大,和数据上下游依赖的不断复杂化,包括数据分析任务的有序执行、资源的合理利用,以及错误的及时处理在内的问题,成为了开发者们面临的挑战。Prefect 作为一款 Python 工作流编排工具,为解决这些问题提供了一个优雅而强大的解决方案。
Prefect 是一个用于工作流编排和管理的框架,其的代码仓库位于 https://github.com/PrefectHQ/prefect 。Perfect 能构建动态且有弹性数据管道,只需几行代码就能完成任务调度、缓存、重试和事件驱动的自动化,并完成工作流活动的跟踪。
Prefect 具有以下显著的特色功能:
简单易用:提供了简洁直观的 API,使用 Python 的装饰器语法,让开发者可以轻松定义和管理工作流,通过 @flow 和 @task 装饰器,可以快速将普通的 Python 函数转化为可编排的任务和工作流高度可定制:支持多种调度方式,如 Cron 调度、间隔调度等,还可以根据不同的需求定制任务的重试策略、错误处理机制等丰富的集成:拥有众多的集成插件,可与各种第三方服务和工具无缝对接,如 AWS、Snowflake、GitHub、Bitbucket 等,方便开发者在不同的环境中使用可视化界面:提供了一个可视化的 UI 界面,让用户可以直观地监控和管理工作流的执行情况,实时查看任务的状态、日志等信息Perfect 要求 Python 3.9 或以上,使用 pip 完成安装:
pip install -U prefectPerfect 主要通过 @flow 和 @task 装饰器定义工作流的和任务,以下是一个简单的例子:
from prefect import flow, taskimport httpx@task(log_prints=True)def get_stars(repo: str): url = f"https://api.github.com/repos/{repo}" count = httpx.get(url).json["stargazers_count"] print(f"{repo} has {count} stars!")@flow(name="GitHub Stars")def github_stars(repos: list[str]): for repo in repos: get_stars(repo)# run the flow!if __name__ == "__main__": github_stars(["PrefectHQ/Prefect"])该例子中批量地进行 Github 仓库 Stars 数量的获取,首先使用 task 装饰器,创建一个从 HTTP API 获取仓库 Stars 数并进行打印的任务,然后使用 flow 装饰器,对输入的仓库列表,逐个进行 get_stars 任务的执行。最后,在 main 脚本中,启动 github_stars 工作流。
完成代码编写后,可以启动一个 Perfect 服务器,然后在浏览器中访问 http://localhost:4200,打开 Perfect 的 UI 面板,监控工作流的运行情况:
prefect server start以上代码中,工作流只会运行一次,Perfect 支持通过提供 cron 表达式实现定时的任务调度:
if __name__ == "__main__": github_stars.serve( name="first-deployment", cron="* * * * *", parameters={"repos": ["PrefectHQ/prefect"]} )我们还可以把 get_stars 任务进行拆解,拆分为 HTTP API 调用,和 Stars 数据获取两部分,并通过工作流把两个任务串联起来:
@flow(log_prints=True)def show_stars(github_repos: list[str]): """Flow: Show the number of stars that GitHub repos have""" for repo in github_repos: # Call Task 1 repo_stats = fetch_stats(repo) # Call Task 2 stars = get_stars(repo_stats) # Print the result print(f"{repo}: {stars} stars")@taskdef fetch_stats(github_repo: str): """Task 1: Fetch the statistics for a GitHub repo""" return httpx.get(f"https://api.github.com/repos/{github_repo}").json@taskdef get_stars(repo_stats: dict): """Task 2: Get the number of stars from GitHub repo statistics""" return repo_stats['stargazers_count']Prefect 作为一款功能强大的工作流编排工具,为开发者提供了一个高效、灵活的解决方案。它简单易用,使得开发者可以快速上手,无需复杂的配置和学习成本,同时可以与各种第三方服务和工具无缝对接,满足不同场景的需求。Prefect 还提供了可视化的 UI 界面,则让用户可以直观地监控和管理工作流的执行情况,提高工作效率。
Prefect 适用于各种需要工作流编排和管理的场景,如数据处理、机器学习、ETL 流程等,可以帮助开发者将数据采集、清洗、转换等任务进行编排,确保数据的准确和及时处理。还可以管理模型训练、评估、部署等流程,提高模型开发的效率和可重复性。
来源:每日开源代码