工作流引擎指南

目录


概述

LibreFang 工作流引擎支持多步骤 Agent 流水线——通过编排一系列任务,将每个步骤的工作路由到特定 Agent,并将上一步的输出作为下一步的输入。工作流让你无需编写任何 Rust 代码,即可将多个简单的、单一用途的 Agent 组合成复杂的行为。

适用场景:

  • 将多个 Agent 串联为处理流水线(例如:先调研、再撰写、最后审核)。
  • 将工作扇出到多个 Agent 并行处理,然后收集结果。
  • 根据前一步骤的输出进行条件分支。
  • 在循环中反复执行某个步骤,直到满足质量标准。
  • 构建可复现、可审计的多 Agent 流程,支持通过 API 或 CLI 触发。

实现代码位于 librefang-kernel/src/workflow.rs。工作流引擎通过闭包与内核解耦——它不会直接持有或引用内核,因此可以独立进行测试。


核心类型

Rust 类型说明
WorkflowId(Uuid)工作流定义的唯一标识符。
WorkflowRunId(Uuid)工作流运行实例的唯一标识符。
Workflow命名的工作流定义,包含一组 WorkflowStep 条目。
WorkflowStep单个步骤:Agent 引用、提示词模板、执行模式、超时、错误处理。
WorkflowRun运行实例:跟踪状态、步骤结果、最终输出、时间戳。
WorkflowRunState枚举:PendingRunningCompletedFailed
StepResult单个步骤的结果:Agent 信息、输出文本、token 计数、耗时。
WorkflowEngine引擎本身:使用 Arc<RwLock<HashMap>> 存储定义和运行实例。

工作流定义

工作流通过 REST API 以 JSON 格式注册。顶层结构如下:

{
  "name": "my-pipeline",
  "description": "Describe what the workflow does",
  "steps": [ ... ]
}

对应的 Rust 结构体为:

pub struct Workflow {
    pub id: WorkflowId,            // Auto-assigned on creation
    pub name: String,              // Human-readable name
    pub description: String,       // What this workflow does
    pub steps: Vec<WorkflowStep>,  // Ordered list of steps
    pub created_at: DateTime<Utc>, // Auto-assigned on creation
}

步骤配置

steps 数组中的每个步骤包含以下字段:

JSON 字段Rust 字段类型默认值说明
namenameString"step"步骤名称,用于日志和显示。
agent_nameagentStepAgent::ByName--按名称引用 Agent(匹配第一个)。与 agent_id 互斥。
agent_idagentStepAgent::ById--按 UUID 引用 Agent。与 agent_name 互斥。
promptprompt_templateString"{{input}}"带变量占位符的提示词模板。
modemodeStepMode"sequential"执行模式(见下文)。
timeout_secstimeout_secsu64120步骤超时时间上限(秒)。
error_modeerror_modeErrorMode"fail"错误处理方式(见下文)。
max_retries(在 ErrorMode::Retry 内部)u323error_mode"retry" 时的重试次数。
output_varoutput_varOption<String>null如果设置,将此步骤的输出存储为命名变量,供后续步骤引用。
condition(在 StepMode::Conditional 内部)String""在前一步输出中匹配的子字符串(不区分大小写)。
max_iterations(在 StepMode::Loop 内部)u325循环强制终止前的最大迭代次数。
until(在 StepMode::Loop 内部)String""在输出中匹配以终止循环的子字符串(不区分大小写)。

Agent 解析

每个步骤必须指定 agent_nameagent_id 其中之一。StepAgent 枚举定义如下:

pub enum StepAgent {
    ById { id: String },    // UUID of an existing agent
    ByName { name: String }, // Name match (first agent with this name)
}

如果在执行时无法解析到 Agent,工作流将失败并报错 "Agent not found for step '<name>'"


步骤模式

mode 字段控制步骤相对于工作流中其他步骤的执行方式。

Sequential(顺序执行,默认)

{ "mode": "sequential" }

该步骤在前一步骤完成后运行。前一步骤的输出将作为当前步骤的 {{input}}。省略 mode 时默认为此模式。

Fan-Out(扇出)

