A2A 协议实战:用 Google Agent-to-Agent 构建多智能体协作系统

深入解析 Google A2A 协议核心机制,对比 MCP 定位差异,手把手用 TypeScript 实现 Agent 互发现与任务编排,帮你从单 Agent 迈向多智能体协作架构。

开发者效率 2026-05-29 12 分钟

2025 年底 Google DeepMind 开源的 A2A(Agent-to-Agent)协议,让不同框架构建的 AI Agent 首次拥有了标准化的互操作语言。截至目前,GitHub 上已有超过 4000 个项目基于 A2A 协议构建,SAP、Salesforce、LangChain 等主流平台全部接入。如果你正在构建多 Agent 系统,却还在用硬编码的 HTTP 调用串联各个 Agent,那你一定需要了解 A2A —— 它解决的正是"Agent 之间如何发现彼此、委托任务、传递上下文"这个核心问题。

🔍 一、A2A 是什么:定位、架构与核心概念

A2A 与 MCP 的本质区别

很多开发者把 A2A 和 MCP(Model Context Protocol)混为一谈,这是最大的误区。两者解决的是完全不同层面的问题:

维度 MCP A2A
解决问题 Agent 如何调用工具/获取数据 Agent 如何与其他 Agent 协作
通信方向 Agent ↔ 工具服务器 Agent ↔ Agent
发现机制 静态配置(tool list) 动态发现(AgentCard)
任务模型 单次请求-响应 多轮对话、长时间任务
适用场景 调 API、查数据库、读文件 委托复杂任务、多 Agent 编排
协议层 JSON-RPC 2.0 HTTP + JSON-RPC + SSE

💡 **提示:**MCP 是 Agent 的「手臂」—— 让它能操作外部世界;A2A 是 Agent 的「嘴巴」—— 让它能与其他 Agent 对话。两者是互补关系,不是替代关系。

一个典型的企业级 AI 应用架构是这样的:用户请求进入一个「编排 Agent」,它通过 A2A 协议发现并调用多个专业 Agent(如代码审查 Agent、测试 Agent、部署 Agent),每个专业 Agent 内部又通过 MCP 调用具体的工具(如 GitHub API、Jenkins、数据库)。

A2A 协议的三大核心概念

1. AgentCard —— Agent 的「名片」

每个 A2A Agent 都必须暴露一个 /.well-known/agent.json 文件,称为 AgentCard。它描述了这个 Agent 能做什么、支持什么能力、如何通信:

// AgentCard 示例:代码审查 Agent
{
  "name": "Code Review Agent",
  "description": "专业代码审查,支持 TypeScript/Python/Java,提供安全性、性能、可维护性分析",
  "url": "https://code-review.example.com",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": true
  },
  "authentication": {
    "schemes": ["Bearer"]
  },
  "defaultInputModes": ["text/plain", "application/json"],
  "defaultOutputModes": ["text/plain", "text/markdown"],
  "skills": [
    {
      "id": "security-review",
      "name": "安全审查",
      "description": "检测代码中的安全漏洞,包括 SQL 注入、XSS、认证绕过等",
      "tags": ["security", "code-review"],
      "examples": ["请审查这段登录代码的安全性", "检查这个 API 端点是否存在注入风险"]
    },
    {
      "id": "performance-review",
      "name": "性能审查",
      "description": "分析代码性能瓶颈,识别 N+1 查询、内存泄漏、不必要的重渲染等",
      "tags": ["performance", "optimization"],
      "examples": ["分析这段数据库查询的性能", "检查 React 组件的渲染性能"]
    }
  ]
}

2. Task —— 任务生命周期

A2A 的核心抽象是 Task(任务)。一个 Task 有完整的生命周期:submitted → working → input-required → completed / failed / canceled。这比 MCP 的单次请求-响应模型复杂得多,因为它支持长时间运行的任务和多轮交互。

3. Message & Part —— 消息结构

Agent 之间通过 Message 通信,每个 Message 包含多个 Part。Part 可以是文本、文件、结构化数据,甚至是 UI 组件的渲染指令:

