Chapter 5: Real-time Supply Chain Delay Analytics (Gold Layer)
Chapter Introduction
Welcome to Chapter 5, where we elevate our supply chain data from the Silver layer to the Gold layer. In this crucial phase, we will build Databricks Delta Live Tables (DLT) pipelines to perform real-time aggregations and derive actionable insights for supply chain delay analytics. This involves taking the cleaned and enriched data from our Silver tables and transforming it into easily consumable metrics, such as average delay times, on-time delivery rates, and identifying critical delay incidents.
The Gold layer is paramount because it serves as the direct source for business intelligence dashboards, reporting tools, and downstream analytical applications. By providing pre-aggregated and highly curated data, we empower business users to make swift, data-driven decisions without needing to understand the complexities of raw or semi-processed data. This abstraction not only improves data accessibility but also significantly boosts query performance for analytical workloads.
Our journey in this chapter relies on the robust Silver layer DLT pipelines established in previous chapters, particularly the supply_chain_silver.enriched_events table. We will leverage this foundation to build our Gold tables incrementally. By the end of this chapter, you will have production-ready Gold Delta tables, such as supply_chain_gold.delay_summary_hourly and supply_chain_gold.critical_delays, providing real-time visibility into supply chain performance.
Planning & Design
Building the Gold layer requires careful planning to ensure the derived metrics are accurate, consistent, and meet the analytical needs of the business.
Component Architecture
Our Gold layer architecture will extend our existing Medallion Lakehouse pattern:
- Source: `supply_chain_silver.enriched_events` (and potentially other Silver dimension tables such as `products` or `suppliers`).
- Transformation Logic: A Databricks Delta Live Tables (DLT) pipeline processing the Silver data. This pipeline will perform:
  - Aggregations (e.g., `COUNT`, `AVG`, `SUM`).
  - Calculations (e.g., on-time delivery rate).
  - Filtering (e.g., for critical delays).
  - Potentially joins with other Silver dimension tables for richer context.
- Destination: Dedicated Gold Delta tables (`supply_chain_gold.delay_summary_hourly`, `supply_chain_gold.critical_delays`) optimized for read performance and analytical queries.
Database Schema Design
We will define two primary Gold tables:
`supply_chain_gold.delay_summary_hourly`: This table will provide an hourly summary of supply chain performance, enabling trend and aggregated analysis.

| Column Name | Data Type | Description |
| --- | --- | --- |
| `event_date_hour` | TIMESTAMP | Hourly timestamp for aggregation. |
| `product_id` | STRING | Identifier for the product. |
| `supplier_id` | STRING | Identifier for the supplier. |
| `route_id` | STRING | Identifier for the shipping route. |
| `total_shipments` | LONG | Total number of shipments in the hour for the given dimensions. |
| `total_delayed_shipments` | LONG | Total shipments that experienced a delay. |
| `avg_delay_minutes` | DOUBLE | Average delay in minutes for shipments. |
| `min_delay_minutes` | DOUBLE | Minimum delay in minutes for shipments. |
| `max_delay_minutes` | DOUBLE | Maximum delay in minutes for shipments. |
| `on_time_delivery_rate` | DOUBLE | Percentage of shipments delivered on time. |
| `last_updated` | TIMESTAMP | Timestamp when the record was last updated. |

`supply_chain_gold.critical_delays`: This table will log specific incidents deemed "critical" based on a predefined delay threshold, allowing for focused investigation.

| Column Name | Data Type | Description |
| --- | --- | --- |
| `event_id` | STRING | Unique identifier for the original supply chain event. |
| `event_timestamp` | TIMESTAMP | Timestamp of the event. |
| `product_id` | STRING | Identifier for the product. |
| `supplier_id` | STRING | Identifier for the supplier. |
| `route_id` | STRING | Identifier for the shipping route. |
| `expected_delivery_time` | TIMESTAMP | Original expected delivery timestamp. |
| `actual_delivery_time` | TIMESTAMP | Actual delivery timestamp. |
| `delay_minutes` | DOUBLE | Calculated delay in minutes. |
| `delay_reason` | STRING | Categorized reason for the delay (e.g., weather, customs, transport). |
| `severity_level` | STRING | Severity of the critical delay (e.g., "High", "Critical"). |
| `last_updated` | TIMESTAMP | Timestamp when the record was last updated. |
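To make the `on_time_delivery_rate` column concrete, here is a minimal plain-Python sketch of the calculation it represents. The helper name and the guard against empty buckets are our own; the pipeline later computes this with PySpark expressions.

```python
def on_time_delivery_rate(total_shipments: int, total_delayed: int) -> float:
    """Percentage of shipments with no recorded delay (hypothetical helper)."""
    if total_shipments == 0:
        return 0.0  # avoid division by zero for empty hourly buckets
    return round((total_shipments - total_delayed) / total_shipments * 100, 2)

print(on_time_delivery_rate(200, 30))  # 85.0
```

The same round-to-two-decimals convention is used for the DOUBLE metric columns in the table above.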
File Structure
We will continue to organize our DLT pipelines within the dlt_pipelines directory. For the Gold layer, we’ll create a new Python file that defines our Gold tables.
```
.
├── dlt_pipelines/
│   ├── bronze_pipeline.py
│   ├── silver_pipeline.py
│   └── gold_pipeline.py   # New file for this chapter
├── notebooks/
│   ├── 01_setup_dlt_env.ipynb
│   └── 02_test_dlt_pipelines.ipynb
├── config/
│   └── config.py
└── README.md
```
Step-by-Step Implementation
We will now implement the Gold layer DLT pipeline in a new Python file.
a) Setup/Configuration
First, create the new DLT pipeline file dlt_pipelines/gold_pipeline.py.
File: dlt_pipelines/gold_pipeline.py
```python
# Databricks notebook source
# MAGIC %md
# MAGIC # Gold Layer DLT Pipeline for Real-time Supply Chain Delay Analytics
# MAGIC
# MAGIC This pipeline aggregates and enriches data from the Silver layer to create Gold tables
# MAGIC optimized for analytical consumption, providing key metrics and insights into supply chain delays.

import dlt
from pyspark.sql import functions as F

# The target schema (database) for Gold tables -- e.g. 'supply_chain_gold' -- is set
# in the DLT pipeline settings and is created/managed by Unity Catalog or as a
# standard database. For local testing, ensure the database exists.

# Configuration for delay thresholds (can be parameterized or read from external config)
CRITICAL_DELAY_THRESHOLD_MINUTES = 60  # e.g., a 1-hour delay is critical


@dlt.table(
    name="delay_summary_hourly",
    comment="Hourly aggregated summary of supply chain shipment delays and on-time performance.",
    table_properties={
        "quality": "gold",
        "delta.logRetentionDuration": "30 days",  # Retain transaction logs for 30 days
        "delta.targetFileSize": "128mb",  # Optimize file size for analytical queries
    },
)
@dlt.expect_or_drop("valid_event_date_hour", "event_date_hour IS NOT NULL")
@dlt.expect_or_drop("valid_product_id", "product_id IS NOT NULL")
def delay_summary_hourly():
    """
    Aggregates enriched supply chain events hourly to provide delay summaries.
    Reads from the 'supply_chain_silver.enriched_events' table.
    """
    return (
        # If the Silver table is defined in a different pipeline, read it with
        # spark.readStream.table("supply_chain_silver.enriched_events") instead.
        dlt.read_stream("supply_chain_silver.enriched_events")
        # A watermark bounds the state Spark keeps for the streaming aggregation below.
        .withWatermark("event_timestamp", "2 hours")
        .withColumn("event_date_hour", F.date_trunc("hour", F.col("event_timestamp")))
        .withColumn(
            "delay_minutes",
            F.when(
                F.col("actual_delivery_time").isNotNull(),
                F.round(
                    (F.col("actual_delivery_time").cast("long")
                     - F.col("expected_delivery_time").cast("long")) / 60, 2),
            ).otherwise(0),  # Delay in minutes; default to 0 if not delivered yet
        )
        .groupBy("event_date_hour", "product_id", "supplier_id", "route_id")
        .agg(
            F.count("event_id").alias("total_shipments"),
            F.sum(F.when(F.col("delay_minutes") > 0, 1).otherwise(0)).alias("total_delayed_shipments"),
            F.avg(F.col("delay_minutes")).alias("avg_delay_minutes"),
            F.min(F.col("delay_minutes")).alias("min_delay_minutes"),
            F.max(F.col("delay_minutes")).alias("max_delay_minutes"),
            F.current_timestamp().alias("last_updated"),
        )
        .withColumn(
            "on_time_delivery_rate",
            F.round(
                (F.col("total_shipments") - F.col("total_delayed_shipments"))
                / F.col("total_shipments") * 100, 2),
        )
    )


@dlt.table(
    name="critical_delays",
    comment="Table containing details of supply chain events identified as critical delays.",
    table_properties={
        "quality": "gold",
        "delta.logRetentionDuration": "30 days",
        "delta.targetFileSize": "128mb",
    },
)
@dlt.expect_or_drop("valid_event_id", "event_id IS NOT NULL")
@dlt.expect_or_drop("has_delay_minutes", "delay_minutes IS NOT NULL AND delay_minutes > 0")
def critical_delays():
    """
    Identifies and logs individual supply chain events that exceed a predefined
    critical delay threshold. Reads from 'supply_chain_silver.enriched_events'.
    """
    return (
        dlt.read_stream("supply_chain_silver.enriched_events")
        .withColumn(
            "delay_minutes",
            F.when(
                F.col("actual_delivery_time").isNotNull(),
                F.round(
                    (F.col("actual_delivery_time").cast("long")
                     - F.col("expected_delivery_time").cast("long")) / 60, 2),
            ).otherwise(0),
        )
        .filter(F.col("delay_minutes") >= CRITICAL_DELAY_THRESHOLD_MINUTES)
        .select(
            F.col("event_id"),
            F.col("event_timestamp"),
            F.col("product_id"),
            F.col("supplier_id"),
            F.col("route_id"),
            F.col("expected_delivery_time"),
            F.col("actual_delivery_time"),
            F.col("delay_minutes"),
            F.col("delay_reason"),  # Assumes delay_reason is available from the Silver layer
            F.lit("Critical").alias("severity_level"),  # Assign a severity level
            F.current_timestamp().alias("last_updated"),
        )
    )


# Optional: a daily rollup for faster BI queries. Because it is defined with
# dlt.read() (a complete, non-streaming read), DLT maintains it as a
# materialized view over delay_summary_hourly.
# @dlt.table(
#     name="daily_delay_trends_mv",
#     comment="Daily delay trends, optimized for BI dashboards."
# )
# def daily_delay_trends_mv():
#     return (
#         dlt.read("delay_summary_hourly")  # Reads the DLT table defined above
#         .withColumn("event_date", F.to_date(F.col("event_date_hour")))
#         .groupBy("event_date", "product_id")
#         .agg(
#             F.sum("total_shipments").alias("daily_total_shipments"),
#             F.avg("avg_delay_minutes").alias("daily_avg_delay_minutes"),
#             F.current_timestamp().alias("last_updated")
#         )
#     )
```
Explanation of the Code Block:

- `import dlt`: Imports the Delta Live Tables library.
- `from pyspark.sql import functions as F`: Imports PySpark SQL functions, aliased as `F` for convenience.
- `CRITICAL_DELAY_THRESHOLD_MINUTES`: A constant defining what constitutes a "critical" delay. In a real-world scenario, this would likely be configurable, perhaps read from a Databricks secret or an external configuration table.
- `@dlt.table(...)` decorator: Marks a Python function as a DLT table.
  - `name`: The name of the output Delta table within the target schema (e.g., `delay_summary_hourly`).
  - `comment`: A descriptive comment for the table, useful for data cataloging.
  - `table_properties`: Sets Delta Lake table properties directly:
    - `"quality": "gold"`: A custom property denoting the data quality level.
    - `"delta.logRetentionDuration": "30 days"`: Configures how long Delta Lake transaction logs are retained, which affects `VACUUM` operations.
    - `"delta.targetFileSize": "128mb"`: Optimizes the target file size for data written to the table, crucial for query performance in analytical workloads (prevents many small files or excessively large ones).
- `@dlt.expect_or_drop(...)`: DLT "expectations" for data quality. They define rules that records must satisfy; if a record violates an expectation, `expect_or_drop` removes it from the output table, preventing bad data from polluting the Gold layer. `expect_or_fail` (fails the pipeline) and the warn-only `expect` (records the violation but keeps the row) are also available, depending on the desired strictness.
- `delay_summary_hourly()` function:
  - `dlt.read_stream("supply_chain_silver.enriched_events")`: Reads incrementally from the Silver `enriched_events` table; DLT automatically manages the streaming offsets.
  - `withColumn("event_date_hour", F.date_trunc("hour", F.col("event_timestamp")))`: Truncates `event_timestamp` to the hour so data can be grouped hourly.
  - `withColumn("delay_minutes", ...)`: Calculates the delay in minutes. A null `actual_delivery_time` (shipment not yet delivered) yields a 0-minute delay, which can be adjusted to business logic; `round` keeps the numerical output clean.
  - `groupBy(...)` / `agg(...)`: Groups by the reporting dimensions and computes the aggregations: `count("event_id")` for total shipments, a conditional `sum` for delayed shipments, `avg`/`min`/`max` of `delay_minutes` for comprehensive delay statistics, and `current_timestamp()` to record when the aggregation was last updated.
  - `withColumn("on_time_delivery_rate", ...)`: Derives the on-time delivery rate as a percentage.
- `critical_delays()` function:
  - Again reads incrementally from the Silver layer and recalculates `delay_minutes` (this could be refactored into a shared helper if the logic grows).
  - `filter(F.col("delay_minutes") >= CRITICAL_DELAY_THRESHOLD_MINUTES)`: Keeps only events that meet the critical-delay definition.
  - `select(...)`: Projects the columns of the `critical_delays` schema; `F.lit("Critical").alias("severity_level")` assigns a static severity level to these filtered events.
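The delay calculation shared by both tables can be illustrated in plain Python. This is a sketch of the same logic the `F.when(...).otherwise(0)` expression encodes; the helper name is our own.

```python
from datetime import datetime
from typing import Optional

def delay_minutes(expected: datetime, actual: Optional[datetime]) -> float:
    """Delay in minutes; 0.0 when the shipment has not been delivered yet."""
    if actual is None:
        return 0.0  # mirrors .otherwise(0) in the PySpark expression
    return round((actual - expected).total_seconds() / 60, 2)

print(delay_minutes(datetime(2025, 1, 1, 10, 0), datetime(2025, 1, 1, 11, 30)))  # 90.0
print(delay_minutes(datetime(2025, 1, 1, 10, 0), None))                          # 0.0
```

With `CRITICAL_DELAY_THRESHOLD_MINUTES = 60`, the 90-minute example above would land in `critical_delays`, while the undelivered shipment would not.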
b) Core Implementation
The core implementation is provided in the dlt_pipelines/gold_pipeline.py file above. This file contains the complete definitions for both delay_summary_hourly and critical_delays DLT tables.
c) Testing This Component
To test the Gold layer DLT pipeline, you need to first ensure your Bronze and Silver pipelines are running and generating data. Then, deploy and run the Gold pipeline.
Steps to Test:
1. Ensure Silver Data Exists: Verify that `supply_chain_silver.enriched_events` has data. If not, run your Bronze and Silver DLT pipelines first.
2. Create/Update the DLT Pipeline:
   - Navigate to the Databricks Workspace.
   - Go to "Workflows" -> "Delta Live Tables".
   - Click "Create Pipeline", or select your existing pipeline and click "Edit Pipeline".
   - Add `dlt_pipelines/gold_pipeline.py` as a new notebook source (if creating a new DLT pipeline specifically for Gold) or add it to your existing multi-tier pipeline definition.
   - Crucially, ensure the target database for this pipeline is set to `supply_chain_gold`.
   - Configure Cluster mode to Enhanced Autoscaling for production readiness.
   - Set Channel to CURRENT for the latest features.
   - Set Pipeline mode to Continuous for real-time processing.
   - Click "Create" or "Save".
3. Start the DLT Pipeline:
   - Select the newly created or updated pipeline and click "Start".
   - Monitor the DLT UI to observe the pipeline's execution, the data flow from Silver to Gold, and table creation.
4. Query the Gold Tables: Once the pipeline is running and data is flowing, query the Gold tables using Databricks SQL or a separate notebook.
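The UI settings above correspond to fields in the pipeline's JSON specification, which is useful when creating the pipeline programmatically. The sketch below is illustrative only: the field names follow the public Databricks Pipelines API, but verify them against your workspace before relying on them, and the path and sizing values are assumptions.

```python
import json

# Illustrative DLT pipeline settings as a Python dict (serialize to JSON for the
# Pipelines API). Field names are assumptions based on the public API docs.
gold_pipeline_settings = {
    "name": "supply-chain-gold",
    "target": "supply_chain_gold",  # target schema for the Gold tables
    "continuous": True,             # Pipeline mode: Continuous
    "channel": "CURRENT",           # latest DLT runtime features
    "photon": True,                 # enable the Photon engine
    "libraries": [{"notebook": {"path": "/dlt_pipelines/gold_pipeline"}}],
    "clusters": [{
        "label": "default",
        "autoscale": {"min_workers": 1, "max_workers": 5, "mode": "ENHANCED"},
    }],
}

print(json.dumps(gold_pipeline_settings, indent=2))
```

Submitting this payload (e.g., via the Databricks CLI or REST API) would mirror the manual configuration steps listed above.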
- Query Gold Tables: Once the pipeline is running and data is flowing, you can query the Gold tables using Databricks SQL or a separate notebook.
Example Queries (in a Databricks Notebook or SQL Editor):
```sql
-- Query 1: Check the hourly delay summary
SELECT *
FROM supply_chain_gold.delay_summary_hourly
ORDER BY event_date_hour DESC
LIMIT 100;

-- Query 2: Check critical delays
SELECT *
FROM supply_chain_gold.critical_delays
ORDER BY event_timestamp DESC
LIMIT 100;

-- Query 3: Inspect data quality expectation metrics (the DLT UI shows these too).
-- DLT records expectation results in the pipeline event log rather than in a
-- separate violations table; pass/fail counts appear in flow_progress events.
SELECT
  timestamp,
  details:flow_progress.data_quality.expectations
FROM event_log('<pipeline_id>')
WHERE event_type = 'flow_progress';
```
Expected Behavior:
- The `delay_summary_hourly` table should populate with aggregated metrics, updating as new data arrives in the Silver layer.
- The `critical_delays` table should contain individual records for shipments exceeding `CRITICAL_DELAY_THRESHOLD_MINUTES`.
- The DLT UI should show the successful creation and updates of the `delay_summary_hourly` and `critical_delays` tables, with green checks indicating passing expectation checks.
- No records should be dropped if the Silver data adheres to the expectations. Any Silver records that violate a Gold-layer expectation will be dropped and counted in the pipeline's data quality metrics, per the `expect_or_drop` configuration.
Debugging Tips:
- DLT UI Logs: The DLT UI provides detailed logs for each step of the pipeline. Check for errors, warnings, and expectation violation reports.
- Source Data Integrity: If Gold tables are empty or incorrect, first verify the data in your Silver layer (`supply_chain_silver.enriched_events`).
- Schema Mismatches: Ensure that the column names and data types expected by the Gold pipeline (e.g., `event_timestamp`, `actual_delivery_time`) match those in the Silver table.
- Filter/Aggregation Logic: Double-check your `filter` conditions and `groupBy`/`agg` functions for correctness.
- Spark UI: For performance issues, use the Spark UI (accessible via the DLT pipeline details page) to inspect job execution, stages, and tasks.
Production Considerations
Transitioning Gold layer DLT pipelines to production requires careful attention to performance, reliability, security, and maintainability.
Error Handling
Databricks DLT provides robust error handling capabilities:
- DLT Expectations: As demonstrated, `dlt.expect_or_drop` (or `expect_or_fail`, or the warn-only `expect`) is critical for data quality. For production, define a comprehensive set of expectations based on business rules for each Gold table. Consider `expect_or_fail` for critical data integrity rules (e.g., `event_id IS NOT NULL`) and `expect_or_drop` for less critical issues that shouldn't halt the pipeline but should be logged.
- DLT Metrics: DLT automatically captures data quality metrics, including passing records, dropped records, and violated expectations. Monitor these via the DLT UI, or query the pipeline event log to drive automated alerts.
- Dead Letter Queue (DLQ): DLT drops records that fail `expect_or_drop` expectations, but for complex scenarios or sensitive data you may want to explicitly write failing records to a separate "quarantine" Delta table for manual review and reprocessing, for example by defining a second table over the same source that keeps only the records the main table's expectations would drop.
- Retry Mechanisms: DLT pipelines inherently retry transient failures. Ensure your source systems (e.g., Kafka) retain data for a sufficient period to allow for retries.
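The quarantine pattern described above boils down to routing each record by the same predicate an expectation would apply. A minimal plain-Python illustration (the helper and record shapes are our own; in DLT you would express both sides as tables over the same source):

```python
from typing import Callable, Dict, Iterable, List, Tuple

def split_by_expectation(
    records: Iterable[Dict],
    predicate: Callable[[Dict], bool],
) -> Tuple[List[Dict], List[Dict]]:
    """Route records passing the expectation to Gold, failures to quarantine."""
    valid, quarantined = [], []
    for record in records:
        (valid if predicate(record) else quarantined).append(record)
    return valid, quarantined

events = [
    {"event_id": "E1", "delay_minutes": 75.0},
    {"event_id": None, "delay_minutes": 20.0},  # violates "event_id IS NOT NULL"
]
valid, quarantine = split_by_expectation(events, lambda r: r["event_id"] is not None)
print(len(valid), len(quarantine))  # 1 1
```

The quarantined side can then be reviewed and, once fixed, replayed through the pipeline.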
Performance Optimization
Optimizing Gold layer performance is crucial for real-time analytics:
- Photon Engine: Ensure your DLT pipeline is configured to use the Photon engine. Photon significantly accelerates Spark workloads, especially for SQL and Delta Lake operations, which are heavily used in aggregation layers.
- Enhanced Autoscaling: Utilize DLT’s Enhanced Autoscaling. It intelligently scales clusters based on workload demand, ensuring optimal resource utilization and cost efficiency while maintaining performance.
- Delta Lake Optimizations:
  - `delta.targetFileSize`: As set in `table_properties`, this helps prevent the "small file problem" and keeps file sizes optimal for reads.
  - `OPTIMIZE` and `VACUUM`: DLT performs automatic maintenance (including `OPTIMIZE` and `VACUUM`) on the tables it manages. You can additionally schedule `OPTIMIZE` and `VACUUM` as separate Databricks Jobs on your Gold tables (e.g., daily) to further compact files and remove stale data.
  - Z-ordering: If your Gold tables are frequently filtered by specific high-cardinality columns (e.g., `product_id`, `supplier_id`), consider Z-ordering these columns during `OPTIMIZE` operations to co-locate related data, drastically improving query performance. This is done via `OPTIMIZE ... ZORDER BY (...)`, typically as part of a scheduled `OPTIMIZE` job.
- Partitioning: For very large Gold tables, consider partitioning by a low-cardinality, frequently queried column (e.g., `event_date`). However, DLT's internal optimizations often reduce the need for manual partitioning.
- Materialized Views (Optional): For highly complex queries or those requiring very low latency, define additional DLT tables with `dlt.read()` on top of your Gold tables; DLT maintains these as materialized views and can refresh them incrementally.
Security Considerations
Security at the Gold layer is paramount as it often exposes business-critical data:
- Unity Catalog Integration: Leverage Databricks Unity Catalog for fine-grained access control.
  - Schema/Table Permissions: Grant `SELECT` on the `supply_chain_gold` schema and its tables to BI tools and analytical users. Restrict `MODIFY` and `CREATE` privileges to the data engineers responsible for the DLT pipeline.
  - Column-Level Security: For sensitive columns (e.g., if any PII were to reach Gold, though ideally it is filtered earlier), use Unity Catalog's column-level access control.
  - Row-Level Security: Implement row-level security using row filters or views in Unity Catalog if different user groups should only see specific subsets of data (e.g., a regional manager only sees data for their region).
- Service Principals/Managed Identities: DLT pipelines should run under a service principal or managed identity with the least necessary privileges to access source Silver tables and write to target Gold tables. Avoid using personal access tokens for production pipelines.
- Data Encryption: Delta Lake tables are stored in cloud object storage (e.g., S3, ADLS Gen2, GCS). Ensure this storage is encrypted at rest (SSE-S3, SSE-KMS, Azure Storage Encryption, Google Cloud Storage Encryption) and in transit (TLS/SSL). Databricks handles this by default for managed tables.
- Audit Logging: All access to Gold tables via Unity Catalog is logged, providing a comprehensive audit trail for compliance and security monitoring.
Logging and Monitoring
Effective logging and monitoring are crucial for operational excellence:
- DLT UI: The DLT UI provides a real-time view of pipeline status, data flow, and health metrics.
- Databricks Monitoring Tools: Utilize Databricks’ built-in monitoring capabilities, including event logs and metrics, to track pipeline health, data volumes, and processing latency.
- Custom Logging: For specific debugging or operational insights within your Python DLT code, use Python's `logging` module; DLT surfaces these logs in the pipeline's execution logs.
- Alerting: Set up alerts on DLT pipeline failures, high numbers of dropped records (expectation violations), or significant deviations in processing latency. Integrate with tools like PagerDuty, Slack, or email.
- Databricks SQL Dashboards: Build Databricks SQL Dashboards on top of the Gold tables and the DLT pipeline event log to visualize data quality, pipeline performance, and business metrics.
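The alerting rule described above can be as simple as firing when dropped records exceed a tolerated fraction of the input. A minimal sketch, with an illustrative 1% threshold of our own choosing:

```python
def should_alert(dropped: int, total: int, max_drop_ratio: float = 0.01) -> bool:
    """Fire an alert when expectation drops exceed a tolerated fraction of input."""
    if total == 0:
        return False  # no data processed, nothing to alert on
    return dropped / total > max_drop_ratio

print(should_alert(5, 1000))   # False (0.5% is within the 1% tolerance)
print(should_alert(50, 1000))  # True  (5% exceeds the 1% tolerance)
```

In practice the `dropped` and `total` counts would come from the expectation metrics in the pipeline event log, and a True result would trigger the PagerDuty/Slack/email integration.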
Code Review Checkpoint
At this point, we have successfully implemented the Gold layer DLT pipeline for real-time supply chain delay analytics.
Summary of what was built:
- `delay_summary_hourly` table: An hourly aggregated view of supply chain performance, including total shipments, delayed shipments, average/min/max delay times, and on-time delivery rates.
- `critical_delays` table: A granular log of individual supply chain events that exceed a predefined critical delay threshold, providing details for immediate investigation.
- DLT Pipeline: A robust Databricks Delta Live Tables pipeline that incrementally processes data from the Silver layer, applies aggregations and filters, and writes to the Gold layer.
- Data Quality: DLT expectations (`expect_or_drop`) that enforce data quality at the Gold layer.
- Production Best Practices: `table_properties` for Delta Lake optimization, plus noted considerations for security, performance, and monitoring.
Files created/modified:
- `dlt_pipelines/gold_pipeline.py`: New file containing the DLT definitions for `delay_summary_hourly` and `critical_delays`.
How it integrates with existing code:
The gold_pipeline.py seamlessly integrates by reading directly from the supply_chain_silver.enriched_events table, which is the output of our Silver layer DLT pipeline (defined in silver_pipeline.py). This establishes a clear dependency and data flow in our Medallion Lakehouse architecture. The Gold tables are now ready for consumption by BI tools or further analytical models.
Common Issues & Solutions
Here are a few common issues you might encounter when working with DLT Gold layer pipelines and their solutions:
Issue: `supply_chain_gold` database not found, or a permissions error.

- Description: When running the DLT pipeline, you might see errors indicating that the target database `supply_chain_gold` does not exist or that the pipeline's service principal lacks permissions to create or write to it.
- Debugging: Check the DLT pipeline settings and ensure the "Target Schema" field is correctly set to `supply_chain_gold`. If using Unity Catalog, verify that the service principal or user running the pipeline has `CREATE SCHEMA`, `USE SCHEMA`, and `CREATE TABLE` permissions on the Unity Catalog metastore and the target schema.
- Solution:
  - For Unity Catalog, grant the necessary permissions:

    ```sql
    GRANT CREATE SCHEMA ON CATALOG <your_catalog_name> TO `<service_principal_id>`;
    GRANT USE SCHEMA ON SCHEMA <your_catalog_name>.supply_chain_gold TO `<service_principal_id>`;
    GRANT CREATE TABLE ON SCHEMA <your_catalog_name>.supply_chain_gold TO `<service_principal_id>`;
    ```

  - For Hive Metastore (legacy), ensure the database exists or let DLT create it automatically (which it usually does when permissions are sufficient).
Issue: Data quality expectation failures leading to dropped records.

- Description: You notice that records are being dropped from your Gold tables, and the DLT UI reports expectation violations (e.g., `valid_product_id` failed).
- Debugging:
  - Examine the DLT UI's "Data Quality" tab for the affected table; it shows the number of records violating each expectation.
  - Query the DLT event log to see per-expectation pass/fail counts and understand which rules are failing.
  - Inspect the source `supply_chain_silver.enriched_events` table for the problematic data.
- Solution:
  - Root Cause Analysis: Determine whether the issue is with the upstream Silver data (e.g., a bug in the Silver pipeline introduced nulls) or whether the Gold layer's expectation is too strict or incorrect.
  - Fix Upstream: If the Silver data is truly bad, fix the Silver pipeline to prevent bad data from reaching Gold.
  - Adjust the Expectation: If the Gold expectation is too strict for the incoming data, relax it (e.g., use the warn-only `dlt.expect` instead of `expect_or_drop` if the issue is minor and shouldn't block data flow).
  - Reprocess: After fixing, re-run the DLT pipeline (either a full refresh or allow continuous mode to catch up).
Issue: Gold layer queries are slow, despite DLT optimizations.

- Description: Even with Photon and `targetFileSize` set, analytical queries on the Gold tables (e.g., from a BI tool) are performing poorly.
- Debugging:
  - Spark UI: Analyze the query plan in the Spark UI for the slow queries. Look for data skew, full table scans, or inefficient joins.
  - Data Volume: Check the size of your Gold tables. Are they extremely large?
  - Query Patterns: Understand how users are querying the data: which columns appear in `WHERE` clauses or `JOIN` conditions?
- Solution:
  - Z-ordering: If queries frequently filter on high-cardinality columns (e.g., `product_id`, `supplier_id`), apply Z-ordering to these columns. This must be done via an `OPTIMIZE` command, which you can schedule as a separate Databricks Job:

    ```sql
    OPTIMIZE supply_chain_gold.delay_summary_hourly ZORDER BY (product_id, supplier_id);
    ```

  - Further Partitioning: For extremely large tables and specific query patterns, consider adding a partition column (e.g., `event_date`) to the Gold tables if not already present. This requires recreating the table or altering the DLT pipeline.
  - Materialized Views: If specific complex aggregations are performed repeatedly, create DLT-managed materialized views (tables defined with `dlt.read()`) that pre-compute these aggregations.
  - Cluster Sizing: Ensure the cluster running the BI queries (if using Databricks SQL Warehouses) is appropriately sized.
  - Data Skew: If the Spark UI shows data skew, investigate the keys used in aggregations and joins. You might need to salt keys or use broadcast joins where appropriate.
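Key salting, mentioned in the last point, spreads one hot key across several aggregation groups; partial aggregates are computed per salted key, then re-aggregated after stripping the salt. A minimal sketch of the key manipulation (helper names and the salt count are our own):

```python
import random

def salted_key(key: str, num_salts: int = 8) -> str:
    """Append a random salt so a single hot key spreads across num_salts groups."""
    return f"{key}#{random.randrange(num_salts)}"

def unsalt(salted: str) -> str:
    """Recover the original key for the second-stage re-aggregation."""
    return salted.rsplit("#", 1)[0]

# A skewed stream of shipments for one hot product now hashes to several groups:
keys = {salted_key("P001") for _ in range(1000)}
print(all(unsalt(k) == "P001" for k in keys))  # True
```

Note that salting only works for aggregations that can be computed in two stages (sums and counts compose directly; averages must be carried as sum/count pairs).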
Testing & Verification
After implementing and deploying the Gold layer DLT pipeline, thorough testing and verification are essential to ensure data accuracy and reliability.
How to test the chapter’s work:
Run the entire DLT pipeline: Start your DLT pipeline in continuous mode (or trigger a full refresh if preferred). Monitor the DLT UI to ensure all Bronze, Silver, and Gold tables are processing data correctly and without errors.
Generate Sample Data (if needed): If your Kafka producer isn’t actively sending data, use your simulated data generator from previous chapters to push new events into Kafka. Observe these events flowing through Bronze, Silver, and finally populating the Gold tables.
Data Validation Queries: Execute a series of SQL queries against the Gold tables to validate the aggregated results.

Verify `delay_summary_hourly` aggregations:

```sql
-- Check total shipments and on-time rate for a specific product/supplier
SELECT
  event_date_hour,
  product_id,
  supplier_id,
  total_shipments,
  total_delayed_shipments,
  on_time_delivery_rate,
  avg_delay_minutes
FROM supply_chain_gold.delay_summary_hourly
WHERE product_id = 'P001' AND supplier_id = 'S001'
ORDER BY event_date_hour DESC
LIMIT 10;

-- Verify counts against the Silver layer for a specific hour (manual check).
-- This requires knowing the exact events that flowed into Silver for a given hour.
-- For example, if 100 events for P001/S001 landed in Silver during
-- '2025-12-20 10:00:00', then total_shipments in Gold for that hour should be 100.
```

Verify `critical_delays` entries:

```sql
-- Check that critical delays are being captured based on the threshold
SELECT *
FROM supply_chain_gold.critical_delays
WHERE delay_minutes >= 60  -- Should match CRITICAL_DELAY_THRESHOLD_MINUTES
ORDER BY event_timestamp DESC
LIMIT 10;

-- Cross-reference a critical delay event with the Silver layer
SELECT *
FROM supply_chain_silver.enriched_events
WHERE event_id IN (SELECT event_id FROM supply_chain_gold.critical_delays LIMIT 1);
-- Verify that the delay_minutes calculated in Silver matches the one in Gold
```

Check data quality expectations:

```sql
-- Inspect expectation metrics in the pipeline event log (DLT does not create
-- separate violations tables by default; dropped records are counted here).
SELECT timestamp, details:flow_progress.data_quality.expectations
FROM event_log('<pipeline_id>')
WHERE event_type = 'flow_progress';
```
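The Silver-versus-Gold count check described in the comments above can be automated. Here is a plain-Python sketch of the reconciliation logic; the data shapes are illustrative, and in practice you would run one query per layer and compare the results.

```python
from collections import Counter

def reconcile(silver_events, gold_totals):
    """Return (hour, product) buckets where Gold totals disagree with Silver counts."""
    expected = Counter((e["hour"], e["product_id"]) for e in silver_events)
    return {
        bucket: {"silver": count, "gold": gold_totals.get(bucket, 0)}
        for bucket, count in expected.items()
        if gold_totals.get(bucket, 0) != count
    }

silver = [{"hour": "2025-12-20 10:00", "product_id": "P001"}] * 3
gold = {("2025-12-20 10:00", "P001"): 3}
print(reconcile(silver, gold))  # {} -> the layers agree
```

An empty result means every bucket reconciles; any entry pinpoints an hour/product combination to investigate, with both counts attached.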
What should work now:
- Real-time aggregation of supply chain events into hourly summaries of delay metrics.
- Identification and logging of individual critical delay incidents.
- Data quality checks enforcing integrity for Gold layer tables.
- The DLT pipeline should be running continuously, processing new Silver data and updating Gold tables with minimal latency.
- The Gold tables are ready for direct consumption by BI tools (e.g., Databricks SQL Dashboards, Power BI, Tableau) to visualize key performance indicators (KPIs) and alert on critical events.
How to verify everything is correct:
- DLT UI Health: Ensure the DLT pipeline graph shows green checkmarks for all tables and updates are occurring regularly. Check the “Data Quality” tab for any unexpected dropped records.
- Query Results: The SQL queries above should return meaningful, accurate data that aligns with your understanding of the Silver layer input.
- Business Logic Validation: Share sample Gold layer data with a business analyst or domain expert to confirm that the derived metrics (e.g., `avg_delay_minutes`, `on_time_delivery_rate`) make sense and are calculated according to business rules.
Summary & Next Steps
Congratulations! In this chapter, we successfully built the Gold layer of our real-time supply chain analytics solution using Databricks Delta Live Tables. We developed robust DLT pipelines to aggregate cleaned and enriched data from the Silver layer into actionable, production-ready Gold tables: delay_summary_hourly for performance trends and critical_delays for incident management. We emphasized production considerations such as DLT expectations for data quality, performance optimizations using Photon and Delta Lake features, and comprehensive security and monitoring practices with Unity Catalog.
This Gold layer is a significant milestone, as it transforms raw event data into valuable business intelligence, enabling real-time decision-making and proactive management of supply chain operations. The data is now optimized for consumption by dashboards, reports, and downstream analytical applications.
In the next chapter, we will shift our focus to another critical aspect of supply chain management: HS Code-based Import-Export Tariff Impact Analysis with Historical Trend Processing in Databricks. This will involve integrating external tariff data, processing it to understand its impact on procurement costs, and building historical trend analysis capabilities, further enriching our supply chain intelligence.