{ "mode": "fan_out" }

扇出步骤并行运行。引擎会收集所有连续的 fan_out 步骤,并使用 futures::future::join_all 同时启动它们。所有扇出步骤接收相同的 {{input}}——即扇出组之前最后一个运行步骤的输出。

如果任何扇出步骤失败或超时,整个工作流立即失败。

Collect(收集)

{ "mode": "collect" }

collect 步骤收集前一个扇出组的所有输出。它不执行 Agent——这是一个纯数据步骤,使用分隔符 "\n\n---\n\n" 连接所有累积的输出,并将结果设置为后续步骤的 {{input}}

典型的 fan-out/collect 模式:

step 1: fan_out  -->  runs in parallel
step 2: fan_out  -->  runs in parallel
step 3: collect  -->  joins outputs from steps 1 and 2
step 4: sequential --> receives joined output as {{input}}

Conditional(条件执行)

{ "mode": "conditional", "condition": "ERROR" }

仅当前一步骤的输出包含 condition 子字符串时(通过 to_lowercase().contains() 进行不区分大小写的比较),该步骤才会执行。如果条件不满足,该步骤将被完全跳过,{{input}} 不会被修改。

当条件满足时,该步骤按顺序执行模式运行。

Loop(循环)

{ "mode": "loop", "max_iterations": 5, "until": "APPROVED" }

该步骤最多重复 max_iterations 次。每次迭代后,引擎检查输出是否包含 until 子字符串(不区分大小写)。如果匹配到,循环提前终止。

每次迭代将其输出回传作为下一次迭代的 {{input}}。步骤结果以 "refine (iter 1)""refine (iter 2)" 等格式记录名称。

如果 until 条件始终未满足,循环将恰好运行 max_iterations 次,然后以最后一次迭代的输出继续执行下一步骤。


变量替换

提示词模板支持两种变量引用方式:

{{input}} -- 前一步骤的输出

始终可用。包含前一步骤的输出(对于第一个步骤,则为工作流的初始输入)。

{{variable_name}} -- 命名变量

当某个步骤设置了 "output_var": "my_var" 时,其输出会以 my_var 为键存储在变量映射中。后续任何步骤都可以在提示词模板中使用 {{my_var}} 来引用它。

变量展开逻辑(来自 WorkflowEngine::expand_variables):

fn expand_variables(template: &str, input: &str, vars: &HashMap<String, String>) -> String {
    let mut result = template.replace("{{input}}", input);
    for (key, value) in vars {
        result = result.replace(&format!("{{{{{key}}}}}"), value);
    }
    result
}

变量在整个工作流运行期间持久存在。后续步骤可以通过使用相同的 output_var 名称来覆盖变量。

示例:一个三步骤的工作流,其中步骤 3 同时引用步骤 1 和步骤 2 的输出:

{
  "steps": [
    { "name": "research", "output_var": "research_output", "prompt": "Research: {{input}}" },
    { "name": "outline",  "output_var": "outline_output",  "prompt": "Outline based on: {{input}}" },
    { "name": "combine",  "prompt": "Write article.\nResearch: {{research_output}}\nOutline: {{outline_output}}" }
  ]
}

错误处理

每个步骤都有一个 error_mode 字段,用于控制步骤失败或超时时的行为。

Fail(失败终止,默认)

{ "error_mode": "fail" }

工作流立即中止。运行状态设置为 Failed,记录错误信息并设置 completed_at。错误信息格式为 "Step '<name>' failed: <error>""Step '<name>' timed out after <N>s"

Skip(跳过)

{ "error_mode": "skip" }

发生错误或超时时静默跳过该步骤。会记录一条警告日志,但工作流继续执行。下一步骤的 {{input}} 保持不变(使用被跳过步骤之前的值)。被跳过的步骤不会记录 StepResult

Retry(重试)

{ "error_mode": "retry", "max_retries": 3 }

该步骤在初始尝试之后最多重试 max_retries 次(因此 max_retries: 3 意味着最多 4 次总尝试:1 次初始 + 3 次重试)。每次尝试独立享有完整的 timeout_secs 超时预算。如果所有尝试都失败,工作流中止并报错 "Step '<name>' failed after <N> retries: <last_error>"

