1. 什么是状态(State)?

在 Flink 中,状态(state)是指在流处理应用程序中维护的、跨多个事件(events)持久化存储的数据。在流处理应用程序中,有些操作需要根据之前处理过的事件来决定当前事件的处理方式,这些操作被称为有状态的操作。例如,计算每小时的平均温度、查找特定事件模式、训练机器学习模型等都需要状态的支持。

Flink 对状态的管理非常重要,因为状态的正确性和持久性对于保证流处理应用程序的正确性和鲁棒性至关重要。Flink 利用一些技术来管理状态,包括:

Flink 中的状态是根据事件的键(key)进行分区和分布的,因此状态只能在键控流(keyed stream)上访问和维护。状态的分区和事件流的分区一致,这样可以确保所有状态更新都是本地操作,从而保证一致性而不需要事务开销。Flink 将键控状态进一步组织成称为键组(Key Group)的单元,以便于状态的重新分配和管理。


2. 键控状态(Keyed State)

在 Flink 中,Keyed State 是指根据事件的键(key)进行分区和分布的状态。Flink 的 Keyed State 是非常重要的状态,因为它是许多状态操作的基础,例如窗口聚合、Join 等。

在 Flink 中,键控流(Keyed Stream)是指根据事件的键进行分区的流。在键控流上运行的任何状态操作都是键控状态(Keyed State)。Flink 通过将键控状态组织为键组(Key Group)来管理和分配状态。

每个键组都有一个主要的键,该键用于将键组分配给特定的并行任务(Parallel Task)。Flink 根据键的哈希值将事件分配到与键组相应的并行任务中,并且所有与该键相关的状态都将存在该并行任务中。在 Flink 中,哈希函数的选择非常重要,因为它直接影响并行任务之间状态的分配方式。如果哈希函数不平衡,可能导致一些并行任务拥有比其它任务更多或更少的状态,从而影响应用程序的性能和可伸缩性。

Flink 提供多种类型的键控状态,包括:

在 Flink 中,所有键控状态都支持异步访问。这意味着应用程序可以在等待状态结果时继续处理其它事件。此外,Flink 还允许应用程序在状态更新时异步处理副作用(side effect),例如将状态写入外部系统或发送异步通知等。


3. 状态持久化(State Persistent)

在 Flink 中,Stateful Stream Processing 涉及在流处理应用程序中维护状态,比如在窗口聚合、Join 和模式匹配等操作中。为确保状态的正确性和持久性,Flink 提供状态持久化机制。

状态持久化是指将状态存储在可靠的存储介质(比如文件系统或数据库)中,以便在出现故障时可以恢复状态。Flink 支持两种类型的状态持久化方式:

  1. Checkpointing

Checkpointing 是自动触发的状态快照机制,用于定期记录整个应用程序的状态。Checkpointing 记录应用程序的状态,并且将其存储在持久化存储介质中,比如 HDFS 或 S3 等。在出现故障时,Flink 可以使用最近的 checkpoint 恢复应用程序的状态。Checkpointing 保证状态的精确一次语义(exactly-once semantics),即每个事件都仅被处理一次。

  1. Savepoints

Savepoints 是手动触发的 checkpoint,可以在应用程序运行时手动创建。Savepoints 与 Checkpointing 非常相似,但是它由应用程序开发人员显式触发。Savepoints 允许开发人员在代码更改之后,重新启动应用程序,并且将应用程序恢复到之前的状态。这对于应用程序的升级和回归测试非常有用。

Flink 提供多种状态后端(state backend)来存储状态。状态后端是 Flink 用于存储状态的组件。比如:

状态持久化是 Flink 中状态管理的核心,它确保状态的正确性和可恢复性。通过使用 checkpointing 和 savepoints,Flink 可以保证应用程序的精确一次语义,并且在出现故障时可以快速恢复应用程序的状态。

3.1. Barrier

Checkpointing 机制的核心是 Barrier。

Barrier 是一种特殊的事件,用于将事件流分成一个个 checkpoint。在 Flink 中,每个 Barrier 都有特定的 ID,表示此 Barrier 属于哪个 Checkpoint。当 Barrier 到达 operator 时,该 operator 将其转发到下游 operator,并且在 Barrier 之后的所有事件都被分配到相同的 Checkpoint 中。当所有 operator 都收到相同 Checkpoint ID 的 Barrier 时,触发 Checkpoint。

Checkpointing 的触发和处理分为两个阶段:

  1. 触发 Checkpoint:在此阶段中,Flink 向所有 operator 发送 Barrier,以将事件流分成一个个 Checkpoint。当所有 operator 都收到相同 Checkpoint ID 的 Barrier 时,触发 Checkpoint。
  2. 处理 Checkpoint:在此阶段中,Flink 将应用程序的状态异步写入持久化存储介质中。在写入状态的同时,Flink 暂停事件的处理,以确保状态的一致性。一旦所有状态都已被写入持久化存储介质中,Flink 向所有 operator 发送确认消息,以便它们可以继续处理事件。

Checkpointing 使用 Barrier 机制来确保状态的一致性和可靠性。Barrier 分割事件流,使得其后的所有事件都被分配到相同的 Checkpoint 中。当 Checkpoint 被触发时,Flink 将应用程序的状态异步写入持久化存储介质中,以确保状态的可靠性。如果在写入状态时出现故障,Flink 可以使用最近的 Checkpoint 来恢复应用程序的状态。

3.2. 快照 Operator 状态

