Table API 和 SQL 集成在一个联合 API 中。该 API 的核心概念是被用作查询的输入和输出的 Table
。本文档展示使用 Table API 和 SQL 查询的程序的常见结构,如何注册 Table
,如何查询 Table
,以及如何发出 Table
。
如下代码示例展示 Table API 和 SQL 程序的常见结构。
xfrom pyflink.table import *
# Create a TableEnvironment for batch or streaming execution
table_env = ... # see "Create a TableEnvironment" section
# Create a source table
table_env.executeSql("""CREATE TEMPORARY TABLE SourceTable (
f0 STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100'
)
""")
# Create a sink table
table_env.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable (EXCLUDING OPTIONS) ")
# Create a Table from a Table API query
table1 = table_env.from_path("SourceTable").select(...)
# Create a Table from a SQL query
table2 = table_env.sql_query("SELECT ... FROM SourceTable ...")
# Emit a Table API result Table to a TableSink, same for SQL result
table_result = table1.execute_insert("SinkTable")
Table API 和 SQL 查询可以很容易地集成、嵌入到 DataStream 程序中。请查看 DataStream API 集成页面,了解如何将 DataStream 转换为 Table,反之亦然。
TableEnvironment
是 Table API 和 SQL 集成的入口点,负责:
Table
DataStream
和 Table
间转换(在 StreamTableEnvironment
的情况下)Table
始终被绑定到特定的 TableEnvironment
。在相同的查询中,不能结合不同 TableEnvironment 的表,比如,连接或联合它们。通过调用静态的 TableEnvironment.create()
方法的方式,创建 TableEnvironment
。
xxxxxxxxxx
from pyflink.table import EnvironmentSettings, TableEnvironment
# create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
另外,用户可以从现有的 StreamExecutionEnvironment
创建 StreamTableEnvironment
,以与 DataStream
API 进行互操作。
xxxxxxxxxx
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
s_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(s_env)
TableEnvironment
维护使用标识符创建的表的 Catalog 的映射。每个标识符包含 3 部分:Catalog 名称、数据库名称和对象名称。如果未指定 Catalog 或数据库,那么将使用当前的默认值。
表是虚表(VIEW
)或常规表(TABLE
)。可以从现有的 Table
对象创建 VIEW
,通常是 Table API 或 SQL 查询的结果。TABLE
描述外部数据,比如文件、数据库表或消息队列。
表可以是临时的,绑定到单个 Flink 会话的生命周期,或永久的,在多个 Flink 会话间和集群可见。
永久表需要 catalog(比如 Hive Metastore)维护表的元数据。永久表创建后,对连接到该 Catalog 的任何 Flink 会话可见,并且将持续存在,直到显式地删除表。
另一方面,临时表存储在内存中,并且只在创建它们的 Flink 会话期间存在。这些表对其它会话不可见。它们未被绑定到任何 Catalog 或数据库,但可以在其中之一的名称空间中创建。如果临时表对应的数据库被删除,临时表不会被删除。
可以注册具有与现有永久表相同标识符的临时表。临时表将遮蔽永久表,并且只要临时表存在,就无法访问永久表。所有带该标识符的查询都将对临时表执行。
这可能对实验有用。它允许先对临时表(比如,只有数据的子集,或者数据被混淆。)运行完全相同的查询。一旦验证查询是正确的,就可以对实际的生产表运行它。
在 SQL 术语中,Table
API 对象对应于 VIEW
(虚拟表)。它封装逻辑查询计划。可以在 Catalog 中创建它,如下所示:
x
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# table is the result of a simple projection query
proj_table = table_env.from_path("X").select(...)
# register the Table projTable as table "projectedTable"
table_env.register_table("projectedTable", proj_table)
注意:Table
对象类似于关系型数据库系统中的 VIEW,也就是说,定义 Table
的查询未被优化,但当另一个查询引用已注册的 Table
时,它将被内联。如果多个查询引用同一个已注册 Table
,那么将为每个引用查询内联它,并且执行多次,即已注册 Table
的结果将不被共享。
可以通过连接器(connector)声明创建关系型数据库中已知的 TABLE
。连接器描述存储表数据的外部系统。可以在这里声明存储系统,比如 Apache Kafka 或常规文件系统。
可以直接使用 Table API 创建这种表,或者通过切换到 SQL DDL 的方式。
xxxxxxxxxx
# Using table descriptors
source_descriptor = TableDescriptor.for_connector("datagen") \
.schema(Schema.new_builder()
.column("f0", DataTypes.STRING())
.build()) \
.option("rows-per-second", "100") \
.build()
t_env.create_table("SourceTableA", source_descriptor)
t_env.create_temporary_table("SourceTableB", source_descriptor)
# Using SQL DDL
t_env.execute_sql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
使用由 Catalog、数据库和表名组成的 3 部分标识符注册表。
用户可以将一个 Catalog 和其中的一个数据库设置为“当前 Catalog”和“当前数据库”。使用它们,上面提到的 3 部分标识符中的前两部分可以可选 - 如果未提供它们,那么将引用当前 Catalog 和当前数据库。用户可以通过 Table API 或 SQL 切换当前 Catalog 和当前数据库。
标识符遵循 SQL 要求,这意味着可以用反号字符(`)转义它们。
x
# get a TableEnvironment
t_env = TableEnvironment.create(...)
t_env.use_catalog("custom_catalog")
t_env.use_database("custom_database")
table = ...
# register the view named 'exampleView' in the catalog named 'custom_catalog'
# in the database named 'other_database'
t_env.create_temporary_view("other_database.exampleView", table)
# register the view named 'example.View' in the catalog named 'custom_catalog'
# in the database named 'custom_database'
t_env.create_temporary_view("`example.View`", table)
# register the view named 'exampleView' in the catalog named 'other_catalog'
# in the database named 'other_database'
t_env.create_temporary_view("other_catalog.other_database.exampleView", table)
Table API 是用于 Scala 和 Java 的语言集成的查询 API。与 SQL 相比,查询未被指定为 String,而是在宿主语言中逐步组成。
该 API 基于 Table
类,它代表一张表(流或批),并且提供应用关系型操作的方法。这些方法返回新 Table
对象,它代表在输入 Table
上应用关系型操作的结果。一些关系操作由多个方法调用组成,比如 table.groupBy(…).select()
,其中 groupBy(…)
指定表的分组,select(…)
指定表分组上的投影。
Table API 文档描述流和批表上支持的所有 Table API 操作。
下面是展示简单的 Table API 聚合查询的示例:
xxxxxxxxxx
# get a TableEnvironment
table_env = # see "Create a TableEnvironment" section
# register Orders table
# scan registered Orders table
orders = table_env.from_path("Orders")
# compute revenue for all customers from France
revenue = orders \
.filter(col('cCountry') == 'FRANCE') \
.group_by(col('cID'), col('cName')) \
.select(col('cID'), col('cName'), col('revenue').sum.alias('revSum'))
# emit or convert Table
# execute query
Flink 的 SQL 集成基于实现 SQL 标准的 Apache Calcite。SQL 查询被指定为常规字符串。
SQL 文档描述用于流和批表的 Flink SQL 支持。
下面是展示如何指定查询,以及将结果作为 Table
返回的示例:
xxxxxxxxxx
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# register Orders table
# compute revenue for all customers from France
revenue = table_env.sql_query(
"SELECT cID, cName, SUM(revenue) AS revSum "
"FROM Orders "
"WHERE cCountry = 'FRANCE' "
"GROUP BY cID, cName"
)
# emit or convert Table
# execute query
下面是展示如何指定将结果插入到已注册表的更新查询的示例:
xxxxxxxxxx
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# register "Orders" table
# register "RevenueFrance" output table
# compute revenue for all customers from France and emit to "RevenueFrance"
table_env.execute_sql(
"INSERT INTO RevenueFrance "
"SELECT cID, cName, SUM(revenue) AS revSum "
"FROM Orders "
"WHERE cCountry = 'FRANCE' "
"GROUP BY cID, cName"
)
混合使用 Table API 和 SQL 很容易,因为它们都返回 Table
对象:
Table
对象上定义 Table API 查询。TableEnvironment
中注册结果表,以及在 SQL 查询的 FROM
从句中引用它的方式,可以在 Table API 查询的结果上定义 SQL 查询。通过将 Table
写入 TableSink
的方式,发出表。TableSink
是一个通用接口,支持多种文件格式(比如 CSV、Apache Parquet、Apache Avro)、存储系统(比如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch),或者消息系统(比如 Apache Kafka、RabbitMQ)。
批 Table
仅能被写到 BatchTableSink
,而流 Table
需要 AppendStreamTableSink
、RetractStreamTableSink
或 UpsertStreamTableSink
。
请查阅关于 Table Sources & Sinks 的文档,了解关于可用的 Sink 的详细信息,以及如何实现自定义 DynamicTableSink
的说明。
Table.insertInto(String tableName)
方法定义将 Source 表发出到已注册 Sink 表的完整的端到端管道。该方法通过名称从 Catalog 中查找表 sink,并且验证 Table
的模式与 sink 的模式相同。可以使用 TablePipeline.explain()
Explain 管道,调用 TablePipeline.execute()
执行管道。
下面是展示如何发出表的示例:
xxxxxxxxxx
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# create a TableSink
schema = Schema.new_builder()
.column("a", DataTypes.INT())
.column("b", DataTypes.STRING())
.column("c", DataTypes.BIGINT())
.build()
table_env.create_temporary_table("CsvSinkTable", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/path/to/file")
.format(FormatDescriptor.for_format("csv")
.option("field-delimiter", "|")
.build())
.build())
# compute a result Table using Table API operators and/or SQL queries
result = ...
# emit the result Table to the registered TableSink
result.execute_insert("CsvSinkTable")
Table API 和 SQL 查询被转换成 DataStream 程序,无论它们的输入是流还是批。在内部,查询被表示为逻辑查询计划,分两个阶段进行转换:
Table API 和 SQL 查询在以下情况下被转换:
TableEnvironment.executeSql()
被调用。该方法用于执行给定的语句,一旦该方法被调用,立即转换 SQL 查询。TablePipeline.execute()
被调用。该方法用于执行 source-to-sink 管道,一旦该方法被调用,立即转换 Table API 程序。Table.execute()
被调用。该方法用于将表内容收集到本地客户端,一旦该方法被调用,立即转换 Table API 程序。StatementSet.execute()
被调用。TablePipeline
(通过 StatementSet.add()
被发出到 Sink)或 INSERT
(通过 StatementSet.addInsertSql()
指定)语句先被缓冲在 StatementSet
。一旦 StatementSet.execute()
被调用,它们被转换。所有 Sink 将被优化进一个 DAG。Table
被转换成 Datastream
时,被转换。一旦转换,就是常规的 DataStream 程序,当调用 StreamExecutionEnvironment.execute()
方法时,被执行。Apache Flink 利用并且扩展 Apache Calcite,执行复杂的查询优化。这包括一系列规则和基于成本的优化,比如:
基于 Apache Calcite 的子查询解关联
项目修剪
分区修剪
过滤器下推
子计划去重,避免重复计算
特殊子查询重写,包括两部分:
可选的 Join 重排序
table.optimizer.join-reorder-enabled
启用注意:当前仅在子查询重写中的连接词条件中支持 IN/EXISTS/NOT IN/NOT EXISTS。
优化器不仅基于计划,还基于数据源提供的丰富的统计信息和每个操作的细粒度成本(比如 io、cpu、网络和内存)做出智能决策。
高级用户可以通过 CalciteConfig
对象提供自定义的优化器,通过调用 TableEnvironment#getConfig#setPlannerConfig
的方式,将该对象提供给表环境。
Table API 提供 Explain 用于计算 Table
的逻辑和优化的查询计划的机制。这通过 Table.explain()
方法和 StatementSet.explain()
方法完成。Table.explain()
返回 Table
的计划。StatementSet.explain()
返回多个 Sink 的计划。它返回描述三个计划的字符串:
TableEnvironment.explainSql()
和 TableEnvironment.executeSql()
支持执行 EXPLAIN 语句,获取计划,请参阅 EXPLAIN 页面。
下面的代码展示一个示例,以及为给定的 Table
使用 Table.explain()
方法的相应输出:
xxxxxxxxxx
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
table = table1 \
.where(col('word').like('F%')) \
.union_all(table2)
print(table.explain())
上述示例的结果是:
xxxxxxxxxx
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[Unregistered_DataStream_1]])
+- LogicalTableScan(table=[[Unregistered_DataStream_2]])
== Optimized Physical Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])
== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])
下面的代码展示一个示例,以及为 Multi-Sink 计划使用 StatementSet.explain()
方法的相应输出:
xxxxxxxxxx
settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(environment_settings=settings)
schema = Schema.new_builder()
.column("count", DataTypes.INT())
.column("word", DataTypes.STRING())
.build()
t_env.create_temporary_table("MySource1", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/source/path1")
.format("csv")
.build())
t_env.create_temporary_table("MySource2", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/source/path2")
.format("csv")
.build())
t_env.create_temporary_table("MySink1", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/sink/path1")
.format("csv")
.build())
t_env.create_temporary_table("MySink2", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/sink/path2")
.format("csv")
.build())
stmt_set = t_env.create_statement_set()
table1 = t_env.from_path("MySource1").where(col('word').like('F%'))
stmt_set.add_insert("MySink1", table1)
table2 = table1.union_all(t_env.from_path("MySource2"))
stmt_set.add_insert("MySink2", table2)
explanation = stmt_set.explain()
print(explanation)
Multi-Sink 计划的结果是:
x== Abstract Syntax Tree ==
LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[count, word])
+- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[count, word])
+- LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])
== Optimized Physical Plan ==
LegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[count, word])
+- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
LegacySink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[count, word])
+- Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
== Optimized Execution Plan ==
Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])(reuse_id=[1])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
LegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[count, word])
+- Reused(reference_id=[1])
LegacySink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[count, word])
+- Union(all=[true], union=[count, word])
:- Reused(reference_id=[1])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])