Data quality en pipelines pequeños: validaciones que merecen la pena desde el día uno

Validaciones sencillas para evitar datos basura en pipelines reales: nulos, duplicados, tipos y alertas con Python.

Cover for Data quality en pipelines pequeños: validaciones que merecen la pena desde el día uno
Actualizado: 18 may 2026

Hace poco revisé un pipeline que llevaba meses funcionando “sin problemas”. Cada día ingería datos de una API, los transformaba y los cargaba en una base de datos que alimentaba un dashboard. Nadie se quejaba. Hasta que alguien hizo una pregunta incómoda: “¿Por qué el revenue de abril es un 20% más bajo que el del año pasado?”. Después de dos días investigando, resultó que la API había empezado a devolver campos vacíos en un porcentaje de registros y el pipeline los estaba cargando alegremente como nulos. Las métricas se calculaban sobre datos incompletos. Nadie se enteró porque no había una sola validación.

Esto no pasó en un sistema legacy de una gran empresa. Pasó en un pipeline pequeño, mantenido por una persona, que hacía exactamente lo que se le pidió: mover datos de A a B. El problema no era el código. Era la ausencia total de controles sobre la calidad de lo que se movía.


Data quality no es un producto enterprise

Cuando oyes “data quality” probablemente pienses en herramientas caras, equipos dedicados y frameworks con dashboards impresionantes. Y sí, eso existe. Pero la realidad es que el 80% de los problemas de calidad de datos se detectan con validaciones que puedes escribir en 20 líneas de Python.

La validación de datos no es un proyecto. Es un hábito. Y como todos los hábitos, cuesta más empezar que mantenerlo.

No necesitas Great Expectations configurado con un deployment completo para saber si tus datos tienen nulos donde no deberían. No necesitas un data observability platform para detectar que la API te devolvió cero registros cuando normalmente devuelve miles. Necesitas asserts, ifs y un canal de Slack.


Las cinco validaciones mínimas que siempre deberías tener

Después de romperme la cabeza con pipelines que parecían funcionar y no lo hacían, llegué a una lista de validaciones que pongo en todos mis pipelines desde el día uno. Son cinco y no lleva más de una hora implementarlas.

1. El pipeline recibió datos

Parece obvio, pero no lo es. APIs que devuelven respuestas vacías, queries que no encuentran filas nuevas, archivos CSV que llegan vacíos. Si tu pipeline procesa cero registros y no avisa, tienes un problema silencioso.

def validate_not_empty(df: pd.DataFrame, source_name: str):
    """Valida que el DataFrame no esté vacío."""
    if df.empty:
        raise ValueError(
            f"[QUALITY] {source_name}: se recibieron 0 registros. "
            f"Esto no es normal. Revisa la fuente."
        )
    print(f"[QUALITY] {source_name}: {len(df)} registros recibidos. OK.")

2. Los campos obligatorios no son nulos

Cada dataset tiene campos que nunca deberían ser nulos. Un pedido sin order_id, un usuario sin email, una transacción sin amount. Si alguno de estos llega nulo, o la fuente tiene un problema o tu extracción se rompió.

def validate_no_nulls(df: pd.DataFrame, required_columns: list[str]):
    """Valida que las columnas obligatorias no tengan nulos."""
    for col in required_columns:
        null_count = df[col].isnull().sum()
        if null_count > 0:
            null_pct = (null_count / len(df)) * 100
            raise ValueError(
                f"[QUALITY] Columna '{col}': {null_count} nulos "
                f"({null_pct:.1f}%). No debería tener nulos."
            )
    print(f"[QUALITY] Campos obligatorios sin nulos. OK.")

3. No hay duplicados donde no debería haberlos

Los duplicados son la basura más común en pipelines. A veces la API envía el mismo registro dos veces. A veces tu pipeline se ejecuta dos veces por un error de programación. A veces la fuente tiene un bug. Si no verificas, tus métricas se inflan.

