The Metadata-Driven Data Management Pattern: One Config Table, Every Client Source
On client engagements I keep coming back to one pattern, and it looks the same whichever cloud, tooling stack, or domain is involved. A small set of config tables drives everything: what sources exist, how to extract from them, where to land the data, and which transformations to apply. The metadata repository from the earlier post stores the structural facts; this pattern is how you apply them at runtime.
When it works well, adding a new data source is a database operation (inserting config rows), not a development task. Here's the full pattern.
The Single Config Query
Every pipeline run starts by reading its own configuration from the metadata repository. Not from environment variables, not from hardcoded notebook parameters, not from a YAML file checked into the repo — from a SQL query against the config tables. The pipeline is parameterized; the config is data.
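```python
# `spark` is the SparkSession that Databricks notebooks expose as a global.
def load_pipeline_config(load_frequency: str) -> list[dict]:
    """Return one dict per active extraction for the pipeline run at load_frequency (e.g. 'DAILY')."""
    config_query = """
        SELECT
            e.ExtractionID,
            s.SourceName,
            s.SourceType,
            s.SecretName,
            e.SourceSchema,
            e.SourceTable,
            e.ExtractType,
            e.WatermarkColumn,
            e.WatermarkValue,
            e.PartitionColumn,
            e.PartitionCount,
            e.TargetPath,
            e.LoadFrequency,
            t.NotebookPath,
            t.BusinessRulesJSON
        FROM meta.ExtractionConfig e
        JOIN meta.DataSource s ON e.SourceID = s.SourceID
        -- LEFT JOIN: sources without transformation rules still load
        LEFT JOIN meta.TransformationConfig t ON t.ExtractionID = e.ExtractionID
        WHERE e.IsActive = 1
          AND s.IsActive = 1
          AND e.LoadFrequency = :load_frequency
        ORDER BY s.SourceName
    """
    # Named parameter markers (Spark 3.5+) bind the value instead of formatting it into the SQL.
    rows = spark.sql(config_query, args={"load_frequency": load_frequency})
    # ExtractionID is selected because update_watermark needs it after the load.
    return [row.asDict() for row in rows.collect()]
```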
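To try the query shape without a cluster, here is a minimal sketch against an in-memory SQLite database. SQLite stands in for the metadata repository and an attached database named `meta` stands in for the schema; the column types, the `'SQLSERVER'` source type, the `/landing/...` target path, and the helpers `create_config_db` and `onboard_source` are all hypothetical. It also exercises the claim from the top of the post: onboarding a source is two INSERTs, and the next config read includes it.

```python
import sqlite3

# Hypothetical minimal schema covering the columns the config query reads.
SCHEMA = """
CREATE TABLE meta.DataSource (
    SourceID   INTEGER PRIMARY KEY,
    SourceName TEXT NOT NULL,
    SourceType TEXT NOT NULL,
    SecretName TEXT NOT NULL,
    IsActive   INTEGER NOT NULL DEFAULT 1
);
CREATE TABLE meta.ExtractionConfig (
    ExtractionID INTEGER PRIMARY KEY, SourceID INTEGER NOT NULL,
    SourceSchema TEXT, SourceTable TEXT, ExtractType TEXT,
    WatermarkColumn TEXT, WatermarkValue TEXT,
    PartitionColumn TEXT, PartitionCount INTEGER,
    TargetPath TEXT, LoadFrequency TEXT,
    IsActive INTEGER NOT NULL DEFAULT 1
);
CREATE TABLE meta.TransformationConfig (
    ExtractionID INTEGER, NotebookPath TEXT, BusinessRulesJSON TEXT
);
"""

CONFIG_QUERY = """
SELECT e.ExtractionID, s.SourceName, s.SourceType, s.SecretName,
       e.SourceSchema, e.SourceTable, e.ExtractType,
       e.WatermarkColumn, e.WatermarkValue, e.PartitionColumn, e.PartitionCount,
       e.TargetPath, e.LoadFrequency, t.NotebookPath, t.BusinessRulesJSON
FROM meta.ExtractionConfig e
JOIN meta.DataSource s ON e.SourceID = s.SourceID
LEFT JOIN meta.TransformationConfig t ON t.ExtractionID = e.ExtractionID
WHERE e.IsActive = 1
  AND s.IsActive = 1
  AND e.LoadFrequency = ?
ORDER BY s.SourceName
"""


def create_config_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("ATTACH DATABASE ':memory:' AS meta")  # gives us a `meta.` prefix
    conn.executescript(SCHEMA)
    conn.row_factory = sqlite3.Row  # rows behave like mappings, so dict(row) works
    return conn


def onboard_source(conn: sqlite3.Connection, name: str, secret_name: str,
                   schema: str, table: str, frequency: str) -> int:
    """Register a new source with two INSERTs and no code change; returns its ExtractionID."""
    cur = conn.execute(
        "INSERT INTO meta.DataSource (SourceName, SourceType, SecretName) "
        "VALUES (?, 'SQLSERVER', ?)",
        (name, secret_name),
    )
    cur = conn.execute(
        "INSERT INTO meta.ExtractionConfig "
        "(SourceID, SourceSchema, SourceTable, ExtractType, TargetPath, LoadFrequency) "
        "VALUES (?, ?, ?, 'FULL', ?, ?)",
        (cur.lastrowid, schema, table, f"/landing/{name}/{table}", frequency),
    )
    conn.commit()
    return cur.lastrowid


def load_pipeline_config(conn: sqlite3.Connection, load_frequency: str) -> list[dict]:
    # The ? placeholder binds the value instead of formatting it into the SQL text.
    return [dict(row) for row in conn.execute(CONFIG_QUERY, (load_frequency,))]
```

Deactivating a source is the same kind of operation: set `IsActive = 0` and the next read skips it.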
The Generic Extraction Function
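```python
from pyspark.sql import DataFrame

# `spark` and `dbutils` are globals in Databricks notebooks; `dbutils` is not
# importable with a plain `import dbutils`.


def extract_source(config: dict) -> DataFrame:
    """Read one source table over JDBC, incrementally when a watermark is available."""
    secret_scope = "pipeline-secrets"
    password = dbutils.secrets.get(scope=secret_scope, key=config['SecretName'])

    # SourceName doubles as the Azure SQL server name. The database name is taken from
    # SourceSchema here, which only works when the two match; otherwise store the
    # database name in its own config column.
    jdbc_url = (
        f"jdbc:sqlserver://{config['SourceName']}.database.windows.net:1433;"
        f"database={config['SourceSchema']};"
        f"encrypt=true;trustServerCertificate=false"
    )
    extract_opts = {
        "url": jdbc_url,
        "user": "pipeline_reader",  # fixed login; store it beside SecretName if it varies
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }

    table_ref = f"[{config['SourceSchema']}].[{config['SourceTable']}]"
    if config['ExtractType'] == 'INCREMENTAL' and config['WatermarkValue']:
        # Spark rejects `query` combined with `partitionColumn`, so the incremental
        # filter goes into an aliased subquery passed as `dbtable` instead.
        extract_opts["dbtable"] = (
            f"(SELECT * FROM {table_ref} "
            f"WHERE [{config['WatermarkColumn']}] > '{config['WatermarkValue']}') AS src"
        )
    else:
        # FULL extract, or the first incremental run before any watermark exists.
        extract_opts["dbtable"] = table_ref

    if config.get('PartitionColumn') and config.get('PartitionCount'):
        # partitionColumn must be numeric, date, or timestamp. The bounds only set the
        # partition stride; rows outside them are still read by the edge partitions.
        extract_opts.update({
            "partitionColumn": config['PartitionColumn'],
            "numPartitions": str(config['PartitionCount']),
            "lowerBound": "0",
            "upperBound": "9999999",
        })

    return spark.read.format("jdbc").options(**extract_opts).load()
```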
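The hardcoded `lowerBound`/`upperBound` pair is easy to misread as a filter. Spark's JDBC documentation says the bounds only decide the partition stride and that all rows are still returned. The sketch below is a simplified model of that splitting (equal-width ranges, the first partition open below and also taking NULLs, the last open above), not Spark's exact code, which computes the stride slightly differently. `partition_predicates` and `count_rows_per_partition` are illustrative names, and SQLite stands in for the source database.

```python
import sqlite3


def partition_predicates(column: str, lower: int, upper: int, num_partitions: int) -> list[str]:
    """Simplified model of how Spark splits a JDBC read into per-partition WHERE clauses."""
    if num_partitions <= 1:
        return ["1 = 1"]  # a single partition reads the whole table unfiltered
    stride = (upper - lower) // num_partitions
    predicates = []
    current = lower
    for i in range(num_partitions):
        low = f"{column} >= {current}" if i > 0 else None
        current += stride
        high = f"{column} < {current}" if i < num_partitions - 1 else None
        if low is None:
            # First partition is open below and also picks up NULLs.
            predicates.append(f"{high} OR {column} IS NULL")
        elif high is None:
            # Last partition is open above: values past upperBound land here.
            predicates.append(low)
        else:
            predicates.append(f"{low} AND {high}")
    return predicates


def count_rows_per_partition(values: list, predicates: list[str]) -> list[int]:
    """Count how many rows of a one-column table (column `id`) each predicate selects."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE src (id INTEGER)")
    conn.executemany("INSERT INTO src (id) VALUES (?)", [(v,) for v in values])
    counts = [
        conn.execute(f"SELECT COUNT(*) FROM src WHERE {p}").fetchone()[0]
        for p in predicates
    ]
    conn.close()
    return counts
```

For example, bounds 0 to 100 split four ways still return -5 and NULL (first partition) and 500 (last partition). With the post's fixed 0 to 9999999 range, a source whose keys run far past ten million would pile most rows into the last partition, so per-source bounds, or a MIN/MAX query before the read, may be worth storing in config.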
Applying Business Rules from Config
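```python
import json

from pyspark.sql import DataFrame
from pyspark.sql.functions import col


def apply_business_rules(df: DataFrame, rules_json: str) -> DataFrame:
    """Apply the declarative rules stored in TransformationConfig.BusinessRulesJSON.

    Rules run in a fixed order (renames, then drops, then casts), so drop and
    cast entries must use the post-rename column names.
    """
    if not rules_json:  # the LEFT JOIN yields NULL when a source has no rules
        return df
    rules = json.loads(rules_json)

    # Column renames: {"old_name": "new_name"}
    for old_name, new_name in rules.get('column_renames', {}).items():
        df = df.withColumnRenamed(old_name, new_name)

    # Column drops: ["col_a", "col_b"]
    cols_to_drop = rules.get('drop_columns', [])
    if cols_to_drop:
        df = df.drop(*cols_to_drop)

    # Type casts: {"col_name": "spark_type"}, e.g. {"Amount": "decimal(18,2)"}
    for col_name, target_type in rules.get('type_casts', {}).items():
        df = df.withColumn(col_name, col(col_name).cast(target_type))

    return df
```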
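Because `withColumnRenamed` is a no-op when the old column is missing, and `drop` ignores names it can't find, a typo in BusinessRulesJSON can pass unnoticed (a cast on a missing column, by contrast, fails at analysis time). One option is a pre-flight check against the DataFrame's columns (`df.columns` in PySpark) before calling `apply_business_rules`. Here is a minimal pure-Python sketch; `validate_rules` is a hypothetical helper that replays the same rename, drop, cast order on a list of column names.

```python
import json
from typing import Optional


def validate_rules(rules_json: Optional[str], columns: list[str]) -> list[str]:
    """Replay rename -> drop -> cast against a column list and report entries that won't apply.

    Names are compared case-sensitively, which is stricter than Spark's default
    case-insensitive resolution.
    """
    if not rules_json:
        return []
    rules = json.loads(rules_json)
    current = list(columns)
    problems = []

    for old_name, new_name in rules.get("column_renames", {}).items():
        if old_name not in current:
            problems.append(f"rename: column '{old_name}' not found")  # Spark skips silently
            continue
        current[current.index(old_name)] = new_name

    for name in rules.get("drop_columns", []):
        if name not in current:
            problems.append(f"drop: column '{name}' not found")  # Spark skips silently
        else:
            current.remove(name)

    for name in rules.get("type_casts", {}):
        if name not in current:
            # Casts see post-rename names; Spark would fail here at analysis time.
            problems.append(f"cast: column '{name}' not found after renames and drops")
    return problems
```

Run against the source's column list when a new rules row is inserted, this turns a silent no-op into an error message at onboarding time.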
The Watermark Update
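```python
def update_watermark(extraction_id: int, new_value: str) -> None:
    """Record the new high-water mark; call this only after the target write succeeds."""
    # UPDATE through spark.sql needs a table format that supports it, such as Delta.
    # new_value comes from the extracted data, not user input. It must not be None,
    # or the literal string 'None' would be stored.
    spark.sql(f"""
        UPDATE meta.ExtractionConfig
        SET WatermarkValue = '{new_value}',
            LastSuccessfulRun = current_timestamp()
        WHERE ExtractionID = {extraction_id}
    """)
```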
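The complete pipeline is: read config → extract → apply rules → write to target → update watermark. The watermark moves only after the write succeeds, so a failed run re-reads the same rows next time instead of skipping them. When a new source arrives, you insert rows into the config tables and the next pipeline run picks it up without any code changes, provided it fits what the generic functions already handle: a SQL Server source reachable over JDBC, and rules expressible as renames, drops, and type casts. That's the payoff.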
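Put together, the run loop might look like the sketch below. To keep it runnable without a cluster, the steps are passed in as callables (in a notebook they would be `extract_source`, `apply_business_rules`, a writer of your choice, a max-of-column helper, and `update_watermark`), and it assumes each config row carries `ExtractionID` (add `e.ExtractionID` to the config query if it isn't there). Two choices are assumptions rather than part of the pattern as described: a failing source is recorded and the loop moves on, and the new watermark is computed from the extracted rows before rules run, since a rule could rename or drop the watermark column.

```python
from typing import Any, Callable, Optional


def run_pipeline(
    configs: list[dict],
    extract: Callable[[dict], Any],
    apply_rules: Callable[[Any, Optional[str]], Any],
    write: Callable[[Any, str], None],
    batch_max: Callable[[Any, str], Any],
    update_watermark: Callable[[int, Any], None],
) -> dict[str, str]:
    """Run every configured source and return a status string per source."""
    status = {}
    for config in configs:
        key = f"{config['SourceName']}.{config['SourceTable']}"
        try:
            raw = extract(config)
            incremental = config['ExtractType'] == 'INCREMENTAL'
            # Compute before rules run, in case a rule renames or drops the column.
            new_mark = batch_max(raw, config['WatermarkColumn']) if incremental else None
            shaped = apply_rules(raw, config.get('BusinessRulesJSON'))
            write(shaped, config['TargetPath'])
            # Advance the watermark only after the write succeeded, and only if rows arrived.
            if incremental and new_mark is not None:
                update_watermark(config['ExtractionID'], new_mark)
            status[key] = "ok"
        except Exception as exc:  # record the failure and continue with the next source
            status[key] = f"failed: {exc}"
    return status
```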