🤖 Agents IA

agent-subagent-delegator

Conception de systèmes de délégation parent-agent → sous-agents avec dispatch intelligent, suivi des tâches et agrégation des résultats.

⚡ Installation & lancement en 1 commande

Copiez-collez dans votre terminal : le skill s'installe dans ~/.claude/skills et Claude Code se lance directement dessus.

macOS / Linux
curl -fsSL https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.sh | sh -s -- agent-subagent-delegator --launch
Windows (PowerShell)
iex "& { $(iwr -useb https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.ps1) } agent-subagent-delegator -Launch"

🚀 Déjà installé ?

claude "/agent-subagent-delegator"

Ou tapez /agent-subagent-delegator dans une session Claude Code, ou décrivez simplement votre besoin — le skill se déclenche automatiquement via le skill-router.

🔑 Déclencheurs automatiques

Le skill s'active automatiquement quand votre demande contient :

sous-agentsubagentdélégation agentagent parentdispatch agentagent qui délèguespawner sous-agentdelegate task

📦 Installation manuelle

git clone https://github.com/khalilbenaz/claude-skills-collection.git cp -r claude-skills-collection/skills/agent-subagent-delegator ~/.claude/skills/

Payload du plugin : skills/agent-subagent-delegator · source éditable : agent-skills/subagent-delegator

📖 Manuel

Subagent Delegator

Quand utiliser ce skill

SituationAction
Tâche décomposable en sous-tâches indépendantesDéléguer en parallèle (fan-out)
Domaines métier distincts (research / write / validate)Sous-agents spécialisés
Tâche trop longue pour un seul contexte LLMPipeline séquentiel A → B → C
Besoin de résultats redondants/comparésFan-out + voting
Charge variable, agents réutilisablesPool pré-alloué

Ne pas déléguer si la tâche tient en 1-2 appels directs : la coordination coûte plus que le gain.


1. Définir le contrat d'interface avant tout

Avant de coder la logique de dispatch, figer les types d'entrée/sortie. Tout le reste en dépend.

from pydantic import BaseModel
from typing import Any, Literal

class TaskDefinition(BaseModel):
    task_id: str                          # UUID unique, tracé de bout en bout
    objective: str                        # Ce que le sous-agent DOIT produire (1 phrase)
    context: dict[str, Any]              # Données nécessaires (pas le world state entier)
    constraints: list[str]               # Ce qui est interdit ou imposé
    output_format: Literal["json", "text", "structured"]
    timeout: int = 30                     # Secondes — toujours explicite
    priority: int = 1
    retry_limit: int = 3

class TaskResult(BaseModel):
    task_id: str
    agent_id: str
    status: Literal["success", "partial", "failed"]
    data: Any
    errors: list[str] = []
    execution_time: float
    confidence: float = 1.0              # 0.0-1.0, utile pour le fan-in

Règle : le context ne contient que ce dont le sous-agent a besoin — pas le state global complet. Surcharger le contexte = latence + hallucination.


2. Choisir la stratégie de dispatch

Tâches identiques, N agents disponibles  →  round-robin
Agents avec capacités différentes        →  capability-based (manifest)
Charge en temps réel connue              →  load-based (queue size)
Tâches sémantiquement variées            →  semantic routing (embeddings)

Exemple — capability-based avec manifest déclaratif :

# Chaque sous-agent déclare son manifest au démarrage
AGENT_MANIFESTS = {
    "research-agent": {"domains": ["web_search", "doc_retrieval"], "max_tokens": 4000},
    "writer-agent":   {"domains": ["text_generation", "summarization"], "max_tokens": 8000},
    "validator-agent":{"domains": ["fact_check", "schema_validation"], "max_tokens": 2000},
}

def route_task(task: TaskDefinition) -> str:
    """Retourne l'agent_id le plus adapté à la tâche."""
    keyword = task.objective.split()[0].lower()
    for agent_id, manifest in AGENT_MANIFESTS.items():
        if any(keyword in d for d in manifest["domains"]):
            return agent_id
    return "default-agent"

Pour le semantic routing (charge plus élevée, pertinent si > 10 types de tâches) :

import numpy as np

def semantic_route(task: TaskDefinition, agents: list, embed_fn) -> str:
    task_vec = embed_fn(task.objective)
    scores = {
        a.id: float(np.dot(task_vec, a.capability_embedding))
        for a in agents
    }
    return max(scores, key=scores.get)

3. Instancier les sous-agents (lifecycle)

Option A — dynamic (tâche unique, isolation totale) :

async def delegate_once(task: TaskDefinition, agent_class) -> TaskResult:
    agent = agent_class()
    try:
        return await execute_with_retry(agent, task)
    finally:
        await agent.shutdown()

Option B — pool pré-alloué (haute fréquence, latence faible) :

import asyncio

class AgentPool:
    def __init__(self, agent_class, size: int = 5):
        self._pool = asyncio.Queue()
        for _ in range(size):
            self._pool.put_nowait(agent_class())

    async def acquire(self):
        return await self._pool.get()

    async def release(self, agent):
        await self._pool.put(agent)

    async def run(self, task: TaskDefinition) -> TaskResult:
        agent = await self.acquire()
        try:
            return await execute_with_retry(agent, task)
        finally:
            await self.release(agent)

Choisir dynamic si chaque tâche nécessite un état propre. Choisir pool si les agents sont stateless ou quasi-stateless.


4. Patterns de délégation

Fan-out / Fan-in (parallèle + agrégation)

async def fan_out(tasks: list[TaskDefinition], pool: AgentPool) -> list[TaskResult]:
    return await asyncio.gather(*[pool.run(t) for t in tasks], return_exceptions=True)