超时行为

每个步骤的执行都被 tokio::time::timeout(Duration::from_secs(step.timeout_secs), ...) 包裹。默认超时为 120 秒。超时被视为错误,按照步骤的 error_mode 进行处理。

对于扇出步骤,每个并行步骤各自拥有独立的超时。


示例

示例 1:代码审查流水线

一个顺序执行的流水线,先分析代码,再审查问题,最后生成摘要报告。

{
  "name": "code-review-pipeline",
  "description": "Analyze code, review for issues, and produce a summary report",
  "steps": [
    {
      "name": "analyze",
      "agent_name": "code-reviewer",
      "prompt": "Analyze the following code for bugs, style issues, and security vulnerabilities:\n\n{{input}}",
      "mode": "sequential",
      "timeout_secs": 180,
      "error_mode": "fail",
      "output_var": "analysis"
    },
    {
      "name": "security-check",
      "agent_name": "security-auditor",
      "prompt": "Review this code analysis for security issues. Flag anything critical:\n\n{{analysis}}",
      "mode": "sequential",
      "timeout_secs": 120,
      "error_mode": "retry",
      "max_retries": 2,
      "output_var": "security_review"
    },
    {
      "name": "summary",
      "agent_name": "writer",
      "prompt": "Write a concise code review summary.\n\nCode Analysis:\n{{analysis}}\n\nSecurity Review:\n{{security_review}}",
      "mode": "sequential",
      "timeout_secs": 60,
      "error_mode": "fail"
    }
  ]
}

示例 2:调研并撰写文章

调研一个主题,编写大纲,然后撰写——并附带一个条件性的事实核查步骤。

{
  "name": "research-and-write",
  "description": "Research a topic, outline, write, and optionally fact-check",
  "steps": [
    {
      "name": "research",
      "agent_name": "researcher",
      "prompt": "Research the following topic thoroughly. Cite sources where possible:\n\n{{input}}",
      "mode": "sequential",
      "timeout_secs": 300,
      "error_mode": "retry",
      "max_retries": 1,
      "output_var": "research"
    },
    {
      "name": "outline",
      "agent_name": "planner",
      "prompt": "Create a detailed article outline based on this research:\n\n{{research}}",
      "mode": "sequential",
      "timeout_secs": 60,
      "output_var": "outline"
    },
    {
      "name": "write",
      "agent_name": "writer",
      "prompt": "Write a complete article.\n\nOutline:\n{{outline}}\n\nResearch:\n{{research}}",
      "mode": "sequential",
      "timeout_secs": 300,
      "output_var": "article"
    },
    {
      "name": "fact-check",
      "agent_name": "analyst",
      "prompt": "Fact-check this article and note any claims that need verification:\n\n{{article}}",
      "mode": "conditional",
      "condition": "claim",
      "timeout_secs": 120,
      "error_mode": "skip"
    }
  ]
}

事实核查步骤仅在文章包含 "claim" 一词(不区分大小写)时运行。如果事实核查 Agent 失败,工作流会继续,保持文章原样。

示例 3:多 Agent 头脑风暴(Fan-Out + Collect)

三个 Agent 并行进行头脑风暴,然后由第四个 Agent 综合他们的想法。

{
  "name": "brainstorm",
  "description": "Parallel brainstorm with 3 agents, then synthesize",
  "steps": [
    {
      "name": "creative-ideas",
      "agent_name": "writer",
      "prompt": "Brainstorm 5 creative ideas for: {{input}}",
      "mode": "fan_out",
      "timeout_secs": 60,
      "output_var": "creative"
    },
    {
      "name": "technical-ideas",
      "agent_name": "architect",
      "prompt": "Brainstorm 5 technically feasible ideas for: {{input}}",
      "mode": "fan_out",
      "timeout_secs": 60,
      "output_var": "technical"
    },
    {
      "name": "business-ideas",
      "agent_name": "analyst",
      "prompt": "Brainstorm 5 ideas with strong business potential for: {{input}}",
      "mode": "fan_out",
      "timeout_secs": 60,
      "output_var": "business"
    },
    {
      "name": "gather",
      "agent_name": "planner",
      "prompt": "unused",
      "mode": "collect"
    },
    {
      "name": "synthesize",
      "agent_name": "orchestrator",
      "prompt": "You received brainstorm results from three perspectives. Synthesize them into the top 5 actionable ideas, ranked by impact:\n\n{{input}}",
      "mode": "sequential",
      "timeout_secs": 120
    }
  ]
}

