Table API 和 SQL 集成在一个联合 API 中。该 API 的核心概念是被用作查询的输入和输出的 Table。本文档展示使用 Table API 和 SQL 查询的程序的常见结构,如何注册 Table,如何查询 Table,以及如何发出 Table


1. Table API 和 SQL 程序的结构

如下代码示例展示 Table API 和 SQL 程序的常见结构。

Table API 和 SQL 查询可以很容易地集成、嵌入到 DataStream 程序中。请查看 DataStream API 集成页面,了解如何将 DataStream 转换为 Table,反之亦然。


2. 创建 TableEnvironment

TableEnvironment 是 Table API 和 SQL 集成的入口点,负责:

Table 始终被绑定到特定的 TableEnvironment。在相同的查询中,不能结合不同 TableEnvironment 的表,比如,连接或联合它们。通过调用静态的 TableEnvironment.create() 方法的方式,创建 TableEnvironment

另外,用户可以从现有的 StreamExecutionEnvironment 创建 StreamTableEnvironment,以与 DataStream API 进行互操作。


3. 在 Catalog 中创建表

TableEnvironment 维护使用标识符创建的表的 Catalog 的映射。每个标识符包含 3 部分:Catalog 名称、数据库名称和对象名称。如果未指定 Catalog 或数据库,那么将使用当前的默认值。

表是虚表(VIEW)或常规表(TABLE)。可以从现有的 Table 对象创建 VIEW,通常是 Table API 或 SQL 查询的结果。TABLE 描述外部数据,比如文件、数据库表或消息队列。

3.1. 临时 vs 永久表

表可以是临时的,绑定到单个 Flink 会话的生命周期,或永久的,在多个 Flink 会话间和集群可见。

永久表需要 catalog(比如 Hive Metastore)维护表的元数据。永久表创建后,对连接到该 Catalog 的任何 Flink 会话可见,并且将持续存在,直到显式地删除表。

另一方面,临时表存储在内存中,并且只在创建它们的 Flink 会话期间存在。这些表对其它会话不可见。它们未被绑定到任何 Catalog 或数据库,但可以在其中之一的名称空间中创建。如果临时表对应的数据库被删除,临时表不会被删除。

3.1.1. 遮蔽(Shadowing)

可以注册具有与现有永久表相同标识符的临时表。临时表将遮蔽永久表,并且只要临时表存在,就无法访问永久表。所有带该标识符的查询都将对临时表执行。

这可能对实验有用。它允许先对临时表(比如,只有数据的子集,或者数据被混淆。)运行完全相同的查询。一旦验证查询是正确的,就可以对实际的生产表运行它。

3.2. 创建表

3.2.1. 虚拟表

在 SQL 术语中,Table API 对象对应于 VIEW(虚拟表)。它封装逻辑查询计划。可以在 Catalog 中创建它,如下所示:

注意:Table 对象类似于关系型数据库系统中的 VIEW,也就是说,定义 Table 的查询未被优化,但当另一个查询引用已注册的 Table 时,它将被内联。如果多个查询引用同一个已注册 Table,那么将为每个引用查询内联它,并且执行多次,即已注册 Table 的结果将不被共享。

3.2.2. 连接器表

可以通过连接器(connector)声明创建关系型数据库中已知的 TABLE。连接器描述存储表数据的外部系统。可以在这里声明存储系统,比如 Apache Kafka 或常规文件系统。

可以直接使用 Table API 创建这种表,或者通过切换到 SQL DDL 的方式。

3.3. 扩展表标识符

使用由 Catalog、数据库和表名组成的 3 部分标识符注册表。

用户可以将一个 Catalog 和其中的一个数据库设置为“当前 Catalog”和“当前数据库”。使用它们,上面提到的 3 部分标识符中的前两部分可以可选 - 如果未提供它们,那么将引用当前 Catalog 和当前数据库。用户可以通过 Table API 或 SQL 切换当前 Catalog 和当前数据库。

