💻 Développement

dev-data-pipeline-builder

Conception de pipelines de données robustes et scalables.

⚡ Installation & lancement en 1 commande

Copiez-collez dans votre terminal : le skill s'installe dans ~/.claude/skills et Claude Code se lance directement dessus.

macOS / Linux
curl -fsSL https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.sh | sh -s -- dev-data-pipeline-builder --launch
Windows (PowerShell)
iex "& { $(iwr -useb https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.ps1) } dev-data-pipeline-builder -Launch"

🚀 Déjà installé ?

claude "/dev-data-pipeline-builder"

Ou tapez /dev-data-pipeline-builder dans une session Claude Code, ou décrivez simplement votre besoin — le skill se déclenche automatiquement via le skill-router.

🔑 Déclencheurs automatiques

Le skill s'active automatiquement quand votre demande contient :

data pipelinepipeline de donnéesbatch processingstream processingApache SparkAirflowdbtdata engineering

📦 Installation manuelle

git clone https://github.com/khalilbenaz/claude-skills-collection.git cp -r claude-skills-collection/skills/dev-data-pipeline-builder ~/.claude/skills/

Payload du plugin : skills/dev-data-pipeline-builder · source éditable : dev-skills/data-pipeline-builder

📖 Manuel

Data Pipeline Builder

1. Analyser les sources et les besoins

Avant d'écrire une ligne de code, poser ces questions :

CritèreOptionsImpact
VolumeKo–Mo / Go / To+Pandas vs Spark vs DuckDB
FréquenceBatch horaire/journalier / Near-real-time / StreamingAirflow vs Flink/Kafka
Fraîcheur acceptable> 1h / 15 min / < 1 minBatch / micro-batch / streaming pur
Qualité sourcePropre / semi-structurée / dirtyNiveau de validation nécessaire
FormatCSV, JSON, Parquet, Avro, CDCConnecteur et schéma evolution

Règle : ne pas sur-architecturer. Un fichier CSV quotidien de 10 Mo → DuckDB + script Python, pas Spark.


2. Choisir l'architecture

Données fraîcheur > 1h   → Batch (Airflow + dbt + Warehouse)
Fraîcheur 1–15 min       → Micro-batch (Spark Structured Streaming / Flink)
Fraîcheur < 1 min        → Streaming pur (Kafka + Flink / Kafka Streams)
Transformations lourdes  → ELT (charger raw, transformer dans le warehouse)
Données sensibles/EDW    → ETL (transformer avant chargement)

Architecture lambda = batch + streaming en parallèle → complexité élevée, préférer kappa (streaming seul avec replay) quand le streaming couvre tous les cas.


3. Ingestion

CDC avec Debezium (PostgreSQL → Kafka)

# docker-compose snippet
debezium:
  image: debezium/connect:2.5
  environment:
    BOOTSTRAP_SERVERS: kafka:9092
    GROUP_ID: debezium-group
    CONFIG_STORAGE_TOPIC: debezium.configs
    OFFSET_STORAGE_TOPIC: debezium.offsets
# Enregistrer un connecteur PostgreSQL
curl -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' -d '{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "replicator",
    "database.password": "secret",
    "database.dbname": "mydb",
    "table.include.list": "public.orders",
    "plugin.name": "pgoutput"
  }
}'

API polling avec pagination

def fetch_all_pages(base_url: str, headers: dict) -> list[dict]:
    results, page = [], 1
    while True:
        r = requests.get(base_url, params={"page": page, "per_page": 100}, headers=headers)
        r.raise_for_status()
        data = r.json()
        if not data:
            break
        results.extend(data)
        page += 1
    return results

4. Transformation

dbt — modèle incrémental (pattern recommandé 2026)

-- models/orders_enriched.sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    on_schema_change='append_new_columns'
) }}

SELECT
    o.order_id,
    o.created_at,
    c.country,
    SUM(oi.amount) AS total_amount
FROM {{ ref('stg_orders') }} o
JOIN {{ ref('stg_customers') }} c ON c.customer_id = o.customer_id
JOIN {{ ref('stg_order_items') }} oi ON oi.order_id = o.order_id
{% if is_incremental() %}
WHERE o.created_at > (SELECT MAX(created_at) FROM {{ this }})
{% endif %}
GROUP BY 1, 2, 3
dbt run --select orders_enriched --vars '{"start_date": "2026-01-01"}'
dbt test --select orders_enriched  # great_expectations intégré via dbt-expectations