// A2A 消息结构
interface Message {
  role: 'user' | 'agent';
  parts: Part[];
  metadata?: Record<string, unknown>;
}

type Part = TextPart | FilePart | DataPart;

interface TextPart {
  type: 'text';
  text: string;
}

interface FilePart {
  type: 'file';
  file: {
    name: string;
    mimeType: string;
    bytes?: string;  // base64
    uri?: string;
  };
}

interface DataPart {
  type: 'data';
  data: Record<string, unknown>;
  metadata?: Record<string, unknown>;
}

🛠️ 二、从零实现 A2A Agent:TypeScript 实战

接下来我们用 TypeScript 实现一个完整的 A2A Agent 系统,包含一个「旅行规划 Agent」和一个「天气查询 Agent」,演示 Agent 之间如何通过 A2A 协议协作。

实现 A2A Server(天气查询 Agent)

// weather-agent/server.ts
// 天气查询 A2A Server —— 暴露天气查询能力供其他 Agent 调用
import express from 'express';
import { v4 as uuidv4 } from 'uuid';

const app = express();
app.use(express.json());

// AgentCard —— 告诉外界"我能做什么"
app.get('/.well-known/agent.json', (_req, res) => {
  res.json({
    name: 'Weather Agent',
    description: '全球天气查询 Agent,支持实时天气、7日预报、历史天气对比',
    url: 'http://localhost:3001',
    version: '1.0.0',
    capabilities: {
      streaming: false,
      pushNotifications: false,
      stateTransitionHistory: false,
    },
    defaultInputModes: ['text/plain'],
    defaultOutputModes: ['application/json', 'text/plain'],
    skills: [
      {
        id: 'current-weather',
        name: '实时天气',
        description: '查询指定城市的当前天气状况',
        tags: ['weather', 'realtime'],
        examples: ['北京今天天气怎么样?', '上海现在多少度?'],
      },
      {
        id: 'weather-forecast',
        name: '天气预报',
        description: '查询指定城市未来7天的天气预报',
        tags: ['weather', 'forecast'],
        examples: ['北京未来一周天气', '杭州下周会下雨吗?'],
      },
    ],
  });
});

// 模拟天气数据
const weatherDB: Record<string, { temp: number; condition: string; humidity: number }> = {
  北京: { temp: 28, condition: '晴', humidity: 35 },
  上海: { temp: 31, condition: '多云', humidity: 72 },
  杭州: { temp: 29, condition: '小雨', humidity: 85 },
  深圳: { temp: 33, condition: '雷阵雨', humidity: 90 },
  成都: { temp: 25, condition: '阴', humidity: 65 },
};

// 核心:任务处理逻辑
async function handleTask(taskId: string, city: string) {
  const weather = weatherDB[city] || { temp: 22, condition: '未知', humidity: 50 };
  return {
    id: taskId,
    status: { state: 'completed', timestamp: new Date().toISOString() },
    artifacts: [
      {
        name: 'weather-result',
        parts: [
          {
            type: 'data',
            data: {
              city,
              temperature: weather.temp,
              condition: weather.condition,
              humidity: weather.humidity,
              unit: '°C',
              advice: weather.temp > 30 ? '注意防暑防晒' : weather.temp < 10 ? '注意添衣保暖' : '适合出行',
            },
          },
        ],
      },
    ],
  };
}

// A2A JSON-RPC 端点 —— 所有 A2A 请求都走这里
app.post('/', async (req, res) => {
  const { method, params, id } = req.body;

  try {
    switch (method) {
      case 'tasks/send': {
        const { message, id: taskId } = params;
        const userText = message.parts.find((p: any) => p.type === 'text')?.text || '';
        // 从自然语言中提取城市名
        const cityMatch = userText.match(/([\u4e00-\u9fa5]{2,4})(?:的|今天|现在|未来|下周)/);
        const city = cityMatch ? cityMatch[1] : '北京';
        const result = await handleTask(taskId || uuidv4(), city);
        res.json({ jsonrpc: '2.0', id, result });
        break;
      }

      case 'tasks/get': {
        // 查询任务状态(长时间任务场景)
        res.json({
          jsonrpc: '2.0',
          id,
          result: { id: params.id, status: { state: 'completed' } },
        });
        break;
      }

      default:
        res.status(400).json({
          jsonrpc: '2.0',
          id,
          error: { code: -32601, message: `Unknown method: ${method}` },
        });
    }
  } catch (error: any) {
    res.status(500).json({
      jsonrpc: '2.0',
      id,
      error: { code: -32603, message: error.message },
    });
  }
});

