🤖 Agents IA

agent-a2a-protocol-guide

Guide du protocole Agent-to-Agent (A2A) de Google pour l'interopérabilité entre agents IA — découverte, communication et collaboration inter-agents, avec exemples de code, critères de décision et pièges à éviter.

⚡ Installation & lancement en 1 commande

Copiez-collez dans votre terminal : le skill s'installe dans ~/.claude/skills et Claude Code se lance directement dessus.

macOS / Linux
curl -fsSL https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.sh | sh -s -- agent-a2a-protocol-guide --launch
Windows (PowerShell)
iex "& { $(iwr -useb https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.ps1) } agent-a2a-protocol-guide -Launch"

🚀 Déjà installé ?

claude "/agent-a2a-protocol-guide"

Ou tapez /agent-a2a-protocol-guide dans une session Claude Code, ou décrivez simplement votre besoin — le skill se déclenche automatiquement via le skill-router.

🔑 Déclencheurs automatiques

Le skill s'active automatiquement quand votre demande contient :

A2Aagent-to-agentprotocole A2AGoogle A2Ainteropérabilité agents

📦 Installation manuelle

git clone https://github.com/khalilbenaz/claude-skills-collection.git cp -r claude-skills-collection/skills/agent-a2a-protocol-guide ~/.claude/skills/

Payload du plugin : skills/agent-a2a-protocol-guide · source éditable : agent-skills/a2a-protocol-guide

📖 Manuel

Guide du Protocole A2A

Concepts fondamentaux

ComposantRôle
Agent CardFichier JSON décrivant l'agent (skills, endpoint, auth) — publié sur /.well-known/agent.json
TaskUnité de travail délégué ; cycle : submitted → working → completed / failed / canceled
ArtifactRésultat produit par une tâche (fichier, JSON, message)
PartFragment d'un message ou artifact : TextPart, FilePart, DataPart
Client / ServerUn agent peut être les deux selon la direction de l'appel

Workflow en étapes

1. Définir l'Agent Card

Fichier minimal exposé sur GET /.well-known/agent.json :

{
  "name": "DataExtractorAgent",
  "description": "Extrait des données structurées depuis des documents PDF",
  "url": "https://agent.example.com/a2a",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": false
  },
  "authentication": {
    "schemes": ["Bearer"]
  },
  "skills": [
    {
      "id": "extract-invoice",
      "name": "Invoice Extraction",
      "description": "Retourne les champs clés d'une facture PDF",
      "examples": ["Extrais les montants de cette facture"],
      "inputModes": ["application/pdf"],
      "outputModes": ["application/json"]
    }
  ]
}

Critère de décision — streaming : active streaming: true si les tâches durent > 5 s ou produisent des résultats progressifs. Sinon, mode synchrone suffit.


2. Implémenter le serveur A2A (JSON-RPC 2.0)

Endpoint unique POST /a2a — méthodes requises :

MéthodeUsage
tasks/sendTâche synchrone (réponse directe)
tasks/sendSubscribeTâche longue — SSE stream
tasks/getConsulter statut + artifacts
tasks/cancelAnnuler une tâche en cours
# FastAPI — squelette minimal
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, StreamingResponse
import uuid, asyncio

app = FastAPI()

tasks_store = {}  # En prod : Redis ou DB

@app.post("/a2a")
async def a2a_handler(request: Request):
    body = await request.json()
    method = body.get("method")
    params = body.get("params", {})
    rpc_id = body.get("id")

    if method == "tasks/send":
        task_id = str(uuid.uuid4())
        result = await run_task(params)
        tasks_store[task_id] = {"status": "completed", "result": result}
        return JSONResponse({
            "jsonrpc": "2.0", "id": rpc_id,
            "result": {"id": task_id, "status": {"state": "completed"}, "artifacts": [result]}
        })

    if method == "tasks/get":
        task_id = params["id"]
        task = tasks_store.get(task_id)
        if not task:
            return JSONResponse({"jsonrpc": "2.0", "id": rpc_id,
                                 "error": {"code": -32001, "message": "Task not found"}})
        return JSONResponse({"jsonrpc": "2.0", "id": rpc_id, "result": task})

    return JSONResponse({"jsonrpc": "2.0", "id": rpc_id,
                         "error": {"code": -32601, "message": "Method not found"}})