def validate_no_duplicates(df: pd.DataFrame, key_columns: list[str]):
    """Valida que no haya filas duplicadas por las columnas clave."""
    duplicates = df.duplicated(subset=key_columns, keep=False)
    dup_count = duplicates.sum()

    if dup_count > 0:
        sample = df[duplicates].head(3)[key_columns]
        raise ValueError(
            f"[QUALITY] {dup_count} filas duplicadas por {key_columns}.\n"
            f"Ejemplo:\n{sample.to_string()}"
        )
    print(f"[QUALITY] Sin duplicados por {key_columns}. OK.")

4. Los tipos de datos son los esperados

Un campo total que llega como string en vez de numérico. Un created_at que de repente tiene formato americano en vez de ISO. Un campo booleano que ahora incluye “yes”, “no”, “maybe”. Si no validas tipos, las transformaciones posteriores fallan de formas impredecibles.

def validate_types(df: pd.DataFrame, expected_types: dict):
    """
    Valida que los tipos de las columnas sean los esperados.
    expected_types: {'total_amount': 'float64', 'order_id': 'object'}
    """
    errors = []
    for col, expected in expected_types.items():
        actual = str(df[col].dtype)
        if actual != expected:
            errors.append(
                f"  - '{col}': esperado {expected}, recibido {actual}"
            )

    if errors:
        raise TypeError(
            f"[QUALITY] Tipos incorrectos:\n" + "\n".join(errors)
        )
    print(f"[QUALITY] Tipos de datos correctos. OK.")

5. Los valores están dentro de rangos razonables

Un pedido con importe negativo. Una fecha en el año 1900. Un porcentaje de 350%. Estos valores técnicamente no son nulos ni duplicados, pero son basura. Las validaciones de rango son el último filtro antes de que un dato absurdo llegue a producción.

def validate_ranges(df: pd.DataFrame, range_rules: dict):
    """
    Valida que los valores estén dentro de rangos esperados.
    range_rules: {'total_amount': {'min': 0, 'max': 100000}}
    """
    for col, rules in range_rules.items():
        if 'min' in rules:
            violations = (df[col] < rules['min']).sum()
            if violations > 0:
                raise ValueError(
                    f"[QUALITY] '{col}': {violations} valores por debajo "
                    f"del mínimo ({rules['min']})"
                )
        if 'max' in rules:
            violations = (df[col] > rules['max']).sum()
            if violations > 0:
                raise ValueError(
                    f"[QUALITY] '{col}': {violations} valores por encima "
                    f"del máximo ({rules['max']})"
                )
    print(f"[QUALITY] Rangos dentro de los esperados. OK.")

Juntando todo: un validador completo

En la práctica, estas validaciones se ejecutan como un paso del pipeline, entre la ingestión y la transformación. Si alguna falla, el pipeline se detiene y alguien recibe una alerta.

import pandas as pd

def validate_orders(df: pd.DataFrame):
    """Ejecuta todas las validaciones sobre los pedidos ingestados."""
    # 1. No vacío
    validate_not_empty(df, source_name="orders")

    # 2. Campos obligatorios sin nulos
    validate_no_nulls(df, required_columns=[
        'order_id', 'customer_id', 'total_amount', 'status'
    ])

    # 3. Sin duplicados por order_id
    validate_no_duplicates(df, key_columns=['order_id'])

    # 4. Tipos correctos
    validate_types(df, expected_types={
        'order_id': 'object',
        'customer_id': 'object',
        'total_amount': 'float64',
    })

    # 5. Rangos razonables
    validate_ranges(df, range_rules={
        'total_amount': {'min': 0, 'max': 50000},
    })

    print(f"[QUALITY] Todas las validaciones pasaron. Pipeline continúa.")

Nulos: el enemigo que siempre vuelve

Los nulos merecen un tratamiento aparte porque no todos son iguales. Hay nulos que significan “no se proporcionó el dato”, nulos que significan “la API falló”, y nulos que significan “este campo no aplica”. Tratarlos a todos igual es un error.

