跳到主要内容

消息总线与事件系统:nanobot 的"神经网络"

· 阅读需 7 分钟
Jiajie Wu
一名崭新水手

系列导读:在前面的文章中,我们介绍了 Agent 核心引擎和工具系统。那么,各个组件之间是如何通信的?如何确保系统的高内聚低耦合?答案就是 Message Bus(消息总线)

Nanobot 系列导航

  1. 项目概览与设计哲学:为什么我们需要另一个 Bot 框架?
  2. 安装配置完全指南:五分钟搭建你的 AI 助手
  3. 多渠道接入配置详解:连接 Telegram, Discord, WhatsApp 与飞书
  4. 架构总览与模块划分:Nanobot 的内功心法
  5. Agent 核心引擎解析:nanobot 的智慧中枢
  6. 工具系统设计与实现:让 AI 拥有"双手"
  7. 消息总线与事件系统:nanobot 的"神经网络"
  8. Channel 适配器实现:让 AI 连接世界
  9. 会话管理与记忆系统:让 nanobot 拥有"记忆"
  10. LLMServiceAdapter与模型接入:打造 AI 的"通用翻译官"
  11. 未来展望与生态建设:通往 AGI 的星辰大海

🎯 本文目标

读完这篇文章,你将能够:

  1. 理解基于消息总线的架构优势
  2. 掌握 InboundMessageOutboundMessage 的数据结构
  3. 学习 MessageBus 的实现原理(Producer-Consumer 模式)
  4. 了解 nanobot 的事件驱动机制

🔌 为什么需要消息总线?

在复杂的系统设计中,组件之间的直接调用会导致紧密耦合,难以维护。

糟糕的设计

  • Telegram Channel 直接调用 Agent Loop
  • Agent Loop 直接调用 Discord API
  • 工具直接修改数据库

nanobot 的设计

  • 所有组件都与 Message Bus 交互
  • Channel 生产消息 -> Bus -> Agent 消费消息
  • Agent 生产回复 -> Bus -> Channel 消费回复
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Producers │ │ Message │ │ Consumers │
│ (Channels) │────▶│ Bus │────▶│ (Agent) │
└─────────────┘ └─────────────┘ └─────────────┘
▲ │
└────────────────────────────────────────┘
(Outbound)

优势

  1. 解耦:添加新 Channel 不需要修改 Agent
  2. 缓冲:高并发时,Bus 可以作为缓冲区
  3. 可观测:可以在 Bus 上统一记录日志、监控流量
  4. 异步:天然支持异步处理

📨 消息模型:标准化的通用语言

为了让不同组件能够沟通,我们需要定义一种"通用语言"。nanobot 定义了两种核心消息类型。

1. InboundMessage (入站消息)

表示从外部世界(用户)发给 Agent 的消息。

# bus/models.py
@dataclass
class InboundMessage:
"""Message from a channel to the agent."""

channel: str # 来源渠道 (telegram, discord, etc.)
sender_id: str # 发送者 ID
chat_id: str # 会话 ID
content: str # 文本内容
media: list[str] = None # 媒体文件 URL/Path 列表
metadata: dict = None # 额外元数据 (reply_to, etc.)

@property
def session_key(self) -> str:
"""Unique key for the conversation session."""
return f"{self.channel}:{self.chat_id}"

2. OutboundMessage (出站消息)

表示从 Agent 发给外部世界(用户)的消息。

# bus/models.py
@dataclass
class OutboundMessage:
"""Message from the agent to a channel."""

channel: str # 目标渠道
chat_id: str # 目标会话 ID
content: str # 回复内容
media: list[str] = None # 媒体文件
metadata: dict = None # (reply_id, etc.)

设计亮点

  • 极简主义:只包含必要字段
  • 渠道无关:没有特定平台的字段(如 telegram_update_id),由 Channel Adapter 负责转换
  • Session Key:统一的会话标识符 channel:chat_id

🚌 MessageBus 实现:极简的异步队列

nanobot 的 Message Bus 实现非常精简,核心是一个异步队列。

# bus/bus.py
class MessageBus:
"""
Central message bus for the application.

Routes messages between channels and the agent loop using asyncio queues.
"""

