💻 Développement

ai-workflow-orchestrator

Orchestration de workflows IA complexes avec chaînes et pipelines.

⚡ Installation & lancement en 1 commande

Copiez-collez dans votre terminal : le skill s'installe dans ~/.claude/skills et Claude Code se lance directement dessus.

macOS / Linux
curl -fsSL https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.sh | sh -s -- ai-workflow-orchestrator --launch
Windows (PowerShell)
iex "& { $(iwr -useb https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.ps1) } ai-workflow-orchestrator -Launch"

🚀 Déjà installé ?

claude "/ai-workflow-orchestrator"

Ou tapez /ai-workflow-orchestrator dans une session Claude Code, ou décrivez simplement votre besoin — le skill se déclenche automatiquement via le skill-router.

🔑 Déclencheurs automatiques

Le skill s'active automatiquement quand votre demande contient :

workflow IALangChainLangGraphpipeline IAchaîne de promptsorchestration LLMAI pipelinemulti-step AI

📦 Installation manuelle

git clone https://github.com/khalilbenaz/claude-skills-collection.git cp -r claude-skills-collection/dev-skills/ai-workflow-orchestrator ~/.claude/skills/

Source : dev-skills/ai-workflow-orchestrator

📖 Manuel

AI Workflow Orchestrator

