Earlier versions of Great Expectations had a concept called "validation operators" — a way to bundle a validation run with actions like saving results and building Data Docs. The API worked but was awkward to configure and reason about. Checkpoints replace it with something cleaner: a named, reusable configuration that specifies what to validate, against which suite, and what to do with the results.
If you're on a version of Great Expectations that supports Checkpoints (the examples here use the Batch Request style of API) and your validation runs live in a mess of inline Python calls, Checkpoints are the refactor you need.
What a Checkpoint Is
A Checkpoint bundles three things:
- A batch request (which data to validate)
- An expectation suite (what rules to validate against)
- A list of actions (what to do after validation — save results, build Data Docs, send an alert)
You define it once, save it to your GE project, and run it by name. In a file-based project the checkpoint is stored as a YAML file (under the project's checkpoints/ directory), so you can version-control it alongside your expectation suites.
Defining a Checkpoint
import great_expectations as ge
context = ge.get_context()
# Define a checkpoint programmatically (or write the YAML directly)
checkpoint_config = {
    "name": "storm_silver_daily_checkpoint",
    "config_version": 1.0,
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": {
                "datasource_name": "spark_delta",
                "data_connector_name": "runtime_connector",
                "data_asset_name": "storm_events_silver",
                "batch_identifiers": {"run_id": "daily_run"},
            },
            "expectation_suite_name": "storm_events_silver",
        }
    ],
    "action_list": [
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"},
        },
    ],
}
context.add_checkpoint(**checkpoint_config)
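Before registering a config like the one above, a small structural check can catch a missing key early. What follows is a minimal sketch in plain Python. `check_checkpoint_config` is a hypothetical helper, not part of Great Expectations, and the keys it insists on are simply the three parts described earlier in this post (your checkpoint class may not require all of them).

```python
from typing import Any, Dict, List


def check_checkpoint_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a checkpoint config dict.

    Hypothetical helper (not a Great Expectations API). It only checks
    the three parts a checkpoint bundles: what to validate, against
    which suite, and which actions to run afterwards.
    """
    problems: List[str] = []
    if not config.get("name"):
        problems.append("missing checkpoint name")

    validations = config.get("validations") or []
    if not validations:
        problems.append("no validations defined")
    for i, validation in enumerate(validations):
        if "batch_request" not in validation:
            problems.append(f"validation {i}: missing batch_request")
        if "expectation_suite_name" not in validation:
            problems.append(f"validation {i}: missing expectation_suite_name")

    if not config.get("action_list"):
        problems.append("no actions defined")
    return problems
```

In a pipeline you might call `check_checkpoint_config(checkpoint_config)` and raise a `ValueError` if the returned list is non-empty, before handing the config to `context.add_checkpoint`.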
Running a Checkpoint
To run a checkpoint with fresh data — passing the actual DataFrame at runtime:
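# In your daily pipeline job
# run_date is assumed to be set earlier in the job (for example, from a job parameter)
delta_df = spark.read.format("delta").load("/mnt/datalake/storm_events/silver")
result = context.run_checkpoint(
    checkpoint_name="storm_silver_daily_checkpoint",
    batch_request={
        "runtime_parameters": {"batch_data": delta_df},
        "batch_identifiers": {"run_id": f"daily_{run_date}"},
    },
)
if not result.success:
    # Statistics live on each validation result, not on the checkpoint result itself
    failed = sum(
        validation_result.statistics["unsuccessful_expectations"]
        for validation_result in result.list_validation_results()
    )
    raise RuntimeError(
        f"Data quality checkpoint failed: {failed} expectations violated"
    )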
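A count of violations is a start, but the error is more useful when it names what failed. The sketch below assumes the checkpoint result exposes `list_validation_results()` and that each per-expectation result carries `success` and `expectation_config.expectation_type`, which matches the releases I have used but is worth checking against yours. `failed_expectation_types` is a hypothetical helper, and the stand-in objects exist only so the example runs without a Data Context.

```python
from types import SimpleNamespace
from typing import Any, List


def failed_expectation_types(checkpoint_result: Any) -> List[str]:
    """Collect the expectation types that failed across all validations."""
    failed: List[str] = []
    for validation_result in checkpoint_result.list_validation_results():
        for expectation_result in validation_result.results:
            if not expectation_result.success:
                failed.append(
                    expectation_result.expectation_config.expectation_type
                )
    return failed


def _expectation(expectation_type: str, success: bool) -> SimpleNamespace:
    """Build a stand-in for one expectation result (illustration only)."""
    return SimpleNamespace(
        success=success,
        expectation_config=SimpleNamespace(expectation_type=expectation_type),
    )


# Stand-in checkpoint result with one validation and two expectations.
fake_result = SimpleNamespace(
    list_validation_results=lambda: [
        SimpleNamespace(
            results=[
                _expectation("expect_column_values_to_not_be_null", False),
                _expectation("expect_column_to_exist", True),
            ]
        )
    ]
)
failed_types = failed_expectation_types(fake_result)
```

With a real result, joining the returned list into the `RuntimeError` message tells whoever is on call which rule to look at first.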
Multiple Validations in One Checkpoint
A single Checkpoint can validate multiple assets against multiple suites in one run — useful when your pipeline produces several outputs that all need to pass before anything proceeds:
checkpoint_config = {
    "name": "pipeline_end_to_end_checkpoint",
    "validations": [
        {
            "batch_request": {"data_asset_name": "storm_events_silver", ...},
            "expectation_suite_name": "storm_silver_suite"
        },
        {
            "batch_request": {"data_asset_name": "weather_stations_silver", ...},
            "expectation_suite_name": "stations_silver_suite"
        },
        {
            "batch_request": {"data_asset_name": "hail_features_gold", ...},
            "expectation_suite_name": "hail_features_suite"
        }
    ],
    "action_list": [...]
}
All three validations run. If any one fails, the checkpoint result is a failure. You get a single gate that covers multiple pipeline outputs.
Checkpoints in a Databricks Workflow
In a Databricks Workflow (formerly Jobs), the checkpoint run is a notebook task. Preceding tasks do the transformations; the checkpoint task validates the output; subsequent tasks only run if the checkpoint passes. The quality gate is enforced by the job DAG, not by manual discipline. It is the same pattern I advocated for with Airflow in 2019, now native in Databricks.
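The body of that notebook task can be very small: run the checkpoint and raise if it fails, since an uncaught exception fails the task and, with the default dependency setting, downstream tasks do not start. The following is a sketch under assumptions: `run_quality_gate` and `QualityGateError` are hypothetical names, and `context` is any object with the `run_checkpoint` method used earlier in this post.

```python
from typing import Any


class QualityGateError(RuntimeError):
    """Raised to fail the notebook task when a checkpoint does not pass."""


def run_quality_gate(
    context: Any, checkpoint_name: str, batch_data: Any, run_id: str
) -> None:
    """Run a checkpoint on runtime data and raise if validation fails."""
    result = context.run_checkpoint(
        checkpoint_name=checkpoint_name,
        batch_request={
            "runtime_parameters": {"batch_data": batch_data},
            "batch_identifiers": {"run_id": run_id},
        },
    )
    if not result.success:
        # Letting this propagate is what marks the task as failed.
        raise QualityGateError(
            f"Checkpoint {checkpoint_name!r} failed for run {run_id!r}"
        )
```

In the notebook you would call it as the last cell, for example `run_quality_gate(context, "storm_silver_daily_checkpoint", delta_df, f"daily_{run_date}")`, and let the job's task dependencies do the rest.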