🤖 Agents IA

agent-multi-agent-orchestrator

Conception de systèmes multi-agents avec patterns d'orchestration (supervisor, swarm, hierarchy, voting). Coordination entre agents spécialisés, routing sémantique, gestion d'état distribué, error handling.

⚡ Installation & lancement en 1 commande

Copiez-collez dans votre terminal : le skill s'installe dans ~/.claude/skills et Claude Code se lance directement dessus.

macOS / Linux
curl -fsSL https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.sh | sh -s -- agent-multi-agent-orchestrator --launch
Windows (PowerShell)
iex "& { $(iwr -useb https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.ps1) } agent-multi-agent-orchestrator -Launch"

🚀 Déjà installé ?

claude "/agent-multi-agent-orchestrator"

Ou tapez /agent-multi-agent-orchestrator dans une session Claude Code, ou décrivez simplement votre besoin — le skill se déclenche automatiquement via le skill-router.

🔑 Déclencheurs automatiques

Le skill s'active automatiquement quand votre demande contient :

multi-agentorchestrationsupervisor agentswarmagent coordinationagent hierarchyagents qui collaborentrépartir entre agents

📦 Installation manuelle

git clone https://github.com/khalilbenaz/claude-skills-collection.git cp -r claude-skills-collection/skills/agent-multi-agent-orchestrator ~/.claude/skills/

Payload du plugin : skills/agent-multi-agent-orchestrator · source éditable : agent-skills/multi-agent-orchestrator

📖 Manuel

Multi-Agent Orchestrator

Quand utiliser ce skill

Utilise ce skill dès qu'une tâche gagne à être décomposée en sous-tâches spécialisées et exécutées par des agents distincts : pipelines de recherche parallèle, systèmes supervisor/worker, débats multi-agents, workflows ETL IA, etc.

Seuil de déclenchement — un seul agent suffit si la tâche est linéaire et tient en un contexte. Passe en multi-agent si :


Choix du pattern (critères de décision)

PatternQuand l'utiliserComplexité
Supervisor / RouterTâche décomposable en N sous-tâches distinctes ; résultats à consoliderFaible
Pipeline séquentielSous-tâches avec dépendances strictes A→B→CFaible
Parallel fan-outSous-tâches indépendantes, latence critiqueMoyenne
HiérarchiqueOrganisation de type chef de projet → équipes → exécutantsMoyenne
SwarmExploration large, diversité de solutions, pas de dépendancesHaute
Debate / VotingDécisions à fort enjeu, besoin de consensus ou de validation croiséeHaute

Workflow en étapes

1. Décomposition de la tâche

Identifie les sous-tâches atomiques et leurs dépendances. Produis un graphe orienté (DAG) avant tout code.

tâche principale
├── sous-tâche A (indépendante)
├── sous-tâche B (indépendante)
└── sous-tâche C (dépend de A + B)

Règle : si deux sous-tâches n'ont aucune dépendance, elles doivent s'exécuter en parallèle.

2. Définition des agents

Un agent = un rôle unique + un system prompt ciblé. Évite les agents "couteau suisse".

AGENTS = {
    "researcher": Agent(
        system="Tu es un agent de recherche. Tu extrais des faits vérifiables. "
               "Retourne TOUJOURS un JSON {facts: [], sources: []}.",
        tools=[web_search, fetch_url],
    ),
    "writer": Agent(
        system="Tu es un rédacteur. Tu transformes des faits structurés en prose claire.",
        tools=[],
    ),
    "validator": Agent(
        system="Tu es un agent de validation. Tu détectes les affirmations non sourcées.",
        tools=[],
    ),
}

3. Routing sémantique

Pour router dynamiquement une requête vers l'agent le plus adapté :

import numpy as np

def cosine_sim(a, b):
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

def route(task: str, agents: dict[str, Agent]) -> Agent:
    task_vec = embed(task)
    scores = {
        name: cosine_sim(task_vec, agent.profile_embedding)
        for name, agent in agents.items()
    }
    best = max(scores, key=scores.get)
    return agents[best]

