工作流引擎指南
目录
概述
LibreFang 工作流引擎支持多步骤 Agent 流水线——通过编排一系列任务,将每个步骤的工作路由到特定 Agent,并将上一步的输出作为下一步的输入。工作流让你无需编写任何 Rust 代码,即可将多个简单的、单一用途的 Agent 组合成复杂的行为。
适用场景:
- 将多个 Agent 串联为处理流水线(例如:先调研、再撰写、最后审核)。
- 将工作扇出到多个 Agent 并行处理,然后收集结果。
- 根据前一步骤的输出进行条件分支。
- 在循环中反复执行某个步骤,直到满足质量标准。
- 构建可复现、可审计的多 Agent 流程,支持通过 API 或 CLI 触发。
实现代码位于 librefang-kernel/src/workflow.rs。工作流引擎通过闭包与内核解耦——它不会直接持有或引用内核,因此可以独立进行测试。
核心类型
| Rust 类型 | 说明 |
|---|---|
WorkflowId(Uuid) | 工作流定义的唯一标识符。 |
WorkflowRunId(Uuid) | 工作流运行实例的唯一标识符。 |
Workflow | 命名的工作流定义,包含一组 WorkflowStep 条目。 |
WorkflowStep | 单个步骤:Agent 引用、提示词模板、执行模式、超时、错误处理。 |
WorkflowRun | 运行实例:跟踪状态、步骤结果、最终输出、时间戳。 |
WorkflowRunState | 枚举:Pending、Running、Completed、Failed。 |
StepResult | 单个步骤的结果:Agent 信息、输出文本、token 计数、耗时。 |
WorkflowEngine | 引擎本身:使用 Arc<RwLock<HashMap>> 存储定义和运行实例。 |
工作流定义
工作流通过 REST API 以 JSON 格式注册。顶层结构如下:
{
"name": "my-pipeline",
"description": "Describe what the workflow does",
"steps": [ ... ]
}
对应的 Rust 结构体为:
pub struct Workflow {
pub id: WorkflowId, // Auto-assigned on creation
pub name: String, // Human-readable name
pub description: String, // What this workflow does
pub steps: Vec<WorkflowStep>, // Ordered list of steps
pub created_at: DateTime<Utc>, // Auto-assigned on creation
}
步骤配置
steps 数组中的每个步骤包含以下字段:
| JSON 字段 | Rust 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|---|
name | name | String | "step" | 步骤名称,用于日志和显示。 |
agent_name | agent | StepAgent::ByName | -- | 按名称引用 Agent(匹配第一个)。与 agent_id 互斥。 |
agent_id | agent | StepAgent::ById | -- | 按 UUID 引用 Agent。与 agent_name 互斥。 |
prompt | prompt_template | String | "{{input}}" | 带变量占位符的提示词模板。 |
mode | mode | StepMode | "sequential" | 执行模式(见下文)。 |
timeout_secs | timeout_secs | u64 | 120 | 步骤超时时间上限(秒)。 |
error_mode | error_mode | ErrorMode | "fail" | 错误处理方式(见下文)。 |
max_retries | (在 ErrorMode::Retry 内部) | u32 | 3 | 当 error_mode 为 "retry" 时的重试次数。 |
output_var | output_var | Option<String> | null | 如果设置,将此步骤的输出存储为命名变量,供后续步骤引用。 |
condition | (在 StepMode::Conditional 内部) | String | "" | 在前一步输出中匹配的子字符串(不区分大小写)。 |
max_iterations | (在 StepMode::Loop 内部) | u32 | 5 | 循环强制终止前的最大迭代次数。 |
until | (在 StepMode::Loop 内部) | String | "" | 在输出中匹配以终止循环的子字符串(不区分大小写)。 |
Agent 解析
每个步骤必须指定 agent_name 或 agent_id 其中之一。StepAgent 枚举定义如下:
pub enum StepAgent {
ById { id: String }, // UUID of an existing agent
ByName { name: String }, // Name match (first agent with this name)
}
如果在执行时无法解析到 Agent,工作流将失败并报错 "Agent not found for step '<name>'"。
步骤模式
mode 字段控制步骤相对于工作流中其他步骤的执行方式。
Sequential(顺序执行,默认)
{ "mode": "sequential" }
该步骤在前一步骤完成后运行。前一步骤的输出将作为当前步骤的 {{input}}。省略 mode 时默认为此模式。
Fan-Out(扇出)
{ "mode": "fan_out" }
扇出步骤并行运行。引擎会收集所有连续的 fan_out 步骤,并使用 futures::future::join_all 同时启动它们。所有扇出步骤接收相同的 {{input}}——即扇出组之前最后一个运行步骤的输出。
如果任何扇出步骤失败或超时,整个工作流立即失败。
Collect(收集)
{ "mode": "collect" }
collect 步骤收集前一个扇出组的所有输出。它不执行 Agent——这是一个纯数据步骤,使用分隔符 "\n\n---\n\n" 连接所有累积的输出,并将结果设置为后续步骤的 {{input}}。
典型的 fan-out/collect 模式:
step 1: fan_out --> runs in parallel
step 2: fan_out --> runs in parallel
step 3: collect --> joins outputs from steps 1 and 2
step 4: sequential --> receives joined output as {{input}}
Conditional(条件执行)
{ "mode": "conditional", "condition": "ERROR" }
仅当前一步骤的输出包含 condition 子字符串时(通过 to_lowercase().contains() 进行不区分大小写的比较),该步骤才会执行。如果条件不满足,该步骤将被完全跳过,{{input}} 不会被修改。
当条件满足时,该步骤按顺序执行模式运行。
Loop(循环)
{ "mode": "loop", "max_iterations": 5, "until": "APPROVED" }
该步骤最多重复 max_iterations 次。每次迭代后,引擎检查输出是否包含 until 子字符串(不区分大小写)。如果匹配到,循环提前终止。
每次迭代将其输出回传作为下一次迭代的 {{input}}。步骤结果以 "refine (iter 1)"、"refine (iter 2)" 等格式记录名称。
如果 until 条件始终未满足,循环将恰好运行 max_iterations 次,然后以最后一次迭代的输出继续执行下一步骤。
变量替换
提示词模板支持两种变量引用方式:
{{input}} -- 前一步骤的输出
始终可用。包含前一步骤的输出(对于第一个步骤,则为工作流的初始输入)。
{{variable_name}} -- 命名变量
当某个步骤设置了 "output_var": "my_var" 时,其输出会以 my_var 为键存储在变量映射中。后续任何步骤都可以在提示词模板中使用 {{my_var}} 来引用它。
变量展开逻辑(来自 WorkflowEngine::expand_variables):
fn expand_variables(template: &str, input: &str, vars: &HashMap<String, String>) -> String {
let mut result = template.replace("{{input}}", input);
for (key, value) in vars {
result = result.replace(&format!("{{{{{key}}}}}"), value);
}
result
}
变量在整个工作流运行期间持久存在。后续步骤可以通过使用相同的 output_var 名称来覆盖变量。
示例:一个三步骤的工作流,其中步骤 3 同时引用步骤 1 和步骤 2 的输出:
{
"steps": [
{ "name": "research", "output_var": "research_output", "prompt": "Research: {{input}}" },
{ "name": "outline", "output_var": "outline_output", "prompt": "Outline based on: {{input}}" },
{ "name": "combine", "prompt": "Write article.\nResearch: {{research_output}}\nOutline: {{outline_output}}" }
]
}
错误处理
每个步骤都有一个 error_mode 字段,用于控制步骤失败或超时时的行为。
Fail(失败终止,默认)
{ "error_mode": "fail" }
工作流立即中止。运行状态设置为 Failed,记录错误信息并设置 completed_at。错误信息格式为 "Step '<name>' failed: <error>" 或 "Step '<name>' timed out after <N>s"。
Skip(跳过)
{ "error_mode": "skip" }
发生错误或超时时静默跳过该步骤。会记录一条警告日志,但工作流继续执行。下一步骤的 {{input}} 保持不变(使用被跳过步骤之前的值)。被跳过的步骤不会记录 StepResult。
Retry(重试)
{ "error_mode": "retry", "max_retries": 3 }
该步骤在初始尝试之后最多重试 max_retries 次(因此 max_retries: 3 意味着最多 4 次总尝试:1 次初始 + 3 次重试)。每次尝试独立享有完整的 timeout_secs 超时预算。如果所有尝试都失败,工作流中止并报错 "Step '<name>' failed after <N> retries: <last_error>"。
超时行为
每个步骤的执行都被 tokio::time::timeout(Duration::from_secs(step.timeout_secs), ...) 包裹。默认超时为 120 秒。超时被视为错误,按照步骤的 error_mode 进行处理。
对于扇出步骤,每个并行步骤各自拥有独立的超时。
示例
示例 1:代码审查流水线
一个顺序执行的流水线,先分析代码,再审查问题,最后生成摘要报告。
{
"name": "code-review-pipeline",
"description": "Analyze code, review for issues, and produce a summary report",
"steps": [
{
"name": "analyze",
"agent_name": "code-reviewer",
"prompt": "Analyze the following code for bugs, style issues, and security vulnerabilities:\n\n{{input}}",
"mode": "sequential",
"timeout_secs": 180,
"error_mode": "fail",
"output_var": "analysis"
},
{
"name": "security-check",
"agent_name": "security-auditor",
"prompt": "Review this code analysis for security issues. Flag anything critical:\n\n{{analysis}}",
"mode": "sequential",
"timeout_secs": 120,
"error_mode": "retry",
"max_retries": 2,
"output_var": "security_review"
},
{
"name": "summary",
"agent_name": "writer",
"prompt": "Write a concise code review summary.\n\nCode Analysis:\n{{analysis}}\n\nSecurity Review:\n{{security_review}}",
"mode": "sequential",
"timeout_secs": 60,
"error_mode": "fail"
}
]
}
示例 2:调研并撰写文章
调研一个主题,编写大纲,然后撰写——并附带一个条件性的事实核查步骤。
{
"name": "research-and-write",
"description": "Research a topic, outline, write, and optionally fact-check",
"steps": [
{
"name": "research",
"agent_name": "researcher",
"prompt": "Research the following topic thoroughly. Cite sources where possible:\n\n{{input}}",
"mode": "sequential",
"timeout_secs": 300,
"error_mode": "retry",
"max_retries": 1,
"output_var": "research"
},
{
"name": "outline",
"agent_name": "planner",
"prompt": "Create a detailed article outline based on this research:\n\n{{research}}",
"mode": "sequential",
"timeout_secs": 60,
"output_var": "outline"
},
{
"name": "write",
"agent_name": "writer",
"prompt": "Write a complete article.\n\nOutline:\n{{outline}}\n\nResearch:\n{{research}}",
"mode": "sequential",
"timeout_secs": 300,
"output_var": "article"
},
{
"name": "fact-check",
"agent_name": "analyst",
"prompt": "Fact-check this article and note any claims that need verification:\n\n{{article}}",
"mode": "conditional",
"condition": "claim",
"timeout_secs": 120,
"error_mode": "skip"
}
]
}
事实核查步骤仅在文章包含 "claim" 一词(不区分大小写)时运行。如果事实核查 Agent 失败,工作流会继续,保持文章原样。
示例 3:多 Agent 头脑风暴(Fan-Out + Collect)
三个 Agent 并行进行头脑风暴,然后由第四个 Agent 综合他们的想法。
{
"name": "brainstorm",
"description": "Parallel brainstorm with 3 agents, then synthesize",
"steps": [
{
"name": "creative-ideas",
"agent_name": "writer",
"prompt": "Brainstorm 5 creative ideas for: {{input}}",
"mode": "fan_out",
"timeout_secs": 60,
"output_var": "creative"
},
{
"name": "technical-ideas",
"agent_name": "architect",
"prompt": "Brainstorm 5 technically feasible ideas for: {{input}}",
"mode": "fan_out",
"timeout_secs": 60,
"output_var": "technical"
},
{
"name": "business-ideas",
"agent_name": "analyst",
"prompt": "Brainstorm 5 ideas with strong business potential for: {{input}}",
"mode": "fan_out",
"timeout_secs": 60,
"output_var": "business"
},
{
"name": "gather",
"agent_name": "planner",
"prompt": "unused",
"mode": "collect"
},
{
"name": "synthesize",
"agent_name": "orchestrator",
"prompt": "You received brainstorm results from three perspectives. Synthesize them into the top 5 actionable ideas, ranked by impact:\n\n{{input}}",
"mode": "sequential",
"timeout_secs": 120
}
]
}
三个扇出步骤并行运行。collect 步骤使用 --- 分隔符连接它们的输出。synthesize 步骤接收合并后的输出。
示例 4:迭代优化(Loop)
一个 Agent 反复优化草稿,直到达到质量标准。
{
"name": "iterative-refinement",
"description": "Refine a document until approved or max iterations reached",
"steps": [
{
"name": "first-draft",
"agent_name": "writer",
"prompt": "Write a first draft about: {{input}}",
"mode": "sequential",
"timeout_secs": 120,
"output_var": "draft"
},
{
"name": "review-and-refine",
"agent_name": "code-reviewer",
"prompt": "Review this draft. If it meets quality standards, respond with APPROVED at the start. Otherwise, provide specific feedback and a revised version:\n\n{{input}}",
"mode": "loop",
"max_iterations": 4,
"until": "APPROVED",
"timeout_secs": 180,
"error_mode": "retry",
"max_retries": 1
}
]
}
循环最多运行审核者 4 次。每次迭代接收前一次迭代的输出作为 {{input}}。一旦审核者在回复中包含 "APPROVED",循环提前终止。
触发器引擎
触发器引擎(librefang-kernel/src/triggers.rs)提供事件驱动的自动化能力。触发器监听内核的事件总线,当匹配的事件到达时,自动向 Agent 发送消息。
核心类型
| Rust 类型 | 说明 |
|---|---|
TriggerId(Uuid) | 触发器的唯一标识符。 |
Trigger | 已注册的触发器:Agent、匹配模式、提示词模板、触发次数、限制。 |
TriggerPattern | 定义要匹配哪些事件的枚举。 |
TriggerEngine | 触发器引擎:基于 DashMap 的并发存储,带有 Agent 到触发器的索引。 |
触发器定义
pub struct Trigger {
pub id: TriggerId,
pub agent_id: AgentId, // Which agent receives the message
pub pattern: TriggerPattern, // What events to match
pub prompt_template: String, // Template with {{event}} placeholder
pub enabled: bool, // Can be toggled on/off
pub created_at: DateTime<Utc>,
pub fire_count: u64, // How many times it has fired
pub max_fires: u64, // 0 = unlimited
}
事件模式
TriggerPattern 枚举支持 9 种匹配模式:
| 模式 | JSON | 说明 |
|---|---|---|
All | "all" | 匹配所有事件(通配符)。 |
Lifecycle | "lifecycle" | 匹配任何生命周期事件(spawned、started、terminated 等)。 |
AgentSpawned | {"agent_spawned": {"name_pattern": "coder"}} | 当名称包含 name_pattern 的 Agent 被创建时匹配。使用 "*" 匹配任意 Agent。 |
AgentTerminated | "agent_terminated" | 当任何 Agent 终止或崩溃时匹配。 |
System | "system" | 匹配任何系统事件(健康检查、配额警告等)。 |
SystemKeyword | {"system_keyword": {"keyword": "quota"}} | 匹配 Debug 表示中包含关键词的系统事件(不区分大小写)。 |
MemoryUpdate | "memory_update" | 匹配任何内存变更事件。 |
MemoryKeyPattern | {"memory_key_pattern": {"key_pattern": "config"}} | 匹配键包含 key_pattern 的内存更新。使用 "*" 匹配任意键。 |
ContentMatch | {"content_match": {"substring": "error"}} | 匹配人类可读描述中包含该子字符串的任何事件(不区分大小写)。 |
模式匹配详情
matches_pattern 函数决定每种模式的匹配逻辑:
All:始终返回true。Lifecycle:检查EventPayload::Lifecycle(_)。AgentSpawned:检查LifecycleEvent::Spawned,条件为name.contains(name_pattern)或name_pattern == "*"。AgentTerminated:检查LifecycleEvent::Terminated或LifecycleEvent::Crashed。System:检查EventPayload::System(_)。SystemKeyword:通过Debugtrait 格式化系统事件,转为小写后检查contains(keyword)。MemoryUpdate:检查EventPayload::MemoryUpdate(_)。MemoryKeyPattern:检查delta.key.contains(key_pattern)或key_pattern == "*"。ContentMatch:使用describe_event()函数生成人类可读的字符串,然后检查contains(substring)(不区分大小写)。
提示词模板与 {{event}}
当触发器触发时,引擎会将 prompt_template 中的 {{event}} 替换为人类可读的事件描述。describe_event() 函数生成的字符串示例:
"Agent 'coder' (id: <uuid>) was spawned""Agent <uuid> terminated: shutdown requested""Agent <uuid> crashed: out of memory""Kernel started""Quota warning: agent <uuid>, tokens at 85.0%""Health check failed: agent <uuid>, unresponsive for 30s""Memory Created on key 'config' for agent <uuid>""Tool 'web_search' succeeded (450ms): ..."
最大触发次数与自动禁用
当 max_fires 设置为大于 0 的值时,触发器在 fire_count >= max_fires 后会自动禁用(设置 enabled = false)。将 max_fires 设为 0 表示触发器无限次触发。
触发器使用场景
监控 Agent 健康状态:
{
"agent_id": "<ops-agent-uuid>",
"pattern": {"content_match": {"substring": "health check failed"}},
"prompt_template": "ALERT: {{event}}. Investigate and report the status of all agents.",
"max_fires": 0
}
响应新 Agent 创建:
{
"agent_id": "<orchestrator-uuid>",
"pattern": {"agent_spawned": {"name_pattern": "*"}},
"prompt_template": "A new agent was just created: {{event}}. Update the fleet roster.",
"max_fires": 0
}
一次性配额告警:
{
"agent_id": "<admin-agent-uuid>",
"pattern": {"system_keyword": {"keyword": "quota"}},
"prompt_template": "Quota event detected: {{event}}. Recommend corrective action.",
"max_fires": 1
}
Cron 任务
Cron 任务让你使用标准 cron 表达式为 Agent 安排定时消息。每条 cron 条目在 Agent manifest(.toml)中定义,独立于用户驱动的会话触发。
[[cron]]
name = "morning-brief"
schedule = "0 8 * * *"
message = "Summarize any new alerts from the last 24 hours."
| 字段 | 类型 | 说明 |
|---|---|---|
name | string | 人类可读名称,用于日志和审计事件 |
schedule | string | 标准 5 字段 cron 表达式(分钟、小时、日、月、星期) |
message | string | 每次触发时发送给 Agent 的提示词 |
session_mode | string 或 null | "persistent"(默认)或 "new",控制连续触发是否复用同一会话 |
关于会话的注意事项:所有使用
session_mode = "persistent"(默认)的 cron 触发共享一个以(agent, "cron")为键的会话。在 Agent manifest 中设置session_mode = "new"不会为 cron 的每次触发创建独立会话——cron 调度器会合成SenderContext { channel: "cron" },在查询session_mode之前已经走了 channel 分支。
[SILENT] 标记
在 cron 任务的 message 前加上 [SILENT] 可抑制 Agent 响应写入会话历史。任务照常执行——工具调用运行、计量记录、审计日志写入——但 assistant 的回复不会写回会话。
这适用于周期性维护任务(备份检查、清理例程、健康探针),否则会堆积对人类用户或其他 Agent 造成干扰的对话历史。
用法
[[cron]]
name = "daily-cleanup"
schedule = "0 3 * * *"
message = "[SILENT] Run daily cleanup: remove temp files older than 7 days"
[[cron]]
name = "hourly-health-check"
schedule = "0 * * * *"
message = "[SILENT] Check system health and log any anomalies."
行为细节
- 执行:Agent 轮次正常运行至完成,所有工具调用均执行。
- 计量与成本:Token 用量和费用照常记录到预算账本。
- 审计日志:无论是否带有
[SILENT]标记,均会输出包含 Agent 响应摘要的CronFire审计事件。 - 会话历史:仅抑制 assistant 响应。出站 cron 消息本身从不存入会话(cron 消息始终是临时的),因此
[SILENT]不影响用户可见的提示词侧。
与 session_mode = "new" 的区别
| 行为 | [SILENT] | session_mode = "new" |
|---|---|---|
| 抑制 assistant 响应 | 是 | 否 |
| 创建每次触发的独立会话 | 否(复用 (agent,"cron") 会话) | 对 cron 无效(见上方注意事项) |
| 防止历史污染 | 是 | 否 |
| 正常工具执行 | 是 | 是 |
需要 Agent 行动但不留任何对话历史痕迹时,使用 [SILENT]。
API 端点
工作流端点
POST /api/workflows -- 创建工作流
注册新的工作流定义。
请求体:
{
"name": "my-pipeline",
"description": "Description of the workflow",
"steps": [
{
"name": "step-1",
"agent_name": "researcher",
"prompt": "Research: {{input}}",
"mode": "sequential",
"timeout_secs": 120,
"error_mode": "fail",
"output_var": "research"
}
]
}
响应(201 Created):
{ "workflow_id": "<uuid>" }
GET /api/workflows -- 列出所有工作流
返回已注册工作流的摘要数组。
响应(200 OK):
[
{
"id": "<uuid>",
"name": "my-pipeline",
"description": "Description of the workflow",
"steps": 3,
"created_at": "2026-01-15T10:30:00Z"
}
]
POST /api/workflows/:id/run -- 执行工作流
启动同步工作流执行。调用将阻塞直到工作流完成或失败。
请求体:
{ "input": "The initial input text for the first step" }
响应(200 OK):
{
"run_id": "<uuid>",
"output": "Final output from the last step",
"status": "completed"
}
响应(500 Internal Server Error):
{ "error": "Workflow execution failed" }
GET /api/workflows/:id/runs -- 列出工作流运行记录
返回所有工作流运行记录(当前实现不按工作流 ID 过滤)。
响应(200 OK):
[
{
"id": "<uuid>",
"workflow_name": "my-pipeline",
"state": "completed",
"steps_completed": 3,
"started_at": "2026-01-15T10:30:00Z",
"completed_at": "2026-01-15T10:32:15Z"
}
]
触发器端点
POST /api/triggers -- 创建触发器
为 Agent 注册新的事件触发器。
请求体:
{
"agent_id": "<agent-uuid>",
"pattern": "lifecycle",
"prompt_template": "A lifecycle event occurred: {{event}}",
"max_fires": 0
}
响应(201 Created):
{
"trigger_id": "<uuid>",
"agent_id": "<agent-uuid>"
}
GET /api/triggers -- 列出所有触发器
可选按 Agent 过滤:GET /api/triggers?agent_id=<uuid>
响应(200 OK):
[
{
"id": "<uuid>",
"agent_id": "<agent-uuid>",
"pattern": "lifecycle",
"prompt_template": "Event: {{event}}",
"enabled": true,
"fire_count": 5,
"max_fires": 0,
"created_at": "2026-01-15T10:00:00Z"
}
]
PUT /api/triggers/:id -- 启用/禁用触发器
切换触发器的启用状态。
请求体:
{ "enabled": false }
响应(200 OK):
{ "status": "updated", "trigger_id": "<uuid>", "enabled": false }
DELETE /api/triggers/:id -- 删除触发器
响应(200 OK):
{ "status": "removed", "trigger_id": "<uuid>" }
响应(404 Not Found):
{ "error": "Trigger not found" }
CLI 命令
所有工作流和触发器 CLI 命令都需要正在运行的 LibreFang 守护进程。
工作流命令
librefang workflow list
列出所有已注册的工作流及其 ID、名称、步骤数量和创建日期。
librefang workflow create <file>
从 JSON 文件创建工作流。文件内容应与 POST /api/workflows 请求体的 JSON 结构一致。
librefang workflow run <workflow_id> <input>
通过 UUID 执行工作流,传入指定的输入文本。阻塞直到完成并打印输出。
触发器命令
librefang trigger list [--agent-id <uuid>]
列出所有已注册的触发器。可选按 Agent ID 过滤。
librefang trigger create <agent_id> <pattern_json> [--prompt <template>] [--max-fires <n>]
为指定 Agent 创建触发器。pattern_json 参数是描述匹配模式的 JSON 字符串。
默认值:
--prompt:"Event: {{event}}"--max-fires:0(无限次)
示例:
# Watch all lifecycle events
librefang trigger create <agent-id> '"lifecycle"' --prompt "Lifecycle: {{event}}"
# Watch for a specific agent spawn
librefang trigger create <agent-id> '{"agent_spawned":{"name_pattern":"coder"}}' --max-fires 1
# Watch for content containing "error"
librefang trigger create <agent-id> '{"content_match":{"substring":"error"}}'
librefang trigger delete <trigger_id>
通过 UUID 删除触发器。
执行限制
运行记录淘汰上限
工作流引擎最多保留 200 条运行记录(WorkflowEngine::MAX_RETAINED_RUNS)。创建新运行后超出此限制时,最旧的已完成或已失败的运行记录将被淘汰(按 started_at 排序)。处于 Pending 或 Running 状态的运行永远不会被淘汰。
步骤超时
每个步骤都有可配置的 timeout_secs(默认:120 秒)。超时通过 tokio::time::timeout 强制执行,按每次尝试独立计算——重试模式下每次尝试都有完整的超时预算。扇出步骤各自拥有独立的超时。
循环迭代上限
循环步骤受 max_iterations 限制(API 中默认为 5)。即使 until 条件始终未满足,引擎也绝不会执行超过此数量的迭代。
每小时 Token 配额
AgentScheduler(位于 librefang-kernel/src/scheduler.rs)通过 UsageTracker 以 1 小时滚动窗口跟踪每个 Agent 的 token 用量。如果 Agent 超出其 ResourceQuota.max_llm_tokens_per_hour,调度器返回 LibreFangError::QuotaExceeded。窗口在 3600 秒后自动重置。此配额适用于所有 Agent 交互,包括由工作流调用的交互。
工作流数据流图
input
|
v
+---------------+
| Step 1 | mode: sequential
| agent: A |
+-------+-------+
| output -> {{input}} for step 2
| -> variables["var1"] if output_var set
v
+---------------+
| Step 2 | mode: fan_out
| agent: B |---+
+---------------+ |
+---------------+ | parallel execution
| Step 3 | | (all receive same {{input}})
| agent: C |---+
+---------------+ |
| |
v v
+---------------+
| Step 4 | mode: collect
| (no agent) | joins all outputs with "---"
+-------+-------+
| combined output -> {{input}}
v
+---------------+
| Step 5 | mode: conditional { condition: "issue" }
| agent: D | (skipped if {{input}} does not contain "issue")
+-------+-------+
|
v
+---------------+
| Step 6 | mode: loop { max_iterations: 3, until: "DONE" }
| agent: E | repeats, feeding output back as {{input}}
+-------+-------+
|
v
final output
内部架构说明
WorkflowEngine与LibreFangKernel解耦。execute_run方法接受两个闭包:agent_resolver(将StepAgent解析为AgentId+ 名称)和send_message(向 Agent 发送提示词并返回输出 + token 计数)。这种设计使引擎无需活跃的内核即可测试。- 所有状态存储在
Arc<RwLock<HashMap>>中,允许并发读取和串行写入。 TriggerEngine使用DashMap实现无锁并发访问,并通过agent_triggers索引实现高效的按 Agent 触发器查找。- 扇出并行使用
futures::future::join_all——连续组中的所有扇出步骤同时启动。 - 触发器的
evaluate方法在DashMap上使用iter_mut()来原子地递增触发计数并检查模式,防止竞态条件。