Chapter 12: Comprehensive Testing Strategies for DLT and Streaming Pipelines

Welcome to Chapter 12 of our journey! In the preceding chapters, we meticulously engineered robust data ingestion pipelines using Kafka, built transformative Delta Live Tables (DLT) for supply chain event processing and tariff analysis, and developed Spark Structured Streaming jobs for real-time logistics cost monitoring. We’ve laid a solid foundation for our real-time supply chain intelligence platform. However, building data pipelines is only half the battle; ensuring their reliability, accuracy, and performance is paramount for any production system.

This chapter is dedicated to establishing comprehensive testing strategies for our Databricks Delta Live Tables and Spark Structured Streaming pipelines. We will explore various testing methodologies, from unit tests for individual transformation logic to integration tests for entire data flows and, critically, data quality checks using DLT’s native expectations. The “why” behind each testing approach will be emphasized, highlighting its role in maintaining data integrity, preventing costly errors, and accelerating development cycles in a data-driven environment.

To effectively follow this chapter, you should have working versions of the DLT pipelines (e.g., supply_chain_events_dlt, tariff_analysis_dlt) and Spark Structured Streaming jobs (e.g., logistics_cost_monitoring_streaming) developed in previous chapters. We will leverage these existing components to demonstrate how to implement and integrate robust testing. By the end of this chapter, you will have a clear understanding of how to build a resilient testing framework that ensures our supply chain intelligence platform delivers accurate, high-quality insights consistently.

Planning & Design: A Multi-Layered Testing Approach

For complex data pipelines like ours, a multi-layered testing strategy is essential. This approach ensures that issues are caught early, reducing the cost and effort of remediation. We’ll categorize our testing into:

  1. Unit Testing: Focuses on individual functions or small logical units of code (e.g., a PySpark UDF, a SQL transformation snippet). The goal is to verify that each component works correctly in isolation.
  2. Data Quality Testing (DLT Expectations): Built directly into DLT pipelines, these define expected data characteristics (e.g., non-null values, valid ranges, schema adherence) and enforce them during pipeline execution. They are critical for ensuring data integrity at various stages.
  3. Integration Testing: Verifies the interaction between different components of a pipeline (e.g., raw data ingestion to bronze, bronze to silver transformations). This involves running a segment of the pipeline with controlled input data and asserting on the transformed output.
  4. End-to-End Testing: Simulates the entire data flow from source (e.g., Kafka) to final dashboard/report, ensuring all components work together seamlessly. This is typically done in a staging environment.
  5. Performance Testing: While not extensively covered in code examples, we’ll discuss its importance for streaming pipelines, focusing on latency, throughput, and resource utilization under load.
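Although we won’t build a full load-testing harness, the raw material for these checks is readily available: PySpark’s `StreamingQuery.lastProgress` returns a per-batch dict of throughput and duration metrics. The small helpers below (the function names and the threshold are our own illustrative choices, not a standard API) condense that dict into the latency and throughput signals listed above:

```python
# Illustrative helpers over StreamingQuery.lastProgress; the key names follow
# the StreamingQueryProgress JSON emitted by Structured Streaming.

def summarize_stream_progress(progress: dict) -> dict:
    """Pull headline throughput/latency figures out of a lastProgress dict."""
    durations = progress.get("durationMs") or {}
    return {
        "batch_id": progress.get("batchId"),
        "num_input_rows": progress.get("numInputRows", 0),
        "input_rows_per_second": progress.get("inputRowsPerSecond", 0.0),
        "processed_rows_per_second": progress.get("processedRowsPerSecond", 0.0),
        "trigger_execution_ms": durations.get("triggerExecution"),
    }

def is_keeping_up(progress: dict, max_trigger_ms: int = 60_000) -> bool:
    """True if the batch finished well inside the trigger budget."""
    trigger_ms = summarize_stream_progress(progress)["trigger_execution_ms"]
    return trigger_ms is not None and trigger_ms < max_trigger_ms
```

In a soak test you would poll `query.lastProgress` (or `query.recentProgress`) on a schedule and assert that `is_keeping_up` holds over a sustained window under representative load.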

Component Architecture for Testing:

Our testing framework will involve:

  • DLT Pipeline Definitions: Modified to include dlt.expect statements.
  • Python/PySpark Utility Modules: Separate Python files for complex transformations, enabling easier unit testing.
  • Test Data Generation: Scripts or fixtures to create realistic, yet controlled, input data for integration tests.
  • Test Runner: pytest will be our primary test runner for Python-based unit and integration tests.
  • Databricks Jobs: For executing DLT pipelines and Structured Streaming jobs in a test environment.

File Structure for Testing:

We’ll introduce a tests/ directory at the root of our project to house our testing code.

.
├── dlt_pipelines/
│   ├── __init__.py
│   ├── supply_chain_events_dlt.py
│   └── tariff_analysis_dlt.py
├── streaming_jobs/
│   ├── __init__.py
│   └── logistics_cost_monitoring_streaming.py
├── src/
│   ├── __init__.py
│   └── data_transformation/
│       ├── __init__.py
│       └── utils.py
└── tests/
    ├── __init__.py
    ├── conftest.py                # Pytest fixtures for SparkSession
    ├── unit/
    │   ├── __init__.py
    │   └── test_data_transformation_utils.py
    └── integration/
        ├── __init__.py
        ├── test_supply_chain_dlt_pipeline.py
        └── test_logistics_streaming_job.py

Step-by-Step Implementation

1. Enhance DLT Pipelines with Data Quality Expectations

Data quality is paramount for reliable analytics. DLT’s built-in expectations provide a declarative way to enforce data quality rules directly within your pipeline definitions. When an expectation is violated, DLT can retain the record while recording the violation (expect), drop the record (expect_or_drop), or fail the update (expect_or_fail), providing immediate feedback and preventing bad data from propagating.

Let’s enhance our supply_chain_events_dlt.py pipeline with expectations.

a) Setup/Configuration

We’ll modify the existing DLT pipeline file. No new files are strictly needed for this step, but we will add more robust logging and error handling.

b) Core Implementation

We’ll add expectations to both the bronze (raw ingestion) and silver (curated) tables.

File: dlt_pipelines/supply_chain_events_dlt.py

# dlt_pipelines/supply_chain_events_dlt.py

import dlt
import logging
from pyspark.sql.functions import col, current_timestamp, from_json, sha2, concat_ws, lit

logger = logging.getLogger("supply_chain_events_dlt")

# Configuration for Kafka source (assuming these are passed via DLT pipeline config)
KAFKA_BOOTSTRAP_SERVERS = spark.conf.get("pipeline.kafka_bootstrap_servers", "kafka:9092")
KAFKA_TOPIC_SUPPLY_CHAIN_EVENTS = spark.conf.get("pipeline.kafka_topic_supply_chain_events", "supply_chain_events")
KAFKA_STARTING_OFFSETS = spark.conf.get("pipeline.kafka_starting_offsets", "earliest")

# --- BRONZE Layer: Raw Ingestion from Kafka ---
# This table ingests raw JSON data from Kafka, adding metadata.
@dlt.table(
    name="supply_chain_events_bronze",
    comment="Raw supply chain events from Kafka, including metadata.",
    table_properties={"quality": "bronze"}
)
@dlt.expect_or_drop("kafka_value_not_null", "value IS NOT NULL")
@dlt.expect_or_drop("kafka_timestamp_valid", "timestamp IS NOT NULL AND timestamp > '2020-01-01'")
def create_supply_chain_events_bronze():
    try:
        df = (
            spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
            .option("subscribe", KAFKA_TOPIC_SUPPLY_CHAIN_EVENTS)
            .option("startingOffsets", KAFKA_STARTING_OFFSETS)
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp")
            .withColumn("ingestion_timestamp", current_timestamp())
        )
        logger.info(f"Successfully configured Kafka stream for {KAFKA_TOPIC_SUPPLY_CHAIN_EVENTS}")
        return df
    except Exception as e:
        logger.error(f"Error configuring Kafka stream for {KAFKA_TOPIC_SUPPLY_CHAIN_EVENTS}: {e}")
        raise # Re-raise to fail the pipeline if stream setup fails

# --- SILVER Layer: Curated Supply Chain Events ---
# Parses raw JSON, extracts key fields, and performs basic data cleaning and enrichment.
@dlt.table(
    name="supply_chain_events_silver",
    comment="Curated supply chain events with parsed JSON and basic cleaning.",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("event_id_not_null", "event_id IS NOT NULL")
@dlt.expect_or_drop("event_type_valid", "event_type IN ('ORDER_CREATED', 'SHIPMENT_UPDATE', 'INVENTORY_CHANGE', 'DELIVERY_CONFIRMED')")
@dlt.expect_or_fail("event_timestamp_recent", "event_timestamp >= current_timestamp() - INTERVAL '30 days'") # Fail pipeline if critical old data ingested
@dlt.expect_or_drop("product_id_not_null", "product_id IS NOT NULL")
def create_supply_chain_events_silver():
    bronze_df = dlt.read_stream("supply_chain_events_bronze")

    # Parse the JSON payload with an explicit schema. An explicit DDL schema is
    # stable across runs, unlike a schema inferred from sample data.
    event_schema = """
        eventId STRING,
        eventType STRING,
        eventTimestamp TIMESTAMP,
        payload STRUCT<
            orderId: STRING,
            productId: STRING,
            quantity: INT,
            location: STRING,
            status: STRING,
            estimatedDelivery: TIMESTAMP,
            actualDelivery: TIMESTAMP,
            carrier: STRING,
            trackingId: STRING
        >
    """
    parsed_df = bronze_df.withColumn("parsed_value", from_json(col("value"), event_schema)) \
    .select(
        col("parsed_value.eventId").alias("event_id"),
        col("parsed_value.eventType").alias("event_type"),
        col("parsed_value.eventTimestamp").alias("event_timestamp"),
        col("parsed_value.payload.orderId").alias("order_id"),
        col("parsed_value.payload.productId").alias("product_id"),
        col("parsed_value.payload.quantity").alias("quantity"),
        col("parsed_value.payload.location").alias("location"),
        col("parsed_value.payload.status").alias("status"),
        col("parsed_value.payload.estimatedDelivery").alias("estimated_delivery"),
        col("parsed_value.payload.actualDelivery").alias("actual_delivery"),
        col("parsed_value.payload.carrier").alias("carrier"),
        col("parsed_value.payload.trackingId").alias("tracking_id"),
        col("ingestion_timestamp")
    )

    # Generate a unique hash for each event for idempotency and tracking changes
    enriched_df = parsed_df.withColumn(
        "event_hash",
        sha2(concat_ws("_", col("event_id"), col("event_type"), col("event_timestamp")), 256)
    )

    logger.info("Successfully transformed bronze to silver supply chain events.")
    return enriched_df

# ... (Other DLT tables like tariff_analysis_dlt.py would also get expectations)

Explanation:

  • We’ve added @dlt.expect_or_drop and @dlt.expect_or_fail decorators to our DLT table definitions.
  • @dlt.expect_or_drop("kafka_value_not_null", "value IS NOT NULL"): For the bronze layer, this ensures that the raw Kafka message value is not null. If it is, the record is dropped, preventing malformed data from entering.
  • @dlt.expect_or_drop("event_id_not_null", "event_id IS NOT NULL"): In the silver layer, we expect a parsed event_id. If parsing fails or the field is missing, the record is dropped.
  • @dlt.expect_or_fail("event_timestamp_recent", "event_timestamp >= current_timestamp() - INTERVAL '30 days'"): This is a critical expectation. If a significant amount of data older than 30 days somehow enters the pipeline, it indicates a major issue (e.g., source system clock drift, reprocessing old data incorrectly). Failing the pipeline forces immediate investigation.
  • Standard Python logging (logger.info() and logger.error()) provides operational messages that surface in the pipeline’s driver logs, complementing the structured DLT event log that records expectation metrics.
  • from_json is given an explicit DDL schema rather than relying on inference, so a malformed sample record cannot silently change the parsed schema between runs.
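A variation worth considering: the expectation predicates can live in a plain dict shared between the pipeline and ordinary unit tests. DLT’s `expect_all_or_drop` accepts such a dict, and the same predicate strings can be applied to a static DataFrame with `filter()` to count violations without running DLT at all. A sketch (the rule names mirror the decorators shown above):

```python
# Central registry of silver-layer rules, usable both by the DLT pipeline
# and by ordinary unit tests.
SILVER_EXPECTATIONS = {
    "event_id_not_null": "event_id IS NOT NULL",
    "product_id_not_null": "product_id IS NOT NULL",
    "event_type_valid": (
        "event_type IN ('ORDER_CREATED', 'SHIPMENT_UPDATE', "
        "'INVENTORY_CHANGE', 'DELIVERY_CONFIRMED')"
    ),
}

# In the DLT pipeline file the whole set is applied at once:
#   @dlt.expect_all_or_drop(SILVER_EXPECTATIONS)
#   def create_supply_chain_events_silver(): ...
#
# In a unit test, each predicate doubles as a filter over a static DataFrame:
#   violations = df.filter(f"NOT ({SILVER_EXPECTATIONS['event_id_not_null']})").count()
```

Keeping the rules in one place prevents the pipeline and its tests from drifting apart.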

c) Testing This Component

To test DLT expectations, you simply run the DLT pipeline in a development environment.

  1. Deploy and Run DLT Pipeline:

    • Navigate to your Databricks workspace.
    • Go to “Workflows” -> “Delta Live Tables”.
    • Create or update your Supply Chain Events DLT Pipeline configuration, pointing to the modified dlt_pipelines/supply_chain_events_dlt.py file.
    • Ensure your Kafka configuration is correctly passed (e.g., as pipeline settings in DLT UI or spark.conf.set in a previous setup notebook).
    • Start the pipeline.
  2. Observe Expectations:

    • As the pipeline runs, monitor the DLT UI. It provides a visual representation of how many records passed, dropped, or failed due to expectations.
    • To simulate a failure, you could temporarily send a Kafka message with a null value for event_id or an event_timestamp far in the past. You’d see the event_id_not_null expectation drop the record, or event_timestamp_recent fail the pipeline.
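Rather than hand-crafting these bad messages each time, a small builder keeps the failure scenarios repeatable. The sketch below only constructs the JSON payloads; publishing them is left to whichever Kafka client your environment provides (the producer call is deliberately not shown):

```python
import json
from datetime import datetime, timedelta

def make_test_event(event_id="EVT-001", event_type="ORDER_CREATED",
                    event_timestamp=None, product_id="P-100"):
    """Build one supply-chain event JSON string; pass None / stale values
    to deliberately violate specific expectations."""
    ts = event_timestamp or datetime.utcnow()
    return json.dumps({
        "eventId": event_id,
        "eventType": event_type,
        "eventTimestamp": ts.isoformat(),
        "payload": {"productId": product_id},
    })

# A record the pipeline should keep, one that expect_or_drop should remove
# (null eventId), and one that should trip expect_or_fail (stale timestamp):
good_event = make_test_event()
dropped_event = make_test_event(event_id=None)
failing_event = make_test_event(event_timestamp=datetime.utcnow() - timedelta(days=45))
```

Feeding these three payloads through the pipeline exercises the pass, drop, and fail paths of the expectations in one run.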

Debugging Tips:

  • Check the “Event Log” in the DLT UI for detailed messages about expectation failures, including which records were affected.
  • Records dropped by expectations are not retained anywhere by default; query the DLT event log for per-expectation pass/drop counts, or implement a quarantine pattern (a companion table populated with the inverted predicate) if you need the offending rows themselves.
  • Use standard Python logging (or print statements) inside your DLT code to output custom messages; they appear in the pipeline’s driver logs.

2. Unit Testing PySpark Transformations with pytest

While DLT expectations handle data quality, complex transformation logic often resides in Python functions that can be tested independently. This improves code quality, reusability, and maintainability.

a) Setup/Configuration

We’ll create a utility file for a common transformation and a corresponding pytest setup.

Dependencies: Ensure pytest and pyspark are installed in your development environment: pip install pytest pyspark

File: src/data_transformation/utils.py

# src/data_transformation/utils.py

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, datediff, lit, trim, upper, when

def clean_and_standardize_location(df: DataFrame, location_col: str = "location") -> DataFrame:
    """
    Cleans and standardizes location strings in a DataFrame.
    - Trims whitespace.
    - Converts to uppercase.
    - Maps common state abbreviations (NY, CA, TX) to full names.
    - Handles nulls by replacing with 'UNKNOWN'.
    """
    if location_col not in df.columns:
        # Log a warning or raise an error if the column doesn't exist
        print(f"Warning: Column '{location_col}' not found in DataFrame for standardization.")
        return df # Return original DF or handle as appropriate

    standardized = upper(trim(col(location_col)))
    return df.withColumn(
        location_col,
        when(col(location_col).isNull(), lit("UNKNOWN"))
        .when(standardized == "NY", lit("NEW YORK"))
        .when(standardized == "CA", lit("CALIFORNIA"))
        .when(standardized == "TX", lit("TEXAS"))
        .otherwise(standardized)
    )

def calculate_delivery_delay(df: DataFrame, actual_col: str = "actual_delivery", estimated_col: str = "estimated_delivery") -> DataFrame:
    """
    Calculates delivery delay in days.
    Returns 0 if actual_delivery is before or same as estimated_delivery,
    or difference in days if actual_delivery is after estimated_delivery.
    Returns NULL if either date is NULL.
    """
    if actual_col not in df.columns or estimated_col not in df.columns:
        print(f"Error: Missing columns '{actual_col}' or '{estimated_col}' for delay calculation.")
        return df # Return original DF or handle as appropriate

    return df.withColumn(
        "delivery_delay_days",
        when(
            col(actual_col).isNotNull() & col(estimated_col).isNotNull(),
            when(
                datediff(col(actual_col), col(estimated_col)) > 0,
                datediff(col(actual_col), col(estimated_col))
            ).otherwise(lit(0))
        ).otherwise(lit(None).cast("integer"))
    )

File: tests/conftest.py (for a reusable SparkSession fixture)

# tests/conftest.py

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark_session():
    """
    Provides a SparkSession for testing.
    Uses a local Spark instance for fast unit tests.
    """
    spark = (
        SparkSession.builder.appName("PySparkUnitTests")
        .master("local[*]")  # Run Spark locally with all available cores
        .config("spark.sql.shuffle.partitions", "1") # Optimize for local testing
        .config("spark.default.parallelism", "1")
        .config("spark.sql.session.timeZone", "UTC") # Consistent timezone
        .getOrCreate()
    )
    yield spark
    spark.stop()

b) Core Implementation

Now, let’s write unit tests for the functions in src/data_transformation/utils.py.

File: tests/unit/test_data_transformation_utils.py

# tests/unit/test_data_transformation_utils.py

import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType
from datetime import datetime

# Import the functions to be tested
from src.data_transformation.utils import clean_and_standardize_location, calculate_delivery_delay

def test_clean_and_standardize_location(spark_session: SparkSession):
    """
    Tests the clean_and_standardize_location function.
    """
    schema = StructType([StructField("location", StringType(), True)])
    data = [
        ("  new york  ",),
        ("ca",),
        ("texas",),
        (None,),
        ("paris",),
        ("NY",)
    ]
    input_df = spark_session.createDataFrame(data, schema)

    expected_data = [
        ("NEW YORK",),
        ("CALIFORNIA",),
        ("TEXAS",),
        ("UNKNOWN",),
        ("PARIS",),
        ("NEW YORK",)
    ]
    expected_df = spark_session.createDataFrame(expected_data, schema)

    result_df = clean_and_standardize_location(input_df)

    # Collect and compare results
    assert result_df.count() == expected_df.count()
    assert result_df.exceptAll(expected_df).count() == 0
    assert expected_df.exceptAll(result_df).count() == 0

def test_calculate_delivery_delay(spark_session: SparkSession):
    """
    Tests the calculate_delivery_delay function.
    """
    schema = StructType([
        StructField("order_id", StringType(), False),
        StructField("actual_delivery", TimestampType(), True),
        StructField("estimated_delivery", TimestampType(), True)
    ])
    data = [
        ("ORD001", datetime(2025, 1, 10, 10, 0, 0), datetime(2025, 1, 8, 10, 0, 0)), # Delayed by 2 days
        ("ORD002", datetime(2025, 1, 5, 10, 0, 0), datetime(2025, 1, 5, 10, 0, 0)), # On time
        ("ORD003", datetime(2025, 1, 3, 10, 0, 0), datetime(2025, 1, 7, 10, 0, 0)), # Early (should be 0 delay)
        ("ORD004", None, datetime(2025, 1, 10, 10, 0, 0)), # Null actual
        ("ORD005", datetime(2025, 1, 10, 10, 0, 0), None), # Null estimated
        ("ORD006", None, None) # Both null
    ]
    input_df = spark_session.createDataFrame(data, schema)

    expected_schema = StructType([
        StructField("order_id", StringType(), False),
        StructField("actual_delivery", TimestampType(), True),
        StructField("estimated_delivery", TimestampType(), True),
        StructField("delivery_delay_days", IntegerType(), True)
    ])
    expected_data = [
        ("ORD001", datetime(2025, 1, 10, 10, 0, 0), datetime(2025, 1, 8, 10, 0, 0), 2),
        ("ORD002", datetime(2025, 1, 5, 10, 0, 0), datetime(2025, 1, 5, 10, 0, 0), 0),
        ("ORD003", datetime(2025, 1, 3, 10, 0, 0), datetime(2025, 1, 7, 10, 0, 0), 0),
        ("ORD004", None, datetime(2025, 1, 10, 10, 0, 0), None),
        ("ORD005", datetime(2025, 1, 10, 10, 0, 0), None, None),
        ("ORD006", None, None, None)
    ]
    expected_df = spark_session.createDataFrame(expected_data, expected_schema)

    result_df = calculate_delivery_delay(input_df)

    assert result_df.count() == expected_df.count()
    # Sort both DFs to ensure consistent comparison order for exceptAll
    result_df_sorted = result_df.orderBy("order_id")
    expected_df_sorted = expected_df.orderBy("order_id")

    assert result_df_sorted.exceptAll(expected_df_sorted).count() == 0
    assert expected_df_sorted.exceptAll(result_df_sorted).count() == 0

def test_calculate_delivery_delay_missing_columns(spark_session: SparkSession, capsys):
    """
    Tests calculate_delivery_delay when expected columns are missing.
    """
    schema = StructType([StructField("order_id", StringType(), False)])
    input_df = spark_session.createDataFrame([("ORD001",)], schema)

    # The function should return the original DataFrame and print an error
    result_df = calculate_delivery_delay(input_df)
    assert result_df.schema == input_df.schema
    assert result_df.collect() == input_df.collect()

    captured = capsys.readouterr()
    assert "Error: Missing columns 'actual_delivery' or 'estimated_delivery' for delay calculation." in captured.out

Explanation:

  • tests/conftest.py: This file provides a spark_session fixture that initializes a local SparkSession once per test session. This is efficient as it avoids creating a new SparkSession for every test.
  • test_clean_and_standardize_location:
    • Creates a sample input_df with various location strings, including nulls and abbreviations.
    • Defines an expected_df with the expected standardized values.
    • Calls the clean_and_standardize_location function.
    • Uses exceptAll() to compare the content of the resulting DataFrame with the expected DataFrame. exceptAll() returns rows present in one DataFrame but not the other. If both resulting counts are 0, the DataFrames contain the same multiset of rows.
  • test_calculate_delivery_delay: Similar structure, testing date calculations and null handling.
  • test_calculate_delivery_delay_missing_columns: Demonstrates testing error handling. We use capsys (a pytest fixture) to capture stdout/stderr and assert that the expected warning message is printed.
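One practical gap in the exceptAll approach: it cannot ignore columns, such as a processing timestamp, that legitimately differ between runs. A small helper over collected rows (our own utility, not a pytest or Spark built-in) handles that:

```python
def rows_match(actual_rows, expected_rows, ignore_fields=()):
    """Order-insensitive comparison of collected rows, skipping named fields."""
    def normalize(rows):
        normalized = []
        for row in rows:
            # pyspark Row objects expose .asDict(); plain dicts pass through
            as_dict = row.asDict() if hasattr(row, "asDict") else dict(row)
            normalized.append(tuple(sorted(
                (field, value) for field, value in as_dict.items()
                if field not in ignore_fields
            )))
        # repr-based sort avoids TypeError when values mix None with numbers
        return sorted(normalized, key=repr)
    return normalize(actual_rows) == normalize(expected_rows)
```

Usage in a test would look like `assert rows_match(result_df.collect(), expected_rows, ignore_fields=("processing_timestamp",))`.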

c) Testing This Component

To run these unit tests:

  1. Open your terminal in the project’s root directory.
  2. Execute pytest tests/unit/.

You should see output similar to this:

============================= test session starts ==============================
platform linux -- Python 3.10.12, pytest-8.0.0, pluggy-1.4.0
rootdir: /path/to/your/project
plugins: anyio-4.3.0
collected 3 items

tests/unit/test_data_transformation_utils.py ...                               [100%]

============================== 3 passed in 5.XXs ===============================

Debugging Tips:

  • If a test fails, pytest will show a detailed traceback.
  • Use result_df.show() or result_df.collect() within your test functions (temporarily) to inspect the actual DataFrame content when debugging.
  • Ensure your expected_df exactly matches the schema and data types, as Spark is strict.

3. Integration Testing for Streaming Pipelines

Testing streaming pipelines requires a slightly different approach due to their continuous nature. We need to simulate data arrival and verify the output over time. For this, we’ll use a temporary Delta table as a “mock” Kafka source and another temporary Delta table as a sink.
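When asserting on windowed aggregates, it also helps to recompute the expected numbers with an independent plain-Python “oracle,” so the test expectations don’t simply mirror a bug in the Spark code. A minimal sketch, assuming epoch-aligned tumbling windows (Spark’s `window()` default alignment; a sliding window would assign each event to several windows):

```python
from collections import defaultdict
from datetime import datetime, timedelta

def tumbling_window_start(ts: datetime, window_minutes: int = 60) -> datetime:
    """Floor a timestamp to the start of its epoch-aligned tumbling window."""
    epoch = datetime(1970, 1, 1)
    elapsed_min = int((ts - epoch).total_seconds() // 60)
    return epoch + timedelta(minutes=elapsed_min - elapsed_min % window_minutes)

def expected_route_totals(events, window_minutes=60):
    """events: dicts with event_timestamp (datetime), route_id, and cost keys.
    Returns {(window_start, route_id): (total_cost, event_count)}."""
    totals = defaultdict(lambda: [0.0, 0])
    for event in events:
        key = (tumbling_window_start(event["event_timestamp"], window_minutes),
               event["route_id"])
        totals[key][0] += event["cost"]
        totals[key][1] += 1
    return {key: (cost, n) for key, (cost, n) in totals.items()}
```

The oracle is fed the same raw events as the mock source, and the test compares its output against the aggregated Delta sink.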

Let’s consider our logistics_cost_monitoring_streaming.py job.

a) Setup/Configuration

We’ll assume the streaming job looks something like this (simplified):

File: streaming_jobs/logistics_cost_monitoring_streaming.py

# streaming_jobs/logistics_cost_monitoring_streaming.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, current_timestamp, from_json, sum, window
from pyspark.sql.types import DoubleType, StringType, StructField, StructType, TimestampType

# Define schema for incoming logistics events (simplified for example)
logistics_event_schema = StructType([
    StructField("trackingId", StringType(), True),
    StructField("eventTimestamp", TimestampType(), True),
    StructField("eventType", StringType(), True), # e.g., "FUEL_PURCHASE", "TOLL_CHARGE", "MAINTENANCE"
    StructField("cost", DoubleType(), True),
    StructField("currency", StringType(), True),
    StructField("routeId", StringType(), True)
])

def process_logistics_costs_stream(
    spark: SparkSession,
    input_topic_or_path: str,
    output_table_path: str,
    checkpoint_location: str,
    source_format: str = "kafka", # Can be 'kafka' or 'delta' for testing
    trigger_interval: str = "1 minute" # processing-time interval, or "availableNow" to drain all available data and stop
):
    """
    Processes a stream of logistics events, aggregates costs by route and time window.
    """
    print(f"Starting logistics cost monitoring stream from {input_topic_or_path}...")

    if source_format == "kafka":
        read_stream = (
            spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", spark.conf.get("pipeline.kafka_bootstrap_servers", "kafka:9092"))
            .option("subscribe", input_topic_or_path)
            .option("startingOffsets", spark.conf.get("pipeline.kafka_starting_offsets", "earliest"))
            .load()
            .selectExpr("CAST(value AS STRING) as json_payload", "timestamp")
        )
    elif source_format == "delta":
        # For testing, read from a Delta table
        read_stream = (
            spark.readStream
            .format("delta")
            .load(input_topic_or_path)
            .selectExpr("CAST(value AS STRING) as json_payload", "timestamp") # Assume 'value' and 'timestamp' cols for compatibility
        )
    else:
        raise ValueError("Invalid source_format. Must be 'kafka' or 'delta'.")

    parsed_stream = read_stream.withColumn("data", from_json(col("json_payload"), logistics_event_schema)) \
        .select(
            col("data.trackingId").alias("tracking_id"),
            col("data.eventTimestamp").alias("event_timestamp"),
            col("data.eventType").alias("event_type"),
            col("data.cost").alias("cost"),
            col("data.currency").alias("currency"),
            col("data.routeId").alias("route_id"),
            col("timestamp").alias("kafka_ingestion_timestamp") # Kafka timestamp for ordering
        ) \
        .filter(col("cost").isNotNull() & col("event_timestamp").isNotNull() & col("route_id").isNotNull())

    # Aggregate costs by route and 1-hour window
    aggregated_costs = parsed_stream \
        .withWatermark("event_timestamp", "10 minutes") \
        .groupBy(
            window(col("event_timestamp"), "1 hour"), # epoch-aligned 1-hour tumbling window
            col("route_id")
        ) \
        .agg(
            sum("cost").alias("total_window_cost"),
            avg("cost").alias("avg_event_cost"),
            count("*").alias("event_count_in_window")
        ) \
        .withColumn("processing_timestamp", current_timestamp())

    # Write to a Delta sink. Complete mode re-emits every window each batch;
    # append mode would emit a window only after the watermark passes its end,
    # which makes short-lived tests awkward.
    writer = (
        aggregated_costs.writeStream
        .format("delta")
        .outputMode("complete")
        .option("checkpointLocation", checkpoint_location)
    )

    if trigger_interval == "availableNow":
        writer = writer.trigger(availableNow=True)  # process all available data, then stop
    else:
        writer = writer.trigger(processingTime=trigger_interval)

    # start() writes to a path-based Delta table; use toTable() instead for a
    # Unity Catalog / metastore table.
    query = writer.start(output_table_path)

    # In a production job this would typically block on query.awaitTermination();
    # for testing we return the query so the caller can control it.
    print(f"Logistics cost monitoring stream started, writing to {output_table_path}")
    return query

if __name__ == "__main__":
    # Example of how to run this as a Databricks Job
    spark = SparkSession.builder.appName("LogisticsCostMonitor").getOrCreate()

    # These would typically come from job parameters or environment variables
    kafka_servers = "your_kafka_servers:9092"
    input_topic = "logistics_events"
    output_table_name = "logistics_cost_metrics_gold"
    checkpoint_loc = "/mnt/data/checkpoints/logistics_cost_monitor"

    # Configure Spark for Kafka (if running directly without DLT)
    # spark.conf.set("spark.sql.streaming.kafka.maxOffsetsPerTrigger", "1000")

    # Call the processing function
    # process_logistics_costs_stream(
    #     spark,
    #     input_topic,
    #     output_table_name,
    #     checkpoint_loc,
    #     source_format="kafka",
    #     trigger_interval="1 minute"
    # )
    # spark.streams.awaitAnyTermination() # Block indefinitely

b) Core Implementation

Now, we’ll write an integration test for this streaming job. We’ll use temporary Delta tables for input and output.

File: tests/integration/test_logistics_streaming_job.py

# tests/integration/test_logistics_streaming_job.py

import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from datetime import datetime, timedelta
import os
import shutil
import time

# Import the streaming job function
from streaming_jobs.logistics_cost_monitoring_streaming import process_logistics_costs_stream

@pytest.fixture(scope="module")
def spark_session_integration():
    """
    Provides a SparkSession for integration testing.
    Uses a local Spark instance with Delta Lake support.
    """
    spark = (
        SparkSession.builder.appName("PySparkStreamingIntegrationTests")
        .master("local[*]")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.sql.shuffle.partitions", "1")
        .config("spark.default.parallelism", "1")
        .config("spark.sql.session.timeZone", "UTC")
        .getOrCreate()
    )
    yield spark
    spark.stop()

@pytest.fixture(scope="function")
def temp_delta_paths(tmp_path):
    """
    Provides temporary paths for Delta input, output, and checkpoint locations.
    Cleans up after each test function.
    """
    input_path = str(tmp_path / "input_delta_stream")
    output_path = str(tmp_path / "output_delta_stream")
    checkpoint_path = str(tmp_path / "checkpoint")

    # Ensure directories are clean
    for path in [input_path, output_path, checkpoint_path]:
        if os.path.exists(path):
            shutil.rmtree(path)

    yield input_path, output_path, checkpoint_path

    # Cleanup after test
    for path in [input_path, output_path, checkpoint_path]:
        if os.path.exists(path):
            shutil.rmtree(path)

def test_logistics_cost_streaming_pipeline(spark_session_integration: SparkSession, temp_delta_paths):
    """
    Tests the logistics cost monitoring streaming pipeline end-to-end using Delta tables
    as source and sink.
    """
    input_path, output_path, checkpoint_path = temp_delta_paths

    # 1. Define input schema for the "mock Kafka" Delta table
    input_schema = StructType([
        StructField("value", StringType(), True), # Raw JSON payload
        StructField("timestamp", TimestampType(), True) # Kafka timestamp
    ])

    # 2. Prepare initial input data (simulating Kafka messages)
    # These timestamps are crucial for windowing and watermarking
    event_time_1 = datetime(2025, 1, 1, 10, 0, 0)
    event_time_2 = datetime(2025, 1, 1, 10, 15, 0)
    event_time_3 = datetime(2025, 1, 1, 10, 45, 0)
    event_time_4 = datetime(2025, 1, 1, 11, 10, 0)
    event_time_5 = datetime(2025, 1, 1, 11, 40, 0) # In second window, different route
    event_time_6 = datetime(2025, 1, 1, 10, 50, 0) # Another event for first window

    input_data_batch1 = [
        ('{"trackingId": "T1", "eventTimestamp": "2025-01-01T10:00:00Z", "eventType": "FUEL", "cost": 100.0, "currency": "USD", "routeId": "R1"}', event_time_1),
        ('{"trackingId": "T2", "eventTimestamp": "2025-01-01T10:15:00Z", "eventType": "TOLL", "cost": 10.0, "currency": "USD", "routeId": "R1"}', event_time_2),
        ('{"trackingId": "T3", "eventTimestamp": "2025-01-01T10:45:00Z", "eventType": "MAINT", "cost": 50.0, "currency": "USD", "routeId": "R1"}', event_time_3),
    ]

    input_df_batch1 = spark_session_integration.createDataFrame(input_data_batch1, input_schema)
    input_df_batch1.write.format("delta").mode("overwrite").save(input_path)

    # 3. Start the streaming job reading from the Delta source
    # Use "availableNow" trigger for batch-like processing in tests
    streaming_query = process_logistics_costs_stream(
        spark_session_integration,
        input_path,
        output_path,
        checkpoint_path,
        source_format="delta",
        trigger_interval="availableNow" # Process all available data once
    )

    # 4. Wait for the streaming query to process the first batch
    streaming_query.awaitTermination(timeout=60) # Wait up to 60 seconds
    assert not streaming_query.isActive # Should be terminated if availableNow

    # 5. Read the output and assert on the results for batch 1
    output_df_batch1 = spark_session_integration.read.format("delta").load(output_path)
    output_df_batch1.show(truncate=False)

    # Expected output for the first window (1-hour windows sliding every 30 minutes -> window 10:00-11:00)
    # The batch-1 events at 10:00, 10:15, and 10:45 all fall into this window.
    # Total cost for R1 in the 10:00-11:00 window: 100 + 10 + 50 = 160
    # (event_time_6 at 10:50 is added in batch 2 and checked after the second run.)
    # Output schema, documented for reference; the assertions below compare values directly
    expected_schema_output = StructType([
        StructField("window", StructType([
            StructField("start", TimestampType(), True),
            StructField("end", TimestampType(), True)
        ]), True),
        StructField("route_id", StringType(), True),
        StructField("total_window_cost", DoubleType(), True),
        StructField("avg_event_cost", DoubleType(), True),
        StructField("event_count_in_window", IntegerType(), True),
        StructField("processing_timestamp", TimestampType(), True) # This will vary, so we'll ignore it
    ])

    # Expected results after first batch
    # Window (10:00-11:00) for R1
    expected_results_batch1 = [
        (datetime(2025, 1, 1, 10, 0, 0), datetime(2025, 1, 1, 11, 0, 0), "R1", 160.0, 160.0/3, 3) # Sum of 100, 10, 50
    ]
    # Create a DataFrame for expected results (excluding processing_timestamp)
    # Note: Spark's window object is complex to construct directly in data, so we'll compare by value

    # Assertions for Batch 1 (simplified to check key values)
    assert output_df_batch1.count() == 1
    first_row = output_df_batch1.collect()[0]
    assert first_row["route_id"] == "R1"
    assert first_row["total_window_cost"] == pytest.approx(160.0)
    assert first_row["event_count_in_window"] == 3
    assert first_row["window"]["start"] == datetime(2025, 1, 1, 10, 0, 0)
    assert first_row["window"]["end"] == datetime(2025, 1, 1, 11, 0, 0)

    # 6. Append more data to the input Delta table to simulate next Kafka messages
    input_data_batch2 = [
        ('{"trackingId": "T4", "eventTimestamp": "2025-01-01T11:10:00Z", "eventType": "FUEL", "cost": 200.0, "currency": "USD", "routeId": "R1"}', event_time_4),
        ('{"trackingId": "T5", "eventTimestamp": "2025-01-01T11:40:00Z", "eventType": "TOLL", "cost": 20.0, "currency": "USD", "routeId": "R2"}', event_time_5),
        ('{"trackingId": "T6", "eventTimestamp": "2025-01-01T10:50:00Z", "eventType": "MAINT", "cost": 30.0, "currency": "USD", "routeId": "R1"}', event_time_6), # Late arriving event for first window
    ]
    input_df_batch2 = spark_session_integration.createDataFrame(input_data_batch2, input_schema)
    input_df_batch2.write.format("delta").mode("append").save(input_path)

    # 7. Restart the streaming query (or trigger a new micro-batch)
    # For "availableNow" trigger, we need to restart the query to pick up new data
    streaming_query_2 = process_logistics_costs_stream(
        spark_session_integration,
        input_path,
        output_path,
        checkpoint_path,
        source_format="delta",
        trigger_interval="availableNow"
    )
    streaming_query_2.awaitTermination(timeout=60)
    assert not streaming_query_2.isActive

    # 8. Read the output again and assert on the updated results
    output_df_batch2 = spark_session_integration.read.format("delta").load(output_path)
    output_df_batch2.show(truncate=False)

    # Expected:
    # - Original window R1 (10:00-11:00) should be updated by event_time_6 (late arrival)
    #   New total: 160 + 30 = 190, count = 4
    # - New window R1 (10:30-11:30) for event_time_3 (10:45), event_time_4 (11:10), and event_time_6 (10:50)
    #   Total for window 10:30-11:30 for R1: 50 (from 10:45) + 200 (from 11:10) + 30 (from 10:50) = 280, count = 3
    # - New window R2 (11:30-12:30) for event_time_5 (11:40)
    #   Total for window 11:30-12:30 for R2: 20, count = 1

    # Due to `append` outputMode and watermark, Spark Structured Streaming will output
    # *new* aggregates for newly completed windows, and *updates* for existing windows
    # if late data arrives within the watermark.
    # The `outputMode("append")` means each new window's final result is appended.
    # If a late event updates an *already outputted* window, the behavior depends on the sink.
    # For Delta, it might write a new version of the aggregate, which needs MERGE to deduplicate.
    # For simplicity, we expect new rows for new windows and updated counts,
    # and will check the overall state of the output table.

    # Let's simplify the assertion for `append` mode and check the final state.
    # The watermarking and windowing logic will produce multiple rows for overlapping windows.
    # We should expect total rows for unique window/route_id combinations.

    # A more robust test would merge the output into a target table and assert on that.
    # For `append` mode, we'll get new rows for each window as it's completed.

    # Expected final state (after processing all data and considering windows)
    # Window start, Window end, Route, Total Cost, Avg Cost, Event Count
    expected_final_data = [
        # Original window for R1 (10:00-11:00), updated by late event T6 (10:50)
        (datetime(2025, 1, 1, 10, 0, 0), datetime(2025, 1, 1, 11, 0, 0), "R1", 190.0, 190.0/4, 4), # 100+10+50+30
        # New window for R1 (10:30-11:30)
        (datetime(2025, 1, 1, 10, 30, 0), datetime(2025, 1, 1, 11, 30, 0), "R1", 280.0, 280.0/3, 3), # T3(50) + T4(200) + T6(30)
        # New window for R2 (11:30-12:30)
        (datetime(2025, 1, 1, 11, 30, 0), datetime(2025, 1, 1, 12, 30, 0), "R2", 20.0, 20.0/1, 1), # T5(20)
    ]

    # Convert output_df_batch2 to a format comparable with expected_final_data
    # Extract window start/end, route_id, total_window_cost, event_count_in_window
    # and round averages for comparison.
    actual_results = output_df_batch2.select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("route_id"),
        col("total_window_cost"),
        round(col("avg_event_cost"), 2).alias("avg_event_cost"),
        col("event_count_in_window")
    ).orderBy("window_start", "route_id").collect()

    # Prepare expected results for comparison. Note: pyspark.sql.functions.round
    # (used on the DataFrame above) shadows Python's built-in round when imported
    # unqualified, so round the plain floats via string formatting instead.
    expected_comparable_data = []
    for ws, we, route, total, avg, count in expected_final_data:
        expected_comparable_data.append(
            (ws, we, route, total, float(f"{avg:.2f}"), count)
        )
    # createDataFrame requires a StructType (or DDL string), not a bare list of StructFields
    expected_comparable_df = spark_session_integration.createDataFrame(
        expected_comparable_data,
        schema=StructType([
            StructField("window_start", TimestampType(), True),
            StructField("window_end", TimestampType(), True),
            StructField("route_id", StringType(), True),
            StructField("total_window_cost", DoubleType(), True),
            StructField("avg_event_cost", DoubleType(), True),
            StructField("event_count_in_window", IntegerType(), True)
        ])
    ).orderBy("window_start", "route_id").collect()

    # Compare collected rows
    assert len(actual_results) == len(expected_comparable_df)
    for actual_row, expected_row in zip(actual_results, expected_comparable_df):
        assert actual_row["window_start"] == expected_row["window_start"]
        assert actual_row["window_end"] == expected_row["window_end"]
        assert actual_row["route_id"] == expected_row["route_id"]
        assert actual_row["total_window_cost"] == pytest.approx(expected_row["total_window_cost"])
        assert actual_row["avg_event_cost"] == pytest.approx(expected_row["avg_event_cost"])
        assert actual_row["event_count_in_window"] == expected_row["event_count_in_window"]

    # Stop the query if it's still active (shouldn't be with availableNow)
    if streaming_query.isActive:
        streaming_query.stop()
    if streaming_query_2.isActive:
        streaming_query_2.stop()

Explanation:

  • spark_session_integration fixture: Similar to the unit test fixture, but configured with Delta Lake extensions, as we are reading/writing Delta tables.
  • temp_delta_paths fixture: Creates temporary directories for input, output, and checkpoint locations, ensuring a clean slate for each test.
  • test_logistics_cost_streaming_pipeline:
    1. Input Data Preparation: We create a DataFrame representing a batch of Kafka messages and write it to a temporary Delta table (input_path). The value column contains the raw JSON, and timestamp simulates the Kafka ingestion timestamp.
    2. Start Streaming Query: We call process_logistics_costs_stream, passing the input_path as the source and specifying source_format="delta" and trigger_interval="availableNow". availableNow is crucial for testing, as it makes the stream process all available data as a single micro-batch and then terminate, allowing for deterministic assertions.
    3. Wait for Termination: streaming_query.awaitTermination(timeout=60) ensures the test waits until the stream has processed the data.
    4. Assert Batch 1: We read the output Delta table (output_path) and assert that the aggregated results match our expectations for the first batch of data. We specifically check window boundaries, route IDs, total costs, and event counts.
    5. Append More Data: To simulate continuous streaming, we append a second batch of data to the input_path Delta table. This includes a late-arriving event to test watermarking behavior.
    6. Restart Query: Since availableNow triggers terminate, we re-invoke process_logistics_costs_stream to process the newly appended data.
    7. Assert Batch 2: We read the output again and verify the updated aggregations, including how late-arriving data affects previously emitted windows (within the watermark). The assertions are refined to handle the nature of append output mode with windowing.

c) Testing This Component

To run these integration tests:

  1. Open your terminal in the project’s root directory.
  2. Execute pytest tests/integration/.

You should see output similar to this:

============================= test session starts ==============================
platform linux -- Python 3.10.12, pytest-8.0.0, pluggy-1.4.0
rootdir: /path/to/your/project
plugins: anyio-4.3.0
collected 1 item

tests/integration/test_logistics_streaming_job.py .                         [100%]

============================== 1 passed in 15.XXs ==============================

Debugging Tips:

  • Checkpoint Location: If a test fails and you re-run it, ensure the checkpoint_path is truly clean between runs. The tmp_path fixture and shutil.rmtree in our temp_delta_paths fixture handle this automatically.
  • Timezones: Be explicit with timezones (e.g., UTC) in both your SparkSession configuration and when creating datetime objects to avoid inconsistencies.
  • Watermarking: Understanding how watermarks and outputMode("append") interact with late data is crucial. If your test data includes late events, carefully calculate the expected output based on your watermark definition.
  • show() and collect(): Temporarily add .show(truncate=False) calls to inspect intermediate DataFrames and understand the data flow. Note that show() cannot be called directly on a streaming DataFrame; inspect the per-batch DataFrames inside a foreachBatch function, or read the output Delta table after the query terminates.
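When hand-computing expected aggregates like those in the test above, it helps to enumerate exactly which sliding windows each event falls into. A small pure-Python helper can do this (a sketch, not part of the pipeline; it assumes the 1-hour window with a 30-minute slide used in our job, and Spark's epoch-aligned window starts):

```python
from datetime import datetime, timedelta

def sliding_windows(event_time, duration_min=60, slide_min=30):
    """Return the sorted [start, end) windows containing event_time,
    mirroring Spark's epoch-aligned sliding windows."""
    epoch = datetime(1970, 1, 1)
    slide = timedelta(minutes=slide_min)
    duration = timedelta(minutes=duration_min)
    # Latest slide-aligned window start at or before the event
    last_start = event_time - ((event_time - epoch) % slide)
    windows = []
    start = last_start
    while start > event_time - duration:
        windows.append((start, start + duration))
        start -= slide
    return sorted(windows)

# The 10:45 event (T3) belongs to both the 10:00-11:00 and 10:30-11:30 windows
print(sliding_windows(datetime(2025, 1, 1, 10, 45)))
```

Running all six test events through this helper can also surface overlapping windows (such as 09:30-10:30) to account for when refining the simplified assertions in the integration test.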

Production Considerations

  1. CI/CD Integration:

    • Automate test execution (unit, integration) as part of your CI/CD pipeline (e.g., GitHub Actions, Azure DevOps, GitLab CI).
    • For Databricks-specific testing, consider using Databricks Asset Bundles (DABs) or Databricks Repos with a custom pytest environment on a job cluster. DABs provide a structured way to define, deploy, and manage your Databricks resources, including test jobs.
    • DLT pipelines can be triggered in a test environment (e.g., DEV workspace) with specific configurations (smaller clusters, test data sources).
    • Best Practice: Run DLT pipelines with development mode for faster iteration during testing, then switch to production mode for actual deployments to benefit from enhanced monitoring and recovery.
  2. Test Data Management:

    • Volume: For integration and end-to-end tests, use representative but smaller datasets to keep test runtimes reasonable.
    • Variety: Include edge cases, nulls, malformed records, and late-arriving data to thoroughly test error handling and data quality rules.
    • Anonymization: If using subsets of production data, ensure it’s anonymized or synthetic to comply with data privacy regulations.
    • Versioning: Keep test data versioned alongside your code.
  3. Monitoring Test Results:

    • Integrate test results into your CI/CD dashboard.
    • For DLT, monitor the DLT UI for expectation failures. Configure alerts (e.g., via Databricks webhooks or queries against the DLT event log) for critical expectation failures in production.
  4. Performance Testing:

    • For streaming pipelines, performance testing is crucial to ensure they can handle expected data volumes and latency requirements.
    • Use tools like Apache JMeter or custom load generators to simulate high-volume Kafka traffic.
    • Monitor Spark metrics (e.g., processing time, backlog, CPU/memory usage) and Kafka consumer lag.
    • Test different cluster configurations and autoscaling settings to find the optimal balance between cost and performance.
    • DLT Serverless: For DLT pipelines, leverage DLT Serverless (if available in your region) to abstract away cluster management and automatically optimize performance.
  5. Security Considerations:

    • Ensure your test environments have appropriate access controls, separated from production.
    • Use service principals or managed identities for CI/CD pipelines to access Databricks and other cloud resources, following the principle of least privilege.
    • Never hardcode sensitive credentials in test data or code. Use Databricks Secrets.
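As a concrete starting point for the CI/CD points above, the test stages can be wired into any runner with a short script like this (a sketch; the requirements file name and report paths are assumptions about your project layout):

```shell
#!/usr/bin/env bash
set -euo pipefail

# Install project and test dependencies (requirements-dev.txt is assumed)
pip install -r requirements-dev.txt

# Fast feedback first: unit tests, then the slower streaming integration tests
pytest tests/unit/ --junitxml=reports/unit.xml
pytest tests/integration/ --junitxml=reports/integration.xml
```

The same two pytest invocations map one-to-one onto steps in GitHub Actions, Azure DevOps, or GitLab CI; Databricks-specific deployment steps (e.g., via Databricks Asset Bundles) would follow after these test gates pass.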

Code Review Checkpoint

At this point, we have significantly enhanced the robustness of our data pipelines by:

  • Adding DLT Expectations: Implemented dlt.expect_or_drop and dlt.expect_or_fail in dlt_pipelines/supply_chain_events_dlt.py to enforce data quality rules and provide immediate feedback on data issues.
  • Establishing PySpark Unit Testing: Created src/data_transformation/utils.py for reusable PySpark logic and tests/unit/test_data_transformation_utils.py with pytest and a local SparkSession to test these functions in isolation.
  • Implementing Streaming Integration Testing: Developed tests/integration/test_logistics_streaming_job.py to simulate a real-time streaming scenario for streaming_jobs/logistics_cost_monitoring_streaming.py using temporary Delta tables as mock sources and sinks, verifying end-to-end data flow and transformations.

Files Created/Modified:

  • dlt_pipelines/supply_chain_events_dlt.py (Modified)
  • src/data_transformation/utils.py (New)
  • tests/conftest.py (New)
  • tests/unit/test_data_transformation_utils.py (New)
  • tests/integration/test_logistics_streaming_job.py (New)
  • streaming_jobs/logistics_cost_monitoring_streaming.py (Modified for testability)

These testing components integrate seamlessly with our existing project structure, providing a safety net that catches errors early and ensures the reliability of our real-time supply chain intelligence.

Common Issues & Solutions

  1. Issue: Py4JJavaError during PySpark unit tests.

    • Symptom: Tests fail with Java exceptions, often related to SparkContext or missing classes.
    • Cause: Incorrect SparkSession setup, missing PySpark dependencies, or environment issues.
    • Solution:
      • Ensure pyspark is correctly installed and accessible in your Python environment.
      • Verify your spark_session fixture in conftest.py is correctly configured (e.g., master("local[*]")).
      • For Delta Lake tests, ensure spark.sql.extensions and spark.sql.catalog configurations are correctly set.
      • Check your Java version compatibility with PySpark.
    • Prevention: Use a venv or conda environment to manage dependencies, and standardize your conftest.py for SparkSession initialization.
  2. Issue: DLT pipeline fails due to event_timestamp_recent expectation, but data looks fine.

    • Symptom: A dlt.expect_or_fail expectation triggers, failing the pipeline, even if the data appears to be within the expected time range.
    • Cause: Timezone discrepancies. The current_timestamp() function might be evaluating in a different timezone than your event_timestamp data, leading to incorrect comparisons.
    • Solution:
      • Explicitly set the timezone for your SparkSession: spark.conf.set("spark.sql.session.timeZone", "UTC").
      • Ensure all timestamps in your source data are either UTC or consistently converted to UTC during ingestion.
      • When constructing datetime objects in Python for test data, always make them timezone-aware (e.g., datetime.now(timezone.utc)).
    • Prevention: Standardize on UTC for all data storage and processing within your data platform.
  3. Issue: Streaming integration tests are flaky or take too long.

    • Symptom: Tests sometimes pass, sometimes fail, or they run for an unacceptably long time.
    • Cause: Non-deterministic stream processing, issues with checkpoint cleanup, or inefficient test data generation.
    • Solution:
      • Use trigger(availableNow=True) for integration tests to ensure all available data is processed in a single micro-batch, making runs deterministic and faster.
      • Thoroughly clean checkpointLocation and output paths before each test run using fixtures like tmp_path and shutil.rmtree.
      • Limit the volume of test data to the minimum required to validate the logic.
      • Ensure your watermarking strategy in the streaming job is well-understood and accounted for in test assertions.
    • Prevention: Design streaming jobs with testability in mind, abstracting the core logic into functions that can be tested independently. Always use temporary, isolated resources for integration tests.
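To make watermark-related assertions less surprising, it can help to compute the expected watermark by hand. A simplified pure-Python model follows (a sketch; the 15-minute delay is an assumed example, and real Spark evicts window state based on window end versus watermark, not on raw event times alone):

```python
from datetime import datetime, timedelta

def watermark_after_batch(max_event_time_seen, delay_minutes=15):
    """Watermark after a micro-batch: the max event time seen minus the allowed lateness."""
    return max_event_time_seen - timedelta(minutes=delay_minutes)

def arrives_too_late(event_time, watermark):
    """Simplified check: events older than the watermark are candidates for being dropped."""
    return event_time < watermark

# After batch 1 (max event time 10:45), the 10:50 late arrival in batch 2 is still accepted
wm = watermark_after_batch(datetime(2025, 1, 1, 10, 45))
print(arrives_too_late(datetime(2025, 1, 1, 10, 50), wm))  # False
```

Working through your test data with a model like this before writing assertions makes it obvious which late events will be kept and which windows remain open.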

Testing & Verification

To verify the work in this chapter:

  1. Run All Unit Tests:

    pytest tests/unit/
    

    Expected outcome: All tests pass, indicating that our individual PySpark transformation utilities are functioning correctly.

  2. Run All Integration Tests:

    pytest tests/integration/
    

    Expected outcome: All tests pass, confirming that our streaming pipeline logic (including windowing and aggregation) works as expected with simulated data.

  3. Deploy and Monitor DLT Pipeline with Expectations:

    • Deploy the updated dlt_pipelines/supply_chain_events_dlt.py to a Databricks DLT pipeline in a development environment.
    • Send some valid Kafka messages to the supply_chain_events topic. Verify that records flow through supply_chain_events_bronze and supply_chain_events_silver successfully.
    • Verification: Check the DLT UI for the pipeline. All expectations should show 100% valid or 0% dropped/failed for valid data.
    • Simulate Failure: Introduce a Kafka message with a null value or a very old eventTimestamp for a brief period. Observe the DLT UI to confirm that the corresponding expectations (kafka_value_not_null, event_timestamp_recent) correctly identify and handle these invalid records (dropping them or failing the pipeline as configured). This confirms your data quality gates are active.
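For orientation, the two expectations named above take roughly this shape in the DLT pipeline (a fragment, runnable only inside a Databricks DLT pipeline; the table names and the 7-day freshness interval are illustrative, not necessarily what our pipeline uses):

```python
import dlt  # available only in a Databricks DLT pipeline runtime

@dlt.table(name="supply_chain_events_silver")
@dlt.expect_or_drop("kafka_value_not_null", "value IS NOT NULL")
@dlt.expect_or_fail(
    "event_timestamp_recent",
    # Assumed freshness bound; tune to your pipeline's latency budget
    "event_timestamp > current_timestamp() - INTERVAL 7 DAYS",
)
def supply_chain_events_silver():
    return dlt.read_stream("supply_chain_events_bronze")
```

Here expect_or_drop removes offending rows and records the drops in the event log, while expect_or_fail stops the update entirely, which is what the Simulate Failure step above observes.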

By successfully completing these steps, you will have verified that your DLT and streaming pipelines are not only functional but also robustly tested for correctness and data quality, ready for production deployment.

Summary & Next Steps

In this pivotal Chapter 12, we established a comprehensive testing framework for our real-time supply chain intelligence platform. We learned how to:

  • Leverage Databricks Delta Live Tables’ native expectations to enforce data quality and reliability directly within our DLT pipelines.
  • Implement unit tests for standalone PySpark transformation logic using pytest and a local SparkSession, promoting modularity and code correctness.
  • Design and execute integration tests for Spark Structured Streaming jobs, simulating real-time data flow with temporary Delta tables to verify complex transformations, windowing, and watermarking.
  • Address crucial production considerations for testing, including CI/CD integration, test data management, performance testing, and security.

This robust testing strategy is a cornerstone of building production-ready data applications. It ensures that our real-time procurement price intelligence, logistics cost monitoring, and tariff analysis components are accurate, reliable, and maintainable.

In the next chapter, Chapter 13: Implementing CI/CD for Automated Deployment and Testing, we will take these testing strategies a step further by integrating them into an automated Continuous Integration/Continuous Deployment (CI/CD) pipeline. This will allow us to automatically build, test, and deploy our Databricks assets, streamlining our development workflow and ensuring consistent, high-quality releases.