1. Chapter Introduction
In modern supply chains, real-time visibility into logistics costs is paramount for effective decision-making, cost optimization, and competitive advantage. This chapter guides you through building a robust, real-time logistics cost monitoring pipeline using Apache Spark Structured Streaming on Databricks. We will ingest streaming logistics events from Kafka, process them to calculate various cost components, and enrich them with previously generated tariff data and dynamic fuel prices.
This step is crucial for providing immediate insights into transportation expenses, identifying cost anomalies, and enabling proactive adjustments to logistics strategies. By correlating live logistics data with tariff impacts and fuel price fluctuations, businesses can understand the true cost drivers and optimize routes, carriers, and procurement.
Prerequisites: Before starting this chapter, you should have:
- A Databricks workspace configured with Unity Catalog.
- Kafka up and running (either self-managed or a managed service like Confluent Cloud, AWS MSK, Azure Event Hubs).
- The DLT pipelines from previous chapters deployed, specifically those populating the gold.tariff_analysis table with HS Code-based tariff impacts.
- An understanding of Spark SQL and basic Python programming.
Expected Outcome:
By the end of this chapter, you will have a production-ready Spark Structured Streaming application that continuously ingests logistics events, calculates real-time costs, enriches them with tariff and fuel price data, and stores the results in a Delta Lake table (silver.enriched_logistics_costs) within your Unity Catalog-managed Lakehouse. This data will be ready for downstream analytics and reporting.
2. Planning & Design
To achieve real-time logistics cost monitoring, we’ll implement a multi-hop streaming architecture on Databricks, leveraging Spark Structured Streaming for its scalability and fault tolerance.
Component Architecture
Our streaming pipeline will consist of two main Structured Streaming jobs, forming a Bronze-to-Silver layer transformation:
Raw Logistics Event Ingestion (Bronze Layer):
- Source: Kafka topic (logistics_events).
- Process: Read raw JSON messages, apply a schema, add ingestion metadata (timestamp, Kafka offset).
- Sink: bronze.raw_logistics_events Delta table. This table will hold the immutable, raw stream of events.
Enriched Logistics Cost Calculation (Silver Layer):
- Source: bronze.raw_logistics_events Delta table (as a streaming source).
- Lookup Sources:
  - gold.tariff_analysis Delta table (from the DLT pipeline) for HS Code tariff impacts.
  - silver.fuel_prices Delta table (a new table we'll introduce, representing current fuel prices by region/type).
- Process:
  - Join streaming logistics events with static/slowly changing tariff data and fuel price data.
  - Calculate various cost components: fuel cost, tariff cost, handling fees, etc.
  - Apply data quality checks and error handling.
- Sink: silver.enriched_logistics_costs Delta table. This table will contain the cleaned, enriched, and cost-calculated logistics data.
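Before diving into the Spark implementation, the two hops above can be sketched in plain Python, with dicts standing in for the Delta tables and lookup sources. All names, rates, and prices here are illustrative, not values from the real pipeline.

```python
# Minimal pure-Python sketch of the Bronze-to-Silver flow: raw JSON strings
# land in "bronze", then are parsed and enriched with tariff/fuel lookups.
import json

# Bronze layer: raw JSON strings as landed from Kafka
bronze = [
    '{"event_id": "e1", "product_hs_code": "870323", "region": "Europe", "fuel_consumed_liters": 40.0}',
]

# Lookup sources (stand-ins for gold.tariff_analysis and silver.fuel_prices)
tariff_rates = {"870323": 0.05}
fuel_prices = {"Europe": 1.50}

def enrich(raw):
    """Parse a bronze record and attach tariff and fuel-cost components."""
    event = json.loads(raw)
    rate = tariff_rates.get(event["product_hs_code"], 0.0)
    price = fuel_prices.get(event["region"], 0.0)
    event["tariff_impact_usd"] = 100.0 * rate  # illustrative 100 USD base value
    event["calculated_fuel_cost_usd"] = event["fuel_consumed_liters"] * price
    return event

silver = [enrich(r) for r in bronze]
print(silver[0]["calculated_fuel_cost_usd"])  # 60.0
```

The real pipeline performs the same parse-then-join shape, but with Structured Streaming reads, stream-static joins, and Delta sinks.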
Database Schema (Unity Catalog)
We will define the schemas for the new Delta tables in Unity Catalog.
bronze.raw_logistics_events
This table will store the raw JSON messages from Kafka, along with metadata added during ingestion.
| Column Name | Data Type | Description |
|---|---|---|
| value | STRING | Raw JSON message from Kafka |
| key | BINARY | Kafka message key (if any) |
| topic | STRING | Kafka topic name |
| partition | INT | Kafka partition |
| offset | LONG | Kafka offset |
| timestamp | TIMESTAMP | Kafka message timestamp |
| ingestion_time | TIMESTAMP | Timestamp when the record was ingested into Delta |
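To make the schema concrete, here is a hypothetical example of a single bronze row as a Python dict: the payload stays as an unparsed JSON string in `value`, with Kafka metadata and ingestion time alongside. The field values are made up for illustration.

```python
import json
from datetime import datetime, timezone

# One bronze row matching the table schema above (illustrative values)
bronze_row = {
    "value": '{"event_id": "e1", "shipment_id": "SHIP12345", "event_type": "FUEL_STOP"}',
    "key": None,
    "topic": "logistics_events",
    "partition": 0,
    "offset": 42,
    "timestamp": datetime(2025, 12, 19, 8, 0, tzinfo=timezone.utc),
    "ingestion_time": datetime.now(timezone.utc),
}

# Downstream (silver) parsing operates only on the raw string in `value`:
payload = json.loads(bronze_row["value"])
print(payload["event_type"])  # FUEL_STOP
```

Keeping `value` unparsed in bronze means schema changes in the producer never break ingestion; parsing failures surface in the silver hop instead.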
silver.fuel_prices
This table will store fuel price data, which we’ll assume is updated periodically (e.g., daily) or streamed from another source. For this chapter, we’ll simulate it.
| Column Name | Data Type | Description |
|---|---|---|
| fuel_type | STRING | Type of fuel (e.g., Diesel, Gasoline) |
| region | STRING | Geographic region for the price |
| price_per_unit | DOUBLE | Price per unit of fuel (e.g., per liter/gallon) |
| currency | STRING | Currency of the price |
| effective_date | DATE | Date when the price became effective |
| update_time | TIMESTAMP | Timestamp of the last update |
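The `effective_date` column implies a lookup rule: for a given fuel type and region, use the most recent price whose effective date is on or before the event date. A small pure-Python sketch of that rule, with made-up rows:

```python
from datetime import date

# (fuel_type, region, price_per_unit, effective_date) - illustrative rows
fuel_prices = [
    ("Diesel", "Europe", 1.45, date(2025, 11, 1)),
    ("Diesel", "Europe", 1.50, date(2025, 12, 19)),
]

def price_on(fuel_type, region, event_date):
    """Return the latest price effective on or before event_date, else None."""
    candidates = [
        (eff, price)
        for ft, rg, price, eff in fuel_prices
        if ft == fuel_type and rg == region and eff <= event_date
    ]
    return max(candidates)[1] if candidates else None

print(price_on("Diesel", "Europe", date(2025, 12, 1)))   # 1.45
print(price_on("Diesel", "Europe", date(2025, 12, 20)))  # 1.5
```

The streaming job approximates this with a `>=` join condition on `effective_date`; with multiple effective dates per key you would additionally pick the latest match, as this sketch does.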
silver.enriched_logistics_costs
This table will contain the processed and enriched logistics events with calculated costs.
| Column Name | Data Type | Description |
|---|---|---|
| event_id | STRING | Unique identifier for the logistics event |
| shipment_id | STRING | Identifier for the shipment |
| event_type | STRING | Type of event (e.g., 'SHIPMENT_UPDATE', 'FUEL_STOP') |
| event_timestamp | TIMESTAMP | Timestamp of the logistics event |
| origin_location | STRING | Origin location of the shipment |
| destination_location | STRING | Destination location of the shipment |
| product_hs_code | STRING | Harmonized System Code of the product |
| distance_km | DOUBLE | Distance covered for the event (in kilometers) |
| fuel_consumed_liters | DOUBLE | Fuel consumed for the event (in liters) |
| driver_id | STRING | ID of the driver involved |
| current_status | STRING | Current status of the shipment |
| tariff_impact_usd | DOUBLE | Calculated tariff cost impact for the shipment |
| fuel_price_per_unit | DOUBLE | Fuel price at the time/region of the event |
| calculated_fuel_cost_usd | DOUBLE | Calculated fuel cost for the event |
| total_logistics_cost_usd | DOUBLE | Total calculated logistics cost for the event |
| processing_timestamp | TIMESTAMP | Timestamp when the record was processed |
| data_quality_status | STRING | Status of data quality check (e.g., 'CLEAN', 'BAD_SCHEMA') |
| error_details | STRING | Details if data quality issues found |
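A worked example of the cost columns, using the same simple model the silver job applies later in this chapter (fuel cost = liters times price; tariff impact = rate times an assumed 100 USD base value; plus a flat 50 USD handling fee):

```python
def logistics_cost(fuel_consumed_liters, fuel_price_per_unit, tariff_rate,
                   base_value_usd=100.0, handling_fee_usd=50.0):
    """Compute the three cost columns; None inputs are treated as zero."""
    fuel_cost = (fuel_consumed_liters or 0.0) * (fuel_price_per_unit or 0.0)
    tariff_impact = (tariff_rate or 0.0) * base_value_usd
    return {
        "calculated_fuel_cost_usd": fuel_cost,
        "tariff_impact_usd": tariff_impact,
        "total_logistics_cost_usd": fuel_cost + tariff_impact + handling_fee_usd,
    }

# 40 L of Diesel at 1.25 USD/L with a 5% tariff:
print(logistics_cost(40.0, 1.25, 0.05))
# {'calculated_fuel_cost_usd': 50.0, 'tariff_impact_usd': 5.0, 'total_logistics_cost_usd': 105.0}
```

The `or 0.0` handling mirrors the `coalesce(..., lit(0.0))` defaults in the Spark job: events without fuel data (e.g., a DELIVERY_SCAN) still get a well-defined total.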
File Structure
For this chapter, we will organize our code in Databricks notebooks, which can later be modularized into Python scripts and deployed via Databricks Asset Bundles for production CI/CD.
├── workspace/
│ ├── supply_chain_project/
│ │ ├── 08_structured_streaming_cost_monitoring/
│ │ │ ├── 01_setup_schemas_and_lookups.py
│ │ │ ├── 02_stream_raw_logistics_events_bronze.py
│ │ │ ├── 03_stream_enriched_logistics_costs_silver.py
│ │ │ ├── 04_simulate_kafka_data.py (for local testing)
│ │ │ ├── conf/
│ │ │ │ ├── kafka_config.py
3. Step-by-Step Implementation
We’ll build the pipeline incrementally. Ensure you have a Databricks cluster running with access to Unity Catalog.
3.1. Setup & Configuration
First, let’s define our Unity Catalog schema, simulate the silver.fuel_prices table, and configure Kafka connection details.
a) Create Unity Catalog Schema and silver.fuel_prices Table
We’ll create a Python notebook to set up the necessary Unity Catalog schema and a mock silver.fuel_prices table.
File: workspace/supply_chain_project/08_structured_streaming_cost_monitoring/01_setup_schemas_and_lookups.py
# Databricks notebook source
# MAGIC %md
# MAGIC ### Setup: Create Unity Catalog Schema and Mock Lookup Tables
# MAGIC
# MAGIC This notebook initializes the Unity Catalog schema and creates a mock `silver.fuel_prices` table for our streaming pipeline.
# MAGIC It also defines the expected schema for incoming Kafka messages.
# COMMAND ----------
import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import (
StructType, StructField, StringType, TimestampType, DoubleType, DateType, IntegerType
)
from delta.tables import DeltaTable
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# COMMAND ----------
# Configuration
CATALOG_NAME = "supply_chain_lakehouse"
SCHEMA_NAME = "bronze" # For raw events
SCHEMA_NAME_SILVER = "silver" # For enriched data and lookup tables
SCHEMA_NAME_GOLD = "gold" # For tariff analysis lookup
# COMMAND ----------
# MAGIC %md
# MAGIC #### 1. Create Unity Catalog Schemas
# MAGIC
# MAGIC We'll ensure the necessary schemas exist in Unity Catalog.
# COMMAND ----------
try:
spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG_NAME}")
spark.sql(f"USE CATALOG {CATALOG_NAME}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME_SILVER}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME_GOLD}") # Ensure gold schema from DLT exists
logger.info(f"Unity Catalog schemas '{CATALOG_NAME}.{SCHEMA_NAME}', '{CATALOG_NAME}.{SCHEMA_NAME_SILVER}', '{CATALOG_NAME}.{SCHEMA_NAME_GOLD}' ensured.")
except Exception as e:
logger.error(f"Error creating Unity Catalog schemas: {e}")
raise
# COMMAND ----------
# MAGIC %md
# MAGIC #### 2. Define Expected Kafka Message Schema
# MAGIC
# MAGIC This is the schema we expect for the `logistics_events` Kafka topic.
# COMMAND ----------
logistics_event_schema = StructType([
StructField("event_id", StringType(), False),
StructField("shipment_id", StringType(), False),
StructField("event_type", StringType(), False), # e.g., 'SHIPMENT_UPDATE', 'FUEL_STOP'
StructField("event_timestamp", TimestampType(), False),
StructField("origin_location", StringType(), True),
StructField("destination_location", StringType(), True),
StructField("product_hs_code", StringType(), True), # For tariff correlation
StructField("distance_km", DoubleType(), True),
StructField("fuel_consumed_liters", DoubleType(), True),
StructField("driver_id", StringType(), True),
StructField("current_status", StringType(), True)
])
logger.info("Logistics event schema defined.")
# COMMAND ----------
# MAGIC %md
# MAGIC #### 3. Create or Update `silver.fuel_prices` Lookup Table
# MAGIC
# MAGIC We'll create a mock fuel prices table. In a real-world scenario, this would be populated by another data pipeline (e.g., daily batch job or another stream).
# COMMAND ----------
fuel_prices_table_path = f"{CATALOG_NAME}.{SCHEMA_NAME_SILVER}.fuel_prices"
# Data for the mock fuel prices table
fuel_price_data = [
("Diesel", "North America", 1.25, "USD", "2025-12-19", "2025-12-19 08:00:00"),
("Diesel", "Europe", 1.50, "EUR", "2025-12-19", "2025-12-19 08:00:00"),
("Diesel", "Asia", 1.10, "USD", "2025-12-19", "2025-12-19 08:00:00"),
("Gasoline", "North America", 1.15, "USD", "2025-12-19", "2025-12-19 08:00:00"),
("Gasoline", "Europe", 1.40, "EUR", "2025-12-19", "2025-12-19 08:00:00"),
]
from pyspark.sql.functions import to_date, to_timestamp

fuel_prices_df = spark.createDataFrame(
    fuel_price_data,
    schema=["fuel_type", "region", "price_per_unit", "currency", "effective_date", "update_time"]
).withColumn("effective_date", to_date("effective_date")) \
 .withColumn("update_time", to_timestamp("update_time"))
try:
# For a mock lookup table, a simple overwrite is sufficient and idempotent.
# In production, prefer MERGE INTO when the data changes incrementally.
if spark.catalog.tableExists(fuel_prices_table_path):
logger.info(f"Table {fuel_prices_table_path} exists. Overwriting with mock data.")
fuel_prices_df.write \
.format("delta") \
.mode("overwrite") \
.option("mergeSchema", "true") \
.saveAsTable(fuel_prices_table_path)
else:
logger.info(f"Table {fuel_prices_table_path} does not exist. Creating with mock data.")
fuel_prices_df.write \
.format("delta") \
.mode("append") \
.saveAsTable(fuel_prices_table_path)
logger.info(f"Mock fuel prices table '{fuel_prices_table_path}' created/updated successfully.")
spark.sql(f"SELECT * FROM {fuel_prices_table_path}").display()
except Exception as e:
logger.error(f"Error creating/updating fuel prices table: {e}")
raise
# COMMAND ----------
# MAGIC %md
# MAGIC #### 4. Verify Tariff Analysis Table (from DLT)
# MAGIC
# MAGIC We'll verify the `gold.tariff_analysis` table from previous DLT chapters exists and has data.
# MAGIC This table is crucial for joining.
# COMMAND ----------
tariff_analysis_table_path = f"{CATALOG_NAME}.{SCHEMA_NAME_GOLD}.tariff_analysis"
try:
if spark.catalog.tableExists(tariff_analysis_table_path):
logger.info(f"Tariff analysis table '{tariff_analysis_table_path}' found.")
# Display a sample to confirm data presence
spark.sql(f"SELECT * FROM {tariff_analysis_table_path} LIMIT 5").display()
else:
logger.warning(f"Tariff analysis table '{tariff_analysis_table_path}' not found. Please ensure DLT pipelines from previous chapters are run.")
# Create a dummy table for local testing if not found, for demonstration purposes.
# In production, this would be a hard dependency.
dummy_tariff_data = [
("870323", "USA", "Mexico", 0.05, "2025-01-01"),
("870323", "Germany", "France", 0.02, "2025-01-01"),
("847130", "China", "USA", 0.10, "2025-01-01"),
]
dummy_tariff_df = spark.createDataFrame(
dummy_tariff_data,
schema=["hs_code", "origin_country", "destination_country", "tariff_rate", "effective_date"]
)
dummy_tariff_df.write \
.format("delta") \
.mode("overwrite") \
.saveAsTable(tariff_analysis_table_path)
logger.info(f"Created dummy tariff analysis table '{tariff_analysis_table_path}' for demonstration.")
spark.sql(f"SELECT * FROM {tariff_analysis_table_path} LIMIT 5").display()
except Exception as e:
logger.error(f"Error verifying/creating tariff analysis table: {e}")
raise
# COMMAND ----------
# MAGIC %md
# MAGIC #### 5. Kafka Configuration
# MAGIC
# MAGIC We'll store Kafka connection details in a configuration file or Databricks secrets. For this tutorial, we'll use a Python dictionary, but strongly recommend Databricks Secrets for production.
# COMMAND ----------
# File: conf/kafka_config.py
# This file would typically be managed by Databricks Secrets or a configuration management system.
# For tutorial purposes, we define it here.
kafka_bootstrap_servers = "<YOUR_KAFKA_BOOTSTRAP_SERVERS>" # e.g., "pkc-lxxxx.us-east-1.aws.confluent.cloud:9092"
kafka_topic_logistics_events = "logistics_events"
kafka_sasl_jaas_config = "org.apache.kafka.common.security.plain.PlainLoginModule required username='<YOUR_KAFKA_API_KEY>' password='<YOUR_KAFKA_API_SECRET>';"
kafka_security_protocol = "SASL_SSL"
kafka_sasl_mechanism = "PLAIN"
# IMPORTANT: In a production environment, NEVER hardcode credentials.
# Use Databricks Secrets (dbutils.secrets.get) for secure access.
# Example:
# kafka_bootstrap_servers = dbutils.secrets.get(scope="kafka_scope", key="bootstrap_servers")
# kafka_api_key = dbutils.secrets.get(scope="kafka_scope", key="api_key")
# kafka_api_secret = dbutils.secrets.get(scope="kafka_scope", key="api_secret")
# kafka_sasl_jaas_config = f"org.apache.kafka.common.security.plain.PlainLoginModule required username='{kafka_api_key}' password='{kafka_api_secret}';"
logger.info("Kafka configuration loaded (ensure secrets are handled securely in production).")
# COMMAND ----------
# MAGIC %md
# MAGIC #### Setup Complete
# MAGIC
# MAGIC The necessary Unity Catalog schemas are in place, mock lookup tables are populated, and Kafka configuration is defined. We are now ready to build our streaming jobs.
# COMMAND ----------
# End of 01_setup_schemas_and_lookups.py
b) Testing This Component
- Run the 01_setup_schemas_and_lookups.py notebook in your Databricks workspace.
- Verify in Unity Catalog that the supply_chain_lakehouse.bronze, supply_chain_lakehouse.silver, and supply_chain_lakehouse.gold schemas exist.
- Confirm that the supply_chain_lakehouse.silver.fuel_prices and supply_chain_lakehouse.gold.tariff_analysis tables are created and contain the mock data by running:
  SELECT * FROM supply_chain_lakehouse.silver.fuel_prices;
  SELECT * FROM supply_chain_lakehouse.gold.tariff_analysis;
- Ensure you've replaced <YOUR_KAFKA_BOOTSTRAP_SERVERS>, <YOUR_KAFKA_API_KEY>, and <YOUR_KAFKA_API_SECRET> with your actual Kafka connection details. For production, set up Databricks Secrets.
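When you move the credentials into Databricks Secrets, the SASL JAAS string is assembled at runtime from the retrieved values, as in the commented example in the setup notebook. Here is a small sketch of that assembly; `get_secret` is a local placeholder standing in for `dbutils.secrets.get`, and the demo values are made up.

```python
def build_jaas_config(username, password):
    """Assemble the Kafka PLAIN SASL JAAS config string from credentials."""
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username='{username}' password='{password}';"
    )

def get_secret(scope, key):
    # Placeholder for dbutils.secrets.get(scope=..., key=...) in Databricks
    return {"api_key": "demo-key", "api_secret": "demo-secret"}[key]

jaas = build_jaas_config(
    get_secret("kafka_scope", "api_key"),
    get_secret("kafka_scope", "api_secret"),
)
print(jaas)
```

Note the trailing semicolon: Kafka rejects a JAAS config string without it, which is an easy mistake when building the string by hand.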
3.2. Core Implementation: Raw Logistics Event Ingestion (Bronze Layer)
This job reads raw JSON events from Kafka and writes them to the bronze.raw_logistics_events Delta table.
File: workspace/supply_chain_project/08_structured_streaming_cost_monitoring/02_stream_raw_logistics_events_bronze.py
# Databricks notebook source
# MAGIC %md
# MAGIC ### Stream Raw Logistics Events to Bronze Delta Table
# MAGIC
# MAGIC This notebook sets up a Spark Structured Streaming job to ingest raw logistics events from a Kafka topic
# MAGIC and land them into the `bronze.raw_logistics_events` Delta table.
# COMMAND ----------
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, from_json, lit, expr
from pyspark.sql.types import (
StructType, StructField, StringType, TimestampType, DoubleType, DateType, IntegerType
)
from delta.tables import DeltaTable
from pyspark.sql.streaming import StreamingQuery
from pyspark.sql.utils import StreamingQueryException
# Import Kafka configuration.
# Note: a Python package name cannot start with a digit, so
# `08_structured_streaming_cost_monitoring` cannot be imported with a plain
# dotted import. In Databricks, load the config notebook with
# `%run ./conf/kafka_config` in a cell above, or use importlib in a script.
try:
    kafka_bootstrap_servers  # defined if conf/kafka_config was loaded above
except NameError:
    # Fallback for direct notebook execution without the config module
    logging.warning("kafka_config not loaded. Using placeholder values for testing.")
    kafka_bootstrap_servers = "<YOUR_KAFKA_BOOTSTRAP_SERVERS>"
    kafka_topic_logistics_events = "logistics_events"
    kafka_sasl_jaas_config = "org.apache.kafka.common.security.plain.PlainLoginModule required username='<YOUR_KAFKA_API_KEY>' password='<YOUR_KAFKA_API_SECRET>';"
    kafka_security_protocol = "SASL_SSL"
    kafka_sasl_mechanism = "PLAIN"
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# COMMAND ----------
# Configuration
CATALOG_NAME = "supply_chain_lakehouse"
SCHEMA_NAME = "bronze"
BRONZE_TABLE_NAME = "raw_logistics_events"
BRONZE_TABLE_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME}.{BRONZE_TABLE_NAME}"
CHECKPOINT_LOCATION = f"/mnt/{CATALOG_NAME}/checkpoints/{SCHEMA_NAME}/{BRONZE_TABLE_NAME}" # Use ADLS/S3 mount for checkpoints
# Ensure the mount point exists if using /mnt/
# dbutils.fs.mkdirs(f"/mnt/{CATALOG_NAME}/checkpoints")
# With Unity Catalog, prefer a Volume (e.g. /Volumes/<catalog>/<schema>/<volume>/...)
# or an external location for checkpoints over legacy /mnt/ mounts.
# COMMAND ----------
# MAGIC %md
# MAGIC #### 1. Define Kafka Read Stream
# COMMAND ----------
try:
# Read from Kafka using Spark Structured Streaming
kafka_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("subscribe", kafka_topic_logistics_events) \
.option("startingOffsets", "earliest") \
.option("failOnDataLoss", "false") \
.option("kafka.security.protocol", kafka_security_protocol) \
.option("kafka.sasl.mechanism", kafka_sasl_mechanism) \
.option("kafka.sasl.jaas.config", kafka_sasl_jaas_config) \
.load()
logger.info(f"Kafka read stream initialized for topic: {kafka_topic_logistics_events}")
except Exception as e:
logger.error(f"Failed to initialize Kafka read stream: {e}")
raise
# COMMAND ----------
# MAGIC %md
# MAGIC #### 2. Transform and Select Data for Bronze Table
# COMMAND ----------
# Add ingestion timestamp and cast value to string
bronze_df = kafka_df.selectExpr(
"CAST(key AS BINARY) as key",
"CAST(value AS STRING) as value",
"topic",
"partition",
"offset",
"timestamp",
"current_timestamp() as ingestion_time"
)
logger.info("Data transformed for bronze layer.")
# COMMAND ----------
# MAGIC %md
# MAGIC #### 3. Write Stream to Delta Lake Bronze Table
# COMMAND ----------
# Define the write stream query
try:
bronze_stream_query = (
    bronze_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_LOCATION)
    .option("mergeSchema", "true")           # allows schema evolution
    .trigger(processingTime="30 seconds")    # process data every 30 seconds
    .toTable(BRONZE_TABLE_PATH)
)
logger.info(f"Bronze stream query started for table: {BRONZE_TABLE_PATH}. Checkpoint: {CHECKPOINT_LOCATION}")
logger.info("To stop the stream, run: bronze_stream_query.stop()")
# For development, you might want to wait for termination for a short period
# bronze_stream_query.awaitTermination(timeout=300) # Wait for 5 minutes
except StreamingQueryException as e:
logger.error(f"Streaming query failed: {e}")
# Handle specific streaming errors
if "offset out of range" in str(e):
logger.error("Kafka offset out of range. Consider resetting startingOffsets or checkpoint.")
raise
except Exception as e:
logger.error(f"An unexpected error occurred during stream writing: {e}")
raise
# COMMAND ----------
# MAGIC %md
# MAGIC #### Monitoring and Stopping the Stream
# MAGIC
# MAGIC You can monitor the stream's progress in the Databricks UI under the "Streaming" tab of your cluster.
# MAGIC To manually stop the stream (e.g., for development or redeployment):
# COMMAND ----------
# To stop the stream, uncomment and run the following line
# if bronze_stream_query.isActive:
# bronze_stream_query.stop()
# logger.info("Bronze stream query stopped.")
# COMMAND ----------
# End of 02_stream_raw_logistics_events_bronze.py
c) Testing This Component
1. Run the Bronze Ingestion Notebook: Execute 02_stream_raw_logistics_events_bronze.py in your Databricks workspace. It should start a streaming job.
2. Simulate Kafka Data (if not already generating): You'll need to send messages to your logistics_events Kafka topic. Here's a simple Python script (run from your local machine or another Databricks notebook) to simulate data.
File: workspace/supply_chain_project/08_structured_streaming_cost_monitoring/04_simulate_kafka_data.py
# This script is to be run locally or in a separate environment to push data to Kafka.
# It requires the 'kafka-python' library: pip install kafka-python
import json
import time
from datetime import datetime
from kafka import KafkaProducer
import uuid
import random

# Kafka configuration (replace with your actual connection details)
KAFKA_BOOTSTRAP_SERVERS = "<YOUR_KAFKA_BOOTSTRAP_SERVERS>"
KAFKA_TOPIC = "logistics_events"
KAFKA_API_KEY = "<YOUR_KAFKA_API_KEY>"
KAFKA_API_SECRET = "<YOUR_KAFKA_API_SECRET>"

producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username=KAFKA_API_KEY,
    sasl_plain_password=KAFKA_API_SECRET,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
print(f"Kafka Producer initialized for topic: {KAFKA_TOPIC}")

def generate_logistics_event():
    event_id = str(uuid.uuid4())
    shipment_id = f"SHIP{random.randint(10000, 99999)}"
    event_type = random.choice(["SHIPMENT_UPDATE", "FUEL_STOP", "ROUTE_CHANGE", "DELIVERY_SCAN"])
    event_timestamp = datetime.now().isoformat(timespec='seconds') + 'Z'  # ISO 8601 format

    # Mock HS codes that exist in our dummy tariff table
    hs_codes = ["870323", "847130"]
    product_hs_code = random.choice(hs_codes)

    # Mock locations (these drive the region mapping for the fuel price lookup)
    origin_locations = ["Port of Hamburg", "Shenzhen Logistics Hub", "Chicago Distribution Ctr"]
    destination_locations = ["New York Port", "Rotterdam Warehouse", "Los Angeles DC"]

    event_data = {
        "event_id": event_id,
        "shipment_id": shipment_id,
        "event_type": event_type,
        "event_timestamp": event_timestamp,
        "origin_location": random.choice(origin_locations),
        "destination_location": random.choice(destination_locations),
        "product_hs_code": product_hs_code,
        "distance_km": round(random.uniform(50, 1000), 2) if event_type != "DELIVERY_SCAN" else None,
        "fuel_consumed_liters": round(random.uniform(10, 200), 2) if event_type == "FUEL_STOP" else None,
        "driver_id": f"DRV{random.randint(100, 999)}",
        "current_status": random.choice(["IN_TRANSIT", "AT_PORT", "DELIVERED", "DELAYED"])
    }
    return event_data

# Send 20 messages, one every 5 seconds
for i in range(20):
    event = generate_logistics_event()
    print(f"Sending event: {event['event_id']}")
    producer.send(KAFKA_TOPIC, event)
    time.sleep(5)

producer.flush()
print("Finished sending messages.")
3. Verify Data in Bronze Table: In a new Databricks cell or notebook, query the bronze.raw_logistics_events table:
SELECT * FROM supply_chain_lakehouse.bronze.raw_logistics_events;
You should see raw Kafka messages appearing. Check the value column for JSON content and ingestion_time for recent timestamps.
3.3. Core Implementation: Enriched Logistics Cost Calculation (Silver Layer)
This job reads from the bronze.raw_logistics_events Delta table, parses the JSON, joins with gold.tariff_analysis and silver.fuel_prices, calculates costs, and writes to silver.enriched_logistics_costs.
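Before the full implementation, here is a pure-Python sketch of the quality gate this job applies before enrichment: records missing event_id or shipment_id are flagged rather than processed. The field names match the Kafka schema; the routing function itself is illustrative.

```python
import json

def classify(raw_value):
    """Return (data_quality_status, error_details) for one raw bronze value."""
    try:
        event = json.loads(raw_value)
    except json.JSONDecodeError:
        return "BAD_SCHEMA_OR_MISSING_KEY_FIELDS", "Value is not valid JSON"
    if not event.get("event_id") or not event.get("shipment_id"):
        return "BAD_SCHEMA_OR_MISSING_KEY_FIELDS", "Missing event_id or shipment_id in raw JSON"
    return "CLEAN", None

print(classify('{"event_id": "e1", "shipment_id": "s1"}'))  # ('CLEAN', None)
print(classify('{"event_id": "e1"}')[0])                    # BAD_SCHEMA_OR_MISSING_KEY_FIELDS
```

In the streaming job the same predicate appears as two `filter` expressions producing `clean_df` and `bad_schema_df`, so clean and bad records can be routed to different sinks.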
File: workspace/supply_chain_project/08_structured_streaming_cost_monitoring/03_stream_enriched_logistics_costs_silver.py
# Databricks notebook source
# MAGIC %md
# MAGIC ### Stream Enriched Logistics Costs to Silver Delta Table
# MAGIC
# MAGIC This notebook sets up a Spark Structured Streaming job to read raw logistics events from the Bronze layer,
# MAGIC parse them, enrich with tariff and fuel price data, calculate total costs, and land them into the
# MAGIC `silver.enriched_logistics_costs` Delta table.
# COMMAND ----------
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
col, current_timestamp, from_json, lit, expr, coalesce, when,
date_trunc, to_date, year, month, dayofmonth,
sha2, concat_ws, md5
)
from pyspark.sql.types import (
StructType, StructField, StringType, TimestampType, DoubleType, DateType, IntegerType, LongType
)
from delta.tables import DeltaTable
from pyspark.sql.streaming import StreamingQuery
from pyspark.sql.utils import StreamingQueryException
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# COMMAND ----------
# Configuration
CATALOG_NAME = "supply_chain_lakehouse"
SCHEMA_NAME_BRONZE = "bronze"
SCHEMA_NAME_SILVER = "silver"
SCHEMA_NAME_GOLD = "gold"
BRONZE_TABLE_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME_BRONZE}.raw_logistics_events"
SILVER_FUEL_PRICES_TABLE_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME_SILVER}.fuel_prices"
GOLD_TARIFF_ANALYSIS_TABLE_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME_GOLD}.tariff_analysis"
SILVER_ENRICHED_COSTS_TABLE_NAME = "enriched_logistics_costs"
SILVER_ENRICHED_COSTS_TABLE_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME_SILVER}.{SILVER_ENRICHED_COSTS_TABLE_NAME}"
CHECKPOINT_LOCATION_SILVER = f"/mnt/{CATALOG_NAME}/checkpoints/{SCHEMA_NAME_SILVER}/{SILVER_ENRICHED_COSTS_TABLE_NAME}"
# Ensure mount point exists if using /mnt/
# dbutils.fs.mkdirs(f"/mnt/{CATALOG_NAME}/checkpoints")
# For Unity Catalog, external locations are preferred for managed checkpoints.
# COMMAND ----------
# MAGIC %md
# MAGIC #### 1. Define Expected Kafka Message Schema (for parsing Bronze `value` column)
# MAGIC
# MAGIC This schema must match the structure of your Kafka `logistics_events` messages.
# COMMAND ----------
# This schema is used to parse the 'value' (JSON string) column from the bronze table
logistics_event_schema = StructType([
StructField("event_id", StringType(), False),
StructField("shipment_id", StringType(), False),
StructField("event_type", StringType(), False), # e.g., 'SHIPMENT_UPDATE', 'FUEL_STOP'
StructField("event_timestamp", TimestampType(), False),
StructField("origin_location", StringType(), True),
StructField("destination_location", StringType(), True),
StructField("product_hs_code", StringType(), True), # For tariff correlation
StructField("distance_km", DoubleType(), True),
StructField("fuel_consumed_liters", DoubleType(), True),
StructField("driver_id", StringType(), True),
StructField("current_status", StringType(), True)
])
logger.info("Logistics event schema for parsing defined.")
# COMMAND ----------
# MAGIC %md
# MAGIC #### 2. Read Stream from Bronze Delta Table
# COMMAND ----------
try:
# Read from the bronze Delta table as a streaming source
bronze_stream_df = spark.readStream \
.format("delta") \
.option("ignoreDeletes", "true") \
.option("ignoreChanges", "true") \
.table(BRONZE_TABLE_PATH)
logger.info(f"Structured Stream initialized from Bronze table: {BRONZE_TABLE_PATH}")
except Exception as e:
logger.error(f"Failed to initialize stream from Bronze Delta table: {e}")
raise
# COMMAND ----------
# MAGIC %md
# MAGIC #### 3. Parse JSON, Flatten, and Apply Data Quality Checks
# COMMAND ----------
# Parse the JSON string 'value' column into structured columns
parsed_df = bronze_stream_df.withColumn(
"parsed_data",
from_json(col("value"), logistics_event_schema)
).select(
col("parsed_data.*"),
col("ingestion_time").alias("bronze_ingestion_time"),
col("offset").alias("kafka_offset_id") # Keep offset for traceability
)
# Apply basic data quality checks
# For example, filter out records where 'event_id' or 'shipment_id' is null
# Or flag them for a Dead Letter Queue (DLQ) if using a more advanced pattern
clean_df = parsed_df.filter(
col("event_id").isNotNull() & col("shipment_id").isNotNull()
).withColumn(
"data_quality_status", lit("CLEAN")
).withColumn(
"error_details", lit(None)
)
# For records failing schema parsing or basic checks, you might route them to a DLQ
# For simplicity, we're filtering out. In production, consider a more robust DLQ approach.
bad_schema_df = parsed_df.filter(
col("event_id").isNull() | col("shipment_id").isNull()
).withColumn(
"data_quality_status", lit("BAD_SCHEMA_OR_MISSING_KEY_FIELDS")
).withColumn(
"error_details", lit("Missing event_id or shipment_id in raw JSON")
)
# In a full production system you would union clean_df and bad_schema_df (or
# route bad_schema_df to a dead-letter table) so nothing is silently dropped.
# For this tutorial, we process only clean_df into the silver table.
logger.info("JSON parsing and initial data quality checks applied.")
# COMMAND ----------
# MAGIC %md
# MAGIC #### 4. Load Lookup Tables (Tariff Analysis and Fuel Prices)
# MAGIC
# MAGIC We'll load these as batch DataFrames which Spark will handle efficiently (e.g., broadcasting for small tables) for joining with the stream.
# COMMAND ----------
try:
# Load the tariff analysis table
tariff_analysis_df = spark.read.table(GOLD_TARIFF_ANALYSIS_TABLE_PATH) \
.select(
col("hs_code").alias("tariff_hs_code"),
col("origin_country").alias("tariff_origin_country"),
col("destination_country").alias("tariff_destination_country"),
col("tariff_rate"),
col("effective_date").alias("tariff_effective_date")
)
logger.info(f"Tariff analysis lookup table '{GOLD_TARIFF_ANALYSIS_TABLE_PATH}' loaded.")
# Load the fuel prices table
fuel_prices_df = spark.read.table(SILVER_FUEL_PRICES_TABLE_PATH) \
.select(
col("fuel_type").alias("fuel_lookup_type"),
col("region").alias("fuel_lookup_region"),
col("price_per_unit").alias("fuel_price_per_unit"),
col("currency").alias("fuel_currency"),
col("effective_date").alias("fuel_effective_date")
)
logger.info(f"Fuel prices lookup table '{SILVER_FUEL_PRICES_TABLE_PATH}' loaded.")
except Exception as e:
logger.error(f"Failed to load lookup tables: {e}")
raise
# COMMAND ----------
# MAGIC %md
# MAGIC #### 5. Enrich Data with Lookups and Calculate Costs
# COMMAND ----------
# Determine region from origin_location for the fuel price lookup, using
# simple keyword matching against our mock locations.
# In reality, you'd need a more sophisticated geo-mapping service.
enriched_df = clean_df.withColumn(
    "event_region",
    when(col("origin_location").rlike("Chicago|New York|Los Angeles"), "North America")
    .when(col("origin_location").rlike("Hamburg|Rotterdam"), "Europe")
    .when(col("origin_location").rlike("Shenzhen"), "Asia")
    .otherwise("Other")
)
# Join with Fuel Prices
# This is a stream-static join: the static Delta table is re-read for each
# micro-batch and may be broadcast if it is small enough.
enriched_df = enriched_df.join(
fuel_prices_df,
(enriched_df["event_region"] == fuel_prices_df["fuel_lookup_region"]) &
(to_date(enriched_df["event_timestamp"]) >= fuel_prices_df["fuel_effective_date"]) & # Use effective date for prices
(lit("Diesel") == fuel_prices_df["fuel_lookup_type"]), # Assuming 'Diesel' for simplicity, could be dynamic
"leftouter"
)
# Join with Tariff Analysis (also a stream-static join).
# For simplicity we match only on HS code and effective date; a production join
# would also match tariff_origin_country/tariff_destination_country, and would
# deduplicate the lookup to the latest effective rate to avoid duplicate matches.
enriched_df = enriched_df.join(
    tariff_analysis_df,
    (enriched_df["product_hs_code"] == tariff_analysis_df["tariff_hs_code"]) &
    (to_date(enriched_df["event_timestamp"]) >= tariff_analysis_df["tariff_effective_date"]),  # Tariff must be effective on/before the event date
    "leftouter"
)
# Calculate costs
final_silver_df = enriched_df.withColumn(
"calculated_fuel_cost_usd",
coalesce(col("fuel_consumed_liters") * col("fuel_price_per_unit"), lit(0.0))
).withColumn(
"tariff_impact_usd",
when(col("tariff_rate").isNotNull(), lit(100.0) * col("tariff_rate")) # Example: 100 USD base value for calculation
.otherwise(lit(0.0))
).withColumn(
"other_fixed_cost_usd", lit(50.0) # Example: a fixed handling cost per event
).withColumn(
"total_logistics_cost_usd",
col("calculated_fuel_cost_usd") + col("tariff_impact_usd") + col("other_fixed_cost_usd")
).withColumn(
"processing_timestamp", current_timestamp()
).select(
col("event_id"),
col("shipment_id"),
col("event_type"),
col("event_timestamp"),
col("origin_location"),
col("destination_location"),
col("product_hs_code"),
col("distance_km"),
col("fuel_consumed_liters"),
col("driver_id"),
col("current_status"),
col("tariff_impact_usd"),
col("fuel_price_per_unit"),
col("calculated_fuel_cost_usd"),
col("total_logistics_cost_usd"),
col("processing_timestamp"),
col("data_quality_status"),
col("error_details")
)
logger.info("Data enriched and costs calculated.")
# COMMAND ----------
# MAGIC %md
# MAGIC #### 6. Write Stream to Delta Lake Silver Table
# COMMAND ----------
# Define the write stream query for the silver table
try:
    silver_stream_query = (
        final_silver_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", CHECKPOINT_LOCATION_SILVER)
        .option("mergeSchema", "true")
        .trigger(processingTime="30 seconds")  # Process data every 30 seconds
        .toTable(SILVER_ENRICHED_COSTS_TABLE_PATH)
    )
    logger.info(f"Silver stream query started for table: {SILVER_ENRICHED_COSTS_TABLE_PATH}. Checkpoint: {CHECKPOINT_LOCATION_SILVER}")
    logger.info("To stop the stream, run: silver_stream_query.stop()")
    # For development, you might want to block for a short period:
    # silver_stream_query.awaitTermination(timeout=300)  # Wait for 5 minutes
except StreamingQueryException as e:  # from pyspark.sql.utils import StreamingQueryException
    logger.error(f"Streaming query failed: {e}")
    # Handle specific streaming errors (e.g., restart logic) here
    raise
except Exception as e:
    logger.error(f"An unexpected error occurred during stream writing to silver: {e}")
    raise
# COMMAND ----------
# MAGIC %md
# MAGIC #### Monitoring and Stopping the Stream
# MAGIC
# MAGIC You can monitor the stream's progress in the Databricks UI under the "Streaming" tab of your cluster.
# MAGIC To manually stop the stream (e.g., for development or redeployment):
# COMMAND ----------
# To stop the stream, uncomment and run the following lines
# if silver_stream_query.isActive:
#     silver_stream_query.stop()
#     logger.info("Silver stream query stopped.")
# COMMAND ----------
# End of 03_stream_enriched_logistics_costs_silver.py
c) Testing This Component
- Ensure Bronze Stream is Running: Verify that the `02_stream_raw_logistics_events_bronze.py` notebook's stream is active.
- Run the Silver Enrichment Notebook: Execute `03_stream_enriched_logistics_costs_silver.py` in your Databricks workspace. This will start the second streaming job.
- Simulate More Kafka Data: Use the `04_simulate_kafka_data.py` script to send more events to Kafka.
- Verify Data in Silver Table: After a few minutes, query the `silver.enriched_logistics_costs` table with `SELECT * FROM supply_chain_lakehouse.silver.enriched_logistics_costs;`. You should see records with `tariff_impact_usd`, `fuel_price_per_unit`, `calculated_fuel_cost_usd`, and `total_logistics_cost_usd` populated. Check that the values make sense based on your mock data and calculations. Specifically, look for `product_hs_code` values like "870323" or "847130" to see if tariff impacts are applied correctly.
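These manual checks are easier with a reference calculation. As a sketch, the chapter's cost formula (fuel cost plus tariff impact on the 100 USD example base value, plus the 50 USD fixed handling cost) can be replicated in plain Python to compute what a given Silver row should contain:

```python
def expected_total_cost(fuel_liters, fuel_price_per_unit, tariff_rate):
    """Replicates the Silver-layer cost formula for manual spot checks."""
    fuel_cost = (fuel_liters or 0.0) * (fuel_price_per_unit or 0.0)
    tariff_impact = 100.0 * tariff_rate if tariff_rate is not None else 0.0  # 100 USD example base value
    other_fixed = 50.0  # fixed handling cost per event
    return round(fuel_cost + tariff_impact + other_fixed, 2)

# e.g., 120 L of diesel at 1.50 USD/L with a 5% tariff:
print(expected_total_cost(120.0, 1.50, 0.05))  # → 235.0
# Unmatched lookups (nulls) should fall back to the fixed cost only:
print(expected_total_cost(None, None, None))   # → 50.0
```

Compare this value against `total_logistics_cost_usd` for the corresponding `event_id` in the Silver table.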
4. Production Considerations
Deploying Spark Structured Streaming jobs in production requires careful attention to robustness, performance, security, and observability.
Error Handling and Dead Letter Queues (DLQ):
- Schema Evolution: Use `option("mergeSchema", "true")` for Delta sinks to handle minor schema changes. For breaking changes, consider a separate stream for failed records.
- Corrupted Records: For Kafka sources, `option("failOnDataLoss", "false")` prevents the stream from failing on unrecoverable data loss. For parsing JSON, records that fail `from_json` can be routed to a DLQ. Instead of simply filtering them out, use the `badRecordsPath` option or a `UNION` with a bad-records DataFrame to write them to a separate error table (e.g., `bronze.logistics_events_errors`) for later investigation.
- Lookup Failures: If joins fail (e.g., missing HS codes or fuel prices), the `leftouter` join ensures the main event still flows through, but the joined columns will be null. Downstream logic must handle these nulls or flag them for review.
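Before wiring a DLQ into the stream, the routing decision itself can be sketched in plain Python (the required-field list and the return shape here are illustrative, not the pipeline's exact schema):

```python
import json

REQUIRED_FIELDS = {"event_id", "shipment_id", "event_timestamp"}  # illustrative subset

def route_record(raw_value):
    """Return ("good", parsed) for valid events, ("dlq", error_row) otherwise."""
    try:
        parsed = json.loads(raw_value)
        missing = REQUIRED_FIELDS - parsed.keys()
        if missing:
            raise ValueError(f"missing fields: {sorted(missing)}")
        return ("good", parsed)
    except (json.JSONDecodeError, ValueError) as e:
        # In the stream, rows like this would be appended to an error table
        # such as bronze.logistics_events_errors for later investigation.
        return ("dlq", {"raw_value": raw_value, "error_details": str(e)})

print(route_record('{"event_id": "e1"}')[0])  # → dlq (missing fields)
print(route_record("not json")[0])            # → dlq (parse failure)
```

In Spark the same split falls out of `from_json`: rows where the parsed struct is null go to the error table, the rest continue downstream.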
Performance Optimization:
- Cluster Sizing: Right-size your Databricks cluster based on data volume, velocity, and complexity of transformations. Use autoscaling to handle varying loads.
- Micro-Batch Interval (`processingTime`): Tune this (e.g., `trigger(processingTime="30 seconds")`) to balance latency and throughput. Shorter intervals mean lower latency but potentially higher overhead.
- State Management: For stateful operations (like aggregations or deduplication with watermarking), optimize state storage (e.g., the RocksDB state store on Databricks) and checkpointing. Our current pipeline is stateless, simplifying this.
- Join Optimization: Spark automatically optimizes stream-static joins. Ensure static lookup tables are small enough to be broadcast (`spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")`). For larger lookup tables, consider re-partitioning or pre-joining.
- Delta Lake Optimizations: `optimizeWrite` and `autoCompact` are automatically enabled for DLT. For manual Structured Streaming, consider `spark.sql("ALTER TABLE ... SET TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true', 'delta.autoOptimize.autoCompact' = 'true')")` for your Delta tables, or run `OPTIMIZE` commands periodically.
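For this chapter's Silver table, the Delta optimizations above could be applied with SQL along these lines (a sketch; adjust the catalog/schema names to your workspace):

```sql
-- Enable optimized writes and auto-compaction on the Silver table
ALTER TABLE supply_chain_lakehouse.silver.enriched_logistics_costs
SET TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact'   = 'true'
);

-- Periodic manual compaction, e.g. from a scheduled job
OPTIMIZE supply_chain_lakehouse.silver.enriched_logistics_costs;
```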
Security Considerations:
- Unity Catalog: Leverage Unity Catalog for fine-grained access control on all your Delta tables (`bronze.raw_logistics_events`, `silver.fuel_prices`, `gold.tariff_analysis`, `silver.enriched_logistics_costs`). Grant `SELECT` privileges on the source tables to streaming jobs and `MODIFY` privileges on the target tables.
- Secret Management: Crucially, use Databricks Secrets to store Kafka credentials (API keys, secrets, bootstrap servers) instead of hardcoding them. Access them using `dbutils.secrets.get(scope="your_scope", key="your_key")`.
- Network Security: Ensure your Databricks workspace has secure network connectivity to your Kafka cluster (e.g., VPC peering, Private Link, or secure egress configurations).
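As an illustration of keeping credentials out of code, the Kafka source options can be assembled from values fetched at runtime; in this sketch the function arguments stand in for `dbutils.secrets.get` calls, and the scope/key names in the usage comment are hypothetical:

```python
def kafka_sasl_options(bootstrap_servers: str, api_key: str, api_secret: str) -> dict:
    """Build Kafka source options for a SASL/PLAIN-secured cluster (e.g., Confluent Cloud)."""
    # On Databricks the Kafka classes are shaded ("kafkashaded." prefix);
    # outside Databricks use org.apache.kafka.common.security.plain.PlainLoginModule.
    jaas = (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{api_key}" password="{api_secret}";'
    )
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
    }

# In a notebook, pull the values from a secret scope instead of literals:
# opts = kafka_sasl_options(
#     dbutils.secrets.get(scope="kafka_creds", key="bootstrap_servers"),
#     dbutils.secrets.get(scope="kafka_creds", key="api_key"),
#     dbutils.secrets.get(scope="kafka_creds", key="api_secret"),
# )
# raw_df = spark.readStream.format("kafka").options(**opts).option("subscribe", "logistics_events").load()
```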
Logging and Monitoring:
- Structured Logging: Implement robust logging within your Spark applications using Python's `logging` module. Log key metrics, errors, and progress.
- Databricks Monitoring: Utilize Databricks' built-in monitoring tools for streaming jobs (Spark UI, Streaming tab). Set up alerts for stream failures or backlogs.
- External Monitoring: Integrate Databricks logs and metrics with external monitoring systems like Splunk, the ELK stack, Prometheus/Grafana, or cloud-native monitoring services (Azure Monitor, AWS CloudWatch) for centralized observability.
- Metrics: Monitor Kafka consumer lag, processing rates, numbers of input and output rows, and error counts.
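Many of these metrics are already exposed on the query object itself: `StreamingQuery.lastProgress` returns a per-batch progress dictionary. A small helper can pull out the headline numbers for logging or alerting (the field names follow the StreamingQueryProgress JSON; the sample values are made up):

```python
def summarize_progress(progress: dict) -> dict:
    """Extract headline metrics from StreamingQuery.lastProgress for logging/alerting."""
    return {
        "batch_id": progress.get("batchId"),
        "num_input_rows": progress.get("numInputRows", 0),
        "input_rows_per_sec": progress.get("inputRowsPerSecond", 0.0),
        "processed_rows_per_sec": progress.get("processedRowsPerSecond", 0.0),
        "batch_duration_ms": progress.get("durationMs", {}).get("triggerExecution"),
    }

# In the notebook: metrics = summarize_progress(silver_stream_query.lastProgress or {})
sample = {"batchId": 42, "numInputRows": 500, "inputRowsPerSecond": 16.7,
          "processedRowsPerSecond": 25.0, "durationMs": {"triggerExecution": 20000}}
print(summarize_progress(sample)["num_input_rows"])  # → 500
```

If `processed_rows_per_sec` stays persistently below `input_rows_per_sec`, the stream is falling behind and the cluster or trigger interval needs attention.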
5. Code Review Checkpoint
At this point, you have successfully built and tested two interconnected Spark Structured Streaming jobs:
- `02_stream_raw_logistics_events_bronze.py`: Ingests raw logistics events from Kafka into `supply_chain_lakehouse.bronze.raw_logistics_events`.
- `03_stream_enriched_logistics_costs_silver.py`: Reads from the Bronze table, parses JSON, joins with `supply_chain_lakehouse.gold.tariff_analysis` (from DLT) and `supply_chain_lakehouse.silver.fuel_prices` (mocked in this chapter), calculates logistics costs, and writes to `supply_chain_lakehouse.silver.enriched_logistics_costs`.
Files Created/Modified:
- `workspace/supply_chain_project/08_structured_streaming_cost_monitoring/01_setup_schemas_and_lookups.py`
- `workspace/supply_chain_project/08_structured_streaming_cost_monitoring/02_stream_raw_logistics_events_bronze.py`
- `workspace/supply_chain_project/08_structured_streaming_cost_monitoring/03_stream_enriched_logistics_costs_silver.py`
- `workspace/supply_chain_project/08_structured_streaming_cost_monitoring/04_simulate_kafka_data.py` (local utility)
- `workspace/supply_chain_project/08_structured_streaming_cost_monitoring/conf/kafka_config.py` (or defined in notebooks for this tutorial)
Integration with Existing Code:
- This chapter leverages the `gold.tariff_analysis` Delta table, which is an output of your DLT pipelines from previous chapters, demonstrating the interoperability of DLT and Structured Streaming within the Lakehouse.
- The `silver.enriched_logistics_costs` table is now available for consumption by downstream analytics, reporting, and potentially other DLT pipelines (e.g., to create a `gold` layer for specific business metrics).
6. Common Issues & Solutions
Kafka Connection/Authentication Errors:
- Issue: `org.apache.kafka.common.KafkaException: Failed to construct kafka consumer` or `Auth mechanism Plain not supported`.
- Debug: Double-check your `kafka.bootstrap.servers`, `kafka.sasl.jaas.config`, `kafka.security.protocol`, and `kafka.sasl.mechanism` options. Ensure your API key and secret are correct and have the necessary permissions on the Kafka topic. Verify network connectivity between your Databricks cluster and Kafka.
- Prevention: Always test Kafka connectivity separately (e.g., using a simple `kafka-python` client or `kafkacat`) before integrating with Spark. Use Databricks Secrets for credentials.
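A minimal, dependency-free reachability check of the bootstrap servers might look like the sketch below. It only verifies TCP connectivity — it does not validate SASL credentials or topic permissions, so it rules out network issues, not auth issues:

```python
import socket

def parse_bootstrap(servers: str) -> list:
    """Split 'host1:9092,host2:9092' into (host, port) pairs."""
    pairs = []
    for entry in servers.split(","):
        host, _, port = entry.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs

def broker_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
    """True if a TCP connection to the broker endpoint succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

print(parse_bootstrap("broker1:9092, broker2:9093"))  # → [('broker1', 9092), ('broker2', 9093)]
# for host, port in parse_bootstrap(bootstrap_servers):
#     print(host, port, broker_reachable(host, port))
```

Run this from a Databricks notebook (not just your laptop) so the check exercises the cluster's network path to Kafka.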
Streaming Query Never Starts or Fails Immediately:
- Issue: The stream query shows "Starting…" indefinitely, or fails with a generic `StreamingQueryException`.
- Debug:
  - Check logs: Look at the Spark driver and executor logs in the Databricks UI for more specific error messages.
  - Checkpoint Location: Ensure the `checkpointLocation` path is valid and accessible by the cluster. Incorrect permissions or non-existent paths are common issues.
  - Input Data: If reading from Kafka, ensure there's data in the topic. If reading from a Delta table, ensure the table exists and has data.
  - Schema Mismatch: If parsing JSON, a mismatch between the expected `logistics_event_schema` and the actual Kafka message structure can cause issues. Use `failOnDataLoss="false"` and inspect the `value` column in the bronze table.
- Prevention: Start with a very simple stream (e.g., just reading Kafka and printing to console) to isolate issues. Gradually add transformations.
Data Not Appearing in Silver Table / Incorrect Joins:
- Issue: Data flows to the Bronze table, but the Silver table remains empty or contains incorrect/null values after enrichment.
- Debug:
  - Check Bronze Data: Query the Bronze table to ensure raw events are landing correctly and the `value` column contains valid JSON.
  - Inspect `parsed_df`: In the Silver notebook, add a `display(parsed_df)` or `parsed_df.printSchema()` to verify JSON parsing.
  - Lookup Table Data: Query `silver.fuel_prices` and `gold.tariff_analysis` to ensure they contain the expected lookup data.
  - Join Conditions: Carefully review your join conditions. A `leftouter` join is usually preferred for enrichment to avoid dropping events, but if the conditions are always false, you'll see nulls. Check for data type mismatches or casing issues in join keys.
  - Timestamp/Date Filters: Ensure the `to_date(col("event_timestamp")) >= lookup_df["effective_date"]` logic is correct for time-sensitive lookups.
- Prevention: Use `EXPLAIN` on your DataFrames to see the execution plan. Test joins with a small batch of data before running the full stream.
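The "conditions always false" failure mode is easy to reproduce outside Spark. In this plain-Python sketch of a left-outer lookup, a stray trailing space in one join key silently turns that row's enrichment into `None`, exactly as it would in the stream-static join:

```python
def left_outer_lookup(events, lookup, key):
    """Minimal left-outer join: every event survives; unmatched keys yield None."""
    index = {row[key]: row for row in lookup}
    return [{**e, "tariff_rate": index.get(e[key], {}).get("tariff_rate")} for e in events]

events = [{"product_hs_code": "870323"},
          {"product_hs_code": "847130 "}]  # note the trailing space
lookup = [{"product_hs_code": "870323", "tariff_rate": 0.05},
          {"product_hs_code": "847130", "tariff_rate": 0.10}]

for row in left_outer_lookup(events, lookup, "product_hs_code"):
    print(row["product_hs_code"], row["tariff_rate"])
# → 870323 0.05
# → 847130  None
```

A `trim()` (and, where appropriate, `upper()`/`lower()`) on join keys in both DataFrames is a cheap guard against this class of bug.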
7. Testing & Verification
To thoroughly test and verify the work of this chapter, follow these steps:
Initial Setup Verification:
- Run `01_setup_schemas_and_lookups.py`.
- Query `supply_chain_lakehouse.silver.fuel_prices` and `supply_chain_lakehouse.gold.tariff_analysis` to confirm mock data presence.
Bronze Layer Verification:
- Run `02_stream_raw_logistics_events_bronze.py`.
- Run `04_simulate_kafka_data.py` (from your local machine) to send a batch of 5-10 messages.
- Wait 30-60 seconds for the stream to process.
- Query `SELECT * FROM supply_chain_lakehouse.bronze.raw_logistics_events` in a new Databricks cell. Verify:
  - New records are present.
  - The `value` column contains the expected JSON.
  - `ingestion_time` is recent.
Silver Layer Verification:
- Ensure `02_stream_raw_logistics_events_bronze.py` is still running.
- Run `03_stream_enriched_logistics_costs_silver.py`.
- Run `04_simulate_kafka_data.py` again to send another batch of messages.
- Wait 60-90 seconds for both streams to process.
- Query `SELECT * FROM supply_chain_lakehouse.silver.enriched_logistics_costs` in a new Databricks cell. Verify:
  - New records are present.
  - `event_id`, `shipment_id`, `event_type`, and `event_timestamp` are correctly parsed.
  - `product_hs_code` is present.
  - `fuel_price_per_unit` is populated (it should match your mock data for "Diesel" in the "North America", "Europe", and "Asia" regions).
  - `calculated_fuel_cost_usd` is correctly computed (`fuel_consumed_liters * fuel_price_per_unit`).
  - `tariff_impact_usd` is populated (e.g., 5.0 for HS code "870323" and 10.0 for "847130" if using the dummy tariffs).
  - `total_logistics_cost_usd` is the sum of the calculated costs.
  - `processing_timestamp` is recent.
  - `data_quality_status` is 'CLEAN'.
Stopping Streams: After verification, stop both streaming queries (`bronze_stream_query.stop()` and `silver_stream_query.stop()`) if you are not proceeding immediately.
8. Summary & Next Steps
In this comprehensive chapter, you successfully implemented a real-time logistics cost monitoring pipeline using Spark Structured Streaming on Databricks. You learned how to:
- Set up Unity Catalog schemas and mock lookup tables (`silver.fuel_prices`).
- Ingest raw logistics event data from Kafka into a Bronze Delta table (`bronze.raw_logistics_events`).
- Implement a Silver-layer streaming job to parse, clean, and enrich logistics events.
- Join streaming data with static lookup tables (`gold.tariff_analysis` and `silver.fuel_prices`) to correlate tariff impacts and fuel costs.
- Calculate real-time logistics cost components and total costs.
- Write the enriched, cost-calculated data to a Silver Delta table (`silver.enriched_logistics_costs`), ready for downstream analytics.
- Consider key production aspects like error handling, performance, security (using Databricks Secrets and Unity Catalog), and monitoring.
This pipeline forms a critical component of our real-time supply chain intelligence platform, providing immediate insights into operational costs.
In the next chapter, we will focus on Customs Trade Data Lakehouse for HS Code Classification Validation and Anomaly Detection. We will explore how to ingest and process customs trade data, validate HS code classifications using machine learning models, and detect anomalies in trade patterns, further enhancing the data quality and analytical capabilities of our Lakehouse.