三个扇出步骤并行运行。collect 步骤使用 --- 分隔符连接它们的输出。synthesize 步骤接收合并后的输出。

示例 4:迭代优化(Loop)

一个 Agent 反复优化草稿,直到达到质量标准。

{
  "name": "iterative-refinement",
  "description": "Refine a document until approved or max iterations reached",
  "steps": [
    {
      "name": "first-draft",
      "agent_name": "writer",
      "prompt": "Write a first draft about: {{input}}",
      "mode": "sequential",
      "timeout_secs": 120,
      "output_var": "draft"
    },
    {
      "name": "review-and-refine",
      "agent_name": "code-reviewer",
      "prompt": "Review this draft. If it meets quality standards, respond with APPROVED at the start. Otherwise, provide specific feedback and a revised version:\n\n{{input}}",
      "mode": "loop",
      "max_iterations": 4,
      "until": "APPROVED",
      "timeout_secs": 180,
      "error_mode": "retry",
      "max_retries": 1
    }
  ]
}

循环最多运行审核者 4 次。每次迭代接收前一次迭代的输出作为 {{input}}。一旦审核者在回复中包含 "APPROVED",循环提前终止。


触发器引擎

触发器引擎(librefang-kernel/src/triggers.rs)提供事件驱动的自动化能力。触发器监听内核的事件总线,当匹配的事件到达时,自动向 Agent 发送消息。

核心类型

Rust 类型说明
TriggerId(Uuid)触发器的唯一标识符。
Trigger已注册的触发器:Agent、匹配模式、提示词模板、触发次数、限制。
TriggerPattern定义要匹配哪些事件的枚举。
TriggerEngine触发器引擎:基于 DashMap 的并发存储,带有 Agent 到触发器的索引。

触发器定义

pub struct Trigger {
    pub id: TriggerId,
    pub agent_id: AgentId,         // Which agent receives the message
    pub pattern: TriggerPattern,   // What events to match
    pub prompt_template: String,   // Template with {{event}} placeholder
    pub enabled: bool,             // Can be toggled on/off
    pub created_at: DateTime<Utc>,
    pub fire_count: u64,           // How many times it has fired
    pub max_fires: u64,            // 0 = unlimited
}

事件模式

TriggerPattern 枚举支持 9 种匹配模式:

模式JSON说明
All"all"匹配所有事件(通配符)。
Lifecycle"lifecycle"匹配任何生命周期事件(spawned、started、terminated 等)。
AgentSpawned{"agent_spawned": {"name_pattern": "coder"}}当名称包含 name_pattern 的 Agent 被创建时匹配。使用 "*" 匹配任意 Agent。
AgentTerminated"agent_terminated"当任何 Agent 终止或崩溃时匹配。
System"system"匹配任何系统事件(健康检查、配额警告等)。
SystemKeyword{"system_keyword": {"keyword": "quota"}}匹配 Debug 表示中包含关键词的系统事件(不区分大小写)。
MemoryUpdate"memory_update"匹配任何内存变更事件。
MemoryKeyPattern{"memory_key_pattern": {"key_pattern": "config"}}匹配键包含 key_pattern 的内存更新。使用 "*" 匹配任意键。
ContentMatch{"content_match": {"substring": "error"}}匹配人类可读描述中包含该子字符串的任何事件(不区分大小写)。

模式匹配详情