Workflow

  1. Décomposition du workflow — Analyser la tâche complexe et la décomposer en étapes atomiques avec des inputs/outputs clairement définis. Identifier les dépendances (quelles étapes dépendent d'autres ?), les opportunités de parallélisme (étapes indépendantes pouvant s'exécuter simultanément), et les points de décision (conditions qui déterminent le chemin d'exécution). Créer un DAG (Directed Acyclic Graph) visuel avant de coder — économise des heures de débogage.

``` Exemple : pipeline d'analyse de document [Chargement] → [Extraction info] ─┬─ [Résumé] ─┐ ├─ [Entités NER] ├─ [Rapport final] └─ [Sentiment] ─┘ ```

  1. Choix du framework — Évaluer selon la complexité : LangChain LCEL (chaînes simples à modérées, bonne DX, vaste écosystème), LangGraph (workflows avec état, cycles, conditions, agents — idéal pour les cas complexes), Haystack (orienté search/RAG, pipelines déclaratifs), Semantic Kernel (Microsoft, bien intégré avec Azure, .NET/Python), custom (contrôle total, zéro dépendance, recommandé pour les équipes expérimentées). Éviter la sur-ingénierie : un script Python avec des appels LLM séquentiels suffit souvent.
  1. Design des chaînes — Implémenter les patterns selon le besoin : Sequential (A → B → C, pipeline linéaire), Parallel (A → [B || C || D] → E, fan-out/fan-in), Conditional (router selon le contenu de l'output), Map-Reduce (traiter N documents indépendamment, puis aggréger). Utiliser LCEL (| operator) pour la composition déclarative — chaque étape est une Runnable testable individuellement.

```python from langchain_core.runnables import RunnableParallel, RunnableLambda from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

# Parallel : résumé + extraction en simultané parallel_chain = RunnableParallel({ "summary": summarize_prompt | llm, "entities": extract_prompt | llm, "sentiment": sentiment_prompt | llm })

# Sequential : analyse puis rapport full_pipeline = parallel_chain | merge_results | report_prompt | llm result = full_pipeline.invoke({"document": text}) ```

  1. State management — Définir un objet d'état global typé (TypedDict ou Pydantic) qui circule à travers le workflow. Chaque nœud lit l'état, effectue son traitement, et met à jour les champs pertinents. Avec LangGraph, l'état est automatiquement persisté à chaque nœud — permet le replay et la reprise après erreur.

```python from langgraph.graph import StateGraph, END from typing import TypedDict, List

class WorkflowState(TypedDict): document: str summary: str entities: List[str] final_report: str error: str | None step: str

def summarize_node(state: WorkflowState) -> WorkflowState: summary = summarize_chain.invoke({"text": state["document"]}) return {**state, "summary": summary, "step": "summarized"}

graph = StateGraph(WorkflowState) graph.add_node("summarize", summarize_node) graph.add_node("extract_entities", entities_node) graph.add_edge("summarize", "extract_entities") ```

  1. Error handling et recovery — Implémenter des retry par nœud (erreurs transitoires), des fallback chains (modèle de secours si le premier échoue), et des checkpoints pour reprendre depuis le dernier état valide. Les nœuds critiques (appels API externes, actions irréversibles) doivent avoir un human-in-the-loop — LangGraph supporte nativement les interruptions et reprises (interrupt_before, interrupt_after).

```python from langgraph.checkpoint.sqlite import SqliteSaver

# Persistance de l'état pour recovery checkpointer = SqliteSaver.from_conn_string(":memory:") app = graph.compile(checkpointer=checkpointer, interrupt_before=["send_email"]) # Pause avant action critique

# Reprendre après approbation humaine config = {"configurable": {"thread_id": "run-001"}} app.invoke(initial_state, config) # ... attendre approbation ... app.invoke(None, config) # Reprendre ```

  1. Observabilité — Tracer chaque nœud du workflow avec : durée d'exécution, tokens consommés, coût, input/output. Utiliser LangSmith (natif LangChain/LangGraph), Phoenix (Arize) (open-source, excellente UI), ou OpenTelemetry pour une approche agnostique. Configurer des alertes sur les métriques clés (latence > 30s, coût > $0.10/run, taux d'erreur > 5%).

```python import os os.environ["LANGCHAIN_TRACING_V2"] = "true" os.environ["LANGCHAIN_API_KEY"] = "ls_..." os.environ["LANGCHAIN_PROJECT"] = "my-workflow" # Toutes les exécutions LangChain/LangGraph sont automatiquement tracées

# Phoenix (alternative open-source) import phoenix as px from phoenix.otel import register tracer_provider = register(project_name="my-workflow") ```

  1. OptimisationPrompt caching : activer le caching Anthropic/OpenAI sur les préfixes longs (system prompts, documents). Batch processing : regrouper les requêtes indépendantes (map-reduce) en batch OpenAI API (-50% coût). Async execution : utiliser ainvoke() et asyncio.gather() pour les étapes parallèles — divise la latence par le nombre de branches. Streaming : propager le streaming jusqu'au client pour l'UX.

```python import asyncio

async def run_parallel_nodes(state: WorkflowState): # Exécuter en parallèle au lieu de séquentiel summary_task = summarize_chain.ainvoke({"text": state["document"]}) entities_task = extract_chain.ainvoke({"text": state["document"]}) summary, entities = await asyncio.gather(summary_task, entities_task) return {**state, "summary": summary, "entities": entities} ```

  1. Déploiement — Exposer le workflow comme API REST (FastAPI + endpoint async), webhook (déclenché par des événements externes), ou tâche planifiée (Celery, APScheduler, Cloud Scheduler). Pour la scalabilité : containeriser avec Docker, orchestrer avec Kubernetes, utiliser des queues de messages (Redis, RabbitMQ) pour les workloads batch. LangServe (déprécié) ou LangGraph Cloud pour un déploiement managé.

```python from fastapi import FastAPI, BackgroundTasks from pydantic import BaseModel

app = FastAPI()

class WorkflowRequest(BaseModel): document: str webhook_url: str | None = None

@app.post("/workflows/analyze") async def trigger_workflow(req: WorkflowRequest, bg: BackgroundTasks): run_id = str(uuid.uuid4()) bg.add_task(run_workflow_async, run_id, req.document, req.webhook_url) return {"run_id": run_id, "status": "started"}

@app.get("/workflows/{run_id}") async def get_status(run_id: str): return await get_run_status(run_id) ```

Règles