1. 提交 PyFlink 作业

当前,可以通过 CLI(命令行接口)提交 PyFlink 作业。无需指定 Jar 文件路径或入口主类,这与 Java 作业提交不同。

当通过 flink run 提交 Python 作业时,Flink 将运行命令”python“。请运行如下命令确认当前环境中的 Python 可执行程序指向受支持的 Python 版本(3.7+)。

以下命令显示不同的 PyFlink 作业提交用例:

如果想了解更多可用选项,请参阅 Resource Provider 部分中详细描述的 KubernetesYARN

除上面提及的 --pyFiles--pyModule--python 外,还有其它 Python 相关的选项。以下是 Flink CLI 工具支持的 runrun-application 操作的所有 Python 相关选项的概述:

选项描述
-py、--python带有程序入口点的 Python 脚本。可以用 --pyFiles 选项配置依赖资源。
-pym、--pyModule带有程序入口点的 Python 模块。该选项必须与 --pyFiles 一起使用。
-pyfs、--pyFiles为作业附加自定义文件。支持 .py/.egg/.zip/.whl 之类的标准资源文件后缀或目录。这些文件将被添加到本地客户端和远程 Python UDF Worker 的 PYTHONPATH 中。以 .zip 为后缀的文件将被解压缩,并且被添加到 PYTHONPATH。逗号(“,”)可以用作指定多个文件的分隔符(比如,--pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)。
-pyarch、--pyArchives为作业添加 Python 归档文件。归档文件将被解压缩到 Python UDF Worker 的工作目录。可以为每个归档文件指定目标目录。如果指定目标目录名称,那么归档文件将被解压缩到具有指定名称的目录。否则,归档文件将被解压缩到与归档文件同名的目录中。通过该选项上传的文件可以通过相对路径访问。可以使用 “#” 作为归档文件路径和目标目录名称的分隔符。可以使用 “,” 作为指定多个归档文件的分隔符。可以使用该选项上传 Python UDF 中使用的虚拟环境和数据文件(比如,--pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python)。在 Python UDF 中可以访问数据文件,比如:f = open('data/data.txt', 'r')。
-pyclientexec、--pyClientExecutable当通过 “flink run” 提交 Python 作业或编译包含 Python UDF 的 Java/Scala 作业时,用于发起 Python 进程的 Python 解释器的路径(比如,--pyArchives file:///tmp/py37.zip --pyClientExecutable py37.zip/py37/python)。
-pyexec、--pyExecutable指定用于执行 Python UDF Worker 的 Python 解释器的路径(比如,--pyExecutable /usr/local/bin/python3)。Python UDF Worker 依赖 Python 3.7+、Apache Beam(版本 == 2.43.0)、Pip(版本 >= 20.3)和 SetupTools(版本 >= 37.0.0)。请确保指定的环境满足上述要求。
-pyreq、--pyRequirements指定定义第三方依赖的 requirements.txt 文件。这些依赖将被安装,并且被添加到 Python UDF Worker 的 PYTHONPATH。可选地指定包含这些依赖的安装包的目录。如果可选参数存在,那么使用“#”作为分隔符(--pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir)。

除在提交作业期间的命令行选项外,还支持通过代码中的配置或 Python API 指定依赖项。详细信息请参考依赖项管理。


2. 依赖管理

在 Python API 程序内,存在使用依赖的需求。比如用户可能需要在 Python UDF 中使用第三方 Python 库。此外,在机器学习预测等场景中,用户可能希望在 Python UDF 中加载机器学习模型。

当本地执行 PyFlink 作业时,用户可以将第三方 Python 库安装到本地 Python 环境中,将机器学习模型下载到本地等。但是,当用户希望将 PyFlink 作业提交到远程集群时,这种方法不太好用。下面将介绍 PyFlink 为这些需求提供的选项。

注意:Python DataStream API 和 Python Table API 都为每种依赖提供 API。如果在单个作业中混合使用 Python DataStream API 和 Python Table API,那么应该通过 Python DataStream API 指定依赖项,以使它们同时适用于 Python DataStream API 和 Python Table API。

2.1. Jar 依赖

如果使用第三方 Jar,那么可以在 Python Table API 中指定 Jar,如下所示:

或在 Python DataStream API 中,如下所示:

或者在提交作业时,通过命令行参数 --jarfile

注意:使用命令行参数 --jarfile 仅支持指定一个 Jar 文件,因此如果有多个 Jar 文件,需要构建一个胖 Jar。

2.2. Python 依赖

2.2.1. Python 库

可以在 Python UDF 中使用第三方 Python 库。可以通过多种方式指定 Python 库。

可以使用 Python Table API 在代码内部指定它们,如下所示:

或使用 Python DataStream API,如下所示:

也可以使用配置 python.files,或者在提交作业时,通过命令行参数 --pyfs--pyFiles 指定 Python 库。

注意:Python 库可能是本地文件或本地目录。它们将被添加到 Python UDF Worker 的 PYTHONPATH 中。

2.2.2. requirements.txt

也可以指定定义第三方 Python 依赖的 requirements.txt。这些 Python 依赖将被安装到 Python UDF Worker 的工作目录,并且被添加到 PYTHONPATH。

可以手动准备 requirements.txt,如下所示:

或者使用 pip freeze 列出当前 Python 环境中安装的所有包:

requirements.txt 文件的内容可能如下所示:

可以手动编辑该文件,移除不必要的条目,或者添加额外的条目等。

可以使用 Python Table API 在代码内部指定 requirements.txt 文件,如下所示:

或者使用 Python DataStream API,如下所示:

注意:对于不能在集群内访问的依赖,使用参数 requirements_cached_dir 指定包含这些依赖的安装包的目录。它将被上传到集群,以支持离线安装。可以这样准备 requirements_cache_dir

注意:请确保准备的包匹配集群的平台,以及使用的 Python 版本。

还可以使用配置 python.requirements,或者在提交作业时,通过命令行参数 -pyreq--pyRequirements 指定 requirements.txt

注意:将使用 pip 安装 requirements.txt 文件中指定的包,因此请确保 pip(版本 >= 20.3)和 setuptools(版本 >= 37.0.0)可用。

2.2.3. 归档文件

可以使用归档文件指定自定义 Python 虚拟环境、数据文件等。可以使用 Python Table API 在代码内部指定归档文件,如下所示:

或者使用 Python DataStream API,如下所示:

注意:参数 target_dir 是可选的。如果指定,那么在执行期间,归档文件将被解压缩到名为 target_dir 的目录中。否则,归档文件将被解压缩到与归档文件同名的目录中。

假设按照如下方式指定归档文件:

然后,可以在 Python UDF 中访问归档文件的内容,如下所示:

如果未指定参数 target_dir

可以在 Python UDF 中访问归档文件的内容,如下所示:

注意:归档文件将被解压缩到 Python UDF Worker 的工作目录,因此可以使用相对路径访问归档文件内部的文件。

还可以使用配置 python.archives,或者在提交作业时,通过命令行参数 -pyarch--pyArchives 指定归档文件。

注意:如果归档文件包含 Python 虚拟环境,请确保 Python 虚拟环境与运行集群的平台匹配。

注意:当前仅支持 zip 文件(即 zip、jar、whl、egg 等)和 tar 文件(即 tar、tar.gz、tgz)。

2.2.4. Python 解释器

支持指定执行 Python Worker 的 Python 解释器的路径。

可以使用 Python Table API 在代码内部指定 Python 解释器,如下所示:

或使用 Python DataStream API,如下所示:

还支持使用归档文件内部的 Python 解释器:

还可以使用配置 python.executable,或者在提交作业时,通过命令行参数 -pyexec--pyExecutable 指定 Python 解释器。

注意:如果 Python 解释器的路径指向 Python 归档文件,那么使用相对路径代替绝对路径。

2.2.5. 客户端的 Python 解释器

客户端侧需要 Python,以在编译作业期间,解析 Python UDF。

可以通过在当前会话中激活它的方式,指定在客户端侧使用的自定义 Python 解释器。

或者使用配置 python.client.executable、命令行参数 -pyclientexec--pyClientExecutable、环境变量 PYFLINK_CLIENT_EXECUTABLE 指定它。

2.3. 如何在 Java/Scala 程序中指定 Python 依赖

在 Java Table API 程序或纯 SQL 程序中支持使用 Python UDF。

下面是关于如何在 Java Table API 程序中使用 Python UDF 的简单示例:

关于如何使用 SQL 语句创建 Python UDF 的更多细节,可以参考关于 CREATE FUNCTION 的 SQL 语句。

然后可以通过 Python 配置选项,指定 Python 依赖,比如 python.archivespython.filespython.requirementspython.client.executablepython.executable 等,或者在提交作业时,通过命令行参数。