Worker pools en Go: processar tasques concurrents de forma controlada

Com construir un worker pool en Go amb channels, context, control d'errors i tancament ordenat. Concurrència amb límits.

Cover for Worker pools en Go: processar tasques concurrents de forma controlada

Llançar 10.000 goroutines és fàcil. Controlar-les és el problema real d’enginyeria.

Go et dona goroutines barates i channels per comunicar-les. Però “barat” no significa “gratuït”, i “fàcil de crear” no equival a “segur d’operar”. En producció, un go func() descontrolat per cada petició entrant és una forma elegant de tirar avall el teu propi servei. Sense límit de concurrència, sense control d’errors, sense manera neta de parar: això no és arquitectura, és una bomba de rellotgeria.

Un worker pool resol exactament això. Defineix un nombre fix de goroutines que consumeixen tasques d’un channel, les processen i retornen resultats. Res més. Res menys. És el patró més pragmàtic que existeix per a concurrència controlada en Go.


Per què necessites worker pools

Si el teu servei rep 500 peticions per segon i per cada una llances una goroutine que fa una crida HTTP externa, tindràs 500 connexions simultànies obertes. Si el servei extern tarda més de l’habitual, s’acumulen. 1.000. 5.000. 20.000. El teu servei consumeix tota la memòria disponible, el kernel comença a rebutjar connexions, i l’on-call rep una alerta a les 3 de la matinada.

Un worker pool limita la concurrència a un nombre controlat. Si tens 20 workers, mai hi haurà més de 20 tasques executant-se alhora. Les altres esperen al channel. Així de simple.

Els casos d’ús més habituals:

  • Rate limiting cap a serveis externs: APIs amb límits de peticions per segon.
  • Control de recursos: connexions a base de dades, descriptors de fitxer, memòria.
  • Processament batch: milers de registres que cal processar en paral·lel però sense ofegar el sistema.
  • Pipelines de dades: llegir de Kafka, processar i escriure el resultat, tot amb backpressure natural.

El channel actua com a cua de treball amb backpressure incorporat. Si els workers no donen l’abast, el channel s’omple i el productor es bloqueja fins que hi ha lloc. No necessites implementar res extra: la mecànica dels channels ja t’ho dona.


El patró bàsic: jobs, workers, results

L’estructura fonamental d’un worker pool en Go té tres components:

  1. Un channel de jobs per on entren les tasques.
  2. N goroutines (workers) que llegeixen del channel de jobs, processen la tasca i envien el resultat.
  3. Un channel de results per on surten els resultats.
package main

import (
    \"fmt\"
    \"sync\"
    \"time\"
)

type Job struct {
    ID    int
    Input string
}

type Result struct {
    JobID  int
    Output string
    Err    error
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        // Simular treball
        time.Sleep(100 * time.Millisecond)
        results <- Result{
            JobID:  job.ID,
            Output: fmt.Sprintf(\"worker %d va processar: %s\", id, job.Input),
        }
    }
}

