Data quality en pipelines petits: validacions que valen la pena des del dia u
Validacions senzilles per evitar dades escombraries en pipelines reals: nuls, duplicats, tipus i alertes amb Python.

Fa poc vaig revisar un pipeline que portava mesos funcionant “sense problemes”. Cada dia ingeria dades d’una API, les transformava i les carregava en una base de dades que alimentava un dashboard. Ningú es queixava. Fins que algú va fer una pregunta incòmoda: “Per què el revenue d’abril és un 20% més baix que el de l’any passat?”. Després de dos dies investigant, va resultar que l’API havia començat a retornar camps buits en un percentatge de registres i el pipeline els estava carregant alegrament com a nuls. Les mètriques es calculaven sobre dades incompletes. Ningú se’n va assabentar perquè no hi havia ni una sola validació.
Això no va passar en un sistema legacy d’una gran empresa. Va passar en un pipeline petit, mantingut per una persona, que feia exactament el que se li havia demanat: moure dades d’A a B. El problema no era el codi. Era l’absència total de controls sobre la qualitat del que es movia.
Data quality no és un producte enterprise
Quan sents “data quality” probablement penses en eines cares, equips dedicats i frameworks amb dashboards impressionants. I sí, això existeix. Però la realitat és que el 80% dels problemes de qualitat de dades es detecten amb validacions que pots escriure en 20 línies de Python.
La validació de dades no és un projecte. És un hàbit. I com tots els hàbits, costa més començar que mantenir-lo.
No necessites Great Expectations configurat amb un deployment complet per saber si les teves dades tenen nuls on no n’haurien de tenir. No necessites una data observability platform per detectar que l’API t’ha retornat zero registres quan normalment en retorna milers. Necessites asserts, ifs i un canal de Slack.
Les cinc validacions mínimes que sempre hauries de tenir
Després de trencar-me el cap amb pipelines que semblaven funcionar i no ho feien, vaig arribar a una llista de validacions que poso a tots els meus pipelines des del dia u. Són cinc i no porta més d’una hora implementar-les.
1. El pipeline ha rebut dades
Sembla obvi, però no ho és. APIs que retornen respostes buides, queries que no troben files noves, arxius CSV que arriben buits. Si el teu pipeline processa zero registres i no avisa, tens un problema silenciós.
def validate_not_empty(df: pd.DataFrame, source_name: str):
"""Valida que el DataFrame no esté vacío."""
if df.empty:
raise ValueError(
f"[QUALITY] {source_name}: se recibieron 0 registros. "
f"Esto no es normal. Revisa la fuente."
)
print(f"[QUALITY] {source_name}: {len(df)} registros recibidos. OK.")2. Els camps obligatoris no són nuls
Cada dataset té camps que mai haurien de ser nuls. Una comanda sense order_id, un usuari sense email, una transacció sense amount. Si algun d’aquests arriba nul, o la font té un problema o la teva extracció s’ha trencat.
def validate_no_nulls(df: pd.DataFrame, required_columns: list[str]):
"""Valida que las columnas obligatorias no tengan nulos."""
for col in required_columns:
null_count = df[col].isnull().sum()
if null_count > 0:
null_pct = (null_count / len(df)) * 100
raise ValueError(
f"[QUALITY] Columna '{col}': {null_count} nulos "
f"({null_pct:.1f}%). No debería tener nulos."
)
print(f"[QUALITY] Campos obligatorios sin nulos. OK.")3. No hi ha duplicats on no n’hi hauria d’haver
Els duplicats són les escombraries més comunes en pipelines. De vegades l’API envia el mateix registre dues vegades. De vegades el teu pipeline s’executa dues vegades per un error de programació. De vegades la font té un bug. Si no ho verifiques, les teves mètriques s’inflen.
def validate_no_duplicates(df: pd.DataFrame, key_columns: list[str]):
"""Valida que no haya filas duplicadas por las columnas clave."""
duplicates = df.duplicated(subset=key_columns, keep=False)
dup_count = duplicates.sum()
if dup_count > 0:
sample = df[duplicates].head(3)[key_columns]
raise ValueError(
f"[QUALITY] {dup_count} filas duplicadas por {key_columns}.\n"
f"Ejemplo:\n{sample.to_string()}"
)
print(f"[QUALITY] Sin duplicados por {key_columns}. OK.")4. Els tipus de dades són els esperats
Un camp total que arriba com a string en lloc de numèric. Un created_at que de sobte té format americà en lloc d’ISO. Un camp booleà que ara inclou “yes”, “no”, “maybe”. Si no valides tipus, les transformacions posteriors fallen de formes impredictibles.
def validate_types(df: pd.DataFrame, expected_types: dict):
"""
Valida que los tipos de las columnas sean los esperados.
expected_types: {'total_amount': 'float64', 'order_id': 'object'}
"""
errors = []
for col, expected in expected_types.items():
actual = str(df[col].dtype)
if actual != expected:
errors.append(
f" - '{col}': esperado {expected}, recibido {actual}"
)
if errors:
raise TypeError(
f"[QUALITY] Tipos incorrectos:\n" + "\n".join(errors)
)
print(f"[QUALITY] Tipos de datos correctos. OK.")5. Els valors estan dins de rangs raonables
Una comanda amb import negatiu. Una data de l’any 1900. Un percentatge de 350%. Aquests valors tècnicament no són nuls ni duplicats, però són escombraries. Les validacions de rang són l’últim filtre abans que una dada absurda arribi a producció.
def validate_ranges(df: pd.DataFrame, range_rules: dict):
"""
Valida que los valores estén dentro de rangos esperados.
range_rules: {'total_amount': {'min': 0, 'max': 100000}}
"""
for col, rules in range_rules.items():
if 'min' in rules:
violations = (df[col] < rules['min']).sum()
if violations > 0:
raise ValueError(
f"[QUALITY] '{col}': {violations} valores por debajo "
f"del mínimo ({rules['min']})"
)
if 'max' in rules:
violations = (df[col] > rules['max']).sum()
if violations > 0:
raise ValueError(
f"[QUALITY] '{col}': {violations} valores por encima "
f"del máximo ({rules['max']})"
)
print(f"[QUALITY] Rangos dentro de los esperados. OK.")Ajuntant-ho tot: un validador complet
A la pràctica, aquestes validacions s’executen com un pas del pipeline, entre la ingestió i la transformació. Si alguna falla, el pipeline s’atura i algú rep una alerta.
import pandas as pd
def validate_orders(df: pd.DataFrame):
"""Ejecuta todas las validaciones sobre los pedidos ingestados."""
# 1. No vacío
validate_not_empty(df, source_name="orders")
# 2. Campos obligatorios sin nulos
validate_no_nulls(df, required_columns=[
'order_id', 'customer_id', 'total_amount', 'status'
])
# 3. Sin duplicados por order_id
validate_no_duplicates(df, key_columns=['order_id'])
# 4. Tipos correctos
validate_types(df, expected_types={
'order_id': 'object',
'customer_id': 'object',
'total_amount': 'float64',
})
# 5. Rangos razonables
validate_ranges(df, range_rules={
'total_amount': {'min': 0, 'max': 50000},
})
print(f"[QUALITY] Todas las validaciones pasaron. Pipeline continúa.")Nuls: l’enemic que sempre torna
Els nuls mereixen un tractament a part perquè no tots són iguals. Hi ha nuls que signifiquen “no es va proporcionar la dada”, nuls que signifiquen “l’API ha fallat”, i nuls que signifiquen “aquest camp no aplica”. Tractar-los tots igual és un error.
Estratègia per gestionar nuls
| Tipus de nul | Exemple | Acció |
|---|---|---|
| Camp obligatori nul | order_id buit | Rebutjar el registre |
| Camp opcional nul | phone_number buit | Acceptar, omplir amb valor per defecte o marcar |
| Nul per fallada de la font | Tota una columna nul·la | Alertar i aturar el pipeline |
| Nul per disseny | discount_code quan no hi ha descompte | Acceptar tal com està |
def handle_nulls(df: pd.DataFrame) -> pd.DataFrame:
"""Estrategia de nulos para pedidos."""
# Campos obligatorios: rechazar si son nulos
critical_nulls = df[['order_id', 'total_amount']].isnull().any(axis=1)
rejected = df[critical_nulls]
if len(rejected) > 0:
print(f"[QUALITY] Rechazados {len(rejected)} registros por nulos críticos")
df = df[~critical_nulls].copy()
# Campos opcionales: rellenar con valor por defecto
df['discount_code'] = df['discount_code'].fillna('NONE')
df['notes'] = df['notes'].fillna('')
# Detección de anomalía: columna entera nula
for col in df.columns:
if df[col].isnull().all():
raise ValueError(
f"[QUALITY] La columna '{col}' está completamente nula. "
f"Posible fallo en la fuente."
)
return dfDuplicats: no sempre és tan simple com drop_duplicates
La deduplicació ingènua és perillosa. Un df.drop_duplicates() sense pensar pot eliminar registres legítims o quedar-se amb la versió equivocada d’una dada.
Preguntes abans de deduplicar
- Què defineix un duplicat? No sempre és la fila sencera. De vegades és només una clau de negoci.
- Amb quin et quedes? El més recent, el primer, el més complet.
- Els “duplicats” són realment duplicats? De vegades són actualitzacions del mateix registre.
def deduplicate_orders(df: pd.DataFrame) -> pd.DataFrame:
"""
Deduplicación inteligente de pedidos.
Se queda con el registro más reciente por order_id.
"""
before = len(df)
# Ordenar por fecha de ingestión descendente
df = df.sort_values('_ingested_at', ascending=False)
# Quedarse con el primer registro (más reciente) de cada order_id
df = df.drop_duplicates(subset=['order_id'], keep='first')
after = len(df)
removed = before - after
if removed > 0:
pct = (removed / before) * 100
print(f"[QUALITY] Deduplicación: {removed} duplicados eliminados ({pct:.1f}%)")
# Si hay demasiados duplicados, algo raro pasa
if pct > 10:
print(
f"[WARNING] Tasa de duplicados inusualmente alta ({pct:.1f}%). "
f"Revisar la fuente."
)
return df.reset_index(drop=True)Usant Pandera per a validacions declaratives
Si les validacions manuals se’t queden curtes o vols alguna cosa més estructurada, Pandera és una llibreria que permet definir schemas de validació de forma declarativa. No és enterprise, no requereix infraestructura, i s’integra directament amb pandas.
import pandera as pa
from pandera import Column, Check, DataFrameSchema
order_schema = DataFrameSchema({
"order_id": Column(
str,
Check.str_length(min_value=1),
nullable=False,
unique=True
),
"customer_id": Column(
str,
nullable=False
),
"total_amount": Column(
float,
Check.in_range(min_value=0, max_value=50000),
nullable=False
),
"status": Column(
str,
Check.isin(['pending', 'completed', 'shipped', 'cancelled']),
nullable=False
),
"order_created_at": Column(
"datetime64[ns]",
nullable=False
),
})
# Validar un DataFrame
try:
order_schema.validate(df, lazy=True)
print("[QUALITY] Schema Pandera validado correctamente.")
except pa.errors.SchemaErrors as err:
print(f"[QUALITY] Errores de validación:\n{err.failure_cases}")El que m’agrada de Pandera és que l’schema serveix com a documentació viva. Qualsevol persona que miri el codi sap exactament què esperar d’aquelles dades.
Alertes: de res serveix validar si ningú se n’assabenta
La validació més perfecta del món és inútil si falla i ningú ho sap. El pipeline s’atura, les dades no arriben, i tres dies després algú pregunta per què el dashboard és buit.
Principi bàsic d’alertes en pipelines
No alertis de tot. Alerta del que requereix acció. Si envies 50 alertes diàries, la gent les ignora. Si n’envies una quan de veritat hi ha un problema, la gent actua.
Implementació senzilla amb Slack
import requests
import os
from datetime import datetime
def send_alert(message: str, severity: str = "warning"):
"""Envía una alerta a Slack cuando una validación falla."""
webhook_url = os.getenv("SLACK_WEBHOOK_URL")
if not webhook_url:
print(f"[ALERT] {severity.upper()}: {message}")
return
emoji = "🔴" if severity == "critical" else "🟡"
payload = {
"text": (
f"{emoji} *Pipeline Alert [{severity.upper()}]*\n"
f"_{datetime.now().strftime('%Y-%m-%d %H:%M')}_\n\n"
f"{message}"
)
}
try:
requests.post(webhook_url, json=payload, timeout=10)
except requests.RequestException as e:
print(f"[ALERT] Error enviando alerta: {e}")Integrar alertes al validador
def validate_orders_with_alerts(df: pd.DataFrame):
"""Validaciones con alertas automáticas."""
try:
validate_not_empty(df, source_name="orders")
except ValueError as e:
send_alert(str(e), severity="critical")
raise
try:
validate_no_nulls(df, required_columns=[
'order_id', 'customer_id', 'total_amount'
])
except ValueError as e:
send_alert(str(e), severity="warning")
# Decidir si continuar o parar
raise
try:
validate_no_duplicates(df, key_columns=['order_id'])
except ValueError as e:
send_alert(str(e), severity="warning")
# Los duplicados se pueden limpiar, no paramos
print("[QUALITY] Continuando después de limpiar duplicados...")
print("[QUALITY] Validación completa.")Quan escalar: de scripts a eines
Les validacions manuals amb Python són suficients per a la majoria de pipelines petits. Però hi ha un punt on necessites alguna cosa més:
| Situació | El que faig servir |
|---|---|
| Un pipeline, un desenvolupador | Funcions Python + asserts |
| Diversos pipelines, mateix equip | Pandera + alertes centralitzades |
| Múltiples equips consumint dades | Great Expectations o Soda |
| Data contracts entre equips | Schemas versionats + CI/CD |
El meu consell: comença amb el mínim. Un assert len(df) > 0 és infinitament millor que res. Pots evolucionar cap a Pandera o Great Expectations quan el dolor ho justifiqui, no abans.
El que he après a base d’errors
Cada cop que un pipeline m’ha donat problemes seriosos, la causa arrel ha estat la mateixa: dades dolentes que van entrar sense control. No un bug al codi, no una fallada d’infraestructura. Dades que no haurien d’haver passat.
Les validacions que he descrit aquí no són sofisticades. Són les que posaries a qualsevol API per no acceptar escombraries: camps obligatoris, tipus correctes, rangs raonables. La diferència és que en pipelines de dades molta gent s’oblida de posar-les, perquè el pipeline “funciona” encara que les dades siguin incorrectes.
La pregunta no és si les teves dades tindran problemes de qualitat. És quant trigaràs a assabentar-te’n.
Posa-li validacions al teu pipeline. Les més simples. Avui. El teu jo del futur t’ho agrairà quan algú pregunti per què les mètriques no quadren i puguis respondre en cinc minuts en lloc de dos dies.


