Data quality in small pipelines: validations worth having from day one

Simple validations to prevent garbage data in real pipelines: nulls, duplicates, types, and alerts with Python.

Cover for Data quality in small pipelines: validations worth having from day one

Not long ago I reviewed a pipeline that had been running “without issues” for months. Every day it ingested data from an API, transformed it, and loaded it into a database that fed a dashboard. Nobody complained. Until someone asked an uncomfortable question: “Why is April’s revenue 20% lower than last year’s?”. After two days of investigating, it turned out the API had started returning empty fields in a percentage of records and the pipeline was happily loading them as nulls. Metrics were being calculated on incomplete data. Nobody noticed because there wasn’t a single validation in place.

This didn’t happen in a legacy system at a big company. It happened in a small pipeline, maintained by one person, that did exactly what it was asked to do: move data from A to B. The problem wasn’t the code. It was the total absence of controls over the quality of what was being moved.


Data quality is not an enterprise product

When you hear “data quality” you probably think of expensive tools, dedicated teams, and frameworks with impressive dashboards. And yes, that exists. But the reality is that 80% of data quality issues are caught with validations you can write in 20 lines of Python.

Data validation is not a project. It’s a habit. And like all habits, starting is harder than keeping it up.

You don’t need Great Expectations set up with a full deployment to know if your data has nulls where it shouldn’t. You don’t need a data observability platform to detect that the API returned zero records when it normally returns thousands. You need asserts, ifs, and a Slack channel.


The five minimum validations you should always have

After banging my head against pipelines that seemed to work but didn’t, I arrived at a list of validations I put in every pipeline from day one. There are five and they take no more than an hour to implement.

1. The pipeline received data

Seems obvious, but it’s not. APIs that return empty responses, queries that find no new rows, CSV files that arrive empty. If your pipeline processes zero records and doesn’t alert, you have a silent problem.

def validate_not_empty(df: pd.DataFrame, source_name: str):
    """Validates that the DataFrame is not empty."""
    if df.empty:
        raise ValueError(
            f"[QUALITY] {source_name}: se recibieron 0 registros. "
            f"Esto no es normal. Revisa la fuente."
        )
    print(f"[QUALITY] {source_name}: {len(df)} registros recibidos. OK.")

2. Required fields are not null

Every dataset has fields that should never be null. An order without order_id, a user without email, a transaction without amount. If any of these arrives as null, either the source has a problem or your extraction broke.

def validate_no_nulls(df: pd.DataFrame, required_columns: list[str]):
    """Validates that required columns have no nulls."""
    for col in required_columns:
        null_count = df[col].isnull().sum()
        if null_count > 0:
            null_pct = (null_count / len(df)) * 100
            raise ValueError(
                f"[QUALITY] Columna '{col}': {null_count} nulos "
                f"({null_pct:.1f}%). No debería tener nulos."
            )
    print(f"[QUALITY] Campos obligatorios sin nulos. OK.")

3. No duplicates where there shouldn’t be any

Duplicates are the most common garbage in pipelines. Sometimes the API sends the same record twice. Sometimes your pipeline runs twice due to a scheduling error. Sometimes the source has a bug. If you don’t check, your metrics get inflated.

def validate_no_duplicates(df: pd.DataFrame, key_columns: list[str]):
    """Validates that there are no duplicate rows by key columns."""
    duplicates = df.duplicated(subset=key_columns, keep=False)
    dup_count = duplicates.sum()

    if dup_count > 0:
        sample = df[duplicates].head(3)[key_columns]
        raise ValueError(
            f"[QUALITY] {dup_count} filas duplicadas por {key_columns}.\n"
            f"Ejemplo:\n{sample.to_string()}"
        )
    print(f"[QUALITY] Sin duplicados por {key_columns}. OK.")

4. Data types are as expected

A total field that arrives as a string instead of numeric. A created_at that suddenly has American format instead of ISO. A boolean field that now includes “yes”, “no”, “maybe”. If you don’t validate types, downstream transformations fail in unpredictable ways.

def validate_types(df: pd.DataFrame, expected_types: dict):
    """
    Validates that column types match expectations.
    expected_types: {'total_amount': 'float64', 'order_id': 'object'}
    """
    errors = []
    for col, expected in expected_types.items():
        actual = str(df[col].dtype)
        if actual != expected:
            errors.append(
                f"  - '{col}': esperado {expected}, recibido {actual}"
            )

    if errors:
        raise TypeError(
            f"[QUALITY] Tipos incorrectos:\n" + "\n".join(errors)
        )
    print(f"[QUALITY] Tipos de datos correctos. OK.")

5. Values are within reasonable ranges

