Chapter 7: HS Code-based Tariff Impact Analysis with DLT
1. Chapter Introduction
In this chapter, we will build a robust, real-time data pipeline using Databricks Delta Live Tables (DLT) to perform HS Code-based tariff impact analysis. This pipeline will ingest raw trade data, enrich it with historical and current tariff rates, and then aggregate the estimated tariff costs to provide actionable insights into the financial impact of import/export duties.
Understanding tariff impacts is crucial for modern supply chains. Tariffs can significantly influence procurement costs, pricing strategies, and overall profitability. By automating this analysis with DLT, businesses can gain near real-time visibility into these costs, enabling proactive decision-making to mitigate risks and optimize trade routes or sourcing strategies. This step is a cornerstone for building a resilient and cost-effective supply chain.
As a prerequisite, we assume you have successfully set up your Databricks environment and Unity Catalog from previous chapters. Specifically, we’ll be working with a bronze_trade_data table (which we’ll simulate for this chapter if not already present) that contains raw import/export records, including HS codes, origin/destination countries, and declared values. We’ll also need a source for historical tariff rates, which we will simulate as a static Delta table for simplicity in this tutorial, although in a real-world scenario, it would likely be ingested from an external API or data provider.
Upon completion of this chapter, you will have a fully functional DLT pipeline that generates:
- A Silver Delta table (`silver_enriched_trade_tariffs`) containing individual trade records enriched with the applicable tariff rates and calculated estimated tariff costs.
- A Gold Delta table (`gold_tariff_impact_summary`) providing aggregated insights into total tariff costs by product, country, and time period, ready for downstream analytics and reporting.
2. Planning & Design
Component Architecture for Tariff Impact Analysis
Our DLT pipeline for tariff impact analysis will follow the Medallion Architecture pattern (Bronze -> Silver -> Gold).
- Bronze Layer: Raw trade data (`bronze_trade_data`) ingested from various sources (e.g., Kafka, flat files). This layer is assumed to be available from previous chapters or will be mocked.
- Reference Data Ingestion (Static Silver): For tariff rates, we will create a `silver_tariff_rates` table. In a production environment, this table would likely be populated by another DLT pipeline consuming from an external tariff data API or batch files. For this tutorial, we will directly create and populate it with sample data.
- Silver Layer (Enrichment): The `silver_enriched_trade_tariffs` table will be derived from `bronze_trade_data` by joining with `silver_tariff_rates` to apply the correct tariff based on HS Code, origin/destination, and date.
- Gold Layer (Aggregation): The `gold_tariff_impact_summary` table will aggregate the enriched data to provide high-level summaries suitable for business intelligence dashboards and further analysis.
Database Schema Design
We will leverage Unity Catalog for managing our tables. All tables will reside in a dedicated schema (e.g., supply_chain_analytics).
`bronze_trade_data` (assumed existing, or mocked for this chapter):
- `id` STRING (unique shipment ID)
- `event_timestamp` TIMESTAMP (timestamp of the trade event)
- `raw_hs_code` STRING (Harmonized System Code, potentially needing cleanup)
- `product_description` STRING
- `origin_country` STRING (ISO 2-letter code)
- `destination_country` STRING (ISO 2-letter code)
- `declared_value` DECIMAL(18, 2) (value of goods)
- `quantity` DECIMAL(18, 2)
- `unit_of_measure` STRING
- `currency` STRING
- `_ingestion_timestamp` TIMESTAMP

`silver_tariff_rates` (new, static for this chapter):
- `hs_code` STRING (cleaned HS Code)
- `origin_country` STRING
- `destination_country` STRING
- `effective_start_date` DATE
- `effective_end_date` DATE
- `tariff_rate_percentage` DECIMAL(5, 4) (e.g., 0.05 for 5%)
- `tariff_rate_fixed_per_unit` DECIMAL(10, 4) (e.g., $1.50 per unit)
- `currency` STRING
- `tariff_description` STRING

`silver_enriched_trade_tariffs` (new, DLT managed):
- `trade_id` STRING (from `id` in bronze)
- `event_timestamp` TIMESTAMP
- `hs_code` STRING (cleaned HS Code)
- `product_description` STRING
- `origin_country` STRING
- `destination_country` STRING
- `declared_value` DECIMAL(18, 2)
- `quantity` DECIMAL(18, 2)
- `unit_of_measure` STRING
- `currency` STRING
- `applied_tariff_rate_percentage` DECIMAL(5, 4)
- `applied_tariff_rate_fixed_per_unit` DECIMAL(10, 4)
- `estimated_tariff_cost` DECIMAL(18, 2)
- `_processing_timestamp` TIMESTAMP

`gold_tariff_impact_summary` (new, DLT managed):
- `reporting_period_month` DATE (e.g., first day of the month)
- `hs_code` STRING
- `origin_country` STRING
- `destination_country` STRING
- `total_declared_value` DECIMAL(20, 2)
- `total_quantity` DECIMAL(20, 2)
- `total_estimated_tariff_cost` DECIMAL(20, 2)
- `average_tariff_rate_percentage` DECIMAL(5, 4)
- `num_shipments` BIGINT
- `_processing_timestamp` TIMESTAMP
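The relationship between these schemas hinges on one calculation: the estimated tariff cost combines an ad-valorem component and a fixed-per-unit component. A minimal plain-Python sketch of that formula (illustrative only; the pipeline itself computes this in Spark later in the chapter):

```python
from decimal import Decimal
from typing import Optional

def estimated_tariff_cost(declared_value: Decimal,
                          quantity: Decimal,
                          rate_percentage: Optional[Decimal],
                          rate_fixed_per_unit: Optional[Decimal]) -> Decimal:
    """Ad-valorem component plus fixed-per-unit component; a missing rate contributes zero."""
    ad_valorem = declared_value * (rate_percentage if rate_percentage is not None else Decimal("0"))
    fixed = quantity * (rate_fixed_per_unit if rate_fixed_per_unit is not None else Decimal("0"))
    # Round to 2 decimal places to match the DECIMAL(18, 2) target column
    return (ad_valorem + fixed).quantize(Decimal("0.01"))

# A $10,000 shipment of 100 units at a 15% ad-valorem rate and no fixed rate:
cost = estimated_tariff_cost(Decimal("10000.00"), Decimal("100"),
                             Decimal("0.15"), Decimal("0.00"))
```

Working in `Decimal` rather than `float` mirrors the DECIMAL column types above and avoids binary floating-point rounding in financial figures.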
File Structure
We will organize our DLT pipeline code within a dlt_pipelines directory.
.
├── dlt_pipelines/
│ ├── tariff_analysis/
│ │ ├── config.py
│ │ ├── 01_tariff_rates_ingestion.py
│ │ ├── 02_trade_tariff_enrichment.py
│ │ └── 03_tariff_impact_aggregation.py
│ └── __init__.py
├── README.md
└── requirements.txt
3. Step-by-Step Implementation
We will create the necessary files and implement the DLT logic incrementally.
a) Setup/Configuration
First, let’s establish our project structure and a configuration file.
Create Project Directories: If you haven’t already, create the `dlt_pipelines/tariff_analysis` directories:

mkdir -p dlt_pipelines/tariff_analysis
touch dlt_pipelines/__init__.py

Create `dlt_pipelines/tariff_analysis/config.py`: This file will hold common configurations like our Unity Catalog schema name.

# dlt_pipelines/tariff_analysis/config.py
import os

# Define the Unity Catalog schema where DLT tables will be created
# Replace 'your_catalog' and 'your_schema' with your actual Unity Catalog and schema names
# Example: 'main.supply_chain_analytics'
UNITY_CATALOG_SCHEMA = os.getenv("UNITY_CATALOG_SCHEMA", "main.supply_chain_analytics")

# Define table names
BRONZE_TRADE_DATA_TABLE = f"{UNITY_CATALOG_SCHEMA}.bronze_trade_data"
SILVER_TARIFF_RATES_TABLE = f"{UNITY_CATALOG_SCHEMA}.silver_tariff_rates"
SILVER_ENRICHED_TRADE_TARIFFS_TABLE = f"{UNITY_CATALOG_SCHEMA}.silver_enriched_trade_tariffs"
GOLD_TARIFF_IMPACT_SUMMARY_TABLE = f"{UNITY_CATALOG_SCHEMA}.gold_tariff_impact_summary"

# Define DLT pipeline name (for logging/identification)
DLT_PIPELINE_NAME = "tariff_impact_analysis_pipeline"

# Logging configuration
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'formatter': 'standard',
            'class': 'logging.StreamHandler',
        },
    },
    'loggers': {
        '': {  # root logger
            'handlers': ['console'],
            'level': 'INFO',
            'propagate': True
        },
        'dlt_tariff_analysis': {
            'handlers': ['console'],
            'level': 'INFO',
            'propagate': False
        },
    }
}

Explanation:
- We define `UNITY_CATALOG_SCHEMA` via an environment variable (with a sensible default) for flexibility. This is crucial for production deployments where you might have different catalogs/schemas for dev, staging, and prod.
- Table names are constructed using f-strings for clarity and consistency.
- Basic logging configuration is included, which is good practice for any production-ready application.
- We define the `DLT_PIPELINE_NAME` constant so the pipeline can be identified consistently in logs and monitoring.
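To see the environment-driven naming in action without a Databricks workspace, the same pattern can be exercised with a plain dict standing in for `os.environ` (`dev.supply_chain_analytics` below is a hypothetical dev override, not a name used elsewhere in this chapter):

```python
def resolve_tables(env: dict) -> dict:
    """Mirror the config.py pattern: derive fully qualified table names from the schema setting."""
    schema = env.get("UNITY_CATALOG_SCHEMA", "main.supply_chain_analytics")
    return {
        "bronze": f"{schema}.bronze_trade_data",
        "silver_rates": f"{schema}.silver_tariff_rates",
        "silver_enriched": f"{schema}.silver_enriched_trade_tariffs",
        "gold": f"{schema}.gold_tariff_impact_summary",
    }

prod_tables = resolve_tables({})  # no override: the tutorial default applies
dev_tables = resolve_tables({"UNITY_CATALOG_SCHEMA": "dev.supply_chain_analytics"})  # hypothetical dev schema
```

Swapping one environment variable repoints every table the pipeline reads and writes, which is what makes the dev/staging/prod promotion story clean.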
b) Core Implementation
We will now implement the DLT pipelines. Remember, DLT pipelines are typically defined in Python or SQL files that are then deployed as a single pipeline.
i. Simulate Bronze Trade Data (If not already present)
For this chapter, if you don’t have bronze_trade_data from previous steps, we’ll create a simple one. In a production scenario, this table would be continuously populated by a separate ingestion pipeline (e.g., from Kafka).
Create a Databricks Notebook (e.g., setup_bronze_data.py) in your workspace:
# Databricks Notebook: setup_bronze_data.py
# Run this once to create sample bronze_trade_data if it doesn't exist
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DecimalType
# Import configuration
import sys
import os
# Add the parent directory of dlt_pipelines to the Python path
# This assumes setup_bronze_data.py is at the root level alongside dlt_pipelines
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), 'dlt_pipelines', 'tariff_analysis')))
from config import UNITY_CATALOG_SCHEMA, BRONZE_TRADE_DATA_TABLE
print(f"Using Unity Catalog Schema: {UNITY_CATALOG_SCHEMA}")
print(f"Creating/Updating Bronze Table: {BRONZE_TRADE_DATA_TABLE}")
from decimal import Decimal

# Define the input schema for raw trade data.
# event_timestamp arrives as a string and is cast below; monetary fields use Decimal
# values because Spark does not accept Python floats for DecimalType columns.
trade_data_schema = StructType([
    StructField("id", StringType(), True),
    StructField("event_timestamp", StringType(), True),
    StructField("raw_hs_code", StringType(), True),
    StructField("product_description", StringType(), True),
    StructField("origin_country", StringType(), True),
    StructField("destination_country", StringType(), True),
    StructField("declared_value", DecimalType(18, 2), True),
    StructField("quantity", DecimalType(18, 2), True),
    StructField("unit_of_measure", StringType(), True),
    StructField("currency", StringType(), True)
])

# Sample data for bronze_trade_data
sample_trade_data = [
    ("TID001", "2024-10-01 10:00:00", "851712", "Smartphones", "CN", "US", Decimal("10000.00"), Decimal("100"), "PCS", "USD"),
    ("TID002", "2024-10-02 11:30:00", "847130", "Laptops", "TW", "US", Decimal("15000.00"), Decimal("50"), "PCS", "USD"),
    ("TID003", "2024-11-05 09:00:00", "870323", "Electric Cars", "DE", "FR", Decimal("500000.00"), Decimal("10"), "PCS", "EUR"),
    ("TID004", "2024-11-10 14:00:00", "851712", "Smartphones", "CN", "US", Decimal("12000.00"), Decimal("120"), "PCS", "USD"),
    ("TID005", "2024-12-01 16:00:00", "847130", "Laptops", "TW", "US", Decimal("18000.00"), Decimal("60"), "PCS", "USD"),
    ("TID006", "2024-12-05 08:00:00", "870323", "Electric Cars", "DE", "FR", Decimal("600000.00"), Decimal("12"), "PCS", "EUR"),
    ("TID007", "2025-01-01 10:00:00", "851712", "Smartphones", "VN", "US", Decimal("9000.00"), Decimal("90"), "PCS", "USD"),  # New origin
    ("TID008", "2025-01-02 11:30:00", "847130", "Laptops", "TW", "GB", Decimal("16000.00"), Decimal("55"), "PCS", "GBP"),  # New destination
]

# Create a DataFrame, cast the timestamp strings, and stamp the ingestion time
df = spark.createDataFrame(sample_trade_data, schema=trade_data_schema) \
    .withColumn("event_timestamp", F.to_timestamp(F.col("event_timestamp"))) \
    .withColumn("_ingestion_timestamp", F.current_timestamp())
# Write to Unity Catalog as a Delta table
# Ensure the catalog and schema exist first:
# spark.sql(f"CREATE CATALOG IF NOT EXISTS {UNITY_CATALOG_SCHEMA.split('.')[0]}")
# spark.sql(f"CREATE SCHEMA IF NOT EXISTS {UNITY_CATALOG_SCHEMA}")
df.write.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable(BRONZE_TRADE_DATA_TABLE)
print(f"Sample data successfully written to {BRONZE_TRADE_DATA_TABLE}")
# Verify data
display(spark.sql(f"SELECT * FROM {BRONZE_TRADE_DATA_TABLE} LIMIT 5"))
Explanation:
- This is a standard Databricks notebook, not a DLT pipeline. It’s used to set up our `bronze_trade_data` table in Unity Catalog for demonstration purposes; plain PySpark is all that is needed.
- The `sys.path.append` is crucial to allow the notebook to import our `config.py` from the `dlt_pipelines/tariff_analysis` directory.
- A `StructType` defines the schema for our bronze table.
- Sample data is created, converted to a DataFrame, and written to Unity Catalog using `saveAsTable` with `mode("overwrite")` and `option("overwriteSchema", "true")` for idempotency during setup.
- Action: Run this notebook once in your Databricks workspace. Make sure to replace `main.supply_chain_analytics` with your actual Unity Catalog and schema name if different, and ensure the catalog and schema exist or create them with `spark.sql("CREATE CATALOG IF NOT EXISTS ...")` and `spark.sql("CREATE SCHEMA IF NOT EXISTS ...")`.
ii. Ingest Static Tariff Rates (01_tariff_rates_ingestion.py)
This DLT file will create our silver_tariff_rates table. While marked as “static” for this tutorial (meaning we define data directly in code), in a real DLT pipeline, this would typically read from a streaming or batch source of tariff data.
# dlt_pipelines/tariff_analysis/01_tariff_rates_ingestion.py
import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DateType, DecimalType
import logging
import logging.config
import sys
import os
# Import configuration
# Add the parent directory to the Python path if running as a DLT pipeline from the root
# DLT automatically handles module imports if files are in the same pipeline definition.
# This path modification is more for local testing or if this file were standalone.
# For DLT, ensure all Python files are part of the same pipeline source.
try:
from dlt_pipelines.tariff_analysis.config import (
UNITY_CATALOG_SCHEMA, SILVER_TARIFF_RATES_TABLE, LOGGING_CONFIG
)
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger('dlt_tariff_analysis')
except ImportError:
# Fallback for local testing outside DLT context or if config is not in path
print("Warning: Could not import config. Assuming default values for local testing.")
UNITY_CATALOG_SCHEMA = "main.supply_chain_analytics" # Default for local testing
SILVER_TARIFF_RATES_TABLE = f"{UNITY_CATALOG_SCHEMA}.silver_tariff_rates"
logger = logging.getLogger('dlt_tariff_analysis')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info(f"Configured for Unity Catalog Schema: {UNITY_CATALOG_SCHEMA}")
logger.info(f"Targeting Silver Tariff Rates Table: {SILVER_TARIFF_RATES_TABLE}")
@dlt.table(
name="silver_tariff_rates",
comment="Reference table for HS Code-based tariff rates, including historical data.",
table_properties={
"delta.logRetentionDuration": "30 days",
"delta.deletedFileRetentionDuration": "7 days",
"pipelines.autoOptimize.zorderCols": "hs_code",
}
)
@dlt.expect_or_drop("valid_hs_code", "hs_code IS NOT NULL AND LENGTH(hs_code) >= 6")
@dlt.expect_or_drop("valid_countries", "origin_country IS NOT NULL AND destination_country IS NOT NULL")
@dlt.expect_or_drop("valid_effective_dates", "effective_start_date IS NOT NULL AND effective_end_date IS NOT NULL AND effective_start_date <= effective_end_date")
def create_silver_tariff_rates():
"""
Creates or updates the silver_tariff_rates table with sample tariff data.
In a production scenario, this would typically ingest from a more dynamic source.
"""
logger.info("Generating sample tariff rates data for silver_tariff_rates table.")
    from decimal import Decimal  # rates must be Decimal to satisfy the DecimalType columns

    tariff_data = [
        ("851712", "CN", "US", "2024-01-01", "2024-10-31", Decimal("0.15"), Decimal("0.00"), "USD", "Standard tariff for smartphones from China to US"),
        ("851712", "CN", "US", "2024-11-01", "2025-12-31", Decimal("0.18"), Decimal("0.00"), "USD", "Increased tariff for smartphones from China to US"),  # Tariff increase
        ("851712", "VN", "US", "2025-01-01", "2025-12-31", Decimal("0.05"), Decimal("0.00"), "USD", "Lower tariff for smartphones from Vietnam to US"),  # New origin tariff
        ("847130", "TW", "US", "2024-01-01", "2025-12-31", Decimal("0.07"), Decimal("0.00"), "USD", "Standard tariff for laptops from Taiwan to US"),
        ("870323", "DE", "FR", "2024-01-01", "2025-12-31", Decimal("0.02"), Decimal("0.00"), "EUR", "Intra-EU tariff for electric cars"),
        ("847130", "TW", "GB", "2025-01-01", "2025-12-31", Decimal("0.08"), Decimal("0.00"), "GBP", "Post-Brexit tariff for laptops from Taiwan to UK"),  # New destination tariff
    ]
    # Dates arrive as strings here and are cast to DateType below
    schema = StructType([
        StructField("hs_code", StringType(), False),
        StructField("origin_country", StringType(), False),
        StructField("destination_country", StringType(), False),
        StructField("effective_start_date", StringType(), False),
        StructField("effective_end_date", StringType(), False),
        StructField("tariff_rate_percentage", DecimalType(5, 4), False),
        StructField("tariff_rate_fixed_per_unit", DecimalType(10, 4), False),
        StructField("currency", StringType(), False),
        StructField("tariff_description", StringType(), True),
    ])
    return (
        spark.createDataFrame(data=tariff_data, schema=schema)
        .withColumn("effective_start_date", F.to_date(F.col("effective_start_date")))
        .withColumn("effective_end_date", F.to_date(F.col("effective_end_date")))
    )
Explanation:
- `@dlt.table` decorator: Defines a Delta Live Table. We specify a descriptive `comment` and `table_properties` for better maintainability and performance. `pipelines.autoOptimize.zorderCols` is used to optimize reads on `hs_code`.
- `@dlt.expect_or_drop`: These are DLT’s data quality constraints. If a record fails any of these expectations, it will be dropped from the table, preventing bad data from polluting downstream layers. This is a critical production best practice.
- `create_silver_tariff_rates` function: This function generates a Spark DataFrame from static Python data. We cast date strings to `DateType`.
- Logging: We use Python’s `logging` module to provide informative messages during pipeline execution, aiding in debugging and monitoring. The `try`-`except` block for the config import helps with local development.
- Action: This file will be part of your DLT pipeline definition.
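The three `expect_or_drop` predicates can be mirrored in plain Python to preview which tariff rows would survive. This is a sketch of the drop semantics only, not DLT itself:

```python
from datetime import date

def passes_tariff_expectations(row: dict) -> bool:
    """Replicate the expect_or_drop predicates declared on silver_tariff_rates."""
    valid_hs = row.get("hs_code") is not None and len(row["hs_code"]) >= 6
    valid_countries = (row.get("origin_country") is not None
                       and row.get("destination_country") is not None)
    start, end = row.get("effective_start_date"), row.get("effective_end_date")
    valid_dates = start is not None and end is not None and start <= end
    return valid_hs and valid_countries and valid_dates

good_row = {"hs_code": "851712", "origin_country": "CN", "destination_country": "US",
            "effective_start_date": date(2024, 1, 1), "effective_end_date": date(2024, 10, 31)}
short_code = {**good_row, "hs_code": "8517"}                        # fails valid_hs_code: dropped
inverted = {**good_row, "effective_start_date": date(2026, 1, 1)}   # start after end: dropped
```

In the real pipeline DLT also records how many rows each expectation dropped, which you can inspect in the pipeline UI and event log.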
iii. Trade Tariff Enrichment (02_trade_tariff_enrichment.py)
This DLT file will read from the bronze_trade_data and join it with silver_tariff_rates to calculate the estimated tariff cost.
# dlt_pipelines/tariff_analysis/02_trade_tariff_enrichment.py
import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, TimestampType
import logging
import logging.config
import sys
import os
# Import configuration
try:
from dlt_pipelines.tariff_analysis.config import (
UNITY_CATALOG_SCHEMA, BRONZE_TRADE_DATA_TABLE, SILVER_TARIFF_RATES_TABLE,
SILVER_ENRICHED_TRADE_TARIFFS_TABLE, LOGGING_CONFIG
)
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger('dlt_tariff_analysis')
except ImportError:
print("Warning: Could not import config. Assuming default values for local testing.")
UNITY_CATALOG_SCHEMA = "main.supply_chain_analytics"
BRONZE_TRADE_DATA_TABLE = f"{UNITY_CATALOG_SCHEMA}.bronze_trade_data"
SILVER_TARIFF_RATES_TABLE = f"{UNITY_CATALOG_SCHEMA}.silver_tariff_rates"
SILVER_ENRICHED_TRADE_TARIFFS_TABLE = f"{UNITY_CATALOG_SCHEMA}.silver_enriched_trade_tariffs"
logger = logging.getLogger('dlt_tariff_analysis')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info(f"Configured for Unity Catalog Schema: {UNITY_CATALOG_SCHEMA}")
logger.info(f"Reading from Bronze Table: {BRONZE_TRADE_DATA_TABLE}")
logger.info(f"Reading from Silver Tariff Rates Table: {SILVER_TARIFF_RATES_TABLE}")
logger.info(f"Targeting Silver Enriched Trade Tariffs Table: {SILVER_ENRICHED_TRADE_TARIFFS_TABLE}")
@dlt.table(
name="silver_enriched_trade_tariffs",
comment="Trade data enriched with applied tariff rates and estimated tariff costs.",
table_properties={
"delta.logRetentionDuration": "30 days",
"delta.deletedFileRetentionDuration": "7 days",
"pipelines.autoOptimize.zorderCols": "hs_code, origin_country, destination_country, event_timestamp",
}
)
@dlt.expect_or_drop("valid_declared_value", "declared_value IS NOT NULL AND declared_value >= 0")
@dlt.expect_or_drop("tariff_cost_calculated", "estimated_tariff_cost IS NOT NULL")
def create_silver_enriched_trade_tariffs():
"""
Reads bronze trade data, joins with silver tariff rates, and calculates estimated tariff costs.
"""
    logger.info("Reading bronze trade data from %s", BRONZE_TRADE_DATA_TABLE)
    bronze_df = spark.readStream.table(BRONZE_TRADE_DATA_TABLE)  # external table: use Spark's streaming reader
    logger.info("Reading silver tariff rates from %s", SILVER_TARIFF_RATES_TABLE)
    tariff_df = dlt.read("silver_tariff_rates")  # dlt.read resolves datasets defined within this pipeline
logger.info("Joining trade data with tariff rates to enrich.")
enriched_df = (
bronze_df.alias("trade")
.join(
tariff_df.alias("tariff"),
(F.col("trade.raw_hs_code") == F.col("tariff.hs_code")) &
(F.col("trade.origin_country") == F.col("tariff.origin_country")) &
(F.col("trade.destination_country") == F.col("tariff.destination_country")) &
(F.to_date(F.col("trade.event_timestamp")) >= F.col("tariff.effective_start_date")) &
(F.to_date(F.col("trade.event_timestamp")) <= F.col("tariff.effective_end_date")),
"leftouter" # Use leftouter to keep all trade records, even if no tariff match
)
.withColumn("applied_tariff_rate_percentage", F.col("tariff.tariff_rate_percentage"))
.withColumn("applied_tariff_rate_fixed_per_unit", F.col("tariff.tariff_rate_fixed_per_unit"))
.withColumn("estimated_tariff_cost_percentage",
F.when(F.col("tariff.tariff_rate_percentage").isNotNull(),
F.col("trade.declared_value") * F.col("tariff.tariff_rate_percentage"))
.otherwise(0.00))
.withColumn("estimated_tariff_cost_fixed",
F.when(F.col("tariff.tariff_rate_fixed_per_unit").isNotNull(),
F.col("trade.quantity") * F.col("tariff.tariff_rate_fixed_per_unit"))
.otherwise(0.00))
.withColumn("estimated_tariff_cost",
(F.col("estimated_tariff_cost_percentage") + F.col("estimated_tariff_cost_fixed"))
.cast(DecimalType(18, 2)))
.select(
F.col("trade.id").alias("trade_id"),
F.col("trade.event_timestamp"),
F.col("trade.raw_hs_code").alias("hs_code"),
F.col("trade.product_description"),
F.col("trade.origin_country"),
F.col("trade.destination_country"),
F.col("trade.declared_value"),
F.col("trade.quantity"),
F.col("trade.unit_of_measure"),
F.col("trade.currency"),
F.col("applied_tariff_rate_percentage"),
F.col("applied_tariff_rate_fixed_per_unit"),
F.col("estimated_tariff_cost"),
F.current_timestamp().alias("_processing_timestamp")
)
)
logger.info("Finished enriching trade data with tariff rates.")
return enriched_df
Explanation:
- `spark.readStream.table(BRONZE_TRADE_DATA_TABLE)`: This is crucial. The bronze table lives outside the pipeline, so we read it with Spark’s streaming reader, which lets DLT process new trade records incrementally. (`dlt.read_stream` is reserved for tables defined within the same pipeline.)
- `dlt.read("silver_tariff_rates")`: For reference tables that are slowly changing or static, a batch read of the pipeline-managed dataset is used. DLT handles the refresh of these lookup tables.
- Join Logic: The join conditions are critical: `hs_code`, `origin_country`, `destination_country`, and the `event_timestamp` falling within the tariff’s `effective_start_date` and `effective_end_date`. This ensures the correct historical tariff is applied.
- `leftouter` join: This ensures that all trade records are kept, even if a matching tariff rate isn’t found. This is important for identifying trade without tariff impact or for flagging missing tariff data.
- Tariff Cost Calculation: We calculate `estimated_tariff_cost` by summing percentage-based and fixed-per-unit tariffs. `F.when().otherwise()` handles cases where a tariff type might be `NULL`.
- Data Quality Expectations: `valid_declared_value` and `tariff_cost_calculated` ensure core financial data integrity.
- Logging: Clear logging messages indicate the stage of processing, which is invaluable for monitoring and debugging production pipelines.
- Action: This file will be part of your DLT pipeline definition.
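The effective-dated lookup at the heart of the join can be traced with a small plain-Python sketch over two of the sample CN→US tariff windows (rates taken from 01_tariff_rates_ingestion.py; the function is illustrative, not pipeline code):

```python
from datetime import date

# (hs_code, origin, destination, effective_start, effective_end, rate) — subset of the sample data
TARIFF_WINDOWS = [
    ("851712", "CN", "US", date(2024, 1, 1), date(2024, 10, 31), 0.15),
    ("851712", "CN", "US", date(2024, 11, 1), date(2025, 12, 31), 0.18),
]

def applied_rate(hs_code, origin, destination, event_date):
    """Return the rate whose effective window covers the trade date, else None (left-outer keeps the row)."""
    for hs, o, d, start, end, rate in TARIFF_WINDOWS:
        if (hs, o, d) == (hs_code, origin, destination) and start <= event_date <= end:
            return rate
    return None

rate_tid001 = applied_rate("851712", "CN", "US", date(2024, 10, 1))   # pre-increase window
rate_tid004 = applied_rate("851712", "CN", "US", date(2024, 11, 10))  # post-increase window
```

Because the date is part of the match, the same HS code and lane can carry different rates over time, which is exactly what the `effective_start_date`/`effective_end_date` bounds in the Spark join encode.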
iv. Tariff Impact Aggregation (03_tariff_impact_aggregation.py)
This DLT file will aggregate the enriched data to provide high-level summaries.
# dlt_pipelines/tariff_analysis/03_tariff_impact_aggregation.py
import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, LongType, DateType, TimestampType
import logging
import logging.config
import sys
import os
# Import configuration
try:
from dlt_pipelines.tariff_analysis.config import (
UNITY_CATALOG_SCHEMA, SILVER_ENRICHED_TRADE_TARIFFS_TABLE,
GOLD_TARIFF_IMPACT_SUMMARY_TABLE, LOGGING_CONFIG
)
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger('dlt_tariff_analysis')
except ImportError:
print("Warning: Could not import config. Assuming default values for local testing.")
UNITY_CATALOG_SCHEMA = "main.supply_chain_analytics"
SILVER_ENRICHED_TRADE_TARIFFS_TABLE = f"{UNITY_CATALOG_SCHEMA}.silver_enriched_trade_tariffs"
GOLD_TARIFF_IMPACT_SUMMARY_TABLE = f"{UNITY_CATALOG_SCHEMA}.gold_tariff_impact_summary"
logger = logging.getLogger('dlt_tariff_analysis')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info(f"Configured for Unity Catalog Schema: {UNITY_CATALOG_SCHEMA}")
logger.info(f"Reading from Silver Table: {SILVER_ENRICHED_TRADE_TARIFFS_TABLE}")
logger.info(f"Targeting Gold Tariff Impact Summary Table: {GOLD_TARIFF_IMPACT_SUMMARY_TABLE}")
@dlt.table(
name="gold_tariff_impact_summary",
comment="Aggregated summary of tariff impacts by HS Code, country, and reporting period.",
table_properties={
"delta.logRetentionDuration": "30 days",
"delta.deletedFileRetentionDuration": "7 days",
"pipelines.autoOptimize.zorderCols": "reporting_period_month, hs_code, origin_country, destination_country",
}
)
@dlt.expect_or_fail("positive_total_tariff_cost", "total_estimated_tariff_cost >= 0")
@dlt.expect_or_fail("positive_num_shipments", "num_shipments > 0")
def create_gold_tariff_impact_summary():
"""
Aggregates enriched trade data to summarize tariff impacts.
"""
logger.info("Reading enriched trade data from %s", SILVER_ENRICHED_TRADE_TARIFFS_TABLE)
    silver_df = dlt.read("silver_enriched_trade_tariffs")  # batch read: the summary is fully recomputed each update
logger.info("Aggregating tariff impact data.")
gold_df = (
silver_df
.withColumn("reporting_period_month", F.trunc(F.col("event_timestamp"), "month"))
.groupBy(
F.col("reporting_period_month"),
F.col("hs_code"),
F.col("origin_country"),
F.col("destination_country")
)
.agg(
F.sum("declared_value").alias("total_declared_value"),
F.sum("quantity").alias("total_quantity"),
F.sum("estimated_tariff_cost").alias("total_estimated_tariff_cost"),
F.avg("applied_tariff_rate_percentage").alias("average_tariff_rate_percentage"),
F.count("trade_id").alias("num_shipments")
)
.withColumn("_processing_timestamp", F.current_timestamp())
.select(
F.col("reporting_period_month").cast(DateType()), # Ensure consistent type
F.col("hs_code"),
F.col("origin_country"),
F.col("destination_country"),
F.col("total_declared_value").cast(DecimalType(20, 2)),
F.col("total_quantity").cast(DecimalType(20, 2)),
F.col("total_estimated_tariff_cost").cast(DecimalType(20, 2)),
F.col("average_tariff_rate_percentage").cast(DecimalType(5, 4)),
F.col("num_shipments").cast(LongType()),
F.col("_processing_timestamp").cast(TimestampType())
)
)
logger.info("Finished aggregating tariff impact data.")
return gold_df
Explanation:
- `dlt.read("silver_enriched_trade_tariffs")`: Reads the previously created Silver table as a pipeline-internal dataset. A batch read is used because the summary is a complete aggregation, which DLT recomputes on each update.
- `F.trunc(F.col("event_timestamp"), "month")`: Truncates each timestamp to the first day of its month, creating the `reporting_period_month` column.
- `groupBy` and `agg`: Standard Spark operations to perform aggregations like sum of declared value, total tariff cost, average tariff rate, and count of shipments.
- `@dlt.expect_or_fail`: These are stricter data quality rules. If a record fails, the entire pipeline run will fail. This is suitable for gold tables where data quality is paramount for business decisions.
- Type Casting: Explicitly casting aggregated columns ensures correct types and precision in the Gold table.
- Logging: Provides visibility into the aggregation process.
- Action: This file will be part of your DLT pipeline definition.
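The grouping the gold table performs can be sketched as a plain-Python rollup at the same grain (month, HS code, origin, destination); the tariff-cost figures below are illustrative stand-ins for the pipeline's computed values:

```python
from collections import defaultdict
from datetime import date

def month_floor(d: date) -> date:
    """Plain-Python analogue of F.trunc(event_timestamp, 'month'): first day of the month."""
    return d.replace(day=1)

# (event_date, hs_code, origin, destination, estimated_tariff_cost)
shipments = [
    (date(2024, 10, 1), "851712", "CN", "US", 1500.00),
    (date(2024, 10, 15), "851712", "CN", "US", 300.00),
    (date(2024, 11, 10), "851712", "CN", "US", 2160.00),
]

summary = defaultdict(lambda: {"total_estimated_tariff_cost": 0.0, "num_shipments": 0})
for event_date, hs, origin, dest, cost in shipments:
    key = (month_floor(event_date), hs, origin, dest)  # same grain as the gold groupBy
    summary[key]["total_estimated_tariff_cost"] += cost
    summary[key]["num_shipments"] += 1
```

The two October shipments collapse into one summary row while the November shipment lands in its own row, which is the behavior the dashboards downstream rely on.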
c) Testing This Component
To test the DLT pipeline, you need to create a DLT pipeline definition in your Databricks workspace.
Create a DLT Pipeline:
- Navigate to “Workflows” -> “Delta Live Tables” in your Databricks workspace.
- Click “Create Pipeline”.
- Pipeline name: `Realtime_Tariff_Impact_Analysis` (or `tariff_impact_analysis_pipeline` if you prefer to match the config).
- Pipeline mode: `Continuous` (for real-time processing) or `Triggered` (for scheduled batch processing, good for initial testing). Let’s start with `Triggered` for easier verification.
- Notebook Libraries: Select “Python” and add the paths to your DLT pipeline files:
  - `/path/to/dlt_pipelines/tariff_analysis/01_tariff_rates_ingestion.py`
  - `/path/to/dlt_pipelines/tariff_analysis/02_trade_tariff_enrichment.py`
  - `/path/to/dlt_pipelines/tariff_analysis/03_tariff_impact_aggregation.py`
  (Replace `/path/to/` with the actual path in your workspace, e.g., `/Users/your.email@example.com/dlt_pipelines/tariff_analysis/01_tariff_rates_ingestion.py`.)
- Target schema: Enter your Unity Catalog schema, e.g., `main.supply_chain_analytics`. This should match the `UNITY_CATALOG_SCHEMA` in your `config.py`.
- Compute:
  - Pipeline type: `Serverless` (recommended for DLT for managed infrastructure). If Serverless is not available, choose a `Fixed size` or `Autoscaling` cluster and ensure you select a DLT runtime version.
- Advanced Options:
  - Configuration: Add the Spark configuration `spark.databricks.io.cache.enabled true` for performance.
  - Storage location: (Optional but recommended) A cloud storage path for checkpointing and pipeline logs, e.g., `s3://your-bucket/dlt_checkpoints/tariff_analysis/` or `abfss://your-container@your-adls.dfs.core.windows.net/dlt_checkpoints/tariff_analysis/`.
- Click “Create”.
Run the Pipeline:
- After creation, click the “Start” button on your pipeline.
- Monitor the DLT UI for progress. It will show the graph of tables being created and data flowing through them.
Verify Data in Databricks SQL or a Notebook: Once the pipeline run is successful, you can query the created tables.
Open a new Databricks Notebook or Databricks SQL Query Editor:
-- Databricks SQL Query Editor or Notebook
-- Replace 'main.supply_chain_analytics' with your actual Unity Catalog schema
USE CATALOG main;
USE SCHEMA supply_chain_analytics;

-- Verify silver_tariff_rates table
SELECT * FROM silver_tariff_rates ORDER BY hs_code, effective_start_date;

-- Verify silver_enriched_trade_tariffs table
SELECT trade_id, event_timestamp, hs_code, origin_country, destination_country,
       declared_value, applied_tariff_rate_percentage, estimated_tariff_cost
FROM silver_enriched_trade_tariffs
ORDER BY event_timestamp DESC
LIMIT 10;

-- Verify gold_tariff_impact_summary table
SELECT reporting_period_month, hs_code, origin_country, destination_country,
       total_declared_value, total_estimated_tariff_cost,
       average_tariff_rate_percentage, num_shipments
FROM gold_tariff_impact_summary
ORDER BY reporting_period_month DESC, hs_code
LIMIT 10;

Expected behavior:
- `silver_tariff_rates` should contain the static tariff data you defined.
- `silver_enriched_trade_tariffs` should show your bronze trade records with `applied_tariff_rate_percentage` and `estimated_tariff_cost` populated based on the tariff data and event timestamp. Note how TID001 and TID004 (CN to US, HS 851712) have different tariffs applied because their `event_timestamp` values fall into different tariff periods. TID007 (VN to US) should have a lower tariff, and TID008 (TW to GB) should show the new GB tariff.
- `gold_tariff_impact_summary` should show aggregated sums and averages for each month, HS code, and country pair.
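Before trusting the dashboards, hand-check a row or two. Assuming the sample data and rates above, TID001 falls in the 15% window and TID004 in the 18% window, so their enriched costs can be predicted exactly (a spot-check sketch, not pipeline code):

```python
def expected_cost(declared_value: float, rate: float) -> float:
    """Ad-valorem-only estimate for spot-checking enriched rows (fixed-per-unit rates are 0 in the sample data)."""
    return round(declared_value * rate, 2)

tid001_expected = expected_cost(10000.00, 0.15)  # CN→US, 2024-10-01: pre-increase window
tid004_expected = expected_cost(12000.00, 0.18)  # CN→US, 2024-11-10: increased window
```

If the values in `silver_enriched_trade_tariffs` disagree with these hand computations, the usual suspects are the date-window bounds of the join or a mismatch between `raw_hs_code` and the cleaned `hs_code` in the rates table.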
4. Production Considerations
Deploying DLT pipelines to production requires careful planning for reliability, performance, security, and maintainability.
Error Handling:
- DLT’s built-in `expect` statements (`expect_or_drop`, `expect_or_fail`) are paramount. Use `expect_or_drop` for non-critical data quality issues where you can tolerate dropping malformed records, and `expect_or_fail` for critical issues that warrant halting the pipeline to prevent downstream corruption.
- Implement robust logging within your Python code to capture application-level errors and processing details.
- Configure DLT to send notifications (e.g., to Slack, PagerDuty) on pipeline failures or data quality violations.
Performance Optimization:
- Serverless DLT: Leverage Databricks Serverless DLT for automated infrastructure management and scaling. This significantly reduces operational overhead.
- `pipelines.autoOptimize.zOrderCols`: Use this table property on frequently joined or filtered columns (e.g., `hs_code`, `event_timestamp`, country codes) to optimize query performance. DLT automatically runs `OPTIMIZE` and `ZORDER` jobs.
- Incremental Processing: Ensure `dlt.read_stream()` is used where appropriate to process only new data, minimizing compute resources.
- Cluster Sizing (if not Serverless): Monitor pipeline metrics to fine-tune cluster size and autoscaling settings. Over-provisioning wastes money; under-provisioning causes delays.
- Data Skew: Monitor for data skew during joins and aggregations. If encountered, consider repartitioning the DataFrame before the operation.
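As a sketch of where the Z-order property goes, the table definition below shows the `table_properties` argument of the `@dlt.table` decorator. This only runs inside a DLT pipeline (the `dlt` module is not importable elsewhere), and the enrichment logic is elided:

```python
import dlt

@dlt.table(
    name="silver_enriched_trade_tariffs",
    comment="Trade records enriched with applicable tariff rates.",
    table_properties={
        # DLT will periodically run OPTIMIZE and Z-ORDER on these columns
        "pipelines.autoOptimize.zOrderCols": "hs_code,event_timestamp",
    },
)
def silver_enriched_trade_tariffs():
    # Join/enrichment logic elided; see 02_trade_tariff_enrichment.py
    return dlt.read_stream("bronze_trade_data")
```

Because the property travels with the table definition, the optimization survives full refreshes and redeployments without any manual `OPTIMIZE` scheduling.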
Security Considerations:
- Unity Catalog: Utilize Unity Catalog for fine-grained access control (table, column, row-level). Grant only necessary permissions to DLT service principals.
- Principle of Least Privilege: Ensure the service principal running the DLT pipeline has the minimum required permissions to read from source tables and write to target tables.
- Data Masking/Encryption: If HS codes or trade values are considered sensitive, implement data masking for non-production environments or encryption at rest/in transit.
- Network Security: Configure network access controls (e.g., Private Link, VPC/VNet peering) to restrict access to your Databricks workspace and underlying data sources.
Logging and Monitoring:
- DLT UI: The DLT UI provides a visual graph of your pipeline, data flow, and health metrics.
- Databricks Logs: Access detailed driver and executor logs for debugging.
- System Tables: DLT and Unity Catalog expose system tables (e.g., `system.billing.usage`, `system.workflow.events`) that provide metadata, lineage, and performance metrics for monitoring.
- External Monitoring: Integrate Databricks logs and metrics with external monitoring tools (e.g., Splunk, Datadog, Grafana) for centralized observability and alerting.
CI/CD and Deployment:
- Databricks Asset Bundles (DABs): Use DABs for defining, deploying, and managing your DLT pipelines, notebooks, and infrastructure as code. This enables automated deployment workflows, version control, and consistency across environments (dev, staging, prod).
- Git Integration: Store your DLT Python files in a Git repository (e.g., GitHub, GitLab, Azure DevOps). Databricks Workflows can be configured to trigger DLT pipeline updates on Git pushes.
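A minimal bundle definition for this pipeline might look like the following. This is a sketch only: resource keys and file paths follow this chapter’s layout, and the exact fields available depend on your Databricks CLI/DABs version:

```yaml
# databricks.yml — minimal Asset Bundle sketch (illustrative, not complete)
bundle:
  name: tariff_analysis

resources:
  pipelines:
    tariff_pipeline:
      name: Realtime_Tariff_Impact_Analysis
      catalog: main
      target: supply_chain_analytics
      libraries:
        - file:
            path: ./dlt_pipelines/tariff_analysis/01_tariff_rates_ingestion.py
        - file:
            path: ./dlt_pipelines/tariff_analysis/02_trade_tariff_enrichment.py
        - file:
            path: ./dlt_pipelines/tariff_analysis/03_tariff_impact_aggregation.py

targets:
  dev:
    default: true
```

With a definition like this, `databricks bundle deploy -t dev` creates or updates the pipeline from source control, which keeps dev, staging, and prod environments consistent.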
5. Code Review Checkpoint
At this stage, you have implemented a complete DLT pipeline for HS Code-based tariff impact analysis.
Summary of what was built:
- `dlt_pipelines/tariff_analysis/config.py`: Centralized configuration for Unity Catalog schema and table names.
- `setup_bronze_data.py` (Databricks Notebook): A utility to create sample `bronze_trade_data` if not already present.
- `dlt_pipelines/tariff_analysis/01_tariff_rates_ingestion.py`: A DLT table (`silver_tariff_rates`) containing static/historical tariff rates, with data quality expectations.
- `dlt_pipelines/tariff_analysis/02_trade_tariff_enrichment.py`: A DLT table (`silver_enriched_trade_tariffs`) that streams from `bronze_trade_data`, joins with `silver_tariff_rates` based on HS code, country, and date, and calculates `estimated_tariff_cost`. Includes robust data quality checks.
- `dlt_pipelines/tariff_analysis/03_tariff_impact_aggregation.py`: A DLT table (`gold_tariff_impact_summary`) that streams from `silver_enriched_trade_tariffs`, aggregates tariff impacts by reporting period, HS code, and country, and includes strict data quality checks.
Integration:
These three Python files form a single DLT pipeline. When deployed together, DLT automatically orchestrates the dependencies: silver_tariff_rates is created first, then silver_enriched_trade_tariffs reads from it and bronze_trade_data, and finally gold_tariff_impact_summary reads from silver_enriched_trade_tariffs. The config.py ensures consistent naming and schema across all components.
6. Common Issues & Solutions
Here are some common issues you might encounter and how to address them:
Issue: `ImportError: No module named 'dlt_pipelines'` when running DLT.
- Cause: The DLT runtime cannot find your `config.py` or other modules if they are not correctly packaged or located relative to the DLT pipeline files.
- Solution: Ensure that your DLT pipeline definition includes all necessary Python files (e.g., `01_tariff_rates_ingestion.py`, `02_trade_tariff_enrichment.py`, `03_tariff_impact_aggregation.py`, and, crucially, `config.py` if it’s meant to be imported directly). If `config.py` is in the same folder as the DLT Python files, it should be importable. If `config.py` is in a parent directory, you might need to adjust the Python path or structure your DLT deployment (e.g., using Databricks Repos or Asset Bundles) so that all modules are accessible. The `try-except ImportError` block in our DLT files helps with local testing, but in DLT, all files in the pipeline definition are typically treated as one module.
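The `try-except ImportError` fallback mentioned above can look like the sketch below. The module path and constant names follow this chapter’s layout but are assumptions; the inline defaults make the file runnable even when `config.py` is not on the path:

```python
# Top of a DLT pipeline file: prefer the shared config module, but fall
# back to inline defaults so the file also works when run standalone.
try:
    from dlt_pipelines.tariff_analysis.config import CATALOG_NAME, SCHEMA_NAME
except ImportError:
    # Fallback for local testing, or when config.py is not packaged
    # alongside the pipeline files.
    CATALOG_NAME = "main"
    SCHEMA_NAME = "supply_chain_analytics"

BRONZE_TRADE_TABLE = f"{CATALOG_NAME}.{SCHEMA_NAME}.bronze_trade_data"
# Prints the fully qualified name (fallback values when config.py is absent)
print(BRONZE_TRADE_TABLE)
```

Keep the fallback values identical to those in `config.py` so that a missing import degrades gracefully rather than silently pointing the pipeline at a different schema.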
Issue: No tariff rates applied (`estimated_tariff_cost` is 0 or NULL) in `silver_enriched_trade_tariffs`.
- Cause: Mismatched join keys, incorrect date ranges, or missing tariff data.
  - HS codes might not match exactly (e.g., “851712” vs “8517120000”).
  - Origin/destination country codes might differ (e.g., “USA” vs “US”).
  - The `event_timestamp` of the trade record falls outside the `effective_start_date` and `effective_end_date` of any available tariff.
  - `bronze_trade_data` contains records for which no tariff data exists in `silver_tariff_rates`.
- Solution:
  - Debugging: Use `display()` or `SELECT` queries on `bronze_trade_data` and `silver_tariff_rates` separately to inspect the data.
  - Schema & Data Type Check: Ensure `hs_code`, country codes, and date fields have consistent data types and formats across both tables.
  - Join Condition Verification: Double-check the join logic, especially the date range condition (`F.to_date(F.col("trade.event_timestamp")) >= F.col("tariff.effective_start_date")`).
  - Data Profiling: Run data profiling on both source tables to identify unique values, common patterns, and potential data quality issues in HS codes or country names. Consider adding a data cleansing step for `raw_hs_code` in a Silver layer (e.g., padding/trimming) before this join if necessary.
  - Logging: Add more detailed logging statements in the enrichment function to log records that fail to find a tariff match.
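To reason about these mismatches outside Spark, the enrichment join can be reduced to a plain-Python sketch. The helper names are hypothetical, and the effective-date windows below are illustrative values consistent with the chapter’s expected results, not the exact sample data:

```python
from datetime import date

# Simplified tariff rules: (hs_code, origin, destination, start, end, rate)
TARIFF_RATES = [
    ("851712", "CN", "US", date(2024, 1, 1), date(2024, 10, 31), 0.15),
    ("851712", "CN", "US", date(2024, 11, 1), date(2025, 12, 31), 0.18),
]

def normalize_hs_code(raw: str, length: int = 6) -> str:
    """Cleansing step: trim whitespace and keep the first `length` digits."""
    return raw.strip()[:length]

def find_tariff_rate(hs_code, origin, destination, trade_date):
    """Return the first rate whose keys and effective window match, else None."""
    hs_code = normalize_hs_code(hs_code)
    for hs, o, d, start, end, rate in TARIFF_RATES:
        if (hs, o, d) == (hs_code, origin, destination) and start <= trade_date <= end:
            return rate
    return None

# TID001-style record: 10-digit code is normalized, first window matches
print(find_tariff_rate("8517120000", "CN", "US", date(2024, 10, 1)))  # → 0.15
# TID004-style record: same lane, later date, the new rate applies
print(find_tariff_rate("851712", "CN", "US", date(2024, 11, 10)))     # → 0.18
# "USA" vs "US" mismatch: no rate found, mirroring NULL costs in Silver
print(find_tariff_rate("851712", "USA", "US", date(2024, 11, 10)))    # → None
```

The third call is the most common production failure mode: everything looks populated, but a formatting mismatch in one join key silently yields no match.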
Issue: DLT pipeline fails due to `expect_or_fail` constraints.
- Cause: Critical data quality issues in the input data that violate the strict `expect_or_fail` rules defined in your DLT tables. For example, `total_estimated_tariff_cost` becoming negative due to an upstream calculation error, or `num_shipments` being zero for an aggregated group.
- Solution:
  - Analyze DLT UI: The DLT UI will show which expectation failed and for how many records.
  - Inspect Upstream Data: Query the source table (`silver_enriched_trade_tariffs` for the gold table) to identify the specific records that caused the failure.
  - Adjust Expectations (Carefully): If the failure reveals a legitimate data pattern you weren’t expecting but can accept, consider changing `expect_or_fail` to `expect_or_drop` or refining the expectation logic. For gold tables, however, it’s often better to fix the upstream data or processing logic to meet the stringent quality requirements.
  - Root Cause Analysis: Trace back to the source of the problematic data. Is it an ingestion error? A transformation bug? Fix it at the earliest possible stage in the pipeline.
7. Testing & Verification
To thoroughly test and verify the chapter’s work, follow these steps:
Ensure `setup_bronze_data.py` has been run successfully. This populates your `bronze_trade_data` table with the initial set of sample records.

Start your DLT pipeline.
- Go to the DLT UI in Databricks.
- Select your `Realtime_Tariff_Impact_Analysis` pipeline.
- Click “Start” (if in Triggered mode) or ensure it’s running (if Continuous).
- Wait for the pipeline run to complete successfully (all tables should show a green checkmark).
Query the tables in Databricks SQL or a new notebook:

```sql
-- Use your Unity Catalog and schema
USE CATALOG main;
USE SCHEMA supply_chain_analytics;

-- 1. Verify silver_tariff_rates
SELECT * FROM silver_tariff_rates ORDER BY hs_code, effective_start_date;
-- Expected: All 6 tariff rules should be present, with correct dates and rates.

-- 2. Verify silver_enriched_trade_tariffs
SELECT
  trade_id,
  CAST(event_timestamp AS DATE) AS trade_date,
  hs_code, origin_country, destination_country,
  declared_value, quantity,
  applied_tariff_rate_percentage, estimated_tariff_cost
FROM silver_enriched_trade_tariffs
ORDER BY trade_date, trade_id;
/*
Expected:
- TID001 (2024-10-01, CN->US, 851712): tariff 0.15, cost = 10000 * 0.15 = 1500.00
- TID002 (2024-10-02, TW->US, 847130): tariff 0.07, cost = 15000 * 0.07 = 1050.00
- TID003 (2024-11-05, DE->FR, 870323): tariff 0.02, cost = 500000 * 0.02 = 10000.00
- TID004 (2024-11-10, CN->US, 851712): tariff 0.18 (new rate!), cost = 12000 * 0.18 = 2160.00
- TID005 (2024-12-01, TW->US, 847130): tariff 0.07, cost = 18000 * 0.07 = 1260.00
- TID006 (2024-12-05, DE->FR, 870323): tariff 0.02, cost = 600000 * 0.02 = 12000.00
- TID007 (2025-01-01, VN->US, 851712): tariff 0.05 (new origin rate!), cost = 9000 * 0.05 = 450.00
- TID008 (2025-01-02, TW->GB, 847130): tariff 0.08 (new destination rate!), cost = 16000 * 0.08 = 1280.00
*/

-- 3. Verify gold_tariff_impact_summary
SELECT
  reporting_period_month, hs_code, origin_country, destination_country,
  total_declared_value, total_quantity, total_estimated_tariff_cost,
  average_tariff_rate_percentage, num_shipments
FROM gold_tariff_impact_summary
ORDER BY reporting_period_month, hs_code, origin_country, destination_country;
/*
Expected:
- Aggregations for Oct 2024, Nov 2024, Dec 2024, and Jan 2025 should be present.
- Example for 2024-10-01 (CN->US, 851712): total_estimated_tariff_cost = 1500.00, num_shipments = 1
- Example for 2024-11-01 (CN->US, 851712): total_estimated_tariff_cost = 2160.00, num_shipments = 1
- Example for 2025-01-01 (VN->US, 851712): total_estimated_tariff_cost = 450.00, num_shipments = 1
- Example for 2025-01-01 (TW->GB, 847130): total_estimated_tariff_cost = 1280.00, num_shipments = 1
*/
```

Simulate new data arrival (optional but recommended for streaming):
- Modify your `setup_bronze_data.py` notebook to append new records to `bronze_trade_data` (change `mode("overwrite")` to `mode("append")` for a real streaming scenario, or add new data and run it again in append mode).
- If your DLT pipeline is in `Continuous` mode, it should automatically detect and process these new records. If in `Triggered` mode, run the pipeline again.
- Re-query the Silver and Gold tables to see the updated results.
This comprehensive verification process ensures that the data flows correctly through each layer, calculations are accurate, and the pipeline adheres to the defined data quality expectations.
8. Summary & Next Steps
In this chapter, we successfully designed and implemented a real-time HS Code-based tariff impact analysis pipeline using Databricks Delta Live Tables. We established a foundational silver_tariff_rates table, enriched raw trade data with dynamic tariff information, and aggregated these insights into a gold_tariff_impact_summary table. We emphasized production-ready practices, including robust data quality expectations, logging, and performance considerations like ZORDER and Serverless DLT. This pipeline provides critical visibility into the financial implications of tariffs, enabling supply chain optimization.
This tariff impact analysis forms a vital part of our overall real-time supply chain intelligence platform. The generated Gold table can now be consumed by business intelligence tools for dashboards, or by other downstream applications for further analysis and decision-making.
In the next chapter, we will build upon this foundation by focusing on Streaming Logistics Cost Monitoring with Tariff and Fuel Price Correlation using Spark Structured Streaming. We will integrate external data sources like fuel prices and combine them with our tariff insights to provide a holistic view of logistics costs.