def fan_in(results: list[TaskResult], strategy: str = "best_confidence") -> TaskResult:
    valid = [r for r in results if isinstance(r, TaskResult) and r.status == "success"]
    if strategy == "best_confidence":
        return max(valid, key=lambda r: r.confidence)
    if strategy == "merge":
        merged_data = [r.data for r in valid]
        return valid[0].model_copy(update={"data": merged_data})
    raise ValueError(f"Unknown strategy: {strategy}")

Pipeline séquentiel (A → B → C)

async def pipeline(initial_input: Any, stages: list[tuple]) -> TaskResult:
    """stages = [(agent, task_factory_fn), ...]"""
    result = initial_input
    for agent, make_task in stages:
        task = make_task(result)           # construit la TaskDefinition à partir du résultat précédent
        task_result = await execute_with_retry(agent, task)
        if task_result.status == "failed":
            raise RuntimeError(f"Pipeline failed at stage {agent.id}: {task_result.errors}")
        result = task_result.data
    return task_result

LangGraph — fan-out déclaratif

from langgraph.graph import StateGraph, END

def build_delegation_graph():
    graph = StateGraph(dict)
    graph.add_node("dispatch",  dispatch_node)
    graph.add_node("research",  research_node)
    graph.add_node("write",     write_node)
    graph.add_node("aggregate", aggregate_node)
    graph.add_edge("dispatch",  "research")
    graph.add_edge("dispatch",  "write")
    graph.add_edge("research",  "aggregate")
    graph.add_edge("write",     "aggregate")
    graph.add_edge("aggregate", END)
    return graph.compile()

5. Retry, circuit breaker et error propagation

async def execute_with_retry(agent, task: TaskDefinition) -> TaskResult:
    """Backoff exponentiel : 1s, 2s, 4s, 8s..."""
    last_exc = None
    for attempt in range(task.retry_limit):
        try:
            return await asyncio.wait_for(agent.run(task), timeout=task.timeout)
        except asyncio.TimeoutError:
            last_exc = TimeoutError(f"Agent {agent.id} timed out after {task.timeout}s")
        except Exception as e:
            last_exc = e
        if attempt < task.retry_limit - 1:
            await asyncio.sleep(2 ** attempt)
    # Escalade au parent : retourner un TaskResult failed plutôt que lever
    return TaskResult(
        task_id=task.task_id, agent_id=agent.id,
        status="failed", data=None,
        errors=[str(last_exc)], execution_time=0.0,
    )

Circuit breaker minimal :

from collections import defaultdict

_failures: dict[str, int] = defaultdict(int)
CIRCUIT_THRESHOLD = 5

def is_healthy(agent_id: str) -> bool:
    return _failures[agent_id] < CIRCUIT_THRESHOLD

def record_failure(agent_id: str):
    _failures[agent_id] += 1

def reset_circuit(agent_id: str):
    _failures[agent_id] = 0

Décision d'escalade :


6. Monitoring et heartbeat

from datetime import datetime, timedelta

class AgentMonitor:
    def __init__(self, timeout_s: int = 30):
        self._last_seen: dict[str, datetime] = {}
        self._threshold = timedelta(seconds=timeout_s)

    def heartbeat(self, agent_id: str):
        self._last_seen[agent_id] = datetime.utcnow()

    def is_alive(self, agent_id: str) -> bool:
        last = self._last_seen.get(agent_id)
        return bool(last and datetime.utcnow() - last < self._threshold)

    def dead_agents(self) -> list[str]:
        return [aid for aid in self._last_seen if not self.is_alive(aid)]

Log minimum à émettre par le parent à chaque délégation :

[DELEGATE] task_id=<uuid> agent=<id> objective="..." timeout=30
[RESULT]   task_id=<uuid> agent=<id> status=success exec=1.2s confidence=0.95
[RETRY]    task_id=<uuid> agent=<id> attempt=2/3 error="TimeoutError"
[ESCALATE] task_id=<uuid> agent=<id> status=failed errors=[...]

7. Capability manifest (contrat de sous-agent)

Chaque sous-agent expose son manifest. Le parent l'utilise pour le routing et la validation.

MANIFEST = {
    "agent_id": "research-agent-v2",
    "version": "2.1.0",
    "domains": ["web_search", "doc_retrieval", "summarization"],
    "input_schema": {
        "query": "str",
        "max_results": "int",
        "language": "str",
    },
    "output_schema": {
        "summary": "str",
        "sources": "list[str]",
        "confidence": "float",
    },
    "constraints": {
        "max_timeout": 60,
        "max_context_tokens": 4000,
    },
}

Anti-patterns et pièges

Anti-patternConséquenceCorrection
Objective vague ("fais quelque chose avec ça")Résultat inutilisable1 phrase précise + output_format explicite
Aucun timeout sur la tâcheAgent bloqué = système gelétimeout obligatoire, même si généreux
Passer le state global en contexteHallucination, tokens gaspillésNe passer que les données strictement nécessaires
Délégation récursive > 3 niveauxDébogage impossible, latenceAplatir la hiérarchie, max 2-3 niveaux
Résultat accepté sans validationErreurs silencieuses propagéesValider le schema avec Pydantic avant usage
Sous-agent stateful dans un poolRace condition, état corrompuRéinitialiser l'état à chaque acquire()
Retry infini sans circuit breakerCascade de pannesThreshold + quarantaine automatique
task_id non uniqueTracing impossibleGénérer avec uuid.uuid4() côté parent

Bonnes pratiques 2026