Crear un worker en Go per processar jobs en segon pla
Tutorial per construir un worker en Go amb cua en memòria, context, retry, logging i shutdown ordenat. Backend real.

Les APIs s’emporten tota l’atenció. Cada tutorial de Go comença amb un endpoint REST, un handler, un JSON de resposta. Però la majoria de sistemes backend reals necessiten alguna cosa més: processos que consumeixen tasques en segon pla, reintenten fallades i s’apaguen de forma ordenada quan arriba un senyal del sistema operatiu. Això és un worker.
Un worker ben implementat t’ensenya més sobre backend real que molts CRUDs. Toca goroutines, channels, context, senyals de sistema, gestió d’errors, retry, logging estructurat i shutdown graceful. Tot el que un backend engineer necessita dominar i que rarament apareix als tutorials de “fes la teva primera API en 10 minuts”.
En aquest article construirem un worker complet des de zero. Cua en memòria, processament concurrent, retry amb backoff, logging amb slog i shutdown ordenat amb senyals del sistema operatiu. Codi que podries posar en producció.
Què és un background worker
Un worker és un procés que s’executa en segon pla consumint tasques d’una cua. No respon a peticions HTTP. No té endpoints. La seva única feina és agafar un job de la cua, processar-lo i passar al següent.
Exemples del món real:
- Enviar emails de confirmació després d’un registre.
- Processar imatges pujades pels usuaris (redimensionar, comprimir, generar thumbnails).
- Sincronitzar dades amb sistemes externs que tenen rate limits.
- Generar informes PDF que triguen diversos segons.
- Indexar documents en un motor de cerca.
El patró sempre és el mateix: alguna cosa produeix un job (una petició HTTP, un event, un cron) i el worker el consumeix de forma asíncrona. El productor no espera que el job acabi. Simplement l’encua i continua.
En Go, la implementació natural d’un worker fa servir goroutines per a la concurrència i channels com a cua de comunicació. No necessites llibreries externes per a un worker funcional. La llibreria estàndard et dóna tot el necessari.
Setup del projecte
Crearem un projecte net amb l’estructura mínima necessària:
worker-demo/
├── go.mod
├── main.go
├── job/
│ └── job.go
├── queue/
│ └── queue.go
└── worker/
└── worker.goInicialitzem el mòdul:
go mod init github.com/el-teu-usuari/worker-demoRes de frameworks, res de dependències externes. Tot amb la llibreria estàndard de Go. Quan necessitis alguna cosa més, ho afegiràs. Però primer, entén el mecanisme sense capes d’abstracció.
La interfície de Job i la cua
El primer pas és definir què és un job. Un job necessita tres coses: un identificador, un tipus i un mètode per executar-se.
// job/job.go
package job
import (
\"context\"
\"fmt\"
\"time\"
)
// Job representa una tasca que el worker pot processar.
type Job struct {
ID string
Type string
Payload map[string]any
CreatedAt time.Time
Retries int
MaxRetry int
}
// Processor defineix la interfície per processar un job.
type Processor interface {
Process(ctx context.Context, j Job) error
}
// ProcessorFunc és un adaptador per usar funcions simples com a Processor.
type ProcessorFunc func(ctx context.Context, j Job) error
func (f ProcessorFunc) Process(ctx context.Context, j Job) error {
return f(ctx, j)
}
// NewJob crea un job amb valors per defecte.
func NewJob(jobType string, payload map[string]any) Job {
return Job{
ID: fmt.Sprintf(\"%s-%d\", jobType, time.Now().UnixNano()),
Type: jobType,
Payload: payload,
CreatedAt: time.Now(),
MaxRetry: 3,
}
}Hi ha decisions importants aquí. El Payload és un map[string]any en lloc d’un tipus genèric. En un worker real podries usar generics o deserialitzar des de JSON, però un mapa flexible és suficient per començar i evita complexitat prematura.
El patró ProcessorFunc és el mateix que fa servir http.HandlerFunc a la llibreria estàndard: un adaptador que permet usar funcions simples allà on s’espera una interfície. Si véns de Java, és l’equivalent a un @FunctionalInterface.
MaxRetry té un valor per defecte de 3. No perquè sigui un número màgic, sinó perquè tres reintents és un punt de partida raonable per a la majoria d’operacions. Si el teu job és idempotent (i hauria de ser-ho), tres reintents et cobreixen contra fallades transitòries sense saturar el sistema.
Ara la cua. Una cua en memòria basada en un channel buffered:
// queue/queue.go
package queue
import (
\"errors\"
\"sync/atomic\"
\"github.com/el-teu-usuari/worker-demo/job\"
)
var ErrQueueClosed = errors.New(\"queue is closed\")
// InMemoryQueue és una cua de jobs basada en un channel buffered.
type InMemoryQueue struct {
jobs chan job.Job
closed atomic.Bool
}
// NewInMemoryQueue crea una cua amb la capacitat especificada.
func NewInMemoryQueue(capacity int) *InMemoryQueue {
return &InMemoryQueue{
jobs: make(chan job.Job, capacity),
}
}
// Enqueue afegeix un job a la cua. Retorna error si la cua està tancada.
func (q *InMemoryQueue) Enqueue(j job.Job) error {
if q.closed.Load() {
return ErrQueueClosed
}
select {
case q.jobs <- j:
return nil
default:
return errors.New(\"queue is full\")
}
}
// Dequeue retorna el channel de lectura per consumir jobs.
func (q *InMemoryQueue) Dequeue() <-chan job.Job {
return q.jobs
}
// Close tanca la cua. Els jobs que ja estan a la cua es poden continuar consumint.
func (q *InMemoryQueue) Close() {
if q.closed.CompareAndSwap(false, true) {
close(q.jobs)
}
}
// Len retorna el nombre de jobs pendents a la cua.
func (q *InMemoryQueue) Len() int {
return len(q.jobs)
}El select amb default a Enqueue és intencional. Si la cua és plena, retorna un error immediatament en lloc de bloquejar el productor. En producció, podries voler un comportament diferent (bloquejar, descartar el job més antic, o retornar un error al client), però fallar ràpid és el default més segur.
El atomic.Bool per a closed evita race conditions en tancar la cua. CompareAndSwap garanteix que close(q.jobs) es crida exactament una vegada, fins i tot si múltiples goroutines intenten tancar la cua simultàniament.
Fixa’t que Close() tanca el channel però no buida la cua. Els jobs que ja estan al buffer es poden continuar consumint. Això és crucial per al shutdown ordenat: tanques la cua perquè no entrin més jobs, però els que ja hi estan es processen fins al final.
Implementació del worker amb goroutines
Aquí està el nucli del sistema. El worker consumeix jobs de la cua, els processa fent servir el Processor corresponent i gestiona errors.
// worker/worker.go
package worker
import (
\"context\"
\"fmt\"
\"log/slog\"
\"sync\"
\"time\"
\"github.com/el-teu-usuari/worker-demo/job\"
\"github.com/el-teu-usuari/worker-demo/queue\"
)
// Config conté la configuració del worker.
type Config struct {
Concurrency int
MaxRetry int
InitialBackoff time.Duration
MaxBackoff time.Duration
}
// DefaultConfig retorna una configuració per defecte raonable.
func DefaultConfig() Config {
return Config{
Concurrency: 5,
MaxRetry: 3,
InitialBackoff: 500 * time.Millisecond,
MaxBackoff: 30 * time.Second,
}
}
// Worker consumeix jobs d'una cua i els processa.
type Worker struct {
queue *queue.InMemoryQueue
processors map[string]job.Processor
config Config
logger *slog.Logger
wg sync.WaitGroup
}
// New crea un nou Worker.
func New(q *queue.InMemoryQueue, cfg Config, logger *slog.Logger) *Worker {
return &Worker{
queue: q,
processors: make(map[string]job.Processor),
config: cfg,
logger: logger,
}
}
// Register associa un Processor a un tipus de job.
func (w *Worker) Register(jobType string, p job.Processor) {
w.processors[jobType] = p
}
// RegisterFunc és un drecera per registrar una funció com a Processor.
func (w *Worker) RegisterFunc(jobType string, fn func(context.Context, job.Job) error) {
w.processors[jobType] = job.ProcessorFunc(fn)
}
// Start arrenca el worker amb el context donat. Bloqueja fins que el context
// es cancel·li i tots els jobs en curs hagin acabat.
func (w *Worker) Start(ctx context.Context) {
w.logger.Info(\"worker starting\",
slog.Int(\"concurrency\", w.config.Concurrency),
slog.Int(\"registered_processors\", len(w.processors)),
)
for i := 0; i < w.config.Concurrency; i++ {
w.wg.Add(1)
go w.consume(ctx, i)
}
w.wg.Wait()
w.logger.Info(\"worker stopped: all goroutines finished\")
}
// consume és el loop principal de cada goroutine del worker.
func (w *Worker) consume(ctx context.Context, id int) {
defer w.wg.Done()
logger := w.logger.With(slog.Int(\"worker_id\", id))
logger.Info(\"worker goroutine started\")
for {
select {
case j, ok := <-w.queue.Dequeue():
if !ok {
logger.Info(\"queue closed, goroutine exiting\")
return
}
w.processJob(ctx, logger, j)
case <-ctx.Done():
logger.Info(\"context cancelled, draining remaining jobs\")
w.drain(ctx, logger)
return
}
}
}
// drain processa els jobs que queden a la cua després que el context es cancel·li.
func (w *Worker) drain(ctx context.Context, logger *slog.Logger) {
// Crear un context amb timeout per al drain
drainCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
for {
select {
case j, ok := <-w.queue.Dequeue():
if !ok {
logger.Info(\"queue drained\")
return
}
w.processJob(drainCtx, logger, j)
default:
logger.Info(\"no more jobs to drain\")
return
}
}
}Desglosem les decisions importants.
Concurrència configurable. El Start llança N goroutines, totes consumint del mateix channel. Go garanteix que cada job es lliura a exactament una goroutine. No necessites locks, no necessites coordinació manual. El channel fa la feina de distribució.
sync.WaitGroup per esperar. Cada goroutine incrementa el comptador en arrencar i el decrementa en acabar. Start bloqueja en wg.Wait() fins que totes les goroutines han acabat. Això és essencial per al shutdown ordenat.
Drain després de cancel·lació. Quan el context es cancel·la (senyal de shutdown), les goroutines no moren immediatament. Primer processen els jobs que queden a la cua. El drain té el seu propi timeout de 30 segons per evitar que un job lent bloquegi el shutdown indefinidament.
Si vols aprofundir en com funciona aquest patró de múltiples goroutines consumint del mateix channel, tinc un article dedicat a worker pools en Go.
Processament de jobs amb context
Cada job es processa amb un context que permet controlar timeouts i cancel·lació. Aquí és on el context de Go demostra el seu valor.
// processJob processa un job amb retry.
func (w *Worker) processJob(ctx context.Context, logger *slog.Logger, j job.Job) {
processor, ok := w.processors[j.Type]
if !ok {
logger.Error(\"no processor registered for job type\",
slog.String(\"job_id\", j.ID),
slog.String(\"job_type\", j.Type),
)
return
}
jobLogger := logger.With(
slog.String(\"job_id\", j.ID),
slog.String(\"job_type\", j.Type),
)
maxRetries := j.MaxRetry
if maxRetries == 0 {
maxRetries = w.config.MaxRetry
}
var lastErr error
for attempt := 0; attempt <= maxRetries; attempt++ {
if attempt > 0 {
backoff := w.calculateBackoff(attempt)
jobLogger.Info(\"retrying job\",
slog.Int(\"attempt\", attempt),
slog.Duration(\"backoff\", backoff),
)
select {
case <-time.After(backoff):
// Continuar amb el retry
case <-ctx.Done():
jobLogger.Warn(\"context cancelled during backoff, aborting retry\",
slog.Int(\"attempt\", attempt),
)
return
}
}
// Crear un context amb timeout per a cada intent
jobCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
start := time.Now()
err := processor.Process(jobCtx, j)
duration := time.Since(start)
cancel()
if err == nil {
jobLogger.Info(\"job completed\",
slog.Int(\"attempt\", attempt),
slog.Duration(\"duration\", duration),
)
return
}
lastErr = err
jobLogger.Warn(\"job failed\",
slog.Int(\"attempt\", attempt),
slog.Duration(\"duration\", duration),
slog.String(\"error\", err.Error()),
)
}
jobLogger.Error(\"job exhausted all retries\",
slog.Int(\"max_retries\", maxRetries),
slog.String(\"last_error\", lastErr.Error()),
)
}Hi ha un patró important en la gestió del context dins del loop de retry. Fixa’t en el select durant el backoff:
select {
case <-time.After(backoff):
// Continuar amb el retry
case <-ctx.Done():
// Abortar
return
}Sense aquest select, si el worker rep un senyal de shutdown mentre espera un backoff de 30 segons, es quedaria bloquejat aquells 30 segons. Amb el select, reacciona immediatament a la cancel·lació del context. És un detall que marca la diferència entre un worker que triga 30 segons en apagar-se i un que respon en mil·lisegons.
Cada intent crea el seu propi context.WithTimeout. Això significa que si un processador es queda penjat, el timeout el mata després de 60 segons. El cancel() es crida immediatament després del processament per alliberar recursos del context, no en un defer, perquè el loop continua i crearia múltiples contextos sense alliberar.
Retry logic: backoff simple
El retry amb backoff exponencial és una d’aquelles coses que tothom menciona però pocs implementen bé. La idea és simple: cada reintent espera més temps que l’anterior per donar temps al sistema a recuperar-se.
// calculateBackoff calcula el temps d'espera per a un retry usant
// backoff exponencial amb jitter.
func (w *Worker) calculateBackoff(attempt int) time.Duration {
backoff := w.config.InitialBackoff
// Backoff exponencial: 500ms, 1s, 2s, 4s, 8s...
for i := 1; i < attempt; i++ {
backoff *= 2
if backoff > w.config.MaxBackoff {
backoff = w.config.MaxBackoff
break
}
}
// Afegir jitter de ±25% per evitar thundering herd
jitter := time.Duration(float64(backoff) * 0.25)
min := backoff - jitter
max := backoff + jitter
// Usar el timestamp com a font d'aleatorietat simple
// En producció usaries math/rand/v2
ns := time.Now().UnixNano()
spread := max - min
if spread > 0 {
backoff = min + time.Duration(ns%int64(spread))
}
return backoff
}El jitter és crucial. Sense ell, si tens 100 workers que fallen al mateix temps (perquè la base de dades ha caigut, per exemple), tots reintentaran al mateix temps: 500ms després, llavors 1s després, llavors 2s després. Això crea pics de càrrega que poden empitjorar la situació. El jitter distribueix els reintents en una finestra temporal, reduint la pressió sobre el sistema. És el patró de “thundering herd” i el jitter és la solució estàndard.
El MaxBackoff posa un límit. Sense ell, el backoff exponencial creix sense límit: 500ms, 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s… Més de 30 segons entre reintents rarament té sentit. Si el sistema porta més de 30 segons caigut, probablement necessites una alerta, no un retry més llarg.
La implementació del jitter usa time.Now().UnixNano() com a font d’aleatorietat. No és criptogràficament segur, però per a backoff no necessites seguretat: necessites distribució. En producció pots usar math/rand/v2 si prefereixes alguna cosa més robusta.
Shutdown ordenat amb senyals del sistema operatiu
El shutdown ordenat és el que separa un worker de joguina d’un que pots posar en producció. Sense ell, quan desplegues una nova versió (o Kubernetes mata un pod), els jobs a mig processar es perden.
// main.go
package main
import (
\"context\"
\"log/slog\"
\"os\"
\"os/signal\"
\"syscall\"
\"time\"
\"github.com/el-teu-usuari/worker-demo/job\"
\"github.com/el-teu-usuari/worker-demo/queue\"
\"github.com/el-teu-usuari/worker-demo/worker\"
)
func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
// Crear la cua amb capacitat per a 100 jobs
q := queue.NewInMemoryQueue(100)
// Configurar el worker
cfg := worker.DefaultConfig()
cfg.Concurrency = 3
w := worker.New(q, cfg, logger)
// Registrar processors
w.RegisterFunc(\"email\", func(ctx context.Context, j job.Job) error {
to, _ := j.Payload[\"to\"].(string)
subject, _ := j.Payload[\"subject\"].(string)
logger.Info(\"sending email\",
slog.String(\"to\", to),
slog.String(\"subject\", subject),
)
// Simular enviament d'email
select {
case <-time.After(2 * time.Second):
return nil
case <-ctx.Done():
return ctx.Err()
}
})
w.RegisterFunc(\"resize_image\", func(ctx context.Context, j job.Job) error {
path, _ := j.Payload[\"path\"].(string)
logger.Info(\"resizing image\",
slog.String(\"path\", path),
)
select {
case <-time.After(5 * time.Second):
return nil
case <-ctx.Done():
return ctx.Err()
}
})
// Crear context que es cancel·la amb SIGINT o SIGTERM
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
// Encuar alguns jobs d'exemple
go func() {
jobs := []job.Job{
job.NewJob(\"email\", map[string]any{
\"to\": \"usuari@example.com\",
\"subject\": \"Benvingut\",
}),
job.NewJob(\"email\", map[string]any{
\"to\": \"admin@example.com\",
\"subject\": \"Nou registre\",
}),
job.NewJob(\"resize_image\", map[string]any{
\"path\": \"/uploads/foto.jpg\",
}),
job.NewJob(\"email\", map[string]any{
\"to\": \"suport@example.com\",
\"subject\": \"Informe diari\",
}),
}
for _, j := range jobs {
if err := q.Enqueue(j); err != nil {
logger.Error(\"failed to enqueue job\",
slog.String(\"error\", err.Error()),
)
}
}
}()
// Arrencar el worker. Bloqueja fins que el context es cancel·li
// i tots els jobs en curs hagin acabat.
logger.Info(\"starting worker, press Ctrl+C to stop\")
w.Start(ctx)
logger.Info(\"shutdown complete\")
}La línia clau és signal.NotifyContext. Crea un context que es cancel·la automàticament quan el procés rep SIGINT (Ctrl+C) o SIGTERM (el que envien Kubernetes, Docker o systemd abans de matar el procés).
El flux de shutdown és:
- El procés rep SIGINT o SIGTERM.
- El context es cancel·la.
- Les goroutines del worker detecten la cancel·lació en el
selectdel loop principal. - Cada goroutine drena els jobs restants de la cua.
wg.Wait()espera que totes les goroutines acabin.Startretorna i el procés termina neteament.
Si alguna cosa va malament i una goroutine es queda penjada, el drain té un timeout de 30 segons. Després d’això, la goroutine abandona i el procés pot terminar. En un entorn com Kubernetes, això et dona temps suficient per a un shutdown ordenat abans que el kubelet enviï un SIGKILL (que per defecte arriba 30 segons després del SIGTERM).
Segona senyal: kill immediat
Un patró útil en producció és escoltar una segona senyal per forçar el tancament immediat:
// Alternativa: usar context manual per gestionar doble senyal
ctx, cancel := context.WithCancel(context.Background())
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigChan
logger.Info(\"received signal, starting graceful shutdown\",
slog.String(\"signal\", sig.String()),
)
cancel()
// Segona senyal: forçar tancament immediat
sig = <-sigChan
logger.Error(\"received second signal, forcing shutdown\",
slog.String(\"signal\", sig.String()),
)
os.Exit(1)
}()Si l’operador prem Ctrl+C una vegada, el worker s’apaga ordenadament. Si prem Ctrl+C una segona vegada, el procés mor immediatament. És un patró estàndard en eines CLI de Go i dona control a l’operador sense sacrificar la seguretat del shutdown ordenat.
Logging i observabilitat
Un worker sense logging és un worker que no pots depurar en producció. Amb slog (disponible des de Go 1.21), el logging estructurat forma part de la llibreria estàndard.
Ja hem integrat logging a tot el worker, però repassem les pràctiques importants:
Logs estructurats amb context
Cada log del worker inclou atributs que permeten filtrar i correlacionar:
logger.Info(\"job completed\",
slog.String(\"job_id\", j.ID),
slog.String(\"job_type\", j.Type),
slog.Int(\"attempt\", attempt),
slog.Duration(\"duration\", duration),
)En JSON, això produeix:
{
\"time\": \"2026-08-07T10:30:00Z\",
\"level\": \"INFO\",
\"msg\": \"job completed\",
\"job_id\": \"email-1691400600000000000\",
\"job_type\": \"email\",
\"attempt\": 0,
\"duration\": \"2.003s\"
}Pots cercar tots els logs d’un job específic filtrant per job_id. Pots veure tots els reintents filtrant per attempt > 0. Pots detectar jobs lents filtrant per duration. Sense logs estructurats, estaries fent grep en text lliure i lamentant-te.
Worker ID a cada goroutine
Cada goroutine del worker té un worker_id que es propaga a tots els seus logs:
logger := w.logger.With(slog.Int(\"worker_id\", id))Això és fonamental per depurar problemes de concurrència. Si un worker es queda penjat, pots veure exactament quin és pel seu ID.
Mètriques bàsiques sense Prometheus
Si no vols afegir Prometheus (que és el recomanable en producció), almenys pots exposar mètriques bàsiques amb comptadors atòmics:
type Metrics struct {
Processed atomic.Int64
Failed atomic.Int64
Retried atomic.Int64
}
func (m *Metrics) Log(logger *slog.Logger) {
logger.Info(\"worker metrics\",
slog.Int64(\"processed\", m.Processed.Load()),
slog.Int64(\"failed\", m.Failed.Load()),
slog.Int64(\"retried\", m.Retried.Load()),
)
}Pots cridar metrics.Log(logger) periòdicament amb un ticker, o al final del shutdown per a un resum de l’execució. No reemplaça un sistema de mètriques real, però et dona visibilitat bàsica sense dependències.
Testing del worker
Testar un worker és més simple del que sembla si dissenyes el codi amb testing en ment. La clau és que el Processor és una interfície i la cua és injectable.
Test del processament bàsic
package worker_test
import (
\"context\"
\"log/slog\"
\"os\"
\"sync/atomic\"
\"testing\"
\"time\"
\"github.com/el-teu-usuari/worker-demo/job\"
\"github.com/el-teu-usuari/worker-demo/queue\"
\"github.com/el-teu-usuari/worker-demo/worker\"
)
func TestWorker_ProcessesJobs(t *testing.T) {
q := queue.NewInMemoryQueue(10)
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
cfg := worker.DefaultConfig()
cfg.Concurrency = 2
w := worker.New(q, cfg, logger)
var processed atomic.Int32
w.RegisterFunc(\"test\", func(ctx context.Context, j job.Job) error {
processed.Add(1)
return nil
})
// Encuar 5 jobs
for i := 0; i < 5; i++ {
_ = q.Enqueue(job.NewJob(\"test\", nil))
}
// Tancar la cua perquè el worker acabi quan hagi processat tot
q.Close()
// Arrencar el worker amb timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
w.Start(ctx)
if processed.Load() != 5 {
t.Errorf(\"expected 5 processed jobs, got %d\", processed.Load())
}
}El truc és tancar la cua abans d’arrencar el worker. Quan la cua està tancada i buida, el range sobre el channel acaba i les goroutines surten. Així el test no es queda penjat esperant més jobs.
Test del retry
func TestWorker_RetriesOnFailure(t *testing.T) {
q := queue.NewInMemoryQueue(10)
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
cfg := worker.DefaultConfig()
cfg.Concurrency = 1
cfg.InitialBackoff = 10 * time.Millisecond // Backoff baix per a tests
cfg.MaxBackoff = 50 * time.Millisecond
w := worker.New(q, cfg, logger)
var attempts atomic.Int32
w.RegisterFunc(\"flaky\", func(ctx context.Context, j job.Job) error {
count := attempts.Add(1)
if count < 3 {
return fmt.Errorf(\"transient error (attempt %d)\", count)
}
return nil
})
j := job.NewJob(\"flaky\", nil)
j.MaxRetry = 5
_ = q.Enqueue(j)
q.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
w.Start(ctx)
if attempts.Load() != 3 {
t.Errorf(\"expected 3 attempts, got %d\", attempts.Load())
}
}Nota com el test configura un backoff molt baix (10ms) perquè no trigui segles. En tests, sempre redueix els temps d’espera. Un test que triga 30 segons pels backoffs és un test que ningú executarà.
Test del shutdown ordenat
func TestWorker_GracefulShutdown(t *testing.T) {
q := queue.NewInMemoryQueue(100)
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
cfg := worker.DefaultConfig()
cfg.Concurrency = 2
w := worker.New(q, cfg, logger)
var processed atomic.Int32
w.RegisterFunc(\"slow\", func(ctx context.Context, j job.Job) error {
select {
case <-time.After(100 * time.Millisecond):
processed.Add(1)
return nil
case <-ctx.Done():
return ctx.Err()
}
})
// Encuar 10 jobs
for i := 0; i < 10; i++ {
_ = q.Enqueue(job.NewJob(\"slow\", nil))
}
ctx, cancel := context.WithCancel(context.Background())
// Cancel·lar després de 300ms (suficient per processar alguns però no tots)
go func() {
time.Sleep(300 * time.Millisecond)
cancel()
q.Close()
}()
w.Start(ctx)
count := processed.Load()
if count == 0 {
t.Error(\"expected at least some jobs to be processed\")
}
if count == 10 {
t.Error(\"expected some jobs to NOT be processed (shutdown before completion)\")
}
t.Logf(\"processed %d out of 10 jobs before shutdown\", count)
}Aquest test verifica que el worker processa alguns jobs i llavors s’atura neteament quan el context es cancel·la. No verifica un nombre exacte perquè depèn del timing, però sí verifica que el shutdown funciona: ni zero (el worker no ha arrencat) ni deu (el worker ha ignorat la cancel·lació).
Per a més sobre testing en Go, incloent subtests, table-driven tests i mocks, mira testing en Go.
De memòria a cues externes (Redis, Kafka)
La cua en memòria que hem construït funciona. Però té un problema evident: si el procés mor, els jobs es perden. En producció, normalment vols persistència.
La bona notícia és que el disseny del worker ja està preparat per a això. El worker no depèn de InMemoryQueue directament: consumeix d’un <-chan job.Job. Podem abstraure la cua darrere d’una interfície:
// Queue defineix la interfície mínima per a una cua de jobs.
type Queue interface {
Enqueue(j job.Job) error
Dequeue() <-chan job.Job
Close()
}Amb aquesta interfície, pots implementar cues respaldades per diferents sistemes sense tocar el worker:
Redis amb llistes
type RedisQueue struct {
client *redis.Client
key string
jobs chan job.Job
ctx context.Context
cancel context.CancelFunc
}
func (q *RedisQueue) startConsumer() {
go func() {
for {
select {
case <-q.ctx.Done():
close(q.jobs)
return
default:
// BRPOP bloqueja fins que hi ha un element o timeout
result, err := q.client.BRPop(q.ctx, 5*time.Second, q.key).Result()
if err != nil {
continue
}
var j job.Job
if err := json.Unmarshal([]byte(result[1]), &j); err != nil {
continue
}
q.jobs <- j
}
}
}()
}Redis amb BRPOP és probablement l’opció més pragmàtica per a la majoria de projectes. Tens persistència, és ràpid, i si ja fas servir Redis per a caché, no afegeixes infraestructura nova.
Kafka per a alt volum
type KafkaQueue struct {
reader *kafka.Reader
jobs chan job.Job
}
func (q *KafkaQueue) startConsumer() {
go func() {
for {
msg, err := q.reader.ReadMessage(context.Background())
if err != nil {
break
}
var j job.Job
if err := json.Unmarshal(msg.Value, &j); err != nil {
continue
}
q.jobs <- j
}
close(q.jobs)
}()
}Kafka té sentit quan necessites alt throughput, múltiples consumidors, o replay d’events. Però per a la majoria de projectes és sobreenginyeria. No facis servir Kafka perquè “escala”: fes-lo servir quan realment necessitis les garanties que ofereix.
PostgreSQL com a cua (SKIP LOCKED)
Una opció que sovint es passa per alt: usar PostgreSQL com a cua amb SELECT FOR UPDATE SKIP LOCKED.
func (q *PGQueue) fetchNextJob(ctx context.Context) (*job.Job, error) {
row := q.db.QueryRowContext(ctx, `
UPDATE jobs SET status = 'processing', started_at = NOW()
WHERE id = (
SELECT id FROM jobs
WHERE status = 'pending'
ORDER BY created_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING id, type, payload, retries, max_retry, created_at
`)
var j job.Job
err := row.Scan(&j.ID, &j.Type, &j.Payload, &j.Retries, &j.MaxRetry, &j.CreatedAt)
return &j, err
}SKIP LOCKED és la clau: permet que múltiples workers consultin la taula simultàniament sense bloquejar-se entre si. Cada worker agafa un job diferent. Si ja tens PostgreSQL al teu stack, aquesta opció t’estalvia afegir Redis o Kafka només per a la cua.
Quan usar cadascun
| Sistema | Quan usar-lo |
|---|---|
| Memòria | Prototips, tests, jobs que pots perdre sense conseqüències |
| Redis | La majoria de projectes. Persistència, velocitat, sense afegir gaire complexitat |
| PostgreSQL | Ja tens Postgres i no vols més infraestructura. Volum baix-mitjà |
| Kafka | Alt volum, múltiples consumidors, replay d’events, event sourcing |
L’abstracció amb la interfície Queue et permet començar amb memòria i migrar a Redis o Kafka sense reescriure el worker. Dissenya per al cas simple, prepara per al cas complex.
Codi complet i estructura del projecte
Juntant totes les peces, aquesta és l’estructura final del projecte:
worker-demo/
├── go.mod
├── main.go // Arrencada, senyals, configuració
├── job/
│ └── job.go // Definició de Job i Processor
├── queue/
│ └── queue.go // Cua en memòria (implementa Queue)
└── worker/
├── worker.go // Worker: consumeix, processa, retry, drain
└── worker_test.go // TestsEl flux complet:
main.goconfigura el logger, crea la cua, registra processors i arrenca el worker amb un context lligat a senyals del sistema.- Els productors (una API, un cron, un event listener) encuen jobs amb
queue.Enqueue(). - El worker consumeix jobs del channel, troba el processor corresponent i l’executa amb un context amb timeout.
- Si el job falla, el worker aplica backoff exponencial amb jitter i reintenta fins a esgotar el màxim de reintents.
- Quan arriba SIGTERM, el context es cancel·la, les goroutines drenen els jobs restants i el worker s’apaga neteament.
La peça invisible que sosté el backend
Un worker no té la visibilitat d’una API, però és el que fa la feina pesada en segon pla. El que hem construït aquí cobreix les decisions que importen: jobs desacoblats del worker mitjançant una interfície Processor, una cua en memòria amb channels buffered que és thread-safe sense mutexes, concurrència configurable amb N goroutines consumint del mateix channel, i retry amb backoff exponencial i jitter per evitar thundering herd. El shutdown ordenat amb senyals del sistema, drain amb timeout i WaitGroup garanteix que no perds feina quan desplegues. I la interfície Queue et deixa migrar a Redis, Kafka o PostgreSQL sense tocar una línia del worker.
Dominar aquests patrons de concurrència, retry i shutdown et servirà en qualsevol projecte Go. I si vols escalar la concurrència més enllà del que hem vist, el proper pas natural és implementar worker pools en Go amb channels com a mecanisme de coordinació.