matches_pattern 函数决定每种模式的匹配逻辑:

  • All:始终返回 true
  • Lifecycle:检查 EventPayload::Lifecycle(_)
  • AgentSpawned:检查 LifecycleEvent::Spawned,条件为 name.contains(name_pattern)name_pattern == "*"
  • AgentTerminated:检查 LifecycleEvent::TerminatedLifecycleEvent::Crashed
  • System:检查 EventPayload::System(_)
  • SystemKeyword:通过 Debug trait 格式化系统事件,转为小写后检查 contains(keyword)
  • MemoryUpdate:检查 EventPayload::MemoryUpdate(_)
  • MemoryKeyPattern:检查 delta.key.contains(key_pattern)key_pattern == "*"
  • ContentMatch:使用 describe_event() 函数生成人类可读的字符串,然后检查 contains(substring)(不区分大小写)。

提示词模板与 {{event}}

当触发器触发时,引擎会将 prompt_template 中的 {{event}} 替换为人类可读的事件描述。describe_event() 函数生成的字符串示例:

  • "Agent 'coder' (id: <uuid>) was spawned"
  • "Agent <uuid> terminated: shutdown requested"
  • "Agent <uuid> crashed: out of memory"
  • "Kernel started"
  • "Quota warning: agent <uuid>, tokens at 85.0%"
  • "Health check failed: agent <uuid>, unresponsive for 30s"
  • "Memory Created on key 'config' for agent <uuid>"
  • "Tool 'web_search' succeeded (450ms): ..."

最大触发次数与自动禁用

max_fires 设置为大于 0 的值时,触发器在 fire_count >= max_fires 后会自动禁用(设置 enabled = false)。将 max_fires 设为 0 表示触发器无限次触发。

触发器使用场景

监控 Agent 健康状态:

{
  "agent_id": "<ops-agent-uuid>",
  "pattern": {"content_match": {"substring": "health check failed"}},
  "prompt_template": "ALERT: {{event}}. Investigate and report the status of all agents.",
  "max_fires": 0
}

响应新 Agent 创建:

{
  "agent_id": "<orchestrator-uuid>",
  "pattern": {"agent_spawned": {"name_pattern": "*"}},
  "prompt_template": "A new agent was just created: {{event}}. Update the fleet roster.",
  "max_fires": 0
}

一次性配额告警:

{
  "agent_id": "<admin-agent-uuid>",
  "pattern": {"system_keyword": {"keyword": "quota"}},
  "prompt_template": "Quota event detected: {{event}}. Recommend corrective action.",
  "max_fires": 1
}

Cron 任务

Cron 任务让你使用标准 cron 表达式为 Agent 安排定时消息。每条 cron 条目在 Agent manifest(.toml)中定义,独立于用户驱动的会话触发。

[[cron]]
name = "morning-brief"
schedule = "0 8 * * *"
message = "Summarize any new alerts from the last 24 hours."
字段类型说明
namestring人类可读名称,用于日志和审计事件
schedulestring标准 5 字段 cron 表达式(分钟、小时、日、月、星期)
messagestring每次触发时发送给 Agent 的提示词
session_modestring 或 null"persistent"(默认)或 "new",控制连续触发是否复用同一会话

关于会话的注意事项:所有使用 session_mode = "persistent"(默认)的 cron 触发共享一个以 (agent, "cron") 为键的会话。在 Agent manifest 中设置 session_mode = "new" 不会为 cron 的每次触发创建独立会话——cron 调度器会合成 SenderContext { channel: "cron" },在查询 session_mode 之前已经走了 channel 分支。

[SILENT] 标记

在 cron 任务的 message 前加上 [SILENT] 可抑制 Agent 响应写入会话历史。任务照常执行——工具调用运行、计量记录、审计日志写入——但 assistant 的回复不会写回会话。

这适用于周期性维护任务(备份检查、清理例程、健康探针),否则会堆积对人类用户或其他 Agent 造成干扰的对话历史。

用法

[[cron]]
name = "daily-cleanup"
schedule = "0 3 * * *"
message = "[SILENT] Run daily cleanup: remove temp files older than 7 days"

[[cron]]
name = "hourly-health-check"
schedule = "0 * * * *"
message = "[SILENT] Check system health and log any anomalies."

