当前,可以通过 CLI(命令行接口)提交 PyFlink 作业。无需指定 Jar 文件路径或入口主类,这与 Java 作业提交不同。
当通过
flink run
提交 Python 作业时,Flink 将运行命令”python“。请运行如下命令确认当前环境中的 Python 可执行程序指向受支持的 Python 版本(3.7+)。
xxxxxxxxxx
$ python --version
# 这里打印的版本必须是 3.7+
以下命令显示不同的 PyFlink 作业提交用例:
运行 PyFlink 作业:
xxxxxxxxxx
$ ./bin/flink run --python examples/python/table/word_count.py
使用额外的源和资源文件运行 PyFlink 作业。--pyFiles
中指定的文件将被添加到 PYTHONPATH,因此可以在 Python 代码中使用。
xxxxxxxxxx
$ ./bin/flink run \
--python examples/python/table/word_count.py \
--pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
运行引用 Java UDF 或外部连接器的 PyFlink 作业。在 --jarfile
指定的文件将被上传到集群。
xxxxxxxxxx
$ ./bin/flink run \
--python examples/python/table/word_count.py \
--jarfile <jarFile>
运行拥有 pyFiles 和在 --pyModules
中指定的主入口模块的 PyFlink 作业:
xxxxxxxxxx
$ ./bin/flink run \
--pyModule table.word_count \
--pyFiles examples/python/table
在运行在主机 <jobmanagerHost>上的特定 JobManger 上提交 PyFlink 作业(相应地调整命令)。
xxxxxxxxxx
$ ./bin/flink run \
--jobmanager <jobmanagerHost>:8081 \
--python examples/python/table/word_count.py
使用 YARN 集群以 Per-Job 模式运行 PyFlink 作业:
xxxxxxxxxx
$ ./bin/flink run \
--target yarn-per-job
--python examples/python/table/word_count.py
使用 YARN 机群以 Application 模式运行 PyFlink 作业:
xxxxxxxxxx
$ ./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dyarn.application.name=<ApplicationName> \
-Dyarn.ship-files=/path/to/shipfiles \
-pyarch shipfiles/venv.zip \
-pyclientexec venv.zip/venv/bin/python3 \
-pyexec venv.zip/venv/bin/python3 \
-pyfs shipfiles \
-pym word_count
注意:假定执行作业所需的 Python 依赖已被放在目录 /path/to/shipfiles
。比如,对于上述示例,它应该包含 venv.zip 和 word_count.py。
注意:因为在 YARN application 模式中,在 JobManager 上执行 Job,所以在 -pyarch
和 -pyfs
中指定的路径相对于 shipfiles
(附带文件的目录名)。建议使用 -pym
而不是 -py
指定程序入口点,因为不可能知道入口点程序的绝对路径或相对路径。
注意:通过 -pyarch
指定的归档文件将通过 Blob 服务被分发到 TaskManager,文件大小限制是 2GB。如果归档文件的大小超过 2GB,那么可以将它上传到分布式文件系统,然后在命令行选项 -pyarch
中使用路径。
在拥有 Cluster ID <ClusterId> 的原生 Kubernetes 集群上运行 PyFlink 应用,需要安装了 PyFlink 的 Docker 镜像,请参阅 Enabling PyFlink in docker:
xxxxxxxxxx
$ ./bin/flink run-application \
--target kubernetes-application \
--parallelism 8 \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image.ref=<PyFlinkImageName> \
--pyModule word_count \
--pyFiles /opt/flink/examples/python/table/word_count.py
如果想了解更多可用选项,请参阅 Resource Provider 部分中详细描述的 Kubernetes 和 YARN。
除上面提及的 --pyFiles
、--pyModule
和 --python
外,还有其它 Python 相关的选项。以下是 Flink CLI 工具支持的 run
和 run-application
操作的所有 Python 相关选项的概述:
选项 | 描述 |
---|---|
-py、--python | 带有程序入口点的 Python 脚本。可以用 --pyFiles 选项配置依赖资源。 |
-pym、--pyModule | 带有程序入口点的 Python 模块。该选项必须与 --pyFiles 一起使用。 |
-pyfs、--pyFiles | 为作业附加自定义文件。支持 .py/.egg/.zip/.whl 之类的标准资源文件后缀或目录。这些文件将被添加到本地客户端和远程 Python UDF Worker 的 PYTHONPATH 中。以 .zip 为后缀的文件将被解压缩,并且被添加到 PYTHONPATH。逗号(“,”)可以用作指定多个文件的分隔符(比如,--pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)。 |
-pyarch、--pyArchives | 为作业添加 Python 归档文件。归档文件将被解压缩到 Python UDF Worker 的工作目录。可以为每个归档文件指定目标目录。如果指定目标目录名称,那么归档文件将被解压缩到具有指定名称的目录。否则,归档文件将被解压缩到与归档文件同名的目录中。通过该选项上传的文件可以通过相对路径访问。可以使用 “#” 作为归档文件路径和目标目录名称的分隔符。可以使用 “,” 作为指定多个归档文件的分隔符。可以使用该选项上传 Python UDF 中使用的虚拟环境和数据文件(比如,--pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python)。在 Python UDF 中可以访问数据文件,比如:f = open('data/data.txt', 'r')。 |
-pyclientexec、--pyClientExecutable | 当通过 “flink run” 提交 Python 作业或编译包含 Python UDF 的 Java/Scala 作业时,用于发起 Python 进程的 Python 解释器的路径(比如,--pyArchives file:///tmp/py37.zip --pyClientExecutable py37.zip/py37/python)。 |
-pyexec、--pyExecutable | 指定用于执行 Python UDF Worker 的 Python 解释器的路径(比如,--pyExecutable /usr/local/bin/python3)。Python UDF Worker 依赖 Python 3.7+、Apache Beam(版本 == 2.43.0)、Pip(版本 >= 20.3)和 SetupTools(版本 >= 37.0.0)。请确保指定的环境满足上述要求。 |
-pyreq、--pyRequirements | 指定定义第三方依赖的 requirements.txt 文件。这些依赖将被安装,并且被添加到 Python UDF Worker 的 PYTHONPATH。可选地指定包含这些依赖的安装包的目录。如果可选参数存在,那么使用“#”作为分隔符(--pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir)。 |
除在提交作业期间的命令行选项外,还支持通过代码中的配置或 Python API 指定依赖项。详细信息请参考依赖项管理。
在 Python API 程序内,存在使用依赖的需求。比如用户可能需要在 Python UDF 中使用第三方 Python 库。此外,在机器学习预测等场景中,用户可能希望在 Python UDF 中加载机器学习模型。
当本地执行 PyFlink 作业时,用户可以将第三方 Python 库安装到本地 Python 环境中,将机器学习模型下载到本地等。但是,当用户希望将 PyFlink 作业提交到远程集群时,这种方法不太好用。下面将介绍 PyFlink 为这些需求提供的选项。
注意:Python DataStream API 和 Python Table API 都为每种依赖提供 API。如果在单个作业中混合使用 Python DataStream API 和 Python Table API,那么应该通过 Python DataStream API 指定依赖项,以使它们同时适用于 Python DataStream API 和 Python Table API。
如果使用第三方 Jar,那么可以在 Python Table API 中指定 Jar,如下所示:
xxxxxxxxxx
# Specify a list of jar URLs via "pipeline.jars". The jars are separated by ";"
# and will be uploaded to the cluster.
# NOTE: Only local file URLs (start with "file://") are supported.
table_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
# It looks like the following on Windows:
table_env.get_config().set("pipeline.jars", "file:///E:/my/jar/path/connector.jar;file:///E:/my/jar/path/udf.jar")
# Specify a list of URLs via "pipeline.classpaths". The URLs are separated by ";"
# and will be added to the classpath during job execution.
# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the URLs are accessible on both the client and the cluster.
table_env.get_config().set("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
或在 Python DataStream API 中,如下所示:
xxxxxxxxxx
# Use the add_jars() to add local jars and the jars will be uploaded to the cluster.
# NOTE: Only local file URLs (start with "file://") are supported.
stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")
# It looks like the following on Windows:
stream_execution_environment.add_jars("file:///E:/my/jar/path/connector1.jar", "file:///E:/my/jar/path/connector2.jar")
# Use the add_classpaths() to add the dependent jars URLs into the classpath.
# The URLs will also be added to the classpath of both the client and the cluster.
# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the
# URLs are accessible on both the client and the cluster.
stream_execution_environment.add_classpaths("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")
或者在提交作业时,通过命令行参数 --jarfile
。
注意:使用命令行参数 --jarfile
仅支持指定一个 Jar 文件,因此如果有多个 Jar 文件,需要构建一个胖 Jar。
可以在 Python UDF 中使用第三方 Python 库。可以通过多种方式指定 Python 库。
可以使用 Python Table API 在代码内部指定它们,如下所示:
xxxxxxxxxx
table_env.add_python_file(file_path)
或使用 Python DataStream API,如下所示:
xxxxxxxxxx
stream_execution_environment.add_python_file(file_path)
也可以使用配置 python.files
,或者在提交作业时,通过命令行参数 --pyfs
或 --pyFiles
指定 Python 库。
注意:Python 库可能是本地文件或本地目录。它们将被添加到 Python UDF Worker 的 PYTHONPATH 中。
也可以指定定义第三方 Python 依赖的 requirements.txt
。这些 Python 依赖将被安装到 Python UDF Worker 的工作目录,并且被添加到 PYTHONPATH。
可以手动准备 requirements.txt
,如下所示:
xxxxxxxxxx
echo numpy==1.16.5 >> requirements.txt
echo pandas==1.0.0 >> requirements.txt
或者使用 pip freeze
列出当前 Python 环境中安装的所有包:
xxxxxxxxxx
pip freeze > requirements.txt
requirements.txt 文件的内容可能如下所示:
xxxxxxxxxx
numpy==1.16.5
pandas==1.0.0
可以手动编辑该文件,移除不必要的条目,或者添加额外的条目等。
可以使用 Python Table API 在代码内部指定 requirements.txt
文件,如下所示:
xxxxxxxxxx
# requirements_cache_dir is optional
table_env.set_python_requirements(
requirements_file_path="/path/to/requirements.txt",
requirements_cache_dir="cached_dir")
或者使用 Python DataStream API,如下所示:
xxxxxxxxxx
# requirements_cache_dir is optional
stream_execution_environment.set_python_requirements(
requirements_file_path="/path/to/requirements.txt",
requirements_cache_dir="cached_dir")
注意:对于不能在集群内访问的依赖,使用参数 requirements_cached_dir
指定包含这些依赖的安装包的目录。它将被上传到集群,以支持离线安装。可以这样准备 requirements_cache_dir
:
xxxxxxxxxx
pip download -d cached_dir -r requirements.txt --no-binary :all:
注意:请确保准备的包匹配集群的平台,以及使用的 Python 版本。
还可以使用配置 python.requirements
,或者在提交作业时,通过命令行参数 -pyreq
或 --pyRequirements
指定 requirements.txt
。
注意:将使用 pip 安装 requirements.txt
文件中指定的包,因此请确保 pip(版本 >= 20.3)和 setuptools(版本 >= 37.0.0)可用。
可以使用归档文件指定自定义 Python 虚拟环境、数据文件等。可以使用 Python Table API 在代码内部指定归档文件,如下所示:
xxxxxxxxxx
table_env.add_python_archive(archive_path="/path/to/archive_file", target_dir=None)
或者使用 Python DataStream API,如下所示:
xxxxxxxxxx
stream_execution_environment.add_python_archive(archive_path="/path/to/archive_file", target_dir=None)
注意:参数 target_dir
是可选的。如果指定,那么在执行期间,归档文件将被解压缩到名为 target_dir
的目录中。否则,归档文件将被解压缩到与归档文件同名的目录中。
假设按照如下方式指定归档文件:
xxxxxxxxxx
table_env.add_python_archive("/path/to/py_env.zip", "myenv")
然后,可以在 Python UDF 中访问归档文件的内容,如下所示:
xxxxxxxxxx
def my_udf():
with open("myenv/py_env/data/data.txt") as f:
...
如果未指定参数 target_dir
:
xxxxxxxxxx
table_env.add_python_archive("/path/to/py_env.zip")
可以在 Python UDF 中访问归档文件的内容,如下所示:
xxxxxxxxxx
def my_udf():
with open("py_env.zip/py_env/data/data.txt") as f:
...
注意:归档文件将被解压缩到 Python UDF Worker 的工作目录,因此可以使用相对路径访问归档文件内部的文件。
还可以使用配置 python.archives
,或者在提交作业时,通过命令行参数 -pyarch
或 --pyArchives
指定归档文件。
注意:如果归档文件包含 Python 虚拟环境,请确保 Python 虚拟环境与运行集群的平台匹配。
注意:当前仅支持 zip 文件(即 zip、jar、whl、egg 等)和 tar 文件(即 tar、tar.gz、tgz)。
支持指定执行 Python Worker 的 Python 解释器的路径。
可以使用 Python Table API 在代码内部指定 Python 解释器,如下所示:
xxxxxxxxxx
table_env.get_config().set_python_executable("/path/to/python")
或使用 Python DataStream API,如下所示:
xxxxxxxxxx
stream_execution_environment.set_python_executable("/path/to/python")
还支持使用归档文件内部的 Python 解释器:
xxxxxxxxxx
# Python Table API
table_env.add_python_archive("/path/to/py_env.zip", "venv")
table_env.get_config().set_python_executable("venv/py_env/bin/python")
# Python DataStream API
stream_execution_environment.add_python_archive("/path/to/py_env.zip", "venv")
stream_execution_environment.set_python_executable("venv/py_env/bin/python")
还可以使用配置 python.executable
,或者在提交作业时,通过命令行参数 -pyexec
或 --pyExecutable
指定 Python 解释器。
注意:如果 Python 解释器的路径指向 Python 归档文件,那么使用相对路径代替绝对路径。
客户端侧需要 Python,以在编译作业期间,解析 Python UDF。
可以通过在当前会话中激活它的方式,指定在客户端侧使用的自定义 Python 解释器。
xxxxxxxxxx
source my_env/bin/activate
或者使用配置 python.client.executable
、命令行参数 -pyclientexec
或 --pyClientExecutable
、环境变量 PYFLINK_CLIENT_EXECUTABLE 指定它。
在 Java Table API 程序或纯 SQL 程序中支持使用 Python UDF。
下面是关于如何在 Java Table API 程序中使用 Python UDF 的简单示例:
xxxxxxxxxx
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
TableEnvironment tEnv = TableEnvironment.create(
EnvironmentSettings.inBatchMode());
tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM, 1);
// register the Python UDF
tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python");
tEnv.createTemporaryView("source", tEnv.fromValues(1L, 2L, 3L).as("a"));
// use Python UDF in the Java Table API program
tEnv.executeSql("select add_one(a) as a from source").collect();
关于如何使用 SQL 语句创建 Python UDF 的更多细节,可以参考关于 CREATE FUNCTION 的 SQL 语句。
然后可以通过 Python 配置选项,指定 Python 依赖,比如 python.archives、python.files、python.requirements、python.client.executable、python.executable 等,或者在提交作业时,通过命令行参数。