消息总线与事件系统:nanobot 的"神经网络"
系列导读:在前面的文章中,我们介绍了 Agent 核心引擎和工具系统。那么,各个组件之间是如何通信的?如何确保系统的高内聚低耦合?答案就是 Message Bus(消息总线)。
Nanobot 系列导航
- 项目概览与设计哲学:为什么我们需要另一个 Bot 框架?
- 安装配置完全指南:五分钟搭建你的 AI 助手
- 多渠道接入配置详解:连接 Telegram, Discord, WhatsApp 与飞书
- 架构总览与模块划分:Nanobot 的内功心法
- Agent 核心引擎解析:nanobot 的智慧中枢
- 工具系统设计与实现:让 AI 拥有"双手"
- 消息总线与事件系统:nanobot 的"神经网络"
- Channel 适配器实现:让 AI 连接世界
- 会话管理与记忆系统:让 nanobot 拥有"记忆"
- LLMServiceAdapter与模型接入:打造 AI 的"通用翻译官"
- 未来展望与生态建设:通往 AGI 的星辰大海
🎯 本文目标
读完这篇文章,你将能够:
- 理解基于消息总线的架构优势
- 掌握
InboundMessage和OutboundMessage的数据结构 - 学习
MessageBus的实现原理(Producer-Consumer 模式) - 了解 nanobot 的事件驱动机制
🔌 为什么需要消息总线?
在复杂的系统设计中,组件之间的直接调用会导致紧密耦合,难以维护。
糟糕的设计:
- Telegram Channel 直接调用 Agent Loop
- Agent Loop 直接调用 Discord API
- 工具直接修改数据库
nanobot 的设计:
- 所有组件都只与 Message Bus 交互
- Channel 生产消息 -> Bus -> Agent 消费消息
- Agent 生产回复 -> Bus -> Channel 消费回复
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Producers │ │ Message │ │ Consumers │
│ (Channels) │────▶│ Bus │────▶│ (Agent) │
└─────────────┘ └─────────────┘ └─────────────┘
▲ │
└────────────────────────────────────────┘
(Outbound)
优势:
- 解耦:添加新 Channel 不需要修改 Agent
- 缓冲:高并发时,Bus 可以作为缓冲区
- 可观测:可以在 Bus 上统一记录日志、监控流量
- 异步:天然支持异步处理
📨 消息模型:标准化的通用语言
为了让不同组件能够沟通,我们需要定义一种"通用语言"。nanobot 定义了两种核心消息类型。
1. InboundMessage (入站消息)
表示从外部世界(用户)发给 Agent 的消息。
# bus/models.py
@dataclass
class InboundMessage:
"""Message from a channel to the agent."""
channel: str # 来源渠道 (telegram, discord, etc.)
sender_id: str # 发送者 ID
chat_id: str # 会话 ID
content: str # 文本内容
media: list[str] = None # 媒体文件 URL/Path 列表
metadata: dict = None # 额外元数据 (reply_to, etc.)
@property
def session_key(self) -> str:
"""Unique key for the conversation session."""
return f"{self.channel}:{self.chat_id}"
2. OutboundMessage (出站消息)
表示从 Agent 发给外部世界(用户)的消息。
# bus/models.py
@dataclass
class OutboundMessage:
"""Message from the agent to a channel."""
channel: str # 目标渠道
chat_id: str # 目标会话 ID
content: str # 回复内容
media: list[str] = None # 媒体文件
metadata: dict = None # (reply_id, etc.)
设计亮点:
- 极简主义:只包含必要字段
- 渠道无关:没有特定平台的字段(如
telegram_update_id),由 Channel Adapter 负责转换 - Session Key:统一的会话标识符
channel:chat_id
🚌 MessageBus 实现:极简的异步队列
nanobot 的 Message Bus 实现非常精简,核心是一个异步队列。
# bus/bus.py
class MessageBus:
"""
Central message bus for the application.
Routes messages between channels and the agent loop using asyncio queues.
"""
def __init__(self):
# 入站队列:Channel -> Agent
self._inbound_queue: asyncio.Queue[InboundMessage] = asyncio.Queue()
# 出站队列:Agent -> Channel
# 这是一个 Pub/Sub 系统,每个 Channel 订阅自己的消息
self._outbound_queues: list[asyncio.Queue[OutboundMessage]] = []
async def publish_inbound(self, message: InboundMessage) -> None:
"""Publish a message from a channel to the agent."""
await self._inbound_queue.put(message)
logger.debug(f"Inbound bus: {message.channel}:{message.sender_id} -> Agent")
async def consume_inbound(self) -> InboundMessage:
"""Consume a message from the inbound queue (called by Agent)."""
return await self._inbound_queue.get()
async def publish_outbound(self, message: OutboundMessage) -> None:
"""Publish a message from the agent to channels."""
# 广播给所有订阅者 (Channels)
for queue in self._outbound_queues:
await queue.put(message)
logger.debug(f"Outbound bus: Agent -> {message.channel}:{message.chat_id}")
def subscribe(self) -> asyncio.Queue[OutboundMessage]:
"""Subscribe to outbound messages (called by Channels)."""
queue = asyncio.Queue()
self._outbound_queues.append(queue)
return queue
Pub/Sub 模式详解
注意 publish_outbound 的实现:它是广播的。
- 订阅:每个 Channel 在启动时调用
bus.subscribe(),获得一个专属的队列 - 发布:Agent 发送回复时,Bus 将消息放入所有订阅者的队列
- 过滤:Channel 收到消息后,检查
message.channel是否是自己,如果不是则丢弃
# channels/telegram.py (伪代码)
async def _consume_loop(self):
queue = self.bus.subscribe()
while True:
msg = await queue.get()
if msg.channel == "telegram":
# 是发给我的,处理它
await self._send_telegram_message(msg)
else:
# 不是发给我的,忽略
pass
为什么不是精确路由? 为了简单。在 nanobot 的规模下(<100 QPS),广播造成的额外开销可以忽略不计,但却大大简化了 Bus 的逻辑。如果要支持高并发,可以改为基于 Topic 的路由。
🔄 事件驱动流程全景
让我们串联起整个流程:
- 用户发送:Telegram 用户发送 "Hello"
- Channel 接收:
TelegramChannel收到 Update - 转换:
TelegramChannel将 Update 转换为InboundMessage - 发布入站:
bus.publish_inbound(msg) - Agent 消费:
AgentLoop调用bus.consume_inbound()拿到消息 - 处理:Agent 思考,决定回复 "Hi there"
- 发布出站:Agent 调用
bus.publish_outbound(response) - 广播:Bus 将消息放入
TelegramQueue和DiscordQueue - Chat 消费:
TelegramChannel检查 channel=="telegram",发送回复DiscordChannel检查 channel!="discord",忽略
- 用户收到:用户在 Telegram 看到 "Hi there"
🚦 系统控制消息
除了聊天消息,Bus 还可以传输控制信号。nanobot 使用特殊的 channel="system" 来标识系统消息。
# System shutdown signal
msg = InboundMessage(
channel="system",
sender_id="admin",
chat_id="broadcast",
content="shutdown"
)
AgentLoop 会优先处理系统消息:
# agent/loop.py
if msg.channel == "system":
if msg.content == "shutdown":
self._running = False
📝 小结
nanobot 的消息总线系统展示了如何用最简单的代码实现高效的解耦架构:
- ✅ Inbound/Outbound 模型:统一消息格式
- ✅ Asyncio Queue:利用 Python 原生异步队列实现高性能缓冲
- ✅ Pub/Sub 广播:简化分发逻辑,支持多渠道订阅
- ✅ 非阻塞设计:全链路异步,保证高并发下的响应能力
有了这个坚实的通信基础,下一篇我们将探讨如何让 nanobot 支持多种聊天平台——Channel 层详解。
下一篇预告:《Channel 适配器实现:如何让 AI 接入 Telegram、Discord、WhatsApp 与飞书》
我们将学习如何编写 Channel Adapter,将不同的聊天平台协议转换为 nanobot 的标准消息格式。
本文是 nanobot 深度解析系列的第 7 篇,共 11 篇。

评论