Chapter 9: Building the Customs Trade Data Lakehouse & HS Code Validation

Welcome to Chapter 9 of our real-time supply chain project! In this chapter, we will lay the foundation for intelligent customs trade data analysis by building a robust Data Lakehouse. Specifically, we’ll focus on ingesting and preparing customs declaration data, establishing a master data repository for HS (Harmonized System) codes, and setting up initial data quality validation using Databricks Delta Live Tables (DLT).

This step is critical because accurate HS code classification is paramount for correct tariff application, trade compliance, and avoiding costly penalties. By centralizing and validating this data within a structured Lakehouse environment, we create a reliable source for our tariff impact analysis and anomaly detection systems. We’ll leverage DLT’s declarative nature to build resilient, self-managing data pipelines that transform raw, semi-structured customs data into clean, conformed datasets.

From previous chapters, you should have your Databricks environment configured and a landing zone for raw data (e.g., from Kafka topics written to cloud storage). We’ll assume raw customs declarations are landing in a designated cloud storage path. By the end of this chapter, you will have a fully functional Bronze and Silver layer for your customs trade data, ready for advanced HS code validation and anomaly detection in subsequent chapters.

Planning & Design

Our Data Lakehouse will adhere to the Medallion Architecture, providing a structured approach to data processing:

  • Bronze Layer: This layer will store raw, immutable data exactly as it’s ingested from external sources. It acts as a historical archive and a source of truth for all raw data. For customs declarations, this means capturing the raw JSON or CSV payloads.
  • Silver Layer: In this layer, data from the Bronze layer is cleaned, parsed, standardized, and enriched. We’ll apply data quality rules, transform data types, and select relevant fields. This layer is designed to be a reliable source for analytics and downstream applications.
  • Gold Layer (Future): This layer will house highly aggregated and business-specific data, optimized for reporting, dashboards, and machine learning models. We will build this in later chapters.

Component Architecture for Customs Trade Data

graph TD
    A[External Customs Systems/APIs] --> B(Kafka Topics)
    B --> C[Cloud Storage Landing Zone - Raw Customs Data]
    C --> DLT_BRONZE{Databricks Delta Live Tables: Bronze Layer}
    DLT_BRONZE --> E[Delta Lake: `bronze_customs_declarations`]
    E --> DLT_SILVER{Databricks Delta Live Tables: Silver Layer}
    DLT_SILVER --> F[Delta Lake: `silver_customs_declarations`]
    G[External HS Code Master Data - CSV/Parquet] --> DLT_HS_MASTER{Databricks Delta Live Tables: HS Code Master}
    DLT_HS_MASTER --> H[Delta Lake: `silver_hs_code_master`]
    F & H --> I[HS Code Validation & Anomaly Detection - Future]
    I --> J[Gold Layer Analytics - Future]

Database Schema (Conceptual)

bronze_customs_declarations (Streaming Live Table)

  • _raw_data: STRING (Raw JSON payload)
  • _ingestion_timestamp: TIMESTAMP (When record was ingested into Bronze)
  • _source_file: STRING (Path to the source file if applicable)

silver_customs_declarations (Streaming Live Table)

  • declaration_id: STRING (Unique identifier for the declaration)
  • product_description: STRING (Description of the product)
  • declared_hs_code: STRING (HS code provided in the declaration)
  • importer_exporter_name: STRING (Name of the entity, potentially masked)
  • declaration_date: DATE (Date of the customs declaration)
  • value_usd: DECIMAL(18,2) (Value of goods in USD)
  • currency: STRING (Original currency)
  • country_of_origin: STRING
  • country_of_destination: STRING
  • source_system: STRING (e.g., “Kafka_Customs_Feed”)
  • processing_timestamp: TIMESTAMP (When record was processed into Silver)
  • _bronze_ingestion_timestamp: TIMESTAMP (Link to Bronze record)

silver_hs_code_master (Live Table - Batch)

  • hs_code: STRING (The official HS code, e.g., 6-digit or 10-digit)
  • description: STRING (Official description of the HS code)
  • tariff_rate_import: DECIMAL(5,2) (Current import tariff rate)
  • tariff_rate_export: DECIMAL(5,2) (Current export tariff rate)
  • effective_date: DATE (When the tariff rate became effective)
  • end_date: DATE (When the tariff rate expired, or NULL if current)
  • source_agency: STRING (e.g., “WCO”, “USITC”)
  • last_updated: TIMESTAMP
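The effective_date/end_date pair gives the master table slowly-changing semantics: a declaration should be priced with the rate that was in force on its declaration date, and end_date = NULL marks the currently active row. A pure-Python sketch of that lookup, with hypothetical helper names and toy data (illustration only — in the pipeline this becomes a Spark join):

```python
# Illustration of the [effective_date, end_date) window semantics used by
# silver_hs_code_master. Helper and sample rows are hypothetical.
from datetime import date
from typing import Optional

def effective_tariff(rows: list, hs_code: str, as_of: date) -> Optional[dict]:
    """Return the master-data row for hs_code whose validity window
    contains as_of; end_date=None means 'still current'."""
    for row in rows:
        if row["hs_code"] != hs_code:
            continue
        starts = row["effective_date"] <= as_of
        ends = row["end_date"] is None or as_of < row["end_date"]
        if starts and ends:
            return row
    return None

master = [
    {"hs_code": "847130", "tariff_rate_import": 0.00,
     "effective_date": date(2024, 1, 1), "end_date": None},
    # Two versions of the same code: the old rate was closed out on 2025-01-01.
    {"hs_code": "900000", "tariff_rate_import": 5.00,
     "effective_date": date(2024, 1, 1), "end_date": date(2025, 1, 1)},
    {"hs_code": "900000", "tariff_rate_import": 7.50,
     "effective_date": date(2025, 1, 1), "end_date": None},
]

# A late-2025 declaration picks up the newer rate for 900000.
print(effective_tariff(master, "900000", date(2025, 10, 5))["tariff_rate_import"])
```

The half-open window (inclusive start, exclusive end) avoids two rows both matching on a changeover date.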

File Structure

We’ll organize our DLT pipeline code within a Databricks Repo.

├── src/
│   ├── data_lakehouse/
│   │   ├── customs/
│   │   │   ├── dlt_pipelines/
│   │   │   │   └── customs_dlt_pipeline.py  # Main DLT pipeline definition
│   │   │   └── schemas/
│   │   │       └── customs_schemas.py       # Pydantic/StructType schemas
│   ├── common/
│   │   └── utils.py                       # Common utilities like logging
├── notebooks/
│   └── dlt_pipeline_runner.ipynb         # Notebook to trigger DLT pipeline
├── conf/
│   └── dlt_config.json                   # DLT pipeline configuration

Step-by-Step Implementation

a) Setup/Configuration

First, ensure you have a Databricks workspace and a Databricks Repo set up. Clone your repo and create the file structure as outlined above.

We’ll start by defining the schemas for our customs data. This helps enforce data quality and provides clarity.

File: src/data_lakehouse/customs/schemas/customs_schemas.py

# src/data_lakehouse/customs/schemas/customs_schemas.py
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DecimalType, DateType

# Schema for raw customs declaration data (Bronze layer)
# Assuming raw data is JSON strings, we'll parse it in DLT
raw_customs_declaration_schema = StructType([
    StructField("declaration_id", StringType(), True),
    StructField("product_description", StringType(), True),
    StructField("declared_hs_code", StringType(), True),
    StructField("importer_exporter_name", StringType(), True),
    StructField("declaration_date", StringType(), True), # Raw string, will be parsed to DateType
    StructField("value", StringType(), True),          # Raw string, will be parsed to DecimalType
    StructField("currency", StringType(), True),
    StructField("country_of_origin", StringType(), True),
    StructField("country_of_destination", StringType(), True),
    StructField("source_system", StringType(), True)
])

# Schema for the Silver layer customs declarations
silver_customs_declaration_schema = StructType([
    StructField("declaration_id", StringType(), False), # Not nullable
    StructField("product_description", StringType(), True),
    StructField("declared_hs_code", StringType(), True),
    StructField("importer_exporter_name", StringType(), True),
    StructField("declaration_date", DateType(), True),
    StructField("value_usd", DecimalType(18, 2), True),
    StructField("currency", StringType(), True),
    StructField("country_of_origin", StringType(), True),
    StructField("country_of_destination", StringType(), True),
    StructField("source_system", StringType(), True),
    StructField("processing_timestamp", TimestampType(), False),
    StructField("_bronze_ingestion_timestamp", TimestampType(), False)
])

# Schema for HS Code Master Data (Silver layer)
hs_code_master_schema = StructType([
    StructField("hs_code", StringType(), False),
    StructField("description", StringType(), True),
    StructField("tariff_rate_import", DecimalType(5, 2), True),
    StructField("tariff_rate_export", DecimalType(5, 2), True),
    StructField("effective_date", DateType(), True),
    StructField("end_date", DateType(), True),
    StructField("source_agency", StringType(), True),
    StructField("last_updated", TimestampType(), True)
])
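Before wiring up the pipeline, it can be useful to sanity-check that a sample payload supplies every field the raw schema expects. This hedged sketch hardcodes the field list (copied by hand from raw_customs_declaration_schema so it runs without a Spark session); missing_fields is a hypothetical helper, not part of the pipeline:

```python
# Hypothetical pre-flight check: which schema fields are absent from a payload?
import json

RAW_FIELDS = {
    "declaration_id", "product_description", "declared_hs_code",
    "importer_exporter_name", "declaration_date", "value", "currency",
    "country_of_origin", "country_of_destination", "source_system",
}

def missing_fields(payload: str) -> set:
    """Return the schema fields that a raw JSON payload does not carry."""
    return RAW_FIELDS - json.loads(payload).keys()

sample = json.dumps({
    "declaration_id": "DCL001", "product_description": "Laptop Computers",
    "declared_hs_code": "847130", "importer_exporter_name": "Tech Corp",
    "declaration_date": "2025-10-01", "value": "125000.00", "currency": "USD",
    "country_of_origin": "CN", "country_of_destination": "US",
    "source_system": "Kafka_Customs_Feed",
})
print(missing_fields(sample))  # → set()
```

Missing fields are not fatal to ingestion (Spark fills them with nulls), but spotting them early explains surprise nulls downstream.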

b) The DLT Pipeline Definition

Next, create the main DLT pipeline file.

File: src/data_lakehouse/customs/dlt_pipelines/customs_dlt_pipeline.py

# src/data_lakehouse/customs/dlt_pipelines/customs_dlt_pipeline.py
import dlt
from pyspark.sql.functions import col, current_timestamp, from_json, to_date, regexp_replace, lit, coalesce, round, expr
from pyspark.sql.types import StringType, DecimalType

# Import custom schemas
from data_lakehouse.customs.schemas.customs_schemas import (
    raw_customs_declaration_schema,
    silver_customs_declaration_schema,
    hs_code_master_schema
)

# Input paths are resolved from the DLT pipeline configuration (the key-value
# pairs set under Settings -> Configuration), with development defaults that
# point at example Unity Catalog volume paths.
RAW_CUSTOMS_LANDING_PATH = spark.conf.get(
    "RAW_CUSTOMS_LANDING_PATH", "/Volumes/main/default/raw_customs_data")
HS_CODE_MASTER_PATH = spark.conf.get(
    "HS_CODE_MASTER_PATH", "/Volumes/main/default/hs_code_master_data/hs_codes.csv")
CURRENCY_EXCHANGE_RATES_PATH = spark.conf.get(
    "CURRENCY_EXCHANGE_RATES_PATH", "/Volumes/main/default/currency_exchange_rates/latest.csv")

# --- Bronze Layer: Raw Ingestion ---
@dlt.table(
    name="bronze_customs_declarations",
    comment="Raw customs declaration data ingested from landing zone.",
    table_properties={"quality": "bronze"}
)
@dlt.expect_all_or_drop({"valid_declaration_id": "declaration_id IS NOT NULL"})
def bronze_customs_declarations():
    """
    Ingests raw JSON customs declaration data from a cloud storage landing zone
    into the Bronze layer.
    """
    # For production, RAW_CUSTOMS_LANDING_PATH should be configured as a DLT pipeline parameter.
    # We assume raw JSON files are landing here, e.g., from Kafka Connect S3 Sink.
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.maxFilesPerTrigger", 100)
        .option("cloudFiles.schemaLocation", f"{RAW_CUSTOMS_LANDING_PATH}/_schemas/bronze_customs_declarations")
        .load(RAW_CUSTOMS_LANDING_PATH)
        .withColumn("_ingestion_timestamp", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))  # input_file_name() is not supported with Unity Catalog; use the file metadata column
    )
    # Parse the raw JSON string into its structured fields
    # We use a temporary column 'parsed_data' to extract fields
    parsed_df = df.withColumn("parsed_data", from_json(col("_raw_data"), raw_customs_declaration_schema)) \
                  .select(
                      col("parsed_data.declaration_id").alias("declaration_id"),
                      col("parsed_data.product_description").alias("product_description"),
                      col("parsed_data.declared_hs_code").alias("declared_hs_code"),
                      col("parsed_data.importer_exporter_name").alias("importer_exporter_name"),
                      col("parsed_data.declaration_date").alias("declaration_date"),
                      col("parsed_data.value").alias("value"),
                      col("parsed_data.currency").alias("currency"),
                      col("parsed_data.country_of_origin").alias("country_of_origin"),
                      col("parsed_data.country_of_destination").alias("country_of_destination"),
                      col("parsed_data.source_system").alias("source_system"),
                      col("_ingestion_timestamp"),
                      col("_source_file")
                  )
    return parsed_df

# --- Silver Layer: Cleaned and Conformed Data ---
@dlt.table(
    name="silver_customs_declarations",
    comment="Cleaned and conformed customs declaration data.",
    table_properties={"quality": "silver"}
)
@dlt.expect("valid_hs_code_format", "declared_hs_code IS NOT NULL AND LENGTH(declared_hs_code) >= 6")
@dlt.expect("positive_value", "value_usd >= 0")
@dlt.expect_or_drop("valid_declaration_date", "declaration_date IS NOT NULL")
def silver_customs_declarations():
    """
    Transforms raw customs declaration data from the Bronze layer into a cleaned,
    conformed Silver layer table.
    """
    # Read from Bronze layer
    bronze_df = dlt.read_stream("bronze_customs_declarations")

    # Read latest currency exchange rates (assumed to be a small, frequently updated CSV/Delta table)
    # For a real-time system, this might come from a streaming source or a dedicated DLT table.
    # For simplicity, we'll treat it as a batch load for this chapter.
    exchange_rates_df = (
        spark.read
        .format("csv")
        .option("header", "true")
        .load(CURRENCY_EXCHANGE_RATES_PATH)
        .select(col("currency_code").alias("from_currency"),
                col("usd_rate").cast(DecimalType(10, 4)).alias("usd_rate"))
    )

    # Perform cleaning and transformations
    transformed_df = (
        bronze_df.withColumn("processing_timestamp", current_timestamp())
        .withColumn("declaration_date", to_date(col("declaration_date"), "yyyy-MM-dd"))
        .withColumn("cleaned_value",
                    regexp_replace(col("value"), "[^0-9.]", "").cast(DecimalType(18, 2))) # Remove non-numeric chars
        .join(exchange_rates_df, bronze_df.currency == exchange_rates_df.from_currency, "left")
        .withColumn("value_usd", coalesce(round(col("cleaned_value") * col("usd_rate"), 2), col("cleaned_value"))) # Convert to USD, fallback to original if rate missing
        .select(
            col("declaration_id"),
            col("product_description"),
            col("declared_hs_code"),
            col("importer_exporter_name"),
            col("declaration_date"),
            col("value_usd"),
            col("currency"),
            col("country_of_origin"),
            col("country_of_destination"),
            col("source_system"),
            col("processing_timestamp"),
            col("_ingestion_timestamp").alias("_bronze_ingestion_timestamp")
        )
    )
    return transformed_df

# --- Silver Layer: HS Code Master Data ---
@dlt.table(
    name="silver_hs_code_master",
    comment="Master data for HS codes, descriptions, and tariff rates.",
    table_properties={"quality": "silver"}
)
@dlt.expect_all_or_fail({"hs_code_not_null": "hs_code IS NOT NULL",
                         "valid_tariff_rates": "tariff_rate_import >= 0 AND tariff_rate_export >= 0"})
def silver_hs_code_master():
    """
    Loads HS code master data from a static CSV/Parquet file into the Silver layer.
    This table serves as a reference for validation and enrichment.
    """
    # For production, HS_CODE_MASTER_PATH should be configured as a DLT pipeline parameter.
    # This is typically a batch load, refreshed periodically.
    df = (
        spark.read
        .format("csv") # Or "delta", "parquet" depending on source
        .option("header", "true")
        .option("inferSchema", "false") # Explicit schema for production
        .schema(hs_code_master_schema)
        .load(HS_CODE_MASTER_PATH)
        .withColumn("last_updated", current_timestamp())
    )
    return df

Explanation of the DLT Pipeline:

  1. Imports: We import dlt for DLT decorators, pyspark.sql.functions for data transformations, and our custom schemas.
  2. Configuration: RAW_CUSTOMS_LANDING_PATH, HS_CODE_MASTER_PATH, and CURRENCY_EXCHANGE_RATES_PATH default to example volume paths for development. In production, supply them through the DLT pipeline configuration so the same code runs unchanged across environments (dev, staging, prod).
  3. bronze_customs_declarations (Bronze Layer):
    • @dlt.table: Defines this function as a DLT table.
    • name: The name of the Delta table in Unity Catalog (e.g., main.default.bronze_customs_declarations).
    • comment and table_properties: Provide metadata for discoverability and governance.
    • @dlt.expect_all_or_drop: This is a crucial data quality constraint. If declaration_id is null, the entire row will be dropped. DLT will log these dropped records. This ensures only records with a basic identifier proceed.
    • spark.readStream.format("cloudFiles"): This is Databricks’ Auto Loader, highly optimized for incremental processing of files landing in cloud storage. It automatically tracks processed files, handles schema evolution, and is fault-tolerant.
    • option("cloudFiles.schemaLocation"): Specifies a location for Auto Loader to store schema inference and evolution files.
    • .withColumn("_ingestion_timestamp", current_timestamp()): Adds a timestamp for when the record entered the Bronze layer.
    • _source_file: Captures the source file path for auditing.
    • from_json: Parses the raw JSON string (assuming the _raw_data column contains the JSON) into a structured format using raw_customs_declaration_schema.
  4. silver_customs_declarations (Silver Layer):
    • @dlt.expect, @dlt.expect_or_drop: More specific data quality checks.
      • valid_hs_code_format: Ensures declared_hs_code is not null and is at least 6 characters long (6 digits is the international standard). A plain @dlt.expect only records violations in the data quality metrics and keeps the row; rows are dropped or the update fails only with the _or_drop and _or_fail variants.
      • positive_value: Ensures value_usd is not negative.
      • valid_declaration_date: If declaration_date is null after conversion, the row is dropped.
    • dlt.read_stream("bronze_customs_declarations"): Reads incrementally from our Bronze table, ensuring continuous processing.
    • Currency Conversion: We join with a static (for now) exchange_rates_df to convert value to value_usd. In a real-world scenario, CURRENCY_EXCHANGE_RATES_PATH might point to another DLT table or a real-time feed.
    • to_date, regexp_replace, cast: Standard PySpark SQL functions for data cleaning and type conversion. regexp_replace is used to clean potential non-numeric characters from the value field before casting to Decimal.
    • coalesce: Used for value_usd to handle cases where currency conversion might fail (e.g., missing exchange rate), falling back to the original cleaned value.
  5. silver_hs_code_master (Silver Layer - Master Data):
    • This table is defined with a batch spark.read rather than a streaming read, because HS code master data is static or only periodically refreshed; DLT recomputes the table on each pipeline update.
    • @dlt.expect_all_or_fail: If any of these expectations (non-null hs_code, valid tariff rates) are violated, the entire pipeline will fail, indicating a critical issue with the master data itself. This is stricter than _or_drop.
    • spark.read.format("csv"): Reads the master data from a CSV file. For production, specify inferSchema="false" and explicitly provide the schema to prevent issues.
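To make the Silver-layer value handling concrete, here is a pure-Python sketch of the same cleaning, conversion, and fallback chain (regexp_replace, the exchange-rate join, and coalesce). The helper name and rate table are illustrative assumptions, not pipeline code:

```python
# Mirrors the Silver-layer logic: strip non-numeric characters, convert at
# the USD rate, fall back to the cleaned value when no rate is found.
import re
from decimal import Decimal, ROUND_HALF_UP

# Stand-in for the exchange_rates_df join (values from the sample latest.csv).
RATES = {"USD": Decimal("1.0000"), "EUR": Decimal("1.0850"), "GBP": Decimal("1.2600")}

def to_usd(raw_value: str, currency: str) -> Decimal:
    # regexp_replace(value, "[^0-9.]", "") equivalent
    cleaned = Decimal(re.sub(r"[^0-9.]", "", raw_value))
    rate = RATES.get(currency)
    if rate is None:
        return cleaned  # coalesce() fallback when the join finds no rate
    # round(value * rate, 2) equivalent (Spark's round is HALF_UP)
    return (cleaned * rate).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

print(to_usd("200,000.00 EUR", "EUR"))  # thousands separator and suffix are stripped first
```

Note that the fallback silently leaves unconvertible values in their original currency; a stricter design would route them to a quarantine table instead.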

c) Testing This Component

To test this DLT pipeline, you’ll need to:

  1. Upload Sample Data:

    • Create a sample raw_customs_data.json file.
    • Create a sample hs_codes.csv file.
    • Create a sample latest.csv for currency exchange rates.
    • Upload these files to the Unity Catalog volumes or cloud storage paths specified in your DLT pipeline (RAW_CUSTOMS_LANDING_PATH, HS_CODE_MASTER_PATH, CURRENCY_EXCHANGE_RATES_PATH).

    Example raw_customs_data.json (upload this as a file to RAW_CUSTOMS_LANDING_PATH):

    {"declaration_id": "DCL001", "product_description": "Laptop Computers", "declared_hs_code": "847130", "importer_exporter_name": "Tech Corp", "declaration_date": "2025-10-01", "value": "125000.00", "currency": "USD", "country_of_origin": "CN", "country_of_destination": "US", "source_system": "Kafka_Customs_Feed", "_raw_data": "{\"declaration_id\": \"DCL001\", \"product_description\": \"Laptop Computers\", \"declared_hs_code\": \"847130\", \"importer_exporter_name\": \"Tech Corp\", \"declaration_date\": \"2025-10-01\", \"value\": \"125000.00\", \"currency\": \"USD\", \"country_of_origin\": \"CN\", \"country_of_destination\": \"US\", \"source_system\": \"Kafka_Customs_Feed\"}"}
    {"declaration_id": "DCL002", "product_description": "Smartphones", "declared_hs_code": "851712", "importer_exporter_name": "Mobile Inc", "declaration_date": "2025-10-02", "value": "200000.00", "currency": "EUR", "country_of_origin": "VN", "country_of_destination": "DE", "source_system": "Kafka_Customs_Feed", "_raw_data": "{\"declaration_id\": \"DCL002\", \"product_description\": \"Smartphones\", \"declared_hs_code\": \"851712\", \"importer_exporter_name\": \"Mobile Inc\", \"declaration_date\": \"2025-10-02\", \"value\": \"200000.00\", \"currency\": \"EUR\", \"country_of_origin\": \"VN\", \"country_of_destination\": \"DE\", \"source_system\": \"Kafka_Customs_Feed\"}"}
    {"declaration_id": "DCL003", "product_description": "Raw Materials", "declared_hs_code": "280300", "importer_exporter_name": "Chem Co", "declaration_date": "2025-10-03", "value": "50000.00", "currency": "GBP", "country_of_origin": "IN", "country_of_destination": "UK", "source_system": "Kafka_Customs_Feed", "_raw_data": "{\"declaration_id\": \"DCL003\", \"product_description\": \"Raw Materials\", \"declared_hs_code\": \"280300\", \"importer_exporter_name\": \"Chem Co\", \"declaration_date\": \"2025-10-03\", \"value\": \"50000.00\", \"currency\": \"GBP\", \"country_of_origin\": \"IN\", \"country_of_destination\": \"UK\", \"source_system\": \"Kafka_Customs_Feed\"}"}
    {"declaration_id": "DCL004", "product_description": "Invalid HS Code", "declared_hs_code": "123", "importer_exporter_name": "Bad Co", "declaration_date": "2025-10-04", "value": "1000.00", "currency": "USD", "country_of_origin": "US", "country_of_destination": "CA", "source_system": "Kafka_Customs_Feed", "_raw_data": "{\"declaration_id\": \"DCL004\", \"product_description\": \"Invalid HS Code\", \"declared_hs_code\": \"123\", \"importer_exporter_name\": \"Bad Co\", \"declaration_date\": \"2025-10-04\", \"value\": \"1000.00\", \"currency\": \"USD\", \"country_of_origin\": \"US\", \"country_of_destination\": \"CA\", \"source_system\": \"Kafka_Customs_Feed\"}"}
    {"declaration_id": "DCL005", "product_description": "Negative Value", "declared_hs_code": "900000", "importer_exporter_name": "Neg Co", "declaration_date": "2025-10-05", "value": "-500.00", "currency": "USD", "country_of_origin": "US", "country_of_destination": "CA", "source_system": "Kafka_Customs_Feed", "_raw_data": "{\"declaration_id\": \"DCL005\", \"product_description\": \"Negative Value\", \"declared_hs_code\": \"900000\", \"importer_exporter_name\": \"Neg Co\", \"declaration_date\": \"2025-10-05\", \"value\": \"-500.00\", \"currency\": \"USD\", \"country_of_origin\": \"US\", \"country_of_destination\": \"CA\", \"source_system\": \"Kafka_Customs_Feed\"}"}
    

    Example hs_codes.csv (upload this to HS_CODE_MASTER_PATH):

    hs_code,description,tariff_rate_import,tariff_rate_export,effective_date,end_date,source_agency
    847130,Automatic data processing machines,0.00,0.00,2024-01-01,,WCO
    851712,Telephones for cellular networks,0.00,0.00,2024-01-01,,WCO
    280300,Carbon (carbon blacks and other forms of carbon),0.00,0.00,2024-01-01,,WCO
    900000,General goods,5.00,0.00,2024-01-01,,WCO
    

    Example latest.csv (for currency exchange rates, upload this to CURRENCY_EXCHANGE_RATES_PATH):

    currency_code,usd_rate,last_updated
    USD,1.0000,2025-12-20
    EUR,1.0850,2025-12-20
    GBP,1.2600,2025-12-20
    
  2. Create a DLT Pipeline in Databricks:

    • Navigate to “Workflows” -> “Delta Live Tables” in your Databricks workspace.
    • Click “Create Pipeline”.
    • Pipeline Name: Customs_Trade_Lakehouse_Pipeline
    • Pipeline Mode: Continuous (for real-time ingestion)
    • Source Libraries: Click “Add pipeline library”, select “Workspace file” and navigate to src/data_lakehouse/customs/dlt_pipelines/customs_dlt_pipeline.py.
    • Target Schema: Specify the destination catalog and schema (e.g., main.customs_lakehouse, so tables resolve as main.customs_lakehouse.<table_name>). Ensure Unity Catalog is enabled and you have write permissions on this schema.
    • Storage Location: (Optional but recommended for production) Specify a cloud storage path where DLT can store checkpointing and schema information.
    • Advanced Options:
      • Configuration: Add the following key-value pairs to pass the paths to your DLT pipeline. Replace catalog_name.schema_name with your actual Unity Catalog path.
        • RAW_CUSTOMS_LANDING_PATH: /Volumes/catalog_name/schema_name/raw_customs_data
        • HS_CODE_MASTER_PATH: /Volumes/catalog_name/schema_name/hs_code_master_data/hs_codes.csv
        • CURRENCY_EXCHANGE_RATES_PATH: /Volumes/catalog_name/schema_name/currency_exchange_rates/latest.csv
      • Cluster Policy: (Optional) Select a cluster policy for cost control and standardization.
      • Photon: Enable Photon for improved performance.
      • Enhanced Autoscaling: Enable for optimal resource utilization.
    • Click “Create”.
  3. Start the DLT Pipeline:

    • Once created, select your pipeline and click “Start”.
    • Monitor the pipeline in the DLT UI. It will show the graph of tables and their status (Initializing, Running, Updating).
  4. Verify Data:

    • After the pipeline runs successfully, go to the SQL Editor or browse your Unity Catalog.
    • Query the created tables:
      SELECT * FROM main.customs_lakehouse.bronze_customs_declarations;
      SELECT * FROM main.customs_lakehouse.silver_customs_declarations;
      SELECT * FROM main.customs_lakehouse.silver_hs_code_master;
      
    • Check for the expected number of rows, data types, and transformations.
    • Observe how the DCL004 (invalid HS code format) and DCL005 (negative value) records are handled by the EXPECT clauses in silver_customs_declarations. Depending on the policy (_or_drop), they might be dropped or still present with quality metrics indicating violations.

Production Considerations

  • Error Handling:
    • DLT Expectations: We’ve implemented EXPECT and EXPECT_ALL_OR_DROP for bronze_customs_declarations and silver_customs_declarations. For silver_hs_code_master, we used EXPECT_ALL_OR_FAIL to ensure the integrity of critical master data.
    • Dead Letter Queues (DLQs): For records dropped by _or_drop expectations, DLT provides visibility in the UI. For more robust error handling, consider configuring a custom DLQ strategy where malformed records are written to a separate Delta table for manual review and reprocessing. This would involve a separate DLT table definition that reads from the “dropped” output of the main table.
    • Try-Except Blocks: For complex transformations outside DLT’s declarative expectations, Python’s try-except blocks can handle specific data parsing errors.
  • Performance Optimization:
    • DLT Serverless: Utilize Databricks Serverless DLT for hands-off infrastructure management and optimized cost-performance.
    • Enhanced Autoscaling: DLT pipelines automatically scale resources up or down based on workload, but ensure Enhanced Autoscaling is enabled for optimal performance.
    • Photon: Enable Photon runtime for all DLT pipelines to accelerate Spark workloads.
    • Small File Compaction: DLT-managed Delta tables enable optimized writes and automatic compaction by default; for Delta tables outside DLT, schedule OPTIMIZE to compact small files.
    • Z-Ordering: For large tables with frequent filtering on specific columns (e.g., declaration_date, declared_hs_code), consider applying ZORDER to improve query performance. While DLT can automate OPTIMIZE, ZORDER might need explicit configuration or a separate DLT job.
  • Security Considerations:
    • Unity Catalog: All tables are managed by Unity Catalog, providing centralized governance. Ensure appropriate grants are set for users and service principals accessing or modifying these tables (e.g., SELECT on Silver, MODIFY on Bronze/Silver for DLT service principal).
    • Data Masking/Tokenization: The importer_exporter_name or other sensitive fields in silver_customs_declarations might require masking or tokenization before being exposed to broader analytical teams. This can be implemented as an additional transformation step in the Silver layer or through Unity Catalog’s Row/Column Level Security.
    • Least Privilege: Grant DLT service principals and users only the necessary permissions to access source data and write to target tables.
  • Logging and Monitoring:
    • DLT UI: The DLT UI provides comprehensive logs, data quality metrics, and lineage information.
    • Databricks Monitoring: Integrate DLT logs with Databricks monitoring tools (e.g., Ganglia, Spark UI) and external monitoring solutions (e.g., Azure Monitor, AWS CloudWatch, Splunk) for real-time alerts on pipeline failures, performance degradation, or data quality violations.
    • Custom Logging: For debugging complex transformations, add standard Python logging statements within your DLT functions.

Code Review Checkpoint

At this checkpoint, we have successfully implemented the initial layers of our Customs Trade Data Lakehouse.

Summary of what was built:

  • Bronze Layer (bronze_customs_declarations): A streaming DLT table ingesting raw customs declaration data from a cloud storage landing zone, ensuring basic data quality for declaration_id.
  • Silver Layer (silver_customs_declarations): A streaming DLT table transforming the Bronze data, performing cleaning, type conversions, currency conversion to USD, and applying more rigorous data quality checks for HS code format, positive values, and valid declaration dates.
  • HS Code Master Data (silver_hs_code_master): A batch DLT table loading a reference dataset of HS codes and their associated tariffs, with strict quality checks to ensure master data integrity.

Files created/modified:

  • src/data_lakehouse/customs/schemas/customs_schemas.py
  • src/data_lakehouse/customs/dlt_pipelines/customs_dlt_pipeline.py

How it integrates with existing code:

This chapter leverages the raw data ingestion mechanisms we established in earlier chapters (e.g., Kafka writing to cloud storage). The DLT pipelines now pick up this raw data, process it, and store it in a structured, governed format within Delta Lake tables managed by Unity Catalog. These Silver layer tables will serve as the foundation for the tariff impact analysis and anomaly detection components we build next.

Common Issues & Solutions

  1. Issue: cloudFiles.schemaLocation permissions error.

    • Symptom: DLT pipeline fails to start or process files with errors related to accessing the schema location.
    • Reason: The DLT service principal (or the user running the pipeline) does not have sufficient permissions to create or write to the specified cloudFiles.schemaLocation path in your cloud storage or Unity Catalog volume.
    • Solution: Ensure the identity used by your DLT pipeline can read, write, and create objects at the schemaLocation path. For Unity Catalog volumes, this means READ VOLUME and WRITE VOLUME on the volume (plus CREATE VOLUME on the schema if the pipeline must create the volume itself).
  2. Issue: Data quality expectations causing pipeline failures or unexpected drops.

    • Symptom: DLT pipeline fails or many rows are dropped, but the raw data looks “mostly” okay.
    • Reason: Your EXPECT clauses might be too strict for the incoming data quality, or the data transformations preceding the EXPECT are not robust enough. For example, to_date might fail if the date format isn’t consistently yyyy-MM-dd.
    • Solution:
      • Review EXPECT policies: Understand the difference between _or_drop, _or_fail, and default (metrics only). Adjust as needed.
      • Refine transformations: Add more robust cleaning steps before applying expectations (e.g., try_cast or coalesce with default values).
      • Inspect dropped records: Use DLT’s UI to view data quality metrics and sample dropped records to understand the exact violations.
      • Staged Expectations: Start with EXPECT (metrics only) to gather data quality insights without blocking the pipeline, then gradually introduce _or_drop or _or_fail as data quality improves.
  3. Issue: Schema evolution not handled correctly.

    • Symptom: New fields in incoming raw data are not appearing in the Bronze or Silver tables, or the pipeline fails due to schema mismatch.
    • Reason: While Auto Loader (cloudFiles) has robust schema evolution capabilities, sometimes explicit handling is needed, especially if schema changes are complex or involve nested structures.
    • Solution:
      • Auto Loader Default: cloudFiles is configured for schema inference and evolution by default. Ensure cloudFiles.schemaLocation is correctly configured.
      • cloudFiles.schemaHints: For specific fields, you can provide hints to Auto Loader.
      • APPLY CHANGES INTO: For Silver tables, if you expect schema changes to propagate from Bronze, re-evaluate how APPLY CHANGES INTO or an explicit schema definition affects downstream tables. Keep your DLT SELECT statements resilient to upstream schema changes: referencing a column that does not exist fails at query analysis, so either check the upstream schema before selecting optional fields, or pin them via cloudFiles.schemaHints so they always exist (possibly as null).
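Tying issues 1 and 3 together, here is a hedged sketch of the Auto Loader options behind the Bronze reader. The volume path and hinted fields are placeholders, not this chapter's exact values; adjust them to your environment.

```python
# Sketch of Auto Loader options addressing issues 1 and 3 above.
# The schemaLocation path and the hinted fields are hypothetical placeholders.
AUTOLOADER_OPTIONS = {
    "cloudFiles.format": "json",
    # Issue 1: the pipeline's identity must be able to read/write this path.
    "cloudFiles.schemaLocation": "/Volumes/main/customs_lakehouse/schemas/bronze_customs",
    # Issue 3: pin types for fields Auto Loader might mis-infer, while still
    # letting new columns evolve into the schema.
    "cloudFiles.schemaHints": "declared_hs_code STRING, value_usd DOUBLE",
    "cloudFiles.inferColumnTypes": "true",
}

# Inside the DLT pipeline this dict would be applied roughly as:
#   spark.readStream.format("cloudFiles") \
#        .options(**AUTOLOADER_OPTIONS) \
#        .load(RAW_CUSTOMS_LANDING_PATH)
```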

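The expectation logic from issue 2 can be prototyped outside DLT before tightening policies from metrics-only to _or_drop or _or_fail. The rules below (HS code pattern, positivity, multi-format date parsing as a stand-in for the try_cast/coalesce advice) are assumptions; align them with your actual EXPECT clauses.

```python
import re
from datetime import datetime, date

# Assumed rule: HS codes are 6-10 digits. Adjust to your master data.
HS_CODE_PATTERN = re.compile(r"^\d{6,10}$")

# Try several formats before giving up, analogous to
# coalesce(to_date(c, fmt1), to_date(c, fmt2)) in PySpark.
DATE_FORMATS = ("%Y-%m-%d", "%d/%m/%Y", "%Y%m%d")

def valid_hs_code_format(hs_code):
    """Mirror of the valid_hs_code_format expectation."""
    return bool(hs_code) and HS_CODE_PATTERN.match(hs_code) is not None

def positive_value(value_usd):
    """Mirror of the positive_value expectation."""
    return value_usd is not None and value_usd > 0

def try_parse_date(raw):
    """Return a date if any known format matches, else None (like try_cast)."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(raw, fmt).date()
        except (TypeError, ValueError):
            continue
    return None
```

In the pipeline itself, the same rule becomes a decorator, e.g. `@dlt.expect_or_drop("valid_hs_code_format", "declared_hs_code RLIKE '^[0-9]{6,10}$'")`.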
Testing & Verification

To thoroughly test and verify the work in this chapter:

  1. Run the DLT Pipeline in Continuous Mode: This ensures it’s configured for real-time processing and can handle new incoming data.
  2. Ingest More Sample Data: Add new JSON files to your RAW_CUSTOMS_LANDING_PATH (e.g., another set of 5-10 declarations, including some with known data quality issues) while the pipeline is running. Observe the DLT UI for new updates and successful processing.
  3. Monitor Data Quality Metrics: In the DLT UI, click on each table (bronze_customs_declarations, silver_customs_declarations, silver_hs_code_master) and review the “Data Quality” tab.
    • Verify that bronze_customs_declarations contains no rows violating valid_declaration_id (we used _or_drop, so offending rows were dropped; the Data Quality tab still reports how many).
    • For silver_customs_declarations, check the valid_hs_code_format and positive_value expectations. The records with the “123” HS code and the “-500” value should show violations, but the pipeline should not fail (unless configured with _or_fail). The valid_declaration_date expectation should show 100% if all dates were valid strings.
    • For silver_hs_code_master, all expectations should show 100% adherence; otherwise the pipeline would have failed.
  4. Query Data for Integrity:
    -- Verify Bronze layer
    SELECT COUNT(*) FROM main.customs_lakehouse.bronze_customs_declarations;
    SELECT * FROM main.customs_lakehouse.bronze_customs_declarations WHERE declaration_id IS NULL; -- Should return 0 rows if _or_drop worked
    
    -- Verify Silver Customs layer
    SELECT COUNT(*) FROM main.customs_lakehouse.silver_customs_declarations;
    SELECT * FROM main.customs_lakehouse.silver_customs_declarations WHERE declared_hs_code LIKE '123%'; -- Should be present, but DLT UI shows quality violation
    SELECT * FROM main.customs_lakehouse.silver_customs_declarations WHERE value_usd < 0; -- Should be present, but DLT UI shows quality violation
    SELECT * FROM main.customs_lakehouse.silver_customs_declarations WHERE currency = 'EUR'; -- Check if USD conversion is correct
    SELECT declaration_id, declaration_date, value_usd, _bronze_ingestion_timestamp, processing_timestamp FROM main.customs_lakehouse.silver_customs_declarations;
    
    -- Verify HS Code Master layer
    SELECT COUNT(*) FROM main.customs_lakehouse.silver_hs_code_master;
    SELECT * FROM main.customs_lakehouse.silver_hs_code_master WHERE hs_code = '847130';
    
  5. Check for Schema Evolution: If you introduce a new field in a subsequent raw JSON file, verify that bronze_customs_declarations automatically updates its schema.

If all checks pass, your Customs Trade Data Lakehouse is successfully established with core ingestion and data quality mechanisms!

Summary & Next Steps

In this comprehensive chapter, we successfully built the foundational layers of our Customs Trade Data Lakehouse using Databricks Delta Live Tables. We implemented:

  • A Bronze layer for raw, immutable customs declarations, leveraging Auto Loader for efficient streaming ingestion.
  • A Silver layer for cleaned, transformed, and currency-converted customs declarations, applying robust data quality expectations.
  • A Silver layer for HS Code Master Data, crucial for future validation and enrichment.

This robust Lakehouse structure, governed by Unity Catalog and powered by DLT, provides a single source of truth for our customs trade data. It’s now clean, standardized, and ready for advanced analytics.

In the next chapter, we will leverage this prepared data to implement HS Code Classification Validation and Anomaly Detection. This will involve comparing declared HS codes against historical patterns and master data to identify potential misclassifications or suspicious activities, further enhancing our supply chain intelligence.