1. Chapter Introduction

In modern supply chains, real-time visibility into logistics costs is paramount for effective decision-making, cost optimization, and competitive advantage. This chapter guides you through building a robust, real-time logistics cost monitoring pipeline using Apache Spark Structured Streaming on Databricks. We will ingest streaming logistics events from Kafka, process them to calculate various cost components, and enrich them with previously generated tariff data and dynamic fuel prices.

This step is crucial for providing immediate insights into transportation expenses, identifying cost anomalies, and enabling proactive adjustments to logistics strategies. By correlating live logistics data with tariff impacts and fuel price fluctuations, businesses can understand the true cost drivers and optimize routes, carriers, and procurement.

Prerequisites: Before starting this chapter, you should have:

  • A Databricks workspace configured with Unity Catalog.
  • A Kafka cluster up and running (self-managed, or a managed service such as Confluent Cloud, Amazon MSK, or Azure Event Hubs via its Kafka-compatible endpoint).
  • The DLT pipelines from previous chapters deployed, specifically those populating the gold.tariff_analysis table with HS Code-based tariff impacts.
  • An understanding of Spark SQL and basic Python programming.

Expected Outcome: By the end of this chapter, you will have a production-ready Spark Structured Streaming application that continuously ingests logistics events, calculates real-time costs, enriches them with tariff and fuel price data, and stores the results in a Delta Lake table (silver.enriched_logistics_costs) within your Unity Catalog-managed Lakehouse. This data will be ready for downstream analytics and reporting.

2. Planning & Design

To achieve real-time logistics cost monitoring, we’ll implement a multi-hop streaming architecture on Databricks, leveraging Spark Structured Streaming for its scalability and fault tolerance.

Component Architecture

Our streaming pipeline will consist of two main Structured Streaming jobs, forming a Bronze-to-Silver layer transformation:

  1. Raw Logistics Event Ingestion (Bronze Layer):

    • Source: Kafka topic (logistics_events).
    • Process: Read raw JSON messages, apply a schema, add ingestion metadata (timestamp, Kafka offset).
    • Sink: bronze.raw_logistics_events Delta table. This table will hold the immutable, raw stream of events.
  2. Enriched Logistics Cost Calculation (Silver Layer):

    • Source: bronze.raw_logistics_events Delta table (as a streaming source).
    • Lookup Sources:
      • gold.tariff_analysis Delta table (from DLT pipeline) for HS Code tariff impacts.
      • silver.fuel_prices Delta table (a new table we’ll introduce, representing current fuel prices by region/type).
    • Process:
      • Join streaming logistics events with static/slowly changing tariff data and fuel price data.
      • Calculate various cost components: fuel cost, tariff cost, handling fees, etc.
      • Apply data quality checks and error handling.
    • Sink: silver.enriched_logistics_costs Delta table. This table will contain the cleaned, enriched, and cost-calculated logistics data.
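Before wiring these calculations into Spark, the per-event cost roll-up can be sanity-checked in plain Python. Note that the flat handling fee and the convention of applying the tariff rate to a declared shipment value are illustrative assumptions, not values prescribed by the pipeline:

```python
def total_logistics_cost_usd(fuel_consumed_liters, fuel_price_per_unit,
                             declared_value_usd, tariff_rate,
                             handling_fee_usd=25.0):
    """Roll up the per-event cost components used by the Silver layer.

    The flat `handling_fee_usd` and applying `tariff_rate` to a declared
    shipment value are simplifying assumptions for illustration.
    """
    # Treat missing measurements (e.g., no fuel stop on this event) as zero cost
    fuel_cost = (fuel_consumed_liters or 0.0) * (fuel_price_per_unit or 0.0)
    tariff_cost = declared_value_usd * tariff_rate
    return round(fuel_cost + tariff_cost + handling_fee_usd, 2)

# 120 L of diesel at $1.25/L plus a 5% tariff on a $1,000 shipment:
# 150.00 + 50.00 + 25.00 = 225.00
cost = total_logistics_cost_usd(120.0, 1.25, 1000.0, 0.05)
```

The same arithmetic is what the Silver job expresses as Spark column expressions, where the `or 0.0` guards become `coalesce` calls.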
```mermaid
graph TD
    A[Kafka Topic: logistics_events] --> B(Spark Structured Streaming Job 1: Raw Ingestion)
    B --> C[Delta Table: bronze.raw_logistics_events]
    C --> D(Spark Structured Streaming Job 2: Cost Enrichment)
    subgraph Lookups
        E[Delta Table: gold.tariff_analysis]
        F[Delta Table: silver.fuel_prices]
    end
    E --> D
    F --> D
    D --> G[Delta Table: silver.enriched_logistics_costs]
    G --> H[Reporting/Analytics/Dashboards]
```

Database Schema (Unity Catalog)

We will define the schemas for the new Delta tables in Unity Catalog.

bronze.raw_logistics_events This table will store the raw JSON messages from Kafka, along with metadata added during ingestion.

| Column Name | Data Type | Description |
| --- | --- | --- |
| value | STRING | Raw JSON message from Kafka |
| key | BINARY | Kafka message key (if any) |
| topic | STRING | Kafka topic name |
| partition | INT | Kafka partition |
| offset | LONG | Kafka offset |
| timestamp | TIMESTAMP | Kafka message timestamp |
| ingestion_time | TIMESTAMP | Timestamp when the record was ingested into Delta |

silver.fuel_prices This table will store fuel price data, which we’ll assume is updated periodically (e.g., daily) or streamed from another source. For this chapter, we’ll simulate it.

| Column Name | Data Type | Description |
| --- | --- | --- |
| fuel_type | STRING | Type of fuel (e.g., Diesel, Gasoline) |
| region | STRING | Geographic region for the price |
| price_per_unit | DOUBLE | Price per unit of fuel (e.g., per liter/gallon) |
| currency | STRING | Currency of the price |
| effective_date | DATE | Date when the price became effective |
| update_time | TIMESTAMP | Timestamp of the last update |

silver.enriched_logistics_costs This table will contain the processed and enriched logistics events with calculated costs.

| Column Name | Data Type | Description |
| --- | --- | --- |
| event_id | STRING | Unique identifier for the logistics event |
| shipment_id | STRING | Identifier for the shipment |
| event_type | STRING | Type of event (e.g., 'SHIPMENT_UPDATE', 'FUEL_STOP') |
| event_timestamp | TIMESTAMP | Timestamp of the logistics event |
| origin_location | STRING | Origin location of the shipment |
| destination_location | STRING | Destination location of the shipment |
| product_hs_code | STRING | Harmonized System Code of the product |
| distance_km | DOUBLE | Distance covered for the event (in kilometers) |
| fuel_consumed_liters | DOUBLE | Fuel consumed for the event (in liters) |
| driver_id | STRING | ID of the driver involved |
| current_status | STRING | Current status of the shipment |
| tariff_impact_usd | DOUBLE | Calculated tariff cost impact for the shipment |
| fuel_price_per_unit | DOUBLE | Fuel price at the time/region of the event |
| calculated_fuel_cost_usd | DOUBLE | Calculated fuel cost for the event |
| total_logistics_cost_usd | DOUBLE | Total calculated logistics cost for the event |
| processing_timestamp | TIMESTAMP | Timestamp when the record was processed |
| data_quality_status | STRING | Status of data quality check (e.g., 'CLEAN', 'BAD_SCHEMA') |
| error_details | STRING | Details if data quality issues were found |
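Although the streaming write (`toTable`) will create this table on first write, creating it explicitly beforehand pins the column order and types shown above. The sketch below renders that DDL from a column list; the helper and the list are illustrative, and the commented `spark.sql(ddl)` call assumes a live Databricks notebook session:

```python
# Hypothetical (name, type) pairs mirroring the schema table above
ENRICHED_COSTS_COLUMNS = [
    ("event_id", "STRING"), ("shipment_id", "STRING"), ("event_type", "STRING"),
    ("event_timestamp", "TIMESTAMP"), ("origin_location", "STRING"),
    ("destination_location", "STRING"), ("product_hs_code", "STRING"),
    ("distance_km", "DOUBLE"), ("fuel_consumed_liters", "DOUBLE"),
    ("driver_id", "STRING"), ("current_status", "STRING"),
    ("tariff_impact_usd", "DOUBLE"), ("fuel_price_per_unit", "DOUBLE"),
    ("calculated_fuel_cost_usd", "DOUBLE"), ("total_logistics_cost_usd", "DOUBLE"),
    ("processing_timestamp", "TIMESTAMP"), ("data_quality_status", "STRING"),
    ("error_details", "STRING"),
]

def build_create_table_ddl(table_name, columns):
    """Render a CREATE TABLE IF NOT EXISTS statement from (name, type) pairs."""
    cols = ",\n  ".join(f"{name} {dtype}" for name, dtype in columns)
    return f"CREATE TABLE IF NOT EXISTS {table_name} (\n  {cols}\n) USING DELTA"

ddl = build_create_table_ddl(
    "supply_chain_lakehouse.silver.enriched_logistics_costs", ENRICHED_COSTS_COLUMNS
)
# spark.sql(ddl)  # run inside a Databricks notebook to pin the schema up front
```

Pinning the schema up front also makes the stream fail fast if a code change would silently alter a column type.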

File Structure

For this chapter, we will organize our code in Databricks notebooks, which can later be modularized into Python scripts and deployed via Databricks Asset Bundles for production CI/CD.

├── workspace/
│   ├── supply_chain_project/
│   │   ├── 08_structured_streaming_cost_monitoring/
│   │   │   ├── 01_setup_schemas_and_lookups.py
│   │   │   ├── 02_stream_raw_logistics_events_bronze.py
│   │   │   ├── 03_stream_enriched_logistics_costs_silver.py
│   │   │   ├── 04_simulate_kafka_data.py (for local testing)
│   │   │   ├── conf/
│   │   │   │   ├── kafka_config.py

3. Step-by-Step Implementation

We’ll build the pipeline incrementally. Ensure you have a Databricks cluster running with access to Unity Catalog.

3.1. Setup & Configuration

First, let’s define our Unity Catalog schema, simulate the silver.fuel_prices table, and configure Kafka connection details.

a) Create Unity Catalog Schema and silver.fuel_prices Table

We’ll create a Python notebook to set up the necessary Unity Catalog schema and a mock silver.fuel_prices table.

File: workspace/supply_chain_project/08_structured_streaming_cost_monitoring/01_setup_schemas_and_lookups.py

# Databricks notebook source
# MAGIC %md
# MAGIC ### Setup: Create Unity Catalog Schema and Mock Lookup Tables
# MAGIC
# MAGIC This notebook initializes the Unity Catalog schema and creates a mock `silver.fuel_prices` table for our streaming pipeline.
# MAGIC It also defines the expected schema for incoming Kafka messages.

# COMMAND ----------

import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, TimestampType, DoubleType, DateType, IntegerType
)
from delta.tables import DeltaTable

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# COMMAND ----------

# Configuration
CATALOG_NAME = "supply_chain_lakehouse"
SCHEMA_NAME = "bronze" # For raw events
SCHEMA_NAME_SILVER = "silver" # For enriched data and lookup tables
SCHEMA_NAME_GOLD = "gold" # For tariff analysis lookup

# COMMAND ----------

# MAGIC %md
# MAGIC #### 1. Create Unity Catalog Schemas
# MAGIC
# MAGIC We'll ensure the necessary schemas exist in Unity Catalog.

# COMMAND ----------

try:
    spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG_NAME}")
    spark.sql(f"USE CATALOG {CATALOG_NAME}")
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME}")
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME_SILVER}")
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME_GOLD}") # Ensure gold schema from DLT exists
    logger.info(f"Unity Catalog schemas '{CATALOG_NAME}.{SCHEMA_NAME}', '{CATALOG_NAME}.{SCHEMA_NAME_SILVER}', '{CATALOG_NAME}.{SCHEMA_NAME_GOLD}' ensured.")
except Exception as e:
    logger.error(f"Error creating Unity Catalog schemas: {e}")
    raise

# COMMAND ----------

# MAGIC %md
# MAGIC #### 2. Define Expected Kafka Message Schema
# MAGIC
# MAGIC This is the schema we expect for the `logistics_events` Kafka topic.

# COMMAND ----------

logistics_event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("shipment_id", StringType(), False),
    StructField("event_type", StringType(), False), # e.g., 'SHIPMENT_UPDATE', 'FUEL_STOP'
    StructField("event_timestamp", TimestampType(), False),
    StructField("origin_location", StringType(), True),
    StructField("destination_location", StringType(), True),
    StructField("product_hs_code", StringType(), True), # For tariff correlation
    StructField("distance_km", DoubleType(), True),
    StructField("fuel_consumed_liters", DoubleType(), True),
    StructField("driver_id", StringType(), True),
    StructField("current_status", StringType(), True)
])

logger.info("Logistics event schema defined.")

# COMMAND ----------

# MAGIC %md
# MAGIC #### 3. Create or Update `silver.fuel_prices` Lookup Table
# MAGIC
# MAGIC We'll create a mock fuel prices table. In a real-world scenario, this would be populated by another data pipeline (e.g., daily batch job or another stream).

# COMMAND ----------

fuel_prices_table_path = f"{CATALOG_NAME}.{SCHEMA_NAME_SILVER}.fuel_prices"

# Data for the mock fuel prices table
fuel_price_data = [
    ("Diesel", "North America", 1.25, "USD", "2025-12-19", "2025-12-19 08:00:00"),
    ("Diesel", "Europe", 1.50, "EUR", "2025-12-19", "2025-12-19 08:00:00"),
    ("Diesel", "Asia", 1.10, "USD", "2025-12-19", "2025-12-19 08:00:00"),
    ("Gasoline", "North America", 1.15, "USD", "2025-12-19", "2025-12-19 08:00:00"),
    ("Gasoline", "Europe", 1.40, "EUR", "2025-12-19", "2025-12-19 08:00:00"),
]

from pyspark.sql.functions import to_date, to_timestamp

fuel_prices_df = (
    spark.createDataFrame(
        fuel_price_data,
        schema=["fuel_type", "region", "price_per_unit", "currency", "effective_date", "update_time"],
    )
    # The literals above are strings; cast them so the Delta table gets
    # DATE/TIMESTAMP columns as documented in the schema.
    .withColumn("effective_date", to_date("effective_date"))
    .withColumn("update_time", to_timestamp("update_time"))
)

try:
    # For this mock we simply create or overwrite the table. In production,
    # where fuel prices change frequently, an idempotent MERGE INTO is the
    # better pattern.
    if spark.catalog.tableExists(fuel_prices_table_path):
        logger.info(f"Table {fuel_prices_table_path} exists. Overwriting with mock data.")
        fuel_prices_df.write \
            .format("delta") \
            .mode("overwrite") \
            .option("mergeSchema", "true") \
            .saveAsTable(fuel_prices_table_path)
    else:
        logger.info(f"Table {fuel_prices_table_path} does not exist. Creating with mock data.")
        fuel_prices_df.write \
            .format("delta") \
            .mode("append") \
            .saveAsTable(fuel_prices_table_path)

    logger.info(f"Mock fuel prices table '{fuel_prices_table_path}' created/updated successfully.")
    spark.sql(f"SELECT * FROM {fuel_prices_table_path}").display()

except Exception as e:
    logger.error(f"Error creating/updating fuel prices table: {e}")
    raise

# COMMAND ----------

# MAGIC %md
# MAGIC #### 4. Verify Tariff Analysis Table (from DLT)
# MAGIC
# MAGIC We'll verify the `gold.tariff_analysis` table from previous DLT chapters exists and has data.
# MAGIC This table is crucial for joining.

# COMMAND ----------

tariff_analysis_table_path = f"{CATALOG_NAME}.{SCHEMA_NAME_GOLD}.tariff_analysis"

try:
    if spark.catalog.tableExists(tariff_analysis_table_path):
        logger.info(f"Tariff analysis table '{tariff_analysis_table_path}' found.")
        # Display a sample to confirm data presence
        spark.sql(f"SELECT * FROM {tariff_analysis_table_path} LIMIT 5").display()
    else:
        logger.warning(f"Tariff analysis table '{tariff_analysis_table_path}' not found. Please ensure DLT pipelines from previous chapters are run.")
        # Create a dummy table for local testing if not found, for demonstration purposes.
        # In production, this would be a hard dependency.
        dummy_tariff_data = [
            ("870323", "USA", "Mexico", 0.05, "2025-01-01"),
            ("870323", "Germany", "France", 0.02, "2025-01-01"),
            ("847130", "China", "USA", 0.10, "2025-01-01"),
        ]
        dummy_tariff_df = spark.createDataFrame(
            dummy_tariff_data,
            schema=["hs_code", "origin_country", "destination_country", "tariff_rate", "effective_date"]
        )
        dummy_tariff_df.write \
            .format("delta") \
            .mode("overwrite") \
            .saveAsTable(tariff_analysis_table_path)
        logger.info(f"Created dummy tariff analysis table '{tariff_analysis_table_path}' for demonstration.")
        spark.sql(f"SELECT * FROM {tariff_analysis_table_path} LIMIT 5").display()
except Exception as e:
    logger.error(f"Error verifying/creating tariff analysis table: {e}")
    raise

# COMMAND ----------

# MAGIC %md
# MAGIC #### 5. Kafka Configuration
# MAGIC
# MAGIC We'll store Kafka connection details in a configuration file or Databricks secrets. For this tutorial, we'll use a Python dictionary, but strongly recommend Databricks Secrets for production.

# COMMAND ----------

# File: conf/kafka_config.py
# This file would typically be managed by Databricks Secrets or a configuration management system.
# For tutorial purposes, we define it here.

kafka_bootstrap_servers = "<YOUR_KAFKA_BOOTSTRAP_SERVERS>" # e.g., "pkc-lxxxx.us-east-1.aws.confluent.cloud:9092"
kafka_topic_logistics_events = "logistics_events"
kafka_sasl_jaas_config = "org.apache.kafka.common.security.plain.PlainLoginModule required username='<YOUR_KAFKA_API_KEY>' password='<YOUR_KAFKA_API_SECRET>';"
kafka_security_protocol = "SASL_SSL"
kafka_sasl_mechanism = "PLAIN"

# IMPORTANT: In a production environment, NEVER hardcode credentials.
# Use Databricks Secrets (dbutils.secrets.get) for secure access.
# Example:
# kafka_bootstrap_servers = dbutils.secrets.get(scope="kafka_scope", key="bootstrap_servers")
# kafka_api_key = dbutils.secrets.get(scope="kafka_scope", key="api_key")
# kafka_api_secret = dbutils.secrets.get(scope="kafka_scope", key="api_secret")
# kafka_sasl_jaas_config = f"org.apache.kafka.common.security.plain.PlainLoginModule required username='{kafka_api_key}' password='{kafka_api_secret}';"

logger.info("Kafka configuration loaded (ensure secrets are handled securely in production).")

# COMMAND ----------

# MAGIC %md
# MAGIC #### Setup Complete
# MAGIC
# MAGIC The necessary Unity Catalog schemas are in place, mock lookup tables are populated, and Kafka configuration is defined. We are now ready to build our streaming jobs.

# COMMAND ----------

# End of 01_setup_schemas_and_lookups.py

b) Testing This Component

  • Run the 01_setup_schemas_and_lookups.py notebook in your Databricks workspace.
  • Verify in Unity Catalog that supply_chain_lakehouse.bronze, supply_chain_lakehouse.silver, and supply_chain_lakehouse.gold schemas exist.
  • Confirm that supply_chain_lakehouse.silver.fuel_prices and supply_chain_lakehouse.gold.tariff_analysis tables are created and contain the mock data by running:
    SELECT * FROM supply_chain_lakehouse.silver.fuel_prices;
    SELECT * FROM supply_chain_lakehouse.gold.tariff_analysis;
    
  • Ensure you’ve replaced <YOUR_KAFKA_BOOTSTRAP_SERVERS>, <YOUR_KAFKA_API_KEY>, and <YOUR_KAFKA_API_SECRET> with your actual Kafka connection details. For production, set up Databricks Secrets.
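A common failure mode is starting a stream with those placeholders still in place, which only surfaces later as an opaque SASL authentication error. A small hypothetical guard (pure Python, not part of the pipeline) lets you fail fast instead:

```python
def unreplaced_placeholders(**settings):
    """Return names of settings that still contain a template value like '<YOUR_...>'."""
    return sorted(
        name for name, value in settings.items()
        if isinstance(value, str) and "<YOUR_" in value
    )

# Example: the bootstrap servers placeholder was never replaced
bad = unreplaced_placeholders(
    bootstrap_servers="<YOUR_KAFKA_BOOTSTRAP_SERVERS>",
    topic="logistics_events",
)
# bad == ["bootstrap_servers"] -> raise or log before starting the stream
```

Calling this once before `readStream` turns a confusing broker handshake failure into an immediate, readable error.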

3.2. Core Implementation: Raw Logistics Event Ingestion (Bronze Layer)

This job reads raw JSON events from Kafka and writes them to the bronze.raw_logistics_events Delta table.

File: workspace/supply_chain_project/08_structured_streaming_cost_monitoring/02_stream_raw_logistics_events_bronze.py

# Databricks notebook source
# MAGIC %md
# MAGIC ### Stream Raw Logistics Events to Bronze Delta Table
# MAGIC
# MAGIC This notebook sets up a Spark Structured Streaming job to ingest raw logistics events from a Kafka topic
# MAGIC and land them into the `bronze.raw_logistics_events` Delta table.

# COMMAND ----------

import logging

from pyspark.sql.utils import StreamingQueryException

# Import Kafka configuration.
# In a production setup this would come from a dedicated config module or
# Databricks Secrets. NOTE: a Python package name cannot start with a digit,
# so the folder "08_structured_streaming_cost_monitoring" cannot be imported
# with a regular `import` statement. In Databricks, load the config with
# `%run ./conf/kafka_config` in a preceding cell; the fallback below supplies
# placeholder values if that has not been done.
try:
    kafka_bootstrap_servers  # defined by `%run ./conf/kafka_config`
    kafka_topic_logistics_events
    kafka_sasl_jaas_config
    kafka_security_protocol
    kafka_sasl_mechanism
except NameError:
    logging.warning("kafka_config not loaded via %run. Using placeholder values for testing.")
    kafka_bootstrap_servers = "<YOUR_KAFKA_BOOTSTRAP_SERVERS>"
    kafka_topic_logistics_events = "logistics_events"
    kafka_sasl_jaas_config = "org.apache.kafka.common.security.plain.PlainLoginModule required username='<YOUR_KAFKA_API_KEY>' password='<YOUR_KAFKA_API_SECRET>';"
    kafka_security_protocol = "SASL_SSL"
    kafka_sasl_mechanism = "PLAIN"


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# COMMAND ----------

# Configuration
CATALOG_NAME = "supply_chain_lakehouse"
SCHEMA_NAME = "bronze"
BRONZE_TABLE_NAME = "raw_logistics_events"
BRONZE_TABLE_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME}.{BRONZE_TABLE_NAME}"
CHECKPOINT_LOCATION = f"/mnt/{CATALOG_NAME}/checkpoints/{SCHEMA_NAME}/{BRONZE_TABLE_NAME}" # Use ADLS/S3 mount for checkpoints

# Ensure mount point exists if using /mnt/
# dbutils.fs.mkdirs(f"/mnt/{CATALOG_NAME}/checkpoints")
# For Unity Catalog, external locations are preferred for managed checkpoints.
# For simplicity, we assume an external location or managed checkpointing.

# COMMAND ----------

# MAGIC %md
# MAGIC #### 1. Define Kafka Read Stream

# COMMAND ----------

try:
    # Read from Kafka using Spark Structured Streaming
    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_logistics_events) \
        .option("startingOffsets", "earliest") \
        .option("failOnDataLoss", "false") \
        .option("kafka.security.protocol", kafka_security_protocol) \
        .option("kafka.sasl.mechanism", kafka_sasl_mechanism) \
        .option("kafka.sasl.jaas.config", kafka_sasl_jaas_config) \
        .load()

    logger.info(f"Kafka read stream initialized for topic: {kafka_topic_logistics_events}")

except Exception as e:
    logger.error(f"Failed to initialize Kafka read stream: {e}")
    raise

# COMMAND ----------

# MAGIC %md
# MAGIC #### 2. Transform and Select Data for Bronze Table

# COMMAND ----------

# Keep the Kafka key as-is (it is already BINARY), cast the JSON payload to
# STRING, and stamp each record with an ingestion timestamp
bronze_df = kafka_df.selectExpr(
    "key",
    "CAST(value AS STRING) as value",
    "topic",
    "partition",
    "offset",
    "timestamp",
    "current_timestamp() as ingestion_time"
)

logger.info("Data transformed for bronze layer.")

# COMMAND ----------

# MAGIC %md
# MAGIC #### 3. Write Stream to Delta Lake Bronze Table

# COMMAND ----------

# Define the write stream query.
# The method chain is wrapped in parentheses: a line ending without "\" (or a
# "\" followed by an inline comment) would otherwise be a syntax error.
try:
    bronze_stream_query = (
        bronze_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", CHECKPOINT_LOCATION)
        .option("mergeSchema", "true")         # allows schema evolution
        .trigger(processingTime="30 seconds")  # process data every 30 seconds
        .toTable(BRONZE_TABLE_PATH)
    )

    logger.info(f"Bronze stream query started for table: {BRONZE_TABLE_PATH}. Checkpoint: {CHECKPOINT_LOCATION}")
    logger.info("To stop the stream, run: bronze_stream_query.stop()")

    # For development, you might want to wait for termination for a short period
    # bronze_stream_query.awaitTermination(timeout=300) # Wait for 5 minutes

except StreamingQueryException as e:
    logger.error(f"Streaming query failed: {e}")
    # Handle specific streaming errors
    if "offset out of range" in str(e):
        logger.error("Kafka offset out of range. Consider resetting startingOffsets or checkpoint.")
    raise
except Exception as e:
    logger.error(f"An unexpected error occurred during stream writing: {e}")
    raise

# COMMAND ----------

# MAGIC %md
# MAGIC #### Monitoring and Stopping the Stream
# MAGIC
# MAGIC You can monitor the stream's progress in the Databricks UI under the "Streaming" tab of your cluster.
# MAGIC To manually stop the stream (e.g., for development or redeployment):

# COMMAND ----------

# To stop the stream, uncomment and run the following line
# if bronze_stream_query.isActive:
#     bronze_stream_query.stop()
#     logger.info("Bronze stream query stopped.")

# COMMAND ----------

# End of 02_stream_raw_logistics_events_bronze.py
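While the bronze query is running, `bronze_stream_query.lastProgress` exposes per-micro-batch metrics as a dict. A small convenience helper (not part of the pipeline itself; the keys used are standard `StreamingQueryProgress` fields) condenses it for quick checks:

```python
def summarize_progress(progress):
    """Condense a StreamingQuery.lastProgress dict (None until the first batch)."""
    if not progress:
        return None
    return {
        "batch": progress.get("batchId"),
        "input_rows": progress.get("numInputRows", 0),
        "rows_per_sec": round(progress.get("processedRowsPerSecond", 0.0), 1),
    }

# In a notebook cell, while the stream is active:
#     print(summarize_progress(bronze_stream_query.lastProgress))
# A representative progress dict, for illustration:
sample = {"batchId": 7, "numInputRows": 120, "processedRowsPerSecond": 43.21}
summary = summarize_progress(sample)
```

Watching `rows_per_sec` over a few batches is a quick way to spot backpressure before digging into the full Streaming tab.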

c) Testing This Component

  1. Run the Bronze Ingestion Notebook: Execute 02_stream_raw_logistics_events_bronze.py in your Databricks workspace. It should start a streaming job.

  2. Simulate Kafka Data (if not already generating): You’ll need to send messages to your logistics_events Kafka topic. Here’s a simple Python script (run from your local machine or another Databricks notebook) to simulate data.

    File: workspace/supply_chain_project/08_structured_streaming_cost_monitoring/04_simulate_kafka_data.py

    # This script is to be run locally or in a separate environment to push data to Kafka.
    # It requires 'kafka-python' library: pip install kafka-python
    
    import json
    import time
    from datetime import datetime
    from kafka import KafkaProducer
    import uuid
    import random
    
    # Kafka configuration (replace with your details)
    KAFKA_BOOTSTRAP_SERVERS = "<YOUR_KAFKA_BOOTSTRAP_SERVERS>"
    KAFKA_TOPIC = "logistics_events"
    KAFKA_API_KEY = "<YOUR_KAFKA_API_KEY>"
    KAFKA_API_SECRET = "<YOUR_KAFKA_API_SECRET>"
    
    # Ensure to replace with your actual Kafka connection details
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        security_protocol="SASL_SSL",
        sasl_mechanism="PLAIN",
        sasl_plain_username=KAFKA_API_KEY,
        sasl_plain_password=KAFKA_API_SECRET,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    
    print(f"Kafka Producer initialized for topic: {KAFKA_TOPIC}")
    
    def generate_logistics_event():
        event_id = str(uuid.uuid4())
        shipment_id = f"SHIP{random.randint(10000, 99999)}"
        event_type = random.choice(["SHIPMENT_UPDATE", "FUEL_STOP", "ROUTE_CHANGE", "DELIVERY_SCAN"])
        # datetime.now() is local time; take UTC so the trailing 'Z' is accurate
        from datetime import timezone
        event_timestamp = datetime.now(timezone.utc).isoformat(timespec='seconds').replace('+00:00', 'Z')
    
        # Mock HS codes that exist in our dummy tariff table
        hs_codes = ["870323", "847130"]
        product_hs_code = random.choice(hs_codes)
    
        # Mock locations
        origin_locations = ["Port of Hamburg", "Shenzhen Logistics Hub", "Chicago Distribution Ctr"]
        destination_locations = ["New York Port", "Rotterdam Warehouse", "Los Angeles DC"]
        regions = ["North America", "Europe", "Asia"] # For fuel price lookup
    
        event_data = {
            "event_id": event_id,
            "shipment_id": shipment_id,
            "event_type": event_type,
            "event_timestamp": event_timestamp,
            "origin_location": random.choice(origin_locations),
            "destination_location": random.choice(destination_locations),
            "product_hs_code": product_hs_code,
            "distance_km": round(random.uniform(50, 1000), 2) if event_type != "DELIVERY_SCAN" else None,
            "fuel_consumed_liters": round(random.uniform(10, 200), 2) if event_type == "FUEL_STOP" else None,
            "driver_id": f"DRV{random.randint(100, 999)}",
            "current_status": random.choice(["IN_TRANSIT", "AT_PORT", "DELIVERED", "DELAYED"])
        }
        return event_data
    
    # Send 20 messages, one every 5 seconds
    for i in range(20):
        event = generate_logistics_event()
        print(f"Sending event: {event['event_id']}")
        producer.send(KAFKA_TOPIC, event)
        time.sleep(5)
    
    producer.flush()
    print("Finished sending messages.")
    
  3. Verify Data in Bronze Table: In a new Databricks cell or notebook, query the bronze.raw_logistics_events table:

    SELECT * FROM supply_chain_lakehouse.bronze.raw_logistics_events;
    

    You should see raw Kafka messages appearing. Check the value column for JSON content and ingestion_time for recent timestamps.
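Beyond eyeballing rows, a single `value` payload can be checked against the event contract with plain Python, no Spark session needed. The required-field set below mirrors the non-nullable fields of `logistics_event_schema`:

```python
import json

# Non-nullable fields of logistics_event_schema
REQUIRED_FIELDS = {"event_id", "shipment_id", "event_type", "event_timestamp"}

def validate_payload(raw_value):
    """Return (ok, missing_fields) for one raw Kafka `value` JSON string."""
    try:
        event = json.loads(raw_value)
    except (json.JSONDecodeError, TypeError):
        return False, set(REQUIRED_FIELDS)
    missing = REQUIRED_FIELDS - event.keys()
    return not missing, missing

good = '{"event_id": "e-1", "shipment_id": "SHIP12345", "event_type": "FUEL_STOP", "event_timestamp": "2025-12-19T08:00:00Z"}'
ok, missing = validate_payload(good)
# ok is True, missing is an empty set
```

Payloads failing this check are exactly the ones the Silver job will flag as `BAD_SCHEMA_OR_MISSING_KEY_FIELDS`, so it doubles as a quick way to pre-verify your producer.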

3.3. Core Implementation: Enriched Logistics Cost Calculation (Silver Layer)

This job reads from the bronze.raw_logistics_events Delta table, parses the JSON, joins with gold.tariff_analysis and silver.fuel_prices, calculates costs, and writes to silver.enriched_logistics_costs.
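Conceptually, the stream-static joins in this job behave like keyed dictionary lookups with left-outer semantics. A pure-Python sketch (the dicts mirror the mock rows from section 3.1; keying tariffs on HS code plus origin/destination country is an illustrative simplification of the actual join):

```python
# Stand-ins for silver.fuel_prices and gold.tariff_analysis (mock rows from 3.1)
FUEL_PRICES = {
    ("Diesel", "North America"): 1.25,
    ("Diesel", "Europe"): 1.50,
    ("Diesel", "Asia"): 1.10,
}
TARIFF_RATES = {
    ("870323", "USA", "Mexico"): 0.05,
    ("870323", "Germany", "France"): 0.02,
    ("847130", "China", "USA"): 0.10,
}

def lookup_enrichment(event_region, hs_code, origin_country, destination_country,
                      fuel_type="Diesel"):
    """Mimic the left-outer joins: a missing lookup yields None, not a dropped row."""
    return {
        "fuel_price_per_unit": FUEL_PRICES.get((fuel_type, event_region)),
        "tariff_rate": TARIFF_RATES.get((hs_code, origin_country, destination_country)),
    }

hit = lookup_enrichment("Europe", "870323", "Germany", "France")
miss = lookup_enrichment("Other", "999999", "X", "Y")
```

Because the joins are left-outer, events without a matching tariff or fuel price survive with NULL enrichment columns rather than being dropped, which is why the downstream cost calculations must tolerate NULLs.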

File: workspace/supply_chain_project/08_structured_streaming_cost_monitoring/03_stream_enriched_logistics_costs_silver.py

# Databricks notebook source
# MAGIC %md
# MAGIC ### Stream Enriched Logistics Costs to Silver Delta Table
# MAGIC
# MAGIC This notebook sets up a Spark Structured Streaming job to read raw logistics events from the Bronze layer,
# MAGIC parse them, enrich with tariff and fuel price data, calculate total costs, and land them into the
# MAGIC `silver.enriched_logistics_costs` Delta table.

# COMMAND ----------

import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, current_timestamp, from_json, lit, expr, coalesce, when,
    date_trunc, to_date, year, month, dayofmonth,
    sha2, concat_ws, md5
)
from pyspark.sql.types import (
    StructType, StructField, StringType, TimestampType, DoubleType, DateType, IntegerType, LongType
)
from delta.tables import DeltaTable
from pyspark.sql.streaming import StreamingQuery
from pyspark.sql.utils import StreamingQueryException

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# COMMAND ----------

# Configuration
CATALOG_NAME = "supply_chain_lakehouse"
SCHEMA_NAME_BRONZE = "bronze"
SCHEMA_NAME_SILVER = "silver"
SCHEMA_NAME_GOLD = "gold"

BRONZE_TABLE_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME_BRONZE}.raw_logistics_events"
SILVER_FUEL_PRICES_TABLE_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME_SILVER}.fuel_prices"
GOLD_TARIFF_ANALYSIS_TABLE_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME_GOLD}.tariff_analysis"
SILVER_ENRICHED_COSTS_TABLE_NAME = "enriched_logistics_costs"
SILVER_ENRICHED_COSTS_TABLE_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME_SILVER}.{SILVER_ENRICHED_COSTS_TABLE_NAME}"
CHECKPOINT_LOCATION_SILVER = f"/mnt/{CATALOG_NAME}/checkpoints/{SCHEMA_NAME_SILVER}/{SILVER_ENRICHED_COSTS_TABLE_NAME}"

# Ensure mount point exists if using /mnt/
# dbutils.fs.mkdirs(f"/mnt/{CATALOG_NAME}/checkpoints")
# For Unity Catalog, external locations are preferred for managed checkpoints.

# COMMAND ----------

# MAGIC %md
# MAGIC #### 1. Define Expected Kafka Message Schema (for parsing Bronze `value` column)
# MAGIC
# MAGIC This schema must match the structure of your Kafka `logistics_events` messages.

# COMMAND ----------

# This schema is used to parse the 'value' (JSON string) column from the bronze table
logistics_event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("shipment_id", StringType(), False),
    StructField("event_type", StringType(), False), # e.g., 'SHIPMENT_UPDATE', 'FUEL_STOP'
    StructField("event_timestamp", TimestampType(), False),
    StructField("origin_location", StringType(), True),
    StructField("destination_location", StringType(), True),
    StructField("product_hs_code", StringType(), True), # For tariff correlation
    StructField("distance_km", DoubleType(), True),
    StructField("fuel_consumed_liters", DoubleType(), True),
    StructField("driver_id", StringType(), True),
    StructField("current_status", StringType(), True)
])

logger.info("Logistics event schema for parsing defined.")

# COMMAND ----------

# MAGIC %md
# MAGIC #### 2. Read Stream from Bronze Delta Table

# COMMAND ----------

try:
    # Read from the bronze Delta table as a streaming source.
    # `ignoreDeletes`/`ignoreChanges` let the stream tolerate upstream
    # maintenance operations; on newer runtimes, `skipChangeCommits`
    # supersedes `ignoreChanges`.
    bronze_stream_df = spark.readStream \
        .format("delta") \
        .option("ignoreDeletes", "true") \
        .option("ignoreChanges", "true") \
        .table(BRONZE_TABLE_PATH)

    logger.info(f"Structured Stream initialized from Bronze table: {BRONZE_TABLE_PATH}")

except Exception as e:
    logger.error(f"Failed to initialize stream from Bronze Delta table: {e}")
    raise

# COMMAND ----------

# MAGIC %md
# MAGIC #### 3. Parse JSON, Flatten, and Apply Data Quality Checks

# COMMAND ----------

# Parse the JSON string 'value' column into structured columns
parsed_df = bronze_stream_df.withColumn(
    "parsed_data",
    from_json(col("value"), logistics_event_schema)
).select(
    col("parsed_data.*"),
    col("ingestion_time").alias("bronze_ingestion_time"),
    col("offset").alias("kafka_offset_id") # Keep offset for traceability
)

# Apply basic data quality checks
# For example, filter out records where 'event_id' or 'shipment_id' is null
# Or flag them for a Dead Letter Queue (DLQ) if using a more advanced pattern
clean_df = parsed_df.filter(
    col("event_id").isNotNull() & col("shipment_id").isNotNull()
).withColumn(
    "data_quality_status", lit("CLEAN")
).withColumn(
    "error_details", lit(None).cast(StringType())  # typed NULL: avoids a VOID column in Delta
)

# For records failing schema parsing or basic checks, you might route them to a DLQ
# For simplicity, we're filtering out. In production, consider a more robust DLQ approach.
bad_schema_df = parsed_df.filter(
    col("event_id").isNull() | col("shipment_id").isNull()
).withColumn(
    "data_quality_status", lit("BAD_SCHEMA_OR_MISSING_KEY_FIELDS")
).withColumn(
    "error_details", lit("Missing event_id or shipment_id in raw JSON")
)

# In a full production system, `bad_schema_df` would be routed to a dead-letter
# table (or unioned with `clean_df` into a single table with the quality flag).
# For this tutorial we proceed with `clean_df` only.

logger.info("JSON parsing and initial data quality checks applied.")

# COMMAND ----------

# MAGIC %md
# MAGIC #### 4. Load Lookup Tables (Tariff Analysis and Fuel Prices)
# MAGIC
# MAGIC We'll load these as batch DataFrames which Spark will handle efficiently (e.g., broadcasting for small tables) for joining with the stream.

# COMMAND ----------

try:
    # Load the tariff analysis table
    tariff_analysis_df = spark.read.table(GOLD_TARIFF_ANALYSIS_TABLE_PATH) \
        .select(
            col("hs_code").alias("tariff_hs_code"),
            col("origin_country").alias("tariff_origin_country"),
            col("destination_country").alias("tariff_destination_country"),
            col("tariff_rate"),
            col("effective_date").alias("tariff_effective_date")
        )

    logger.info(f"Tariff analysis lookup table '{GOLD_TARIFF_ANALYSIS_TABLE_PATH}' loaded.")

    # Load the fuel prices table
    fuel_prices_df = spark.read.table(SILVER_FUEL_PRICES_TABLE_PATH) \
        .select(
            col("fuel_type").alias("fuel_lookup_type"),
            col("region").alias("fuel_lookup_region"),
            col("price_per_unit").alias("fuel_price_per_unit"),
            col("currency").alias("fuel_currency"),
            col("effective_date").alias("fuel_effective_date")
        )

    logger.info(f"Fuel prices lookup table '{SILVER_FUEL_PRICES_TABLE_PATH}' loaded.")

except Exception as e:
    logger.error(f"Failed to load lookup tables: {e}")
    raise

# COMMAND ----------

# MAGIC %md
# MAGIC #### 5. Enrich Data with Lookups and Calculate Costs

# COMMAND ----------

# Derive a region from origin_location for the fuel price lookup (simplified).
# In reality you'd use a proper geo-mapping service; note that a bare
# contains("America") would also match "South America" -- acceptable here only
# because the mock data uses the literal region names.
enriched_df = clean_df.withColumn(
    "event_region",
    when(col("origin_location").contains("America"), "North America")
    .when(col("origin_location").contains("Europe"), "Europe")
    .when(col("origin_location").contains("Asia"), "Asia")
    .otherwise("Other")
)

# Join with Fuel Prices.
# This is a stream-static join: Spark re-reads the static side each micro-batch
# and broadcasts it when it fits under the broadcast threshold.
# Caveat: the `>=` on effective_date matches every historical price row dated on
# or before the event; if the lookup keeps history, pre-filter to the latest
# effective row to avoid duplicating events.
enriched_df = enriched_df.join(
    fuel_prices_df,
    (enriched_df["event_region"] == fuel_prices_df["fuel_lookup_region"]) &
    (to_date(enriched_df["event_timestamp"]) >= fuel_prices_df["fuel_effective_date"]) & # Use effective date for prices
    (lit("Diesel") == fuel_prices_df["fuel_lookup_type"]), # Assuming 'Diesel' for simplicity, could be dynamic
    "leftouter"
)

# Join with Tariff Analysis. As with fuel prices, the `>=` on effective_date can
# match multiple historical tariff rows; pre-filter to the latest if needed.
enriched_df = enriched_df.join(
    tariff_analysis_df,
    (enriched_df["product_hs_code"] == tariff_analysis_df["tariff_hs_code"]) &
    (to_date(enriched_df["event_timestamp"]) >= tariff_analysis_df["tariff_effective_date"]), # Use effective date for tariffs
    "leftouter"
)

# Calculate costs
final_silver_df = enriched_df.withColumn(
    "calculated_fuel_cost_usd",
    coalesce(col("fuel_consumed_liters") * col("fuel_price_per_unit"), lit(0.0))
).withColumn(
    "tariff_impact_usd",
    when(col("tariff_rate").isNotNull(), lit(100.0) * col("tariff_rate")) # Example: 100 USD base value for calculation
    .otherwise(lit(0.0))
).withColumn(
    "other_fixed_cost_usd", lit(50.0) # Example: a fixed handling cost per event
).withColumn(
    "total_logistics_cost_usd",
    col("calculated_fuel_cost_usd") + col("tariff_impact_usd") + col("other_fixed_cost_usd")
).withColumn(
    "processing_timestamp", current_timestamp()
).select(
    col("event_id"),
    col("shipment_id"),
    col("event_type"),
    col("event_timestamp"),
    col("origin_location"),
    col("destination_location"),
    col("product_hs_code"),
    col("distance_km"),
    col("fuel_consumed_liters"),
    col("driver_id"),
    col("current_status"),
    col("tariff_impact_usd"),
    col("fuel_price_per_unit"),
    col("calculated_fuel_cost_usd"),
    col("total_logistics_cost_usd"),
    col("processing_timestamp"),
    col("data_quality_status"),
    col("error_details")
)

logger.info("Data enriched and costs calculated.")

# COMMAND ----------

# MAGIC %md
# MAGIC #### 6. Write Stream to Delta Lake Silver Table

# COMMAND ----------

# Define the write stream query for the silver table
try:
    # Parenthesized chain: the original backslash-continued version broke because
    # a trailing comment cannot follow a line-continuation backslash, leaving
    # .toTable() as a dangling (syntactically invalid) statement.
    silver_stream_query = (
        final_silver_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", CHECKPOINT_LOCATION_SILVER)
        .option("mergeSchema", "true")
        .trigger(processingTime="30 seconds")  # Process a micro-batch every 30 seconds
        .toTable(SILVER_ENRICHED_COSTS_TABLE_PATH)
    )

    logger.info(f"Silver stream query started for table: {SILVER_ENRICHED_COSTS_TABLE_PATH}. Checkpoint: {CHECKPOINT_LOCATION_SILVER}")
    logger.info("To stop the stream, run: silver_stream_query.stop()")

    # For development, you might want to wait for termination for a short period
    # silver_stream_query.awaitTermination(timeout=300) # Wait for 5 minutes

except StreamingQueryException as e:
    logger.error(f"Streaming query failed: {e}")
    # Handle specific streaming errors
    raise
except Exception as e:
    logger.error(f"An unexpected error occurred during stream writing to silver: {e}")
    raise

# COMMAND ----------

# MAGIC %md
# MAGIC #### Monitoring and Stopping the Stream
# MAGIC
# MAGIC You can monitor the stream's progress in the Databricks UI under the "Streaming" tab of your cluster.
# MAGIC To manually stop the stream (e.g., for development or redeployment):

# COMMAND ----------

# To stop the stream, uncomment and run the following line
# if silver_stream_query.isActive:
#     silver_stream_query.stop()
#     logger.info("Silver stream query stopped.")

# COMMAND ----------

# End of 03_stream_enriched_logistics_costs_silver.py
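The clean/bad split applied in step 3 can be unit-tested outside Spark with a plain-Python mirror of the two filters. A minimal sketch — `classify_event` is a hypothetical helper invented here, not part of the notebooks:

```python
def classify_event(event: dict) -> dict:
    """Mirror the notebook's data-quality rule: an event is CLEAN only when
    both event_id and shipment_id are present and non-null."""
    if event.get("event_id") is not None and event.get("shipment_id") is not None:
        return {**event, "data_quality_status": "CLEAN", "error_details": None}
    return {
        **event,
        "data_quality_status": "BAD_SCHEMA_OR_MISSING_KEY_FIELDS",
        "error_details": "Missing event_id or shipment_id in raw JSON",
    }
```

The same rule could later back a `foreachBatch`-based DLQ writer, keeping the classification logic testable in isolation.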

c) Testing This Component

  1. Ensure Bronze Stream is Running: Verify that the 02_stream_raw_logistics_events_bronze.py notebook’s stream is active.
  2. Run the Silver Enrichment Notebook: Execute 03_stream_enriched_logistics_costs_silver.py in your Databricks workspace. This will start the second streaming job.
  3. Simulate More Kafka Data: Use the 04_simulate_kafka_data.py script to send more events to Kafka.
  4. Verify Data in Silver Table: After a few minutes, query the silver.enriched_logistics_costs table:
    SELECT * FROM supply_chain_lakehouse.silver.enriched_logistics_costs;
    
    You should see records with tariff_impact_usd, fuel_price_per_unit, calculated_fuel_cost_usd, and total_logistics_cost_usd populated. Check that the values make sense based on your mock data and calculations. Specifically, look for product_hs_code values like “870323” or “847130” to see if tariff impacts are applied correctly.
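To know what numbers to expect in that query, the notebook's cost arithmetic can be reproduced in plain Python. An illustrative helper — the sample inputs are made up, not taken from the mock data:

```python
def expected_costs(fuel_liters, fuel_price, tariff_rate,
                   base_value=100.0, fixed_cost=50.0):
    """Recompute the silver-layer cost columns for one event, mirroring
    coalesce(fuel * price, 0), 100 * tariff_rate, and the fixed 50 USD cost."""
    fuel_cost = (fuel_liters or 0.0) * (fuel_price or 0.0)
    tariff_impact = base_value * tariff_rate if tariff_rate is not None else 0.0
    return {
        "calculated_fuel_cost_usd": fuel_cost,
        "tariff_impact_usd": tariff_impact,
        "total_logistics_cost_usd": fuel_cost + tariff_impact + fixed_cost,
    }

# e.g., 120 L of diesel at 1.50 USD/L with a 5% tariff rate:
print(expected_costs(120.0, 1.50, 0.05))
```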

4. Production Considerations

Deploying Spark Structured Streaming jobs in production requires careful attention to robustness, performance, security, and observability.

  • Error Handling and Dead Letter Queues (DLQ):

    • Schema Evolution: Use option("mergeSchema", "true") for Delta sinks to handle minor schema changes. For breaking changes, consider a separate stream for failed records.
    • Corrupted Records: For Kafka sources, option("failOnDataLoss", "false") prevents the stream from failing on unrecoverable data loss. Records that fail from_json parsing should be routed to a DLQ rather than silently filtered: write bad_schema_df (or a union of flagged records) to a separate error table (e.g., bronze.logistics_events_errors) for later investigation. The badRecordsPath option serves a similar purpose, but only for file-based sources, not Kafka.
    • Lookup Failures: If joins fail (e.g., missing HS codes or fuel prices), the leftouter join ensures the main event still flows through, but the joined columns will be null. Downstream logic must handle these nulls or flag them for review.
  • Performance Optimization:

    • Cluster Sizing: Right-size your Databricks cluster based on data volume, velocity, and complexity of transformations. Use autoscaling to handle varying loads.
    • Micro-Batch Interval (processingTime): Tune this (e.g., trigger(processingTime="30 seconds")) to balance latency and throughput. Shorter intervals mean lower latency but potentially higher overhead.
    • State Management: For stateful operations (like aggregations or deduplication with watermarking), optimize state storage (e.g., RocksDB state store on Databricks) and checkpointing. Our current pipeline is stateless, simplifying this.
    • Join Optimization: Spark automatically optimizes stream-static joins. Ensure static lookup tables are small enough to be broadcast (spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")). For larger lookup tables, consider re-partitioning or pre-joining.
    • Delta Lake Optimizations: optimizeWrite and autoCompact are automatically enabled for DLT. For manual Structured Streaming, consider spark.sql("ALTER TABLE ... SET TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true', 'delta.autoOptimize.autoCompact' = 'true')") for your Delta tables, or run OPTIMIZE commands periodically.
  • Security Considerations:

    • Unity Catalog: Leverage Unity Catalog for fine-grained access control on all your Delta tables (bronze.raw_logistics_events, silver.fuel_prices, gold.tariff_analysis, silver.enriched_logistics_costs). Grant SELECT privileges to streaming jobs and MODIFY privileges to the target tables.
    • Secret Management: Crucially, use Databricks Secrets to store Kafka credentials (API keys, secrets, bootstrap servers) instead of hardcoding them. Access them using dbutils.secrets.get(scope="your_scope", key="your_key").
    • Network Security: Ensure your Databricks workspace has secure network connectivity to your Kafka cluster (e.g., VPC peering, Private Link, or secure egress configurations).
  • Logging and Monitoring:

    • Structured Logging: Implement robust logging within your Spark applications using Python’s logging module. Log key metrics, errors, and progress.
    • Databricks Monitoring: Utilize Databricks’ built-in monitoring tools for streaming jobs (Spark UI, Streaming tab). Set up alerts for stream failures or backlogs.
    • External Monitoring: Integrate Databricks logs and metrics with external monitoring systems like Splunk, ELK stack, Prometheus/Grafana, or cloud-native monitoring services (Azure Monitor, AWS CloudWatch) for centralized observability.
    • Metrics: Monitor Kafka consumer lag, processing rates, number of input rows, number of output rows, and error counts.
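As a concrete example of the secret-management point, the Kafka reader options can be assembled by a small helper fed from Databricks Secrets instead of hardcoded literals. A sketch under assumptions: the scope and key names are placeholders, dbutils exists only inside Databricks (so the secrets are passed in as arguments here), and SASL/PLAIN over SASL_SSL (typical for Confluent Cloud) is assumed:

```python
def build_kafka_options(bootstrap_servers: str, api_key: str, api_secret: str) -> dict:
    """Build Structured Streaming Kafka source options for SASL/PLAIN auth.
    In a notebook, pass in dbutils.secrets.get(scope="supply_chain",
    key="kafka_api_key") etc. -- scope/key names are placeholders."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{api_key}" password="{api_secret}";'
    )
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
    }
```

The resulting dict can feed directly into spark.readStream.format("kafka").options(**opts), keeping credentials out of the notebook source.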

5. Code Review Checkpoint

At this point, you have successfully built and tested two interconnected Spark Structured Streaming jobs:

  1. 02_stream_raw_logistics_events_bronze.py: Ingests raw logistics events from Kafka into supply_chain_lakehouse.bronze.raw_logistics_events.
  2. 03_stream_enriched_logistics_costs_silver.py: Reads from the Bronze table, parses JSON, joins with supply_chain_lakehouse.gold.tariff_analysis (from DLT) and supply_chain_lakehouse.silver.fuel_prices (mocked in this chapter), calculates logistics costs, and writes to supply_chain_lakehouse.silver.enriched_logistics_costs.

Files Created/Modified:

  • workspace/supply_chain_project/08_structured_streaming_cost_monitoring/01_setup_schemas_and_lookups.py
  • workspace/supply_chain_project/08_structured_streaming_cost_monitoring/02_stream_raw_logistics_events_bronze.py
  • workspace/supply_chain_project/08_structured_streaming_cost_monitoring/03_stream_enriched_logistics_costs_silver.py
  • workspace/supply_chain_project/08_structured_streaming_cost_monitoring/04_simulate_kafka_data.py (Local utility)
  • workspace/supply_chain_project/08_structured_streaming_cost_monitoring/conf/kafka_config.py (or defined in notebooks for tutorial)

Integration with Existing Code:

  • This chapter leverages the gold.tariff_analysis Delta table, which is an output of your DLT pipelines from previous chapters, demonstrating the interoperability of DLT and Structured Streaming within the Lakehouse.
  • The silver.enriched_logistics_costs table is now available for consumption by downstream analytics, reporting, and potentially other DLT pipelines (e.g., to create a gold layer for specific business metrics).

6. Common Issues & Solutions

  1. Kafka Connection/Authentication Errors:

    • Issue: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer or Auth mechanism Plain not supported.
    • Debug: Double-check your kafka.bootstrap.servers, kafka.sasl.jaas.config, kafka.security.protocol, and kafka.sasl.mechanism options. Ensure your API key and secret are correct and have the necessary permissions on the Kafka topic. Verify network connectivity between your Databricks cluster and Kafka.
    • Prevention: Always test Kafka connectivity separately (e.g., using a simple kafka-python client or kafkacat) before integrating with Spark. Use Databricks Secrets for credentials.
  2. Streaming Query Never Starts or Fails Immediately:

    • Issue: The stream query shows “Starting…” indefinitely, or fails with a generic StreamingQueryException.
    • Debug:
      • Check logs: Look at the Spark Driver logs and executor logs in the Databricks UI for more specific error messages.
      • Checkpoint Location: Ensure the checkpointLocation path is valid and accessible by the cluster. Incorrect permissions or non-existent paths are common issues.
      • Input Data: If reading from Kafka, ensure there’s data in the topic. If reading from a Delta table, ensure the table exists and has data.
      • Schema Mismatch: If parsing JSON, a mismatch between the expected logistics_event_schema and the actual Kafka message structure can cause issues. Use failOnDataLoss="false" and inspect the value column in the bronze table.
    • Prevention: Start with a very simple stream (e.g., just reading Kafka and printing to console) to isolate issues. Gradually add transformations.
  3. Data Not Appearing in Silver Table / Incorrect Joins:

    • Issue: Data flows to the Bronze table, but the Silver table remains empty or contains incorrect/null values after enrichment.
    • Debug:
      • Check Bronze Data: Query the Bronze table to ensure raw events are landing correctly and the value column contains valid JSON.
      • Inspect parsed_df: In the Silver notebook, add a display(parsed_df) or parsed_df.printSchema() to verify JSON parsing.
      • Lookup Table Data: Query silver.fuel_prices and gold.tariff_analysis to ensure they contain the expected lookup data.
      • Join Conditions: Carefully review your join conditions. A leftouter join is usually preferred for enrichment to avoid dropping events, but if the conditions are always false, you’ll see nulls. Check for data type mismatches or casing issues in join keys.
      • Timestamp/Date Filters: Ensure to_date(col("event_timestamp")) >= lookup_df["effective_date"] logic is correct for time-sensitive lookups.
    • Prevention: Use EXPLAIN on your DataFrames to see the execution plan. Test joins with a small batch of data before running the full stream.
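For the join-key mismatch case above, normalizing keys on both sides before joining eliminates the usual casing and whitespace problems. A plain-Python sketch of the rule (the helper name is invented for this section; in the pipeline itself the equivalent would be upper(trim(col(...))) applied to both DataFrames):

```python
def normalize_join_key(value):
    """Strip whitespace, uppercase, and map empty strings to None so that
    left-outer joins produce predictable nulls instead of silent misses."""
    if value is None:
        return None
    key = str(value).strip().upper()
    return key or None
```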

7. Testing & Verification

To thoroughly test and verify the work of this chapter, follow these steps:

  1. Initial Setup Verification:

    • Run 01_setup_schemas_and_lookups.py.
    • Query supply_chain_lakehouse.silver.fuel_prices and supply_chain_lakehouse.gold.tariff_analysis to confirm mock data presence.
  2. Bronze Layer Verification:

    • Run 02_stream_raw_logistics_events_bronze.py.
    • Run 04_simulate_kafka_data.py (from your local machine) to send a batch of 5-10 messages.
    • Wait 30-60 seconds for the stream to process.
    • Query SELECT * FROM supply_chain_lakehouse.bronze.raw_logistics_events in a new Databricks cell. Verify:
      • New records are present.
      • value column contains the expected JSON.
      • ingestion_time is recent.
  3. Silver Layer Verification:

    • Ensure 02_stream_raw_logistics_events_bronze.py is still running.
    • Run 03_stream_enriched_logistics_costs_silver.py.
    • Run 04_simulate_kafka_data.py again to send another batch of messages.
    • Wait 60-90 seconds for both streams to process.
    • Query SELECT * FROM supply_chain_lakehouse.silver.enriched_logistics_costs in a new Databricks cell. Verify:
      • New records are present.
      • event_id, shipment_id, event_type, event_timestamp are correctly parsed.
      • product_hs_code is present.
      • fuel_price_per_unit is populated (should match your mock data for “Diesel” and “North America”, “Europe”, “Asia” regions).
      • calculated_fuel_cost_usd is correctly computed (fuel_consumed_liters * fuel_price_per_unit).
      • tariff_impact_usd is populated (e.g., 5.0 for HS code “870323” and 10.0 for “847130” if using the dummy tariffs).
      • total_logistics_cost_usd is the sum of calculated costs.
      • processing_timestamp is recent.
      • data_quality_status is ‘CLEAN’.
  4. Stopping Streams: After verification, stop both streaming queries (bronze_stream_query.stop() and silver_stream_query.stop()) if you are not proceeding immediately.
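The expected tariff_impact_usd figures in step 3 follow directly from the notebook's formula (100.0 * tariff_rate). Assuming dummy rates of 0.05 for HS code "870323" and 0.10 for "847130" — an assumption consistent with the 5.0 and 10.0 impacts quoted above — a quick sanity check looks like:

```python
# Hypothetical dummy rates, chosen to match the expected impacts above.
DUMMY_TARIFF_RATES = {"870323": 0.05, "847130": 0.10}

def tariff_impact_usd(hs_code, base_value=100.0, rates=DUMMY_TARIFF_RATES):
    """Apply the notebook's impact formula; an unknown HS code yields 0.0,
    mirroring the null tariff_rate produced by the left-outer join."""
    rate = rates.get(hs_code)
    return base_value * rate if rate is not None else 0.0
```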

8. Summary & Next Steps

In this comprehensive chapter, you successfully implemented a real-time logistics cost monitoring pipeline using Spark Structured Streaming on Databricks. You learned how to:

  • Set up Unity Catalog schemas and mock lookup tables (silver.fuel_prices).
  • Ingest raw logistics event data from Kafka into a Bronze Delta table (bronze.raw_logistics_events).
  • Implement a Silver-layer streaming job to parse, clean, and enrich logistics events.
  • Join streaming data with static lookup tables (gold.tariff_analysis and silver.fuel_prices) to correlate tariff impacts and fuel costs.
  • Calculate real-time logistics cost components and total costs.
  • Write the enriched, cost-calculated data to a Silver Delta table (silver.enriched_logistics_costs), ready for downstream analytics.
  • Consider key production aspects like error handling, performance, security (using Databricks Secrets and Unity Catalog), and monitoring.

This pipeline forms a critical component of our real-time supply chain intelligence platform, providing immediate insights into operational costs.

In the next chapter, we will focus on Customs Trade Data Lakehouse for HS Code Classification Validation and Anomaly Detection. We will explore how to ingest and process customs trade data, validate HS code classifications using machine learning models, and detect anomalies in trade patterns, further enhancing the data quality and analytical capabilities of our Lakehouse.