PySpark — lecture Parquet partitionné

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

spark = SparkSession.builder.appName("orders-etl").getOrCreate()

df = (
    spark.read.parquet("s3a://datalake/raw/orders/")
    .filter(col("_date") >= "2026-01-01")          # partition pruning
    .withColumn("order_date", to_date("created_at"))
    .dropDuplicates(["order_id"])                   # idempotence
)

(
    df.write
    .mode("overwrite")
    .partitionBy("order_date")
    .parquet("s3a://datalake/processed/orders/")
)

5. Orchestration

Airflow DAG — pattern recommandé

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from pendulum import datetime

@dag(
    schedule="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=["orders", "daily"],
)
def orders_pipeline():

    @task(retries=3, retry_delay=timedelta(minutes=5))
    def extract():
        return fetch_all_pages(...)

    @task
    def transform(raw: list) -> None:
        # appel dbt ou PySpark
        pass

    @task
    def validate() -> None:
        # data quality checks
        pass

    extract_result = extract()
    transform_result = transform(extract_result)
    validate()  # déclenché après transform

orders_pipeline()

Critères de choix d'orchestrateur :


6. Data Quality intégrée

Ne pas valider uniquement en fin de pipeline — insérer les checks à chaque étape.

# Avec Great Expectations
import great_expectations as gx

context = gx.get_context()
batch = context.sources.pandas_default.read_dataframe(df)

suite = context.add_expectation_suite("orders_suite")
batch.expect_column_values_to_not_be_null("order_id")
batch.expect_column_values_to_be_between("total_amount", min_value=0, max_value=1_000_000)
batch.expect_column_values_to_be_unique("order_id")

results = context.run_checkpoint("orders_checkpoint")
if not results["success"]:
    raise ValueError(f"Data quality failed: {results}")

7. Storage layer

BesoinSolution
Data lake open formatDelta Lake (Databricks/OSS) ou Apache Iceberg
Analytique cloud scaleBigQuery, Snowflake, Redshift
Analytique locale/mid-scaleDuckDB (fichiers Parquet locaux ou S3)
LakehouseDelta Lake sur S3/ADLS + Spark ou Trino

Partitioning : toujours partitionner par date en premier (colonne la plus filtrée). Éviter plus de 10 000 partitions — risque de small files problem.

Compactage (Delta Lake) :

from delta.tables import DeltaTable
DeltaTable.forPath(spark, "s3a://datalake/processed/orders/").optimize().executeCompaction()

8. Monitoring et alerting

# Mesure de fraîcheur — à exposer dans Prometheus/Datadog
from datetime import datetime, timedelta

def check_freshness(table: str, max_delay: timedelta) -> bool:
    last_updated = get_max_timestamp(table)  # requête warehouse
    return datetime.utcnow() - last_updated < max_delay

Métriques clés à suivre :


9. Idempotence et résilience

# Upsert idempotent avec DuckDB
conn.execute("""
    INSERT OR REPLACE INTO orders SELECT * FROM staging_orders
    WHERE order_id NOT IN (SELECT order_id FROM orders WHERE updated_at >= ?)
""", [run_date])

Garde-fous et anti-patterns

Anti-patternProblèmeAlternative
SELECT * en productionRupture si schéma changeLister explicitement les colonnes
Insert sans déduplicationDoublons silencieuxMERGE / INSERT OR REPLACE / unique_key dbt
Transformation dans l'ingestionPerte du raw, non-rejouableToujours stocker le raw avant de transformer
Timeouts sans retryPerte silencieuse de donnéesRetry + dead-letter queue
Partitions trop finesSmall files → lenteur de lectureCompaction + partitioning par jour/semaine
Secrets dans le codeFuite de credentialsVault, AWS Secrets Manager, Airflow Connections
Pipeline monolithiqueImpossible à déboguerDécouper en tâches atomiques testables
Pas de lineageDebugging aveugledbt lineage, OpenLineage/Marquez, Dagster assets