在 Flink 中,Operator 状态是指在单个算子(operator)内维护的状态。例如,窗口聚合操作中维护的每个窗口的计数器就是 Operator 状态。为确保 Operator 状态的可靠性和一致性,Flink 提供 Checkpointing 机制,用于定期记录,并且保存 Operator 状态的快照。

当 Checkpoint 被触发时,Flink 将 Operator 状态异步写入持久化存储介质中。为实现 Operator 状态的快照,Flink 需要记录 Operator 的内部状态,以便在出现故障时可以恢复。Flink 提供两种机制来实现 Operator 状态的快照:

  1. Chained Operators

在 Flink 中,多个算子可以被链接(chained)在一起。当多个算子被链接在一起时,它们共享状态,并且状态的快照也被合并。在这种情况下,Flink 只需要记录最后一个算子的状态,即可恢复整个链的状态。

  1. State Partitioning

Flink 可以将 Operator 状态分成多个分区,以便在出现故障时可以更快地重启应用程序。当 Operator 状态被分成多个分区时,每个分区被保存在不同的存储介质中。在出现故障时,Flink 只需要恢复那些已经丢失的分区即可。为保证状态的一致性,Flink 使用 Barrier 机制将事件流分成一个个 Checkpoint,并且在写入状态时暂停事件的处理,以确保状态的一致性。

总之,Flink 通过 Checkpointing 机制来记录和保存 Operator 状态的快照,以确保在出现故障时可以快速恢复状态。Flink 可以将 Operator 状态分成多个分区,以提高恢复速度,使用 Barrier 机制来保证状态的一致性。

3.3. 恢复

这种机制下的恢复过程很简单:在出现故障时,Flink 选择最近完成的检查点 k。然后,系统重新部署整个分布式数据流,并且将每个算子的状态设置为检查点 k 中的快照状态。数据源被设置为从位置 Sk 开始读取流。比如,在 Apache Kafka 中,这意味着告诉消费者从偏移量 Sk 开始获取数据。

如果状态是增量快照的,那么算子将从最新的完整快照状态开始,并且将一系列增量快照更新应用到该状态上。

3.4. Unaligned Checkpointing

在 Flink 中,Checkpointing 是用于记录和保存应用程序状态的机制,以确保在出现故障时可以快速恢复状态。Flink 提供两种 Checkpointing 机制:Aligned Checkpointing 和 Unaligned Checkpointing。

Unaligned Checkpointing 是一种 Checkpointing 机制,它不需要在所有算子上同时暂停事件处理,因此可以减少应用程序的停顿时间。在 Unaligned Checkpointing 中,每个算子可以在不同的时间点进行快照,而不需要与其它算子同步。这使得 Unaligned Checkpointing 可以在不影响应用程序吞吐量的情况下完成 Checkpointing。

在 Unaligned Checkpointing 中,Flink 使用 Barrier 机制来将事件流分成一个个 Checkpoint,并且在写入状态时暂停事件的处理,以确保状态的一致性。当算子准备好进行快照时,它向 Flink 发送请求,请求将其状态异步写入持久化存储介质中。在快照写入完成后,算子向 Flink 发送确认消息,以便 Flink 可以继续处理事件。

与 Aligned Checkpointing 相比,Unaligned Checkpointing 的优势在于不需要在所有算子上同时暂停事件处理,因此可以减少应用程序的停顿时间。但是,Unaligned Checkpointing 可能导致状态不一致,因为每个算子在不同的时间点进行快照。为解决该问题,Flink 提供一些机制来确保状态的一致性,例如在算子之间交换元数据和使用 Barrier 机制等。

总之,Unaligned Checkpointing 是一种可以在不影响应用程序吞吐量的情况下完成 Checkpointing 的机制。与 Aligned Checkpointing 相比,Unaligned Checkpointing 不需要在所有算子上同时暂停事件处理,因此可以减少应用程序的停顿时间。但是,为确保状态的一致性,需要采取一些措施,例如在算子之间交换元数据和使用 Barrier 机制等。

3.5. 未对齐的恢复

在非对齐的检查点中,算子在开始处理来自上游的数据之前,先恢复数据。除此之外,它执行与对齐的检查点恢复期间相同的步骤。

3.6. Exactly Once vs. At Least Once

对齐步骤可能增加流程序的延迟。通常,该额外的延迟大约为几毫秒,但我们已经见过一些极端值的延迟显著增加的情况。对于所有记录都需要超低延迟(几毫秒)的应用程序,Flink 有一个开关,可以在检查点期间跳过流对齐。算子一遇到每个输入的检查点屏障,就绘制检查点快照。

当跳过对齐时,算子保持处理所有输入,即便在检查点 n 的检查点屏障到达后。这样,在执行检查点 n 的状态快照之前,算子同样处理属于检查点 n+1 的元素。在恢复时,这些记录将重复出现,因为它们都被包含在检查点 n 的状态快照中,并且将在检查点 n 之后作为数据的一部分重放。

对齐只适用于具有多个前驱(joins)的算子以及具有多个发送者的算子(在流重新分区/shuffle 之后)。正因为如此,只拥有并行流操作(map()flatMap()filter()、…)的数据流才真正地提供 exactly once 保证,即使在 at least once 模式下。


4. 批处理程序中的状态和故障容错

Flink 将批处理程序作为流处理程序的特殊情况执行,其中流是有界的(有限数量的元素)。DataSet 在内部被视为数据流。因此,上述概念在批处理程序中同样适用于流处理程序,只有一些小的例外: