📊 Data

data-kafka-patterns

Apache Kafka patterns: topics, partitions, consumer groups, exactly-once semantics and Kafka Streams.

⚡ Install and launch in one command

Copy and paste into your terminal: the skill is installed in ~/.claude/skills and Claude Code launches directly with it.

macOS / Linux
curl -fsSL https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.sh | sh -s -- data-kafka-patterns --launch
Windows (PowerShell)
iex "& { $(iwr -useb https://raw.githubusercontent.com/khalilbenaz/claude-skills-collection/main/install.ps1) } data-kafka-patterns -Launch"

🚀 Already installed?

claude "/data-kafka-patterns"

Or type /data-kafka-patterns in a Claude Code session, or simply describe what you need: the skill triggers automatically through the skill-router.

🔑 Automatic triggers

The skill activates automatically when your request contains:

Kafka, Apache Kafka, topic Kafka, consumer group, Kafka Streams, event streaming

📦 Manual installation

git clone https://github.com/khalilbenaz/claude-skills-collection.git
cp -r claude-skills-collection/skills/data-kafka-patterns ~/.claude/skills/

Plugin payload: skills/data-kafka-patterns · editable source: data-skills/kafka-patterns

📖 Manual

Kafka Patterns

Workflow

1. Design the topic topology

# Create a topic with 12 partitions, replication factor 3, 7-day retention (7*24*3600*1000 ms)
kafka-topics.sh --bootstrap-server broker:9092 \
  --create --topic orders.created \
  --partitions 12 --replication-factor 3 \
  --config retention.ms=604800000

# Describe the topic (use --list to list all topics)
kafka-topics.sh --bootstrap-server broker:9092 --describe --topic orders.created
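The partition count caps the parallelism of a consumer group, and the record key decides which partition a record lands in. The sketch below is modeled on the Java client's default handling of keyed records: a murmur2 hash of the serialized key, made non-negative, modulo the partition count. The class name `KeyPartitioning` is hypothetical, and the sketch assumes keys are serialized as UTF-8, as `StringSerializer` does by default. It shows why all events for one key stay ordered on a single partition.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: keyed partitioning in the style of the Java client's
// default partitioner, i.e. murmur2(key bytes) -> non-negative -> mod partition count.
final class KeyPartitioning {

    // 32-bit murmur2 over the serialized key.
    static int murmur2(byte[] data) {
        int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1 to 3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Same key and same partition count always give the same partition.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8); // StringSerializer's default encoding
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }
}
```

For example, `KeyPartitioning.partitionFor("customer-42", 12)` returns the same value in [0, 12) on every call. Changing the partition count generally moves keys to other partitions, which is one reason to size the topic up front.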

2. Define the message schema

// Avro schema — orders.created
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.orders",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount_cents", "type": "long"},
    {"name": "occurred_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
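On the Java side, the `timestamp-millis` logical type means `occurred_at` travels as a `long` count of milliseconds since the Unix epoch, and `amount_cents` keeps money as an integer number of cents rather than a floating-point value. The hypothetical record below is plain Java, not the class Avro code generation would produce. It makes that mapping explicit.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.UUID;

// Hypothetical plain-Java view of the OrderCreated schema (not Avro-generated code).
record OrderCreatedView(String eventId, String orderId, String customerId,
                        long amountCents, long occurredAtMillis) {

    OrderCreatedView {
        Objects.requireNonNull(eventId);
        Objects.requireNonNull(orderId);
        Objects.requireNonNull(customerId);
    }

    // Builds an event with a fresh event_id; timestamp-millis = epoch milliseconds.
    static OrderCreatedView of(String orderId, String customerId, long amountCents, Instant occurredAt) {
        return new OrderCreatedView(UUID.randomUUID().toString(), orderId, customerId,
                amountCents, occurredAt.toEpochMilli());
    }

    // Converts the wire value back to an Instant.
    Instant occurredAt() {
        return Instant.ofEpochMilli(occurredAtMillis);
    }
}
```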

3. Configure the producers

Pick the delivery guarantee that fits the use case:

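| Case | acks | enable.idempotence | transactional.id |
|---|---|---|---|
| Fire-and-forget (logs) | 0 | false | - |
| At-least-once | all | false | - |
| Exactly-once | all | true | txn-{service}-{instance} |

Since Kafka 3.0 the producer defaults are acks=all and enable.idempotence=true, which already prevents duplicates caused by producer retries. Atomic writes across partitions additionally require a transactional.id.
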
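// Java producer: exactly-once
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer; // assumption: Confluent Schema Registry

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081"); // hypothetical URL
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-orders-1"); // stable per instance
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);        // batching
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);   // 64 KB

KafkaProducer<String, OrderCreated> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("orders.created", order.getId(), event)); // order, event: application objects
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    producer.close(); // fatal: e.g. another instance now owns this transactional.id
} catch (KafkaException e) {
    producer.abortTransaction(); // recoverable: abort, then retry the transaction
}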
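Idempotence is what makes producer retries safe. The broker gives the producer an id and every batch carries a per-partition sequence number. A retried batch the broker already wrote is acknowledged without being appended again, while a gap in the sequence is rejected as out of order. The toy model below (hypothetical `PartitionLog` class, one partition, in memory) sketches that rule. It is simplified: a real broker only remembers the last few batches per producer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of one partition deduplicating an idempotent producer's retries.
final class PartitionLog {
    private final List<String> records = new ArrayList<>();
    private final Map<Long, Integer> lastSeqByProducer = new HashMap<>();

    // Returns true if appended, false if recognized as a duplicate retry.
    boolean append(long producerId, int sequence, String value) {
        int last = lastSeqByProducer.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // already written: acknowledge without appending again
        }
        if (sequence != last + 1) {
            throw new IllegalStateException("out-of-order sequence " + sequence + ", expected " + (last + 1));
        }
        records.add(value);
        lastSeqByProducer.put(producerId, sequence);
        return true;
    }

    List<String> records() {
        return List.copyOf(records);
    }
}
```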

4. Design the consumer groups

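// Java consumer: manual commit (at-least-once)
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer; // assumption: Confluent Schema Registry
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-billing-service");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put("schema.registry.url", "http://schema-registry:8081"); // hypothetical URL
props.put("specific.avro.reader", true); // OrderCreated instead of GenericRecord
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, OrderCreated> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders.created"));

while (true) {
    ConsumerRecords<String, OrderCreated> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, OrderCreated> record : records) {
        process(record.value()); // process(): application code
    }
    consumer.commitSync(); // only after the whole batch has been processed
}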
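Within a group, each partition is owned by exactly one consumer. A 12-partition topic therefore keeps at most 12 consumers of the same group busy, and any extra member stays idle. The sketch below is modeled on the range strategy (`RangeAssignor`) for a single topic: consumers are taken in sorted order, and the first `numPartitions % consumers` of them get one extra partition. `RangeAssignmentSketch` is a hypothetical name, not a Kafka class.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of range-style assignment of one topic's partitions within a consumer group.
final class RangeAssignmentSketch {

    // consumers must be in a stable (sorted) order, as the range strategy sorts member ids.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) throw new IllegalArgumentException("no consumers");
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int n = consumers.size();
        int perConsumer = numPartitions / n;
        int extra = numPartitions % n; // the first `extra` consumers get one more partition
        for (int i = 0; i < n; i++) {
            int start = i * perConsumer + Math.min(i, extra);
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + count; p++) partitions.add(p);
            result.put(consumers.get(i), partitions); // empty list = idle consumer
        }
        return result;
    }
}
```

With 12 partitions and 5 consumers this yields 3, 3, 2, 2 and 2 partitions. With 13 consumers, the last one receives nothing.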

5. Distributed processing patterns

Dead Letter Topic (DLT): isolate messages that cannot be processed:

main-topic → processing → error → main-topic.DLT
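A common way to feed the DLT is to retry a bounded number of times, then publish the original record to `<topic>.DLT` and move on. That way a single poison message does not block its partition. The sketch keeps only that routing decision. `DltRouter`, `maxAttempts` and the `published` list standing in for a producer are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical DLT routing: bounded retries, then hand-off to "<topic>.DLT".
final class DltRouter {
    // Stand-in for producer.send(...): entries are "<destination topic>:<value>".
    final List<String> published = new ArrayList<>();
    private final int maxAttempts;

    DltRouter(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Returns true if processed, false if the record was routed to the DLT.
    boolean handle(String topic, String value, Consumer<String> processor) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                processor.accept(value);
                return true;
            } catch (RuntimeException e) {
                // transient or poison message: retry until attempts run out
            }
        }
        // A real implementation would also record the error, e.g. in record headers.
        published.add(topic + ".DLT:" + value);
        return false;
    }
}
```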

Saga choreography: each service publishes events and the other services react to them:

orders.created → payment-service → payments.processed → inventory-service → inventory.reserved
                                 ↘ payments.failed   → orders.cancelled
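The choreography above has no central coordinator. Each service only knows the topic it consumes and the topic it may publish to. The in-memory sketch below (hypothetical `SagaSketch`, with synchronous calls in place of real topics) replays both branches of the diagram. The diagram does not state which service emits `orders.cancelled`, so attributing it to the order side is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// In-memory stand-in for topics: each handler reacts to one topic and may emit one event.
final class SagaSketch {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();
    final List<String> trace = new ArrayList<>(); // topics in publication order

    // The handler returns the next topic to publish to, or null to stop.
    void on(String topic, Function<String, String> handler) {
        handlers.put(topic, handler);
    }

    void publish(String topic, String orderId) {
        trace.add(topic);
        Function<String, String> handler = handlers.get(topic);
        if (handler == null) return; // nobody listens: the chain ends here
        String next = handler.apply(orderId);
        if (next != null) publish(next, orderId);
    }

    // Wiring of the diagram; paymentOk simulates the payment outcome.
    static SagaSketch wire(boolean paymentOk) {
        SagaSketch bus = new SagaSketch();
        bus.on("orders.created", id -> paymentOk ? "payments.processed" : "payments.failed"); // payment-service
        bus.on("payments.processed", id -> "inventory.reserved");                              // inventory-service
        bus.on("payments.failed", id -> "orders.cancelled");                                   // order side (assumed)
        return bus;
    }
}
```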

CDC with Debezium: capture database changes:

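// Connector config (Postgres); user and password are placeholders
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "postgres",
  "database.port": "5432",
  "database.user": "<user>",
  "database.password": "<password>",
  "database.dbname": "mydb",
  "plugin.name": "pgoutput",
  "table.include.list": "public.orders",
  "topic.prefix": "cdc"
}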
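With `topic.prefix` set to `cdc`, Debezium writes changes to `public.orders` to the topic `cdc.public.orders`. Each change event carries an envelope with `before`, `after` and an `op` code (`c` create, `u` update, `d` delete, `r` snapshot read). The sketch below applies those codes to an in-memory mirror of the table. `OrdersMirror` and `applyChange` are hypothetical, and plain maps stand in for deserialized events.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of public.orders fed by Debezium change events (key = row primary key).
final class OrdersMirror {
    final Map<String, Map<String, Object>> rows = new HashMap<>();

    // op codes from the Debezium envelope: "c" create, "r" snapshot read, "u" update, "d" delete.
    void applyChange(String key, String op, Map<String, Object> after) {
        switch (op) {
            case "c", "r", "u" -> rows.put(key, after); // new row state is in "after"
            case "d" -> rows.remove(key);               // "after" is null for deletes
            default -> { /* other ops (e.g. truncate) are not handled in this sketch */ }
        }
    }
}
```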

Claim Check: when a payload exceeds 1 MB, store it in S3/Blob Storage and publish only the reference:

{"event_id": "...", "payload_ref": "s3://bucket/events/abc123.json"}
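The claim check keeps large payloads out of Kafka. The producer stores the payload first, then publishes a small event that points to it, and consumers fetch the payload through the reference. In the sketch below, an in-memory map stands in for S3/Blob Storage. `ClaimCheck`, its 1 MB threshold constant and the reference format copied from the example above are illustrative assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical claim-check helper: large payloads go to a blob store, the event carries a reference.
final class ClaimCheck {
    static final int THRESHOLD_BYTES = 1024 * 1024; // 1 MB

    // In-memory stand-in for S3/Blob Storage: reference -> payload bytes.
    final Map<String, byte[]> blobStore = new HashMap<>();

    // Returns the message value to publish (no JSON escaping: sketch only).
    String toMessage(String eventId, String payloadJson) {
        byte[] bytes = payloadJson.getBytes(StandardCharsets.UTF_8);
        if (bytes.length <= THRESHOLD_BYTES) {
            return payloadJson; // small enough: publish inline
        }
        String ref = "s3://bucket/events/" + UUID.randomUUID() + ".json";
        blobStore.put(ref, bytes); // 1) store the payload first
        return "{\"event_id\": \"" + eventId + "\", \"payload_ref\": \"" + ref + "\"}"; // 2) publish the reference
    }

    // Consumer side: resolve the reference back to the payload.
    String resolve(String ref) {
        return new String(blobStore.get(ref), StandardCharsets.UTF_8);
    }
}
```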

6. Kafka Streams: stateful processing

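import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
// orderSerde, profileSerde, CustomerProfile and enrich(...) come from the application (Avro serdes)

StreamsBuilder builder = new StreamsBuilder();
KStream<String, OrderCreated> orders =
    builder.stream("orders.created", Consumed.with(Serdes.String(), orderSerde));
// orders.created is keyed by order id: re-key by customer_id before counting or joining
KStream<String, OrderCreated> byCustomer =
    orders.selectKey((orderId, order) -> order.getCustomerId().toString());

// Count per customer over 1-hour tumbling windows
byCustomer
  .groupByKey(Grouped.with(Serdes.String(), orderSerde))
  .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
  .count(Materialized.as("orders-per-customer"))
  .toStream()
  .map((windowedKey, count) -> KeyValue.pair(windowedKey.key() + "@" + windowedKey.window().start(), count))
  .to("orders.count-per-customer-1h", Produced.with(Serdes.String(), Serdes.Long()));

// Stream-Table join: both sides must be keyed by customer_id
KTable<String, CustomerProfile> profiles =
    builder.table("customers.profiles", Consumed.with(Serdes.String(), profileSerde));
byCustomer.join(profiles,
    (order, profile) -> enrich(order, profile),
    Joined.with(Serdes.String(), orderSerde, profileSerde))
  .to("orders.enriched");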
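Kafka Streams aligns time windows to the epoch. A window of size S advancing by A starts at multiples of A, and a record belongs to every window [start, start + S) that contains its timestamp. `TimeWindows.ofSizeWithNoGrace(size)` without `advanceBy` gives A = S (tumbling, exactly one window per record). A smaller A (hopping) gives up to S / A windows per record. The hypothetical helper `WindowAssignment` below computes those window starts.

```java
import java.util.ArrayList;
import java.util.List;

// Start timestamps (ms) of every epoch-aligned window [start, start + size) containing a timestamp.
// Tumbling: advanceMs == sizeMs. Hopping: advanceMs < sizeMs.
final class WindowAssignment {
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp (never below 0).
        long first = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

For example, a record at 90 ms with S = 60 ms and A = 15 ms falls into the windows starting at 45, 60, 75 and 90 ms. With A = S = 60 ms, it only falls into the window starting at 60 ms.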

7. Exactly-once semantics (EOS)

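Configuration for EOS in Kafka Streams (exactly_once_v2 requires brokers 2.5 or later). Only the processing guarantee is strictly required. The other two settings harden the internal topics and speed up state-store failover:

props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); // enables EOS
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);    // internal changelog/repartition topics
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);  // warm standby state stores for faster failover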

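For plain consumers in a read-process-write loop, use a transactional producer. Inside each transaction, send the output records, commit the consumed offsets with producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata()) instead of commitSync(), and then commit the transaction. Downstream consumers set isolation.level=read_committed so they never see records from aborted transactions.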
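In a transactional read-process-write loop, the output records and the consumed offsets are committed or aborted together. The toy model below (hypothetical `TxnLoop`, no broker, one record per transaction) shows the consequence. When processing fails, neither the output nor the offset moves, so the retry reprocesses the same input and `read_committed` readers never see a duplicate. Comments map each step to the corresponding `KafkaProducer` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Toy model: output topic and committed input offset change atomically per transaction.
final class TxnLoop {
    final List<String> committedOutput = new ArrayList<>(); // what read_committed consumers see
    long committedOffset = 0;                                // next input offset to consume

    // Processes input[committedOffset] in one transaction; returns false if aborted.
    boolean step(List<String> input, UnaryOperator<String> process) {
        List<String> pendingOutput = new ArrayList<>();      // beginTransaction()
        try {
            String in = input.get((int) committedOffset);
            pendingOutput.add(process.apply(in));            // send() inside the transaction
            long pendingOffset = committedOffset + 1;        // sendOffsetsToTransaction()
            committedOutput.addAll(pendingOutput);           // commitTransaction(): output and
            committedOffset = pendingOffset;                 // offset become visible together
            return true;
        } catch (RuntimeException e) {
            return false;                                    // abortTransaction(): neither is visible
        }
    }
}
```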

8. Monitoring and alerting

Critical metrics to watch:

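| Metric | Tool | Alert threshold |
|---|---|---|
| Consumer lag (e.g. client metric records-lag-max) | JMX / Prometheus | > 10,000 messages |
| Under-replicated partitions (UnderReplicatedPartitions) | Broker MBean | > 0 |
| Producer record-error-rate | Producer MBean | > 0 |
| End-to-end latency | OpenTelemetry | > agreed SLA |
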
# Consumer lag via CLI
kafka-consumer-groups.sh --bootstrap-server broker:9092 \
  --describe --group orders-billing-service

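# Reset offsets after an incident (--dry-run only prints the new offsets; use --execute to apply,
# which requires the group to have no active members)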
kafka-consumer-groups.sh --bootstrap-server broker:9092 \
  --group orders-billing-service --topic orders.created \
  --reset-offsets --to-earliest --dry-run
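In the --describe output, LAG is LOG-END-OFFSET minus the group's CURRENT-OFFSET, computed per partition. Summing it over partitions gives a figure to compare with the alert threshold. A minimal sketch follows. `LagCheck` is hypothetical, and in a real exporter the two maps would come from the Admin client (for example `listConsumerGroupOffsets` and `listOffsets`), which this code does not call.

```java
import java.util.Map;

// Consumer lag per partition = log-end offset - committed offset of the group.
final class LagCheck {
    static final long ALERT_THRESHOLD = 10_000; // threshold from the metrics table above

    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            // Assumption: a partition with no committed offset yet is counted from offset 0.
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            total += Math.max(0, e.getValue() - committed);
        }
        return total;
    }

    static boolean shouldAlert(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        return totalLag(endOffsets, committedOffsets) > ALERT_THRESHOLD;
    }
}
```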

Decision criteria

| Need | Choice |
|---|---|
| Simple stateless processing | Plain consumer (KafkaConsumer) |
| Aggregation / join / windowing | Kafka Streams or ksqlDB |
| Declarative SQL on streams | ksqlDB |
| CDC from a relational database | Debezium + Kafka Connect |
| Exactly-once guarantee | EOS (EXACTLY_ONCE_V2) |
| Large payload (> 1 MB) | Claim Check pattern |

Guardrails / Anti-patterns / Pitfalls