Flink Agents:外部副作用一致性 (ActionStateStore) 演进分析

张开发
2026/4/20 0:51:40 15 分钟阅读

分享文章

Flink Agents:外部副作用一致性 (ActionStateStore) 演进分析
Flink Agents外部副作用一致性 (ActionStateStore) 演进分析本篇主要分析 Flink Agents 框架中为了解决外部副作用 (Side Effects) 重复执行问题而引入的ActionStateStore(特别是KafkaActionStateStore) 机制。这是让流式 Agent 在面对故障恢复时能够真正做到 Exactly-once (精确一次) 语义的关键防线。1. 痛点大模型调用与副作用的非确定性在传统的 Flink 数据流处理中如果算子崩溃Flink 会从上一个 Checkpoint 恢复状态如聚合值并将 Source 的 Offset 回退以重新消费数据。传统 Flink 能够做到 Exactly-once 的本质前提是算子处理逻辑是确定性的 (Deterministic)且副作用仅限于 Flink 内部状态或支持两阶段提交 (2PC) 的 Sink。只要输入数据一致重跑后的内部状态一定能与崩溃前对齐。但在 Agent 场景中情况完全不同外部副作用Agent 会调用 MCP 工具如“发送一封邮件”、“扣除用户余额”。如果在发完邮件后算子崩溃了。恢复时Flink 会重新把触发发邮件的Event送给 Agent导致邮件被发了两次。非确定性 (Non-determinism)大模型的输出是带随机性的即便 temperature0。如果重跑一次大模型可能生成了完全不同的工具调用链路这会导致恢复后的状态树与崩溃前彻底分叉 (Divergence)。2. 核心设计ActionState 与拦截跳过机制为了解决这个问题框架在ActionExecutionOperator中引入了ActionStateStore。2.1 状态记录与拦截 (The Interception)联合主键设计 (The Composite Key)在从 Store 中获取状态时使用了一个复合键key sequenceNumber event action。参考 ActionStateUtil.java#L40-L52。为什么需要这四个维度缺一不可吗key代表当前的会话或用户实体如user_123。显然必须隔离不同用户。sequenceNumber这是极其关键的一层防线。如果用户发了两条完全相同的消息比如在两分钟内发了两次 “你好”sequenceNumber保证了它们被视为两次独立的交互而不会发生“第二次发你好时命中了第一次的缓存而直接跳过”的灾难。补充说明这个sequenceNumber是Agent 算子自己内部生成并维护的而不是 Source 传过来的。算子内部有一个 Flink 状态ValueStateLong sequenceNumberKState。每次该 Key 收到一个新的InputEvent外部原始输入算子就会把这个内部状态 1。这意味着算子在自己给每个用户的对话进行“本地递增编号”。参考 ActionExecutionOperator.java#L946-L954。event在一个sequenceNumber生命周期内一次完整的交互网络中可能衍生出多个内部事件比如触发了工具A的回调Event又触发了工具B的回调Event。必须用具体的Event哈希值来精确命中当前的执行流分支。action因为一个Event可能会被多个不同的Action监听并触发。比如收到OrderCreatedEvent可能同时触发SendEmailAction和UpdateInventoryAction。所以必须带上 Action 的签名才能精确到“某个用户在第几次对话的某个事件流分支上的具体某一个动作”。状态的更新时机与可续跑机制 (The Update Timing Durable Execution)你可能会问ActionStateStore.put()是在什么时候调用的如果它是在整个ActionTask执行完才更新那在执行过程中比如调用了 LLM 还没返回崩溃了恢复后不还是会重新调用 LLM 吗这里需要分两个层级来理解状态的更新粗粒度 (Action 级别) 的拦截参考 ActionExecutionOperator.java#L518-L524。在整个ActionTask成功执行完毕后算子会调用maybePersistTaskResult()。这会将isCompletedtrue、产生的子事件和记忆修改一起存入ActionStateStore。这解决的是“整个动作是否需要完全跳过”的问题。细粒度 (网络调用级别) 的拦截 (Durable Execution)为了解决“执行到一半崩溃导致部分工具重复调用”的问题框架在RunnerContext中提供了durableExecute()方法。当用户代码中调用ctx.durableExecute(llm.chat(...))时它不仅会执行网络调用在拿到结果的瞬间就会立刻将这个具体的CallResult包含函数名和结果追加到当前的ActionState中并实时调用actionStateStore.put()刷入 Kafka。极小概率的空隙依然存在 (The Residual Gap)必须承认这种设计并没有 100% 解决重复调用的问题。如果在工具 API如发送邮件的远端服务器已经执行完毕但在它把 HTTP Response 返回给 Flink 算子的途中网络断了或者算子崩溃了。此时durableExecute没拿到结果Kafka 里也就没存CallResult。恢复时邮件依然会被重发。这是因为不支持两阶段提交 (2PC) 的外部系统在没有幂等键 (Idempotency Key) 的情况下理论上是不可能实现绝对的 Exactly-once 的。框架能做的只是把这个重复调用的“空隙窗口”从“整个 Action 执行期的几分钟”缩小到了“单次网络请求的几毫秒”。恢复时ActionTask虽然会从头重新运行但当它走到durableExecute()时框架会发现ActionState内部已经有这个CallResult的记录了就会直接返回缓存的 JSON 结果而不会发起真实的外部网络请求。这也就是我们在RunnerContext演进分析中提到的“可续跑状态机”的底层支撑。过程 (How)在算子准备执行一个具体的ActionTask之前会先去查ActionStateStore参考 ActionExecutionOperator.java#L469-L494ActionStateactionStatemaybeGetActionState(key,sequenceNumber,actionTask.action,actionTask.event);if(actionState!nullactionState.isCompleted()){// Action has completed, skip execution and replay memory/eventsisFinishedtrue;outputEventsactionState.getOutputEvents();// Replay memory updates...}else{// 真正去执行 Action (调用 LLM / 工具)actionTaskResultactionTask.invoke(...);// 记录结果maybePersistTaskResult(...,actionTaskResult);}原理 (Why)如果发现这个动作在崩溃前已经成功执行过了isCompleted()算子会直接跳过用户代码的执行并且把当时该动作产生的OutputEvents和MemoryUpdate增量记忆修改直接回放 (Replay) 到当前的上下文中。这就像是游戏里的“读档”直接把进度推到了打完 Boss 之后而不需要再打一次 Boss。3. KafkaActionStateStore 的物理实现演进既然需要记录ActionState为什么不直接存在 Flink 的 RocksDB 里而是要引入一个外部的KafkaActionStateStore呢3.1 为什么一定要引入一个新的 Kafka 依赖你可能会有疑问为了去重引入一个重量级的 Kafka 依赖这值得吗有没有轻量级的替代方案必须使用外部 WAL 的原因因为 Flink State 是跟着 Checkpoint 周期性刷盘的比如每 5 分钟一次。如果一个 Agent 调用了发邮件工具邮件发出去了但 Checkpoint 还没触发此时机器崩溃。恢复时Flink 状态里根本没有“邮件已发”的记录依然会重复发送。因此必须有一个系统能在网络请求返回的瞬间以极低的延迟将记录持久化 (Write-Ahead Log, WAL)。为什么选 Kafka高吞吐 Append-Only在 Flink 的生态里Kafka 是最成熟的高吞吐 WAL 存储。每次ActionTask完成框架会将包含结果的ActionState作为一条消息通过KafkaProducer发送。为了保证强一致性配置了acksall并执行了producer.flush()。Topic Compaction利用 Kafka 自带的cleanup.policycompact可以自动合并同一个 Key 的旧状态防止存储无限膨胀。架构复用通常流处理架构的 Source 和 Sink 已经重度依赖 Kafka因此复用 Kafka 作为 StateStore 不会引入额外的运维负担。当然从架构抽象上看这里完全可以替换为 Redis、Cassandra 等其他支持高速持久化的 KV 存储这也是为什么框架定义了ActionStateStore接口。3.2 恢复时的风暴海量消息的重播与去重这里有一个非常极端的场景如果 Flink 已经平稳运行了 4 分钟处理了 10 万条消息但在第 5 分钟即将做 Checkpoint 时崩溃了。那重启时Flink 会从 5 分钟前的 Offset 重新拉取这 10 万条消息。这 10 万条消息难道全都要根据 ActionState 一条条做“无重复处理 (Replay)”吗答案是是的这不仅不可思议而且是必须的。但这正是流处理引擎 (Flink) 和外部 WAL (Kafka) 协同工作的精妙之处。重播速度极快 (Fast Replay)因为这 10 万条消息对应的动作在崩溃前其实都已经成功执行了。在恢复时当框架拦截到maybeGetActionState并发现isCompletedtrue时它会直接跳过所有耗时的大模型推理、网络请求和工具调用。参考代码ActionExecutionOperator.java#L472-L494 。系统只需要执行简单的内存赋值把当时该动作产生的OutputEvents放入队列把MemoryUpdate赋值回状态。这就把一个原本需要10 秒 (网络 I/O 主导)的动作变成了1 微秒 (内存 CPU 主导)的动作。Kafka 状态重建 (Rebuild State)参考 KafkaActionStateStore.java#L201-L270。当算子从 Checkpoint 恢复时会调用rebuildState()。框架会启动一个底层的KafkaConsumer从 Flink Checkpoint 中保存的 Kafka Topic 偏移量开始一直读取到当前 Kafka 的最新消息。把这些在“上一次 Checkpoint 之后、崩溃之前”发生的 10 万条动作状态全部预加载 (Pre-load)到内存的actionStatesMap 缓存中。因此接下来的 10 万条重播数据去重判断完全是纯内存的 Hash 查找 (O(1))没有任何外部 I/O。动态修剪防 OOM (Prune)如果在正常运行期间内存里一直存着所有用户的 ActionState迟早会 OOM。所以框架会在处理完一个用户的完整对话流后调用pruneState(key, sequenceNumber)。参考 KafkaActionStateStore.java#L272-L302。这保证了内存中只保留“正在进行中”或“刚刚完成但还未 Checkpoint”的危险期状态。3.3 妥协At-most-once 语义下的幽灵状态 (Ghost State) 问题正如前面提到的很多业务为了追求极致的吞吐量或容忍一定的数据丢失在配置 Flink 的 Kafka Source 时可能会选择At-most-once最多一次甚至是在重启时直接从最新的 Offset (Latest) 开始读取而不是从 Checkpoint 记录的旧 Offset 回放。在这种“放弃精确重播”的场景下ActionStateStore会面临什么问题场景推演算子处理了用户 A 的请求sequenceNumber5成功发了邮件把状态写入了 Kafka 的ActionStateStore。算子崩溃重启。因为用户配置了从最新的 Offset 消费Source直接跳过了之前积压的数据。用户 A 的那条原始请求再也不会被 Flink 拉取到了。然而在算子启动的rebuildState阶段它依然会老老实实地把“用户 Aseq5发邮件成功”这个状态加载到内存的actionStatesHashMap 中。幽灵状态的诞生由于原始事件再也不会来了算子也就永远不会执行到maybeGetActionState去命中这个缓存同时由于事件流没走完也不会触发pruneState去清理它。这条状态记录就变成了永远悬在内存里的“幽灵状态 (Ghost State)”。如何处理基于 SequenceNumber 的被动修剪框架在KafkaActionStateStore.get()的实现中加入了一个非常巧妙的兜底机制版本压制清理。参考 KafkaActionStateStore.java#L161-L184。当用户 A 发来下一条新消息比如sequenceNumber6时算子去查缓存发现请求的 seq6。此时它会触发一个清理逻辑遍历缓存把所有当前 Key 下sequenceNumber 6 的历史状态全部删掉。这样即使发生了跳过消费导致状态泄漏只要这个用户再次活跃之前的“幽灵状态”就会被自动清理从而防止了内存泄漏。重新回放时联合主键会变化吗这是一个非常核心的问题。回放时(key, sequenceNumber, event, action)到底变不变决定了缓存能不能被命中。我们需要分场景来看精确一次 (Exactly-once) 语义下从 Checkpoint 回放sequenceNumber绝对不会变。因为它是存在 Flink RocksDB StatesequenceNumberKState里的。算子从 Checkpoint 恢复时State 也会回滚到当时的值随着重播相同的输入sequenceNumber的递增轨迹和崩溃前一模一样。event和action大概率不变但有极小概率发生分叉 (Divergence)。如果是InputEvent触发的第一个Action那event是确定的外部输入哈希值绝对不变。但如果是中间的某个环节比如大模型生成了ToolCallEvent。如果在重播时之前的ActionState丢失了比如 Kafka WAL 丢数据了或者大模型调用还没完成就崩了没存下来算子被迫重新请求大模型。由于大模型的非确定性它这次可能生成了一个和上次完全不同的ToolCallEvent。一旦发生这种情况后续产生的event哈希值就全变了执行流就走向了另一个平行宇宙。这就是我们在checkDivergence()中要检测的如果在同一个sequenceNumber下缓存里明明存着“调用搜索工具”的记录但现在算子却拿着“调用计算器工具”的Event来查缓存说明发生了执行分叉。此时框架会果断清理掉这个 SeqNum 下的所有未来缓存因为未来的路已经不同了老老实实重新执行。参考代码RunnerContextImpl.java#L524-L552 细粒度拦截时的 Divergence 警告与清理。最多一次 / 最新拉取 (At-most-once / Latest) 语义下更正概念如果在重启时放弃历史积压数据直接从最新的 Offset 拉取丢弃了崩溃期间未处理完的数据这实际上是最多一次 (At-most-once)的语义。为什么 sequenceNumber 会跳过假设在 Checkpoint 记录时sequenceNumber是 2。之后系统处理了外部流入的第 3 条、第 4 条消息这期间算子内部的sequenceNumberKState变成了 4。但此时系统崩溃了且没有来得及做 Checkpoint。当算子重启时Flink 的内部状态会回滚到上一个 Checkpoint也就是sequenceNumber恢复成了 2。此时由于你配置了 Kafka Source 从Latest最新读取Kafka 直接把第 5 条消息塞给了算子。算子拿到这条新消息会在恢复后的seq2基础上加 1也就是给这第 5 条消息打上了seq3的标记。幽灵状态的产生与清理问题来了在崩溃前那条没处理完的真实第 3 条消息可能已经向KafkaActionStateStore里写了“调用工具”的记录键为seq3。重启后系统用新的第 5 条消息但被打上了seq3的标去查缓存。此时由于消息内容不同Event的哈希值发生了改变触发了Divergence (状态分叉) 逻辑。框架发现哈希不匹配就会认为发生了分叉从而触发清理逻辑把之前真实第 3 条消息留下的旧缓存当做“脏数据”清理掉从而避免了幽灵状态的泄露。状态分叉 (Divergence) 检测的终极意义在 Flink 的原生世界里“回放”意味着 100% 的轨迹重现。但在加入了大模型的非确定性后框架必须接受一个现实如果缓存不完整导致了重算重算的轨迹可能会和之前存下的一小部分未来缓存产生冲突。checkDivergence()就是用来切断这个平行宇宙的冲突保证逻辑严谨性的。4. 总结系统复杂度拆项Flink Exactly-onceSource Offset (重播数据)RocksDB State (恢复内部状态)。Agent Exactly-onceFlink Exactly-onceActionStateStore (外部副作用跳过记录)。主导项在高频调用外部 API 时Kafka Producer Flush的网络 I/O 延迟将成为算子吞吐量的主要瓶颈。这也是为什么框架允许将其作为“可选 (Optional)”配置用户可以在“极致吞吐量”和“绝对不重复执行”之间做出权衡。

更多文章