💻 Développement

dev-ai-workflow-orchestrator

Orchestration de workflows IA complexes avec chaînes et pipelines.

⚡ Installation & lancement en 1 commande

Copiez-collez dans votre terminal : le skill s'installe dans ~/.claude/skills et Claude Code se lance directement dessus.

macOS / Linux
curl -fsSL https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.sh | sh -s -- dev-ai-workflow-orchestrator --launch
Windows (PowerShell)
iex "& { $(iwr -useb https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.ps1) } dev-ai-workflow-orchestrator -Launch"

🚀 Déjà installé ?

claude "/dev-ai-workflow-orchestrator"

Ou tapez /dev-ai-workflow-orchestrator dans une session Claude Code, ou décrivez simplement votre besoin — le skill se déclenche automatiquement via le skill-router.

🔑 Déclencheurs automatiques

Le skill s'active automatiquement quand votre demande contient :

workflow IALangChainLangGraphpipeline IAchaîne de promptsorchestration LLMAI pipelinemulti-step AI

📦 Installation manuelle

git clone https://github.com/khalilbenaz/claude-skills-collection.git cp -r claude-skills-collection/skills/dev-ai-workflow-orchestrator ~/.claude/skills/

Payload du plugin : skills/dev-ai-workflow-orchestrator · source éditable : dev-skills/ai-workflow-orchestrator

📖 Manuel

AI Workflow Orchestrator

Critères de décision : quel framework choisir ?

ComplexitéPatternFramework recommandé
1–3 étapes linéairesSéquentiel simplePython natif ou LCEL
Fan-out / fan-in sans étatParallèle sans mémoireLCEL RunnableParallel
Boucles, conditions, état persistantAgent / cycleLangGraph
RAG + retrieval hybrideSearch pipelineHaystack
Stack Azure / .NETIntégration MicrosoftSemantic Kernel
Contrôle total, zéro dépendanceCustomScript async pur
Règle d'or : n'ajouter un framework que quand la complexité le justifie. 50 lignes de Python natif > architecture LangGraph mal comprise.

Workflow en étapes

1. Décomposer en DAG avant de coder

Identifier pour chaque étape : input attendu, output produit, dépendances. Dessiner un DAG — économise des heures de débogage.

Pipeline d'analyse de document :
[Chargement] → [Extraction] ─┬─ [Résumé]      ─┐
                               ├─ [Entités NER]  ├─ [Rapport final]
                               └─ [Sentiment]   ─┘

Questions à poser : quelles étapes sont indépendantes (candidats au parallélisme) ? Quelles étapes nécessitent un état partagé ? Y a-t-il des points de décision conditionnels ?


2. Implémenter les patterns de chaînes

Séquentiel (LCEL | operator)

from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")
pipeline = extract_prompt | llm | parse_output | report_prompt | llm
result = pipeline.invoke({"document": text})

Parallèle (fan-out / fan-in)

from langchain_core.runnables import RunnableParallel

parallel = RunnableParallel({
    "summary":   summarize_prompt | llm,
    "entities":  extract_prompt   | llm,
    "sentiment": sentiment_prompt | llm,
})
full = parallel | merge_results | report_prompt | llm
result = full.invoke({"document": text})

Map-Reduce (N documents)

from langchain_core.runnables import RunnableLambda
import asyncio

async def map_reduce(docs: list[str]) -> str:
    summaries = await asyncio.gather(*[
        summarize_chain.ainvoke({"text": d}) for d in docs
    ])
    return await aggregate_chain.ainvoke({"summaries": summaries})

3. Gérer l'état avec LangGraph

Définir un TypedDict typé qui circule à travers tous les nœuds. Chaque nœud retourne uniquement les champs qu'il modifie (merge partiel).

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

class WorkflowState(TypedDict):
    document: str
    summary: str
    entities: list[str]
    report: str
    error: str | None
    step: str

def summarize_node(state: WorkflowState) -> dict:
    summary = summarize_chain.invoke({"text": state["document"]})
    return {"summary": summary, "step": "summarized"}

def route_after_summary(state: WorkflowState) -> str:
    if state.get("error"):
        return "handle_error"
    return "extract_entities"