An order with a negative amount. A date in the year 1900. A percentage of 350%. These values are technically not null or duplicated, but they’re garbage. Range validations are the last filter before an absurd value reaches production.

def validate_ranges(df: pd.DataFrame, range_rules: dict):
    """
    Validates that values fall within expected ranges.
    range_rules: {'total_amount': {'min': 0, 'max': 100000}}
    """
    for col, rules in range_rules.items():
        if 'min' in rules:
            violations = (df[col] < rules['min']).sum()
            if violations > 0:
                raise ValueError(
                    f"[QUALITY] '{col}': {violations} valores por debajo "
                    f"del mínimo ({rules['min']})"
                )
        if 'max' in rules:
            violations = (df[col] > rules['max']).sum()
            if violations > 0:
                raise ValueError(
                    f"[QUALITY] '{col}': {violations} valores por encima "
                    f"del máximo ({rules['max']})"
                )
    print(f"[QUALITY] Rangos dentro de los esperados. OK.")

Putting it all together: a complete validator

In practice, these validations run as a pipeline step between ingestion and transformation. If any fails, the pipeline stops and someone gets an alert.

import pandas as pd

def validate_orders(df: pd.DataFrame):
    """Runs all validations on the ingested orders."""
    # 1. Not empty
    validate_not_empty(df, source_name="orders")

    # 2. Required fields without nulls
    validate_no_nulls(df, required_columns=[
        'order_id', 'customer_id', 'total_amount', 'status'
    ])

    # 3. No duplicates by order_id
    validate_no_duplicates(df, key_columns=['order_id'])

    # 4. Correct types
    validate_types(df, expected_types={
        'order_id': 'object',
        'customer_id': 'object',
        'total_amount': 'float64',
    })

    # 5. Reasonable ranges
    validate_ranges(df, range_rules={
        'total_amount': {'min': 0, 'max': 50000},
    })

    print(f"[QUALITY] Todas las validaciones pasaron. Pipeline continúa.")

Nulls: the enemy that always comes back

Nulls deserve special treatment because they’re not all the same. There are nulls that mean “the data was not provided,” nulls that mean “the API failed,” and nulls that mean “this field doesn’t apply.” Treating them all the same is a mistake.

Strategy for handling nulls

Null typeExampleAction
Required field is nullEmpty order_idReject the record
Optional field is nullEmpty phone_numberAccept, fill with default, or flag
Null due to source failureAn entire column is nullAlert and stop the pipeline
Null by designdiscount_code when there’s no discountAccept as-is
def handle_nulls(df: pd.DataFrame) -> pd.DataFrame:
    """Null handling strategy for orders."""
    # Required fields: reject if null
    critical_nulls = df[['order_id', 'total_amount']].isnull().any(axis=1)
    rejected = df[critical_nulls]
    if len(rejected) > 0:
        print(f"[QUALITY] Rechazados {len(rejected)} registros por nulos críticos")

    df = df[~critical_nulls].copy()

    # Optional fields: fill with default value
    df['discount_code'] = df['discount_code'].fillna('NONE')
    df['notes'] = df['notes'].fillna('')

    # Anomaly detection: entire column is null
    for col in df.columns:
        if df[col].isnull().all():
            raise ValueError(
                f"[QUALITY] La columna '{col}' está completamente nula. "
                f"Posible fallo en la fuente."
            )

    return df

Duplicates: it’s not always as simple as drop_duplicates

Naive deduplication is dangerous. A df.drop_duplicates() without thinking can remove legitimate records or keep the wrong version of a data point.

Questions before deduplicating

  1. What defines a duplicate? It’s not always the entire row. Sometimes it’s just a business key.
  2. Which one do you keep? The most recent, the first, the most complete.
  3. Are the “duplicates” actually duplicates? Sometimes they’re updates of the same record.
def deduplicate_orders(df: pd.DataFrame) -> pd.DataFrame:
    """
    Smart deduplication of orders.
    Keeps the most recent record per order_id.
    """
    before = len(df)

    # Sort by ingestion date descending
    df = df.sort_values('_ingested_at', ascending=False)

    # Keep the first record (most recent) for each order_id
    df = df.drop_duplicates(subset=['order_id'], keep='first')

    after = len(df)
    removed = before - after

    if removed > 0:
        pct = (removed / before) * 100
        print(f"[QUALITY] Deduplicación: {removed} duplicados eliminados ({pct:.1f}%)")

        # If there are too many duplicates, something weird is going on
        if pct > 10:
            print(
                f"[WARNING] Tasa de duplicados inusualmente alta ({pct:.1f}%). "
                f"Revisar la fuente."
            )

    return df.reset_index(drop=True)

Using Pandera for declarative validations

If manual validations fall short or you want something more structured, Pandera is a library that lets you define validation schemas declaratively. It’s not enterprise, doesn’t require infrastructure, and integrates directly with pandas.

import pandera as pa
from pandera import Column, Check, DataFrameSchema

order_schema = DataFrameSchema({
    "order_id": Column(
        str,
        Check.str_length(min_value=1),
        nullable=False,
        unique=True
    ),
    "customer_id": Column(
        str,
        nullable=False
    ),
    "total_amount": Column(
        float,
        Check.in_range(min_value=0, max_value=50000),
        nullable=False
    ),
    "status": Column(
        str,
        Check.isin(['pending', 'completed', 'shipped', 'cancelled']),
        nullable=False
    ),
    "order_created_at": Column(
        "datetime64[ns]",
        nullable=False
    ),
})

# Validate a DataFrame
try:
    order_schema.validate(df, lazy=True)
    print("[QUALITY] Schema Pandera validado correctamente.")
except pa.errors.SchemaErrors as err:
    print(f"[QUALITY] Errores de validación:\n{err.failure_cases}")

What I like about Pandera is that the schema serves as living documentation. Anyone who looks at the code knows exactly what to expect from that data.


Alerts: validating is useless if nobody finds out

The most perfect validation in the world is useless if it fails and nobody knows. The pipeline stops, the data doesn’t arrive, and three days later someone asks why the dashboard is empty.

Basic alerting principle for pipelines

Don’t alert on everything. Alert on what requires action. If you send 50 alerts a day, people ignore them. If you send one when there’s a real problem, people act.

Simple implementation with Slack

import requests
import os
from datetime import datetime

def send_alert(message: str, severity: str = "warning"):
    """Sends an alert to Slack when a validation fails."""
    webhook_url = os.getenv("SLACK_WEBHOOK_URL")

    if not webhook_url:
        print(f"[ALERT] {severity.upper()}: {message}")
        return

    emoji = "🔴" if severity == "critical" else "🟡"
    payload = {
        "text": (
            f"{emoji} *Pipeline Alert [{severity.upper()}]*\n"
            f"_{datetime.now().strftime('%Y-%m-%d %H:%M')}_\n\n"
            f"{message}"
        )
    }

    try:
        requests.post(webhook_url, json=payload, timeout=10)
    except requests.RequestException as e:
        print(f"[ALERT] Error enviando alerta: {e}")

Integrating alerts into the validator

def validate_orders_with_alerts(df: pd.DataFrame):
    """Validations with automatic alerts."""
    try:
        validate_not_empty(df, source_name="orders")
    except ValueError as e:
        send_alert(str(e), severity="critical")
        raise

    try:
        validate_no_nulls(df, required_columns=[
            'order_id', 'customer_id', 'total_amount'
        ])
    except ValueError as e:
        send_alert(str(e), severity="warning")
        # Decide whether to continue or stop
        raise

    try:
        validate_no_duplicates(df, key_columns=['order_id'])
    except ValueError as e:
        send_alert(str(e), severity="warning")
        # Duplicates can be cleaned, we don't stop
        print("[QUALITY] Continuando después de limpiar duplicados...")

    print("[QUALITY] Validación completa.")

When to scale: from scripts to tools

Manual Python validations are enough for most small pipelines. But there’s a point where you need something more:

SituationWhat I use
One pipeline, one developerPython functions + asserts
Several pipelines, same teamPandera + centralized alerts
Multiple teams consuming dataGreat Expectations or Soda
Data contracts between teamsVersioned schemas + CI/CD

My advice: start with the minimum. An assert len(df) > 0 is infinitely better than nothing. You can evolve toward Pandera or Great Expectations when the pain justifies it, not before.


What I’ve learned from making mistakes

Every time a pipeline has given me serious trouble, the root cause has been the same: bad data that entered without any checks. Not a code bug, not an infrastructure failure. Data that shouldn’t have passed through.

The validations I’ve described here aren’t sophisticated. They’re the ones you’d put in any API to avoid accepting garbage: required fields, correct types, reasonable ranges. The difference is that in data pipelines many people forget to add them, because the pipeline “works” even when the data is wrong.

The question isn’t whether your data will have quality issues. It’s how long it will take you to find out.

Add validations to your pipeline. The simplest ones. Today. Your future self will thank you when someone asks why the metrics don’t add up and you can answer in five minutes instead of two days.

OshyTech

Backend and data engineering focused on scalable systems, automation, and AI.

Navigation

Copyright 2026 OshyTech. All Rights Reserved