Consumir missatges de Kafka amb Go: exemple pràctic per a backend
Com crear un consumidor de Kafka en Go amb segmentio/kafka-go: missatges, commits, errors, context i shutdown ordenat.

Els consumidors de Kafka són un dels terrenys on Go brilla amb força: processos de llarga durada, processament concurrent de missatges i desplegament com un binari estàtic sense dependències. Si véns de la JVM, on un consumidor Kafka necessita mig centenar de dependències transitives i 512 MB de heap per arrencar, la diferència amb Go és brutal.
He muntat consumidors Kafka en Go per al processament d’esdeveniments, sincronització de dades entre serveis i pipelines d’ingesta. En tots els casos, el patró és el mateix: llegir missatges, processar-los, gestionar errors, fer commit d’offsets i tancar neta quan arriba un senyal. El que canvia és la lògica de negoci del mig.
D’això va aquest article: construir un consumidor Kafka en Go pas a pas, amb codi que pots fer servir com a base real. Sense abstraccions innecessàries, sense frameworks màgics.
Per què Go per a consumidors Kafka
Go no és l’única opció, però encaixa especialment bé en aquest tipus de workloads per raons concretes:
- Binaris estàtics: Un consumidor Kafka en Go compila a un binari de ~15 MB. Sense runtime, sense JVM, sense intèrpret. La imatge Docker resultant pot basar-se en
scratchodistrolessi pesar menys de 20 MB. - Goroutines i concurrència nativa: Processar missatges en paral·lel és natural amb goroutines i channels. No necessites thread pools explícits ni frameworks de concurrència.
- Arrencada immediata: Un procés Go arrenca en mil·lisegons. A Kubernetes, això significa que un HPA pot escalar consumidors ràpidament quan el lag creix.
- Baix consum de memòria: Un consumidor Kafka típic en Go consumeix entre 20-50 MB de RAM. L’equivalent en Java/Kotlin amb Spring Kafka part de 200-300 MB.
- Context natiu: El patró de context en Go encaixa perfectament amb el cicle de vida d’un consumidor: propagació de cancel·lació, timeouts per missatge i shutdown ordenat.
No tot és perfecte. L’ecosistema de llibreries Kafka en Go és més reduït que en JVM. No hi ha equivalent directe a Kafka Streams o Spring Kafka. Però per a consumidors purs (que és el 80% dels casos), Go és més que suficient.
Kafka en 2 minuts: el just per entendre el codi
Si ja treballes amb Kafka cada dia, salta aquesta secció. Si no, aquests són els conceptes que necessites per seguir l’article:
- Topic: Un canal de missatges amb nom. Els productors escriuen missatges a un topic, els consumidors els llegeixen.
- Partició: Cada topic es divideix en particions. Les particions permeten paral·lelisme: pots tenir N consumidors llegint N particions en paral·lel.
- Offset: Cada missatge dins d’una partició té un número seqüencial (offset). El consumidor porta el compte de fins on ha llegit.
- Consumer Group: Un grup de consumidors que es reparteixen les particions d’un topic. Kafka garanteix que cada partició la llegeix exactament un consumidor del grup.
- Commit: L’acte d’informar Kafka que has processat un missatge fins a cert offset. Si el consumidor cau, en reiniciar comença des de l’últim offset committed.
La relació clau: si un topic té 6 particions i el teu consumer group té 3 consumidors, cadascun llegeix 2 particions. Si puges a 6 consumidors, cadascun llegeix 1. Si puges a 7, un es queda sense particions (idle). El nombre de particions és el sostre de paral·lelisme per consumer group.
Triar llibreria: segmentio/kafka-go vs confluent-kafka-go
En Go hi ha dues opcions principals:
confluent-kafka-go
- Wrapper CGo sobre
librdkafka(C). - Rendiment màxim i compatibilitat total amb les features de Kafka.
- Requereix
librdkafkainstal·lada o linkada estàticament (compilació més lenta, cross-compilation més complicada). - API més propera al client C, menys idiomàtica en Go.
segmentio/kafka-go
- Implementació pura en Go, sense dependències CGo.
- API idiomàtica:
Reader,Writer,Conn. - Cross-compilation trivial. Compilar per a Linux des de macOS funciona sense canvis.
- Rendiment excel·lent per a la majoria de casos (no hi ha diferència pràctica per sota de 100K msg/s).
La meva elecció: segmentio/kafka-go. Per al 90% de consumidors backend, la simplicitat de compilació, l’API neta i l’absència de CGo compensen amb escreix. Només triaria confluent-kafka-go si necessités features molt específiques com transaccions de Kafka o Schema Registry integrat a nivell de llibreria.
Instal·lem la dependència:
go get github.com/segmentio/kafka-goConfigurar el consumidor amb kafka-go
La llibreria ofereix dues APIs: Reader (alt nivell, gestiona el consumer group) i Conn (baix nivell, connexió directa). Per a consumidors amb consumer group, Reader és el que vols.
// internal/kafka/consumer.go
package kafka
import (
\"context\"
\"log/slog\"
\"time\"
\"github.com/segmentio/kafka-go\"
)
type ConsumerConfig struct {
Brokers []string
Topic string
GroupID string
MinBytes int
MaxBytes int
CommitInterval time.Duration
}
func DefaultConsumerConfig() ConsumerConfig {
return ConsumerConfig{
Brokers: []string{\"localhost:9092\"},
Topic: \"events\",
GroupID: \"my-service\",
MinBytes: 1, // 1 byte mínim per fetch
MaxBytes: 10e6, // 10 MB màxim per fetch
CommitInterval: 0, // 0 = commit manual
}
}
func NewReader(cfg ConsumerConfig) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topic,
GroupID: cfg.GroupID,
MinBytes: cfg.MinBytes,
MaxBytes: cfg.MaxBytes,
CommitInterval: cfg.CommitInterval,
StartOffset: kafka.LastOffset,
Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
slog.Debug(msg, \"args\", args)
}),
ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
slog.Error(msg, \"args\", args)
}),
})
}Punts importants:
CommitInterval: 0: Desactiva l’auto-commit. Volem control total sobre quan es confirma un offset. Més sobre això a la secció de commits.StartOffset: kafka.LastOffset: Si el consumer group no té offset desat (primera vegada), comença des de l’últim missatge. L’alternativa éskafka.FirstOffsetper consumir tot l’històric.GroupID: Identifica el consumer group. Tots els processos amb el mateixGroupIDes reparteixen les particions.
Llegir missatges en bucle
El patró bàsic és un bucle infinit amb reader.ReadMessage() o reader.FetchMessage(). La diferència és crucial:
ReadMessage(): Llegeix el missatge i fa commit automàticament.FetchMessage(): Llegeix el missatge sense fer commit. Tu decideixes quan cridarCommitMessages().
Per a un consumidor amb commit manual (que és el que vols en producció), usa FetchMessage():
// internal/kafka/consumer.go
type MessageHandler func(ctx context.Context, msg kafka.Message) error
func Consume(ctx context.Context, reader *kafka.Reader, handler MessageHandler) error {
slog.Info(\"starting kafka consumer\",
\"topic\", reader.Config().Topic,
\"group\", reader.Config().GroupID,
)
for {
msg, err := reader.FetchMessage(ctx)
if err != nil {
if ctx.Err() != nil {
slog.Info(\"consumer context cancelled, stopping\")
return nil
}
slog.Error(\"error fetching message\", \"error\", err)
continue
}
slog.Debug(\"message received\",
\"topic\", msg.Topic,
\"partition\", msg.Partition,
\"offset\", msg.Offset,
\"key\", string(msg.Key),
)
if err := handler(ctx, msg); err != nil {
slog.Error(\"error processing message\",
\"error\", err,
\"topic\", msg.Topic,
\"partition\", msg.Partition,
\"offset\", msg.Offset,
)
// No fem commit: el missatge es reprocessarà
continue
}
if err := reader.CommitMessages(ctx, msg); err != nil {
slog.Error(\"error committing message\",
\"error\", err,
\"offset\", msg.Offset,
)
}
}
}Aquest patró té una propietat fonamental: at-least-once delivery. Si el procés cau després de processar un missatge però abans de fer commit, en reiniciar tornarà a llegir aquell missatge. El teu handler ha de ser idempotent.
El check de ctx.Err() després de l’error de FetchMessage és clau. Quan cancel·les el context (per exemple, en rebre SIGTERM), FetchMessage retorna un error. Sense aquell check, el consumidor entraria en un bucle infinit d’errors.
Processar missatges: deserialització i lògica de negoci
El MessageHandler rep un kafka.Message amb bytes crus. Necessites deserialitzar i executar la teva lògica de negoci. Un enfocament net és separar la deserialització del processament:
// internal/event/order.go
package event
import (
\"context\"
\"encoding/json\"
\"fmt\"
\"log/slog\"
\"time\"
\"github.com/segmentio/kafka-go\"
)
type OrderCreatedEvent struct {
OrderID string `json:\"order_id\"`
UserID string `json:\"user_id\"`
Amount float64 `json:\"amount\"`
Currency string `json:\"currency\"`
CreatedAt time.Time `json:\"created_at\"`
}
type OrderProcessor struct {
// Aquí anirien les teves dependències: repositori, servei de notificacions, etc.
}
func NewOrderProcessor() *OrderProcessor {
return &OrderProcessor{}
}
func (p *OrderProcessor) Handle(ctx context.Context, msg kafka.Message) error {
var event OrderCreatedEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
return fmt.Errorf(\"deserializing order event: %w\", err)
}
slog.Info(\"processing order\",
\"order_id\", event.OrderID,
\"user_id\", event.UserID,
\"amount\", event.Amount,
)
// La teva lògica de negoci aquí.
// Exemple: desar a base de dades, enviar notificació, etc.
if err := p.processOrder(ctx, event); err != nil {
return fmt.Errorf(\"processing order %s: %w\", event.OrderID, err)
}
return nil
}
func (p *OrderProcessor) processOrder(ctx context.Context, event OrderCreatedEvent) error {
// Implementació real de la lògica de negoci
slog.Info(\"order processed successfully\", \"order_id\", event.OrderID)
return nil
}Algunes decisions deliberades:
json.Unmarshaldirecte: Per a JSON funciona bé. Si uses Avro o Protobuf, substitueix pel deserialitzador corresponent.- Wrapping d’errors amb
%w: Permet a la capa superior inspeccionar el tipus d’error i decidir si reintenta o descarta. Més sobre això a la secció d’errors. - Struct
OrderProcessoramb dependències: No funcions soltes. Això facilita la injecció de dependències i el testing.
Estratègies de commit: auto-commit vs commit manual
Com i quan fas commit d’offsets defineix les garanties de lliurament del teu consumidor. Hi ha tres estratègies principals:
Auto-commit periòdic
reader := kafka.NewReader(kafka.ReaderConfig{
// ...
CommitInterval: 5 * time.Second, // Commit cada 5 segons
})Kafka-go fa commit automàticament dels offsets dels missatges llegits cada N segons. Avantatge: simplicitat. Problema: si el procés cau entre un commit automàtic i el següent, perds la referència dels missatges processats i es reproces en reiniciar.
Commit manual per missatge
És el que hem vist a dalt: FetchMessage + CommitMessages després de processar cada missatge.
msg, _ := reader.FetchMessage(ctx)
// processar...
reader.CommitMessages(ctx, msg)Avantatge: control total. L’offset es fa commit només quan el missatge està processat. Desavantatge: un commit per cada missatge implica més crides a Kafka (més latència si processen milers de missatges per segon).
Commit manual per lot
Acumules N missatges o esperes T temps, processes el lot, i fas commit de l’últim offset del lot:
func ConsumeBatch(ctx context.Context, reader *kafka.Reader, batchSize int, handler func(ctx context.Context, msgs []kafka.Message) error) error {
batch := make([]kafka.Message, 0, batchSize)
for {
msg, err := reader.FetchMessage(ctx)
if err != nil {
if ctx.Err() != nil {
return nil
}
slog.Error(\"error fetching message\", \"error\", err)
continue
}
batch = append(batch, msg)
if len(batch) >= batchSize {
if err := handler(ctx, batch); err != nil {
slog.Error(\"error processing batch\", \"error\", err)
batch = batch[:0]
continue
}
// Commit de l'últim missatge del lot
last := batch[len(batch)-1]
if err := reader.CommitMessages(ctx, last); err != nil {
slog.Error(\"error committing batch\", \"error\", err)
}
batch = batch[:0]
}
}
}Aquesta estratègia és la més eficient per a alt throughput, però té un risc: si el procés cau a meitat d’un lot, es reproces tots els missatges del lot. El teu handler ha de ser idempotent.
La meva recomanació: comença amb commit manual per missatge. És el més segur i fàcil de raonar. Passa a batch només si el throughput ho exigeix i la teva lògica és idempotent (que hauria de ser-ho de totes formes).
Gestió d’errors: transitoris vs permanents
No tots els errors es tracten igual. Un timeout de xarxa i un JSON mal format requereixen estratègies diferents:
// internal/kafka/errors.go
package kafka
import \"errors\"
// PermanentError indica un error que no es resoldrà amb reintents.
// Exemple: JSON mal format, camp obligatori absent, validació fallida.
type PermanentError struct {
Err error
}
func (e *PermanentError) Error() string {
return e.Err.Error()
}
func (e *PermanentError) Unwrap() error {
return e.Err
}
func NewPermanentError(err error) *PermanentError {
return &PermanentError{Err: err}
}
func IsPermanent(err error) bool {
var permanent *PermanentError
return errors.As(err, &permanent)
}Ara, al consumidor, la lògica de decisió:
func ConsumeWithRetry(ctx context.Context, reader *kafka.Reader, handler MessageHandler, maxRetries int) error {
for {
msg, err := reader.FetchMessage(ctx)
if err != nil {
if ctx.Err() != nil {
return nil
}
slog.Error(\"error fetching message\", \"error\", err)
continue
}
var processingErr error
for attempt := 1; attempt <= maxRetries; attempt++ {
processingErr = handler(ctx, msg)
if processingErr == nil {
break
}
if IsPermanent(processingErr) {
slog.Error(\"permanent error, skipping message\",
\"error\", processingErr,
\"offset\", msg.Offset,
\"partition\", msg.Partition,
)
// Aquí podries enviar a un dead-letter topic
break
}
slog.Warn(\"transient error, retrying\",
\"error\", processingErr,
\"attempt\", attempt,
\"max_retries\", maxRetries,
)
time.Sleep(time.Duration(attempt) * time.Second) // Backoff lineal
}
// Commit fins i tot si ha fallat: evita bloquejar la partició
if err := reader.CommitMessages(ctx, msg); err != nil {
slog.Error(\"error committing\", \"error\", err)
}
}
}La decisió de fer commit fins i tot quan el processament falla és deliberada. Si no ho fas, el consumidor es queda encallat al mateix missatge indefinidament. L’alternativa és enviar els missatges fallits a un dead-letter topic (DLT) abans de fer commit:
func sendToDeadLetter(ctx context.Context, writer *kafka.Writer, msg kafka.Message, err error) error {
return writer.WriteMessages(ctx, kafka.Message{
Key: msg.Key,
Value: msg.Value,
Headers: append(msg.Headers,
kafka.Header{Key: \"original-topic\", Value: []byte(msg.Topic)},
kafka.Header{Key: \"error\", Value: []byte(err.Error())},
kafka.Header{Key: \"failed-at\", Value: []byte(time.Now().UTC().Format(time.RFC3339))},
),
})
}Shutdown ordenat amb context i senyals del sistema
Un consumidor Kafka en producció ha de tancar neta: deixar de llegir missatges, acabar el processament en curs i fer commit del pendent. Si el procés mor de cop, els missatges en vol es reproces (que no és catastròfic si ets idempotent, però és un malbaratament).
El patró és interceptar senyals del sistema operatiu i propagar la cancel·lació a través del context de Go:
// cmd/consumer/main.go
package main
import (
\"context\"
\"log/slog\"
\"os\"
\"os/signal\"
\"syscall\"
appkafka \"myservice/internal/kafka\"
\"myservice/internal/event\"
)
func main() {
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
})))
ctx, cancel := signal.NotifyContext(context.Background(),
syscall.SIGINT,
syscall.SIGTERM,
)
defer cancel()
cfg := appkafka.DefaultConsumerConfig()
cfg.Topic = \"orders\"
cfg.GroupID = \"order-processor\"
reader := appkafka.NewReader(cfg)
defer func() {
if err := reader.Close(); err != nil {
slog.Error(\"error closing reader\", \"error\", err)
}
slog.Info(\"kafka reader closed\")
}()
processor := event.NewOrderProcessor()
slog.Info(\"starting consumer\",
\"topic\", cfg.Topic,
\"group\", cfg.GroupID,
\"brokers\", cfg.Brokers,
)
if err := appkafka.Consume(ctx, reader, processor.Handle); err != nil {
slog.Error(\"consumer error\", \"error\", err)
os.Exit(1)
}
slog.Info(\"consumer stopped gracefully\")
}signal.NotifyContext és la forma més neta de gestionar això en Go. Crea un context que es cancel·la automàticament quan arriba un dels senyals especificats. En cancel·lar-se el context:
FetchMessage(ctx)retorna un error.- El bucle detecta
ctx.Err() != nili surt. defer reader.Close()tanca la connexió amb Kafka.- El procés acaba net.
A Kubernetes, el procés rebrà SIGTERM quan el pod va a morir. Tens 30 segons per defecte (terminationGracePeriodSeconds) per tancar. Amb aquest patró, el shutdown tarda mil·lisegons.
Si el teu processament és lent i necessites un worker pool per paral·lelitzar missatges, el context es propaga igual a cada goroutine del pool. Quan arriba SIGTERM, totes les goroutines reben el senyal de cancel·lació i acaben la feina en curs.
Docker Compose amb Kafka per al desenvolupament local
Per desenvolupar i provar el consumidor necessites un cluster Kafka local. Docker Compose amb KRaft (sense Zookeeper) és l’opció més simple:
# docker-compose.yml
services:
kafka:
image: apache/kafka:3.8.0
container_name: kafka
ports:
- \"9092:9092\"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
healthcheck:
test: [\"CMD-SHELL\", \"/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list\"]
interval: 10s
timeout: 5s
retries: 5
kafka-init:
image: apache/kafka:3.8.0
depends_on:
kafka:
condition: service_healthy
entrypoint: [\"/bin/sh\", \"-c\"]
command: |
\"
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic orders --partitions 3 --replication-factor 1
echo 'Topic orders created'
\"Arranques amb docker compose up -d i tens un broker Kafka amb un topic orders de 3 particions llest per usar.
Per produir missatges de prova des del terminal:
// cmd/producer/main.go
package main
import (
\"context\"
\"encoding/json\"
\"fmt\"
\"log\"
\"time\"
\"github.com/segmentio/kafka-go\"
)
func main() {
writer := &kafka.Writer{
Addr: kafka.TCP(\"localhost:9092\"),
Topic: \"orders\",
Balancer: &kafka.LeastBytes{},
}
defer writer.Close()
for i := 1; i <= 10; i++ {
event := map[string]interface{}{
\"order_id\": fmt.Sprintf(\"ORD-%03d\", i),
\"user_id\": fmt.Sprintf(\"USR-%03d\", i),
\"amount\": float64(i) * 29.99,
\"currency\": \"EUR\",
\"created_at\": time.Now().UTC(),
}
value, _ := json.Marshal(event)
err := writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(fmt.Sprintf(\"ORD-%03d\", i)),
Value: value,
})
if err != nil {
log.Fatalf(\"error writing message: %v\", err)
}
fmt.Printf(\"sent: %s\n\", string(value))
}
}Testing amb testcontainers
Els tests unitaris estan bé per a la lògica de negoci, però un consumidor Kafka necessita tests d’integració contra un Kafka real. testcontainers-go et permet aixecar un broker Kafka en un contenidor Docker dins dels teus tests:
// internal/kafka/consumer_test.go
package kafka_test
import (
\"context\"
\"encoding/json\"
\"testing\"
\"time\"
\"github.com/segmentio/kafka-go\"
tc \"github.com/testcontainers/testcontainers-go\"
\"github.com/testcontainers/testcontainers-go/modules/kafka\"
)
func TestConsumer_ProcessesMessages(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
// Aixecar Kafka en un contenidor
kafkaContainer, err := kafka.Run(ctx, \"confluentinc/confluent-local:7.5.0\",
kafka.WithClusterID(\"test-cluster\"),
)
if err != nil {
t.Fatalf(\"starting kafka container: %v\", err)
}
defer func() {
if err := tc.TerminateContainer(kafkaContainer); err != nil {
t.Logf(\"terminating container: %v\", err)
}
}()
brokers, err := kafkaContainer.Brokers(ctx)
if err != nil {
t.Fatalf(\"getting brokers: %v\", err)
}
topic := \"test-orders\"
// Crear el topic
conn, err := kafka.DialLeader(ctx, \"tcp\", brokers[0], topic, 0)
if err != nil {
// Si falla, intentar crear via l'API d'admin
adminConn, _ := kafka.Dial(\"tcp\", brokers[0])
defer adminConn.Close()
adminConn.CreateTopics(kafka.TopicConfig{
Topic: topic,
NumPartitions: 1,
ReplicationFactor: 1,
})
} else {
conn.Close()
}
// Produir un missatge de prova
writer := &kafka.Writer{
Addr: kafka.TCP(brokers[0]),
Topic: topic,
Balancer: &kafka.LeastBytes{},
}
event := map[string]interface{}{
\"order_id\": \"ORD-TEST-001\",
\"user_id\": \"USR-001\",
\"amount\": 99.99,
}
value, _ := json.Marshal(event)
err = writer.WriteMessages(ctx, kafka.Message{
Key: []byte(\"ORD-TEST-001\"),
Value: value,
})
writer.Close()
if err != nil {
t.Fatalf(\"writing message: %v\", err)
}
// Consumir i verificar
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: \"test-group\",
MinBytes: 1,
MaxBytes: 10e6,
})
defer reader.Close()
msg, err := reader.ReadMessage(ctx)
if err != nil {
t.Fatalf(\"reading message: %v\", err)
}
var received map[string]interface{}
if err := json.Unmarshal(msg.Value, &received); err != nil {
t.Fatalf(\"unmarshaling message: %v\", err)
}
if received[\"order_id\"] != \"ORD-TEST-001\" {
t.Errorf(\"expected order_id ORD-TEST-001, got %v\", received[\"order_id\"])
}
}El test aixeca un contenidor Kafka real, produeix un missatge, el consumeix i verifica el contingut. Tarda uns segons en arrencar el contenidor, però et dóna confiança real que el teu consumidor funciona contra un broker Kafka de debò.
Per executar-lo:
go test -v -tags=integration ./internal/kafka/...Pots usar build tags per separar els tests d’integració dels unitaris i no executar-los a cada go test ./....
Consideracions de producció
Un consumidor Kafka que funciona en local no és el mateix que un preparat per a producció. Aquests són els punts que no pots ignorar:
Monitorització del consumer lag
El consumer lag és la diferència entre l’offset més recent del topic i l’offset commitejat pel teu consumer group. Si el lag creix, el teu consumidor no està processant missatges al ritme que arriben.
Exposa mètriques del reader de kafka-go:
func reportMetrics(reader *kafka.Reader) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
stats := reader.Stats()
slog.Info(\"consumer stats\",
\"messages\", stats.Messages,
\"bytes\", stats.Bytes,
\"rebalances\", stats.Rebalances,
\"timeouts\", stats.Timeouts,
\"errors\", stats.Errors,
\"dial_time_avg\", stats.DialTime.Avg,
\"read_time_avg\", stats.ReadTime.Avg,
\"wait_time_avg\", stats.WaitTime.Avg,
\"fetch_size_avg\", stats.FetchSize.Avg,
\"fetch_bytes_avg\", stats.FetchBytes.Avg,
\"offset\", stats.Offset,
\"lag\", stats.Lag,
)
}
}En producció, aquestes mètriques haurien d’anar a Prometheus, Datadog o el teu sistema de monitorització. El stats.Lag és la mètrica crítica: configura una alerta si supera un llindar.
Escalat horitzontal
Per escalar consumidors, simplement llança més instàncies amb el mateix GroupID. Kafka reequilibrarà les particions automàticament. Recorda:
- Màxim de consumidors = nombre de particions. Si tens 6 particions, no escalis més enllà de 6 instàncies.
- El reequilibri té cost: Cada vegada que una instància entra o surt del grup, Kafka reasigna particions. Durant el reequilibri, el consum es pausa breument.
- A Kubernetes: Usa un Deployment (no un StatefulSet) i configura un HPA basat en el consumer lag.
Idempotència
Ho repeteixo perquè és el que més s’oblida: el teu handler ha de ser idempotent. Amb at-least-once delivery, rebràs missatges duplicats. Això passa quan:
- El consumidor processa un missatge però cau abans de commitear.
- Un reequilibri reasigna una partició i es reproces missatges.
- Kafka té un bug (rar, però passa).
Estratègies d’idempotència:
- Clau única: Desa l’
order_id(o l’identificador natural de l’esdeveniment) i fes un upsert en lloc d’un insert. - Taula de deduplicació: Desa els offsets processats i verifica abans de processar.
- Operacions idempotents per disseny:
SET balance = 100és idempotent,SET balance = balance + 100no ho és.
Timeouts i deadlines
Cada missatge s’hauria de processar amb un timeout. Si un handler es queda penjat, bloqueja aquella partició:
func handleWithTimeout(ctx context.Context, msg kafka.Message, handler MessageHandler, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return handler(ctx, msg)
}Un timeout de 30 segons per missatge és un bon punt de partida. Ajusta’l segons el teu cas.
Health checks
A Kubernetes, el teu consumidor necessita un endpoint de salut. Pots aixecar un servidor HTTP mínim en paral·lel:
func startHealthServer(ctx context.Context, port string) {
mux := http.NewServeMux()
mux.HandleFunc(\"/health\", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(\"ok\"))
})
server := &http.Server{Addr: \":\" + port, Handler: mux}
go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
server.Shutdown(shutdownCtx)
}()
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
slog.Error(\"health server error\", \"error\", err)
}
}Llança’l en una goroutine des de main() i configura els probes de Kubernetes per apuntar a /health.
Estructura final del projecte
En juntar-ho tot, el projecte queda així:
kafka-consumer/
├── cmd/
│ ├── consumer/
│ │ └── main.go # Punt d'entrada del consumidor
│ └── producer/
│ └── main.go # Productor de proves
├── internal/
│ ├── event/
│ │ └── order.go # Handler d'esdeveniments de comandes
│ └── kafka/
│ ├── consumer.go # Configuració i bucle de consum
│ ├── consumer_test.go # Tests d'integració
│ └── errors.go # Tipus d'error
├── docker-compose.yml
├── go.mod
└── go.sumSi véns d’un article sobre workers en Go, veuràs que l’estructura és gairebé idèntica. I no és casualitat: un consumidor Kafka és, en essència, un worker que rep feina des d’un topic en lloc des d’una cua en memòria.
Kafka sense la complexitat de la JVM
Un consumidor Kafka en Go es construeix amb poques peces que encaixen de forma natural: un reader de segmentio/kafka-go amb consumer group i commit manual, un bucle de consum basat en FetchMessage + CommitMessages per al control total de l’offset, i handlers tipats que deserialitzen el missatge i executen la lògica de negoci. La resta és classificar errors entre transitoris i permanents, garantir un shutdown ordenat amb signal.NotifyContext, i dissenyar handlers idempotents perquè at-least-once delivery implica duplicats.
Go no té l’ecosistema Kafka de la JVM. No hi ha Kafka Streams, no hi ha Spring Kafka amb les seves 47 anotacions. Però per a consumidors backend (que són la majoria dels casos), el que ofereix és suficient i considerablement més simple. Un binari de 15 MB, 30 MB de RAM, arrencada en mil·lisegons i concurrència nativa per paral·lelitzar el processament.
El que m’agrada d’aquest enfocament és que el consumidor acaba sent un worker més. Si véns de l’article de workers en Go, hauràs vist que l’estructura és gairebé idèntica. La diferència és d’on ve la feina, no de com la processen. I això és exactament el que vols: patrons reutilitzables que s’adapten al transport sense reescriure la lògica.
Si necessites processar missatges Kafka amb concurrència real, combina el que has vist aquí amb un worker pool: el reader alimenta un channel, el pool processa els missatges en paral·lel i el commit es fa quan tots els missatges del batch estan processats.


