🤖 Agents IA

agent-workflow-automation-agent

Construction d'agents d'automatisation de workflows métier incluant RPA intelligent, déclencheurs et actions chaînées, orchestration d'étapes avec gestion d'erreurs et monitoring.

⚡ Installation & lancement en 1 commande

Copiez-collez dans votre terminal : le skill s'installe dans ~/.claude/skills et Claude Code se lance directement dessus.

macOS / Linux
curl -fsSL https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.sh | sh -s -- agent-workflow-automation-agent --launch
Windows (PowerShell)
iex "& { $(iwr -useb https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.ps1) } agent-workflow-automation-agent -Launch"

🚀 Déjà installé ?

claude "/agent-workflow-automation-agent"

Ou tapez /agent-workflow-automation-agent dans une session Claude Code, ou décrivez simplement votre besoin — le skill se déclenche automatiquement via le skill-router.

🔑 Déclencheurs automatiques

Le skill s'active automatiquement quand votre demande contient :

workflow agentRPAautomatisation métieragent d'automatisation

📦 Installation manuelle

git clone https://github.com/khalilbenaz/claude-skills-collection.git cp -r claude-skills-collection/skills/agent-workflow-automation-agent ~/.claude/skills/

Payload du plugin : skills/agent-workflow-automation-agent · source éditable : agent-skills/workflow-automation-agent

📖 Manuel

Workflow Automation Agent

Workflow

1. Cartographier les processus métier

2. Choisir l'architecture de l'agent

BesoinPattern recommandé
Étapes séquentielles simplesPipeline linéaire (fonctions chaînées)
Branchements complexes + état persistantState machine (XState, Temporal)
Processus long-running (jours/semaines)Temporal Workflow ou Azure Durable Functions
Événements multi-sourcesChorégraphie via message broker (RabbitMQ, Kafka)
Pas d'API disponibleRPA UI avec Playwright ou UiPath

Exemple minimal Temporal (Python) :

@workflow.defn
class OnboardingWorkflow:
    @workflow.run
    async def run(self, user_id: str) -> str:
        await workflow.execute_activity(create_account, user_id, start_to_close_timeout=timedelta(minutes=5))
        await workflow.execute_activity(send_welcome_email, user_id, start_to_close_timeout=timedelta(minutes=2))
        return "done"

3. Implémenter les connecteurs

from tenacity import retry, stop_after_attempt, wait_exponential
from circuitbreaker import circuit

@circuit(failure_threshold=5, recovery_timeout=30)
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def call_external_api(payload: dict) -> dict:
    response = requests.post(API_URL, json=payload, timeout=10)
    response.raise_for_status()
    return response.json()
# Playwright — sélecteur robuste
await page.get_by_role("button", name="Valider").click()
# ❌ fragile
await page.click(".btn-primary:nth-child(2)")

4. Configurer les déclencheurs

TypeOutil / Exemple
Timer/Cron0 8 * * 1-5 — chaque matin ouvré à 8h
Webhook entrantFastAPI POST /webhook/{event_type}
Email reçuIMAP IDLE ou SendGrid Inbound Parse
Changement DBPostgreSQL LISTEN/NOTIFY ou CDC Debezium
Upload fichierS3 Event Notification → Lambda → queue

Implémenter le debouncing pour éviter les doubles déclenchements :

LAST_TRIGGER: dict[str, float] = {}
MIN_INTERVAL = 5.0  # secondes

def debounce(key: str) -> bool:
    now = time.time()
    if now - LAST_TRIGGER.get(key, 0) < MIN_INTERVAL:
        return False  # ignorer
    LAST_TRIGGER[key] = now
    return True

5. Orchestrer les actions chaînées

@dataclass
class WorkflowState:
    workflow_id: str
    step: str          # "create_account" | "send_email" | "done"
    payload: dict
    created_at: datetime
    retries: int = 0
executed_steps = []
try:
    create_order(order); executed_steps.append("create_order")
    charge_payment(order); executed_steps.append("charge_payment")
    send_confirmation(order)
except Exception:
    for step in reversed(executed_steps):
        compensators[step](order)  # rollback dans l'ordre inverse
    raise

6. Intégrer la décision intelligente (LLM)

result = llm_classify(document_text)
if result.confidence < 0.85:
    escalate_to_human(document_id, reason="confiance insuffisante")
else:
    route_automatically(result.category)

7. Gérer les erreurs et la résilience

def process_payment(order_id: str, idempotency_key: str):
    if db.exists(f"payment:{idempotency_key}"):
        return db.get(f"payment:{idempotency_key}")  # résultat déjà calculé
    result = payment_gateway.charge(order_id)
    db.set(f"payment:{idempotency_key}", result, ex=86400)
    return result

8. Monitorer et améliorer

KPIs à collecter :

Stack recommandée 2026 : OpenTelemetry (traces) + Prometheus (métriques) + Grafana (dashboards).

from opentelemetry import trace
tracer = trace.get_tracer("workflow-agent")

with tracer.start_as_current_span("step.create_order") as span:
    span.set_attribute("order.id", order_id)
    create_order(order_id)

Garde-fous et anti-patterns

Anti-patternConséquenceCorrection
Pas de compensationDonnées incohérentes après crash partielImplémenter compensate() pour chaque étape modifiante
État en mémoire uniquementPerte d'état au redémarragePersister dans Redis/DB après chaque transition
Sélecteurs CSS fragiles en RPABreakage à chaque déploiement UIUtiliser data-testid ou aria-*
LLM sans validation de sortieCrash imprévisible en productionValider avec Pydantic avant injection
Automatiser sans fallback manuelBlocage total si l'agent échoueToujours exposer une interface de reprise manuelle
Déclencheurs sans debounceExécutions dupliquéesIdempotency key + debounce
Timeout absentWorkflow bloqué indéfinimentTimeout par étape ET timeout global

Bonnes pratiques 2026