Crear un worker en Go para procesar jobs en segundo plano
Tutorial para construir un worker en Go con cola en memoria, context, retry, logging y shutdown ordenado. Backend real.

Las APIs se llevan toda la atención. Cada tutorial de Go empieza con un endpoint REST, un handler, un JSON de respuesta. Pero la mayoría de sistemas backend reales necesitan algo más: procesos que consumen tareas en segundo plano, reintentan fallos y se apagan de forma ordenada cuando llega una señal del sistema operativo. Eso es un worker.
Un worker bien implementado te enseña más sobre backend real que muchos CRUDs. Toca goroutines, channels, context, señales de sistema, manejo de errores, retry, logging estructurado y shutdown graceful. Todo lo que un backend engineer necesita dominar y que rara vez aparece en los tutoriales de “haz tu primera API en 10 minutos”.
En este artículo vamos a construir un worker completo desde cero. Cola en memoria, procesamiento concurrente, retry con backoff, logging con slog y shutdown ordenado con señales del sistema operativo. Código que podrías poner en producción.
Qué es un background worker
Un worker es un proceso que se ejecuta en segundo plano consumiendo tareas de una cola. No responde a peticiones HTTP. No tiene endpoints. Su único trabajo es tomar un job de la cola, procesarlo y pasar al siguiente.
Ejemplos del mundo real:
- Enviar emails de confirmación después de un registro.
- Procesar imágenes subidas por los usuarios (redimensionar, comprimir, generar thumbnails).
- Sincronizar datos con sistemas externos que tienen rate limits.
- Generar reportes PDF que tardan varios segundos.
- Indexar documentos en un motor de búsqueda.
El patrón siempre es el mismo: algo produce un job (una petición HTTP, un evento, un cron) y el worker lo consume de forma asíncrona. El productor no espera a que el job termine. Simplemente lo encola y continúa.
En Go, la implementación natural de un worker usa goroutines para la concurrencia y channels como cola de comunicación. No necesitas librerías externas para un worker funcional. La librería estándar te da todo lo necesario.
Setup del proyecto
Vamos a crear un proyecto limpio con la estructura mínima necesaria:
worker-demo/
├── go.mod
├── main.go
├── job/
│ └── job.go
├── queue/
│ └── queue.go
└── worker/
└── worker.goInicializamos el módulo:
go mod init github.com/tu-usuario/worker-demoNada de frameworks, nada de dependencias externas. Todo con la librería estándar de Go. Cuando necesites algo más, lo añadirás. Pero primero, entiende el mecanismo sin capas de abstracción.
La interfaz de Job y la cola
El primer paso es definir qué es un job. Un job necesita tres cosas: un identificador, un tipo y un método para ejecutarse.
// job/job.go
package job
import (
"context"
"fmt"
"time"
)
// Job representa una tarea que el worker puede procesar.
type Job struct {
ID string
Type string
Payload map[string]any
CreatedAt time.Time
Retries int
MaxRetry int
}
// Processor define la interfaz para procesar un job.
type Processor interface {
Process(ctx context.Context, j Job) error
}
// ProcessorFunc es un adaptador para usar funciones simples como Processor.
type ProcessorFunc func(ctx context.Context, j Job) error
func (f ProcessorFunc) Process(ctx context.Context, j Job) error {
return f(ctx, j)
}
// NewJob crea un job con valores por defecto.
func NewJob(jobType string, payload map[string]any) Job {
return Job{
ID: fmt.Sprintf("%s-%d", jobType, time.Now().UnixNano()),
Type: jobType,
Payload: payload,
CreatedAt: time.Now(),
MaxRetry: 3,
}
}Hay decisiones importantes aquí. El Payload es un map[string]any en lugar de un tipo genérico. En un worker real podrías usar generics o deserializar desde JSON, pero un mapa flexible es suficiente para empezar y evita complejidad prematura.
El patrón ProcessorFunc es el mismo que usa http.HandlerFunc en la librería estándar: un adaptador que permite usar funciones simples donde se espera una interfaz. Si vienes de Java, es el equivalente a un @FunctionalInterface.
MaxRetry tiene un valor por defecto de 3. No porque sea un número mágico, sino porque tres reintentos es un punto de partida razonable para la mayoría de operaciones. Si tu job es idempotente (y debería serlo), tres reintentos te cubren contra fallos transitorios sin saturar el sistema.
Ahora la cola. Una cola en memoria basada en un channel buffered:
// queue/queue.go
package queue
import (
"errors"
"sync/atomic"
"github.com/tu-usuario/worker-demo/job"
)
var ErrQueueClosed = errors.New("queue is closed")
// InMemoryQueue es una cola de jobs basada en un channel buffered.
type InMemoryQueue struct {
jobs chan job.Job
closed atomic.Bool
}
// NewInMemoryQueue crea una cola con la capacidad especificada.
func NewInMemoryQueue(capacity int) *InMemoryQueue {
return &InMemoryQueue{
jobs: make(chan job.Job, capacity),
}
}
// Enqueue añade un job a la cola. Devuelve error si la cola está cerrada.
func (q *InMemoryQueue) Enqueue(j job.Job) error {
if q.closed.Load() {
return ErrQueueClosed
}
select {
case q.jobs <- j:
return nil
default:
return errors.New("queue is full")
}
}
// Dequeue devuelve el channel de lectura para consumir jobs.
func (q *InMemoryQueue) Dequeue() <-chan job.Job {
return q.jobs
}
// Close cierra la cola. Los jobs que ya están en la cola se pueden seguir consumiendo.
func (q *InMemoryQueue) Close() {
if q.closed.CompareAndSwap(false, true) {
close(q.jobs)
}
}
// Len devuelve el número de jobs pendientes en la cola.
func (q *InMemoryQueue) Len() int {
return len(q.jobs)
}El select con default en Enqueue es intencional. Si la cola está llena, devuelve un error inmediatamente en lugar de bloquear al productor. En producción, podrías querer un comportamiento diferente (bloquear, descartar el job más antiguo, o devolver un error al cliente), pero fallar rápido es el default más seguro.
El atomic.Bool para closed evita race conditions al cerrar la cola. CompareAndSwap garantiza que close(q.jobs) se llama exactamente una vez, incluso si múltiples goroutines intentan cerrar la cola simultáneamente.
Fíjate en que Close() cierra el channel pero no vacía la cola. Los jobs que ya están en el buffer se pueden seguir consumiendo. Esto es crucial para el shutdown ordenado: cierras la cola para que no entren más jobs, pero los que ya están se procesan hasta el final.
Implementación del worker con goroutines
Aquí está el núcleo del sistema. El worker consume jobs de la cola, los procesa usando el Processor correspondiente y maneja errores.
// worker/worker.go
package worker
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"github.com/tu-usuario/worker-demo/job"
"github.com/tu-usuario/worker-demo/queue"
)
// Config contiene la configuración del worker.
type Config struct {
Concurrency int
MaxRetry int
InitialBackoff time.Duration
MaxBackoff time.Duration
}
// DefaultConfig devuelve una configuración por defecto razonable.
func DefaultConfig() Config {
return Config{
Concurrency: 5,
MaxRetry: 3,
InitialBackoff: 500 * time.Millisecond,
MaxBackoff: 30 * time.Second,
}
}
// Worker consume jobs de una cola y los procesa.
type Worker struct {
queue *queue.InMemoryQueue
processors map[string]job.Processor
config Config
logger *slog.Logger
wg sync.WaitGroup
}
// New crea un nuevo Worker.
func New(q *queue.InMemoryQueue, cfg Config, logger *slog.Logger) *Worker {
return &Worker{
queue: q,
processors: make(map[string]job.Processor),
config: cfg,
logger: logger,
}
}
// Register asocia un Processor a un tipo de job.
func (w *Worker) Register(jobType string, p job.Processor) {
w.processors[jobType] = p
}
// RegisterFunc es un atajo para registrar una función como Processor.
func (w *Worker) RegisterFunc(jobType string, fn func(context.Context, job.Job) error) {
w.processors[jobType] = job.ProcessorFunc(fn)
}
// Start arranca el worker con el contexto dado. Bloquea hasta que el contexto
// se cancele y todos los jobs en curso hayan terminado.
func (w *Worker) Start(ctx context.Context) {
w.logger.Info("worker starting",
slog.Int("concurrency", w.config.Concurrency),
slog.Int("registered_processors", len(w.processors)),
)
for i := 0; i < w.config.Concurrency; i++ {
w.wg.Add(1)
go w.consume(ctx, i)
}
w.wg.Wait()
w.logger.Info("worker stopped: all goroutines finished")
}
// consume es el loop principal de cada goroutine del worker.
func (w *Worker) consume(ctx context.Context, id int) {
defer w.wg.Done()
logger := w.logger.With(slog.Int("worker_id", id))
logger.Info("worker goroutine started")
for {
select {
case j, ok := <-w.queue.Dequeue():
if !ok {
logger.Info("queue closed, goroutine exiting")
return
}
w.processJob(ctx, logger, j)
case <-ctx.Done():
logger.Info("context cancelled, draining remaining jobs")
w.drain(ctx, logger)
return
}
}
}
// drain procesa los jobs que quedan en la cola después de que el contexto se cancele.
func (w *Worker) drain(ctx context.Context, logger *slog.Logger) {
// Crear un contexto con timeout para el drain
drainCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
for {
select {
case j, ok := <-w.queue.Dequeue():
if !ok {
logger.Info("queue drained")
return
}
w.processJob(drainCtx, logger, j)
default:
logger.Info("no more jobs to drain")
return
}
}
}Vamos a descomponer las decisiones importantes.
Concurrencia configurable. El Start lanza N goroutines, todas consumiendo del mismo channel. Go garantiza que cada job se entrega a exactamente una goroutine. No necesitas locks, no necesitas coordinación manual. El channel hace el trabajo de distribución.
sync.WaitGroup para esperar. Cada goroutine incrementa el contador al arrancar y lo decrementa al terminar. Start bloquea en wg.Wait() hasta que todas las goroutines han terminado. Esto es esencial para el shutdown ordenado.
Drain después de cancelación. Cuando el contexto se cancela (señal de shutdown), las goroutines no mueren inmediatamente. Primero procesan los jobs que quedan en la cola. El drain tiene su propio timeout de 30 segundos para evitar que un job lento bloquee el shutdown indefinidamente.
Si quieres profundizar en cómo funciona este patrón de múltiples goroutines consumiendo del mismo channel, tengo un artículo dedicado a worker pools en Go.
Procesamiento de jobs con context
Cada job se procesa con un context que permite controlar timeouts y cancelación. Aquí es donde el context de Go demuestra su valor.
// processJob procesa un job con retry.
func (w *Worker) processJob(ctx context.Context, logger *slog.Logger, j job.Job) {
processor, ok := w.processors[j.Type]
if !ok {
logger.Error("no processor registered for job type",
slog.String("job_id", j.ID),
slog.String("job_type", j.Type),
)
return
}
jobLogger := logger.With(
slog.String("job_id", j.ID),
slog.String("job_type", j.Type),
)
maxRetries := j.MaxRetry
if maxRetries == 0 {
maxRetries = w.config.MaxRetry
}
var lastErr error
for attempt := 0; attempt <= maxRetries; attempt++ {
if attempt > 0 {
backoff := w.calculateBackoff(attempt)
jobLogger.Info("retrying job",
slog.Int("attempt", attempt),
slog.Duration("backoff", backoff),
)
select {
case <-time.After(backoff):
// Continuar con el retry
case <-ctx.Done():
jobLogger.Warn("context cancelled during backoff, aborting retry",
slog.Int("attempt", attempt),
)
return
}
}
// Crear un context con timeout para cada intento
jobCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
start := time.Now()
err := processor.Process(jobCtx, j)
duration := time.Since(start)
cancel()
if err == nil {
jobLogger.Info("job completed",
slog.Int("attempt", attempt),
slog.Duration("duration", duration),
)
return
}
lastErr = err
jobLogger.Warn("job failed",
slog.Int("attempt", attempt),
slog.Duration("duration", duration),
slog.String("error", err.Error()),
)
}
jobLogger.Error("job exhausted all retries",
slog.Int("max_retries", maxRetries),
slog.String("last_error", lastErr.Error()),
)
}Hay un patrón importante en el manejo del context dentro del loop de retry. Fíjate en el select durante el backoff:
select {
case <-time.After(backoff):
// Continuar con el retry
case <-ctx.Done():
// Abortar
return
}Sin este select, si el worker recibe una señal de shutdown mientras espera un backoff de 30 segundos, se quedaría bloqueado esos 30 segundos. Con el select, reacciona inmediatamente a la cancelación del contexto. Es un detalle que marca la diferencia entre un worker que tarda 30 segundos en apagarse y uno que responde en milisegundos.
Cada intento crea su propio context.WithTimeout. Esto significa que si un procesador se queda colgado, el timeout lo mata después de 60 segundos. El cancel() se llama inmediatamente después del procesamiento para liberar recursos del contexto, no en un defer, porque el loop continúa y crearía múltiples contextos sin liberar.
Retry logic: backoff simple
El retry con backoff exponencial es una de esas cosas que todo el mundo menciona pero pocos implementan bien. La idea es simple: cada reintento espera más tiempo que el anterior para dar tiempo al sistema a recuperarse.
// calculateBackoff calcula el tiempo de espera para un retry usando
// backoff exponencial con jitter.
func (w *Worker) calculateBackoff(attempt int) time.Duration {
backoff := w.config.InitialBackoff
// Backoff exponencial: 500ms, 1s, 2s, 4s, 8s...
for i := 1; i < attempt; i++ {
backoff *= 2
if backoff > w.config.MaxBackoff {
backoff = w.config.MaxBackoff
break
}
}
// Añadir jitter de ±25% para evitar thundering herd
jitter := time.Duration(float64(backoff) * 0.25)
min := backoff - jitter
max := backoff + jitter
// Usar el timestamp como fuente de aleatoriedad simple
// En producción usarías math/rand/v2
ns := time.Now().UnixNano()
spread := max - min
if spread > 0 {
backoff = min + time.Duration(ns%int64(spread))
}
return backoff
}El jitter es crucial. Sin él, si tienes 100 workers que fallan al mismo tiempo (porque la base de datos se cayó, por ejemplo), todos van a reintentar al mismo tiempo: 500ms después, luego 1s después, luego 2s después. Esto crea picos de carga que pueden empeorar la situación. El jitter distribuye los reintentos en una ventana temporal, reduciendo la presión sobre el sistema. Es el patrón de “thundering herd” y el jitter es la solución estándar.
El MaxBackoff pone un tope. Sin él, el backoff exponencial crece sin límite: 500ms, 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s… Más de 30 segundos entre reintentos rara vez tiene sentido. Si el sistema lleva más de 30 segundos caído, probablemente necesitas una alerta, no un retry más largo.
La implementación del jitter usa time.Now().UnixNano() como fuente de aleatoriedad. No es criptográficamente seguro, pero para backoff no necesitas seguridad: necesitas distribución. En producción puedes usar math/rand/v2 si prefieres algo más robusto.
Shutdown ordenado con señales del sistema operativo
El shutdown ordenado es lo que separa un worker de juguete de uno que puedes poner en producción. Sin él, cuando despliegas una nueva versión (o Kubernetes mata un pod), los jobs a medio procesar se pierden.
// main.go
package main
import (
"context"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
"github.com/tu-usuario/worker-demo/job"
"github.com/tu-usuario/worker-demo/queue"
"github.com/tu-usuario/worker-demo/worker"
)
func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
// Crear la cola con capacidad para 100 jobs
q := queue.NewInMemoryQueue(100)
// Configurar el worker
cfg := worker.DefaultConfig()
cfg.Concurrency = 3
w := worker.New(q, cfg, logger)
// Registrar processors
w.RegisterFunc("email", func(ctx context.Context, j job.Job) error {
to, _ := j.Payload["to"].(string)
subject, _ := j.Payload["subject"].(string)
logger.Info("sending email",
slog.String("to", to),
slog.String("subject", subject),
)
// Simular envío de email
select {
case <-time.After(2 * time.Second):
return nil
case <-ctx.Done():
return ctx.Err()
}
})
w.RegisterFunc("resize_image", func(ctx context.Context, j job.Job) error {
path, _ := j.Payload["path"].(string)
logger.Info("resizing image",
slog.String("path", path),
)
select {
case <-time.After(5 * time.Second):
return nil
case <-ctx.Done():
return ctx.Err()
}
})
// Crear contexto que se cancela con SIGINT o SIGTERM
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
// Encolar algunos jobs de ejemplo
go func() {
jobs := []job.Job{
job.NewJob("email", map[string]any{
"to": "usuario@example.com",
"subject": "Bienvenido",
}),
job.NewJob("email", map[string]any{
"to": "admin@example.com",
"subject": "Nuevo registro",
}),
job.NewJob("resize_image", map[string]any{
"path": "/uploads/foto.jpg",
}),
job.NewJob("email", map[string]any{
"to": "soporte@example.com",
"subject": "Reporte diario",
}),
}
for _, j := range jobs {
if err := q.Enqueue(j); err != nil {
logger.Error("failed to enqueue job",
slog.String("error", err.Error()),
)
}
}
}()
// Arrancar el worker. Bloquea hasta que el contexto se cancele
// y todos los jobs en curso hayan terminado.
logger.Info("starting worker, press Ctrl+C to stop")
w.Start(ctx)
logger.Info("shutdown complete")
}La línea clave es signal.NotifyContext. Crea un contexto que se cancela automáticamente cuando el proceso recibe SIGINT (Ctrl+C) o SIGTERM (lo que envía Kubernetes, Docker o systemd antes de matar el proceso).
El flujo de shutdown es:
- El proceso recibe SIGINT o SIGTERM.
- El contexto se cancela.
- Las goroutines del worker detectan la cancelación en el
selectdel loop principal. - Cada goroutine drena los jobs restantes de la cola.
wg.Wait()espera a que todas las goroutines terminen.Startretorna y el proceso termina limpiamente.
Si algo va mal y una goroutine se queda colgada, el drain tiene un timeout de 30 segundos. Después de eso, la goroutine abandona y el proceso puede terminar. En un entorno como Kubernetes, esto te da tiempo suficiente para un shutdown ordenado antes de que el kubelet envíe un SIGKILL (que por defecto llega 30 segundos después del SIGTERM).
Segunda señal: kill inmediato
Un patrón útil en producción es escuchar una segunda señal para forzar el cierre inmediato:
// Alternativa: usar context manual para manejar doble señal
ctx, cancel := context.WithCancel(context.Background())
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigChan
logger.Info("received signal, starting graceful shutdown",
slog.String("signal", sig.String()),
)
cancel()
// Segunda señal: forzar cierre inmediato
sig = <-sigChan
logger.Error("received second signal, forcing shutdown",
slog.String("signal", sig.String()),
)
os.Exit(1)
}()Si el operador pulsa Ctrl+C una vez, el worker se apaga ordenadamente. Si pulsa Ctrl+C una segunda vez, el proceso muere inmediatamente. Es un patrón estándar en herramientas CLI de Go y da control al operador sin sacrificar la seguridad del shutdown ordenado.
Logging y observabilidad
Un worker sin logging es un worker que no puedes depurar en producción. Con slog (disponible desde Go 1.21), el logging estructurado es parte de la librería estándar.
Ya hemos integrado logging en todo el worker, pero vamos a repasar las prácticas importantes:
Logs estructurados con contexto
Cada log del worker incluye atributos que permiten filtrar y correlacionar:
logger.Info("job completed",
slog.String("job_id", j.ID),
slog.String("job_type", j.Type),
slog.Int("attempt", attempt),
slog.Duration("duration", duration),
)En JSON, esto produce:
{
"time": "2026-08-07T10:30:00Z",
"level": "INFO",
"msg": "job completed",
"job_id": "email-1691400600000000000",
"job_type": "email",
"attempt": 0,
"duration": "2.003s"
}Puedes buscar todos los logs de un job específico filtrando por job_id. Puedes ver todos los reintentos filtrando por attempt > 0. Puedes detectar jobs lentos filtrando por duration. Sin logs estructurados, estarías haciendo grep en texto libre y lamentándote.
Worker ID en cada goroutine
Cada goroutine del worker tiene un worker_id que se propaga a todos sus logs:
logger := w.logger.With(slog.Int("worker_id", id))Esto es fundamental para depurar problemas de concurrencia. Si un worker se queda colgado, puedes ver exactamente cuál es por su ID.
Métricas básicas sin Prometheus
Si no quieres añadir Prometheus (que es lo recomendable en producción), al menos puedes exponer métricas básicas con contadores atómicos:
type Metrics struct {
Processed atomic.Int64
Failed atomic.Int64
Retried atomic.Int64
}
func (m *Metrics) Log(logger *slog.Logger) {
logger.Info("worker metrics",
slog.Int64("processed", m.Processed.Load()),
slog.Int64("failed", m.Failed.Load()),
slog.Int64("retried", m.Retried.Load()),
)
}Puedes llamar a metrics.Log(logger) periódicamente con un ticker, o al final del shutdown para un resumen de la ejecución. No reemplaza un sistema de métricas real, pero te da visibilidad básica sin dependencias.
Testing del worker
Testear un worker es más simple de lo que parece si diseñas el código con testing en mente. La clave es que el Processor es una interfaz y la cola es inyectable.
Test del procesamiento básico
package worker_test
import (
"context"
"log/slog"
"os"
"sync/atomic"
"testing"
"time"
"github.com/tu-usuario/worker-demo/job"
"github.com/tu-usuario/worker-demo/queue"
"github.com/tu-usuario/worker-demo/worker"
)
func TestWorker_ProcessesJobs(t *testing.T) {
q := queue.NewInMemoryQueue(10)
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
cfg := worker.DefaultConfig()
cfg.Concurrency = 2
w := worker.New(q, cfg, logger)
var processed atomic.Int32
w.RegisterFunc("test", func(ctx context.Context, j job.Job) error {
processed.Add(1)
return nil
})
// Encolar 5 jobs
for i := 0; i < 5; i++ {
_ = q.Enqueue(job.NewJob("test", nil))
}
// Cerrar la cola para que el worker termine cuando haya procesado todo
q.Close()
// Arrancar el worker con timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
w.Start(ctx)
if processed.Load() != 5 {
t.Errorf("expected 5 processed jobs, got %d", processed.Load())
}
}El truco está en cerrar la cola antes de arrancar el worker. Cuando la cola está cerrada y vacía, el range sobre el channel termina y las goroutines salen. Así el test no se queda colgado esperando más jobs.
Test del retry
func TestWorker_RetriesOnFailure(t *testing.T) {
q := queue.NewInMemoryQueue(10)
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
cfg := worker.DefaultConfig()
cfg.Concurrency = 1
cfg.InitialBackoff = 10 * time.Millisecond // Backoff bajo para tests
cfg.MaxBackoff = 50 * time.Millisecond
w := worker.New(q, cfg, logger)
var attempts atomic.Int32
w.RegisterFunc("flaky", func(ctx context.Context, j job.Job) error {
count := attempts.Add(1)
if count < 3 {
return fmt.Errorf("transient error (attempt %d)", count)
}
return nil
})
j := job.NewJob("flaky", nil)
j.MaxRetry = 5
_ = q.Enqueue(j)
q.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
w.Start(ctx)
if attempts.Load() != 3 {
t.Errorf("expected 3 attempts, got %d", attempts.Load())
}
}Nota cómo el test configura un backoff muy bajo (10ms) para que no tarde siglos. En tests, siempre reduce los tiempos de espera. Un test que tarda 30 segundos por los backoffs es un test que nadie va a ejecutar.
Test del shutdown ordenado
func TestWorker_GracefulShutdown(t *testing.T) {
q := queue.NewInMemoryQueue(100)
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
cfg := worker.DefaultConfig()
cfg.Concurrency = 2
w := worker.New(q, cfg, logger)
var processed atomic.Int32
w.RegisterFunc("slow", func(ctx context.Context, j job.Job) error {
select {
case <-time.After(100 * time.Millisecond):
processed.Add(1)
return nil
case <-ctx.Done():
return ctx.Err()
}
})
// Encolar 10 jobs
for i := 0; i < 10; i++ {
_ = q.Enqueue(job.NewJob("slow", nil))
}
ctx, cancel := context.WithCancel(context.Background())
// Cancelar después de 300ms (suficiente para procesar algunos pero no todos)
go func() {
time.Sleep(300 * time.Millisecond)
cancel()
q.Close()
}()
w.Start(ctx)
count := processed.Load()
if count == 0 {
t.Error("expected at least some jobs to be processed")
}
if count == 10 {
t.Error("expected some jobs to NOT be processed (shutdown before completion)")
}
t.Logf("processed %d out of 10 jobs before shutdown", count)
}Este test verifica que el worker procesa algunos jobs y luego se detiene limpiamente cuando el contexto se cancela. No verifica un número exacto porque depende del timing, pero sí verifica que el shutdown funciona: ni cero (el worker no arrancó) ni diez (el worker ignoró la cancelación).
Para más sobre testing en Go, incluyendo subtests, table-driven tests y mocks, mira testing en Go.
De memoria a colas externas (Redis, Kafka)
La cola en memoria que hemos construido funciona. Pero tiene un problema evidente: si el proceso muere, los jobs se pierden. En producción, normalmente quieres persistencia.
La buena noticia es que el diseño del worker ya está preparado para esto. El worker no depende de InMemoryQueue directamente: consume de un <-chan job.Job. Podemos abstraer la cola detrás de una interfaz:
// Queue define la interfaz mínima para una cola de jobs.
type Queue interface {
Enqueue(j job.Job) error
Dequeue() <-chan job.Job
Close()
}Con esta interfaz, puedes implementar colas respaldadas por distintos sistemas sin tocar el worker:
Redis con listas
type RedisQueue struct {
client *redis.Client
key string
jobs chan job.Job
ctx context.Context
cancel context.CancelFunc
}
func (q *RedisQueue) startConsumer() {
go func() {
for {
select {
case <-q.ctx.Done():
close(q.jobs)
return
default:
// BRPOP bloquea hasta que hay un elemento o timeout
result, err := q.client.BRPop(q.ctx, 5*time.Second, q.key).Result()
if err != nil {
continue
}
var j job.Job
if err := json.Unmarshal([]byte(result[1]), &j); err != nil {
continue
}
q.jobs <- j
}
}
}()
}Redis con BRPOP es probablemente la opción más pragmática para la mayoría de proyectos. Tienes persistencia, es rápido, y si ya usas Redis para caché, no añades infraestructura nueva.
Kafka para alto volumen
type KafkaQueue struct {
reader *kafka.Reader
jobs chan job.Job
}
func (q *KafkaQueue) startConsumer() {
go func() {
for {
msg, err := q.reader.ReadMessage(context.Background())
if err != nil {
break
}
var j job.Job
if err := json.Unmarshal(msg.Value, &j); err != nil {
continue
}
q.jobs <- j
}
close(q.jobs)
}()
}Kafka tiene sentido cuando necesitas alto throughput, múltiples consumidores, o replay de eventos. Pero para la mayoría de proyectos es sobreingeniería. No uses Kafka porque “escala”: úsalo cuando realmente necesites las garantías que ofrece.
PostgreSQL como cola (SKIP LOCKED)
Una opción que se suele pasar por alto: usar PostgreSQL como cola con SELECT FOR UPDATE SKIP LOCKED.
func (q *PGQueue) fetchNextJob(ctx context.Context) (*job.Job, error) {
row := q.db.QueryRowContext(ctx, `
UPDATE jobs SET status = 'processing', started_at = NOW()
WHERE id = (
SELECT id FROM jobs
WHERE status = 'pending'
ORDER BY created_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING id, type, payload, retries, max_retry, created_at
`)
var j job.Job
err := row.Scan(&j.ID, &j.Type, &j.Payload, &j.Retries, &j.MaxRetry, &j.CreatedAt)
return &j, err
}SKIP LOCKED es la clave: permite que múltiples workers consulten la tabla simultáneamente sin bloquearse entre sí. Cada worker toma un job diferente. Si ya tienes PostgreSQL en tu stack, esta opción te ahorra añadir Redis o Kafka solo para la cola.
Cuándo usar cada uno
| Sistema | Cuándo usarlo |
|---|---|
| Memoria | Prototipos, tests, jobs que puedes perder sin consecuencias |
| Redis | La mayoría de proyectos. Persistencia, velocidad, sin añadir mucha complejidad |
| PostgreSQL | Ya tienes Postgres y no quieres más infraestructura. Volumen bajo-medio |
| Kafka | Alto volumen, múltiples consumidores, replay de eventos, event sourcing |
La abstracción con la interfaz Queue te permite empezar con memoria y migrar a Redis o Kafka sin reescribir el worker. Diseña para el caso simple, prepara para el caso complejo.
Código completo y estructura del proyecto
Juntando todas las piezas, esta es la estructura final del proyecto:
worker-demo/
├── go.mod
├── main.go // Arranque, señales, configuración
├── job/
│ └── job.go // Definición de Job y Processor
├── queue/
│ └── queue.go // Cola en memoria (implementa Queue)
└── worker/
├── worker.go // Worker: consume, procesa, retry, drain
└── worker_test.go // TestsEl flujo completo:
main.goconfigura el logger, crea la cola, registra processors y arranca el worker con un contexto ligado a señales del sistema.- Los productores (una API, un cron, un event listener) encolan jobs con
queue.Enqueue(). - El worker consume jobs del channel, encuentra el processor correspondiente y lo ejecuta con un context con timeout.
- Si el job falla, el worker aplica backoff exponencial con jitter y reintenta hasta agotar el máximo de reintentos.
- Cuando llega SIGTERM, el contexto se cancela, las goroutines drenan los jobs restantes y el worker se apaga limpiamente.
La pieza invisible que sostiene el backend
Un worker no tiene la visibilidad de una API, pero es el que hace el trabajo pesado en segundo plano. Lo que hemos construido aquí cubre las decisiones que importan: jobs desacoplados del worker mediante una interfaz Processor, una cola en memoria con channels buffered que es thread-safe sin mutexes, concurrencia configurable con N goroutines consumiendo del mismo channel, y retry con backoff exponencial y jitter para evitar thundering herd. El shutdown ordenado con señales del sistema, drain con timeout y WaitGroup garantiza que no pierdes trabajo cuando despliegas. Y la interfaz Queue te deja migrar a Redis, Kafka o PostgreSQL sin tocar una línea del worker.
Dominar estos patrones de concurrencia, retry y shutdown te va a servir en cualquier proyecto Go. Y si quieres escalar la concurrencia más allá de lo que hemos visto, el siguiente paso natural es implementar worker pools en Go con channels como mecanismo de coordinación.