app.listen(3001, () => console.log('Weather Agent running on :3001'));

实现 A2A Client(旅行规划 Agent)

// travel-agent/client.ts
// 旅行规划 A2A Client —— 调用天气 Agent 获取目的地天气,制定旅行建议
import { v4 as uuidv4 } from 'uuid';

interface AgentCard {
  name: string;
  description: string;
  url: string;
  skills: Array<{ id: string; name: string; tags: string[] }>;
}

// 发现 Agent —— 通过 URL 获取 AgentCard
async function discoverAgent(baseUrl: string): Promise<AgentCard> {
  const response = await fetch(`${baseUrl}/.well-known/agent.json`);
  if (!response.ok) throw new Error(`Agent discovery failed: ${response.status}`);
  return response.json();
}

// 向 Agent 发送任务
async function sendTask(agentUrl: string, text: string): Promise<any> {
  const taskId = uuidv4();
  const response = await fetch(agentUrl, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      jsonrpc: '2.0',
      id: uuidv4(),
      method: 'tasks/send',
      params: {
        id: taskId,
        message: {
          role: 'user',
          parts: [{ type: 'text', text }],
        },
      },
    }),
  });

  const rpcResponse = await response.json();
  if (rpcResponse.error) {
    throw new Error(`A2A Error: ${rpcResponse.error.message}`);
  }
  return rpcResponse.result;
}

// 旅行规划核心逻辑
async function planTrip(destination: string, days: number) {
  console.log(`\n🗺️  正在为 ${destination} 规划 ${days} 天旅行...\n`);

  // 第一步:通过 A2A 发现天气 Agent
  const weatherAgent = await discoverAgent('http://localhost:3001');
  console.log(`📡 发现 Agent: ${weatherAgent.name}`);
  console.log(`   能力: ${weatherAgent.skills.map((s) => s.name).join(', ')}\n`);

  // 第二步:通过 A2A 委托天气查询任务
  const weatherResult = await sendTask(weatherAgent.url, `${destination}今天天气怎么样?`);
  const weatherData = weatherResult.artifacts[0].parts[0].data;

  console.log(`🌤️  ${destination}天气: ${weatherData.temperature}°C, ${weatherData.condition}`);
  console.log(`💧 湿度: ${weatherData.humidity}%`);
  console.log(`👔 建议: ${weatherData.advice}\n`);

  // 第三步:根据天气生成旅行建议
  const recommendations = generateRecommendations(destination, weatherData, days);

  return { weather: weatherData, recommendations };
}

function generateRecommendations(
  destination: string,
  weather: { temp: number; condition: string; humidity: number },
  days: number,
) {
  const tips: string[] = [];

  if (weather.temp > 30) {
    tips.push('⚠️ 高温天气,建议安排室内景点(博物馆、商场)在午后最热时段');
    tips.push('✅ 清晨和傍晚适合户外活动');
  } else if (weather.temp < 10) {
    tips.push('🧣 气温较低,多带保暖衣物');
    tips.push('✅ 适合温泉、室内文化体验');
  } else {
    tips.push('✅ 气温舒适,适合全程户外活动');
  }

  if (weather.humidity > 80) {
    tips.push('🌧️ 湿度较高,随身携带雨具');
  }

  tips.push(`📅 ${days}天行程建议: 前${Math.ceil(days / 2)}天集中游览核心景点,后${Math.floor(days / 2)}天深度体验当地文化`);

  return tips;
}

// 运行
planTrip('杭州', 3).then((result) => {
  console.log('📋 旅行规划完成:');
  result.recommendations.forEach((tip) => console.log(`   ${tip}`));
});