Alternative légère : classification d'intention via LLM avec un prompt de routing dédié.

4. Exécution parallèle

import asyncio

async def run_parallel(tasks: list[tuple[Agent, str]]) -> list[str]:
    coros = [agent.run(task) for agent, task in tasks]
    return await asyncio.gather(*coros, return_exceptions=True)

# Usage
results = await run_parallel([
    (agents["researcher"], "Recherche les faits sur X"),
    (agents["researcher"], "Recherche les faits sur Y"),
])

Pour des workers synchrones lourds, utilise concurrent.futures.ThreadPoolExecutor.

5. État partagé

Centralise l'état dans un objet mutable accessible à tous les agents de l'orchestrateur :

from dataclasses import dataclass, field

@dataclass
class WorkflowState:
    task_id: str
    inputs: dict = field(default_factory=dict)
    results: dict = field(default_factory=dict)   # {agent_name: output}
    errors: list = field(default_factory=list)
    token_budget_used: int = 0
    token_budget_max: int = 50_000

Pour un système distribué : Redis (HSET par task_id) ou une table SQL.

6. Error handling et résilience

async def safe_call(agent: Agent, task: str, retries: int = 3) -> str:
    backoff = 1
    for attempt in range(retries):
        try:
            return await asyncio.wait_for(agent.run(task), timeout=30)
        except asyncio.TimeoutError:
            pass
        except Exception as e:
            state.errors.append({"agent": agent.name, "error": str(e), "task": task})
        await asyncio.sleep(backoff)
        backoff *= 2
    return agent.fallback_response(task)   # réponse dégradée

Stratégies de fallback (par ordre de préférence) :

  1. Retry avec backoff exponentiel
  2. Agent de remplacement (généraliste)
  3. Valeur par défaut documentée
  4. Escalade humaine (webhook / notification)

7. Consolidation et vote

Quand plusieurs agents produisent des réponses concurrentes :

def majority_vote(responses: list[str], arbitrator: Agent) -> str:
    if len(set(responses)) == 1:
        return responses[0]
    # Désaccord → arbitrage LLM
    prompt = f"Voici {len(responses)} réponses d'agents. Choisis la plus correcte et justifie.\n\n"
    for i, r in enumerate(responses, 1):
        prompt += f"Agent {i}: {r}\n"
    return arbitrator.run(prompt)

Vote pondéré : attribue un poids de confiance à chaque agent selon son historique (success_rate, latence, spécialisation).

8. Budget tokens et coût

def check_budget(state: WorkflowState, agent: Agent, estimated_tokens: int):
    if state.token_budget_used + estimated_tokens > state.token_budget_max:
        raise BudgetExceededError(
            f"Budget {state.token_budget_max} tokens atteint après {state.token_budget_used}"
        )

Règle : définis un budget global par workflow, pas par agent. Loggue chaque appel avec son coût réel.

9. Monitoring

Instrumente chaque agent call :

import time

async def instrumented_call(agent, task, state):
    t0 = time.perf_counter()
    result = await safe_call(agent, task)
    elapsed = time.perf_counter() - t0
    metrics.record(agent=agent.name, latency=elapsed, tokens=result.usage.total_tokens)
    return result

Métriques clés à suivre : latence p50/p95, taux de retry, taux d'erreur, coût par workflow, agent le plus sollicité.


Garde-fous et anti-patterns

Anti-patternSymptômeCorrectif
Cascade linéaireUn agent lent bloque toutFan-out parallèle + timeout par étape
Agent trop généralisteQualité médiocre sur toutes les tâchesDiviser en agents spécialisés
Contexte non partagéAgents qui se répètent ou se contredisentState centralisé, passé explicitement
Pas d'idempotenceRetries créent des effets de bordRendre chaque appel idempotent (ID de tâche unique)
Boucle infinie agent→agentStack overflow ou coût infiniLimite de profondeur (max_depth=5) + détection de cycle
Absence de fallbackPanne d'un agent = panne totaleFallback systématique + circuit breaker
Confiance aveugleHallucinations propagéesAgent validateur indépendant obligatoire

Bonnes pratiques 2026