graph = StateGraph(WorkflowState)
graph.add_node("summarize", summarize_node)
graph.add_node("extract_entities", entities_node)
graph.add_node("handle_error", error_node)
graph.set_entry_point("summarize")
graph.add_conditional_edges("summarize", route_after_summary)
graph.add_edge("extract_entities", END)
app = graph.compile()

4. Error handling, retry et checkpoints

Retry par nœud (erreurs transitoires)

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
def call_llm_with_retry(prompt: str) -> str:
    return llm.invoke(prompt)

Fallback chain (modèle de secours)

primary   = ChatOpenAI(model="gpt-4o")
fallback  = ChatOpenAI(model="gpt-4o-mini")
safe_llm  = primary.with_fallbacks([fallback])

Checkpoints LangGraph (reprise après crash)

from langgraph.checkpoint.sqlite import SqliteSaver

checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["send_email"],   # pause avant action irréversible
)

config = {"configurable": {"thread_id": "run-001"}}
app.invoke(initial_state, config)     # déclenche, s'arrête avant send_email
# ... approbation humaine ...
app.invoke(None, config)              # reprend depuis le checkpoint

5. Observabilité : tracer chaque nœud

LangSmith (natif, 0 code supplémentaire)

export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY="ls_..."
export LANGCHAIN_PROJECT="my-workflow"
# Toutes les exécutions sont automatiquement tracées

Phoenix / Arize (open-source)

import phoenix as px
from phoenix.otel import register
tracer_provider = register(project_name="my-workflow", endpoint="http://localhost:6006/v1/traces")

Métriques à surveiller


6. Optimiser latence et coût

Async pour les branches parallèles

import asyncio

async def parallel_analysis(doc: str) -> dict:
    summary, entities = await asyncio.gather(
        summarize_chain.ainvoke({"text": doc}),
        extract_chain.ainvoke({"text": doc}),
    )
    return {"summary": summary, "entities": entities}

Prompt caching Anthropic (économie ~90 % sur les tokens en cache)

from anthropic import Anthropic

client = Anthropic()
response = client.messages.create(
    model="claude-opus-4-5",
    system=[{"type": "text", "text": long_system_prompt,
             "cache_control": {"type": "ephemeral"}}],
    messages=[{"role": "user", "content": user_query}],
    max_tokens=1024,
)

Batch API OpenAI (-50 % coût, latence différée acceptable)

# Soumettre un batch de requêtes indépendantes
client.batches.create(
    input_file_id=file_id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)

7. Exposer le workflow comme service

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import uuid

app = FastAPI()

class WorkflowRequest(BaseModel):
    document: str
    webhook_url: str | None = None

@app.post("/workflows/analyze")
async def trigger(req: WorkflowRequest, bg: BackgroundTasks):
    run_id = str(uuid.uuid4())
    bg.add_task(run_workflow_async, run_id, req.document, req.webhook_url)
    return {"run_id": run_id, "status": "queued"}

@app.get("/workflows/{run_id}")
async def status(run_id: str):
    return await get_run_status(run_id)

Pour la scalabilité : Celery + Redis pour les workloads batch, Docker + Kubernetes pour le déploiement, LangGraph Platform pour un déploiement managé avec UI.


Anti-patterns et garde-fous

Anti-patternSymptômeCorrection
Sur-ingénierie dès le départLangGraph pour 2 appels LLMCommencer avec LCEL ou Python natif
Nœuds non testables isolémentImpossible à déboguerInterface input/output claire + test unitaire par nœud
Pas de checkpointsCrash à l'étape 9 = tout recommencerSqliteSaver ou PostgresSaver dès la phase dev
État mutable partagé sans verrouRace condition en asyncTypedDict immutable, retourner un dict partiel
Pas de timeout par nœudUn nœud bloqué immobilise tout le workflowasyncio.wait_for(node(), timeout=30)
Loguer uniquement les erreursDebug impossible en prodLogger l'état complet à chaque transition (JSON structuré)
Pas de limite de coûtUn bug boucle indéfinimentCompteur de tokens cumulés + guard dans la condition de sortie
Human-in-the-loop absent sur actions irréversiblesEmail envoyé, paiement débité par erreurinterrupt_before sur tout nœud à effet de bord externe

Bonnes pratiques 2026