Python Data Interrogator: Automated Schema Analysis as a First-Class Pipeline Step
The first time you connect to a new data source, you're flying blind. You have a table name, maybe a schema diagram if you're lucky, and a contact at the source team who can answer questions "when they get a chance." You need to know: how many rows, what's the value distribution on key columns, are there nulls where there shouldn't be, are the foreign key relationships actually enforced, and what does a typical row look like.
I got tired of writing the same exploratory queries every single time, so I built a tool to do it for me.
The Data Interrogator
A Data Interrogator is a script that takes a database connection and a table name, and returns a structured profile of that table. Not a profiling framework with a configuration language and a report server — just a Python function that returns a dictionary you can read, log, or load into your metadata repository.
import pyodbc
import pandas as pd
from typing import Any
def interrogate_table(conn_str: str, schema: str, table: str) -> dict[str, Any]:
with pyodbc.connect(conn_str) as conn:
# Basic shape
row_count = pd.read_sql(
f"SELECT COUNT(*) as cnt FROM [{schema}].[{table}]", conn
).iloc[0]['cnt']
# Column profile
columns_df = pd.read_sql(f"""
SELECT
COLUMN_NAME,
DATA_TYPE,
IS_NULLABLE,
CHARACTER_MAXIMUM_LENGTH,
NUMERIC_PRECISION,
NUMERIC_SCALE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = '{schema}' AND TABLE_NAME = '{table}'
ORDER BY ORDINAL_POSITION
""", conn)
# Null analysis
null_checks = []
for col in columns_df['COLUMN_NAME']:
null_count = pd.read_sql(
f"SELECT COUNT(*) as cnt FROM [{schema}].[{table}] WHERE [{col}] IS NULL",
conn
).iloc[0]['cnt']
null_checks.append({
'column': col,
'null_count': int(null_count),
'null_pct': round(null_count / row_count * 100, 2) if row_count > 0 else 0.0
})
# Uniqueness on likely key columns
key_candidates = [
c for c in columns_df['COLUMN_NAME']
if any(kw in c.upper() for kw in ['ID', 'KEY', 'CODE', 'NUM'])
]
uniqueness = {}
for col in key_candidates[:5]: # cap at 5 to avoid runaway queries
distinct = pd.read_sql(
f"SELECT COUNT(DISTINCT [{col}]) as cnt FROM [{schema}].[{table}]",
conn
).iloc[0]['cnt']
uniqueness[col] = {
'distinct_count': int(distinct),
'is_unique': distinct == row_count
}
# Sample rows
sample = pd.read_sql(
f"SELECT TOP 5 * FROM [{schema}].[{table}]", conn
).to_dict(orient='records')
return {
'table': f'{schema}.{table}',
'row_count': int(row_count),
'column_count': len(columns_df),
'columns': columns_df.to_dict(orient='records'),
'null_analysis': null_checks,
'key_candidates': uniqueness,
'sample_rows': sample
}
Integrating with the Metadata Repository
The output is a dictionary, so it naturally lands in the metadata repository. Once you've interrogated a source table, you know enough to populate your ExtractionConfig accurately: whether to treat the apparent primary key as a watermark column, what the null rates are on columns you were planning to use as join keys, whether the table is actually as large as claimed.
profile = interrogate_table(conn_str, 'dbo', 'CustomerOrders')
# Flag tables that are probably safe for parallel JDBC reads
# (has a numeric key column that's close to sequential)
has_partition_candidate = any(
v['distinct_count'] > 10000 and v['is_unique']
for v in profile['key_candidates'].values()
)
print(f"Table: {profile['table']}")
print(f"Rows: {profile['row_count']:,}")
print(f"Parallel read candidate: {has_partition_candidate}")
print("\nNull rates on key columns:")
for col in profile['null_analysis']:
if col['null_pct'] > 5:
print(f" {col['column']}: {col['null_pct']}% null — verify this is expected")
What It Doesn't Replace
The Data Interrogator tells you what the data looks like structurally. It doesn't tell you whether the values are correct, whether the business rules embedded in the source system are being enforced, or whether the "customer ID" in this table refers to the same customer as the "customer ID" in that table. Those questions require domain knowledge and data quality rules.
But it eliminates the first hour of exploratory work on every new source. Run it once at onboarding, store the output in the metadata repository, and you have a baseline profile to compare against when something downstream starts producing unexpected results. As always, I'm here to help.