Metadata-Driven ADF: Generating Pipelines from a Config Table Instead of Building Them by Hand
The client had forty-three source systems. Forty-three databases, flat files, and APIs that all needed to land in their Azure Data Lake before anything downstream could run. Their current approach: build each ADF pipeline by hand. A senior data engineer had spent three weeks on it and was about a third of the way through. At that rate they'd be done in late spring — for a project that was supposed to go live in six weeks.
This is the moment metadata-driven design stops being a nice architectural principle and becomes a business necessity. Let's dig in.
What Metadata-Driven ADF Actually Means
The concept is simple: instead of building fifty pipelines, you build one generic pipeline and a config table that parameterizes it for each source. The pipeline doesn't know it's extracting from OrdersDB specifically — it knows it's extracting from whatever @pipeline().parameters.SourceTable is pointing at right now.
The config table is the real deliverable. Once it exists, adding a new source is a row insert, not a pipeline build.
The Config Table Schema
CREATE TABLE [meta].[IngestionConfig] (
SourceID INT IDENTITY(1,1) PRIMARY KEY,
SourceName NVARCHAR(100) NOT NULL,
SourceSchema NVARCHAR(100) NOT NULL,
SourceTable NVARCHAR(200) NOT NULL,
ExtractType NVARCHAR(20) NOT NULL, -- FULL | INCREMENTAL
WatermarkColumn NVARCHAR(100) NULL, -- required if INCREMENTAL
WatermarkValue NVARCHAR(100) NULL, -- last successful load mark
TargetContainer NVARCHAR(100) NOT NULL,
TargetPath NVARCHAR(500) NOT NULL,
IsActive BIT NOT NULL DEFAULT 1,
LinkedServiceName NVARCHAR(200) NOT NULL
);The ExtractType column is doing a lot of work here. Full extracts are straightforward — read everything, overwrite the target. Incremental extracts need a watermark: the last timestamp or row ID from the previous successful run. That value lives in WatermarkValue and gets updated at the end of each successful pipeline execution.
The Generic Pipeline Structure
The ForEach-based pattern handles the iteration. A Lookup activity reads all active rows from meta.IngestionConfig, then a ForEach activity passes each row to a nested pipeline. The nested pipeline receives the row as a parameter object and uses it to build the dynamic source and sink configurations.
# Python: generate ADF pipeline JSON from config rows
import json
def build_copy_activity(row: dict) -> dict:
source_query = (
f"SELECT * FROM [{row['SourceSchema']}].[{row['SourceTable']}]"
if row['ExtractType'] == 'FULL'
else (
f"SELECT * FROM [{row['SourceSchema']}].[{row['SourceTable']}] "
f"WHERE [{row['WatermarkColumn']}] > '{row['WatermarkValue']}'"
)
)
return {
"name": f"Copy_{row['SourceName']}",
"type": "Copy",
"inputs": [{
"referenceName": row['LinkedServiceName'],
"type": "LinkedServiceReference"
}],
"typeProperties": {
"source": {
"type": "SqlServerSource",
"sqlReaderQuery": source_query
},
"sink": {
"type": "ParquetSink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
}
}
}
}The Watermark Update
This is the part people forget until the second pipeline run returns empty results. After a successful incremental copy, you need to update WatermarkValue to the maximum value of WatermarkColumn from the data that was just loaded. The generic pipeline handles this with a stored procedure activity at the end:
EXEC [meta].[usp_UpdateWatermark]
@SourceID = @{pipeline().parameters.SourceID},
@NewWatermark = @{activity('LookupMaxWatermark').output.firstRow.MaxWatermark}The Gotcha: Template Divergence
Full extracts and incremental extracts look similar until they don't. Files land differently (overwrite vs. append), the sink partition structure is usually different, and error handling needs to account for partial incremental loads in a way that full extracts don't. I tried to handle both in a single generic pipeline early on and ended up with a single pipeline that was trying to be two things at once.
The cleaner pattern: two generic pipelines, one per extract type. The ForEach activity in the master pipeline routes rows to the right child pipeline based on ExtractType. Slightly more pipeline objects, much simpler logic in each one.
That client's forty-three sources were loaded in a day and a half of config table population. Adding source forty-four was a row insert. That's the whole point. As always, I'm here to help.