[Feature]Add output fallback support for OpenAI serving #7942

Merged
Jiang-Jia-Jun merged 35 commits into PaddlePaddle:develop from luukunn:fallback on Jun 11, 2026
@luukunn

@luukunn luukunn commented May 27, 2026

Collaborator

Motivation

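The current inference pipeline has no unified extension mechanism for output fallback. If the business side wants to apply fallback handling to model output, it has to adapt each downstream stage separately, which is hard to manage in one place.

This PR introduces an output fallback framework and moves the actual fallback processing forward into the data processor, where it is applied to the raw decoded stream before reasoning/tool parsing. This ensures that content text, reasoning content, and tool-call-related text all share the same fallback logic, and it provides a single entry point for adding custom fallback strategies later.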

Modifications

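This PR contains the following changes:

  1. Add the output fallback framework

    • Add the fastdeploy/output/fallback/ module
    • Add the OutputFallbackStrategy abstract base class
    • Add OutputFallbackContext
    • Add StreamFallbackDecision
    • Add OutputFallbackManager
    • Support strategy registration, instantiation, chained execution, state management, and plugin import
  2. Add the output fallback plugin loading mechanism

    • Add fastdeploy.plugins.output_fallback
    • Support automatic plugin loading through the plugin group fastdeploy.output_fallback_plugins
    • Support dynamic import of an external plugin path specified with --output-fallback-plugin
  3. Add output fallback startup arguments

    • --output-fallback
    • --output-fallback-plugin
    • --output-fallback-config
  4. Move the application of output fallback forward into the data processor

    • Add output_fallback_manager in fastdeploy/input/base_processor.py
    • In process_response_dict_normal(), apply fallback to the complete output text
    • In process_response_dict_streaming(), apply fallback to the streaming delta text
    • Fallback runs before the reasoning parser / tool parser, so later parsing works on the corrected text
  5. Support fallback control semantics in the streaming scenario

    • send: send the current delta
    • hold: buffer the current delta and output nothing in this round
    • flush: output the buffered content when the stream ends
    • truncate: send the current text and terminate further generation early
  6. Add processor-side fallback state management

    • Add fallback_decode_status
    • It maintains the streaming history text after fallback correction
    • It prevents the parsers from continuing to work on uncorrected raw text
    • Fallback state and manager state are both cleaned up when the request ends
  7. Extend the request / output data structures

    • New fields in CompletionOutput:
      • fallback_truncated
      • skipped
    • Add the related serialization / deserialization tests
  8. Add tests

    • Add tests/output/test_fallback.py
    • Cover default strategy behavior, chained manager execution, hold/flush/truncate, cleanup, plugin import, and similar scenarios
    • Add tests for fallback application and state cleanup in the input processor
    • Add compatibility tests for how OpenAI chat/completion and v1 serving handle the processor's fallback signals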
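
The registration and chained-execution behavior listed under item 1 can be pictured with a small stand-alone sketch. It is not the FastDeploy implementation: `Strategy`, `Manager`, and the two example strategies are hypothetical stand-ins, the context argument is left out, and only the non-streaming `apply` path is modeled.

```python
from typing import Callable, Dict, List, Type


class Strategy:
    """Hypothetical stand-in for OutputFallbackStrategy (non-streaming part only)."""

    def should_apply(self, text: str) -> bool:
        return True

    def apply(self, text: str) -> str:
        return text


class Manager:
    """Hypothetical stand-in for OutputFallbackManager."""

    _registry: Dict[str, Type[Strategy]] = {}

    @classmethod
    def register(cls, name: str) -> Callable[[Type[Strategy]], Type[Strategy]]:
        # Class decorator that records the strategy under a name.
        def decorator(strategy_cls: Type[Strategy]) -> Type[Strategy]:
            cls._registry[name] = strategy_cls
            return strategy_cls

        return decorator

    def __init__(self, names: List[str]) -> None:
        # Instantiate the requested strategies in the order given.
        self.strategies = [self._registry[name]() for name in names]

    def apply(self, text: str) -> str:
        # Chained execution: each strategy sees the previous strategy's output.
        for strategy in self.strategies:
            if strategy.should_apply(text):
                text = strategy.apply(text)
        return text


@Manager.register("strip")
class StripWhitespace(Strategy):
    def apply(self, text: str) -> str:
        return text.strip()


@Manager.register("default-reply")
class DefaultReply(Strategy):
    def should_apply(self, text: str) -> bool:
        return text == ""

    def apply(self, text: str) -> str:
        return "Sorry, no answer."


manager = Manager(["strip", "default-reply"])
print(manager.apply("   "))
print(manager.apply("  hi  "))
```

Because the second strategy receives the output of the first, a whitespace-only reply is first stripped to an empty string and only then replaced, which is the point of running strategies as a chain rather than independently.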

Usage or Command

Enable a specific fallback strategy:

--output-fallback your-strategy-name

Load a custom fallback plugin:

--output-fallback your-strategy-name \
--output-fallback-plugin /path/to/custom_fallback.py

Pass configuration to a strategy:

--output-fallback your-strategy-name \
--output-fallback-plugin /path/to/custom_fallback.py \
--output-fallback-config '{"your-strategy-name": {"key": "value"}}'
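
The value passed to --output-fallback-config above is a JSON object keyed by strategy name. A minimal sketch of how such a value could be turned into per-strategy keyword arguments follows; `strategy_kwargs` is a hypothetical helper, and the validation shown is an assumption rather than FastDeploy's actual parsing code.

```python
import json
from typing import Any, Dict


def strategy_kwargs(raw_config: str, strategy_name: str) -> Dict[str, Any]:
    """Return the config dict for one strategy (hypothetical helper)."""
    config = json.loads(raw_config) if raw_config else {}
    if not isinstance(config, dict):
        raise ValueError("output fallback config must be a JSON object")
    kwargs = config.get(strategy_name, {})
    if not isinstance(kwargs, dict):
        raise ValueError(f"config for {strategy_name!r} must be a JSON object")
    return kwargs


raw = '{"your-strategy-name": {"key": "value"}}'
print(strategy_kwargs(raw, "your-strategy-name"))
print(strategy_kwargs(raw, "other-strategy"))
```

A strategy with no entry simply gets an empty dict here, so enabling a strategy without configuring it does not fail.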

How to add a custom fallback strategy

You can add a custom strategy by subclassing OutputFallbackStrategy and registering it with OutputFallbackManager.register(...).

Example:

from fastdeploy.output.fallback import (
    OutputFallbackContext,
    OutputFallbackManager,
    OutputFallbackStrategy,
    StreamFallbackDecision,
)


@OutputFallbackManager.register("custom-fallback")
class CustomFallbackStrategy(OutputFallbackStrategy):
    name = "custom-fallback"

    def should_apply(self, text: str, context: OutputFallbackContext) -> bool:
        return "bad" in text

    def apply(self, text: str, context: OutputFallbackContext) -> str:
        return text.replace("bad", "good")

    def on_delta(
        self,
        delta_text: str,
        context: OutputFallbackContext,
        state: dict,
    ) -> StreamFallbackDecision:
        # In the streaming scenario, incremental handling can be customized here
        if "[HOLD]" in delta_text:
            state["buffer"] = state.get("buffer", "") + delta_text.replace("[HOLD]", "")
            return StreamFallbackDecision(action="hold")

        if "[STOP]" in delta_text:
            return StreamFallbackDecision(action="truncate", text=delta_text.replace("[STOP]", ""))

        return StreamFallbackDecision(action="send", text=delta_text)

    def on_finish(
        self,
        context: OutputFallbackContext,
        state: dict,
    ) -> StreamFallbackDecision:
        return StreamFallbackDecision(action="flush", text=state.get("buffer", ""))

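Notes on custom strategies:

  1. should_apply(text, context)
    • Decides whether fallback should be applied to the current text
  2. apply(text, context)
    • Processes the complete text in the non-streaming scenario
    • The default on_delta() also reuses this logic for stateless streaming text
  3. on_delta(delta_text, context, state)
    • Processes each delta in the streaming scenario
    • state is per-request strategy state and can be used to buffer content across chunks
    • Currently supported actions:
      • send
      • hold
      • truncate
  4. on_finish(context, state)
    • Returns the flush content when the stream ends
    • Typically used to output, in one go at the end, the content buffered during hold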
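
To make the send / hold / truncate / flush semantics concrete, here is a minimal driver loop that feeds deltas to a strategy shaped like the example above. `Decision`, `HoldStopStrategy`, and `drive` are hypothetical names, the context argument is omitted, and dropping the held buffer on truncate is an assumption of this sketch, not confirmed behavior of the data processor.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple


@dataclass
class Decision:
    """Hypothetical stand-in for StreamFallbackDecision."""

    action: str
    text: str = ""


class HoldStopStrategy:
    """Mirrors the [HOLD] / [STOP] example, without the context argument."""

    def on_delta(self, delta_text: str, state: Dict[str, str]) -> Decision:
        if "[HOLD]" in delta_text:
            state["buffer"] = state.get("buffer", "") + delta_text.replace("[HOLD]", "")
            return Decision("hold")
        if "[STOP]" in delta_text:
            return Decision("truncate", delta_text.replace("[STOP]", ""))
        return Decision("send", delta_text)

    def on_finish(self, state: Dict[str, str]) -> Decision:
        return Decision("flush", state.get("buffer", ""))


def drive(strategy: HoldStopStrategy, deltas: Iterable[str]) -> Tuple[List[str], bool]:
    """Return the chunks actually emitted and whether the stream was truncated."""
    state: Dict[str, str] = {}  # per-request state shared across chunks
    emitted: List[str] = []
    for delta in deltas:
        decision = strategy.on_delta(delta, state)
        if decision.action == "hold":
            continue  # nothing is sent for this chunk
        emitted.append(decision.text)
        if decision.action == "truncate":
            return emitted, True  # stop consuming further deltas
    final = strategy.on_finish(state)
    if final.action == "flush" and final.text:
        emitted.append(final.text)
    return emitted, False


normal = drive(HoldStopStrategy(), ["Hello ", "[HOLD]secret ", "world"])
stopped = drive(HoldStopStrategy(), ["a", "b[STOP]", "c"])
print(normal)
print(stopped)
```

Note that in this sketch held text is emitted only at the end of the stream, after chunks that arrived later, so a strategy that needs to preserve ordering would have to hold the following chunks as well.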

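There are two ways to load a strategy:

  1. Load from a plugin path
    • Use:
       --output-fallback your-strategy-name \
       --output-fallback-plugin /path/to/custom_fallback.py
  2. Load automatically through the plugin group
    • Register the plugin under the entry point group for fastdeploy.output_fallback_plugins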
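
Path-based loading comes down to importing a Python file so that its registration decorators run. The sketch below shows one way to do that with the standard library alone; `load_plugin_module` is a hypothetical helper, not FastDeploy's own import utility, and the temporary plugin file only defines a constant instead of registering a real strategy. Entry-point-group loading is not shown.

```python
import importlib.util
import os
import sys
import tempfile
from types import ModuleType


def load_plugin_module(plugin_path: str) -> ModuleType:
    """Import a single .py file as a module named after the file (hypothetical helper)."""
    module_name = os.path.splitext(os.path.basename(plugin_path))[0]
    spec = importlib.util.spec_from_file_location(module_name, plugin_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load plugin from {plugin_path}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    # Executing the module is what would trigger any @register decorators in it.
    spec.loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "custom_fallback.py")
    with open(path, "w", encoding="utf-8") as handle:
        handle.write("STRATEGY_NAME = 'custom-fallback'\n")
    plugin = load_plugin_module(path)

print(plugin.__name__, plugin.STRATEGY_NAME)
```

Deriving the module name from the file name means two plugin files with the same base name would overwrite each other in `sys.modules`, which is worth keeping in mind when naming plugin files.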

Accuracy Tests

Checklist

  • Add at least a tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code, run pre-commit before commit.
  • Add unit tests. Please write the reason in this PR if no unit tests.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

Copilot AI review requested due to automatic review settings May 27, 2026 10:02
@paddle-bot

paddle-bot Bot commented May 27, 2026

Thanks for your contribution!

Copilot AI left a comment

Contributor

Pull request overview

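This PR adds an output fallback framework to the OpenAI-compatible service. It post-processes model output on both the streaming and non-streaming paths (fixing the colon in Markdown bold text, repairing Markdown tables, and detecting and truncating repeated output), and supports custom extensions through strategy registration plus a plugin mechanism.

Changes:

  • Add the fastdeploy/output/fallback/ subpackage: it defines the OutputFallbackStrategy base class, OutputFallbackContext, StreamFallbackDecision, and OutputFallbackManager, and ships three built-in strategies: markdown-bold-colon, markdown-table, and repeat-truncate.
  • EngineArgs / api_server wire in the three startup arguments --output-fallback, --output-fallback-plugin, and --output-fallback-config, and inject the manager into the v0 / v1 chat and completion serving classes.
  • The streaming / non-streaming flows call the manager's apply / on_delta / on_finish / cleanup; when repeat-truncate fires, finish_reason is set to repeat_truncate and the corresponding choice is aborted.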

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 4 comments.

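| File | Description |
| --- | --- |
| fastdeploy/output/fallback/__init__.py | Exposes the public classes and imports the three built-in strategies to trigger registration |
| fastdeploy/output/fallback/base.py | Defines the fallback context / decision / abstract base class |
| fastdeploy/output/fallback/manager.py | Registry / plugin loading / apply / on_delta / on_finish / cleanup |
| fastdeploy/output/fallback/markdown_bold_colon.py | Corrects the colon position in **xxx:**, with buffering across deltas |
| fastdeploy/output/fallback/markdown_table.py | Repairs Markdown table separator rows / inconsistent column counts |
| fastdeploy/output/fallback/repeat_truncate.py | Detects repeated output over a token window and triggers truncate |
| fastdeploy/engine/args_utils.py | Adds the 3 new CLI arguments |
| fastdeploy/entrypoints/openai/api_server.py | Parses the arguments, builds the manager, injects it into each handler; /config-info exposes the corresponding fields |
| fastdeploy/entrypoints/openai/serving_chat.py | v0 chat streaming / non-streaming paths wired to fallback, including the repeat_truncate finish_reason |
| fastdeploy/entrypoints/openai/serving_completion.py | v0 completion streaming / non-streaming paths wired to fallback |
| fastdeploy/entrypoints/openai/v1/serving_base.py | Base-class constructor accepts the manager and cleans up state in finally |
| fastdeploy/entrypoints/openai/v1/serving_chat.py | v1 chat wired to fallback (non-multimodal path) |
| fastdeploy/entrypoints/openai/v1/serving_completion.py | v1 completion wired to fallback |
| tests/output/test_fallback.py | Covers the manager, built-in strategies, streaming hold/flush/truncate, cleanup, and plugin import |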

choice_completion_tokens = response_ctx.choice_completion_tokens_dict[output.index]
choice.finish_reason = self._calc_finish_reason(request_output, max_tokens, choice_completion_tokens)
if fallback_truncated:
choice.finish_reason = "repeat_truncate"
if res.get("error_msg") is not None and "Aborted" in res["error_msg"]:
choices[-1].finish_reason = "abort"
if fallback_truncated:
choices[-1].finish_reason = "repeat_truncate"
choice.finish_reason = "abort"

if fallback_truncated:
choice.finish_reason = "repeat_truncate"
Comment on lines +307 to +308
if fallback_truncated:
choice.finish_reason = "repeat_truncate"
@codecov-commenter

codecov-commenter commented May 27, 2026

Codecov Report

❌ Patch coverage is 86.32812% with 35 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (develop@bbe5f81). Learn more about missing BASE report.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| fastdeploy/input/base_processor.py | 61.11% | 9 Missing and 5 partials ⚠️ |
| fastdeploy/entrypoints/openai/api_server.py | 20.00% | 6 Missing and 2 partials ⚠️ |
| fastdeploy/plugins/output_fallback/__init__.py | 60.00% | 2 Missing and 2 partials ⚠️ |
| fastdeploy/output/fallback/manager.py | 97.70% | 0 Missing and 3 partials ⚠️ |
| fastdeploy/entrypoints/openai/serving_chat.py | 83.33% | 1 Missing and 1 partial ⚠️ |
| ...astdeploy/entrypoints/openai/serving_completion.py | 85.71% | 1 Missing and 1 partial ⚠️ |
| fastdeploy/output/fallback/base.py | 92.30% | 1 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             develop    #7942   +/-   ##
==========================================
  Coverage           ?   67.56%           
==========================================
  Files              ?      475           
  Lines              ?    66609           
  Branches           ?    10259           
==========================================
  Hits               ?    45005           
  Misses             ?    18738           
  Partials           ?     2866           
| Flag | Coverage Δ |
| --- | --- |
| GPU | 77.56% <86.32%> (?) |
| XPU | 6.98% <5.07%> (?) |

@PaddlePaddle-bot

PaddlePaddle-bot commented May 27, 2026

🤖 Paddle-CI-Agent | ci_status_monitor | 2026-06-11 23:10:10

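This CI report was generated from the following code (refreshed every 30 minutes):
PR commit: d6077b1 | Merge base: bbe5f81 (branch: develop)


1 Required tasks: 10/10 passed

| Total runs (reruns) | Total tasks | ✅ Passed | ❌ Failed | ⏳ Running | ⏸️ Waiting | Skipped |
| --- | --- | --- | --- | --- | --- | --- |
| 42(0) | 42 | 40 | 2 | 0 | 0 | 0 |

| Task | Error type | Confidence | Log |
| --- | --- | --- | --- |
| None (required) | | | |

2 Failure details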

@luukunn luukunn changed the title from "[Feature][APIServer] Add output fallback support for OpenAI serving" to "[Feature]Add output fallback support for OpenAI serving" on May 28, 2026
Copilot AI review requested due to automatic review settings June 8, 2026 02:54

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 27 out of 27 changed files in this pull request and generated 2 comments.

Comment on lines +80 to +87
def import_fallback_plugin(cls, plugin_path: str) -> None:
    module_name = os.path.splitext(os.path.basename(plugin_path))[0]
    try:
        import_from_path(module_name, plugin_path)
    except Exception:
        data_processor_logger.exception(
            "Failed to load output fallback module '%s' from %s.", module_name, plugin_path
        )

def apply(self, text: str, context: OutputFallbackContext) -> str:
    return text + "-suffix"

def on_delta(self, delta_text: str, context: OutputFallbackContext, state: dict) -> StreamFallbackDecision:
Copilot AI review requested due to automatic review settings June 9, 2026 02:50

Copilot AI review requested due to automatic review settings June 10, 2026 02:41

Copilot AI review requested due to automatic review settings June 10, 2026 10:45

@PaddlePaddle-bot PaddlePaddle-bot left a comment

🤖 Paddle-CI-Agent | pr_review | 2026-06-10 19:05:16

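📋 Review summary

PR overview: Adds an OpenAI output fallback framework and moves fallback forward into the data processor.
Scope of changes: Engine CLI, OpenAI serving, DataProcessor, output fallback manager/plugin, and related unit tests.
Impact tags: [APIServer] [DataProcessor] [Engine]

Issues

| Level | File | Summary |
| --- | --- | --- |
| 🔴 Bug | fastdeploy/entrypoints/openai/api_server.py:245 | With FD_ENABLE_ASYNC_LLM=1 the fallback manager is attached only to engine_client, so the v1/AsyncLLM output path never runs fallback |
| 🟡 Suggestion | fastdeploy/input/base_processor.py:394 | Empty non-streaming output is short-circuited by `and full_text`, so strategies cannot implement an empty-reply fallback |

Status of earlier findings

| Finding | Issue | Status |
| --- | --- | --- |
| F1 | on_finish truncate action is not propagated to the caller | ⚠️ Still present |
| F2 | on_finish flush handling across multiple strategies does not follow chained semantics | ⚠️ Still present |
| F3 | on_finish context still carries the original delta_text | ⚠️ Still present |
| F4 | on_delta is passed the accumulated buffer instead of the current delta | ⚠️ Still present |
| F5 | trial state is not written back when hold returns early | ⚠️ Still present |
| F6 | on_finish with a non-empty buffer still calls strategy.on_delta first | ⚠️ Still present |
| F8 | The enable_mm_output text branch disables fallback | ✅ Fixed |

📝 PR convention check

The title has moved from the earlier double-tag form to a single tag, but the current title [Feature]Add output fallback support for OpenAI serving still lacks a space after the tag. The PR description is structurally complete.

Suggested title (can be copied as is):

  • [Feature] Add output fallback support for OpenAI serving

Overall assessment

This round reviewed, in order of risk, the fallback manager wiring, the DataProcessor pre-processing, and the OpenAI streaming/non-streaming consumption path. The non-async/v0 path is essentially connected end to end, but in async/v1 mode the startup arguments currently do not take effect, and that needs to be fixed first.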

# (content / reasoning / tool calls) benefit. Serving handlers no longer
# invoke the manager themselves.
if output_fallback_manager is not None and getattr(engine_client, "data_processor", None) is not None:
    engine_client.data_processor.output_fallback_manager = output_fallback_manager

# Apply output fallback to the full raw text BEFORE reasoning /
# tool parsing so all sub-streams (content, reasoning, tools)
# benefit from the rewrite.
if output_fallback_manager is not None and full_text:

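🟡 Suggestion: the current guard lets empty non-streaming output bypass fallback.

OutputFallbackStrategy.should_apply() is already the strategy's own decision point. The `and full_text` here prevents strategies from handling an empty string, for example replacing an empty model reply with a default fallback message. The streaming path has no such restriction, so streaming and non-streaming behavior are inconsistent.

Suggest removing `and full_text` and always calling output_fallback_manager.apply(full_text, context), leaving it to strategy.should_apply() to decide whether empty text is handled.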
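
The difference the reviewer describes can be reproduced in a few lines. This is only an illustration under stated assumptions: `EmptyReplyFallback`, `apply_with_guard`, and `apply_without_guard` are hypothetical names, the strategy stands in for a manager, and the context argument is omitted.

```python
from typing import Optional


class EmptyReplyFallback:
    """Hypothetical strategy that replaces an empty reply with a default message."""

    def should_apply(self, text: str) -> bool:
        return text.strip() == ""

    def apply(self, text: str) -> str:
        return "Sorry, I could not generate a reply."


def apply_with_guard(strategy: Optional[EmptyReplyFallback], full_text: str) -> str:
    # Shape of the reviewed code: empty text never reaches the strategy.
    if strategy is not None and full_text:
        if strategy.should_apply(full_text):
            return strategy.apply(full_text)
    return full_text


def apply_without_guard(strategy: Optional[EmptyReplyFallback], full_text: str) -> str:
    # Shape of the suggested change: the strategy decides, even for empty text.
    if strategy is not None and strategy.should_apply(full_text):
        return strategy.apply(full_text)
    return full_text


guarded = apply_with_guard(EmptyReplyFallback(), "")
unguarded = apply_without_guard(EmptyReplyFallback(), "")
print(repr(guarded))
print(repr(unguarded))
```

With the guard in place the empty string passes through unchanged; without it the strategy gets the chance to substitute its default message, while non-empty text behaves the same in both versions.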

@LiqinruiG LiqinruiG left a comment

Collaborator

LGTM

@Jiang-Jia-Jun Jiang-Jia-Jun merged commit e524ea5 into PaddlePaddle:develop Jun 11, 2026
41 of 43 checks passed