Consumir mensajes de Kafka con Go: ejemplo práctico para backend
Cómo crear un consumidor de Kafka en Go con segmentio/kafka-go: mensajes, commits, errores, context y shutdown ordenado.

Los consumidores de Kafka son uno de los terrenos donde Go brilla con fuerza: procesos de larga duración, procesamiento concurrente de mensajes y despliegue como un binario estático sin dependencias. Si vienes de JVM, donde un consumidor Kafka necesita medio centenar de dependencias transitivas y 512 MB de heap para arrancar, la diferencia con Go es brutal.
He montado consumidores Kafka en Go para procesamiento de eventos, sincronización de datos entre servicios y pipelines de ingesta. En todos los casos, el patrón es el mismo: leer mensajes, procesarlos, gestionar errores, hacer commit de offsets y cerrar limpiamente cuando llega una señal. Lo que cambia es la lógica de negocio del medio.
Este artículo va de eso: construir un consumidor Kafka en Go paso a paso, con código que puedes usar como base real. Sin abstracciones innecesarias, sin frameworks mágicos.
Por qué Go para consumidores Kafka
Go no es la única opción, pero encaja especialmente bien en este tipo de workloads por razones concretas:
- Binarios estáticos: Un consumidor Kafka en Go compila a un binario de ~15 MB. Sin runtime, sin JVM, sin intérprete. La imagen Docker resultante puede basarse en
scratchodistrolessy pesar menos de 20 MB. - Goroutines y concurrencia nativa: Procesar mensajes en paralelo es natural con goroutines y channels. No necesitas thread pools explícitos ni frameworks de concurrencia.
- Arranque inmediato: Un proceso Go arranca en milisegundos. En Kubernetes, esto significa que un HPA puede escalar consumidores rápidamente cuando el lag crece.
- Bajo consumo de memoria: Un consumidor Kafka típico en Go consume entre 20-50 MB de RAM. El equivalente en Java/Kotlin con Spring Kafka parte de 200-300 MB.
- Context nativo: El patrón de context en Go encaja perfectamente con el ciclo de vida de un consumidor: propagación de cancelación, timeouts por mensaje y shutdown ordenado.
No todo es perfecto. El ecosistema de librerías Kafka en Go es más reducido que en JVM. No hay equivalente directo a Kafka Streams o Spring Kafka. Pero para consumidores puros (que es el 80% de los casos), Go es más que suficiente.
Kafka en 2 minutos: lo justo para entender el código
Si ya trabajas con Kafka a diario, salta esta sección. Si no, estos son los conceptos que necesitas para seguir el artículo:
- Topic: Un canal de mensajes con nombre. Los productores escriben mensajes en un topic, los consumidores los leen.
- Partición: Cada topic se divide en particiones. Las particiones permiten paralelismo: puedes tener N consumidores leyendo N particiones en paralelo.
- Offset: Cada mensaje dentro de una partición tiene un número secuencial (offset). El consumidor lleva la cuenta de hasta dónde ha leído.
- Consumer Group: Un grupo de consumidores que se reparten las particiones de un topic. Kafka garantiza que cada partición la lee exactamente un consumidor del grupo.
- Commit: El acto de informar a Kafka de que has procesado un mensaje hasta cierto offset. Si el consumidor se cae, al reiniciar empieza desde el último offset committed.
La relación clave: si un topic tiene 6 particiones y tu consumer group tiene 3 consumidores, cada uno lee 2 particiones. Si subes a 6 consumidores, cada uno lee 1. Si subes a 7, uno se queda sin particiones (idle). El número de particiones es el techo de paralelismo por consumer group.
Elegir librería: segmentio/kafka-go vs confluent-kafka-go
En Go hay dos opciones principales:
confluent-kafka-go
- Wrapper CGo sobre
librdkafka(C). - Rendimiento máximo y compatibilidad total con las features de Kafka.
- Requiere
librdkafkainstalada o linkada estáticamente (compilación más lenta, cross-compilation más complicada). - API más cercana al cliente C, menos idiomática en Go.
segmentio/kafka-go
- Implementación pura en Go, sin dependencias CGo.
- API idiomática:
Reader,Writer,Conn. - Cross-compilation trivial. Compilar para Linux desde macOS funciona sin cambios.
- Rendimiento excelente para la mayoría de casos (no hay diferencia práctica por debajo de 100K msg/s).
Mi elección: segmentio/kafka-go. Para el 90% de consumidores backend, la simplicidad de compilación, la API limpia y la ausencia de CGo compensan con creces. Solo elegiría confluent-kafka-go si necesitara features muy específicas como transacciones de Kafka o Schema Registry integrado a nivel de librería.
Instalamos la dependencia:
go get github.com/segmentio/kafka-goConfigurar el consumidor con kafka-go
La librería ofrece dos APIs: Reader (alto nivel, maneja el consumer group) y Conn (bajo nivel, conexión directa). Para consumidores con consumer group, Reader es lo que quieres.
// internal/kafka/consumer.go
package kafka
import (
"context"
"log/slog"
"time"
"github.com/segmentio/kafka-go"
)
type ConsumerConfig struct {
Brokers []string
Topic string
GroupID string
MinBytes int
MaxBytes int
CommitInterval time.Duration
}
func DefaultConsumerConfig() ConsumerConfig {
return ConsumerConfig{
Brokers: []string{"localhost:9092"},
Topic: "events",
GroupID: "my-service",
MinBytes: 1, // 1 byte mínimo por fetch
MaxBytes: 10e6, // 10 MB máximo por fetch
CommitInterval: 0, // 0 = commit manual
}
}
func NewReader(cfg ConsumerConfig) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topic,
GroupID: cfg.GroupID,
MinBytes: cfg.MinBytes,
MaxBytes: cfg.MaxBytes,
CommitInterval: cfg.CommitInterval,
StartOffset: kafka.LastOffset,
Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
slog.Debug(msg, "args", args)
}),
ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
slog.Error(msg, "args", args)
}),
})
}Puntos importantes:
CommitInterval: 0: Desactiva auto-commit. Queremos control total sobre cuándo se confirma un offset. Más sobre esto en la sección de commits.StartOffset: kafka.LastOffset: Si el consumer group no tiene offset guardado (primera vez), empieza desde el último mensaje. La alternativa eskafka.FirstOffsetpara consumir todo el histórico.GroupID: Identifica al consumer group. Todos los procesos con el mismoGroupIDse reparten las particiones.
Leer mensajes en bucle
El patrón básico es un bucle infinito con reader.ReadMessage() o reader.FetchMessage(). La diferencia es crucial:
ReadMessage(): Lee el mensaje y hace commit automáticamente.FetchMessage(): Lee el mensaje sin hacer commit. Tú decides cuándo llamar aCommitMessages().
Para un consumidor con commit manual (que es lo que quieres en producción), usa FetchMessage():
// internal/kafka/consumer.go
type MessageHandler func(ctx context.Context, msg kafka.Message) error
func Consume(ctx context.Context, reader *kafka.Reader, handler MessageHandler) error {
slog.Info("starting kafka consumer",
"topic", reader.Config().Topic,
"group", reader.Config().GroupID,
)
for {
msg, err := reader.FetchMessage(ctx)
if err != nil {
if ctx.Err() != nil {
slog.Info("consumer context cancelled, stopping")
return nil
}
slog.Error("error fetching message", "error", err)
continue
}
slog.Debug("message received",
"topic", msg.Topic,
"partition", msg.Partition,
"offset", msg.Offset,
"key", string(msg.Key),
)
if err := handler(ctx, msg); err != nil {
slog.Error("error processing message",
"error", err,
"topic", msg.Topic,
"partition", msg.Partition,
"offset", msg.Offset,
)
// No hacemos commit: el mensaje se reprocesará
continue
}
if err := reader.CommitMessages(ctx, msg); err != nil {
slog.Error("error committing message",
"error", err,
"offset", msg.Offset,
)
}
}
}Este patrón tiene una propiedad fundamental: at-least-once delivery. Si el proceso se cae después de procesar un mensaje pero antes de hacer commit, al reiniciar volverá a leer ese mensaje. Tu handler debe ser idempotente.
El check de ctx.Err() después del error de FetchMessage es clave. Cuando cancelas el context (por ejemplo, al recibir SIGTERM), FetchMessage devuelve un error. Sin ese check, el consumidor entraría en un bucle infinito de errores.
Procesar mensajes: deserialización y lógica de negocio
El MessageHandler recibe un kafka.Message con bytes crudos. Necesitas deserializar y ejecutar tu lógica de negocio. Un enfoque limpio es separar la deserialización del procesamiento:
// internal/event/order.go
package event
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"
"github.com/segmentio/kafka-go"
)
type OrderCreatedEvent struct {
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
Amount float64 `json:"amount"`
Currency string `json:"currency"`
CreatedAt time.Time `json:"created_at"`
}
type OrderProcessor struct {
// Aquí irían tus dependencias: repositorio, servicio de notificaciones, etc.
}
func NewOrderProcessor() *OrderProcessor {
return &OrderProcessor{}
}
func (p *OrderProcessor) Handle(ctx context.Context, msg kafka.Message) error {
var event OrderCreatedEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
return fmt.Errorf("deserializing order event: %w", err)
}
slog.Info("processing order",
"order_id", event.OrderID,
"user_id", event.UserID,
"amount", event.Amount,
)
// Tu lógica de negocio aquí.
// Ejemplo: guardar en base de datos, enviar notificación, etc.
if err := p.processOrder(ctx, event); err != nil {
return fmt.Errorf("processing order %s: %w", event.OrderID, err)
}
return nil
}
func (p *OrderProcessor) processOrder(ctx context.Context, event OrderCreatedEvent) error {
// Implementación real de la lógica de negocio
slog.Info("order processed successfully", "order_id", event.OrderID)
return nil
}Algunas decisiones deliberadas:
json.Unmarshaldirecto: Para JSON funciona bien. Si usas Avro o Protobuf, sustituye por el deserializador correspondiente.- Wrapping de errores con
%w: Permite a la capa superior inspeccionar el tipo de error y decidir si reintenta o descarta. Más sobre esto en la sección de errores. - Struct
OrderProcessorcon dependencias: No funciones sueltas. Esto facilita la inyección de dependencias y el testing.
Estrategias de commit: auto-commit vs commit manual
Cómo y cuándo haces commit de offsets define las garantías de entrega de tu consumidor. Hay tres estrategias principales:
Auto-commit periódico
reader := kafka.NewReader(kafka.ReaderConfig{
// ...
CommitInterval: 5 * time.Second, // Commit cada 5 segundos
})Kafka-go commitea automáticamente los offsets de los mensajes leídos cada N segundos. Ventaja: simplicidad. Problema: si el proceso se cae entre un commit automático y el siguiente, pierdes la referencia de los mensajes procesados y se reprocesan al reiniciar.
Commit manual por mensaje
Es lo que hemos visto arriba: FetchMessage + CommitMessages después de procesar cada mensaje.
msg, _ := reader.FetchMessage(ctx)
// procesar...
reader.CommitMessages(ctx, msg)Ventaja: control total. El offset se commitea solo cuando el mensaje está procesado. Desventaja: un commit por cada mensaje implica más llamadas a Kafka (más latencia si procesas miles de mensajes por segundo).
Commit manual por lote
Acumulas N mensajes o esperas T tiempo, procesas el lote, y commiteas el último offset del lote:
func ConsumeBatch(ctx context.Context, reader *kafka.Reader, batchSize int, handler func(ctx context.Context, msgs []kafka.Message) error) error {
batch := make([]kafka.Message, 0, batchSize)
for {
msg, err := reader.FetchMessage(ctx)
if err != nil {
if ctx.Err() != nil {
return nil
}
slog.Error("error fetching message", "error", err)
continue
}
batch = append(batch, msg)
if len(batch) >= batchSize {
if err := handler(ctx, batch); err != nil {
slog.Error("error processing batch", "error", err)
batch = batch[:0]
continue
}
// Commit del último mensaje del lote
last := batch[len(batch)-1]
if err := reader.CommitMessages(ctx, last); err != nil {
slog.Error("error committing batch", "error", err)
}
batch = batch[:0]
}
}
}Esta estrategia es la más eficiente para alto throughput, pero tiene un riesgo: si el proceso se cae a mitad de un lote, se reprocesan todos los mensajes del lote. Tu handler debe ser idempotente.
Mi recomendación: empieza con commit manual por mensaje. Es el más seguro y fácil de razonar. Pasa a batch solo si el throughput lo exige y tu lógica es idempotente (que debería serlo de todas formas).
Gestión de errores: transitorios vs permanentes
No todos los errores se tratan igual. Un timeout de red y un JSON malformado requieren estrategias distintas:
// internal/kafka/errors.go
package kafka
import "errors"
// PermanentError indica un error que no se resolverá con reintentos.
// Ejemplo: JSON malformado, campo obligatorio ausente, validación fallida.
type PermanentError struct {
Err error
}
func (e *PermanentError) Error() string {
return e.Err.Error()
}
func (e *PermanentError) Unwrap() error {
return e.Err
}
func NewPermanentError(err error) *PermanentError {
return &PermanentError{Err: err}
}
func IsPermanent(err error) bool {
var permanent *PermanentError
return errors.As(err, &permanent)
}Ahora, en el consumidor, la lógica de decisión:
func ConsumeWithRetry(ctx context.Context, reader *kafka.Reader, handler MessageHandler, maxRetries int) error {
for {
msg, err := reader.FetchMessage(ctx)
if err != nil {
if ctx.Err() != nil {
return nil
}
slog.Error("error fetching message", "error", err)
continue
}
var processingErr error
for attempt := 1; attempt <= maxRetries; attempt++ {
processingErr = handler(ctx, msg)
if processingErr == nil {
break
}
if IsPermanent(processingErr) {
slog.Error("permanent error, skipping message",
"error", processingErr,
"offset", msg.Offset,
"partition", msg.Partition,
)
// Aquí podrías enviar a un dead-letter topic
break
}
slog.Warn("transient error, retrying",
"error", processingErr,
"attempt", attempt,
"max_retries", maxRetries,
)
time.Sleep(time.Duration(attempt) * time.Second) // Backoff lineal
}
// Commit incluso si falló: evita bloquear la partición
if err := reader.CommitMessages(ctx, msg); err != nil {
slog.Error("error committing", "error", err)
}
}
}La decisión de hacer commit incluso cuando el procesamiento falla es deliberada. Si no lo haces, el consumidor se queda atascado en el mismo mensaje indefinidamente. La alternativa es enviar los mensajes fallidos a un dead-letter topic (DLT) antes de hacer commit:
func sendToDeadLetter(ctx context.Context, writer *kafka.Writer, msg kafka.Message, err error) error {
return writer.WriteMessages(ctx, kafka.Message{
Key: msg.Key,
Value: msg.Value,
Headers: append(msg.Headers,
kafka.Header{Key: "original-topic", Value: []byte(msg.Topic)},
kafka.Header{Key: "error", Value: []byte(err.Error())},
kafka.Header{Key: "failed-at", Value: []byte(time.Now().UTC().Format(time.RFC3339))},
),
})
}Shutdown ordenado con context y señales del sistema
Un consumidor Kafka en producción debe cerrar limpiamente: dejar de leer mensajes, terminar el procesamiento en curso y hacer commit de lo pendiente. Si el proceso muere de golpe, los mensajes en vuelo se reprocesan (que no es catastrófico si eres idempotente, pero es un desperdicio).
El patrón es interceptar señales del sistema operativo y propagar la cancelación a través del context de Go:
// cmd/consumer/main.go
package main
import (
"context"
"log/slog"
"os"
"os/signal"
"syscall"
appkafka "myservice/internal/kafka"
"myservice/internal/event"
)
func main() {
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
})))
ctx, cancel := signal.NotifyContext(context.Background(),
syscall.SIGINT,
syscall.SIGTERM,
)
defer cancel()
cfg := appkafka.DefaultConsumerConfig()
cfg.Topic = "orders"
cfg.GroupID = "order-processor"
reader := appkafka.NewReader(cfg)
defer func() {
if err := reader.Close(); err != nil {
slog.Error("error closing reader", "error", err)
}
slog.Info("kafka reader closed")
}()
processor := event.NewOrderProcessor()
slog.Info("starting consumer",
"topic", cfg.Topic,
"group", cfg.GroupID,
"brokers", cfg.Brokers,
)
if err := appkafka.Consume(ctx, reader, processor.Handle); err != nil {
slog.Error("consumer error", "error", err)
os.Exit(1)
}
slog.Info("consumer stopped gracefully")
}signal.NotifyContext es la forma más limpia de manejar esto en Go. Crea un context que se cancela automáticamente cuando llega una de las señales especificadas. Al cancelarse el context:
FetchMessage(ctx)devuelve un error.- El bucle detecta
ctx.Err() != nily sale. defer reader.Close()cierra la conexión con Kafka.- El proceso termina limpiamente.
En Kubernetes, el proceso recibirá SIGTERM cuando el pod va a morir. Tienes 30 segundos por defecto (terminationGracePeriodSeconds) para cerrar. Con este patrón, el shutdown tarda milisegundos.
Si tu procesamiento es lento y necesitas un worker pool para paralelizar mensajes, el context se propaga igual a cada goroutine del pool. Cuando llega SIGTERM, todas las goroutines reciben la señal de cancelación y terminan su trabajo en curso.
Docker Compose con Kafka para desarrollo local
Para desarrollar y probar el consumidor necesitas un cluster Kafka local. Docker Compose con KRaft (sin Zookeeper) es la opción más simple:
# docker-compose.yml
services:
kafka:
image: apache/kafka:3.8.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
healthcheck:
test: ["CMD-SHELL", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list"]
interval: 10s
timeout: 5s
retries: 5
kafka-init:
image: apache/kafka:3.8.0
depends_on:
kafka:
condition: service_healthy
entrypoint: ["/bin/sh", "-c"]
command: |
"
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic orders --partitions 3 --replication-factor 1
echo 'Topic orders created'
"Arrancas con docker compose up -d y tienes un broker Kafka con un topic orders de 3 particiones listo para usar.
Para producir mensajes de prueba desde la terminal:
// cmd/producer/main.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
)
func main() {
writer := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "orders",
Balancer: &kafka.LeastBytes{},
}
defer writer.Close()
for i := 1; i <= 10; i++ {
event := map[string]interface{}{
"order_id": fmt.Sprintf("ORD-%03d", i),
"user_id": fmt.Sprintf("USR-%03d", i),
"amount": float64(i) * 29.99,
"currency": "EUR",
"created_at": time.Now().UTC(),
}
value, _ := json.Marshal(event)
err := writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(fmt.Sprintf("ORD-%03d", i)),
Value: value,
})
if err != nil {
log.Fatalf("error writing message: %v", err)
}
fmt.Printf("sent: %s\n", string(value))
}
}Testing con testcontainers
Los tests unitarios están bien para la lógica de negocio, pero un consumidor Kafka necesita tests de integración contra un Kafka real. testcontainers-go te permite levantar un broker Kafka en un contenedor Docker dentro de tus tests:
// internal/kafka/consumer_test.go
package kafka_test
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/segmentio/kafka-go"
tc "github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/kafka"
)
func TestConsumer_ProcessesMessages(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
// Levantar Kafka en un contenedor
kafkaContainer, err := kafka.Run(ctx, "confluentinc/confluent-local:7.5.0",
kafka.WithClusterID("test-cluster"),
)
if err != nil {
t.Fatalf("starting kafka container: %v", err)
}
defer func() {
if err := tc.TerminateContainer(kafkaContainer); err != nil {
t.Logf("terminating container: %v", err)
}
}()
brokers, err := kafkaContainer.Brokers(ctx)
if err != nil {
t.Fatalf("getting brokers: %v", err)
}
topic := "test-orders"
// Crear el topic
conn, err := kafka.DialLeader(ctx, "tcp", brokers[0], topic, 0)
if err != nil {
// Si falla, intentar crear vía la API de admin
adminConn, _ := kafka.Dial("tcp", brokers[0])
defer adminConn.Close()
adminConn.CreateTopics(kafka.TopicConfig{
Topic: topic,
NumPartitions: 1,
ReplicationFactor: 1,
})
} else {
conn.Close()
}
// Producir un mensaje de prueba
writer := &kafka.Writer{
Addr: kafka.TCP(brokers[0]),
Topic: topic,
Balancer: &kafka.LeastBytes{},
}
event := map[string]interface{}{
"order_id": "ORD-TEST-001",
"user_id": "USR-001",
"amount": 99.99,
}
value, _ := json.Marshal(event)
err = writer.WriteMessages(ctx, kafka.Message{
Key: []byte("ORD-TEST-001"),
Value: value,
})
writer.Close()
if err != nil {
t.Fatalf("writing message: %v", err)
}
// Consumir y verificar
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: "test-group",
MinBytes: 1,
MaxBytes: 10e6,
})
defer reader.Close()
msg, err := reader.ReadMessage(ctx)
if err != nil {
t.Fatalf("reading message: %v", err)
}
var received map[string]interface{}
if err := json.Unmarshal(msg.Value, &received); err != nil {
t.Fatalf("unmarshaling message: %v", err)
}
if received["order_id"] != "ORD-TEST-001" {
t.Errorf("expected order_id ORD-TEST-001, got %v", received["order_id"])
}
}El test levanta un contenedor Kafka real, produce un mensaje, lo consume y verifica el contenido. Tarda unos segundos en arrancar el contenedor, pero te da confianza real de que tu consumidor funciona contra un broker Kafka de verdad.
Para ejecutarlo:
go test -v -tags=integration ./internal/kafka/...Puedes usar build tags para separar los tests de integración de los unitarios y no ejecutarlos en cada go test ./....
Consideraciones de producción
Un consumidor Kafka que funciona en local no es lo mismo que uno preparado para producción. Estos son los puntos que no puedes ignorar:
Monitorización del consumer lag
El consumer lag es la diferencia entre el offset más reciente del topic y el offset committeado por tu consumer group. Si el lag crece, tu consumidor no está procesando mensajes al ritmo que llegan.
Expón métricas del reader de kafka-go:
func reportMetrics(reader *kafka.Reader) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
stats := reader.Stats()
slog.Info("consumer stats",
"messages", stats.Messages,
"bytes", stats.Bytes,
"rebalances", stats.Rebalances,
"timeouts", stats.Timeouts,
"errors", stats.Errors,
"dial_time_avg", stats.DialTime.Avg,
"read_time_avg", stats.ReadTime.Avg,
"wait_time_avg", stats.WaitTime.Avg,
"fetch_size_avg", stats.FetchSize.Avg,
"fetch_bytes_avg", stats.FetchBytes.Avg,
"offset", stats.Offset,
"lag", stats.Lag,
)
}
}En producción, estas métricas deberían ir a Prometheus, Datadog o tu sistema de monitorización. El stats.Lag es la métrica crítica: configura una alerta si supera un umbral.
Escalado horizontal
Para escalar consumidores, simplemente lanza más instancias con el mismo GroupID. Kafka rebalanceará las particiones automáticamente. Recuerda:
- Máximo de consumidores = número de particiones. Si tienes 6 particiones, no escales más allá de 6 instancias.
- Rebalanceo tiene coste: Cada vez que una instancia entra o sale del grupo, Kafka reasigna particiones. Durante el rebalanceo, el consumo se pausa brevemente.
- En Kubernetes: Usa un Deployment (no un StatefulSet) y configura un HPA basado en el consumer lag.
Idempotencia
Lo repito porque es lo que más se olvida: tu handler debe ser idempotente. Con at-least-once delivery, vas a recibir mensajes duplicados. Esto pasa cuando:
- El consumidor procesa un mensaje pero se cae antes de commitear.
- Un rebalanceo reasigna una partición y se reprocesan mensajes.
- Kafka tiene un bug (raro, pero pasa).
Estrategias de idempotencia:
- Clave única: Guarda el
order_id(o el identificador natural del evento) y haz un upsert en vez de un insert. - Tabla de deduplicación: Guarda los offsets procesados y verifica antes de procesar.
- Operaciones idempotentes por diseño:
SET balance = 100es idempotente,SET balance = balance + 100no lo es.
Timeouts y deadlines
Cada mensaje debería procesarse con un timeout. Si un handler se queda colgado, bloquea esa partición:
func handleWithTimeout(ctx context.Context, msg kafka.Message, handler MessageHandler, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return handler(ctx, msg)
}Un timeout de 30 segundos por mensaje es un buen punto de partida. Ajústalo según tu caso.
Health checks
En Kubernetes, tu consumidor necesita un endpoint de salud. Puedes levantar un servidor HTTP mínimo en paralelo:
func startHealthServer(ctx context.Context, port string) {
mux := http.NewServeMux()
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
})
server := &http.Server{Addr: ":" + port, Handler: mux}
go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
server.Shutdown(shutdownCtx)
}()
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
slog.Error("health server error", "error", err)
}
}Lánzalo en una goroutine desde main() y configura los probes de Kubernetes para apuntar a /health.
Estructura final del proyecto
Al juntar todo, el proyecto queda así:
kafka-consumer/
├── cmd/
│ ├── consumer/
│ │ └── main.go # Punto de entrada del consumidor
│ └── producer/
│ └── main.go # Productor de pruebas
├── internal/
│ ├── event/
│ │ └── order.go # Handler de eventos de pedidos
│ └── kafka/
│ ├── consumer.go # Configuración y bucle de consumo
│ ├── consumer_test.go # Tests de integración
│ └── errors.go # Tipos de error
├── docker-compose.yml
├── go.mod
└── go.sumSi vienes de un artículo sobre workers en Go, verás que la estructura es casi idéntica. Y no es casualidad: un consumidor Kafka es, en esencia, un worker que recibe trabajo desde un topic en lugar de desde una cola en memoria.
Kafka sin la complejidad de JVM
Un consumidor Kafka en Go se construye con pocas piezas que encajan de forma natural: un reader de segmentio/kafka-go con consumer group y commit manual, un bucle de consumo basado en FetchMessage + CommitMessages para control total del offset, y handlers tipados que deserializan el mensaje y ejecutan la lógica de negocio. El resto es clasificar errores entre transitorios y permanentes, garantizar un shutdown ordenado con signal.NotifyContext, y diseñar handlers idempotentes porque at-least-once delivery implica duplicados.
Go no tiene el ecosistema Kafka de JVM. No hay Kafka Streams, no hay Spring Kafka con sus 47 anotaciones. Pero para consumidores backend (que son la mayoría de los casos), lo que ofrece es suficiente y considerablemente más simple. Un binario de 15 MB, 30 MB de RAM, arranque en milisegundos y concurrencia nativa para paralelizar el procesamiento.
Lo que me gusta de este enfoque es que el consumidor acaba siendo un worker más. Si vienes del artículo de workers en Go, habrás visto que la estructura es casi idéntica. La diferencia es de dónde viene el trabajo, no de cómo lo procesas. Y eso es exactamente lo que quieres: patrones reutilizables que se adaptan al transporte sin reescribir la lógica.
Si necesitas procesar mensajes Kafka con concurrencia real, combina lo que has visto aquí con un worker pool: el reader alimenta un channel, el pool procesa los mensajes en paralelo y el commit se hace cuando todos los mensajes del batch están procesados.


