Context Engine Plugin 开发指南¶
Context Engine Plugin 是 PyClaw v5.0 引入的插件化上下文管理架构。它允许开发者自定义 Agent 的上下文组装和压缩策略,无需修改核心代码。
概述¶
为什么需要插件?¶
v4.0 及之前版本,上下文压缩使用固定策略(summarize / truncate)。随着使用场景的增多,以下需求变得突出:
- 不同 Agent 不同策略 — 编程 Agent 和客服 Agent 对上下文的需求完全不同
- 持久化跨会话上下文 — 长对话需要记住关键事实和偏好
- 分层压缩 — 关键信息(系统提示词)应保留原文,工具调用可总结,填充词应丢弃
- 第三方插件 — 社区可以实现自己的压缩算法
ContextEnginePlugin 解决了这些问题。
架构¶
ContextEnginePlugin (ABC)
├── bootstrap(state) — 会话启动,初始化
├── ingest(messages, ctx) — 逐轮消息摄入
├── assemble(ctx) — 组装最终上下文
├── compact(messages, ctx) — 上下文压缩
├── on_compact_complete() — 压缩完成回调
└── on_session_end() — 会话结束
ContextEnginePluginRegistry (Singleton)
├── register(plugin)
├── get(name) → plugin
├── get_default() → plugin
└── list_plugins() → plugins
Builtin Plugins:
├── DefaultContextPlugin — summarize + truncate
└── DagCompressorPlugin — 四层 DAG 压缩
快速开始¶
使用内置插件¶
# 查看可用插件
python -c "
from pyclaw.agents.context.plugins import ContextEnginePluginRegistry
for p in ContextEnginePluginRegistry.get_instance().list_plugins():
print(f'{p[\"name\"]}: {p[\"description\"]}')
"
配置示例¶
在 ~/.pyclaw/pyclaw.json 中配置上下文引擎:
{
context: {
engine: {
plugin: "dag-compressor", // 使用 DAG 压缩插件
compaction: {
enabled: true,
max_tokens: 128000,
reserve_ratio: 0.3,
strategy: "dag", // dag | linear | hybrid
},
// DAG 层配置(仅 dag-compressor)
layers: {
critical: {
max_tokens: 2000,
strategy: "keep",
},
important: {
max_tokens: 3000,
strategy: "summarize",
},
contextual: {
max_tokens: 2000,
strategy: "extract_facts",
},
transient: {
max_tokens: 0,
strategy: "drop",
},
},
},
},
}
编程使用¶
from pyclaw.agents.context.plugins import (
ContextEnginePluginRegistry,
DagCompressorPlugin,
ContextState,
)
# 方式 1:注册并使用
registry = ContextEnginePluginRegistry.get_instance()
registry.register(DagCompressorPlugin(), set_default=True)
# 方式 2:直接使用
plugin = DagCompressorPlugin(total_max_tokens=128000)
state = ContextState(session_id="sess-1")
# 会话生命周期
await plugin.bootstrap(state)
await plugin.ingest(messages, state)
assembled = await plugin.assemble(state)
result = await plugin.compact(messages, state)
DAG 压缩详解¶
DagCompressorPlugin 实现四层 DAG 压缩:
层说明¶
| 层 | 优先级 | 策略 | 说明 |
|---|---|---|---|
| critical | 0 (最高) | keep | 系统消息、用户指令、工具 schema — 必须保留原文 |
| important | 1 | summarize | 工具调用+结果、长回复 — 用 LLM 总结 |
| contextual | 2 | extract_facts | 实体、偏好、决策、关键事实 — 提取结构化信息 |
| transient | 3 (最低) | drop | 填充词、短确认、"ok"/"yes" — 直接丢弃 |
消息分类规则¶
from pyclaw.agents.context.plugins.dag_compressor import _classify_message
# system → critical
_classify_message({"role": "system", "content": "You are a helpful assistant."})
# → "critical"
# tool_calls → important
_classify_message({"role": "assistant", "tool_calls": [{"name": "read_file"}]})
# → "important"
# tool result with data → important
_classify_message({"role": "tool", "content": "File contents: ..."})
# → "important"
# user instruction → critical
_classify_message({"role": "user", "content": "You must always respond in Chinese."})
# → "critical"
# short ack → transient (dropped)
_classify_message({"role": "assistant", "content": "ok"})
# → "transient"
事实提取¶
contextual 层的消息会被自动提取为结构化事实:
from pyclaw.agents.context.plugins.dag_compressor import _extract_facts
messages = [
{"role": "user", "content": "My email is alice@example.com. I prefer dark mode."},
{"role": "assistant", "content": "We decided to use PostgreSQL for the database."},
]
facts = _extract_facts(messages)
# → "email: alice@example.com\npreference: I prefer dark mode\ndecision: decided to use PostgreSQL"
提取的类别: - email — 邮件地址(regex 匹配) - preference — prefer/like/want/need 模式 - decision — decided to/we will/agreed to/confirmed 模式
编写自定义插件¶
最小实现¶
from pyclaw.agents.context.plugins.base import (
ContextEnginePlugin,
ContextState,
CompactResult,
)
class MyPlugin(ContextEnginePlugin):
@property
def name(self) -> str:
return "my-plugin"
async def bootstrap(self, state: ContextState) -> None:
"""Session init — load persisted context, inject persona, etc."""
pass
async def ingest(self, messages: list[dict], state: ContextState) -> None:
"""Process new messages — tokenize, index, pre-process."""
pass
async def assemble(self, state: ContextState) -> list[dict]:
"""Assemble final context for LLM consumption."""
return [] # return OpenAI-format message list
async def compact(
self,
messages: list[dict],
state: ContextState,
*,
max_tokens: int | None = None,
) -> CompactResult:
"""Compact messages when token budget exceeded."""
return CompactResult(
messages=messages,
tokens_before=1000,
tokens_after=800,
tokens_saved=200,
)
注册插件¶
from pyclaw.agents.context.plugins import ContextEnginePluginRegistry
registry = ContextEnginePluginRegistry.get_instance()
registry.register(MyPlugin(), set_default=True)
# 使用插件
plugin = registry.get("my-plugin")
state = ContextState(session_id="s1")
await plugin.bootstrap(state)
可选钩子¶
async def on_compact_complete(self, result: CompactResult, state: ContextState) -> None:
"""Called after each compaction — persist summaries, cleanup."""
await self._store.save_compaction_result(result)
async def on_session_end(self, state: ContextState, messages: list[dict]) -> None:
"""Called when session ends — flush caches, finalize state."""
await self._store.close()
配置参考¶
| 配置路径 | 类型 | 默认值 | 说明 |
|---|---|---|---|
context.engine.plugin | string | "default" | 插件名称 |
context.engine.compaction.enabled | bool | true | 是否启用压缩 |
context.engine.compaction.max_tokens | int | 128000 | 最大 token 数 |
context.engine.compaction.reserve_ratio | float | 0.3 | 预留比例(用于新消息) |
context.engine.compaction.strategy | string | "dag" | 压缩策略:dag / linear / hybrid |
context.engine.layers.critical.max_tokens | int | 2000 | critical 层 token 预算 |
context.engine.layers.important.max_tokens | int | 3000 | important 层 token 预算 |
context.engine.layers.contextual.max_tokens | int | 2000 | contextual 层 token 预算 |
与 ContextStore 集成¶
Context Engine Plugin 可以搭配 ContextStore 实现跨会话上下文持久化:
from pyclaw.agents.context.plugins import DefaultContextPlugin
from pyclaw.memory.context_store import ContextStore
from pathlib import Path
store = ContextStore(Path("data/context.db"))
await store.initialize()
class PersistentPlugin(DefaultContextPlugin):
async def bootstrap(self, state: ContextState) -> None:
await super().bootstrap(state)
chunks = await store.get_chunks(state.session_id)
for chunk in chunks:
# 恢复压缩后的上下文
self._load_chunk(chunk)
async def on_compact_complete(self, result: CompactResult, state: ContextState) -> None:
# 持久化压缩结果
await store.save_chunk(
session_id=state.session_id,
chunk_id=f"compaction-{state.compaction_count}",
content=result.summary or "",
)
API 参考¶
ContextEnginePlugin¶
| 方法 | 说明 |
|---|---|
name (property) | 插件标识符 |
description (property) | 人类可读描述 |
async bootstrap(state) | 会话初始化 |
async ingest(messages, state) | 消息摄入 |
async assemble(state) | 组装最终上下文 |
async compact(messages, state, max_tokens) | 上下文压缩 |
async on_compact_complete(result, state) | 压缩完成回调 |
async on_session_end(state, messages) | 会话结束回调 |
get_status(state) | 状态查询 |
ContextEnginePluginRegistry¶
| 方法 | 说明 |
|---|---|
get_instance() | 获取单例实例 |
register(plugin, set_default, allow_replace) | 注册插件 |
unregister(name) | 注销插件 |
get(name) | 按名称查找 |
get_default() | 获取默认插件 |
list_plugins() | 列出所有插件 |
reset() | 重置单例(测试用) |
DagCompressorPlugin¶
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
layers | dict[str, DagLayerConfig] | 默认四层 | 层配置 |
total_max_tokens | int | 128000 | 全局 token 上限 |
engine | ContextEngine | SummarizeEngine | 总结引擎 |
ContextStore¶
| 方法 | 说明 |
|---|---|
async save_chunk(session_id, chunk_id, content) | 保存上下文块 |
async save_chunks_batch(chunks) | 批量保存 |
async get_chunks(session_id, include_compacted) | 获取上下文块 |
async mark_compacted(session_id, chunk_ids) | 标记为已压缩 |
async delete_session(session_id) | 删除会话 |
async prune_old_sessions(max_age_days) | 清理旧会话 |
async get_stats() | 统计信息 |
文档版本: v5.0 Phase 2.4 | 最后更新: 2026-06-06