Refactor rollout topology binding#1933

Open
YanhuiDua wants to merge 2 commits into
InternLM:mainfrom
YanhuiDua:refine-rollout-topology
@YanhuiDua YanhuiDua commented Jun 24, 2026

Collaborator

Summary

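This PR refactors how topology is expressed when rollout workers start. It adds RolloutTopology and uses ServerLaunchSpec as the input a worker needs to launch its server.

The main goal is to pull the rollout launch topology logic out of the loose fields scattered across the controller and worker:

  • RolloutTopology describes the rollout launch topology
  • _Engine describes one logical inference engine
  • _ServerProcess describes one worker process that actually launches a server
  • ServerLaunchSpec describes the launch parameters passed to a worker

Why the RolloutTopology to ServerLaunchSpec mapping is needed

  • EngineLaunchSpec cannot express everything a rollout worker needs at startup, such as dist_init_addr, the request entrypoint, and engine_mesh_array.
  • The mapping from RolloutTopology to ServerLaunchSpec matters because it separates the global topology rules from the launch parameters of a single worker.
  • RolloutController only builds the topology, iterates over server_launch_specs(), and binds each spec to its worker. It no longer needs to know which rank is a server rank, which rank provides dist_init_addr, how many server processes an engine has, or other backend rules. Backend differences are confined to topology construction, each worker consumes only its own ServerLaunchSpec, and the controller's startup flow becomes more stable and easier to maintain.

Additional changes:

  • In RolloutHealthManager, functions such as start are renamed to names like start_background_checks to make clear that they run background checks.
  • Adds the data type RolloutWorkerEndpointMetadata to unify rank, server_url, session_url, and lifecycle_state, replacing rank_to_serverl_url, rank_to_session_url, and their status.

Not changed in this PR:

  • The input type on the weight-update side is unchanged and remains a dict. A to_legacy method is added to RolloutWorkerMetadata to adapt to the inputs of the existing weight-update interfaces. This will be revisited once the weight-update refactor PR is merged.

Key structures

RolloutTopology: the overall topology used when rollout workers start.

It contains:

  • All logical inference engines
  • The training engine mesh
  • The projection logic that produces server launch specs
  • Request entrypoint lookup
  • Lifecycle group lookup

_Engine (internal structure): represents one logical inference engine.

It contains:

  • engine_ranks: the worker ranks this engine covers
  • dist_init_addr: the rendezvous address used for communication inside this engine
  • server_processes: the workers in this engine that launch a rollout server

_ServerProcess (internal structure): represents a server process that is actually launched on a worker.

It contains:
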
  • worker_rank
  • placement_group_bundle_idxs
  • accepts_rollout_requests
  • node_rank
  • nnodes

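For example:

  • In the LMDeploy TP case, an engine usually needs only one server process
  • In the LMDeploy EP case, an engine may have multiple server processes
  • In the SGLang cross-node case, each node has one server process, but only node rank 0 accepts requests
  • The topology differences between inference engines are all hidden inside RolloutTopology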
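
As a rough illustration of the SGLang cross-node case, the sketch below models the two internal structures as plain frozen dataclasses. `Engine` and `ServerProcess` are stand-ins for `_Engine` and `_ServerProcess`; the field names come from the lists above, while the concrete ranks, bundle indices, and rendezvous address are invented for the example and are not taken from the PR.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ServerProcess:
    """Stand-in for _ServerProcess: one worker that launches a rollout server."""

    worker_rank: int
    placement_group_bundle_idxs: tuple[int, ...]
    accepts_rollout_requests: bool
    node_rank: int
    nnodes: int


@dataclass(frozen=True)
class Engine:
    """Stand-in for _Engine: one logical inference engine."""

    engine_ranks: tuple[int, ...]
    dist_init_addr: str
    server_processes: tuple[ServerProcess, ...]


# Hypothetical engine spanning 2 nodes with 2 worker ranks per node.
# Each node launches one server process; only node rank 0 takes requests.
sglang_engine = Engine(
    engine_ranks=(0, 1, 2, 3),
    dist_init_addr="10.0.0.1:29500",  # made-up rendezvous address
    server_processes=(
        ServerProcess(worker_rank=0, placement_group_bundle_idxs=(0, 1),
                      accepts_rollout_requests=True, node_rank=0, nnodes=2),
        ServerProcess(worker_rank=2, placement_group_bundle_idxs=(2, 3),
                      accepts_rollout_requests=False, node_rank=1, nnodes=2),
    ),
)

# Ranks that would receive rollout requests for this engine.
entrypoint_ranks = [s.worker_rank for s in sglang_engine.server_processes
                    if s.accepts_rollout_requests]
print(entrypoint_ranks)
```

An LMDeploy TP engine would look the same in this model except that `server_processes` holds a single entry.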

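ServerLaunchSpec: the value projected from RolloutTopology and actually passed to a worker to launch its inference server.

  • A worker no longer needs to understand the full topology; it only stores its own ServerLaunchSpec.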
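
The projection from a global topology to per-worker launch parameters might look roughly like the sketch below. `LaunchSpec` is a reduced, hypothetical stand-in for ServerLaunchSpec (the PR does not list its fields here), `project_launch_specs` is an invented helper name, and engines are passed as plain dicts to keep the example short.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LaunchSpec:
    """Hypothetical stand-in for ServerLaunchSpec with an assumed field set."""

    worker_rank: int
    dist_init_addr: str
    engine_ranks: tuple[int, ...]
    rank_in_engine: int
    accepts_rollout_requests: bool


def project_launch_specs(engines: list[dict]) -> list[LaunchSpec]:
    """Flatten the engine list into one spec per server process."""
    specs = []
    for engine in engines:
        ranks = tuple(engine["engine_ranks"])
        for server in engine["server_processes"]:
            specs.append(LaunchSpec(
                worker_rank=server["worker_rank"],
                dist_init_addr=engine["dist_init_addr"],
                engine_ranks=ranks,
                # Raises ValueError if the server rank is not in the engine.
                rank_in_engine=ranks.index(server["worker_rank"]),
                accepts_rollout_requests=server["accepts_rollout_requests"],
            ))
    return specs


# Two made-up engines of two ranks each, one server process per engine.
engines = [
    {"engine_ranks": (0, 1), "dist_init_addr": "10.0.0.1:29500",
     "server_processes": [{"worker_rank": 0, "accepts_rollout_requests": True}]},
    {"engine_ranks": (2, 3), "dist_init_addr": "10.0.0.1:29502",
     "server_processes": [{"worker_rank": 2, "accepts_rollout_requests": True}]},
]
specs = project_launch_specs(engines)
```

Each worker then only needs the single `LaunchSpec` whose `worker_rank` matches its own rank.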

init_worker flow

  1. RolloutController first creates all RolloutWorker actors through the placement group.

  2. Each worker calls init_dist_port(), reserves a local port, and returns (rank, dist_init_addr).

  3. The controller aggregates the results into rank_to_dist_init_addr.

  4. The backend worker class builds RolloutTopology from:

    • RolloutConfig
    • rank_bundle_idx_list
    • rank_to_dist_init_addr

  5. RolloutTopology generates the list of ServerLaunchSpec from its internal _Engine / _ServerProcess structures.

  6. For each ServerLaunchSpec, the controller finds the corresponding worker and calls:

worker.bind_server_launch_spec.remote(launch_spec)

  7. After binding completes, the controller calls worker.init.remote().
  8. The worker uses its bound ServerLaunchSpec to start the rollout server and the session server.
  9. The controller collects each RolloutWorkerInitResult and registers the startup results.
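
The bind, init, register sequence can be sketched without Ray as follows. Everything here is a simplification under stated assumptions: `FakeWorker` is an in-process stand-in for a RolloutWorker actor (plain method calls replace `.remote()`), `LaunchSpec` is a reduced stand-in for ServerLaunchSpec, and `build_launch_specs` is a toy topology rule in which the first rank of each engine launches the server.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class LaunchSpec:
    """Hypothetical, reduced stand-in for ServerLaunchSpec."""

    worker_rank: int
    dist_init_addr: str


class FakeWorker:
    """In-process stand-in for a RolloutWorker actor."""

    def __init__(self, rank: int) -> None:
        self.rank = rank
        self.launch_spec: Optional[LaunchSpec] = None

    def init_dist_port(self) -> tuple[int, str]:
        # A real worker would reserve a free local port here.
        return self.rank, f"127.0.0.1:{29500 + self.rank}"

    def bind_server_launch_spec(self, spec: LaunchSpec) -> None:
        self.launch_spec = spec

    def init(self) -> dict:
        if self.launch_spec is None:
            raise RuntimeError(f"rank {self.rank} has no bound launch spec")
        return {"rank": self.rank, "dist_init_addr": self.launch_spec.dist_init_addr}


def build_launch_specs(rank_to_addr: dict[int, str], ranks_per_engine: int) -> list[LaunchSpec]:
    """Toy topology rule: the first rank of each engine launches the server."""
    ranks = sorted(rank_to_addr)
    return [LaunchSpec(worker_rank=r, dist_init_addr=rank_to_addr[r])
            for r in ranks[::ranks_per_engine]]


# Create workers, then collect (rank, dist_init_addr) from each of them.
workers = [FakeWorker(rank) for rank in range(4)]
dist_init_results = [w.init_dist_port() for w in workers]
rank_to_dist_init_addr = dict(dist_init_results)
rank_to_worker = {w.rank: w for w in workers}

# Build the topology and project it into per-worker launch specs.
launch_specs = build_launch_specs(rank_to_dist_init_addr, ranks_per_engine=2)

# Bind first, then init only the workers that received a spec.
for spec in launch_specs:
    rank_to_worker[spec.worker_rank].bind_server_launch_spec(spec)
init_results = [rank_to_worker[s.worker_rank].init() for s in launch_specs]
```

Separating bind from init means a worker that never received a spec fails loudly in `init()` instead of starting with partial state.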

@YanhuiDua YanhuiDua force-pushed the refine-rollout-topology branch from 53c3a19 to 71a0a3a Compare June 24, 2026 09:06
@YanhuiDua

Collaborator Author

@claude review

Comment thread xtuner/v1/rl/rollout/controller.py
Comment on lines +42 to +50
@dataclass(frozen=True)
class RolloutTopology:
    """Immutable rollout engine layout after dist-init addresses are resolved.

    Actor handles, server URLs, session URLs, and lifecycle state belong to RolloutWorkerRegistry.
    """

    engines: tuple[_Engine, ...]
    training_engine_mesh: tuple[tuple[int, ...], ...]

Contributor

Claude: [Warning] The old validate_engine_launch_specs() (~90 lines) enforced a comprehensive set of invariants: no empty engines, no duplicate ranks within/across engines, server ranks ⊆ engine ranks, no duplicate bundle assignments, nnodes >= 1, node_rank < nnodes, at least one request entrypoint per engine, etc.

None of these checks exist on the new RolloutTopology. Invalid topologies will produce opaque downstream failures — e.g., engine.engine_ranks.index(server.worker_rank) in server_launch_specs() raises a bare ValueError if a server rank is not in engine_ranks.

Consider adding a __post_init__ with at least the key structural invariants (no duplicate ranks across engines, server ranks ⊆ engine ranks, at least one server per engine). Even a subset of the old checks would catch misconfiguration early with clear error messages instead of cryptic runtime failures.

Comment thread xtuner/v1/rl/rollout/rollout_topology.py Outdated
Comment thread xtuner/v1/rl/rollout/worker_registry.py Outdated
Comment thread xtuner/v1/rl/rollout/worker_registry.py
claude Bot commented Jun 24, 2026

Contributor

Claude: ## Review Summary

Clean refactor that converges scattered topology fields into RolloutTopology / ServerLaunchSpec / RolloutWorkerInitResult. The new data model is well-structured and the init flow (bind → init → register) is clear.

ProduceBatchResult impact: not affected
RoutedExperts impact: not affected

Issues

Critical

  • [controller.py:260-266] Removed session_url validation for request-entrypoint workers. The old code raised RuntimeError when a request-entrypoint worker returned session_url=None. This safety check should be restored.

Warning

  • [rollout_topology.py:42-50] validate_engine_launch_specs() (~90 lines of structural validation) was removed with no replacement on RolloutTopology. Invalid topologies will produce opaque downstream failures. A __post_init__ with key invariant checks would catch configuration errors early.
  • [rollout_topology.py:82-84] cached_property on frozen=True dataclass relies on CPython __dict__-bypass behavior. Will break if slots=True is ever added. Consider __post_init__ + object.__setattr__ instead.
  • [worker_registry.py:131] register_started_server does not validate that rank exists in the topology, which can cause an unguarded KeyError in _build_worker_groups() → lifecycle_group_for_server_rank().

Nit

  • [worker_registry.py:76] to_legacy() return type is bare dict; dict[str, Any] would be better.
  • [worker.py] Pre-existing typo self.serverl_url (extra l) carried forward — appears to be dead code.

Verdict

REQUEST_CHANGES — the removed session_url validation is a silent safety regression that should be restored.

@YanhuiDua

Collaborator Author

@claude review

Comment on lines +57 to +72
            lifecycle_group = tuple(server.worker_rank for server in engine.server_processes)
            for server in engine.server_processes:
                if server.worker_rank in server_process_by_rank:
                    raise ValueError(f"Duplicate rollout server process worker_rank={server.worker_rank}.")
                server_process_by_rank[server.worker_rank] = server
                lifecycle_group_by_rank[server.worker_rank] = lifecycle_group

        object.__setattr__(self, "_server_process_by_rank", server_process_by_rank)
        object.__setattr__(self, "_lifecycle_group_by_rank", lifecycle_group_by_rank)

    @staticmethod
    def engine(
        *,
        engine_ranks: tuple[int, ...],
        dist_init_addr: str,
        server_processes: tuple[_ServerProcess, ...],

Contributor

Claude: The __post_init__ validates duplicate server process ranks, which is good — but the old validate_engine_launch_specs also checked these invariants that are now unguarded:

  • Server process worker_rank must be part of its engine's engine_ranks
  • Each engine must have at least one request entrypoint (accepts_rollout_requests=True)
  • No duplicate engine ranks across engines
  • Valid node_rank bounds (0 <= node_rank < nnodes)

Consider adding at least the first two checks — they catch backend construction bugs early with clear error messages, rather than surfacing as confusing downstream failures (e.g., is_request_entrypoint_rank returning False for all ranks in an engine).

Suggested change
-            lifecycle_group = tuple(server.worker_rank for server in engine.server_processes)
-            for server in engine.server_processes:
-                if server.worker_rank in server_process_by_rank:
-                    raise ValueError(f"Duplicate rollout server process worker_rank={server.worker_rank}.")
-                server_process_by_rank[server.worker_rank] = server
-                lifecycle_group_by_rank[server.worker_rank] = lifecycle_group
-        object.__setattr__(self, "_server_process_by_rank", server_process_by_rank)
-        object.__setattr__(self, "_lifecycle_group_by_rank", lifecycle_group_by_rank)
-    @staticmethod
-    def engine(
-        *,
-        engine_ranks: tuple[int, ...],
-        dist_init_addr: str,
-        server_processes: tuple[_ServerProcess, ...],
+    def __post_init__(self) -> None:
+        server_process_by_rank: dict[int, _ServerProcess] = {}
+        lifecycle_group_by_rank: dict[int, tuple[int, ...]] = {}
+        for engine in self.engines:
+            lifecycle_group = tuple(server.worker_rank for server in engine.server_processes)
+            engine_rank_set = set(engine.engine_ranks)
+            has_entrypoint = False
+            for server in engine.server_processes:
+                if server.worker_rank in server_process_by_rank:
+                    raise ValueError(f"Duplicate rollout server process worker_rank={server.worker_rank}.")
+                if server.worker_rank not in engine_rank_set:
+                    raise ValueError(
+                        f"Server worker_rank={server.worker_rank} is not part of "
+                        f"engine_ranks={engine.engine_ranks}."
+                    )
+                if server.accepts_rollout_requests:
+                    has_entrypoint = True
+                server_process_by_rank[server.worker_rank] = server
+                lifecycle_group_by_rank[server.worker_rank] = lifecycle_group
+            if not has_entrypoint:
+                raise ValueError(
+                    f"Engine with ranks={engine.engine_ranks} must have at least one "
+                    f"request entrypoint (accepts_rollout_requests=True)."
+                )
+        object.__setattr__(self, "_server_process_by_rank", server_process_by_rank)
+        object.__setattr__(self, "_lifecycle_group_by_rank", lifecycle_group_by_rank)

        with self._lock:
            return tuple(sorted(self._rollout_topology.lifecycle_groups()))

    def _build_worker_groups(self) -> dict[tuple[int, ...], WorkerGroup]:

Contributor

Claude: The _build_worker_groups method calls lifecycle_group_for_server_rank(worker.rank) which raises KeyError if the rank isn't in the topology. While registered ranks should always match the topology's server_launch_specs in normal operation, a defensive guard would make debugging easier if registrations and topology drift:

Suggested change
-    def _build_worker_groups(self) -> dict[tuple[int, ...], WorkerGroup]:
+    def _build_worker_groups(self) -> dict[tuple[int, ...], WorkerGroup]:
+        grouped_ranks: set[tuple[int, ...]] = set()
+        for worker in self._workers.values():
+            try:
+                grouped_ranks.add(self._rollout_topology.lifecycle_group_for_server_rank(worker.rank))
+            except KeyError:
+                grouped_ranks.add((worker.rank,))

Alternatively, leave as-is if you prefer a hard failure — the current KeyError message from lifecycle_group_for_server_rank is reasonably descriptive.

            self.config,
            rank_bundle_idx_list,
            rank_to_dist_init_addr,
        )

Contributor

Claude: Nit — rank_to_worker is built by iterating zip(workers, dist_init_results, strict=True). This correctly pairs each worker actor with the (rank, dist_init_addr) tuple it returned. Compared to the old code that relied on index-aligned ordering between workers and rank_bundle_idx_list, the new approach is safer since it uses the rank returned by the worker itself.

However, the variable name _dist_init_addr in the comprehension could be clearer — it looks like a bug at first glance (why discard?). A brief comment would help:

Suggested change
        )
        rank_to_worker = {
            rank: worker
            for worker, (rank, _dist_init_addr) in zip(workers, dist_init_results, strict=True)  # rank from worker
        }

claude Bot commented Jun 24, 2026

Contributor

Claude: ## Review Summary

The follow-up commit (patch 2/2) addresses all critical and major findings from the prior review:

  • ✅ Session URL validation for request-entrypoint workers restored
  • ✅ cached_property on frozen dataclass replaced with __post_init__ + object.__setattr__
  • ✅ to_legacy() return type annotated as dict[str, Any]

The refactor is well-structured. RolloutTopology cleanly separates the global topology (engine layout, lifecycle groups, request entrypoints) from worker-local state (ServerLaunchSpec). The bind → init → register controller flow is clearer than the old single-call approach.

ProduceBatchResult impact: not affected
RoutedExperts impact: not affected

Remaining Issues

Warning

  • [rollout_topology.py:57-72] __post_init__ validates duplicate server process ranks but drops most of the old validate_engine_launch_specs invariants (server rank ∈ engine_ranks, each engine has ≥1 request entrypoint, valid node_rank bounds). Suggested adding the two highest-value checks inline.

Nit

  • [worker_registry.py:180] _build_worker_groups has an unguarded lifecycle_group_for_server_rank call — a programming error in registration could surface as a confusing KeyError.
  • [controller.py:239] _dist_init_addr discard in the comprehension is non-obvious without context.

Verdict

APPROVE — the core design is sound and all prior critical findings are resolved. The remaining items are defensive suggestions, not blockers.
