Chapter 12: Comprehensive Testing Strategies for DLT and Streaming Pipelines
Welcome to Chapter 12 of our journey! In the preceding chapters, we meticulously engineered robust data ingestion pipelines using Kafka, built transformative Delta Live Tables (DLT) for supply chain event processing and tariff analysis, and developed Spark Structured Streaming jobs for real-time logistics cost monitoring. We’ve laid a solid foundation for our real-time supply chain intelligence platform. However, building data pipelines is only half the battle; ensuring their reliability, accuracy, and performance is paramount for any production system.
This chapter is dedicated to establishing comprehensive testing strategies for our Databricks Delta Live Tables and Spark Structured Streaming pipelines. We will explore various testing methodologies, from unit tests for individual transformation logic to integration tests for entire data flows and, critically, data quality checks using DLT’s native expectations. The “why” behind each testing approach will be emphasized, highlighting its role in maintaining data integrity, preventing costly errors, and accelerating development cycles in a data-driven environment.
To effectively follow this chapter, you should have working versions of the DLT pipelines (e.g., supply_chain_events_dlt, tariff_analysis_dlt) and Spark Structured Streaming jobs (e.g., logistics_cost_monitoring_streaming) developed in previous chapters. We will leverage these existing components to demonstrate how to implement and integrate robust testing. By the end of this chapter, you will have a clear understanding of how to build a resilient testing framework that ensures our supply chain intelligence platform delivers accurate, high-quality insights consistently.
Planning & Design: A Multi-Layered Testing Approach
For complex data pipelines like ours, a multi-layered testing strategy is essential. This approach ensures that issues are caught early, reducing the cost and effort of remediation. We’ll categorize our testing into:
- Unit Testing: Focuses on individual functions or small logical units of code (e.g., a PySpark UDF, a SQL transformation snippet). The goal is to verify that each component works correctly in isolation.
- Data Quality Testing (DLT Expectations): Built directly into DLT pipelines, these define expected data characteristics (e.g., non-null values, valid ranges, schema adherence) and enforce them during pipeline execution. They are critical for ensuring data integrity at various stages.
- Integration Testing: Verifies the interaction between different components of a pipeline (e.g., raw data ingestion to bronze, bronze to silver transformations). This involves running a segment of the pipeline with controlled input data and asserting on the transformed output.
- End-to-End Testing: Simulates the entire data flow from source (e.g., Kafka) to final dashboard/report, ensuring all components work together seamlessly. This is typically done in a staging environment.
- Performance Testing: While not extensively covered in code examples, we’ll discuss its importance for streaming pipelines, focusing on latency, throughput, and resource utilization under load.
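Even without a dedicated load-testing harness, a streaming job's health can be read off the progress reports that Structured Streaming exposes via `query.lastProgress` / `query.recentProgress`. Here is a small helper for flagging a job that is falling behind; the helper itself and its threshold are illustrative, but the field names follow the standard progress report:

```python
def is_falling_behind(progress: dict, max_trigger_ms: float = 60_000) -> bool:
    """Heuristic check on a Structured Streaming progress report:
    the job is falling behind if rows arrive faster than they are
    processed, or a trigger takes longer than the allowed budget."""
    input_rps = progress.get("inputRowsPerSecond") or 0.0
    processed_rps = progress.get("processedRowsPerSecond") or 0.0
    trigger_ms = progress.get("durationMs", {}).get("triggerExecution", 0)
    return (processed_rps < input_rps) or (trigger_ms > max_trigger_ms)

# e.g. with a report taken from query.lastProgress:
report = {"inputRowsPerSecond": 5000.0, "processedRowsPerSecond": 3500.0,
          "durationMs": {"triggerExecution": 45000}}
assert is_falling_behind(report) is True
```

Checks like this can run in a monitoring notebook or a `StreamingQueryListener`, alerting long before an unbounded input backlog becomes visible to consumers.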
Component Architecture for Testing:
Our testing framework will involve:
- DLT Pipeline Definitions: Modified to include dlt.expect statements.
- Python/PySpark Utility Modules: Separate Python files for complex transformations, enabling easier unit testing.
- Test Data Generation: Scripts or fixtures to create realistic, yet controlled, input data for integration tests.
- Test Runner: pytest will be our primary test runner for Python-based unit and integration tests.
- Databricks Jobs: For executing DLT pipelines and Structured Streaming jobs in a test environment.
File Structure for Testing:
We’ll introduce a tests/ directory at the root of our project to house our testing code.
.
├── dlt_pipelines/
│   ├── __init__.py
│   ├── supply_chain_events_dlt.py
│   └── tariff_analysis_dlt.py
├── streaming_jobs/
│   ├── __init__.py
│   └── logistics_cost_monitoring_streaming.py
├── src/
│   ├── __init__.py
│   └── data_transformation/
│       ├── __init__.py
│       └── utils.py
└── tests/
    ├── __init__.py
    ├── conftest.py              # Pytest fixtures for SparkSession
    ├── unit/
    │   ├── __init__.py
    │   └── test_data_transformation_utils.py
    └── integration/
        ├── __init__.py
        ├── test_supply_chain_dlt_pipeline.py
        └── test_logistics_streaming_job.py
Step-by-Step Implementation
1. Enhance DLT Pipelines with Data Quality Expectations
Data quality is paramount for reliable analytics. DLT’s built-in expectations provide a declarative way to enforce data quality rules directly within your pipeline definitions. If an expectation fails, DLT can drop, fail, or quarantine the offending records, providing immediate feedback and preventing bad data from propagating.
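The three enforcement policies map onto DLT's decorators: @dlt.expect (record the violation but keep the row), @dlt.expect_or_drop, and @dlt.expect_or_fail. Conceptually, each expectation is a per-record boolean predicate plus a violation policy. The following plain-Python sketch illustrates only the semantics; it is not DLT's implementation, which evaluates SQL predicates natively:

```python
def apply_expectation(records, predicate, policy="track"):
    """Illustrative sketch of DLT expectation policies on plain dicts."""
    passed, dropped = [], []
    for r in records:
        if predicate(r):
            passed.append(r)
        elif policy == "fail":
            # expect_or_fail: one bad record stops the whole update
            raise ValueError(f"Expectation failed for record: {r}")
        elif policy == "drop":
            # expect_or_drop: quietly exclude the record, count the violation
            dropped.append(r)
        else:
            # expect ("track"): keep the record but record the violation
            passed.append(r)
    return passed, dropped

records = [{"event_id": "E1"}, {"event_id": None}]
kept, dropped = apply_expectation(
    records, lambda r: r["event_id"] is not None, policy="drop"
)
# kept -> [{'event_id': 'E1'}], dropped -> [{'event_id': None}]
```

The choice of policy is a statement about severity: "track" for metrics you want to watch, "drop" for records that are individually bad, "fail" for conditions that indicate the whole batch is suspect.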
Let’s enhance our supply_chain_events_dlt.py pipeline with expectations.
a) Setup/Configuration
We’ll modify the existing DLT pipeline file. No new files are strictly needed for this step, but we will add more robust logging and error handling.
b) Core Implementation
We’ll add expectations to both the bronze (raw ingestion) and silver (curated) tables.
File: dlt_pipelines/supply_chain_events_dlt.py
# dlt_pipelines/supply_chain_events_dlt.py
import dlt
from pyspark.sql.functions import (
    col, current_timestamp, sha2, concat_ws, lit, from_json, schema_of_json
)

# Configuration for the Kafka source (assuming these are passed via DLT pipeline config)
KAFKA_BOOTSTRAP_SERVERS = spark.conf.get("pipeline.kafka_bootstrap_servers", "kafka:9092")
KAFKA_TOPIC_SUPPLY_CHAIN_EVENTS = spark.conf.get("pipeline.kafka_topic_supply_chain_events", "supply_chain_events")
KAFKA_STARTING_OFFSETS = spark.conf.get("pipeline.kafka_starting_offsets", "earliest")

# --- BRONZE Layer: Raw Ingestion from Kafka ---
# This table ingests raw JSON data from Kafka, adding metadata.
@dlt.table(
    name="supply_chain_events_bronze",
    comment="Raw supply chain events from Kafka, including metadata.",
    table_properties={"quality": "bronze"}
)
@dlt.expect_or_drop("kafka_value_not_null", "value IS NOT NULL")
@dlt.expect_or_drop("kafka_timestamp_valid", "timestamp IS NOT NULL AND timestamp > '2020-01-01'")
def create_supply_chain_events_bronze():
    try:
        df = (
            spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
            .option("subscribe", KAFKA_TOPIC_SUPPLY_CHAIN_EVENTS)
            .option("startingOffsets", KAFKA_STARTING_OFFSETS)
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp")
            .withColumn("ingestion_timestamp", current_timestamp())
        )
        print(f"Successfully configured Kafka stream for {KAFKA_TOPIC_SUPPLY_CHAIN_EVENTS}")
        return df
    except Exception as e:
        print(f"Error configuring Kafka stream for {KAFKA_TOPIC_SUPPLY_CHAIN_EVENTS}: {e}")
        raise  # Re-raise to fail the pipeline if stream setup fails

# --- SILVER Layer: Curated Supply Chain Events ---
# Parses raw JSON, extracts key fields, and performs basic data cleaning and enrichment.
@dlt.table(
    name="supply_chain_events_silver",
    comment="Curated supply chain events with parsed JSON and basic cleaning.",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("event_id_not_null", "event_id IS NOT NULL")
@dlt.expect_or_drop("event_type_valid", "event_type IN ('ORDER_CREATED', 'SHIPMENT_UPDATE', 'INVENTORY_CHANGE', 'DELIVERY_CONFIRMED')")
@dlt.expect_or_fail("event_timestamp_recent", "event_timestamp >= current_timestamp() - INTERVAL 30 DAYS")  # Fail the pipeline if critically old data is ingested
@dlt.expect_or_drop("product_id_not_null", "product_id IS NOT NULL")
def create_supply_chain_events_silver():
    bronze_df = dlt.read_stream("supply_chain_events_bronze")

    # Safely parse the JSON payload; schema_of_json derives a stable parsing
    # schema from a representative sample document.
    sample_event = lit("""
    {
        "eventId": "uuid",
        "eventType": "string",
        "eventTimestamp": "2025-01-01T00:00:00Z",
        "payload": {
            "orderId": "string",
            "productId": "string",
            "quantity": 1,
            "location": "string",
            "status": "string",
            "estimatedDelivery": "2025-01-01T00:00:00Z",
            "actualDelivery": "2025-01-01T00:00:00Z",
            "carrier": "string",
            "trackingId": "string"
        }
    }
    """)
    parsed_df = (
        bronze_df
        .withColumn("parsed_value", from_json(col("value"), schema_of_json(sample_event)))
        .select(
            col("parsed_value.eventId").alias("event_id"),
            col("parsed_value.eventType").alias("event_type"),
            col("parsed_value.eventTimestamp").cast("timestamp").alias("event_timestamp"),
            col("parsed_value.payload.orderId").alias("order_id"),
            col("parsed_value.payload.productId").alias("product_id"),
            col("parsed_value.payload.quantity").alias("quantity"),
            col("parsed_value.payload.location").alias("location"),
            col("parsed_value.payload.status").alias("status"),
            col("parsed_value.payload.estimatedDelivery").cast("timestamp").alias("estimated_delivery"),
            col("parsed_value.payload.actualDelivery").cast("timestamp").alias("actual_delivery"),
            col("parsed_value.payload.carrier").alias("carrier"),
            col("parsed_value.payload.trackingId").alias("tracking_id"),
            col("ingestion_timestamp")
        )
    )

    # Generate a unique hash for each event for idempotency and change tracking
    enriched_df = parsed_df.withColumn(
        "event_hash",
        sha2(concat_ws("_", col("event_id"), col("event_type"), col("event_timestamp")), 256)
    )
    print("Successfully transformed bronze to silver supply chain events.")
    return enriched_df

# ... (Other DLT tables like tariff_analysis_dlt.py would also get expectations)
# ... (Other DLT tables like tariff_analysis_dlt.py would also get expectations)
Explanation:
- We've added @dlt.expect_or_drop and @dlt.expect_or_fail decorators to our DLT table definitions.
- @dlt.expect_or_drop("kafka_value_not_null", "value IS NOT NULL"): for the bronze layer, this ensures that the raw Kafka message value is not null. If it is, the record is dropped, preventing malformed data from entering.
- @dlt.expect_or_drop("event_id_not_null", "event_id IS NOT NULL"): in the silver layer, we expect a parsed event_id. If parsing fails or the field is missing, the record is dropped.
- @dlt.expect_or_fail("event_timestamp_recent", ...): this is a critical expectation. If a significant amount of data older than 30 days somehow enters the pipeline, it indicates a major issue (e.g., source system clock drift, or incorrectly reprocessing old data). Failing the pipeline forces immediate investigation.
- The print() statements surface in the pipeline's driver logs, which is crucial for monitoring and debugging in production.
- The from_json parsing is made more robust by deriving its schema from a representative sample document via schema_of_json, rather than inferring it from arriving data.
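The silver table's event_hash can also be reproduced outside Spark, which is handy when unit-testing idempotency logic: sha2(concat_ws("_", ...), 256) corresponds to a hex-encoded SHA-256 over the underscore-joined string forms of the fields. A minimal demonstration (note that Spark renders a timestamp column in its own string format before concatenation, so the demo passes strings directly):

```python
import hashlib

def event_hash(event_id: str, event_type: str, event_timestamp: str) -> str:
    # Same construction as sha2(concat_ws("_", ...), 256) in the pipeline
    joined = "_".join([event_id, event_type, event_timestamp])
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

h = event_hash("E1", "ORDER_CREATED", "2025-01-01 10:00:00")
assert len(h) == 64  # hex-encoded SHA-256
# Deterministic: the same event always hashes to the same value
assert h == event_hash("E1", "ORDER_CREATED", "2025-01-01 10:00:00")
```

Because the hash is deterministic, downstream merges can use it as a stable key to deduplicate replayed events.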
c) Testing This Component
To test DLT expectations, you simply run the DLT pipeline in a development environment.
Deploy and Run DLT Pipeline:
- Navigate to your Databricks workspace.
- Go to “Workflows” -> “Delta Live Tables”.
- Create or update your Supply Chain Events DLT Pipeline configuration, pointing to the modified dlt_pipelines/supply_chain_events_dlt.py file.
- Ensure your Kafka configuration is correctly passed (e.g., as pipeline settings in the DLT UI or via spark.conf.set in a previous setup notebook).
- Start the pipeline.
Observe Expectations:
- As the pipeline runs, monitor the DLT UI. It provides a visual representation of how many records passed, dropped, or failed due to expectations.
- To simulate a failure, you could temporarily send a Kafka message with a null value for event_id or an event_timestamp far in the past. You'd see the event_id_not_null expectation drop the record, or event_timestamp_recent fail the pipeline.
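One convenient way to exercise the expectations is a tiny helper that crafts deliberately bad payloads. The helper below is illustrative (the field values are arbitrary); actually sending it requires the kafka-python package and a reachable broker:

```python
import json

def make_bad_event(event_id=None, event_timestamp="2019-01-01T00:00:00Z") -> str:
    """Build a payload that violates the silver-layer expectations:
    a null eventId and an eventTimestamp far older than 30 days."""
    return json.dumps({
        "eventId": event_id,
        "eventType": "ORDER_CREATED",
        "eventTimestamp": event_timestamp,
        "payload": {"orderId": "O1", "productId": "P1", "quantity": 1},
    })

payload = make_bad_event()
assert json.loads(payload)["eventId"] is None

# Sending it (requires kafka-python and a reachable broker):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers="kafka:9092")
# producer.send("supply_chain_events", payload.encode("utf-8"))
# producer.flush()
```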
Debugging Tips:
- Check the “Event Log” in the DLT UI for detailed messages about expectation failures, including which records were affected.
- Query the pipeline's event log (rather than the tables' _delta_log) to inspect expectation metrics, or inspect the quarantine table if you configured one for failing records.
- Use print() statements within your DLT code to output custom diagnostic messages to the driver logs.
2. Unit Testing PySpark Transformations with pytest
While DLT expectations handle data quality, complex transformation logic often resides in Python functions that can be tested independently. This improves code quality, reusability, and maintainability.
a) Setup/Configuration
We’ll create a utility file for a common transformation and a corresponding pytest setup.
Dependencies:
Ensure pytest and pyspark are installed in your development environment:
pip install pytest pyspark
File: src/data_transformation/utils.py
# src/data_transformation/utils.py
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, when, lit, upper, trim, regexp_replace, datediff

def clean_and_standardize_location(df: DataFrame, location_col: str = "location") -> DataFrame:
    """
    Cleans and standardizes location strings in a DataFrame.
    - Trims whitespace.
    - Converts to uppercase.
    - Replaces common abbreviations.
    - Handles nulls by replacing with 'UNKNOWN'.
    """
    if location_col not in df.columns:
        # Log a warning if the column doesn't exist
        print(f"Warning: Column '{location_col}' not found in DataFrame for standardization.")
        return df  # Return original DF or handle as appropriate

    # Normalize first (trim + uppercase), then expand anchored abbreviations.
    # Anchoring the patterns means "ca" and "CA" both become "CALIFORNIA"
    # without mangling substrings of full names.
    normalized = upper(trim(col(location_col)))
    standardized = regexp_replace(
        regexp_replace(
            regexp_replace(normalized, "^NY$", "NEW YORK"),
            "^CA$", "CALIFORNIA"
        ),
        "^TX$", "TEXAS"
    )
    return df.withColumn(
        location_col,
        when(col(location_col).isNull(), lit("UNKNOWN")).otherwise(standardized)
    )

def calculate_delivery_delay(df: DataFrame, actual_col: str = "actual_delivery", estimated_col: str = "estimated_delivery") -> DataFrame:
    """
    Calculates delivery delay in days.
    Returns 0 if actual_delivery is before or the same as estimated_delivery,
    or the difference in days if actual_delivery is after estimated_delivery.
    Returns NULL if either date is NULL.
    """
    if actual_col not in df.columns or estimated_col not in df.columns:
        print(f"Error: Missing columns '{actual_col}' or '{estimated_col}' for delay calculation.")
        return df  # Return original DF or handle as appropriate

    return df.withColumn(
        "delivery_delay_days",
        when(
            col(actual_col).isNotNull() & col(estimated_col).isNotNull(),
            when(
                datediff(col(actual_col), col(estimated_col)) > 0,
                datediff(col(actual_col), col(estimated_col))
            ).otherwise(lit(0))
        ).otherwise(lit(None).cast("integer"))
    )
File: tests/conftest.py (for a reusable SparkSession fixture)
# tests/conftest.py
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark_session():
    """
    Provides a SparkSession for testing.
    Uses a local Spark instance for fast unit tests.
    """
    spark = (
        SparkSession.builder.appName("PySparkUnitTests")
        .master("local[*]")  # Run Spark locally with all available cores
        .config("spark.sql.shuffle.partitions", "1")  # Optimize for local testing
        .config("spark.default.parallelism", "1")
        .config("spark.sql.session.timeZone", "UTC")  # Consistent timezone
        .getOrCreate()
    )
    yield spark
    spark.stop()
b) Core Implementation
Now, let’s write unit tests for the functions in src/data_transformation/utils.py.
File: tests/unit/test_data_transformation_utils.py
# tests/unit/test_data_transformation_utils.py
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType
from datetime import datetime

# Import the functions to be tested
from src.data_transformation.utils import clean_and_standardize_location, calculate_delivery_delay

def test_clean_and_standardize_location(spark_session: SparkSession):
    """
    Tests the clean_and_standardize_location function.
    """
    schema = StructType([StructField("location", StringType(), True)])
    data = [
        (" new york ",),
        ("ca",),
        ("texas",),
        (None,),
        ("paris",),
        ("NY",)
    ]
    input_df = spark_session.createDataFrame(data, schema)

    expected_data = [
        ("NEW YORK",),
        ("CALIFORNIA",),
        ("TEXAS",),
        ("UNKNOWN",),
        ("PARIS",),
        ("NEW YORK",)
    ]
    expected_df = spark_session.createDataFrame(expected_data, schema)

    result_df = clean_and_standardize_location(input_df)

    # Collect and compare results
    assert result_df.count() == expected_df.count()
    assert result_df.exceptAll(expected_df).count() == 0
    assert expected_df.exceptAll(result_df).count() == 0

def test_calculate_delivery_delay(spark_session: SparkSession):
    """
    Tests the calculate_delivery_delay function.
    """
    schema = StructType([
        StructField("order_id", StringType(), False),
        StructField("actual_delivery", TimestampType(), True),
        StructField("estimated_delivery", TimestampType(), True)
    ])
    data = [
        ("ORD001", datetime(2025, 1, 10, 10, 0, 0), datetime(2025, 1, 8, 10, 0, 0)),  # Delayed by 2 days
        ("ORD002", datetime(2025, 1, 5, 10, 0, 0), datetime(2025, 1, 5, 10, 0, 0)),   # On time
        ("ORD003", datetime(2025, 1, 3, 10, 0, 0), datetime(2025, 1, 7, 10, 0, 0)),   # Early (should be 0 delay)
        ("ORD004", None, datetime(2025, 1, 10, 10, 0, 0)),                            # Null actual
        ("ORD005", datetime(2025, 1, 10, 10, 0, 0), None),                            # Null estimated
        ("ORD006", None, None)                                                        # Both null
    ]
    input_df = spark_session.createDataFrame(data, schema)

    expected_schema = StructType([
        StructField("order_id", StringType(), False),
        StructField("actual_delivery", TimestampType(), True),
        StructField("estimated_delivery", TimestampType(), True),
        StructField("delivery_delay_days", IntegerType(), True)
    ])
    expected_data = [
        ("ORD001", datetime(2025, 1, 10, 10, 0, 0), datetime(2025, 1, 8, 10, 0, 0), 2),
        ("ORD002", datetime(2025, 1, 5, 10, 0, 0), datetime(2025, 1, 5, 10, 0, 0), 0),
        ("ORD003", datetime(2025, 1, 3, 10, 0, 0), datetime(2025, 1, 7, 10, 0, 0), 0),
        ("ORD004", None, datetime(2025, 1, 10, 10, 0, 0), None),
        ("ORD005", datetime(2025, 1, 10, 10, 0, 0), None, None),
        ("ORD006", None, None, None)
    ]
    expected_df = spark_session.createDataFrame(expected_data, expected_schema)

    result_df = calculate_delivery_delay(input_df)

    assert result_df.count() == expected_df.count()
    # Sort both DFs to ensure a consistent comparison order for exceptAll
    result_df_sorted = result_df.orderBy("order_id")
    expected_df_sorted = expected_df.orderBy("order_id")
    assert result_df_sorted.exceptAll(expected_df_sorted).count() == 0
    assert expected_df_sorted.exceptAll(result_df_sorted).count() == 0

def test_calculate_delivery_delay_missing_columns(spark_session: SparkSession, capsys):
    """
    Tests calculate_delivery_delay when expected columns are missing.
    """
    schema = StructType([StructField("order_id", StringType(), False)])
    input_df = spark_session.createDataFrame([("ORD001",)], schema)

    # The function should return the original DataFrame and print an error
    result_df = calculate_delivery_delay(input_df)

    assert result_df.schema == input_df.schema
    assert result_df.collect() == input_df.collect()

    captured = capsys.readouterr()
    assert "Error: Missing columns 'actual_delivery' or 'estimated_delivery' for delay calculation." in captured.out
Explanation:
- tests/conftest.py: provides a spark_session fixture that initializes a local SparkSession once per test session. This is efficient, as it avoids creating a new SparkSession for every test.
- test_clean_and_standardize_location:
  - Creates a sample input_df with various location strings, including nulls and abbreviations.
  - Defines an expected_df with the expected standardized values.
  - Calls the clean_and_standardize_location function.
  - Uses exceptAll() to compare the content of the resulting DataFrame with the expected DataFrame. exceptAll() returns rows present in one DataFrame but not the other; if both exceptAll calls return 0 rows, the DataFrames are identical.
- test_calculate_delivery_delay: similar structure, testing date calculations and null handling.
- test_calculate_delivery_delay_missing_columns: demonstrates testing error handling. We use capsys (a pytest fixture) to capture stdout/stderr and assert that the expected error message is printed.
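exceptAll is a multiset (bag) difference, so duplicate rows matter. The comparison idiom used in the tests is equivalent to this plain-Python check with collections.Counter (the sample rows are illustrative):

```python
from collections import Counter

result = [("R1", 100.0), ("R1", 100.0), ("R2", 20.0)]
expected = [("R1", 100.0), ("R2", 20.0)]

# Analogue of result_df.exceptAll(expected_df): rows of result left over
# after cancelling matching rows of expected, respecting multiplicity.
left_over = list((Counter(result) - Counter(expected)).elements())
assert left_over == [("R1", 100.0)]  # the duplicate row is not cancelled

# The DataFrames are equal as multisets only if BOTH directions are empty,
# which is why the tests assert exceptAll().count() == 0 both ways.
assert list((Counter(expected) - Counter(result)).elements()) == []
```

This is why a single one-directional exceptAll check is not enough: an output with extra duplicate rows would still pass it.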
c) Testing This Component
To run these unit tests:
- Open your terminal in the project’s root directory.
- Execute pytest tests/unit/.
You should see output similar to this:
============================= test session starts ==============================
platform linux -- Python 3.10.12, pytest-8.0.0, pluggy-1.4.0
rootdir: /path/to/your/project
plugins: anyio-4.3.0
collected 3 items
tests/unit/test_data_transformation_utils.py ... [100%]
============================== 3 passed in 5.XXs ===============================
Debugging Tips:
- If a test fails, pytest will show a detailed traceback.
- Use result_df.show() or result_df.collect() within your test functions (temporarily) to inspect the actual DataFrame content when debugging.
- Ensure your expected_df exactly matches the schema and data types, as Spark is strict.
3. Integration Testing for Streaming Pipelines
Testing streaming pipelines requires a slightly different approach due to their continuous nature. We need to simulate data arrival and verify the output over time. For this, we’ll use a temporary Delta table as a “mock” Kafka source and another temporary Delta table as a sink.
Let’s consider our logistics_cost_monitoring_streaming.py job.
a) Setup/Configuration
We’ll assume the streaming job looks something like this (simplified):
File: streaming_jobs/logistics_cost_monitoring_streaming.py
# streaming_jobs/logistics_cost_monitoring_streaming.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, current_timestamp, sum, window, avg, count
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define the schema for incoming logistics events (simplified for this example)
logistics_event_schema = StructType([
    StructField("trackingId", StringType(), True),
    StructField("eventTimestamp", TimestampType(), True),
    StructField("eventType", StringType(), True),  # e.g., "FUEL_PURCHASE", "TOLL_CHARGE", "MAINTENANCE"
    StructField("cost", DoubleType(), True),
    StructField("currency", StringType(), True),
    StructField("routeId", StringType(), True)
])

def process_logistics_costs_stream(
    spark: SparkSession,
    input_topic_or_path: str,
    output_table_path: str,
    checkpoint_location: str,
    source_format: str = "kafka",       # Can be 'kafka' or 'delta' for testing
    trigger_interval: str = "1 minute"  # Or "availableNow" for batch-like processing
):
    """
    Processes a stream of logistics events, aggregating costs by route and time window.
    """
    print(f"Starting logistics cost monitoring stream from {input_topic_or_path}...")

    if source_format == "kafka":
        read_stream = (
            spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", spark.conf.get("pipeline.kafka_bootstrap_servers", "kafka:9092"))
            .option("subscribe", input_topic_or_path)
            .option("startingOffsets", spark.conf.get("pipeline.kafka_starting_offsets", "earliest"))
            .load()
            .selectExpr("CAST(value AS STRING) as json_payload", "timestamp")
        )
    elif source_format == "delta":
        # For testing, read from a Delta table exposing the same
        # 'value'/'timestamp' columns for compatibility with the Kafka path
        read_stream = (
            spark.readStream
            .format("delta")
            .load(input_topic_or_path)
            .selectExpr("CAST(value AS STRING) as json_payload", "timestamp")
        )
    else:
        raise ValueError("Invalid source_format. Must be 'kafka' or 'delta'.")

    parsed_stream = (
        read_stream
        .withColumn("data", from_json(col("json_payload"), logistics_event_schema))
        .select(
            col("data.trackingId").alias("tracking_id"),
            col("data.eventTimestamp").alias("event_timestamp"),
            col("data.eventType").alias("event_type"),
            col("data.cost").alias("cost"),
            col("data.currency").alias("currency"),
            col("data.routeId").alias("route_id"),
            col("timestamp").alias("kafka_ingestion_timestamp")  # Kafka timestamp for ordering
        )
        .filter(col("cost").isNotNull() & col("event_timestamp").isNotNull() & col("route_id").isNotNull())
    )

    # Aggregate costs per route over 1-hour windows sliding every 30 minutes,
    # tolerating up to 10 minutes of event lateness via the watermark
    aggregated_costs = (
        parsed_stream
        .withWatermark("event_timestamp", "10 minutes")
        .groupBy(
            window(col("event_timestamp"), "1 hour", "30 minutes"),
            col("route_id")
        )
        .agg(
            sum("cost").alias("total_window_cost"),
            avg("cost").alias("avg_event_cost"),
            count("*").alias("event_count_in_window")
        )
        .withColumn("processing_timestamp", current_timestamp())
    )

    # Write to a Delta sink. In append mode, a window is emitted once the
    # watermark passes its end, and each finalized window is written exactly once.
    writer = (
        aggregated_costs.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .option("mergeSchema", "true")  # Allow schema evolution
    )
    if trigger_interval == "availableNow":
        # Process everything currently available, then stop (ideal for tests)
        writer = writer.trigger(availableNow=True)
    else:
        writer = writer.trigger(processingTime=trigger_interval)

    # .start(path) writes to a path-based Delta table; use .toTable(name)
    # instead for a Unity Catalog or managed table.
    query = writer.start(output_table_path)

    # In a production job, this would typically block with query.awaitTermination().
    # For testing, we return the query so the caller can control it.
    print(f"Logistics cost monitoring stream started, writing to {output_table_path}")
    return query

if __name__ == "__main__":
    # Example of how to run this as a Databricks Job
    spark = SparkSession.builder.appName("LogisticsCostMonitor").getOrCreate()

    # These would typically come from job parameters or environment variables
    input_topic = "logistics_events"
    output_path = "/mnt/data/gold/logistics_cost_metrics"
    checkpoint_loc = "/mnt/data/checkpoints/logistics_cost_monitor"

    # Optional Kafka tuning (if running directly without DLT)
    # spark.conf.set("spark.sql.streaming.kafka.maxOffsetsPerTrigger", "1000")

    # Call the processing function:
    # query = process_logistics_costs_stream(
    #     spark,
    #     input_topic,
    #     output_path,
    #     checkpoint_loc,
    #     source_format="kafka",
    #     trigger_interval="1 minute"
    # )
    # spark.streams.awaitAnyTermination()  # Block indefinitely
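Before writing assertions against windowed output, it helps to know exactly which sliding windows an event joins: with a 1-hour window sliding every 30 minutes, every event belongs to two overlapping windows. This standalone sketch mirrors Spark's epoch-aligned bucketing for window(col("event_timestamp"), "1 hour", "30 minutes"); it is an illustration, not Spark's actual code:

```python
from datetime import datetime, timedelta

def assigned_windows(ts, window_s=3600, slide_s=1800):
    """Return the (start, end) sliding windows containing ts,
    epoch-aligned like Spark's window() bucketing."""
    epoch = datetime(1970, 1, 1)
    t = (ts - epoch).total_seconds()
    start = t - (t % slide_s)     # latest slide-aligned window start
    windows = []
    while start + window_s > t:   # every earlier start still covering ts
        windows.append((epoch + timedelta(seconds=start),
                        epoch + timedelta(seconds=start + window_s)))
        start -= slide_s
    return sorted(windows)

# The 10:45 event used in the test below lands in two one-hour windows:
assert assigned_windows(datetime(2025, 1, 1, 10, 45)) == [
    (datetime(2025, 1, 1, 10, 0), datetime(2025, 1, 1, 11, 0)),
    (datetime(2025, 1, 1, 10, 30), datetime(2025, 1, 1, 11, 30)),
]
```

Keeping this mental model handy makes it much easier to predict row counts and aggregates in integration tests over sliding-window output.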
b) Core Implementation
Now, we’ll write an integration test for this streaming job. We’ll use temporary Delta tables for input and output.
File: tests/integration/test_logistics_streaming_job.py
# tests/integration/test_logistics_streaming_job.py
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, TimestampType, IntegerType
)
from datetime import datetime
import os
import shutil

# Import the streaming job function
from streaming_jobs.logistics_cost_monitoring_streaming import process_logistics_costs_stream

@pytest.fixture(scope="module")
def spark_session_integration():
    """
    Provides a SparkSession for integration testing.
    Uses a local Spark instance with Delta Lake support.
    """
    spark = (
        SparkSession.builder.appName("PySparkStreamingIntegrationTests")
        .master("local[*]")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.sql.shuffle.partitions", "1")
        .config("spark.default.parallelism", "1")
        .config("spark.sql.session.timeZone", "UTC")
        .getOrCreate()
    )
    yield spark
    spark.stop()

@pytest.fixture(scope="function")
def temp_delta_paths(tmp_path):
    """
    Provides temporary paths for the Delta input, output, and checkpoint locations.
    Cleans up after each test function.
    """
    input_path = str(tmp_path / "input_delta_stream")
    output_path = str(tmp_path / "output_delta_stream")
    checkpoint_path = str(tmp_path / "checkpoint")

    # Ensure the directories are clean
    for path in [input_path, output_path, checkpoint_path]:
        if os.path.exists(path):
            shutil.rmtree(path)

    yield input_path, output_path, checkpoint_path

    # Cleanup after the test
    for path in [input_path, output_path, checkpoint_path]:
        if os.path.exists(path):
            shutil.rmtree(path)
def test_logistics_cost_streaming_pipeline(spark_session_integration: SparkSession, temp_delta_paths):
    """
    Tests the logistics cost monitoring streaming pipeline end-to-end using Delta tables
    as source and sink.
    """
    input_path, output_path, checkpoint_path = temp_delta_paths

    # 1. Define the input schema for the "mock Kafka" Delta table
    input_schema = StructType([
        StructField("value", StringType(), True),        # Raw JSON payload
        StructField("timestamp", TimestampType(), True)  # Kafka timestamp
    ])

    # 2. Prepare the initial input data (simulating Kafka messages).
    # These timestamps are crucial for windowing and watermarking.
    event_time_1 = datetime(2025, 1, 1, 10, 0, 0)
    event_time_2 = datetime(2025, 1, 1, 10, 15, 0)
    event_time_3 = datetime(2025, 1, 1, 10, 45, 0)
    event_time_4 = datetime(2025, 1, 1, 11, 10, 0)
    event_time_5 = datetime(2025, 1, 1, 11, 40, 0)  # Later window, different route
    event_time_6 = datetime(2025, 1, 1, 10, 50, 0)  # Late-arriving event, sent in batch 2

    input_data_batch1 = [
        ('{"trackingId": "T1", "eventTimestamp": "2025-01-01T10:00:00Z", "eventType": "FUEL", "cost": 100.0, "currency": "USD", "routeId": "R1"}', event_time_1),
        ('{"trackingId": "T2", "eventTimestamp": "2025-01-01T10:15:00Z", "eventType": "TOLL", "cost": 10.0, "currency": "USD", "routeId": "R1"}', event_time_2),
        ('{"trackingId": "T3", "eventTimestamp": "2025-01-01T10:45:00Z", "eventType": "MAINT", "cost": 50.0, "currency": "USD", "routeId": "R1"}', event_time_3),
    ]
    input_df_batch1 = spark_session_integration.createDataFrame(input_data_batch1, input_schema)
    input_df_batch1.write.format("delta").mode("overwrite").save(input_path)

    # 3. Start the streaming job reading from the Delta source.
    # The "availableNow" trigger processes all available data once, then stops.
    streaming_query = process_logistics_costs_stream(
        spark_session_integration,
        input_path,
        output_path,
        checkpoint_path,
        source_format="delta",
        trigger_interval="availableNow"
    )

    # 4. Wait for the streaming query to process the first batch
    streaming_query.awaitTermination(timeout=60)  # Wait up to 60 seconds
    assert not streaming_query.isActive  # Should have terminated with availableNow

    # For reference, the sink schema produced by the aggregation:
    expected_schema_output = StructType([
        StructField("window", StructType([
            StructField("start", TimestampType(), True),
            StructField("end", TimestampType(), True)
        ]), True),
        StructField("route_id", StringType(), True),
        StructField("total_window_cost", DoubleType(), True),
        StructField("avg_event_cost", DoubleType(), True),
        StructField("event_count_in_window", IntegerType(), True),  # count(*) is actually a bigint in Spark
        StructField("processing_timestamp", TimestampType(), True)  # Varies per run, so not asserted on
    ])

    # 5. Read the output and inspect the results for batch 1.
    # With a sliding window (1 hour, sliding every 30 minutes), each event
    # belongs to TWO windows, and in append mode a window is emitted only once
    # the watermark (max event time - 10 minutes = 10:35) passes its end.
    # The 10:00-11:00 window (in-state aggregate for R1: 100 + 10 + 50 = 160.0
    # over 3 events) is therefore still held in state after this run; at most
    # the 09:30-10:30 slide (events at 10:00 and 10:15 -> 110.0, count 2) can
    # have been finalized, depending on micro-batch scheduling.
    output_df_batch1 = spark_session_integration.read.format("delta").load(output_path)
    output_df_batch1.show(truncate=False)

    for row in output_df_batch1.collect():
        assert row["route_id"] == "R1"
        # Only windows already closed by the 10:35 watermark may appear
        assert row["window"]["end"] <= datetime(2025, 1, 1, 10, 35, 0)
        if row["window"]["start"] == datetime(2025, 1, 1, 9, 30, 0):
            assert row["total_window_cost"] == pytest.approx(110.0)
            assert row["event_count_in_window"] == 2
# 6. Append more data to the input Delta table to simulate next Kafka messages
input_data_batch2 = [
('{"trackingId": "T4", "eventTimestamp": "2025-01-01T11:10:00Z", "eventType": "FUEL", "cost": 200.0, "currency": "USD", "routeId": "R1"}', event_time_4),
('{"trackingId": "T5", "eventTimestamp": "2025-01-01T11:40:00Z", "eventType": "TOLL", "cost": 20.0, "currency": "USD", "routeId": "R2"}', event_time_5),
('{"trackingId": "T6", "eventTimestamp": "2025-01-01T10:50:00Z", "eventType": "MAINT", "cost": 30.0, "currency": "USD", "routeId": "R1"}', event_time_6), # Late arriving event for first window
]
input_df_batch2 = spark_session_integration.createDataFrame(input_data_batch2, input_schema)
input_df_batch2.write.format("delta").mode("append").save(input_path)
# 7. Restart the streaming query (or trigger a new micro-batch)
# For "availableNow" trigger, we need to restart the query to pick up new data
streaming_query_2 = process_logistics_costs_stream(
spark_session_integration,
input_path,
output_path,
checkpoint_path,
source_format="delta",
trigger_interval="availableNow"
)
streaming_query_2.awaitTermination(timeout=60)
assert not streaming_query_2.isActive
# 8. Read the output again and assert on the updated results
output_df_batch2 = spark_session_integration.read.format("delta").load(output_path)
output_df_batch2.show(truncate=False)
# Expected:
# - Original window R1 (10:00-11:00) should be updated by event_time_6 (late arrival)
# New total: 160 + 30 = 190, count = 4
# - New window R1 (10:30-11:30) covering T3 (10:45), T6 (10:50), and T4 (11:10)
# Total for window 10:30-11:30 for R1: 50 (from 10:45) + 30 (from 10:50) + 200 (from 11:10) = 280, count = 3
# - New window R2 (11:30-12:30) for event_time_5 (11:40)
# Total for window 11:30-12:30 for R2: 20, count = 1
# With outputMode("append") and a watermark, Spark Structured Streaming only
# emits a window's final aggregate once the watermark passes the window end.
# A late event arriving within the watermark updates the in-flight state, but
# if the window was already emitted, the Delta sink can end up with multiple
# rows per window that need a MERGE to deduplicate. A more robust test would
# merge the output into a target table and assert on that; for simplicity we
# assert on the final state of the output table, expecting one row per unique
# (window, route_id) combination.
# Expected final state (after processing all data and considering windows)
# Window start, Window end, Route, Total Cost, Avg Cost, Event Count
expected_final_data = [
# Original window for R1 (10:00-11:00), updated by late event T6 (10:50)
(datetime(2025, 1, 1, 10, 0, 0), datetime(2025, 1, 1, 11, 0, 0), "R1", 190.0, 190.0/4, 4), # 100+10+50+30
# New window for R1 (10:30-11:30)
(datetime(2025, 1, 1, 10, 30, 0), datetime(2025, 1, 1, 11, 30, 0), "R1", 280.0, 280.0/3, 3), # T3(50) + T4(200) + T6(30)
# New window for R2 (11:30-12:30)
(datetime(2025, 1, 1, 11, 30, 0), datetime(2025, 1, 1, 12, 30, 0), "R2", 20.0, 20.0/1, 1), # T5(20)
]
# Convert output_df_batch2 to a format comparable with expected_final_data
# Extract window start/end, route_id, total_window_cost, event_count_in_window
# and round averages for comparison.
actual_results = output_df_batch2.select(
col("window.start").alias("window_start"),
col("window.end").alias("window_end"),
col("route_id"),
col("total_window_cost"),
round(col("avg_event_cost"), 2).alias("avg_event_cost"),
col("event_count_in_window")
).orderBy("window_start", "route_id").collect()
# Prepare expected results for comparison
expected_comparable_data = []
for ws, we, route, total, avg, count in expected_final_data:
expected_comparable_data.append(
(ws, we, route, total, round(avg, 2), count)
)
expected_comparable_df = spark_session_integration.createDataFrame(
expected_comparable_data,
schema=StructType([  # createDataFrame needs a StructType, not a bare list of fields
StructField("window_start", TimestampType(), True),
StructField("window_end", TimestampType(), True),
StructField("route_id", StringType(), True),
StructField("total_window_cost", DoubleType(), True),
StructField("avg_event_cost", DoubleType(), True),
StructField("event_count_in_window", IntegerType(), True)
])
).orderBy("window_start", "route_id").collect()
# Compare collected rows
assert len(actual_results) == len(expected_comparable_df)
for actual_row, expected_row in zip(actual_results, expected_comparable_df):
assert actual_row["window_start"] == expected_row["window_start"]
assert actual_row["window_end"] == expected_row["window_end"]
assert actual_row["route_id"] == expected_row["route_id"]
assert actual_row["total_window_cost"] == pytest.approx(expected_row["total_window_cost"])
assert actual_row["avg_event_cost"] == pytest.approx(expected_row["avg_event_cost"])
assert actual_row["event_count_in_window"] == expected_row["event_count_in_window"]
# Stop the query if it's still active (shouldn't be with availableNow)
if streaming_query.isActive:
streaming_query.stop()
if streaming_query_2.isActive:
streaming_query_2.stop()
Explanation:
- `spark_session_integration` fixture: Similar to the unit test fixture, but configured with Delta Lake extensions, as we are reading/writing Delta tables.
- `temp_delta_paths` fixture: Creates temporary directories for input, output, and checkpoint locations, ensuring a clean slate for each test.
- `test_logistics_cost_streaming_pipeline`:
  - Input Data Preparation: We create a `DataFrame` representing a batch of Kafka messages and write it to a temporary Delta table (`input_path`). The `value` column contains the raw JSON, and `timestamp` simulates the Kafka ingestion timestamp.
  - Start Streaming Query: We call `process_logistics_costs_stream`, passing the `input_path` as the source and specifying `source_format="delta"` and `trigger_interval="availableNow"`. `availableNow` is crucial for testing, as it makes the stream process all available data as a single micro-batch and then terminate, allowing for deterministic assertions.
  - Wait for Termination: `streaming_query.awaitTermination(timeout=60)` ensures the test waits until the stream has processed the data.
  - Assert Batch 1: We read the output Delta table (`output_path`) and assert that the aggregated results match our expectations for the first batch of data. We specifically check window boundaries, route IDs, total costs, and event counts.
  - Append More Data: To simulate continuous streaming, we append a second batch of data to the `input_path` Delta table. This includes a late-arriving event to test watermarking behavior.
  - Restart Query: Since `availableNow` triggers terminate, we re-invoke `process_logistics_costs_stream` to process the newly appended data.
  - Assert Batch 2: We read the output again and verify the updated aggregations, including how late-arriving data affects previously emitted windows (within the watermark). The assertions are refined to handle the nature of `append` output mode with windowing.
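The chapter's `tests/conftest.py` provides these fixtures; a minimal sketch of the path fixture might look like the following. The `make_stream_paths` helper and exact directory names are illustrative assumptions, not the book's actual implementation:

```python
# Hypothetical sketch of the temp_delta_paths fixture; the helper name and
# directory layout are illustrative, not the chapter's exact code.
import shutil
from pathlib import Path

import pytest


def make_stream_paths(base: Path) -> dict:
    """Create isolated input/output/checkpoint directories under a base dir."""
    paths = {name: str(base / name) for name in ("input", "output", "checkpoint")}
    for p in paths.values():
        Path(p).mkdir(parents=True, exist_ok=True)
    return paths


@pytest.fixture
def temp_delta_paths(tmp_path):
    paths = make_stream_paths(tmp_path)
    yield paths
    # Remove everything afterwards so a re-run never reuses a stale checkpoint.
    shutil.rmtree(tmp_path, ignore_errors=True)
```

Because every test receives a fresh `tmp_path`, checkpoints never leak between runs — which is precisely what keeps the `availableNow` runs deterministic.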
c) Testing This Component
To run these integration tests:
- Open your terminal in the project’s root directory.
- Execute `pytest tests/integration/`.
You should see output similar to this:
============================= test session starts ==============================
platform linux -- Python 3.10.12, pytest-8.0.0, pluggy-1.4.0
rootdir: /path/to/your/project
plugins: anyio-4.3.0
collected 1 item
tests/integration/test_logistics_streaming_job.py . [100%]
============================== 1 passed in 15.XXs ==============================
Debugging Tips:
- Checkpoint Location: If a test fails and you re-run it, ensure the `checkpoint_path` is truly clean between runs. The `tmp_path` fixture and `shutil.rmtree` in our `temp_delta_paths` fixture handle this automatically.
- Timezones: Be explicit with timezones (e.g., UTC) in both your SparkSession configuration and when creating `datetime` objects to avoid inconsistencies.
- Watermarking: Understanding how watermarks and `outputMode("append")` interact with late data is crucial. If your test data includes late events, carefully calculate the expected output based on your watermark definition.
- `show()` and `collect()`: Temporarily add `input_df.show()`, `parsed_stream.show()`, `aggregated_costs.show()`, or `output_df.show(truncate=False)` within your test or the streaming function to inspect intermediate DataFrames and understand data flow.
Production Considerations
CI/CD Integration:
- Automate test execution (unit, integration) as part of your CI/CD pipeline (e.g., GitHub Actions, Azure DevOps, GitLab CI).
- For Databricks-specific testing, consider using Databricks Asset Bundles (DABs) or Databricks Repos with a custom `pytest` environment on a job cluster. DABs provide a structured way to define, deploy, and manage your Databricks resources, including test jobs.
- DLT pipelines can be triggered in a test environment (e.g., a `DEV` workspace) with specific configurations (smaller clusters, test data sources).
- Best Practice: Run DLT pipelines in `development` mode for faster iteration during testing, then switch to `production` mode for actual deployments to benefit from enhanced monitoring and recovery.
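As a concrete starting point, a GitHub Actions workflow wiring both test suites into CI might look like the sketch below. The runner image, Python version, and dependency list are assumptions to adapt to your project, not a verified configuration:

```yaml
# .github/workflows/pipeline-tests.yml — illustrative sketch only
name: pipeline-tests
on: [pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.10"
      - name: Install test dependencies
        run: pip install pyspark delta-spark pytest
      - name: Unit tests
        run: pytest tests/unit/
      - name: Integration tests
        run: pytest tests/integration/
```

Keeping unit and integration suites as separate steps makes it easy to see at a glance which layer of the testing pyramid failed.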
Test Data Management:
- Volume: For integration and end-to-end tests, use representative but smaller datasets to keep test runtimes reasonable.
- Variety: Include edge cases, nulls, malformed records, and late-arriving data to thoroughly test error handling and data quality rules.
- Anonymization: If using subsets of production data, ensure it’s anonymized or synthetic to comply with data privacy regulations.
- Versioning: Keep test data versioned alongside your code.
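To make the "variety" point concrete, here is a small sketch of a test-data generator that produces each kind of edge-case record described above. The `make_event` helper and its flags are hypothetical, but the field names mirror the Kafka payloads used throughout this chapter:

```python
# Illustrative edge-case generator; make_event and its flags are hypothetical
# helpers, but the field names mirror the chapter's Kafka payloads.
import json
from datetime import datetime, timedelta, timezone


def make_event(tracking_id, cost, route_id="R1", *,
               age_hours=0, null_cost=False, malformed=False):
    """Return one raw Kafka value: valid, null-cost, late, or malformed."""
    if malformed:
        return '{"trackingId": "' + tracking_id + '"'  # deliberately truncated JSON
    ts = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc) - timedelta(hours=age_hours)
    return json.dumps({
        "trackingId": tracking_id,
        "eventTimestamp": ts.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "eventType": "FUEL",
        "cost": None if null_cost else cost,
        "currency": "USD",
        "routeId": route_id,
    })


test_batch = [
    make_event("T1", 100.0),                 # happy path
    make_event("T2", 0.0, null_cost=True),   # null cost: should be dropped/quarantined
    make_event("T3", 50.0, age_hours=48),    # stale / late-arriving event
    make_event("T4", 10.0, malformed=True),  # malformed JSON for parse-error handling
]
```

Generating edge cases programmatically (rather than hand-writing JSON strings) keeps the test data versionable alongside the code and makes it trivial to add new failure modes later.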
Monitoring Test Results:
- Integrate test results into your CI/CD dashboard.
- For DLT, monitor the DLT UI for expectation failures. Configure alerts (e.g., via Databricks webhooks or `_delta_log` queries) for critical expectation failures in production.
Performance Testing:
- For streaming pipelines, performance testing is crucial to ensure they can handle expected data volumes and latency requirements.
- Use tools like Apache JMeter or custom load generators to simulate high-volume Kafka traffic.
- Monitor Spark metrics (e.g., processing time, backlog, CPU/memory usage) and Kafka consumer lag.
- Test different cluster configurations and autoscaling settings to find the optimal balance between cost and performance.
- DLT Serverless: For DLT pipelines, leverage DLT Serverless (if available in your region) to abstract away cluster management and automatically optimize performance.
Security Considerations:
- Ensure your test environments have appropriate access controls, separated from production.
- Use service principals or managed identities for CI/CD pipelines to access Databricks and other cloud resources, following the principle of least privilege.
- Never hardcode sensitive credentials in test data or code. Use Databricks Secrets.
Code Review Checkpoint
At this point, we have significantly enhanced the robustness of our data pipelines by:
- Adding DLT Expectations: Implemented `dlt.expect_or_drop` and `dlt.expect_or_fail` in `dlt_pipelines/supply_chain_events_dlt.py` to enforce data quality rules and provide immediate feedback on data issues.
- Establishing PySpark Unit Testing: Created `src/data_transformation/utils.py` for reusable PySpark logic and `tests/unit/test_data_transformation_utils.py` with `pytest` and a local SparkSession to test these functions in isolation.
- Implementing Streaming Integration Testing: Developed `tests/integration/test_logistics_streaming_job.py` to simulate a real-time streaming scenario for `streaming_jobs/logistics_cost_monitoring_streaming.py` using temporary Delta tables as mock sources and sinks, verifying end-to-end data flow and transformations.
Files Created/Modified:
- `dlt_pipelines/supply_chain_events_dlt.py` (Modified)
- `src/data_transformation/utils.py` (New)
- `tests/conftest.py` (New)
- `tests/unit/test_data_transformation_utils.py` (New)
- `tests/integration/test_logistics_streaming_job.py` (New)
- `streaming_jobs/logistics_cost_monitoring_streaming.py` (Modified for testability)
These testing components integrate seamlessly with our existing project structure, providing a safety net that catches errors early and ensures the reliability of our real-time supply chain intelligence.
Common Issues & Solutions
Issue: `Py4JJavaError` during PySpark unit tests.
- Symptom: Tests fail with Java exceptions, often related to SparkContext or missing classes.
- Cause: Incorrect SparkSession setup, missing PySpark dependencies, or environment issues.
- Solution:
  - Ensure `pyspark` is correctly installed and accessible in your Python environment.
  - Verify your `spark_session` fixture in `conftest.py` is correctly configured (e.g., `master("local[*]")`).
  - For Delta Lake tests, ensure the `spark.sql.extensions` and `spark.sql.catalog` configurations are correctly set.
  - Check your Java version compatibility with PySpark.
- Prevention: Use a `venv` or `conda` environment to manage dependencies, and standardize your `conftest.py` for SparkSession initialization.
Issue: DLT pipeline fails due to the `event_timestamp_recent` expectation, but data looks fine.
- Symptom: A `dlt.expect_or_fail` expectation triggers, failing the pipeline, even if the data appears to be within the expected time range.
- Cause: Timezone discrepancies. The `current_timestamp()` function might be evaluating in a different timezone than your `event_timestamp` data, leading to incorrect comparisons.
- Solution:
  - Explicitly set the timezone for your SparkSession: `spark.conf.set("spark.sql.session.timeZone", "UTC")`.
  - Ensure all timestamps in your source data are either UTC or consistently converted to UTC during ingestion.
  - When constructing `datetime` objects in Python for test data, always make them timezone-aware (e.g., `datetime.now(timezone.utc)`).
- Prevention: Standardize on UTC for all data storage and processing within your data platform.
Issue: Streaming integration tests are flaky or take too long.
- Symptom: Tests sometimes pass, sometimes fail, or they run for an unacceptably long time.
- Cause: Non-deterministic stream processing, issues with checkpoint cleanup, or inefficient test data generation.
- Solution:
  - Use `trigger(availableNow=True)` for integration tests to ensure all available data is processed in a single micro-batch, making runs deterministic and faster.
  - Thoroughly clean the `checkpointLocation` and output paths before each test run using fixtures like `tmp_path` and `shutil.rmtree`.
  - Limit the volume of test data to the minimum required to validate the logic.
  - Ensure your watermarking strategy in the streaming job is well understood and accounted for in test assertions.
- Prevention: Design streaming jobs with testability in mind, abstracting the core logic into functions that can be tested independently. Always use temporary, isolated resources for integration tests.
Testing & Verification
To verify the work in this chapter:
Run All Unit Tests:
- Execute `pytest tests/unit/`.
- Expected outcome: All tests pass, indicating that our individual PySpark transformation utilities are functioning correctly.
Run All Integration Tests:
- Execute `pytest tests/integration/`.
- Expected outcome: All tests pass, confirming that our streaming pipeline logic (including windowing and aggregation) works as expected with simulated data.
Deploy and Monitor DLT Pipeline with Expectations:
- Deploy the updated `dlt_pipelines/supply_chain_events_dlt.py` to a Databricks DLT pipeline in a development environment.
- Send some valid Kafka messages to the `supply_chain_events` topic. Verify that records flow through `supply_chain_events_bronze` and `supply_chain_events_silver` successfully.
- Verification: Check the DLT UI for the pipeline. All expectations should show `100% valid` or `0% dropped/failed` for valid data.
- Simulate Failure: Introduce a Kafka message with a `null` value or a very old `eventTimestamp` for a brief period. Observe the DLT UI to confirm that the corresponding expectations (`kafka_value_not_null`, `event_timestamp_recent`) correctly identify and handle these invalid records (dropping them or failing the pipeline as configured). This confirms your data quality gates are active.
By successfully completing these steps, you will have verified that your DLT and streaming pipelines are not only functional but also robustly tested for correctness and data quality, ready for production deployment.
Summary & Next Steps
In this pivotal Chapter 12, we established a comprehensive testing framework for our real-time supply chain intelligence platform. We learned how to:
- Leverage Databricks Delta Live Tables’ native expectations to enforce data quality and reliability directly within our DLT pipelines.
- Implement unit tests for standalone PySpark transformation logic using `pytest` and a local SparkSession, promoting modularity and code correctness.
- Design and execute integration tests for Spark Structured Streaming jobs, simulating real-time data flow with temporary Delta tables to verify complex transformations, windowing, and watermarking.
- Weigh crucial production considerations for testing, including CI/CD integration, test data management, performance testing, and security.
This robust testing strategy is a cornerstone of building production-ready data applications. It ensures that our real-time procurement price intelligence, logistics cost monitoring, and tariff analysis components are accurate, reliable, and maintainable.
In the next chapter, Chapter 13: Implementing CI/CD for Automated Deployment and Testing, we will take these testing strategies a step further by integrating them into an automated Continuous Integration/Continuous Deployment (CI/CD) pipeline. This will allow us to automatically build, test, and deploy our Databricks assets, streamlining our development workflow and ensuring consistent, high-quality releases.