Estrategia para manejar nulos

Tipo de nuloEjemploAcción
Campo obligatorio nuloorder_id vacíoRechazar el registro
Campo opcional nulophone_number vacíoAceptar, rellenar con valor por defecto o marcar
Nulo por fallo de la fuenteToda una columna nulaAlertar y detener el pipeline
Nulo por diseñodiscount_code cuando no hay descuentoAceptar como está
def handle_nulls(df: pd.DataFrame) -> pd.DataFrame:
    """Estrategia de nulos para pedidos."""
    # Campos obligatorios: rechazar si son nulos
    critical_nulls = df[['order_id', 'total_amount']].isnull().any(axis=1)
    rejected = df[critical_nulls]
    if len(rejected) > 0:
        print(f"[QUALITY] Rechazados {len(rejected)} registros por nulos críticos")

    df = df[~critical_nulls].copy()

    # Campos opcionales: rellenar con valor por defecto
    df['discount_code'] = df['discount_code'].fillna('NONE')
    df['notes'] = df['notes'].fillna('')

    # Detección de anomalía: columna entera nula
    for col in df.columns:
        if df[col].isnull().all():
            raise ValueError(
                f"[QUALITY] La columna '{col}' está completamente nula. "
                f"Posible fallo en la fuente."
            )

    return df

Duplicados: no siempre es tan simple como drop_duplicates

La deduplicación ingenua es peligrosa. Un df.drop_duplicates() sin pensar puede eliminar registros legítimos o quedarse con la versión equivocada de un dato.

Preguntas antes de deduplicar

  1. ¿Qué define un duplicado? No siempre es la fila entera. A veces es solo una clave de negocio.
  2. ¿Con cuál te quedas? El más reciente, el primero, el más completo.
  3. ¿Los “duplicados” son realmente duplicados? A veces son actualizaciones del mismo registro.
def deduplicate_orders(df: pd.DataFrame) -> pd.DataFrame:
    """
    Deduplicación inteligente de pedidos.
    Se queda con el registro más reciente por order_id.
    """
    before = len(df)

    # Ordenar por fecha de ingestión descendente
    df = df.sort_values('_ingested_at', ascending=False)

    # Quedarse con el primer registro (más reciente) de cada order_id
    df = df.drop_duplicates(subset=['order_id'], keep='first')

    after = len(df)
    removed = before - after

    if removed > 0:
        pct = (removed / before) * 100
        print(f"[QUALITY] Deduplicación: {removed} duplicados eliminados ({pct:.1f}%)")

        # Si hay demasiados duplicados, algo raro pasa
        if pct > 10:
            print(
                f"[WARNING] Tasa de duplicados inusualmente alta ({pct:.1f}%). "
                f"Revisar la fuente."
            )

    return df.reset_index(drop=True)

Usando Pandera para validaciones declarativas

Si las validaciones manuales se te quedan cortas o quieres algo más estructurado, Pandera es una librería que permite definir schemas de validación de forma declarativa. No es enterprise, no requiere infraestructura, y se integra directamente con pandas.

import pandera as pa
from pandera import Column, Check, DataFrameSchema

order_schema = DataFrameSchema({
    "order_id": Column(
        str,
        Check.str_length(min_value=1),
        nullable=False,
        unique=True
    ),
    "customer_id": Column(
        str,
        nullable=False
    ),
    "total_amount": Column(
        float,
        Check.in_range(min_value=0, max_value=50000),
        nullable=False
    ),
    "status": Column(
        str,
        Check.isin(['pending', 'completed', 'shipped', 'cancelled']),
        nullable=False
    ),
    "order_created_at": Column(
        "datetime64[ns]",
        nullable=False
    ),
})

# Validar un DataFrame
try:
    order_schema.validate(df, lazy=True)
    print("[QUALITY] Schema Pandera validado correctamente.")
except pa.errors.SchemaErrors as err:
    print(f"[QUALITY] Errores de validación:\n{err.failure_cases}")

Lo que me gusta de Pandera es que el schema sirve como documentación viva. Cualquiera que mire el código sabe exactamente qué esperar de esos datos.


Alertas: de nada sirve validar si nadie se entera

La validación más perfecta del mundo es inútil si falla y nadie lo sabe. El pipeline se detiene, los datos no llegan, y tres días después alguien pregunta por qué el dashboard está vacío.

Principio básico de alertas en pipelines

No alertes de todo. Alerta de lo que requiere acción. Si mandas 50 alertas diarias, la gente las ignora. Si mandas una cuando de verdad hay un problema, la gente actúa.

Implementación sencilla con Slack

import requests
import os
from datetime import datetime

def send_alert(message: str, severity: str = "warning"):
    """Envía una alerta a Slack cuando una validación falla."""
    webhook_url = os.getenv("SLACK_WEBHOOK_URL")

    if not webhook_url:
        print(f"[ALERT] {severity.upper()}: {message}")
        return

    emoji = "🔴" if severity == "critical" else "🟡"
    payload = {
        "text": (
            f"{emoji} *Pipeline Alert [{severity.upper()}]*\n"
            f"_{datetime.now().strftime('%Y-%m-%d %H:%M')}_\n\n"
            f"{message}"
        )
    }

    try:
        requests.post(webhook_url, json=payload, timeout=10)
    except requests.RequestException as e:
        print(f"[ALERT] Error enviando alerta: {e}")

Integrar alertas en el validador

def validate_orders_with_alerts(df: pd.DataFrame):
    """Validaciones con alertas automáticas."""
    try:
        validate_not_empty(df, source_name="orders")
    except ValueError as e:
        send_alert(str(e), severity="critical")
        raise

    try:
        validate_no_nulls(df, required_columns=[
            'order_id', 'customer_id', 'total_amount'
        ])
    except ValueError as e:
        send_alert(str(e), severity="warning")
        # Decidir si continuar o parar
        raise

    try:
        validate_no_duplicates(df, key_columns=['order_id'])
    except ValueError as e:
        send_alert(str(e), severity="warning")
        # Los duplicados se pueden limpiar, no paramos
        print("[QUALITY] Continuando después de limpiar duplicados...")

    print("[QUALITY] Validación completa.")

Cuándo escalar: de scripts a herramientas

Las validaciones manuales con Python son suficientes para la mayoría de pipelines pequeños. Pero hay un punto donde necesitas algo más:

SituaciónLo que uso
Un pipeline, un desarrolladorFunciones Python + asserts
Varios pipelines, mismo equipoPandera + alertas centralizadas
Múltiples equipos consumiendo datosGreat Expectations o Soda
Data contracts entre equiposSchemas versionados + CI/CD

Mi consejo: empieza con lo mínimo. Un assert len(df) > 0 es infinitamente mejor que nada. Puedes evolucionar hacia Pandera o Great Expectations cuando el dolor lo justifique, no antes.


Lo que he aprendido a base de errores

Cada vez que un pipeline me ha dado problemas serios, la causa raíz ha sido la misma: datos malos que entraron sin control. No un bug en el código, no un fallo de infraestructura. Datos que no deberían haber pasado.

Las validaciones que he descrito aquí no son sofisticadas. Son las que pondrías en cualquier API para no aceptar basura: campos obligatorios, tipos correctos, rangos razonables. La diferencia es que en pipelines de datos mucha gente se olvida de ponerlas, porque el pipeline “funciona” aunque los datos sean incorrectos.

La pregunta no es si tus datos van a tener problemas de calidad. Es cuánto vas a tardar en enterarte.

Ponle validaciones a tu pipeline. Las más simples. Hoy. Tu yo del futuro te lo agradecerá cuando alguien pregunte por qué las métricas no cuadran y puedas responder en cinco minutos en vez de en dos días.

OshyTech

Ingeniería backend y de datos orientada a sistemas escalables, automatización e IA.

Navegación

Copyright 2026 OshyTech. Todos los derechos reservados