Chapter 7: HS Code-based Tariff Impact Analysis with DLT

1. Chapter Introduction

In this chapter, we will build a robust, real-time data pipeline using Databricks Delta Live Tables (DLT) to perform HS Code-based tariff impact analysis. This pipeline will ingest raw trade data, enrich it with historical and current tariff rates, and then aggregate the estimated tariff costs to provide actionable insights into the financial impact of import/export duties.

Understanding tariff impacts is crucial for modern supply chains. Tariffs can significantly influence procurement costs, pricing strategies, and overall profitability. By automating this analysis with DLT, businesses can gain near real-time visibility into these costs, enabling proactive decision-making to mitigate risks and optimize trade routes or sourcing strategies. This step is a cornerstone for building a resilient and cost-effective supply chain.

As a prerequisite, we assume you have successfully set up your Databricks environment and Unity Catalog from previous chapters. Specifically, we’ll be working with a bronze_trade_data table (which we’ll simulate for this chapter if not already present) that contains raw import/export records, including HS codes, origin/destination countries, and declared values. We’ll also need a source for historical tariff rates, which we will simulate as a static Delta table for simplicity in this tutorial, although in a real-world scenario, it would likely be ingested from an external API or data provider.

Upon completion of this chapter, you will have a fully functional DLT pipeline that generates:

  • A Silver Delta table (silver_enriched_trade_tariffs) containing individual trade records enriched with the applicable tariff rates and calculated estimated tariff costs.
  • A Gold Delta table (gold_tariff_impact_summary) providing aggregated insights into total tariff costs by product, country, and time period, ready for downstream analytics and reporting.

2. Planning & Design

Component Architecture for Tariff Impact Analysis

Our DLT pipeline for tariff impact analysis will follow the Medallion Architecture pattern (Bronze -> Silver -> Gold).

  • Bronze Layer: Raw trade data (bronze_trade_data) ingested from various sources (e.g., Kafka, flat files). This layer is assumed to be available from previous chapters or will be mocked.
  • Reference Data Ingestion (Static Silver): For tariff rates, we will create a silver_tariff_rates table. In a production environment, this table would likely be populated by another DLT pipeline consuming from an external tariff data API or batch files. For this tutorial, we will directly create and populate it with sample data.
  • Silver Layer (Enrichment): The silver_enriched_trade_tariffs table will be derived from the bronze_trade_data by joining with silver_tariff_rates to apply the correct tariff based on HS Code, origin/destination, and date.
  • Gold Layer (Aggregation): The gold_tariff_impact_summary table will aggregate the enriched data to provide high-level summaries suitable for business intelligence dashboards and further analysis.
graph TD
    A[Raw Trade Data Kafka/Files] --> B[Bronze Layer: Trade Data]
    C[External Tariff Data API/CSV] --> D[Static Silver Layer: Tariff Rates]
    B -->|DLT Pipeline Enrichment| E[Silver Layer: Enriched Trade Tariffs]
    D -->|Used for Lookup| E
    E -->|DLT Pipeline Aggregation| F[Gold Layer: Tariff Impact Summary]
    F -->|Used for BI Reporting| G[Analytics Dashboards]

Database Schema Design

We will leverage Unity Catalog for managing our tables. All tables will reside in a dedicated schema (e.g., supply_chain_analytics).

  • bronze_trade_data (Assumed existing, or mocked for this chapter):

    • id STRING (Unique shipment ID)
    • event_timestamp TIMESTAMP (Timestamp of the trade event)
    • raw_hs_code STRING (Harmonized System Code, potentially needing cleanup)
    • product_description STRING
    • origin_country STRING (ISO 2-letter code)
    • destination_country STRING (ISO 2-letter code)
    • declared_value DECIMAL(18, 2) (Value of goods)
    • quantity DECIMAL(18, 2)
    • unit_of_measure STRING
    • currency STRING
    • _ingestion_timestamp TIMESTAMP
  • silver_tariff_rates (New, static for this chapter):

    • hs_code STRING (Cleaned HS Code)
    • origin_country STRING
    • destination_country STRING
    • effective_start_date DATE
    • effective_end_date DATE
    • tariff_rate_percentage DECIMAL(5, 4) (e.g., 0.05 for 5%)
    • tariff_rate_fixed_per_unit DECIMAL(10, 4) (e.g., $1.50 per unit)
    • currency STRING
    • tariff_description STRING
  • silver_enriched_trade_tariffs (New, DLT managed):

    • trade_id STRING (from id in bronze)
    • event_timestamp TIMESTAMP
    • hs_code STRING (Cleaned HS Code)
    • product_description STRING
    • origin_country STRING
    • destination_country STRING
    • declared_value DECIMAL(18, 2)
    • quantity DECIMAL(18, 2)
    • unit_of_measure STRING
    • currency STRING
    • applied_tariff_rate_percentage DECIMAL(5, 4)
    • applied_tariff_rate_fixed_per_unit DECIMAL(10, 4)
    • estimated_tariff_cost DECIMAL(18, 2)
    • _processing_timestamp TIMESTAMP
  • gold_tariff_impact_summary (New, DLT managed):

    • reporting_period_month DATE (e.g., first day of the month)
    • hs_code STRING
    • origin_country STRING
    • destination_country STRING
    • total_declared_value DECIMAL(20, 2)
    • total_quantity DECIMAL(20, 2)
    • total_estimated_tariff_cost DECIMAL(20, 2)
    • average_tariff_rate_percentage DECIMAL(5, 4)
    • num_shipments BIGINT
    • _processing_timestamp TIMESTAMP
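
The silver-layer enrichment ultimately boils down to one formula per trade record: a percentage duty on the declared value plus a fixed duty per unit. A minimal, pure-Python sketch of that calculation (the helper name is ours, not part of the pipeline code):

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional

def estimated_tariff_cost(declared_value: Decimal, quantity: Decimal,
                          rate_percentage: Optional[Decimal],
                          rate_fixed_per_unit: Optional[Decimal]) -> Decimal:
    """Percentage duty on the declared value plus a fixed duty per unit;
    a missing (NULL) rate contributes zero, as in the silver-layer logic."""
    pct_cost = declared_value * rate_percentage if rate_percentage is not None else Decimal("0")
    fixed_cost = quantity * rate_fixed_per_unit if rate_fixed_per_unit is not None else Decimal("0")
    # Round to two decimals to match the DECIMAL(18, 2) target column
    return (pct_cost + fixed_cost).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

# Shipment TID001: $10,000 of smartphones, 100 units, 15% rate, no fixed duty
print(estimated_tariff_cost(Decimal("10000.00"), Decimal("100"),
                            Decimal("0.15"), Decimal("0.00")))  # → 1500.00
```

The same arithmetic appears later in the enrichment pipeline as Spark column expressions.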

File Structure

We will organize our DLT pipeline code within a dlt_pipelines directory.

.
├── dlt_pipelines/
│   ├── tariff_analysis/
│   │   ├── config.py
│   │   ├── 01_tariff_rates_ingestion.py
│   │   ├── 02_trade_tariff_enrichment.py
│   │   └── 03_tariff_impact_aggregation.py
│   └── __init__.py
├── README.md
└── requirements.txt

3. Step-by-Step Implementation

We will create the necessary files and implement the DLT logic incrementally.

a) Setup/Configuration

First, let’s establish our project structure and a configuration file.

  1. Create Project Directories: If you haven’t already, create the dlt_pipelines/tariff_analysis directories.

    mkdir -p dlt_pipelines/tariff_analysis
    touch dlt_pipelines/__init__.py
    
  2. Create dlt_pipelines/tariff_analysis/config.py: This file will hold common configurations like our Unity Catalog schema name.

    # dlt_pipelines/tariff_analysis/config.py
    
    import os
    
    # Define the Unity Catalog schema where DLT tables will be created
    # Replace 'your_catalog' and 'your_schema' with your actual Unity Catalog and schema names
    # Example: 'main.supply_chain_analytics'
    UNITY_CATALOG_SCHEMA = os.getenv("UNITY_CATALOG_SCHEMA", "main.supply_chain_analytics")
    
    # Define table names
    BRONZE_TRADE_DATA_TABLE = f"{UNITY_CATALOG_SCHEMA}.bronze_trade_data"
    SILVER_TARIFF_RATES_TABLE = f"{UNITY_CATALOG_SCHEMA}.silver_tariff_rates"
    SILVER_ENRICHED_TRADE_TARIFFS_TABLE = f"{UNITY_CATALOG_SCHEMA}.silver_enriched_trade_tariffs"
    GOLD_TARIFF_IMPACT_SUMMARY_TABLE = f"{UNITY_CATALOG_SCHEMA}.gold_tariff_impact_summary"
    
    # Define DLT pipeline name (for logging/identification)
    DLT_PIPELINE_NAME = "tariff_impact_analysis_pipeline"
    
    # Logging configuration
    LOGGING_CONFIG = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'standard': {
                'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            },
        },
        'handlers': {
            'console': {
                'level': 'INFO',
                'formatter': 'standard',
                'class': 'logging.StreamHandler',
            },
        },
        'loggers': {
            '': {  # root logger
                'handlers': ['console'],
                'level': 'INFO',
                'propagate': True
            },
            'dlt_tariff_analysis': {
                'handlers': ['console'],
                'level': 'INFO',
                'propagate': False
            },
        }
    }
    

    Explanation:

    • We read UNITY_CATALOG_SCHEMA from an environment variable for flexibility. This is crucial for production deployments where you might have different catalogs/schemas for dev, staging, and prod.
    • Table names are constructed using f-strings for clarity and consistency.
    • Basic logging configuration is included, which is good practice for any production-ready application.
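
Because config.py reads the environment at import time, an override must be in place before the module is imported (for example via cluster or pipeline environment settings). A small sketch of the pattern, using a hypothetical dev schema name:

```python
import os

# Override must happen before config.py is imported, since os.getenv
# is evaluated at import time
os.environ["UNITY_CATALOG_SCHEMA"] = "dev.supply_chain_analytics"

# Same lookup config.py performs
schema = os.getenv("UNITY_CATALOG_SCHEMA", "main.supply_chain_analytics")
bronze_table = f"{schema}.bronze_trade_data"
print(bronze_table)  # → dev.supply_chain_analytics.bronze_trade_data
```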

b) Core Implementation

We will now implement the DLT pipelines. Remember, DLT pipelines are typically defined in Python or SQL files that are then deployed as a single pipeline.

i. Simulate Bronze Trade Data (If not already present)

For this chapter, if you don’t have bronze_trade_data from previous steps, we’ll create a simple one. In a production scenario, this table would be continuously populated by a separate ingestion pipeline (e.g., from Kafka).

Create a Databricks Notebook (e.g., setup_bronze_data.py) in your workspace:

# Databricks Notebook: setup_bronze_data.py
# Run this once to create sample bronze_trade_data if it doesn't exist

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DecimalType

# Import configuration
import sys
import os
# Add the parent directory of dlt_pipelines to the Python path
# This assumes setup_bronze_data.py is at the root level alongside dlt_pipelines
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), 'dlt_pipelines', 'tariff_analysis')))
from config import UNITY_CATALOG_SCHEMA, BRONZE_TRADE_DATA_TABLE

print(f"Using Unity Catalog Schema: {UNITY_CATALOG_SCHEMA}")
print(f"Creating/Updating Bronze Table: {BRONZE_TRADE_DATA_TABLE}")

# Define schema for raw trade data
trade_data_schema = StructType([
    StructField("id", StringType(), True),
    StructField("event_timestamp", TimestampType(), True),
    StructField("raw_hs_code", StringType(), True),
    StructField("product_description", StringType(), True),
    StructField("origin_country", StringType(), True),
    StructField("destination_country", StringType(), True),
    StructField("declared_value", DecimalType(18, 2), True),
    StructField("quantity", DecimalType(18, 2), True),
    StructField("unit_of_measure", StringType(), True),
    StructField("currency", StringType(), True)
])

# Sample data for bronze_trade_data
sample_trade_data = [
    ("TID001", "2024-10-01 10:00:00", "851712", "Smartphones", "CN", "US", 10000.00, 100, "PCS", "USD"),
    ("TID002", "2024-10-02 11:30:00", "847130", "Laptops", "TW", "US", 15000.00, 50, "PCS", "USD"),
    ("TID003", "2024-11-05 09:00:00", "870323", "Electric Cars", "DE", "FR", 500000.00, 10, "PCS", "EUR"),
    ("TID004", "2024-11-10 14:00:00", "851712", "Smartphones", "CN", "US", 12000.00, 120, "PCS", "USD"),
    ("TID005", "2024-12-01 16:00:00", "847130", "Laptops", "TW", "US", 18000.00, 60, "PCS", "USD"),
    ("TID006", "2024-12-05 08:00:00", "870323", "Electric Cars", "DE", "FR", 600000.00, 12, "PCS", "EUR"),
    ("TID007", "2025-01-01 10:00:00", "851712", "Smartphones", "VN", "US", 9000.00, 90, "PCS", "USD"), # New origin
    ("TID008", "2025-01-02 11:30:00", "847130", "Laptops", "TW", "GB", 16000.00, 55, "PCS", "GBP"), # New destination
]

# Convert the literals to the types the schema expects: createDataFrame verifies
# rows against the schema, so TimestampType needs datetime objects and
# DecimalType needs Decimal objects rather than raw strings/floats.
from decimal import Decimal
from datetime import datetime

parsed_trade_data = [
    (r[0], datetime.strptime(r[1], "%Y-%m-%d %H:%M:%S"), r[2], r[3], r[4], r[5],
     Decimal(str(r[6])), Decimal(str(r[7])), r[8], r[9])
    for r in sample_trade_data
]

# Create a DataFrame and stamp the ingestion time
df = spark.createDataFrame(parsed_trade_data, schema=trade_data_schema) \
    .withColumn("_ingestion_timestamp", F.current_timestamp())

# Write to Unity Catalog as a Delta table
# Ensure the catalog and schema exist first:
# spark.sql(f"CREATE CATALOG IF NOT EXISTS {UNITY_CATALOG_SCHEMA.split('.')[0]}")
# spark.sql(f"CREATE SCHEMA IF NOT EXISTS {UNITY_CATALOG_SCHEMA}")

df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(BRONZE_TRADE_DATA_TABLE)

print(f"Sample data successfully written to {BRONZE_TRADE_DATA_TABLE}")

# Verify data
display(spark.sql(f"SELECT * FROM {BRONZE_TRADE_DATA_TABLE} LIMIT 5"))

Explanation:

  • This is a standard Databricks notebook, not a DLT pipeline. It’s used to set up our bronze_trade_data table in Unity Catalog for demonstration purposes.
  • Standard PySpark handles all DataFrame operations here; the dlt module is only needed inside files that are actually part of a DLT pipeline.
  • The sys.path.append is crucial to allow the notebook to import our config.py from the dlt_pipelines/tariff_analysis directory.
  • A StructType defines the schema for our bronze table.
  • Sample data is created, converted to a DataFrame, and written to Unity Catalog using saveAsTable with mode("overwrite") and option("overwriteSchema", "true") for idempotency during setup.
  • Action: Run this notebook once in your Databricks workspace. Make sure to replace main.supply_chain_analytics with your actual Unity Catalog and schema name if different, and ensure the catalog and schema exist or create them with spark.sql("CREATE CATALOG IF NOT EXISTS ...") and spark.sql("CREATE SCHEMA IF NOT EXISTS ...").
ii. Ingest Static Tariff Rates (01_tariff_rates_ingestion.py)

This DLT file will create our silver_tariff_rates table. While marked as “static” for this tutorial (meaning we define data directly in code), in a real DLT pipeline, this would typically read from a streaming or batch source of tariff data.

# dlt_pipelines/tariff_analysis/01_tariff_rates_ingestion.py

import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DateType, DecimalType
import logging
import logging.config
import sys
import os

# Import configuration
# Add the parent directory to the Python path if running as a DLT pipeline from the root
# DLT automatically handles module imports if files are in the same pipeline definition.
# This path modification is more for local testing or if this file were standalone.
# For DLT, ensure all Python files are part of the same pipeline source.
try:
    from dlt_pipelines.tariff_analysis.config import (
        UNITY_CATALOG_SCHEMA, SILVER_TARIFF_RATES_TABLE, LOGGING_CONFIG
    )
    logging.config.dictConfig(LOGGING_CONFIG)
    logger = logging.getLogger('dlt_tariff_analysis')
except ImportError:
    # Fallback for local testing outside DLT context or if config is not in path
    print("Warning: Could not import config. Assuming default values for local testing.")
    UNITY_CATALOG_SCHEMA = "main.supply_chain_analytics" # Default for local testing
    SILVER_TARIFF_RATES_TABLE = f"{UNITY_CATALOG_SCHEMA}.silver_tariff_rates"
    logger = logging.getLogger('dlt_tariff_analysis')
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)


logger.info(f"Configured for Unity Catalog Schema: {UNITY_CATALOG_SCHEMA}")
logger.info(f"Targeting Silver Tariff Rates Table: {SILVER_TARIFF_RATES_TABLE}")

@dlt.table(
    name="silver_tariff_rates",
    comment="Reference table for HS Code-based tariff rates, including historical data.",
    table_properties={
        "delta.logRetentionDuration": "30 days",
        "delta.deletedFileRetentionDuration": "7 days",
        "pipelines.autoOptimize.zorderCols": "hs_code",
    }
)
@dlt.expect_or_drop("valid_hs_code", "hs_code IS NOT NULL AND LENGTH(hs_code) >= 6")
@dlt.expect_or_drop("valid_countries", "origin_country IS NOT NULL AND destination_country IS NOT NULL")
@dlt.expect_or_drop("valid_effective_dates", "effective_start_date IS NOT NULL AND effective_end_date IS NOT NULL AND effective_start_date <= effective_end_date")
def create_silver_tariff_rates():
    """
    Creates or updates the silver_tariff_rates table with sample tariff data.
    In a production scenario, this would typically ingest from a more dynamic source.
    """
    logger.info("Generating sample tariff rates data for silver_tariff_rates table.")

    # Dates and rates are kept as strings here and cast to their target types
    # below, so the Python literals line up with the DataFrame schema.
    tariff_data = [
        ("851712", "CN", "US", "2024-01-01", "2024-10-31", "0.15", "0.00", "USD", "Standard tariff for smartphones from China to US"),
        ("851712", "CN", "US", "2024-11-01", "2025-12-31", "0.18", "0.00", "USD", "Increased tariff for smartphones from China to US"), # Tariff increase
        ("851712", "VN", "US", "2025-01-01", "2025-12-31", "0.05", "0.00", "USD", "Lower tariff for smartphones from Vietnam to US"), # New origin tariff
        ("847130", "TW", "US", "2024-01-01", "2025-12-31", "0.07", "0.00", "USD", "Standard tariff for laptops from Taiwan to US"),
        ("870323", "DE", "FR", "2024-01-01", "2025-12-31", "0.02", "0.00", "EUR", "Intra-EU tariff for electric cars"),
        ("847130", "TW", "GB", "2025-01-01", "2025-12-31", "0.08", "0.00", "GBP", "Post-Brexit tariff for laptops from Taiwan to UK"), # New destination tariff
    ]

    raw_schema = StructType([
        StructField("hs_code", StringType(), False),
        StructField("origin_country", StringType(), False),
        StructField("destination_country", StringType(), False),
        StructField("effective_start_date", StringType(), False),
        StructField("effective_end_date", StringType(), False),
        StructField("tariff_rate_percentage", StringType(), False),
        StructField("tariff_rate_fixed_per_unit", StringType(), False),
        StructField("currency", StringType(), False),
        StructField("tariff_description", StringType(), True),
    ])

    return (
        spark.createDataFrame(data=tariff_data, schema=raw_schema)
        .withColumn("effective_start_date", F.to_date(F.col("effective_start_date")))
        .withColumn("effective_end_date", F.to_date(F.col("effective_end_date")))
        .withColumn("tariff_rate_percentage", F.col("tariff_rate_percentage").cast(DecimalType(5, 4)))
        .withColumn("tariff_rate_fixed_per_unit", F.col("tariff_rate_fixed_per_unit").cast(DecimalType(10, 4)))
    )

Explanation:

  • @dlt.table decorator: Defines a Delta Live Table. We specify a descriptive comment and table_properties for better maintainability and performance. pipelines.autoOptimize.zorderCols is used to optimize reads on hs_code.
  • @dlt.expect_or_drop: These are DLT’s data quality constraints. If a record fails any of these expectations, it will be dropped from the table, preventing bad data from polluting downstream layers. This is a critical production best practice.
  • create_silver_tariff_rates function: This function generates a Spark DataFrame from static Python data. We cast date strings to DateType.
  • Logging: We use Python’s logging module to provide informative messages during pipeline execution, aiding in debugging and monitoring. The try-except block for config import helps with local development.
  • Action: This file will be part of your DLT pipeline definition.
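
DLT evaluates each expectation as a SQL boolean expression per row and, with expect_or_drop, discards rows where it is false. For intuition, here is a plain-Python equivalent of the three predicates above (the helper function is purely illustrative, not part of the pipeline):

```python
from datetime import date

def passes_expectations(row: dict) -> bool:
    """Plain-Python restatement of the three expect_or_drop predicates;
    DLT drops any row for which one of these is false."""
    valid_hs_code = row.get("hs_code") is not None and len(row["hs_code"]) >= 6
    valid_countries = (row.get("origin_country") is not None
                       and row.get("destination_country") is not None)
    d0, d1 = row.get("effective_start_date"), row.get("effective_end_date")
    valid_dates = d0 is not None and d1 is not None and d0 <= d1
    return valid_hs_code and valid_countries and valid_dates

good = {"hs_code": "851712", "origin_country": "CN", "destination_country": "US",
        "effective_start_date": date(2024, 1, 1), "effective_end_date": date(2024, 10, 31)}
bad = {**good, "hs_code": "8517"}  # HS code too short: this row would be dropped
print(passes_expectations(good), passes_expectations(bad))  # → True False
```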
iii. Trade Tariff Enrichment (02_trade_tariff_enrichment.py)

This DLT file will read from the bronze_trade_data and join it with silver_tariff_rates to calculate the estimated tariff cost.

# dlt_pipelines/tariff_analysis/02_trade_tariff_enrichment.py

import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, TimestampType
import logging
import logging.config
import sys
import os

# Import configuration
try:
    from dlt_pipelines.tariff_analysis.config import (
        UNITY_CATALOG_SCHEMA, BRONZE_TRADE_DATA_TABLE, SILVER_TARIFF_RATES_TABLE,
        SILVER_ENRICHED_TRADE_TARIFFS_TABLE, LOGGING_CONFIG
    )
    logging.config.dictConfig(LOGGING_CONFIG)
    logger = logging.getLogger('dlt_tariff_analysis')
except ImportError:
    print("Warning: Could not import config. Assuming default values for local testing.")
    UNITY_CATALOG_SCHEMA = "main.supply_chain_analytics"
    BRONZE_TRADE_DATA_TABLE = f"{UNITY_CATALOG_SCHEMA}.bronze_trade_data"
    SILVER_TARIFF_RATES_TABLE = f"{UNITY_CATALOG_SCHEMA}.silver_tariff_rates"
    SILVER_ENRICHED_TRADE_TARIFFS_TABLE = f"{UNITY_CATALOG_SCHEMA}.silver_enriched_trade_tariffs"
    logger = logging.getLogger('dlt_tariff_analysis')
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

logger.info(f"Configured for Unity Catalog Schema: {UNITY_CATALOG_SCHEMA}")
logger.info(f"Reading from Bronze Table: {BRONZE_TRADE_DATA_TABLE}")
logger.info(f"Reading from Silver Tariff Rates Table: {SILVER_TARIFF_RATES_TABLE}")
logger.info(f"Targeting Silver Enriched Trade Tariffs Table: {SILVER_ENRICHED_TRADE_TARIFFS_TABLE}")

@dlt.table(
    name="silver_enriched_trade_tariffs",
    comment="Trade data enriched with applied tariff rates and estimated tariff costs.",
    table_properties={
        "delta.logRetentionDuration": "30 days",
        "delta.deletedFileRetentionDuration": "7 days",
        "pipelines.autoOptimize.zorderCols": "hs_code, origin_country, destination_country, event_timestamp",
    }
)
@dlt.expect_or_drop("valid_declared_value", "declared_value IS NOT NULL AND declared_value >= 0")
@dlt.expect_or_drop("tariff_cost_calculated", "estimated_tariff_cost IS NOT NULL")
def create_silver_enriched_trade_tariffs():
    """
    Reads bronze trade data, joins with silver tariff rates, and calculates estimated tariff costs.
    """
    logger.info("Reading bronze trade data from %s", BRONZE_TRADE_DATA_TABLE)
    # bronze_trade_data is created outside this pipeline, so read it with
    # spark.readStream rather than dlt.read_stream
    bronze_df = spark.readStream.table(BRONZE_TRADE_DATA_TABLE)

    logger.info("Reading silver tariff rates from %s", SILVER_TARIFF_RATES_TABLE)
    # dlt.read takes the short name of a table defined in this same pipeline
    tariff_df = dlt.read("silver_tariff_rates")  # static/slowly changing dimension lookup

    logger.info("Joining trade data with tariff rates to enrich.")
    enriched_df = (
        bronze_df.alias("trade")
        .join(
            tariff_df.alias("tariff"),
            (F.col("trade.raw_hs_code") == F.col("tariff.hs_code")) &
            (F.col("trade.origin_country") == F.col("tariff.origin_country")) &
            (F.col("trade.destination_country") == F.col("tariff.destination_country")) &
            (F.to_date(F.col("trade.event_timestamp")) >= F.col("tariff.effective_start_date")) &
            (F.to_date(F.col("trade.event_timestamp")) <= F.col("tariff.effective_end_date")),
            "leftouter" # Use leftouter to keep all trade records, even if no tariff match
        )
        .withColumn("applied_tariff_rate_percentage", F.col("tariff.tariff_rate_percentage"))
        .withColumn("applied_tariff_rate_fixed_per_unit", F.col("tariff.tariff_rate_fixed_per_unit"))
        .withColumn("estimated_tariff_cost_percentage",
                    F.when(F.col("tariff.tariff_rate_percentage").isNotNull(),
                           F.col("trade.declared_value") * F.col("tariff.tariff_rate_percentage"))
                    .otherwise(0.00))
        .withColumn("estimated_tariff_cost_fixed",
                    F.when(F.col("tariff.tariff_rate_fixed_per_unit").isNotNull(),
                           F.col("trade.quantity") * F.col("tariff.tariff_rate_fixed_per_unit"))
                    .otherwise(0.00))
        .withColumn("estimated_tariff_cost",
                    (F.col("estimated_tariff_cost_percentage") + F.col("estimated_tariff_cost_fixed"))
                    .cast(DecimalType(18, 2)))
        .select(
            F.col("trade.id").alias("trade_id"),
            F.col("trade.event_timestamp"),
            F.col("trade.raw_hs_code").alias("hs_code"),
            F.col("trade.product_description"),
            F.col("trade.origin_country"),
            F.col("trade.destination_country"),
            F.col("trade.declared_value"),
            F.col("trade.quantity"),
            F.col("trade.unit_of_measure"),
            F.col("trade.currency"),
            F.col("applied_tariff_rate_percentage"),
            F.col("applied_tariff_rate_fixed_per_unit"),
            F.col("estimated_tariff_cost"),
            F.current_timestamp().alias("_processing_timestamp")
        )
    )
    logger.info("Finished enriching trade data with tariff rates.")
    return enriched_df

Explanation:

  • spark.readStream.table(BRONZE_TRADE_DATA_TABLE): This is crucial. Because bronze_trade_data is created outside this pipeline, we read it with spark.readStream rather than dlt.read_stream. It is still treated as a streaming source, so new bronze records are processed incrementally.
  • dlt.read("silver_tariff_rates"): For reference tables defined in the same pipeline that are static or slowly changing, dlt.read is used with the table’s short name. DLT resolves the dependency so the lookup table is refreshed before this step runs.
  • Join Logic: The join conditions are critical: hs_code, origin_country, destination_country, and the event_timestamp falling within the tariff’s effective_start_date and effective_end_date. This ensures the correct historical tariff is applied.
  • leftouter join: This ensures that all trade records are kept, even if a matching tariff rate isn’t found. This is important for identifying trade without tariff impact or for flagging missing tariff data.
  • Tariff Cost Calculation: We calculate estimated_tariff_cost by summing percentage-based and fixed-per-unit tariffs. F.when().otherwise() handles cases where a tariff type might be NULL.
  • Data Quality Expectations: valid_declared_value and tariff_cost_calculated ensure core financial data integrity.
  • Logging: Clear logging messages indicate the stage of processing, which is invaluable for monitoring and debugging production pipelines.
  • Action: This file will be part of your DLT pipeline definition.
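
The date-range join condition is worth seeing in miniature: for a given HS code and country pair, the applicable rate is the one whose effective interval contains the trade’s event date. A pure-Python sketch (the helper is illustrative only), using the CN-to-US smartphone rates from the sample data:

```python
from datetime import date
from decimal import Decimal

# (effective_start, effective_end, percentage rate) rows for HS 851712, CN -> US
rates = [
    (date(2024, 1, 1), date(2024, 10, 31), Decimal("0.15")),
    (date(2024, 11, 1), date(2025, 12, 31), Decimal("0.18")),
]

def applicable_rate(event_date: date):
    """Mimics the join condition: the event date must fall inside [start, end]."""
    for start, end, pct in rates:
        if start <= event_date <= end:
            return pct
    return None  # leftouter join: no match keeps the trade row with a NULL rate

# TID001 (2024-10-01) gets 15%; TID004 (2024-11-10) gets the increased 18%
print(applicable_rate(date(2024, 10, 1)))   # → 0.15
print(applicable_rate(date(2024, 11, 10)))  # → 0.18
print(applicable_rate(date(2023, 1, 1)))    # → None
```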
iv. Tariff Impact Aggregation (03_tariff_impact_aggregation.py)

This DLT file will aggregate the enriched data to provide high-level summaries.

# dlt_pipelines/tariff_analysis/03_tariff_impact_aggregation.py

import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, LongType, DateType, TimestampType
import logging
import logging.config
import sys
import os

# Import configuration
try:
    from dlt_pipelines.tariff_analysis.config import (
        UNITY_CATALOG_SCHEMA, SILVER_ENRICHED_TRADE_TARIFFS_TABLE,
        GOLD_TARIFF_IMPACT_SUMMARY_TABLE, LOGGING_CONFIG
    )
    logging.config.dictConfig(LOGGING_CONFIG)
    logger = logging.getLogger('dlt_tariff_analysis')
except ImportError:
    print("Warning: Could not import config. Assuming default values for local testing.")
    UNITY_CATALOG_SCHEMA = "main.supply_chain_analytics"
    SILVER_ENRICHED_TRADE_TARIFFS_TABLE = f"{UNITY_CATALOG_SCHEMA}.silver_enriched_trade_tariffs"
    GOLD_TARIFF_IMPACT_SUMMARY_TABLE = f"{UNITY_CATALOG_SCHEMA}.gold_tariff_impact_summary"
    logger = logging.getLogger('dlt_tariff_analysis')
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

logger.info(f"Configured for Unity Catalog Schema: {UNITY_CATALOG_SCHEMA}")
logger.info(f"Reading from Silver Table: {SILVER_ENRICHED_TRADE_TARIFFS_TABLE}")
logger.info(f"Targeting Gold Tariff Impact Summary Table: {GOLD_TARIFF_IMPACT_SUMMARY_TABLE}")

@dlt.table(
    name="gold_tariff_impact_summary",
    comment="Aggregated summary of tariff impacts by HS Code, country, and reporting period.",
    table_properties={
        "delta.logRetentionDuration": "30 days",
        "delta.deletedFileRetentionDuration": "7 days",
        "pipelines.autoOptimize.zorderCols": "reporting_period_month, hs_code, origin_country, destination_country",
    }
)
@dlt.expect_or_fail("positive_total_tariff_cost", "total_estimated_tariff_cost >= 0")
@dlt.expect_or_fail("positive_num_shipments", "num_shipments > 0")
def create_gold_tariff_impact_summary():
    """
    Aggregates enriched trade data to summarize tariff impacts.
    """
    logger.info("Reading enriched trade data from %s", SILVER_ENRICHED_TRADE_TARIFFS_TABLE)
    # Read the silver table as a complete (batch) input: a streaming aggregation
    # over all history would require watermarks, so the gold summary is instead
    # recomputed on each pipeline update.
    silver_df = dlt.read("silver_enriched_trade_tariffs")

    logger.info("Aggregating tariff impact data.")
    gold_df = (
        silver_df
        .withColumn("reporting_period_month", F.trunc(F.col("event_timestamp"), "month"))
        .groupBy(
            F.col("reporting_period_month"),
            F.col("hs_code"),
            F.col("origin_country"),
            F.col("destination_country")
        )
        .agg(
            F.sum("declared_value").alias("total_declared_value"),
            F.sum("quantity").alias("total_quantity"),
            F.sum("estimated_tariff_cost").alias("total_estimated_tariff_cost"),
            F.avg("applied_tariff_rate_percentage").alias("average_tariff_rate_percentage"),
            F.count("trade_id").alias("num_shipments")
        )
        .withColumn("_processing_timestamp", F.current_timestamp())
        .select(
            F.col("reporting_period_month").cast(DateType()), # Ensure consistent type
            F.col("hs_code"),
            F.col("origin_country"),
            F.col("destination_country"),
            F.col("total_declared_value").cast(DecimalType(20, 2)),
            F.col("total_quantity").cast(DecimalType(20, 2)),
            F.col("total_estimated_tariff_cost").cast(DecimalType(20, 2)),
            F.col("average_tariff_rate_percentage").cast(DecimalType(5, 4)),
            F.col("num_shipments").cast(LongType()),
            F.col("_processing_timestamp").cast(TimestampType())
        )
    )
    logger.info("Finished aggregating tariff impact data.")
    return gold_df

Explanation:

  • dlt.read("silver_enriched_trade_tariffs"): Reads the previously created Silver table as a complete input. A streaming read here would require watermarks for the unbounded aggregation, so the Gold summary is recomputed on each pipeline update instead.
  • F.trunc(F.col("event_timestamp"), "month"): This function is used to group data by month, creating a reporting_period_month column.
  • groupBy and agg: Standard Spark operations to perform aggregations like sum of declared value, total tariff cost, average tariff rate, and count of shipments.
  • @dlt.expect_or_fail: These are stricter data quality rules. If a record fails, the entire pipeline run will fail. This is suitable for gold tables where data quality is paramount for business decisions.
  • Type Casting: Explicitly casting aggregated columns to ensure correct types and precision in the Gold table.
  • Logging: Provides visibility into the aggregation process.
  • Action: This file will be part of your DLT pipeline definition.
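
The month-truncation and grouping logic is easy to verify by hand. A pure-Python sketch of the same aggregation over two of the sample shipments (values taken from the CN-to-US smartphone rows; the dictionary structure is ours, not the pipeline’s):

```python
from datetime import date
from decimal import Decimal
from collections import defaultdict

# (event_date, hs_code, origin, destination, declared_value, tariff_cost) rows,
# mirroring two October CN -> US smartphone shipments
shipments = [
    (date(2024, 10, 1), "851712", "CN", "US", Decimal("10000.00"), Decimal("1500.00")),
    (date(2024, 10, 15), "851712", "CN", "US", Decimal("5000.00"), Decimal("750.00")),
]

summary = defaultdict(lambda: {"total_declared_value": Decimal("0"),
                               "total_estimated_tariff_cost": Decimal("0"),
                               "num_shipments": 0})
for event_date, hs, origin, dest, value, cost in shipments:
    # F.trunc(event_timestamp, "month") -> first day of the month
    period = event_date.replace(day=1)
    key = (period, hs, origin, dest)
    summary[key]["total_declared_value"] += value
    summary[key]["total_estimated_tariff_cost"] += cost
    summary[key]["num_shipments"] += 1

row = summary[(date(2024, 10, 1), "851712", "CN", "US")]
print(row["total_declared_value"], row["num_shipments"])  # → 15000.00 2
```

Both shipments collapse into a single October row, exactly as they will in gold_tariff_impact_summary.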

c) Testing This Component

To test the DLT pipeline, you need to create a DLT pipeline definition in your Databricks workspace.

  1. Create a DLT Pipeline:

    • Navigate to “Workflows” -> “Delta Live Tables” in your Databricks workspace.
    • Click “Create Pipeline”.
    • Pipeline name: Realtime_Tariff_Impact_Analysis (or tariff_impact_analysis_pipeline if you prefer to match the config)
    • Pipeline mode: Continuous (for real-time processing) or Triggered (for scheduled batch processing, good for initial testing). Let’s start with Triggered for easier verification.
    • Notebook Libraries: Select “Python” and add the paths to your DLT pipeline files:
      • /path/to/dlt_pipelines/tariff_analysis/01_tariff_rates_ingestion.py
      • /path/to/dlt_pipelines/tariff_analysis/02_trade_tariff_enrichment.py
      • /path/to/dlt_pipelines/tariff_analysis/03_tariff_impact_aggregation.py (Replace /path/to/ with the actual path in your workspace, e.g., /Users/your.email@example.com/dlt_pipelines/tariff_analysis/01_tariff_rates_ingestion.py)
    • Target schema: Enter your Unity Catalog schema, e.g., main.supply_chain_analytics. This should match the UNITY_CATALOG_SCHEMA in your config.py.
    • Compute:
      • Pipeline type: Serverless (recommended for DLT for managed infrastructure)
      • (If Serverless is not available, choose Fixed size or Autoscaling cluster. Ensure you select a DLT runtime version.)
    • Advanced Options:
      • Configuration: Add a Spark configuration spark.databricks.io.cache.enabled true for performance.
      • Storage location: (Optional but recommended) A cloud storage path for checkpointing and pipeline logs, e.g., s3://your-bucket/dlt_checkpoints/tariff_analysis/ or abfss://your-container@your-adls.dfs.core.windows.net/dlt_checkpoints/tariff_analysis/.
    • Click “Create”.
  2. Run the Pipeline:

    • After creation, click the “Start” button on your pipeline.
    • Monitor the DLT UI for progress. It will show the graph of tables being created and data flowing through them.
  3. Verify Data in Databricks SQL or a Notebook: Once the pipeline run is successful, you can query the created tables.

    Open a new Databricks Notebook or Databricks SQL Query Editor:

    -- Databricks SQL Query Editor or Notebook
    
    -- Replace 'main.supply_chain_analytics' with your actual Unity Catalog schema
    USE CATALOG main;
    USE SCHEMA supply_chain_analytics;
    
    -- Verify silver_tariff_rates table
    SELECT * FROM silver_tariff_rates ORDER BY hs_code, effective_start_date;
    
    -- Verify silver_enriched_trade_tariffs table
    SELECT
        trade_id,
        event_timestamp,
        hs_code,
        origin_country,
        destination_country,
        declared_value,
        applied_tariff_rate_percentage,
        estimated_tariff_cost
    FROM silver_enriched_trade_tariffs
    ORDER BY event_timestamp DESC
    LIMIT 10;
    
    -- Verify gold_tariff_impact_summary table
    SELECT
        reporting_period_month,
        hs_code,
        origin_country,
        destination_country,
        total_declared_value,
        total_estimated_tariff_cost,
        average_tariff_rate_percentage,
        num_shipments
    FROM gold_tariff_impact_summary
    ORDER BY reporting_period_month DESC, hs_code
    LIMIT 10;
    

    Expected behavior:

    • silver_tariff_rates should contain the static tariff data you defined.
    • silver_enriched_trade_tariffs should show your bronze trade records with applied_tariff_rate_percentage and estimated_tariff_cost populated based on the tariff data and event timestamp. Note how TID001 and TID004 (CN to US, HS 851712) have different tariffs applied based on their event_timestamp falling into different tariff periods. TID007 (VN to US) should have a lower tariff, and TID008 (TW to GB) should show the new GB tariff.
    • gold_tariff_impact_summary should show aggregated sums and averages for each month, HS code, and country pair.

4. Production Considerations

Deploying DLT pipelines to production requires careful planning for reliability, performance, security, and maintainability.

  • Error Handling:

    • DLT’s built-in expect statements (expect_or_drop, expect_or_fail) are paramount. Use expect_or_drop for non-critical data quality issues where you can tolerate dropping malformed records, and expect_or_fail for critical issues that warrant halting the pipeline to prevent downstream corruption.
    • Implement robust logging within your Python code to capture application-level errors and processing details.
    • Configure DLT to send notifications (e.g., to Slack, PagerDuty) on pipeline failures or data quality violations.
  • Performance Optimization:

    • Serverless DLT: Leverage Databricks Serverless DLT for automated infrastructure management and scaling. This significantly reduces operational overhead.
    • pipelines.autoOptimize.zOrderCols: Set this table property on frequently joined or filtered columns (e.g., hs_code, event_timestamp, country codes) to improve query performance. DLT's automatic maintenance then runs OPTIMIZE with Z-ordering on those columns.
    • Incremental Processing: Ensure dlt.read_stream() is used where appropriate to process only new data, minimizing compute resources.
    • Cluster Sizing (if not Serverless): Monitor pipeline metrics to fine-tune cluster size and autoscaling settings. Over-provisioning wastes money; under-provisioning causes processing delays.
    • Data Skew: Monitor for data skew during joins and aggregations. If encountered, consider repartitioning the DataFrame before the operation.
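The auto-optimize Z-ordering mentioned above is configured per table through the table_properties argument of the table decorator. A minimal sketch under stated assumptions (the column choices here are assumptions for this dataset, not the chapter's definitive settings):

```python
# Hypothetical table properties for the gold table: a comma-separated list of
# columns to Z-order during DLT's automatic OPTIMIZE runs.
GOLD_TABLE_PROPERTIES = {
    "pipelines.autoOptimize.zOrderCols": "reporting_period_month,hs_code",
}

# Inside the pipeline file this would be passed to the table decorator, e.g.:
# @dlt.table(name="gold_tariff_impact_summary",
#            table_properties=GOLD_TABLE_PROPERTIES)
```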
  • Security Considerations:

    • Unity Catalog: Utilize Unity Catalog for fine-grained access control (table, column, row-level). Grant only necessary permissions to DLT service principals.
    • Principle of Least Privilege: Ensure the service principal running the DLT pipeline has the minimum required permissions to read from source tables and write to target tables.
    • Data Masking/Encryption: If HS codes or trade values are considered sensitive, implement data masking for non-production environments or encryption at rest/in transit.
    • Network Security: Configure network access controls (e.g., Private Link, VPC/VNet peering) to restrict access to your Databricks workspace and underlying data sources.
  • Logging and Monitoring:

    • DLT UI: The DLT UI provides a visual graph of your pipeline, data flow, and health metrics.
    • Databricks Logs: Access detailed driver and executor logs for debugging.
    • System Tables: DLT and Unity Catalog expose system tables (e.g., system.billing.usage, system.workflow.events) that provide metadata, lineage, and performance metrics for monitoring.
    • External Monitoring: Integrate Databricks logs and metrics with external monitoring tools (e.g., Splunk, Datadog, Grafana) for centralized observability and alerting.
  • CI/CD and Deployment:

    • Databricks Asset Bundles (DABs): Use DABs for defining, deploying, and managing your DLT pipelines, notebooks, and infrastructure as code. This enables automated deployment workflows, version control, and consistency across environments (dev, staging, prod).
    • Git Integration: Store your DLT Python files in a Git repository (e.g., GitHub, GitLab, Azure DevOps). Databricks Workflows can be configured to trigger DLT pipeline updates on Git pushes.

5. Code Review Checkpoint

At this stage, you have implemented a complete DLT pipeline for HS Code-based tariff impact analysis.

Summary of what was built:

  • dlt_pipelines/tariff_analysis/config.py: Centralized configuration for Unity Catalog schema and table names.
  • setup_bronze_data.py (Databricks Notebook): A utility to create sample bronze_trade_data if not already present.
  • dlt_pipelines/tariff_analysis/01_tariff_rates_ingestion.py: A DLT table (silver_tariff_rates) containing static/historical tariff rates, with data quality expectations.
  • dlt_pipelines/tariff_analysis/02_trade_tariff_enrichment.py: A DLT table (silver_enriched_trade_tariffs) that streams from bronze_trade_data, joins with silver_tariff_rates based on HS code, country, and date, and calculates estimated_tariff_cost. Includes robust data quality checks.
  • dlt_pipelines/tariff_analysis/03_tariff_impact_aggregation.py: A DLT table (gold_tariff_impact_summary) that streams from silver_enriched_trade_tariffs, aggregates tariff impacts by reporting period, HS code, and country, and includes strict data quality checks.

Integration: These three Python files form a single DLT pipeline. When deployed together, DLT automatically orchestrates the dependencies: silver_tariff_rates is created first, then silver_enriched_trade_tariffs reads from it and bronze_trade_data, and finally gold_tariff_impact_summary reads from silver_enriched_trade_tariffs. The config.py ensures consistent naming and schema across all components.

6. Common Issues & Solutions

Here are some common issues you might encounter and how to address them:

  1. Issue: ImportError: No module named 'dlt_pipelines' when running DLT.

    • Cause: The DLT runtime cannot find your config.py or other modules if they are not correctly packaged or located relative to the DLT pipeline files.
    • Solution: Ensure that your DLT pipeline definition includes all necessary Python files (e.g., 01_tariff_rates_ingestion.py, 02_trade_tariff_enrichment.py, 03_tariff_impact_aggregation.py, and, crucially, config.py if it is imported directly). If config.py sits in the same folder as the DLT Python files, it should be importable as-is. If it lives in a parent directory, adjust the Python path or restructure your DLT deployment (e.g., using Databricks Repos or Asset Bundles) so that all modules are accessible. The try-except ImportError block in our DLT files helps with local testing; within a DLT pipeline, all files in the pipeline definition are typically treated as one module.
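The fallback pattern referenced in that solution looks like the following sketch. The constant name matches the UNITY_CATALOG_SCHEMA setting mentioned earlier in this chapter; the default value shown in the except branch is an assumption for illustration:

```python
# Prefer the shared config module; fall back to a literal default when running
# a single file outside the DLT pipeline (e.g., in a local editor or test).
try:
    from config import UNITY_CATALOG_SCHEMA  # shared pipeline configuration
except ImportError:
    UNITY_CATALOG_SCHEMA = "main.supply_chain_analytics"  # assumed default
```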
  2. Issue: No tariff rates applied (estimated_tariff_cost is 0 or NULL) in silver_enriched_trade_tariffs.

    • Cause: Mismatched join keys, incorrect date ranges, or missing tariff data.
      • HS codes might not match exactly (e.g., “851712” vs “8517120000”).
      • Origin/destination country codes might differ (e.g., “USA” vs “US”).
      • The event_timestamp of the trade record falls outside the effective_start_date and effective_end_date of any available tariff.
      • bronze_trade_data contains records for which no tariff data exists in silver_tariff_rates.
    • Solution:
      • Debugging: Use display() or SELECT queries on bronze_trade_data and silver_tariff_rates separately to inspect the data.
      • Schema & Data Type Check: Ensure hs_code, country codes, and date fields have consistent data types and formats across both tables.
      • Join Condition Verification: Double-check the join logic, especially the date range condition (F.to_date(F.col("trade.event_timestamp")) >= F.col("tariff.effective_start_date")).
      • Data Profiling: Run data profiling on both source tables to identify unique values, common patterns, and potential data quality issues in HS codes or country names. Consider adding a data cleansing step for raw_hs_code in a Silver layer (e.g., padding/trimming) before this join if necessary.
      • Logging: Add more detailed logging statements in the enrichment function to log records that fail to find a tariff match.
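The HS code cleansing suggested above can be sketched in plain Python. The 6-digit truncation rule and the function name are assumptions for illustration (in the pipeline itself this logic would be expressed with pyspark.sql.functions, e.g. regexp_replace and substring, before the join):

```python
def normalize_hs_code(raw_hs_code: str, digits: int = 6) -> str:
    """Hypothetical cleansing rule: keep only digits, then truncate to the
    6-digit international HS subheading so '8517120000' matches '851712'."""
    cleaned = "".join(ch for ch in (raw_hs_code or "") if ch.isdigit())
    return cleaned[:digits]

# The mismatches described above both reduce to the same join key:
# normalize_hs_code("8517120000") -> "851712"
# normalize_hs_code(" 8517.12 ")  -> "851712"
```

Applying the same normalization to both bronze_trade_data and silver_tariff_rates before joining eliminates an entire class of silent non-matches.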
  3. Issue: DLT pipeline fails due to expect_or_fail constraints.

    • Cause: Critical data quality issues in the input data that violate the strict expect_or_fail rules defined in your DLT tables. For example, total_estimated_tariff_cost becoming negative due to an upstream calculation error, or num_shipments being zero for an aggregated group.
    • Solution:
      • Analyze DLT UI: The DLT UI will show which expectation failed and for how many records.
      • Inspect Upstream Data: Query the source table (silver_enriched_trade_tariffs for the gold table) to identify the specific records that caused the failure.
      • Adjust Expectations (Carefully): If the failure reveals a legitimate data pattern you weren’t expecting but is acceptable, consider changing expect_or_fail to expect_or_drop or refining the expectation logic. However, for gold tables, it’s often better to fix the upstream data or processing logic to meet the stringent quality requirements.
      • Root Cause Analysis: Trace back to the source of the problematic data. Is it an ingestion error? A transformation bug? Fix it at the earliest possible stage in the pipeline.

7. Testing & Verification

To thoroughly test and verify the chapter’s work, follow these steps:

  1. Ensure setup_bronze_data.py has been run successfully. This populates your bronze_trade_data table with the initial set of sample records.

  2. Start your DLT pipeline.

    • Go to the DLT UI in Databricks.
    • Select your Realtime_Tariff_Impact_Analysis pipeline.
    • Click “Start” (if in Triggered mode) or ensure it’s running (if Continuous).
    • Wait for the pipeline run to complete successfully (all tables should show a green checkmark).
  3. Query the tables in Databricks SQL or a new notebook:

    -- Use your Unity Catalog and schema
    USE CATALOG main;
    USE SCHEMA supply_chain_analytics;
    
    -- 1. Verify silver_tariff_rates
    SELECT * FROM silver_tariff_rates ORDER BY hs_code, effective_start_date;
    -- Expected: All 6 tariff rules should be present, with correct dates and rates.
    
    -- 2. Verify silver_enriched_trade_tariffs
    SELECT
        trade_id,
        CAST(event_timestamp AS DATE) AS trade_date,
        hs_code,
        origin_country,
        destination_country,
        declared_value,
        quantity,
        applied_tariff_rate_percentage,
        estimated_tariff_cost
    FROM silver_enriched_trade_tariffs
    ORDER BY trade_date, trade_id;
    /*
    Expected:
    - TID001 (2024-10-01, CN->US, 851712): should have tariff 0.15, cost = 10000 * 0.15 = 1500.00
    - TID002 (2024-10-02, TW->US, 847130): should have tariff 0.07, cost = 15000 * 0.07 = 1050.00
    - TID003 (2024-11-05, DE->FR, 870323): should have tariff 0.02, cost = 500000 * 0.02 = 10000.00
    - TID004 (2024-11-10, CN->US, 851712): should have tariff 0.18 (new rate!), cost = 12000 * 0.18 = 2160.00
    - TID005 (2024-12-01, TW->US, 847130): should have tariff 0.07, cost = 18000 * 0.07 = 1260.00
    - TID006 (2024-12-05, DE->FR, 870323): should have tariff 0.02, cost = 600000 * 0.02 = 12000.00
    - TID007 (2025-01-01, VN->US, 851712): should have tariff 0.05 (new origin rate!), cost = 9000 * 0.05 = 450.00
    - TID008 (2025-01-02, TW->GB, 847130): should have tariff 0.08 (new destination rate!), cost = 16000 * 0.08 = 1280.00
    */
    
    -- 3. Verify gold_tariff_impact_summary
    SELECT
        reporting_period_month,
        hs_code,
        origin_country,
        destination_country,
        total_declared_value,
        total_quantity,
        total_estimated_tariff_cost,
        average_tariff_rate_percentage,
        num_shipments
    FROM gold_tariff_impact_summary
    ORDER BY reporting_period_month, hs_code, origin_country, destination_country;
    /*
    Expected:
    - Aggregations for Oct 2024, Nov 2024, Dec 2024, Jan 2025 should be present.
    - Example for 2024-10-01 (CN->US, 851712): total_estimated_tariff_cost = 1500.00, num_shipments = 1
    - Example for 2024-11-01 (CN->US, 851712): total_estimated_tariff_cost = 2160.00, num_shipments = 1
    - Example for 2025-01-01 (VN->US, 851712): total_estimated_tariff_cost = 450.00, num_shipments = 1
    - Example for 2025-01-01 (TW->GB, 847130): total_estimated_tariff_cost = 1280.00, num_shipments = 1
    */
    
  4. Simulate new data arrival (optional but recommended for streaming):

    • Modify your setup_bronze_data.py notebook to append new records to bronze_trade_data (switch the write from mode("overwrite") to mode("append") so that re-running the notebook adds records instead of replacing them, as a real streaming source would).
    • If your DLT pipeline is in Continuous mode, it should automatically detect and process these new records. If in Triggered mode, run the pipeline again.
    • Re-query the Silver and Gold tables to see the updated results.

This comprehensive verification process ensures that the data flows correctly through each layer, calculations are accurate, and the pipeline adheres to the defined data quality expectations.
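The expected figures in the query comments above follow directly from declared_value multiplied by the applied rate. A quick plain-Python spot check of that arithmetic, using values copied from the expectations listed in step 3:

```python
# Spot-check expected tariff costs from the verification step:
# estimated_tariff_cost = declared_value * applied_tariff_rate_percentage.
expected = {
    "TID001": (10_000.00, 0.15, 1_500.00),
    "TID004": (12_000.00, 0.18, 2_160.00),
    "TID007": (9_000.00, 0.05, 450.00),
    "TID008": (16_000.00, 0.08, 1_280.00),
}
for trade_id, (declared_value, rate, expected_cost) in expected.items():
    cost = round(declared_value * rate, 2)
    assert cost == expected_cost, f"{trade_id}: {cost} != {expected_cost}"
print("All spot checks passed")
```

If a queried estimated_tariff_cost disagrees with this arithmetic, suspect the join (wrong tariff period matched) before suspecting the multiplication itself.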

8. Summary & Next Steps

In this chapter, we successfully designed and implemented a real-time HS Code-based tariff impact analysis pipeline using Databricks Delta Live Tables. We established a foundational silver_tariff_rates table, enriched raw trade data with dynamic tariff information, and aggregated these insights into a gold_tariff_impact_summary table. We emphasized production-ready practices, including robust data quality expectations, logging, and performance considerations like ZORDER and Serverless DLT. This pipeline provides critical visibility into the financial implications of tariffs, enabling supply chain optimization.

This tariff impact analysis forms a vital part of our overall real-time supply chain intelligence platform. The generated Gold table can now be consumed by business intelligence tools for dashboards, or by other downstream applications for further analysis and decision-making.

In the next chapter, we will build upon this foundation by focusing on Streaming Logistics Cost Monitoring with Tariff and Fuel Price Correlation using Spark Structured Streaming. We will integrate external data sources like fuel prices and combine them with our tariff insights to provide a holistic view of logistics costs.