⚠️ **警告:**实际生产中,Agent 之间的通信必须启用 TLS 和认证。上面的示例为了演示简洁省略了认证层,但 /.well-known/agent.json 中的 authentication 字段必须正确配置。

⚡ 三、MCP + A2A 双协议架构实战

架构设计

真正的生产级多 Agent 系统需要同时使用 MCP 和 A2A。下面是一个典型的「代码审查流水线」架构:

用户请求 → 编排 Agent(A2A Server + MCP Client)
              ├── 代码分析 Agent(A2A Server,内部通过 MCP 调用 Git 工具)
              ├── 安全扫描 Agent(A2A Server,内部通过 MCP 调用 SAST 工具)
              └── 测试生成 Agent(A2A Server,内部通过 MCP 调用测试框架)

编排 Agent 通过 A2A 协议发现并调度各个专业 Agent,每个专业 Agent 内部通过 MCP 协议调用具体的工具。这种分层架构的优势在于:

  • ✅ 每个 Agent 可以独立开发、测试、部署
  • ✅ 新增能力只需注册新的 Agent,编排层无需修改
  • ✅ 不同团队可以用不同技术栈构建各自的 Agent

编排 Agent 核心实现

// orchestrator/agent.ts
// 编排 Agent —— 同时作为 A2A Server(接收用户请求)和 A2A Client(调用子 Agent)
import express from 'express';

const app = express();
app.use(express.json());

// 已知的子 Agent 列表(生产环境应从注册中心动态获取)
const SUB_AGENTS = [
  'http://localhost:3001', // 天气 Agent
  'http://localhost:3002', // 代码分析 Agent
  'http://localhost:3003', // 安全扫描 Agent
];

// 动态发现所有可用 Agent
async function discoverAgents(): Promise<Map<string, any>> {
  const agents = new Map();
  const results = await Promise.allSettled(
    SUB_AGENTS.map(async (url) => {
      const res = await fetch(`${url}/.well-known/agent.json`);
      if (res.ok) {
        const card = await res.json();
        return { url, card };
      }
      return null;
    }),
  );

  results.forEach((r) => {
    if (r.status === 'fulfilled' && r.value) {
      const { url, card } = r.value;
      card.skills.forEach((skill: any) => {
        agents.set(skill.id, { url, skill });
      });
    }
  });

  console.log(`📡 已发现 ${agents.size} 个可用技能`);
  return agents;
}

// 智能路由 —— 根据用户意图选择合适的 Agent
async function routeToAgent(userText: string, agents: Map<string, any>) {
  const text = userText.toLowerCase();

  // 简单的关键词匹配路由(生产环境用 LLM 做意图识别)
  for (const [skillId, agent] of agents) {
    const keywords = agent.skill.tags || [];
    if (keywords.some((kw: string) => text.includes(kw))) {
      return agent;
    }
  }

  // 回退:返回第一个可用 Agent
  return agents.values().next().value;
}

// 并行编排 —— 同时调用多个 Agent
async function parallelOrchestration(tasks: Array<{ url: string; text: string }>) {
  const results = await Promise.allSettled(
    tasks.map(async ({ url, text }) => {
      const res = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          jsonrpc: '2.0',
          id: crypto.randomUUID(),
          method: 'tasks/send',
          params: {
            id: crypto.randomUUID(),
            message: { role: 'user', parts: [{ type: 'text', text }] },
          },
        }),
      });
      return res.json();
    }),
  );

  return results
    .filter((r) => r.status === 'fulfilled')
    .map((r: any) => r.value.result);
}