标识符遵循 SQL 要求,这意味着可以用反号字符(`)转义它们。


4. 查询表

4.1. Table API

Table API 是用于 Scala 和 Java 的语言集成的查询 API。与 SQL 相比,查询未被指定为 String,而是在宿主语言中逐步组成。

该 API 基于 Table 类,它代表一张表(流或批),并且提供应用关系型操作的方法。这些方法返回新 Table 对象,它代表在输入 Table 上应用关系型操作的结果。一些关系操作由多个方法调用组成,比如 table.groupBy(…).select(),其中 groupBy(…) 指定表的分组,select(…) 指定表分组上的投影。

Table API 文档描述流和批表上支持的所有 Table API 操作。

下面是展示简单的 Table API 聚合查询的示例:

4.2. SQL

Flink 的 SQL 集成基于实现 SQL 标准的 Apache Calcite。SQL 查询被指定为常规字符串。

SQL 文档描述用于流和批表的 Flink SQL 支持。

下面是展示如何指定查询,以及将结果作为 Table 返回的示例:

下面是展示如何指定将结果插入到已注册表的更新查询的示例:

4.3. 混合使用 Table API 和 SQL

混合使用 Table API 和 SQL 很容易,因为它们都返回 Table 对象:


5. 发出表

通过将 Table 写入 TableSink 的方式,发出表。TableSink 是一个通用接口,支持多种文件格式(比如 CSV、Apache Parquet、Apache Avro)、存储系统(比如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch),或者消息系统(比如 Apache Kafka、RabbitMQ)。

Table 仅能被写到 BatchTableSink,而流 Table 需要 AppendStreamTableSinkRetractStreamTableSinkUpsertStreamTableSink

请查阅关于 Table Sources & Sinks 的文档,了解关于可用的 Sink 的详细信息,以及如何实现自定义 DynamicTableSink 的说明。

Table.insertInto(String tableName) 方法定义将 Source 表发出到已注册 Sink 表的完整的端到端管道。该方法通过名称从 Catalog 中查找表 sink,并且验证 Table 的模式与 sink 的模式相同。可以使用 TablePipeline.explain() Explain 管道,调用 TablePipeline.execute() 执行管道。

下面是展示如何发出表的示例:


6. 转换及执行查询

Table API 和 SQL 查询被转换成 DataStream 程序,无论它们的输入是流还是批。在内部,查询被表示为逻辑查询计划,分两个阶段进行转换:

Table API 和 SQL 查询在以下情况下被转换:


7. 查询优化

Apache Flink 利用并且扩展 Apache Calcite,执行复杂的查询优化。这包括一系列规则和基于成本的优化,比如:

注意:当前仅在子查询重写中的连接词条件中支持 IN/EXISTS/NOT IN/NOT EXISTS。

优化器不仅基于计划,还基于数据源提供的丰富的统计信息和每个操作的细粒度成本(比如 io、cpu、网络和内存)做出智能决策。

高级用户可以通过 CalciteConfig 对象提供自定义的优化器,通过调用 TableEnvironment#getConfig#setPlannerConfig 的方式,将该对象提供给表环境。


8. Explain 表

Table API 提供 Explain 用于计算 Table 的逻辑和优化的查询计划的机制。这通过 Table.explain() 方法和 StatementSet.explain() 方法完成。Table.explain() 返回 Table 的计划。StatementSet.explain() 返回多个 Sink 的计划。它返回描述三个计划的字符串:

  1. 相关查询的抽象语法输,即未优化的逻辑查询计划,
  2. 优化的逻辑查询计划,和
  3. 物理执行计划。

TableEnvironment.explainSql()TableEnvironment.executeSql() 支持执行 EXPLAIN 语句,获取计划,请参阅 EXPLAIN 页面。

下面的代码展示一个示例,以及为给定的 Table 使用 Table.explain() 方法的相应输出:

上述示例的结果是:

下面的代码展示一个示例,以及为 Multi-Sink 计划使用 StatementSet.explain() 方法的相应输出:

Multi-Sink 计划的结果是: