Python Data Interrogator: Automated Schema Analysis as a First-Class Pipeline Step

The first time you connect to a new data source, you're flying blind. You have a table name, maybe a schema diagram if you're lucky, and a contact at the source team who can answer questions "when they get a chance." You need to know: how many rows, what's the value distribution on key columns, are there nulls where there shouldn't be, are the foreign key relationships actually enforced, and what does a typical row look like.

I got tired of writing the same exploratory queries every single time, so I built a tool to do it for me.

The Data Interrogator

A Data Interrogator is a script that takes a database connection and a table name, and returns a structured profile of that table. Not a profiling framework with a configuration language and a report server — just a Python function that returns a dictionary you can read, log, or load into your metadata repository.

import pyodbc
import pandas as pd
from typing import Any

def interrogate_table(conn_str: str, schema: str, table: str) -> dict[str, Any]:
    with pyodbc.connect(conn_str) as conn:
        # Basic shape
        row_count = pd.read_sql(
            f"SELECT COUNT(*) as cnt FROM [{schema}].[{table}]", conn
        ).iloc[0]['cnt']

        # Column profile
        columns_df = pd.read_sql(f"""
            SELECT
                COLUMN_NAME,
                DATA_TYPE,
                IS_NULLABLE,
                CHARACTER_MAXIMUM_LENGTH,
                NUMERIC_PRECISION,
                NUMERIC_SCALE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = '{schema}' AND TABLE_NAME = '{table}'
            ORDER BY ORDINAL_POSITION
        """, conn)

        # Null analysis
        null_checks = []
        for col in columns_df['COLUMN_NAME']:
            null_count = pd.read_sql(
                f"SELECT COUNT(*) as cnt FROM [{schema}].[{table}] WHERE [{col}] IS NULL",
                conn
            ).iloc[0]['cnt']
            null_checks.append({
                'column': col,
                'null_count': int(null_count),
                'null_pct': round(null_count / row_count * 100, 2) if row_count > 0 else 0.0
            })

        # Uniqueness on likely key columns
        key_candidates = [
            c for c in columns_df['COLUMN_NAME']
            if any(kw in c.upper() for kw in ['ID', 'KEY', 'CODE', 'NUM'])
        ]
        uniqueness = {}
        for col in key_candidates[:5]:  # cap at 5 to avoid runaway queries
            distinct = pd.read_sql(
                f"SELECT COUNT(DISTINCT [{col}]) as cnt FROM [{schema}].[{table}]",
                conn
            ).iloc[0]['cnt']
            uniqueness[col] = {
                'distinct_count': int(distinct),
                'is_unique': distinct == row_count
            }

        # Sample rows
        sample = pd.read_sql(
            f"SELECT TOP 5 * FROM [{schema}].[{table}]", conn
        ).to_dict(orient='records')

    return {
        'table': f'{schema}.{table}',
        'row_count': int(row_count),
        'column_count': len(columns_df),
        'columns': columns_df.to_dict(orient='records'),
        'null_analysis': null_checks,
        'key_candidates': uniqueness,
        'sample_rows': sample
    }

Integrating with the Metadata Repository

The output is a dictionary, so it naturally lands in the metadata repository. Once you've interrogated a source table, you know enough to populate your ExtractionConfig accurately: whether to treat the apparent primary key as a watermark column, what the null rates are on columns you were planning to use as join keys, whether the table is actually as large as claimed.

profile = interrogate_table(conn_str, 'dbo', 'CustomerOrders')

# Flag tables that are probably safe for parallel JDBC reads
# (has a numeric key column that's close to sequential)
has_partition_candidate = any(
    v['distinct_count'] > 10000 and v['is_unique']
    for v in profile['key_candidates'].values()
)

print(f"Table: {profile['table']}")
print(f"Rows: {profile['row_count']:,}")
print(f"Parallel read candidate: {has_partition_candidate}")
print("\nNull rates on key columns:")
for col in profile['null_analysis']:
    if col['null_pct'] > 5:
        print(f"  {col['column']}: {col['null_pct']}% null — verify this is expected")

What It Doesn't Replace

The Data Interrogator tells you what the data looks like structurally. It doesn't tell you whether the values are correct, whether the business rules embedded in the source system are being enforced, or whether the "customer ID" in this table refers to the same customer as the "customer ID" in that table. Those questions require domain knowledge and data quality rules.

But it eliminates the first hour of exploratory work on every new source. Run it once at onboarding, store the output in the metadata repository, and you have a baseline profile to compare against when something downstream starts producing unexpected results. As always, I'm here to help.

Read more