app.post('/', async (req, res) => {
  const { method, params, id } = req.body;

  if (method === 'tasks/send') {
    const userText = params.message.parts.find((p: any) => p.type === 'text')?.text || '';
    const agents = await discoverAgents();

    // 判断是否需要多 Agent 协作
    if (userText.includes('全面审查') || userText.includes('完整分析')) {
      // 并行调用所有 Agent
      const tasks = SUB_AGENTS.map((url) => ({ url, text: userText }));
      const results = await parallelOrchestration(tasks);
      res.json({
        jsonrpc: '2.0',
        id,
        result: {
          id: params.id,
          status: { state: 'completed' },
          artifacts: [
            { name: 'combined-result', parts: [{ type: 'data', data: { results } }] },
          ],
        },
      });
    } else {
      // 路由到最匹配的单个 Agent
      const agent = await routeToAgent(userText, agents);
      if (!agent) {
        res.json({
          jsonrpc: '2.0',
          id,
          result: {
            id: params.id,
            status: { state: 'failed', message: '没有找到合适的 Agent 处理该请求' },
          },
        });
        return;
      }

      const agentRes = await fetch(agent.url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          jsonrpc: '2.0',
          id: crypto.randomUUID(),
          method: 'tasks/send',
          params: {
            id: crypto.randomUUID(),
            message: { role: 'user', parts: [{ type: 'text', text: userText }] },
          },
        }),
      });
      const agentResult = await agentRes.json();
      res.json({ jsonrpc: '2.0', id, result: agentResult.result });
    }
  }
});

app.listen(3000, () => console.log('Orchestrator running on :3000'));

📊 四、避坑指南与生产建议

常见踩坑点

坑点 1:AgentCard 缓存导致服务发现失败

AgentCard 应该有合理的缓存策略。很多开发者在本地开发时直接硬编码 Agent URL,上线后才发现动态发现机制有问题。建议 AgentCard 设置 Cache-Control: max-age=300(5 分钟),客户端实现缓存和降级逻辑。

坑点 2:长时间任务的超时处理

A2A 支持长时间运行的任务,但客户端不能无限等待。建议实现三级超时:

超时级别 时间 处理方式
连接超时 5 秒 直接重试
响应超时 30 秒 轮询 tasks/get
任务超时 5 分钟 取消任务 tasks/cancel

坑点 3:错误传播链路断裂

当子 Agent 返回错误时,编排 Agent 必须正确地将错误信息传递给用户,而不是静默吞掉。A2A 的 status.message 字段专门用于传递可读的错误信息:

// ❌ 错误处理 —— 吞掉错误
try {
  const result = await sendTask(agentUrl, text);
  return result;
} catch (e) {
  return { status: { state: 'completed' } }; // 静默失败,用户毫无感知
}

// ✅ 正确处理 —— 透传错误
try {
  const result = await sendTask(agentUrl, text);
  return result;
} catch (e: any) {
  return {
    status: {
      state: 'failed',
      message: `Agent 调用失败: ${e.message},请稍后重试`,
    },
  };
}

生产环境 Checklist

  • 认证安全:Agent 之间使用 OAuth 2.0 或 mTLS 认证,AgentCard 中声明认证方案
  • 限流保护:每个 Agent 设置请求速率限制,防止被上游 Agent 打爆
  • 可观测性:为每个 Task 生成 traceId,贯穿整个调用链
  • 降级策略:当子 Agent 不可用时,编排 Agent 应有兜底逻辑
  • 版本兼容:AgentCard 的 version 字段遵循语义化版本,客户端做兼容性检查

💡 总结

A2A 协议填补了 AI Agent 生态中「Agent 间通信」的空白,与 MCP 形成了完整的双层协议栈。MCP 让 Agent 能操作工具,A2A 让 Agent 能协作分工。

对于开发者来说,关键建议是:

  1. 先做好单 Agent + MCP,再考虑 A2A 多 Agent 架构 —— 没有扎实的单 Agent 能力,多 Agent 只是放大问题
  2. 从 2-3 个 Agent 的小规模场景开始,验证 A2A 通信链路后再扩展
  3. AgentCard 是核心契约,投入足够精力设计好 skill 描述和输入输出模式
  4. 监控和日志比功能更重要 —— 多 Agent 系统的调试难度是单 Agent 的指数级

⚡ **关键结论:**A2A 不是什么银弹,它本质上是为 Agent 间通信提供了一套标准化的"握手协议"。真正决定多 Agent 系统成败的,是每个 Agent 自身的能力质量,以及编排策略的设计智慧。

相关工具推荐:

📚 相关文章