行为细节

  • 执行:Agent 轮次正常运行至完成,所有工具调用均执行。
  • 计量与成本:Token 用量和费用照常记录到预算账本。
  • 审计日志:无论是否带有 [SILENT] 标记,均会输出包含 Agent 响应摘要的 CronFire 审计事件。
  • 会话历史:仅抑制 assistant 响应。出站 cron 消息本身从不存入会话(cron 消息始终是临时的),因此 [SILENT] 不影响用户可见的提示词侧。

session_mode = "new" 的区别

行为[SILENT]session_mode = "new"
抑制 assistant 响应
创建每次触发的独立会话否(复用 (agent,"cron") 会话)对 cron 无效(见上方注意事项)
防止历史污染
正常工具执行

需要 Agent 行动但不留任何对话历史痕迹时,使用 [SILENT]


API 端点

工作流端点

POST /api/workflows -- 创建工作流

注册新的工作流定义。

请求体:

{
  "name": "my-pipeline",
  "description": "Description of the workflow",
  "steps": [
    {
      "name": "step-1",
      "agent_name": "researcher",
      "prompt": "Research: {{input}}",
      "mode": "sequential",
      "timeout_secs": 120,
      "error_mode": "fail",
      "output_var": "research"
    }
  ]
}

响应(201 Created):

{ "workflow_id": "<uuid>" }

GET /api/workflows -- 列出所有工作流

返回已注册工作流的摘要数组。

响应(200 OK):

[
  {
    "id": "<uuid>",
    "name": "my-pipeline",
    "description": "Description of the workflow",
    "steps": 3,
    "created_at": "2026-01-15T10:30:00Z"
  }
]

POST /api/workflows/:id/run -- 执行工作流

启动同步工作流执行。调用将阻塞直到工作流完成或失败。

请求体:

{ "input": "The initial input text for the first step" }

响应(200 OK):

{
  "run_id": "<uuid>",
  "output": "Final output from the last step",
  "status": "completed"
}

响应(500 Internal Server Error):

{ "error": "Workflow execution failed" }

GET /api/workflows/:id/runs -- 列出工作流运行记录

返回所有工作流运行记录(当前实现不按工作流 ID 过滤)。

响应(200 OK):

[
  {
    "id": "<uuid>",
    "workflow_name": "my-pipeline",
    "state": "completed",
    "steps_completed": 3,
    "started_at": "2026-01-15T10:30:00Z",
    "completed_at": "2026-01-15T10:32:15Z"
  }
]

触发器端点

POST /api/triggers -- 创建触发器

为 Agent 注册新的事件触发器。

请求体:

{
  "agent_id": "<agent-uuid>",
  "pattern": "lifecycle",
  "prompt_template": "A lifecycle event occurred: {{event}}",
  "max_fires": 0
}

响应(201 Created):

{
  "trigger_id": "<uuid>",
  "agent_id": "<agent-uuid>"
}

GET /api/triggers -- 列出所有触发器

可选按 Agent 过滤:GET /api/triggers?agent_id=<uuid>

响应(200 OK):

[
  {
    "id": "<uuid>",
    "agent_id": "<agent-uuid>",
    "pattern": "lifecycle",
    "prompt_template": "Event: {{event}}",
    "enabled": true,
    "fire_count": 5,
    "max_fires": 0,
    "created_at": "2026-01-15T10:00:00Z"
  }
]

PUT /api/triggers/:id -- 启用/禁用触发器

切换触发器的启用状态。

请求体:

{ "enabled": false }

响应(200 OK):

{ "status": "updated", "trigger_id": "<uuid>", "enabled": false }

DELETE /api/triggers/:id -- 删除触发器

响应(200 OK):

{ "status": "removed", "trigger_id": "<uuid>" }

响应(404 Not Found):

{ "error": "Trigger not found" }

CLI 命令

所有工作流和触发器 CLI 命令都需要正在运行的 LibreFang 守护进程。

工作流命令

librefang workflow list

列出所有已注册的工作流及其 ID、名称、步骤数量和创建日期。

librefang workflow create <file>

从 JSON 文件创建工作流。文件内容应与 POST /api/workflows 请求体的 JSON 结构一致。

librefang workflow run <workflow_id> <input>

通过 UUID 执行工作流,传入指定的输入文本。阻塞直到完成并打印输出。

