Build a background job worker in Go

Tutorial to build a worker in Go with an in-memory queue, context, retry, logging and graceful shutdown. Real backend.


APIs get all the attention. Every Go tutorial starts with a REST endpoint, a handler, a JSON response. But most real backend systems need something more: processes that consume tasks in the background, retry failures, and shut down cleanly when the operating system sends a signal. That’s a worker.

A well-implemented worker teaches you more about real backend work than many CRUDs. It touches goroutines, channels, context, system signals, error handling, retry, structured logging, and graceful shutdown. Everything a backend engineer needs to master that rarely appears in “build your first API in 10 minutes” tutorials.

In this article we are going to build a complete worker from scratch. In-memory queue, concurrent processing, retry with backoff, logging with slog, and graceful shutdown with OS signals. Code you could put into production.


What is a background worker

A worker is a process that runs in the background consuming tasks from a queue. It doesn’t respond to HTTP requests. It has no endpoints. Its only job is to take a job from the queue, process it, and move to the next one.

Real-world examples:

  • Sending confirmation emails after a registration.
  • Processing images uploaded by users (resize, compress, generate thumbnails).
  • Syncing data with external systems that have rate limits.
  • Generating PDF reports that take several seconds.
  • Indexing documents in a search engine.

The pattern is always the same: something produces a job (an HTTP request, an event, a cron) and the worker consumes it asynchronously. The producer doesn’t wait for the job to finish. It just enqueues it and moves on.

In Go, the natural implementation of a worker uses goroutines for concurrency and channels as the communication queue. You don’t need external libraries for a working worker. The standard library gives you everything you need.


Project setup

We’ll create a clean project with the minimal necessary structure:

worker-demo/
├── go.mod
├── main.go
├── job/
│   └── job.go
├── queue/
│   └── queue.go
└── worker/
    └── worker.go

Initialize the module:

go mod init github.com/your-username/worker-demo

No frameworks, no external dependencies. Everything with the Go standard library. When you need something more, you’ll add it. But first, understand the mechanism without layers of abstraction.


The Job interface and the queue

The first step is defining what a job is. A job needs three things: an identifier, a type, and a method to execute itself.

// job/job.go
package job

import (
    "context"
    "fmt"
    "time"
)

// Job represents a task the worker can process.
type Job struct {
    ID        string
    Type      string
    Payload   map[string]any
    CreatedAt time.Time
    Retries   int
    MaxRetry  int
}

// Processor defines the interface for processing a job.
type Processor interface {
    Process(ctx context.Context, j Job) error
}

// ProcessorFunc is an adapter to use simple functions as a Processor.
type ProcessorFunc func(ctx context.Context, j Job) error

func (f ProcessorFunc) Process(ctx context.Context, j Job) error {
    return f(ctx, j)
}

// NewJob creates a job with default values.
func NewJob(jobType string, payload map[string]any) Job {
    return Job{
        ID:        fmt.Sprintf("%s-%d", jobType, time.Now().UnixNano()),
        Type:      jobType,
        Payload:   payload,
        CreatedAt: time.Now(),
        MaxRetry:  3,
    }
}

There are important decisions here. The Payload is a map[string]any rather than a generic type. In a real worker you might use generics or deserialize from JSON, but a flexible map is enough to get started and avoids premature complexity.

The ProcessorFunc pattern is the same one used by http.HandlerFunc in the standard library: an adapter that allows using simple functions where an interface is expected. If you come from Java, it’s the equivalent of a @FunctionalInterface.

MaxRetry has a default value of 3. Not because it’s a magic number, but because three retries is a reasonable starting point for most operations. If your job is idempotent (and it should be), three retries cover you against transient failures without saturating the system.

Now the queue. An in-memory queue based on a buffered channel:

// queue/queue.go
package queue

import (
    "errors"
    "sync"

    "github.com/your-username/worker-demo/job"
)

var (
    ErrQueueClosed = errors.New("queue is closed")
    ErrQueueFull   = errors.New("queue is full")
)

// InMemoryQueue is a job queue based on a buffered channel.
type InMemoryQueue struct {
    mu     sync.RWMutex // guards closed and keeps Close from racing with sends
    jobs   chan job.Job
    closed bool
}

// NewInMemoryQueue creates a queue with the specified capacity.
func NewInMemoryQueue(capacity int) *InMemoryQueue {
    return &InMemoryQueue{
        jobs: make(chan job.Job, capacity),
    }
}

// Enqueue adds a job to the queue. It returns ErrQueueClosed if the queue is
// closed and ErrQueueFull if the buffer has no free slot.
func (q *InMemoryQueue) Enqueue(j job.Job) error {
    // The read lock lets many producers enqueue concurrently, while Close
    // (which takes the write lock) cannot close the channel between the
    // closed check and the send.
    q.mu.RLock()
    defer q.mu.RUnlock()

    if q.closed {
        return ErrQueueClosed
    }

    select {
    case q.jobs <- j:
        return nil
    default:
        return ErrQueueFull
    }
}

// Dequeue returns the read channel for consuming jobs.
func (q *InMemoryQueue) Dequeue() <-chan job.Job {
    return q.jobs
}

// Close closes the queue. Jobs already in the queue can still be consumed.
// Calling Close more than once is safe.
func (q *InMemoryQueue) Close() {
    q.mu.Lock()
    defer q.mu.Unlock()

    if !q.closed {
        q.closed = true
        close(q.jobs)
    }
}

// Len returns the number of pending jobs in the queue.
func (q *InMemoryQueue) Len() int {
    return len(q.jobs)
}

The select with default in Enqueue is intentional. If the queue is full, it returns ErrQueueFull immediately instead of blocking the producer. In production, you might want different behavior (block, discard the oldest job, or return an error to the client), but failing fast is the safest default. Because that send never blocks, holding the read lock around it never stalls Close for long.

The sync.RWMutex around closed prevents a subtle race. Sending on a closed channel panics, so checking a flag and then sending is only safe if nobody can close the channel in between. Enqueue holds the read lock across the check and the send, and Close takes the write lock, so close(q.jobs) runs exactly once and never while a send is in progress, even if several goroutines call Close at the same time.

Note that Close() closes the channel but doesn’t drain the queue. Jobs already in the buffer can still be consumed. This is crucial for graceful shutdown: you close the queue so no more jobs come in, but the ones already there are processed to completion.


Worker implementation with goroutines

Here is the core of the system. The worker consumes jobs from the queue, processes them using the corresponding Processor, and handles errors.

// worker/worker.go
package worker

import (
    "context"
    "log/slog"
    "sync"
    "time"

    "github.com/your-username/worker-demo/job"
    "github.com/your-username/worker-demo/queue"
)

// Config holds the worker configuration.
type Config struct {
    Concurrency    int
    MaxRetry       int
    InitialBackoff time.Duration
    MaxBackoff     time.Duration
}

// DefaultConfig returns a sensible default configuration.
func DefaultConfig() Config {
    return Config{
        Concurrency:    5,
        MaxRetry:       3,
        InitialBackoff: 500 * time.Millisecond,
        MaxBackoff:     30 * time.Second,
    }
}

// Worker consumes jobs from a queue and processes them.
type Worker struct {
    queue      *queue.InMemoryQueue
    processors map[string]job.Processor
    config     Config
    logger     *slog.Logger
    wg         sync.WaitGroup
}

// New creates a new Worker.
func New(q *queue.InMemoryQueue, cfg Config, logger *slog.Logger) *Worker {
    return &Worker{
        queue:      q,
        processors: make(map[string]job.Processor),
        config:     cfg,
        logger:     logger,
    }
}

// Register associates a Processor with a job type.
func (w *Worker) Register(jobType string, p job.Processor) {
    w.processors[jobType] = p
}

// RegisterFunc is a shortcut for registering a function as a Processor.
func (w *Worker) RegisterFunc(jobType string, fn func(context.Context, job.Job) error) {
    w.processors[jobType] = job.ProcessorFunc(fn)
}

// Start launches the worker with the given context. It blocks until every
// goroutine has exited: either the context was cancelled and the remaining
// jobs were drained, or the queue was closed and emptied.
func (w *Worker) Start(ctx context.Context) {
    w.logger.Info("worker starting",
        slog.Int("concurrency", w.config.Concurrency),
        slog.Int("registered_processors", len(w.processors)),
    )

    for i := 0; i < w.config.Concurrency; i++ {
        w.wg.Add(1)
        go w.consume(ctx, i)
    }

    w.wg.Wait()
    w.logger.Info("worker stopped: all goroutines finished")
}

// consume is the main loop for each worker goroutine.
func (w *Worker) consume(ctx context.Context, id int) {
    defer w.wg.Done()

    logger := w.logger.With(slog.Int("worker_id", id))
    logger.Info("worker goroutine started")

    for {
        select {
        case j, ok := <-w.queue.Dequeue():
            if !ok {
                logger.Info("queue closed, goroutine exiting")
                return
            }
            w.processJob(ctx, logger, j)

        case <-ctx.Done():
            logger.Info("context cancelled, draining remaining jobs")
            w.drain(ctx, logger)
            return
        }
    }
}

// drain processes jobs remaining in the queue after the context is cancelled.
func (w *Worker) drain(ctx context.Context, logger *slog.Logger) {
    // Create a context with timeout for the drain
    drainCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    for {
        select {
        case j, ok := <-w.queue.Dequeue():
            if !ok {
                logger.Info("queue drained")
                return
            }
            w.processJob(drainCtx, logger, j)
        default:
            logger.Info("no more jobs to drain")
            return
        }
    }
}

Let’s break down the important decisions.

Configurable concurrency. Start launches N goroutines, all consuming from the same channel. Go guarantees that each job is delivered to exactly one goroutine. You don’t need locks, you don’t need manual coordination. The channel does the distribution work.

sync.WaitGroup to wait. Start calls wg.Add(1) before launching each goroutine (calling Add inside the goroutine could race with Wait), and each goroutine calls wg.Done() via defer when it exits. Start blocks on wg.Wait() until all goroutines have finished. This is essential for graceful shutdown.

Drain after cancellation. When the context is cancelled (shutdown signal), goroutines don’t die immediately. They first process the jobs remaining in the queue. The drain has its own 30-second timeout to prevent a slow job from blocking the shutdown indefinitely.

If you want to go deeper into how this pattern of multiple goroutines consuming from the same channel works, I have a dedicated article on worker pools in Go.


Job processing with context

Each job is processed with a context that allows controlling timeouts and cancellation. This is where Go’s context shows its value.

// processJob processes a job with retry.
func (w *Worker) processJob(ctx context.Context, logger *slog.Logger, j job.Job) {
    processor, ok := w.processors[j.Type]
    if !ok {
        logger.Error("no processor registered for job type",
            slog.String("job_id", j.ID),
            slog.String("job_type", j.Type),
        )
        return
    }

    jobLogger := logger.With(
        slog.String("job_id", j.ID),
        slog.String("job_type", j.Type),
    )

    maxRetries := j.MaxRetry
    if maxRetries == 0 {
        maxRetries = w.config.MaxRetry
    }

    var lastErr error

    for attempt := 0; attempt <= maxRetries; attempt++ {
        if attempt > 0 {
            backoff := w.calculateBackoff(attempt)
            jobLogger.Info("retrying job",
                slog.Int("attempt", attempt),
                slog.Duration("backoff", backoff),
            )

            select {
            case <-time.After(backoff):
                // Continue with the retry
            case <-ctx.Done():
                jobLogger.Warn("context cancelled during backoff, aborting retry",
                    slog.Int("attempt", attempt),
                )
                return
            }
        }

        // Create a context with timeout for each attempt
        jobCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
        start := time.Now()

        err := processor.Process(jobCtx, j)
        duration := time.Since(start)
        cancel()

        if err == nil {
            jobLogger.Info("job completed",
                slog.Int("attempt", attempt),
                slog.Duration("duration", duration),
            )
            return
        }

        lastErr = err
        jobLogger.Warn("job failed",
            slog.Int("attempt", attempt),
            slog.Duration("duration", duration),
            slog.String("error", err.Error()),
        )
    }

    jobLogger.Error("job exhausted all retries",
        slog.Int("max_retries", maxRetries),
        slog.String("last_error", lastErr.Error()),
    )
}

There is an important pattern in the context handling inside the retry loop. Notice the select during the backoff:

select {
case <-time.After(backoff):
    // Continue with the retry
case <-ctx.Done():
    // Abort
    return
}

Without this select, if the worker receives a shutdown signal while waiting for a 30-second backoff, it would be blocked for those 30 seconds. With the select, it reacts immediately to context cancellation. It’s a detail that makes the difference between a worker that takes 30 seconds to shut down and one that responds in milliseconds.

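Each attempt creates its own context.WithTimeout. If a processor gets stuck, the timeout cancels its context after 60 seconds; a processor that watches ctx.Done(), like the ones in main.go, returns at that point with context.DeadlineExceeded. The timeout cannot kill anything by itself: Go has no way to stop a goroutine from the outside, so a processor that ignores its context keeps running. cancel() is called immediately after processing to release the context’s resources, not in a defer, because a defer inside the loop would only run when processJob returns, keeping every attempt’s context alive until then.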


Retry logic: simple backoff

Retry with exponential backoff is one of those things everyone mentions but few implement well. The idea is simple: each retry waits longer than the previous one to give the system time to recover.

// calculateBackoff calculates the wait time for a retry using
// exponential backoff with jitter.
func (w *Worker) calculateBackoff(attempt int) time.Duration {
    backoff := w.config.InitialBackoff

    // Exponential backoff: 500ms, 1s, 2s, 4s, 8s...
    for i := 1; i < attempt; i++ {
        backoff *= 2
        if backoff > w.config.MaxBackoff {
            backoff = w.config.MaxBackoff
            break
        }
    }

    // Add ±25% jitter to avoid thundering herd
    jitter := time.Duration(float64(backoff) * 0.25)
    lo := backoff - jitter
    hi := backoff + jitter

    // Use the timestamp as a simple source of randomness
    // In production you would use math/rand/v2
    ns := time.Now().UnixNano()
    spread := hi - lo
    if spread > 0 {
        backoff = lo + time.Duration(ns%int64(spread))
    }

    return backoff
}

Jitter is crucial. Without it, if you have 100 workers that fail at the same time (because the database went down, for example), they will all retry at the same time: 500ms later, then 1s later, then 2s later. This creates load spikes that can make the situation worse. Jitter spreads retries across a time window, reducing pressure on the system. This is known as the “thundering herd” problem, and jitter is the standard mitigation.

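MaxBackoff puts a cap on it. Without it, exponential backoff grows without bound: 500ms, 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s… More than 30 seconds between retries rarely makes sense. If the system has been down for more than 30 seconds, you probably need an alert, not a longer retry. Note that calculateBackoff applies the cap before the jitter, so a capped retry actually waits between 22.5 and 37.5 seconds; clamp again after adding jitter if the limit must be strict.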

The jitter implementation uses time.Now().UnixNano() as a source of randomness. It’s not cryptographically secure, but for backoff you don’t need security: you need distribution. In production you can use math/rand/v2 if you prefer something more robust.


Graceful shutdown with OS signals

Graceful shutdown is what separates a toy worker from one you can put into production. Without it, when you deploy a new version (or Kubernetes kills a pod), jobs being processed are lost.

// main.go
package main

import (
    "context"
    "log/slog"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/your-username/worker-demo/job"
    "github.com/your-username/worker-demo/queue"
    "github.com/your-username/worker-demo/worker"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
        Level: slog.LevelInfo,
    }))

    // Create the queue with capacity for 100 jobs
    q := queue.NewInMemoryQueue(100)

    // Configure the worker
    cfg := worker.DefaultConfig()
    cfg.Concurrency = 3

    w := worker.New(q, cfg, logger)

    // Register processors
    w.RegisterFunc("email", func(ctx context.Context, j job.Job) error {
        to, _ := j.Payload["to"].(string)
        subject, _ := j.Payload["subject"].(string)

        logger.Info("sending email",
            slog.String("to", to),
            slog.String("subject", subject),
        )

        // Simulate email sending
        select {
        case <-time.After(2 * time.Second):
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    })

    w.RegisterFunc("resize_image", func(ctx context.Context, j job.Job) error {
        path, _ := j.Payload["path"].(string)

        logger.Info("resizing image",
            slog.String("path", path),
        )

        select {
        case <-time.After(5 * time.Second):
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    })

    // Create context that cancels on SIGINT or SIGTERM
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    // Enqueue some example jobs
    go func() {
        jobs := []job.Job{
            job.NewJob("email", map[string]any{
                "to":      "user@example.com",
                "subject": "Welcome",
            }),
            job.NewJob("email", map[string]any{
                "to":      "admin@example.com",
                "subject": "New registration",
            }),
            job.NewJob("resize_image", map[string]any{
                "path": "/uploads/photo.jpg",
            }),
            job.NewJob("email", map[string]any{
                "to":      "support@example.com",
                "subject": "Daily report",
            }),
        }

        for _, j := range jobs {
            if err := q.Enqueue(j); err != nil {
                logger.Error("failed to enqueue job",
                    slog.String("error", err.Error()),
                )
            }
        }
    }()

    // Start the worker. Blocks until the context is cancelled
    // and all in-progress jobs have finished.
    logger.Info("starting worker, press Ctrl+C to stop")
    w.Start(ctx)

    logger.Info("shutdown complete")
}

The key line is signal.NotifyContext. It creates a context that automatically cancels when the process receives SIGINT (Ctrl+C) or SIGTERM (what Kubernetes, Docker, or systemd send before killing the process).

The shutdown flow is:

  1. The process receives SIGINT or SIGTERM.
  2. The context is cancelled.
  3. The worker goroutines detect the cancellation in the select of the main loop.
  4. Each goroutine drains the remaining jobs from the queue.
  5. wg.Wait() waits for all goroutines to finish.
  6. Start returns and the process terminates cleanly.

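If something goes wrong and a job keeps running during the drain, the drain’s 30-second timeout cancels its context. After that, the goroutine gives up and the process can terminate, as long as processors honor ctx: Go cannot stop a goroutine from the outside, so a processor that ignores its context will keep wg.Wait() blocked. Also compare the numbers with your platform. In Kubernetes, the kubelet sends SIGKILL when terminationGracePeriodSeconds expires, which by default is 30 seconds after the SIGTERM, the same as the drain timeout. Keep the drain budget a few seconds below the grace period, or raise the grace period, so the drain can actually finish.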

Second signal: immediate kill

A useful pattern in production is to listen for a second signal to force immediate shutdown:

// Alternative: use a manual context to handle double signal
ctx, cancel := context.WithCancel(context.Background())

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

go func() {
    sig := <-sigChan
    logger.Info("received signal, starting graceful shutdown",
        slog.String("signal", sig.String()),
    )
    cancel()

    // Second signal: force immediate shutdown
    sig = <-sigChan
    logger.Error("received second signal, forcing shutdown",
        slog.String("signal", sig.String()),
    )
    os.Exit(1)
}()

If the operator presses Ctrl+C once, the worker shuts down cleanly. If they press Ctrl+C a second time, the process dies immediately. It’s a standard pattern in Go CLI tools and gives the operator control without sacrificing the safety of graceful shutdown.


Logging and observability

A worker without logging is a worker you can’t debug in production. With slog (available since Go 1.21), structured logging is part of the standard library.

We’ve already integrated logging throughout the worker, but let’s review the important practices:

Structured logs with context

Each worker log includes attributes that allow filtering and correlation:

logger.Info("job completed",
    slog.String("job_id", j.ID),
    slog.String("job_type", j.Type),
    slog.Int("attempt", attempt),
    slog.Duration("duration", duration),
)

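In JSON, this produces the following (pretty-printed here; the handler writes each record on a single line and encodes slog.Duration as integer nanoseconds):

{
    "time": "2026-08-07T10:30:00Z",
    "level": "INFO",
    "msg": "job completed",
    "job_id": "email-1786098597997000000",
    "job_type": "email",
    "attempt": 0,
    "duration": 2003000000
}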

You can search all logs for a specific job by filtering on job_id. You can see all retries by filtering on attempt > 0. You can detect slow jobs by filtering on duration. Without structured logs, you’d be doing grep on free text and regretting it.

Worker ID in each goroutine

Each worker goroutine has a worker_id that propagates to all its logs:

logger := w.logger.With(slog.Int("worker_id", id))

This is fundamental for debugging concurrency issues. If a worker gets stuck, you can see exactly which one by its ID.

Basic metrics without Prometheus

If you don’t want to add Prometheus (which is the recommended choice in production), you can at least expose basic metrics with atomic counters:

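import (
    "log/slog"
    "sync/atomic"
)

// Metrics holds counters that any worker goroutine can update safely.
type Metrics struct {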
    Processed atomic.Int64
    Failed    atomic.Int64
    Retried   atomic.Int64
}

func (m *Metrics) Log(logger *slog.Logger) {
    logger.Info("worker metrics",
        slog.Int64("processed", m.Processed.Load()),
        slog.Int64("failed", m.Failed.Load()),
        slog.Int64("retried", m.Retried.Load()),
    )
}

You can call metrics.Log(logger) periodically with a ticker, or at the end of shutdown for a summary of the execution. It doesn’t replace a real metrics system, but it gives you basic visibility without dependencies.


Testing the worker

Testing a worker is simpler than it seems if you design the code with testing in mind. The key is that Processor is an interface and the queue is injectable.

Test for basic processing

package worker_test

import (
    "context"
    "fmt"
    "log/slog"
    "os"
    "sync/atomic"
    "testing"
    "time"

    "github.com/your-username/worker-demo/job"
    "github.com/your-username/worker-demo/queue"
    "github.com/your-username/worker-demo/worker"
)

func TestWorker_ProcessesJobs(t *testing.T) {
    q := queue.NewInMemoryQueue(10)
    logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

    cfg := worker.DefaultConfig()
    cfg.Concurrency = 2

    w := worker.New(q, cfg, logger)

    var processed atomic.Int32

    w.RegisterFunc("test", func(ctx context.Context, j job.Job) error {
        processed.Add(1)
        return nil
    })

    // Enqueue 5 jobs
    for i := 0; i < 5; i++ {
        _ = q.Enqueue(job.NewJob("test", nil))
    }

    // Close the queue so the worker finishes when everything is processed
    q.Close()

    // Start the worker with a timeout
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    w.Start(ctx)

    if processed.Load() != 5 {
        t.Errorf("expected 5 processed jobs, got %d", processed.Load())
    }
}

The trick is closing the queue before starting the worker. Once the queue is closed and its buffer is empty, the receive in consume returns ok == false and each goroutine exits. This way the test doesn’t hang waiting for more jobs.

Test for retry

func TestWorker_RetriesOnFailure(t *testing.T) {
    q := queue.NewInMemoryQueue(10)
    logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

    cfg := worker.DefaultConfig()
    cfg.Concurrency = 1
    cfg.InitialBackoff = 10 * time.Millisecond // Low backoff for tests
    cfg.MaxBackoff = 50 * time.Millisecond

    w := worker.New(q, cfg, logger)

    var attempts atomic.Int32

    w.RegisterFunc("flaky", func(ctx context.Context, j job.Job) error {
        count := attempts.Add(1)
        if count < 3 {
            return fmt.Errorf("transient error (attempt %d)", count)
        }
        return nil
    })

    j := job.NewJob("flaky", nil)
    j.MaxRetry = 5
    _ = q.Enqueue(j)

    q.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    w.Start(ctx)

    if attempts.Load() != 3 {
        t.Errorf("expected 3 attempts, got %d", attempts.Load())
    }
}

Note how the test configures a very low backoff (10ms) so it doesn’t take ages. In tests, always reduce wait times. A test that takes 30 seconds because of backoffs is a test nobody will run.

Test for graceful shutdown

func TestWorker_GracefulShutdown(t *testing.T) {
    q := queue.NewInMemoryQueue(100)
    logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

    cfg := worker.DefaultConfig()
    cfg.Concurrency = 2

    w := worker.New(q, cfg, logger)

    var processed atomic.Int32

    w.RegisterFunc("slow", func(ctx context.Context, j job.Job) error {
        select {
        case <-time.After(100 * time.Millisecond):
            processed.Add(1)
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    })

    // Enqueue 10 jobs
    for i := 0; i < 10; i++ {
        _ = q.Enqueue(job.NewJob("slow", nil))
    }

    ctx, cancel := context.WithCancel(context.Background())

    // Cancel after 300ms (enough to process some but not all)
    go func() {
        time.Sleep(300 * time.Millisecond)
        cancel()
        q.Close()
    }()

    w.Start(ctx)

    count := processed.Load()
    if count == 0 {
        t.Error("expected at least some jobs to be processed")
    }
    if count == 10 {
        t.Error("expected some jobs to NOT be processed (shutdown before completion)")
    }
    t.Logf("processed %d out of 10 jobs before shutdown", count)
}

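This test verifies that the worker processes some jobs and that Start returns once the context is cancelled. It doesn’t check an exact number because that depends on timing. Be aware of what each bound detects. Zero means the worker never started. The upper bound is subtler: jobs still waiting in the queue are processed by drain, so the count stays below ten because the jobs in flight when the context is cancelled see ctx.Done(), return an error, and are abandoned in the backoff select. If the timing shifts (for example, a cancel that lands exactly between two jobs), that assertion can flake, so treat it as a smoke test rather than a precise check.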

For more on testing in Go, including subtests, table-driven tests, and mocks, see testing in Go.


From memory to external queues (Redis, Kafka)

The in-memory queue we built works. But it has an obvious problem: if the process dies, jobs are lost. In production, you usually want persistence.

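The good news is that the worker needs only a small change to support this. Today Worker stores a *queue.InMemoryQueue, but the only method it calls is Dequeue(), which returns a <-chan job.Job. If we abstract the queue behind an interface and change the Worker’s queue field and the parameter of New to that interface, any implementation can be plugged in: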

// Queue defines the minimal interface for a job queue.
type Queue interface {
    Enqueue(j job.Job) error
    Dequeue() <-chan job.Job
    Close()
}

With this interface, you can implement queues backed by different systems without touching the worker:

Redis with lists

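// Sketch assuming github.com/redis/go-redis/v9 (imported as redis),
// plus context, encoding/json, errors and time.
type RedisQueue struct {
    client *redis.Client
    key    string
    jobs   chan job.Job
    ctx    context.Context
    cancel context.CancelFunc
}

func (q *RedisQueue) startConsumer() {
    go func() {
        defer close(q.jobs)

        for q.ctx.Err() == nil {
            // BRPOP blocks until there is an element or the timeout expires
            // (on timeout, go-redis returns redis.Nil)
            result, err := q.client.BRPop(q.ctx, 5*time.Second, q.key).Result()
            if err != nil {
                if !errors.Is(err, redis.Nil) && q.ctx.Err() == nil {
                    time.Sleep(time.Second) // connection error: avoid a hot loop
                }
                continue
            }

            var j job.Job
            if err := json.Unmarshal([]byte(result[1]), &j); err != nil {
                continue // malformed payload: log it or move it to a dead-letter list
            }

            // Don't block forever if the consumers have stopped.
            select {
            case q.jobs <- j:
            case <-q.ctx.Done():
                return
            }
        }
    }()
}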

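Redis with BRPOP is probably the most pragmatic option for most projects. You get persistence, it’s fast, and if you already use Redis for caching, you’re not adding new infrastructure. One caveat: BRPOP removes the job from Redis as soon as it’s read, so a job that was popped but not yet processed is lost if the worker crashes. If that matters, the usual answer is the reliable-queue pattern: BLMOVE (Redis 6.2+) moves the job into a processing list, and you remove it from there only after it succeeds.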

Kafka for high volume

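// Sketch assuming github.com/segmentio/kafka-go (imported as kafka),
// plus context and encoding/json.
type KafkaQueue struct {
    reader *kafka.Reader
    jobs   chan job.Job
    ctx    context.Context
}

func (q *KafkaQueue) startConsumer() {
    go func() {
        defer close(q.jobs)

        for {
            // With a GroupID set, ReadMessage commits the offset as soon as the
            // message is read, before it is processed. Use FetchMessage plus
            // CommitMessages if you need at-least-once processing.
            msg, err := q.reader.ReadMessage(q.ctx)
            if err != nil {
                return // context cancelled or reader closed
            }

            var j job.Job
            if err := json.Unmarshal(msg.Value, &j); err != nil {
                continue
            }

            select {
            case q.jobs <- j:
            case <-q.ctx.Done():
                return
            }
        }
    }()
}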

Kafka makes sense when you need high throughput, multiple consumers, or event replay. But for most projects it’s over-engineering. Don’t use Kafka because it “scales”: use it when you actually need the guarantees it offers.

PostgreSQL as a queue (SKIP LOCKED)

An option that often gets overlooked: using PostgreSQL as a queue with SELECT FOR UPDATE SKIP LOCKED.

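// Assumes PGQueue wraps a *sql.DB in a field named db and that the jobs
// table stores payload as JSON/JSONB; needs context and encoding/json.
func (q *PGQueue) fetchNextJob(ctx context.Context) (*job.Job, error) {
    row := q.db.QueryRowContext(ctx, `
        UPDATE jobs SET status = 'processing', started_at = NOW()
        WHERE id = (
            SELECT id FROM jobs
            WHERE status = 'pending'
            ORDER BY created_at
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING id, type, payload, retries, max_retry, created_at
    `)

    var j job.Job
    var payload []byte // database/sql cannot scan JSON straight into a map
    err := row.Scan(&j.ID, &j.Type, &payload, &j.Retries, &j.MaxRetry, &j.CreatedAt)
    if err != nil {
        return nil, err // sql.ErrNoRows means there is no pending job right now
    }
    if err := json.Unmarshal(payload, &j.Payload); err != nil {
        return nil, err
    }
    return &j, nil
}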

SKIP LOCKED is the key: it allows multiple workers to query the table simultaneously without blocking each other. Each worker picks a different job. If you already have PostgreSQL in your stack, this option saves you from adding Redis or Kafka just for the queue.

When to use each one

System        When to use it
Memory        Prototypes, tests, jobs you can lose without consequences
Redis         Most projects. Persistence, speed, without adding much complexity
PostgreSQL    You already have Postgres and don’t want more infrastructure. Low-medium volume
Kafka         High volume, multiple consumers, event replay, event sourcing

The abstraction with the Queue interface lets you start with memory and migrate to Redis or Kafka without rewriting the worker. Design for the simple case, prepare for the complex one.


Complete code and project structure

Putting all the pieces together, this is the final project structure:

worker-demo/
├── go.mod
├── main.go            // Startup, signals, configuration
├── job/
│   └── job.go         // Job and Processor definitions
├── queue/
│   └── queue.go       // In-memory queue (implements Queue)
└── worker/
    ├── worker.go      // Worker: consume, process, retry, drain
    └── worker_test.go // Tests

The complete flow:

  1. main.go configures the logger, creates the queue, registers processors, and starts the worker with a context bound to OS signals.
  2. Producers (an API, a cron, an event listener) enqueue jobs with queue.Enqueue().
  3. The worker consumes jobs from the channel, finds the corresponding processor, and executes it with a context that has a timeout.
  4. If the job fails, the worker applies exponential backoff with jitter and retries until it exhausts the maximum number of retries.
  5. When SIGTERM arrives, the context is cancelled, goroutines drain the remaining jobs, and the worker shuts down cleanly.

The invisible piece that holds the backend together

A worker doesn’t have the visibility of an API, but it’s the one doing the heavy lifting in the background. What we’ve built here covers the decisions that matter: jobs decoupled from the worker through a Processor interface, an in-memory queue built on a buffered channel, configurable concurrency with N goroutines consuming from the same channel, and retry with exponential backoff and jitter to avoid a thundering herd. Graceful shutdown with OS signals, a drain with a timeout, and a WaitGroup mean a routine deploy doesn’t throw away the jobs already waiting in the queue. And once the worker depends on the Queue interface, you can move to Redis, Kafka, or PostgreSQL without touching its processing logic.

Mastering these patterns of concurrency, retry, and shutdown will serve you in any Go project. And if you want to scale concurrency beyond what we’ve seen here, the natural next step is to implement worker pools in Go with channels as the coordination mechanism.

OshyTech

Backend and data engineering focused on scalable systems, automation, and AI.


Copyright 2026 OshyTech. All Rights Reserved