3. Appeler un agent distant (côté client)

import httpx, json

async def send_task(agent_url: str, message: str, token: str) -> dict:
    payload = {
        "jsonrpc": "2.0", "id": "1", "method": "tasks/send",
        "params": {
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": message}]
            }
        }
    }
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            agent_url,
            json=payload,
            headers={"Authorization": f"Bearer {token}"},
            timeout=30
        )
        resp.raise_for_status()
        return resp.json()["result"]

4. Streaming SSE (tasks/sendSubscribe)

from fastapi.responses import StreamingResponse

@app.post("/a2a/stream")
async def stream_task(request: Request):
    body = await request.json()

    async def event_generator():
        # Mise à jour intermédiaire
        yield f"data: {json.dumps({'type': 'TaskStatusUpdateEvent', 'status': {'state': 'working'}})}\n\n"
        await asyncio.sleep(1)
        # Résultat final
        artifact = {"type": "DataPart", "data": {"key": "value"}}
        yield f"data: {json.dumps({'type': 'TaskArtifactUpdateEvent', 'artifact': artifact})}\n\n"
        yield f"data: {json.dumps({'type': 'TaskStatusUpdateEvent', 'status': {'state': 'completed'}})}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")

5. Découverte et sélection d'agent

async def discover_agent(base_url: str) -> dict:
    async with httpx.AsyncClient() as client:
        r = await client.get(f"{base_url}/.well-known/agent.json", timeout=5)
        r.raise_for_status()
        return r.json()

# Critère de sélection
def select_agent(agents: list[dict], required_skill: str) -> dict | None:
    for a in agents:
        skills = [s["id"] for s in a.get("skills", [])]
        if required_skill in skills:
            return a
    return None

Registre simple : maintenir une liste d'URLs connues + cache de leurs Agent Cards (TTL 5 min). Pour du dynamique, utiliser un service de registre central (ex. : endpoint GET /agents dans l'orchestrateur).


6. Sécurité

# Middleware auth minimal
from fastapi import Depends, HTTPException, Header

async def verify_token(authorization: str = Header(...)):
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401)
    token = authorization[7:]
    if token not in ALLOWED_TOKENS:
        raise HTTPException(status_code=403)

7. Orchestration multi-agents

Pattern fan-out / fan-in :

async def orchestrate(agents: list[str], prompt: str) -> list:
    tasks = [send_task(url, prompt, TOKEN) for url in agents]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r for r in results if not isinstance(r, Exception)]

Pattern chaîne séquentielle : sortie artifact de l'agent N devient le message d'entrée de l'agent N+1. Extraire le bon Part depuis artifacts[0].parts.


8. Monitoring et debug


Garde-fous et anti-patterns

Anti-patternProblèmeCorrect
Stocker l'état des tâches en mémoire processPerdu au redémarrageRedis / DB persistante
Pas de timeout sur tasks/sendBlocage indéfiniTimeout HTTP + tasks/cancel si dépassé
Agent Card statique en cache illimitéCapabilities obsolètesTTL max 5 min, invalider sur erreur 404/401
Exposer /.well-known/agent.json sans authEnumération par tiersAuth optionnelle mais recommandée en prod interne
Format propriétaire dans les partsInterop casséeUtiliser TextPart, FilePart, DataPart standard
Ignorer les erreurs return_exceptions en gatherRésultats silencieusement manquantsLogger chaque exception, circuit breaker si taux > seuil

Bonnes pratiques 2026