func main() {
    const numWorkers = 5
    const numJobs = 20

    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)

    var wg sync.WaitGroup

    // Llançar workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }

    // Enviar jobs
    for j := 1; j <= numJobs; j++ {
        jobs <- Job{ID: j, Input: fmt.Sprintf(\"tasca-%d\", j)}
    }
    close(jobs)

    // Esperar que acabin i tancar results
    go func() {
        wg.Wait()
        close(results)
    }()

    // Recollir resultats
    for r := range results {
        if r.Err != nil {
            fmt.Printf(\"Job %d ha fallat: %v\n\", r.JobID, r.Err)
        } else {
            fmt.Printf(\"Job %d: %s\n\", r.JobID, r.Output)
        }
    }
}

Fixa’t en els tipus dels channels a la signatura de worker: jobs <-chan Job és de només lectura i results chan<- Result és de només escriptura. Això no és decoració. El compilador t’impedeix fer operacions incorrectes sobre el channel. Si un worker intenta tancar jobs, no compila. Si intenta llegir de results, tampoc. Usar-los així és una bona pràctica que evita bugs subtils.

El sync.WaitGroup coordina el tancament. Quan tots els workers acaben (perquè jobs es va tancar i han buidat el channel), el wg.Wait() es desbloqueja i tanca results. Llavors el range results a main acaba de forma natural.


Implementació pas a pas

Anem a construir un worker pool més robust, pas a pas. Partim d’alguna cosa real: un servei que necessita validar URLs en paral·lel.

Pas 1: definir els tipus

type URLJob struct {
    ID  int
    URL string
}

type URLResult struct {
    JobID      int
    URL        string
    StatusCode int
    Duration   time.Duration
    Err        error
}

Inclou sempre un camp Err al resultat. Els workers fallaran, i el resultat ha de transportar aquell error de tornada al consumidor sense trencar el flux.

Pas 2: el worker

func urlWorker(id int, jobs <-chan URLJob, results chan<- URLResult, wg *sync.WaitGroup) {
    defer wg.Done()
    client := &http.Client{Timeout: 5 * time.Second}

    for job := range jobs {
        start := time.Now()
        resp, err := client.Get(job.URL)
        duration := time.Since(start)

        result := URLResult{
            JobID:    job.ID,
            URL:      job.URL,
            Duration: duration,
        }

        if err != nil {
            result.Err = err
        } else {
            result.StatusCode = resp.StatusCode
            resp.Body.Close()
        }

        results <- result
    }
}

Cada worker té el seu propi http.Client amb timeout. No comparteixen client perquè no és necessari: http.Client és segur per a ús concurrent, però que cada worker tingui el seu simplifica el raonament. El resp.Body.Close() dins del else és obligatori: si no tanques el body, estàs filtrant connexions TCP.

Pas 3: orquestrar

func validateURLs(urls []string, concurrency int) []URLResult {
    jobs := make(chan URLJob, len(urls))
    results := make(chan URLResult, len(urls))
    var wg sync.WaitGroup

    // Llançar workers
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go urlWorker(i, jobs, results, &wg)
    }

    // Enviar jobs
    for i, url := range urls {
        jobs <- URLJob{ID: i, URL: url}
    }
    close(jobs)

    // Tancar results quan tots els workers acabin
    go func() {
        wg.Wait()
        close(results)
    }()

    // Recollir resultats
    var allResults []URLResult
    for r := range results {
        allResults = append(allResults, r)
    }

    return allResults
}

El buffer del channel de jobs té mida len(urls) perquè enviem totes les tasques de cop. En un escenari real on els jobs arriben de forma contínua (un stream de Kafka, un endpoint HTTP), usaries un buffer més petit perquè el backpressure funcioni correctament.


Cancel·lació amb context

L’exemple anterior funciona, però no es pot cancel·lar. Si necessites aturar el pool perquè l’usuari ha cancel·lat la petició, perquè has excedit un timeout global, o perquè has rebut un senyal del sistema operatiu, necessites context.

func urlWorkerWithCtx(
    ctx context.Context,
    id int,
    jobs <-chan URLJob,
    results chan<- URLResult,
    wg *sync.WaitGroup,
) {
    defer wg.Done()
    client := &http.Client{Timeout: 5 * time.Second}

    for job := range jobs {
        // Comprovar cancel·lació abans de processar
        select {
        case <-ctx.Done():
            return
        default:
        }

        start := time.Now()
        req, err := http.NewRequestWithContext(ctx, http.MethodGet, job.URL, nil)
        if err != nil {
            results <- URLResult{JobID: job.ID, URL: job.URL, Err: err}
            continue
        }

        resp, err := client.Do(req)
        duration := time.Since(start)

        result := URLResult{
            JobID:    job.ID,
            URL:      job.URL,
            Duration: duration,
        }

        if err != nil {
            result.Err = err
        } else {
            result.StatusCode = resp.StatusCode
            resp.Body.Close()
        }

        results <- result
    }
}

Hi ha dos nivells de cancel·lació aquí:

  1. El select a l’inici del loop: si el context s’ha cancel·lat entre job i job, el worker surt immediatament sense processar més tasques.
  2. http.NewRequestWithContext: si el context es cancel·la durant la petició HTTP, aquesta s’avorta immediatament.

Per a l’orquestrador, el canvi és mínim:

func validateURLsWithTimeout(urls []string, concurrency int, timeout time.Duration) []URLResult {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    jobs := make(chan URLJob, len(urls))
    results := make(chan URLResult, len(urls))
    var wg sync.WaitGroup

    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go urlWorkerWithCtx(ctx, i, jobs, results, &wg)
    }

    // Enviar jobs respectant la cancel·lació
    go func() {
        defer close(jobs)
        for i, url := range urls {
            select {
            case jobs <- URLJob{ID: i, URL: url}:
            case <-ctx.Done():
                return
            }
        }
    }()

    go func() {
        wg.Wait()
        close(results)
    }()

    var allResults []URLResult
    for r := range results {
        allResults = append(allResults, r)
    }

    return allResults
}

L’enviament de jobs ara també usa select amb el context. Si el timeout s’exhaureix a meitat de l’enviament, deixem d’enviar. Sense això, el productor podria quedar-se bloquejat intentant escriure en un channel que ningú llegeix (perquè els workers ja han sortit).


Gestió d’errors en workers

Hi ha diverses estratègies per a errors en workers. La que triis depèn del teu cas d’ús.

Estratègia 1: error per resultat (la més comuna)

Cada resultat porta el seu propi error. El consumidor decideix què fer amb cada fallada individualment.

for r := range results {
    if r.Err != nil {
        log.Printf(\"Job %d ha fallat: %v\", r.JobID, r.Err)
        continue
    }
    processarResultat(r)
}

És la més flexible. Si 3 de 100 jobs fallen, processes els 97 que han anat bé i registres els 3 que han fallat. Ideal per a processament batch on les fallades individuals no invaliden el lot complet.

Estratègia 2: fallar ràpid (cancel on first error)

Si un sol error invalida tota l’operació, cancel·la el context al primer error:

func validateURLsFailFast(urls []string, concurrency int) ([]URLResult, error) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    jobs := make(chan URLJob, len(urls))
    results := make(chan URLResult, len(urls))
    var wg sync.WaitGroup

    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go urlWorkerWithCtx(ctx, i, jobs, results, &wg)
    }

    go func() {
        defer close(jobs)
        for i, url := range urls {
            select {
            case jobs <- URLJob{ID: i, URL: url}:
            case <-ctx.Done():
                return
            }
        }
    }()

    go func() {
        wg.Wait()
        close(results)
    }()

    var allResults []URLResult
    for r := range results {
        if r.Err != nil {
            cancel() // Cancel·la tots els workers
            return allResults, fmt.Errorf(\"job %d ha fallat: %w\", r.JobID, r.Err)
        }
        allResults = append(allResults, r)
    }

    return allResults, nil
}

En cridar cancel(), els workers que estan processant avorten la seva petició HTTP actual (gràcies a NewRequestWithContext), i els que esperen jobs surten en comprovar ctx.Done().

Estratègia 3: acumular errors

Recollir tots els errors i retornar-los agrupats:

var errs []error
for r := range results {
    if r.Err != nil {
        errs = append(errs, fmt.Errorf(\"job %d: %w\", r.JobID, r.Err))
    }
    allResults = append(allResults, r)
}

if len(errs) > 0 {
    return allResults, errors.Join(errs...)
}

errors.Join (Go 1.20+) combina múltiples errors en un de sol que es pot inspeccionar amb errors.Is i errors.As. Útil quan necessites reportar totes les fallades al caller però no vols parar al primer.


Tancament ordenat: tancar channels correctament

El tancament mal gestionat és la font número u de deadlocks i panics en worker pools. Les regles són simples però inflexibles:

  1. Només el productor tanca el channel de jobs. Els workers mai tanquen jobs.
  2. Només la goroutine que sap que tots els workers han acabat tanca el channel de results. Normalment és una goroutine que fa wg.Wait().
  3. Mai enviïs a un channel tancat. Provoca panic.
  4. Mai tanquis un channel més d’una vegada. També provoca panic.

El patró correcte que hem vist:

// 1. El productor envia i tanca jobs
go func() {
    defer close(jobs) // Es tanca quan el productor acaba
    for _, item := range items {
        select {
        case jobs <- item:
        case <-ctx.Done():
            return
        }
    }
}()

// 2. Goroutine dedicada espera i tanca results
go func() {
    wg.Wait()        // Espera tots els workers
    close(results)   // Només llavors tanca results
}()

// 3. Main recull resultats
for r := range results {
    // ...
}

Què passa si no segueixes aquest ordre? Un escenari típic de deadlock:

// MALAMENT: deadlock si results no té buffer suficient
for j := range jobList {
    jobs <- j
}
close(jobs)

// Això mai s'executa si els workers han omplert results
// i estan bloquejats esperant que algú llegeixi
wg.Wait()
close(results)

El problema: el productor envia tots els jobs de forma síncrona. Si el channel de results s’omple i ningú l’està llegint encara, els workers es bloquegen en intentar enviar el resultat. I com que el productor no pot avançar fins a enviar-ho tot, i ningú llegeix results fins que el productor tanca jobs… deadlock.

La solució és sempre enviar els jobs en una goroutine separada, com als exemples anteriors.


Escalat dinàmic de workers

De vegades un nombre fix de workers no és suficient. Potser vols escalar segons la càrrega: més workers quan hi ha molts jobs pendents, menys quan la cua és buida.

type Pool struct {
    jobs       chan Job
    results    chan Result
    maxWorkers int
    ctx        context.Context
    cancel     context.CancelFunc
    wg         sync.WaitGroup
}

func NewPool(maxWorkers, jobBuffer int) *Pool {
    ctx, cancel := context.WithCancel(context.Background())
    return &Pool{
        jobs:       make(chan Job, jobBuffer),
        results:    make(chan Result, jobBuffer),
        maxWorkers: maxWorkers,
        ctx:        ctx,
        cancel:     cancel,
    }
}

func (p *Pool) Start(numWorkers int) {
    for i := 0; i < numWorkers; i++ {
        p.addWorker()
    }
}

func (p *Pool) addWorker() {
    p.wg.Add(1)
    go func() {
        defer p.wg.Done()
        for {
            select {
            case <-p.ctx.Done():
                return
            case job, ok := <-p.jobs:
                if !ok {
                    return
                }
                result := process(job)
                select {
                case p.results <- result:
                case <-p.ctx.Done():
                    return
                }
            }
        }
    }()
}

func (p *Pool) ScaleUp(n int) {
    for i := 0; i < n; i++ {
        p.addWorker()
    }
}

func (p *Pool) Submit(job Job) {
    select {
    case p.jobs <- job:
    case <-p.ctx.Done():
    }
}

func (p *Pool) Shutdown() {
    close(p.jobs)
    p.wg.Wait()
    close(p.results)
}

func (p *Pool) Stop() {
    p.cancel()
    p.wg.Wait()
    close(p.results)
}

Shutdown és un tancament ordenat: deixa que els workers acabin els jobs que ja tenen. Stop és un tancament immediat: cancel·la el context i els workers surten tan aviat com poden.

Escalar cap avall és més complicat. No pots matar una goroutine des de fora en Go: ha de decidir sortir ella mateixa. Una opció és usar un channel de “quit” individual per worker, o simplement acceptar que reduir workers significa no llançar-ne de nous quan els actuals acabin les seves tasques.

A la pràctica, l’escalat dinàmic rarament és necessari. Un pool amb un nombre fix de workers ben dimensionat cobreix el 90% dels casos. Només el necessites si la càrrega és molt variable i el cost de tenir workers ociosos és significatiu (per exemple, si cada worker manté una connexió costosa oberta).


Patró semàfor com a alternativa

Si l’únic que necessites és limitar el nombre de goroutines simultànies i no necessites channels de results ni l’estructura completa d’un worker pool, el paquet golang.org/x/sync/semaphore ofereix una solució més lleugera:

import \"golang.org/x/sync/semaphore\"

func processAll(ctx context.Context, items []string, maxConcurrency int64) error {
    sem := semaphore.NewWeighted(maxConcurrency)
    var wg sync.WaitGroup
    var mu sync.Mutex
    var firstErr error

    for _, item := range items {
        if err := sem.Acquire(ctx, 1); err != nil {
            return err // Context cancel·lat
        }

        wg.Add(1)
        go func(item string) {
            defer wg.Done()
            defer sem.Release(1)

            if err := processItem(ctx, item); err != nil {
                mu.Lock()
                if firstErr == nil {
                    firstErr = err
                }
                mu.Unlock()
            }
        }(item)
    }

    wg.Wait()
    return firstErr
}

semaphore.Acquire bloqueja quan ja hi ha maxConcurrency goroutines executant-se. És com un worker pool implícit: en lloc de pre-crear N workers que llegeixen d’un channel, crees goroutines sota demanda però mai més de N alhora.

L’avantatge: menys boilerplate. El desavantatge: perds l’estructura clara de jobs/results. Per a tasques fire-and-forget o quan només t’importa si hi ha hagut error (no recollir resultats individuals), el semàfor és suficient.

També pots simular un semàfor amb un channel sense necessitat de dependències externes:

func processAllWithChan(ctx context.Context, items []string, maxConcurrency int) error {
    sem := make(chan struct{}, maxConcurrency)
    var wg sync.WaitGroup
    errCh := make(chan error, 1)

    for _, item := range items {
        select {
        case sem <- struct{}{}:
        case <-ctx.Done():
            break
        }

        wg.Add(1)
        go func(item string) {
            defer wg.Done()
            defer func() { <-sem }()

            if err := processItem(ctx, item); err != nil {
                select {
                case errCh <- err:
                default:
                }
            }
        }(item)
    }

    wg.Wait()

    select {
    case err := <-errCh:
        return err
    default:
        return nil
    }
}

Un channel de struct{} amb buffer N funciona com a semàfor. Enviar al channel “adquireix” un slot, rebre’l “l’allibera”. Zero dependències extra.


errgroup: l’opció simple per a molts casos

Si estàs pensant “aquest worker pool té molt boilerplate per al que necessito”, probablement tens raó. Per a molts escenaris, golang.org/x/sync/errgroup fa exactament el que necessites amb una fracció del codi:

import \"golang.org/x/sync/errgroup\"

func processAllSimple(ctx context.Context, items []string) error {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(10) // Màxim 10 goroutines simultànies

    for _, item := range items {
        item := item // Go < 1.22
        g.Go(func() error {
            return processItem(ctx, item)
        })
    }

    return g.Wait()
}

errgroup.SetLimit(10) fa que g.Go() bloquegi quan ja hi ha 10 goroutines executant-se. És un worker pool amb semàfor integrat, cancel·lació automàtica al primer error, i Wait() que retorna el primer error trobat. Exactament el mateix que hem construït a mà a les seccions anteriors, però en 10 línies.

errgroup.WithContext cancel·la el context derivat quan qualsevol funció retorna un error. Això significa que la resta de goroutines poden detectar la cancel·lació a través de ctx.Done().

Quan errgroup és suficient

  • Processar una llista finita de tasques en paral·lel amb límit de concurrència.
  • Quan només necessites saber si hi ha hagut error, no recollir resultats individuals.
  • Fan-out simple: llançar N operacions paral·leles i esperar que totes acabin.

Quan necessites un worker pool real

  • Els jobs arriben de forma contínua (no una llista finita).
  • Necessites un channel de resultats per processar outputs individualment.
  • Necessites escalat dinàmic de workers.
  • Necessites lògica de retry per worker.
  • Necessites mètriques per worker (jobs processats, latència, errors).

Exemple real: processar webhooks d’una API en paral·lel

Anem a ajuntar-ho tot en un exemple més proper a producció. Un servei que rep webhooks, els encua i els processa amb un pool de workers. Cada webhook requereix fer una crida a una API externa i persistir el resultat.

package main

import (
    \"context\"
    \"encoding/json\"
    \"fmt\"
    \"log\"
    \"net/http\"
    \"sync\"
    \"time\"
)

type Webhook struct {
    ID        string          `json:\"id\"`
    EventType string          `json:\"event_type\"`
    Payload   json.RawMessage `json:\"payload\"`
    Received  time.Time
}

type ProcessResult struct {
    WebhookID string
    Success   bool
    Duration  time.Duration
    Err       error
}

type WebhookProcessor struct {
    jobs       chan Webhook
    results    chan ProcessResult
    numWorkers int
    client     *http.Client
    ctx        context.Context
    cancel     context.CancelFunc
    wg         sync.WaitGroup
}

func NewWebhookProcessor(numWorkers, queueSize int) *WebhookProcessor {
    ctx, cancel := context.WithCancel(context.Background())
    return &WebhookProcessor{
        jobs:       make(chan Webhook, queueSize),
        results:    make(chan ProcessResult, queueSize),
        numWorkers: numWorkers,
        client:     &http.Client{Timeout: 10 * time.Second},
        ctx:        ctx,
        cancel:     cancel,
    }
}

func (wp *WebhookProcessor) Start() {
    // Llançar workers
    for i := 0; i < wp.numWorkers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }

    // Goroutine per tancar results quan tots els workers acabin
    go func() {
        wp.wg.Wait()
        close(wp.results)
    }()

    // Goroutine per processar resultats
    go wp.collectResults()
}

func (wp *WebhookProcessor) worker(id int) {
    defer wp.wg.Done()
    log.Printf(\"Worker %d iniciat\", id)

    for {
        select {
        case <-wp.ctx.Done():
            log.Printf(\"Worker %d: context cancel·lat, sortint\", id)
            return
        case webhook, ok := <-wp.jobs:
            if !ok {
                log.Printf(\"Worker %d: channel tancat, sortint\", id)
                return
            }
            result := wp.processWebhook(webhook)
            select {
            case wp.results <- result:
            case <-wp.ctx.Done():
                return
            }
        }
    }
}

func (wp *WebhookProcessor) processWebhook(wh Webhook) ProcessResult {
    start := time.Now()

    // Aquí aniria la lògica real: cridar una API, escriure a BD, etc.
    // Simulem amb un sleep
    time.Sleep(50 * time.Millisecond)

    // Exemple: si el tipus d'event no és vàlid, retornar error
    validEvents := map[string]bool{
        \"order.created\":   true,
        \"order.updated\":   true,
        \"payment.success\": true,
        \"payment.failed\":  true,
    }

    if !validEvents[wh.EventType] {
        return ProcessResult{
            WebhookID: wh.ID,
            Success:   false,
            Duration:  time.Since(start),
            Err:       fmt.Errorf(\"event desconegut: %s\", wh.EventType),
        }
    }

    return ProcessResult{
        WebhookID: wh.ID,
        Success:   true,
        Duration:  time.Since(start),
    }
}

func (wp *WebhookProcessor) collectResults() {
    var processed, failed int
    for r := range wp.results {
        if r.Err != nil {
            failed++
            log.Printf(\"Webhook %s ha fallat (%v): %v\", r.WebhookID, r.Duration, r.Err)
        } else {
            processed++
            log.Printf(\"Webhook %s processat (%v)\", r.WebhookID, r.Duration)
        }
    }
    log.Printf(\"Resum: %d processats, %d fallits\", processed, failed)
}

func (wp *WebhookProcessor) Enqueue(wh Webhook) error {
    select {
    case wp.jobs <- wh:
        return nil
    case <-wp.ctx.Done():
        return fmt.Errorf(\"processador aturat\")
    default:
        return fmt.Errorf(\"cua plena, descartant webhook %s\", wh.ID)
    }
}

func (wp *WebhookProcessor) Shutdown(timeout time.Duration) {
    log.Println(\"Iniciant tancament ordenat...\")

    // 1. Deixar d'acceptar nous jobs
    close(wp.jobs)

    // 2. Esperar que els workers acabin amb timeout
    done := make(chan struct{})
    go func() {
        wp.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        log.Println(\"Tots els workers han acabat neta\")
    case <-time.After(timeout):
        log.Println(\"Timeout assolit, forçant tancament\")
        wp.cancel()
        <-done // Esperar que els workers surtin després de la cancel·lació
    }
}

Punts clau d’aquest exemple:

  • Enqueue amb default: si la cua és plena, rebutja el webhook immediatament en lloc de bloquejar. En un handler HTTP, bloquejar significaria que la petició es queda penjada.
  • Shutdown amb timeout: tanca jobs i espera un temps raonable. Si els workers no acaben, cancel·la el context i força la sortida.
  • collectResults com a goroutine separada: desacobla el processament de resultats dels workers. Podries substituir els log.Printf per mètriques, alertes, o escriptura a base de dades.
  • El worker usa select doble: tant per llegir de jobs com per enviar a results, sempre amb ctx.Done() com a alternativa. Si no poses el select en enviar a results, un worker podria quedar-se bloquejat per sempre si ningú llegeix resultats i el context es cancel·la.

Per usar-lo en un servei HTTP real:

func main() {
    processor := NewWebhookProcessor(10, 100) // 10 workers, cua de 100
    processor.Start()

    http.HandleFunc(\"/webhook\", func(w http.ResponseWriter, r *http.Request) {
        var wh Webhook
        if err := json.NewDecoder(r.Body).Decode(&wh); err != nil {
            http.Error(w, \"payload invàlid\", http.StatusBadRequest)
            return
        }
        wh.Received = time.Now()

        if err := processor.Enqueue(wh); err != nil {
            http.Error(w, err.Error(), http.StatusServiceUnavailable)
            return
        }

        w.WriteHeader(http.StatusAccepted)
    })

    // Escoltar senyals del sistema per a tancament ordenat
    // (en producció usaries signal.NotifyContext)
    log.Fatal(http.ListenAndServe(\":8080\", nil))
}

Quan els worker pools són excessius

No tot necessita un worker pool. De vegades la complexitat no es justifica.

No el necessites si:

  • Només has de fer 3-5 operacions en paral·lel. Un errgroup o fins i tot goroutines soltes amb un WaitGroup són suficients.
  • El processament és tan ràpid que la concurrència no aporta res. Si cada tasca tarda 1ms, probablement vagis més ràpid en seqüencial (l’overhead de channels i goroutines no és zero).
  • Només tens un productor i un consumidor. Un channel directe entre ambdós ja és un pipeline perfectament vàlid.

El necessites si:

  • La càrrega és alta i contínua, i necessites controlar quants recursos consumeix.
  • Necessites backpressure: que el productor es ralentitzi quan els workers no donen l’abast.
  • Necessites mètriques i observabilitat per worker.
  • La tasca implica crides de xarxa, I/O de disc o accés a recursos compartits que es degraden sota concurrència excessiva.

La decisió és pragmàtica. Si un errgroup.SetLimit(10) resol el teu problema, usa”l. Si necessites tancament ordenat, escalat, mètriques i reintents, construeix el pool. El que no té sentit és llançar goroutines sense control i esperar que tot surti bé.

La concurrència en Go és una eina poderosa, però el poder sense control no és enginyeria. És sort.

Articles relacionats

OshyTech

Enginyeria backend i de dades orientada a sistemes escalables, automatització i IA.

Navegació

Copyright 2026 OshyTech. Tots els drets reservats