site logo

Marico's space

如何部署跨云多Agent系统[Python]

AI技术与应用 2026-05-05 14:49:47 3

多Agent系统(Multi-Agent System,MAS)的跨云部署确实是个很有意思的课题。我在实际项目中也踩过类似的坑——当系统从单机扩展到多云环境时,原本在本地跑得好好的同步调用会突然变得脆弱无比。LLM推理的不可预测延迟、企业网络里那些严格的NAT防火墙、还有容器漂移带来的状态丢失,每一个点都可能让你的Agent系统在生产环境里彻底失控。这篇文章提到的几个架构要点都相当实用,特别是异步消息队列和外部化状态的设计思路,我觉得值得好好聊聊。

说实话,部署跨云多Agent系统的过程,更像是在为自治软件搭建一套私有互联网。你需要重新思考很多本地开发时理所当然的假设——同步HTTP会超时、容器会漂移、IP地址会变化、网络会不可达。下面这五个架构要点,就是解决这些跨云挑战的关键。

同步HTTP会卡死你的Agent架构

从单个Agent扩展到多个Agent协作时,开发者通常会习惯性地使用标准的REST API,让一个Agent向另一个发送同步POST请求。但这套在生产环境里根本跑不通——因为LLM(大语言模型)推理时间波动太大了。生成一个响应或者执行一个未经优化的工具调用,动不动就要十到四十秒。云的负载均衡器和标准HTTP客户端可等不了这么久,连接会被强制断开,Agent不得不重新开始整个推理循环。

跨云的Agent通信必须采用异步模式。与其阻塞等待HTTP响应,不如让Agent把任务丢给分布式消息队列。这样编排Agent(orchestrator)可以继续处理其他输入,而工作Agent在另一个节点上异步处理任务。

# 使用 Celery 和 Redis 实现跨云异步任务分发
from celery import Celery

app = Celery('agent_tasks', broker='redis://external-broker-url:6379/0')

@app.task
def delegate_to_research_agent(prompt, context):
    # 任务在 GCP 工作节点上异步执行
    result = research_agent.execute(prompt, context)
    # 将结果存储到外部数据库,供阿里云的 Agent 后续获取
    db.store_result(task_id=delegate_to_research_agent.request.id, data=result)
    return True

# 在阿里云编排节点上:触发任务但不阻塞
task = delegate_to_research_agent.delay("分析第三季度财报", previous_context)
print(f"任务已分发,ID: {task.id}")

临时容器会摧毁对话状态

运行在自动扩缩容云实例里的Agent是临时的。如果Agent进程因为上下文窗口过大导致内存溢出而崩溃,容器就会重启。如果对话历史和任务轨迹都存在Agent进程的本地内存里,那整个工作流在重启后就会全部消失。

为了让系统扛住节点迁移,Agent进程必须完全无状态。每个工具输出、中间推理步骤和用户提示都应该立刻推送到一个外部全局可访问的数据存储。Agent启动时,通过查询这个外部记忆来重建上下文窗口。

# 将 Agent 状态外置到 Redis
import redis
import json

r = redis.Redis(host='global-redis.internal', port=6379, db=0)

def save_agent_thought(session_id, step_data):
    # 将最新的推理步骤推送到列表
    r.rpush(f"agent_state:{session_id}", json.dumps(step_data))

def rebuild_context(session_id):
    # 容器重启后重建状态
    raw_steps = r.lrange(f"agent_state:{session_id}", 0, -1)
    return [json.loads(step) for step in raw_steps]

跨网络边界的工具执行管理

把API密钥和数据库连接字符串直接硬编码到Agent逻辑里,在不可信的云虚拟机上会造成巨大的安全隐患。Agent的推理循环必须与工具执行权限严格分离。

Model Context Protocol(MCP,模型上下文协议)就是这个解耦方案的行业标准做法。把内部数据库封装成MCP服务器,通过标准化的JSON-RPC模式精确控制Agent能访问哪些数据。云Agent请求工具执行,安全可信的MCP服务器负责实际执行,确保自主运行的模型永远不会直接接触到原始的基础设施凭证。

# 跨网络连接 Agent 与安全的 MCP 服务器
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def query_secure_tool():
    # 服务器参数定义了到安全工具环境的连接
    server_params = StdioServerParameters(
        command="python",
        args=["secure_mcp_server.py"],
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Agent 动态发现可用工具
            tools = await session.list_tools()

            # Agent 执行工具但看不到底层凭证
            result = await session.call_tool("query_internal_db", arguments={"target": "Q3_sales"})
            print(result)

asyncio.run(query_secure_tool())

克服IP变动和NAT防火墙实现直连传输

虽然Model Context Protocol负责格式化工具请求,但它假设底层网络已经可达。云容器面临持续的IP变动,企业网络则使用严格的NAT防火墙。在各云之间暴露本地工具服务器通常需要虚拟私有云对等连接或中心化的API网关,这会引入延迟和单点故障。

这个传输层问题需要用Pilot Protocol为Agent分配持久化的加密身份。不是把通信绑定到脆弱的物理IP上,而是这个用户空间覆盖网络会分配一个与Ed25519密钥对数学绑定的永久48位虚拟地址。纯Go语言实现的守护进程利用自动UDP打洞技术穿越严格的防火墙,并执行X25519椭圆曲线Diffie-Hellman密钥交换。这样阿里云上的编排Agent就能直接与内网中的工作Agent通信,无需反向代理。

# 安装纯 Go 语言实现用户空间网络栈
curl -fsSL https://pilotprotocol.network/install.sh | sh

# 在本地安全机器上启动守护进程(节点 A)
pilotctl daemon start --hostname secure-mcp-tool

# 在云端 VPS Agent 上启动守护进程(节点 B)
pilotctl daemon start --hostname cloud-worker-agent

# 现在节点 B 可以绕过 NAT 直接路由到节点 A
# 利用底层 TCP-over-UDP 传输层
pilotctl connect secure-mcp-tool --message '{"jsonrpc": "2.0", "method": "call_tool"}'

分布式追踪对Agent调试不可或缺

当跨云多Agent工作流失败时,定位确切的故障点非常困难。如果Azure上的编排Agent把任务委托给GCP上的研究Agent,而GCP的Agent遇到了幻觉循环,本地日志只会显示一个通用的HTTP超时。

对自治系统来说,实现分布式追踪是硬性要求。将追踪上下文注入到跨云传递的载荷中,让工程师能够用OpenTelemetry标准在网络边界之间可视化整个工具调用和提示生成的完整序列。

# 将 OpenTelemetry 追踪 ID 注入跨云载荷
from opentelemetry import trace
from opentelemetry.propagate import inject

tracer = trace.get_tracer(__name__)

def dispatch_task_to_peer(agent_endpoint, payload):
    with tracer.start_as_current_span("cross_cloud_delegation") as span:
        headers = {}
        # 将当前追踪上下文注入到请求头或载荷中
        inject(headers)

        # 将追踪头添加到发送到远程 Agent 的载荷中
        payload["trace_context"] = headers

        # 到远程 Agent 的标准请求
        response = requests.post(agent_endpoint, json=payload)
        span.set_attribute("peer.response", response.status_code)
        return response

原文链接:How to Deploy Multi-Agent Systems Cross-Cloud[Python]