触发器命令

librefang trigger list [--agent-id <uuid>]

列出所有已注册的触发器。可选按 Agent ID 过滤。

librefang trigger create <agent_id> <pattern_json> [--prompt <template>] [--max-fires <n>]

为指定 Agent 创建触发器。pattern_json 参数是描述匹配模式的 JSON 字符串。

默认值:

  • --prompt: "Event: {{event}}"
  • --max-fires: 0(无限次)

示例:

# Watch all lifecycle events
librefang trigger create <agent-id> '"lifecycle"' --prompt "Lifecycle: {{event}}"

# Watch for a specific agent spawn
librefang trigger create <agent-id> '{"agent_spawned":{"name_pattern":"coder"}}' --max-fires 1

# Watch for content containing "error"
librefang trigger create <agent-id> '{"content_match":{"substring":"error"}}'
librefang trigger delete <trigger_id>

通过 UUID 删除触发器。


执行限制

运行记录淘汰上限

工作流引擎最多保留 200 条运行记录(WorkflowEngine::MAX_RETAINED_RUNS)。创建新运行后超出此限制时,最旧的已完成已失败的运行记录将被淘汰(按 started_at 排序)。处于 PendingRunning 状态的运行永远不会被淘汰。

步骤超时

每个步骤都有可配置的 timeout_secs(默认:120 秒)。超时通过 tokio::time::timeout 强制执行,按每次尝试独立计算——重试模式下每次尝试都有完整的超时预算。扇出步骤各自拥有独立的超时。

循环迭代上限

循环步骤受 max_iterations 限制(API 中默认为 5)。即使 until 条件始终未满足,引擎也绝不会执行超过此数量的迭代。

每小时 Token 配额

AgentScheduler(位于 librefang-kernel/src/scheduler.rs)通过 UsageTracker 以 1 小时滚动窗口跟踪每个 Agent 的 token 用量。如果 Agent 超出其 ResourceQuota.max_llm_tokens_per_hour,调度器返回 LibreFangError::QuotaExceeded。窗口在 3600 秒后自动重置。此配额适用于所有 Agent 交互,包括由工作流调用的交互。


工作流数据流图

                    input
                      |
                      v
              +---------------+
              |   Step 1      |  mode: sequential
              |   agent: A    |
              +-------+-------+
                      | output -> {{input}} for step 2
                      |          -> variables["var1"] if output_var set
                      v
              +---------------+
              |   Step 2      |  mode: fan_out
              |   agent: B    |---+
              +---------------+   |
              +---------------+   |  parallel execution
              |   Step 3      |   |  (all receive same {{input}})
              |   agent: C    |---+
              +---------------+   |
                      |           |
                      v           v
              +---------------+
              |   Step 4      |  mode: collect
              |   (no agent)  |  joins all outputs with "---"
              +-------+-------+
                      | combined output -> {{input}}
                      v
              +---------------+
              |   Step 5      |  mode: conditional { condition: "issue" }
              |   agent: D    |  (skipped if {{input}} does not contain "issue")
              +-------+-------+
                      |
                      v
              +---------------+
              |   Step 6      |  mode: loop { max_iterations: 3, until: "DONE" }
              |   agent: E    |  repeats, feeding output back as {{input}}
              +-------+-------+
                      |
                      v
                 final output

内部架构说明

  • WorkflowEngineLibreFangKernel 解耦。execute_run 方法接受两个闭包:agent_resolver(将 StepAgent 解析为 AgentId + 名称)和 send_message(向 Agent 发送提示词并返回输出 + token 计数)。这种设计使引擎无需活跃的内核即可测试。
  • 所有状态存储在 Arc<RwLock<HashMap>> 中,允许并发读取和串行写入。
  • TriggerEngine 使用 DashMap 实现无锁并发访问,并通过 agent_triggers 索引实现高效的按 Agent 触发器查找。
  • 扇出并行使用 futures::future::join_all——连续组中的所有扇出步骤同时启动。
  • 触发器的 evaluate 方法在 DashMap 上使用 iter_mut() 来原子地递增触发计数并检查模式,防止竞态条件。