def __init__(self):
# 入站队列:Channel -> Agent
self._inbound_queue: asyncio.Queue[InboundMessage] = asyncio.Queue()

# 出站队列:Agent -> Channel
# 这是一个 Pub/Sub 系统,每个 Channel 订阅自己的消息
self._outbound_queues: list[asyncio.Queue[OutboundMessage]] = []

async def publish_inbound(self, message: InboundMessage) -> None:
"""Publish a message from a channel to the agent."""
await self._inbound_queue.put(message)
logger.debug(f"Inbound bus: {message.channel}:{message.sender_id} -> Agent")

async def consume_inbound(self) -> InboundMessage:
"""Consume a message from the inbound queue (called by Agent)."""
return await self._inbound_queue.get()

async def publish_outbound(self, message: OutboundMessage) -> None:
"""Publish a message from the agent to channels."""
# 广播给所有订阅者 (Channels)
for queue in self._outbound_queues:
await queue.put(message)
logger.debug(f"Outbound bus: Agent -> {message.channel}:{message.chat_id}")

def subscribe(self) -> asyncio.Queue[OutboundMessage]:
"""Subscribe to outbound messages (called by Channels)."""
queue = asyncio.Queue()
self._outbound_queues.append(queue)
return queue

Pub/Sub 模式详解

注意 publish_outbound 的实现:它是广播的。

  1. 订阅:每个 Channel 在启动时调用 bus.subscribe(),获得一个专属的队列
  2. 发布:Agent 发送回复时,Bus 将消息放入所有订阅者的队列
  3. 过滤:Channel 收到消息后,检查 message.channel 是否是自己,如果不是则丢弃
# channels/telegram.py (伪代码)
async def _consume_loop(self):
queue = self.bus.subscribe()
while True:
msg = await queue.get()
if msg.channel == "telegram":
# 是发给我的,处理它
await self._send_telegram_message(msg)
else:
# 不是发给我的,忽略
pass

为什么不是精确路由? 为了简单。在 nanobot 的规模下(<100 QPS),广播造成的额外开销可以忽略不计,但却大大简化了 Bus 的逻辑。如果要支持高并发,可以改为基于 Topic 的路由。


🔄 事件驱动流程全景

让我们串联起整个流程:

  1. 用户发送:Telegram 用户发送 "Hello"
  2. Channel 接收TelegramChannel 收到 Update
  3. 转换TelegramChannel 将 Update 转换为 InboundMessage
  4. 发布入站bus.publish_inbound(msg)
  5. Agent 消费AgentLoop 调用 bus.consume_inbound() 拿到消息
  6. 处理:Agent 思考,决定回复 "Hi there"
  7. 发布出站:Agent 调用 bus.publish_outbound(response)
  8. 广播:Bus 将消息放入 TelegramQueueDiscordQueue
  9. Chat 消费
    • TelegramChannel 检查 channel=="telegram",发送回复
    • DiscordChannel 检查 channel!="discord",忽略
  10. 用户收到:用户在 Telegram 看到 "Hi there"

🚦 系统控制消息

除了聊天消息,Bus 还可以传输控制信号。nanobot 使用特殊的 channel="system" 来标识系统消息。

# System shutdown signal
msg = InboundMessage(
channel="system",
sender_id="admin",
chat_id="broadcast",
content="shutdown"
)

AgentLoop 会优先处理系统消息:

# agent/loop.py
if msg.channel == "system":
if msg.content == "shutdown":
self._running = False

📝 小结

nanobot 的消息总线系统展示了如何用最简单的代码实现高效的解耦架构:

  • Inbound/Outbound 模型:统一消息格式
  • Asyncio Queue:利用 Python 原生异步队列实现高性能缓冲
  • Pub/Sub 广播:简化分发逻辑,支持多渠道订阅
  • 非阻塞设计:全链路异步,保证高并发下的响应能力

有了这个坚实的通信基础,下一篇我们将探讨如何让 nanobot 支持多种聊天平台——Channel 层详解。


下一篇预告《Channel 适配器实现:如何让 AI 接入 Telegram、Discord、WhatsApp 与飞书》

我们将学习如何编写 Channel Adapter,将不同的聊天平台协议转换为 nanobot 的标准消息格式。


本文是 nanobot 深度解析系列的第 7 篇,共 11 篇。

评论