Chapter 14: CI/CD for Databricks Pipelines with Databricks Asset Bundles

Chapter Introduction

In previous chapters, we meticulously crafted robust data pipelines using Databricks Delta Live Tables (DLT) for real-time ingestion, Spark Structured Streaming for logistics cost monitoring, and various Spark jobs for tariff analysis and anomaly detection. We’ve built the individual components, but deploying and managing these complex pipelines across different environments (development, staging, production) can quickly become a significant challenge without proper automation. This is where Continuous Integration/Continuous Deployment (CI/CD) comes into play, ensuring that our code changes are consistently tested, validated, and deployed.

This chapter will guide you through establishing a production-ready CI/CD workflow for our Databricks assets using Databricks Asset Bundles (DABs). DABs provide an Infrastructure-as-Code (IaC) approach for managing Databricks workspaces, allowing us to define and deploy all our Databricks resources—like DLT pipelines, jobs, notebooks, and even associated infrastructure—declaratively. By the end of this chapter, you will have a fully automated process that takes our source code, validates it, and deploys it to the target Databricks environment, drastically improving reliability, consistency, and speed of delivery.

The primary outcome of this chapter will be a GitHub Actions workflow that automatically deploys our DLT pipelines and Spark Structured Streaming jobs to a specified Databricks workspace whenever changes are pushed to our version control system. We will leverage Databricks Asset Bundles to define our deployment targets and configurations, ensuring that environment-specific parameters are handled gracefully. This setup is crucial for maintaining the integrity and performance of our real-time supply chain analytics solution in a dynamic production environment.

Planning & Design

Implementing CI/CD for Databricks requires a structured approach. Our design will revolve around Databricks Asset Bundles as the central mechanism for defining and deploying our Databricks resources.

Component Architecture for CI/CD:

  1. Source Code Repository (GitHub): All our DLT pipeline code, Spark Structured Streaming jobs, and configuration files (databricks.yml) will reside here.
  2. Databricks Asset Bundle (DAB): A local project structure that defines all Databricks resources (DLT pipelines, jobs, notebooks, clusters) and their configurations for various environments. The databricks.yml file is the heart of the bundle.
  3. CI/CD Workflow (GitHub Actions): This automated process will:
    • Trigger on specific events (e.g., push to main branch).
    • Install the Databricks CLI.
    • Authenticate with Databricks using a Personal Access Token (PAT) or Service Principal.
    • Validate the Databricks Asset Bundle.
    • Deploy the bundle to the target Databricks workspace.
  4. Databricks Workspace: The actual environment where DLT pipelines run, Spark jobs execute, and data is processed and stored in Delta Lake tables. We will aim for development, staging, and production workspaces (or distinct directories/schemas within a single workspace).

File Structure:

We will adopt a structure that separates our source code from the bundle definition, allowing for clear organization.

.
├── .github/
│   └── workflows/
│       └── deploy.yml              # GitHub Actions workflow for CI/CD
├── src/
│   ├── dlt_pipelines/
│   │   └── supply_chain_ingestion.py    # Our DLT pipeline code
│   ├── jobs/
│   │   ├── logistics_cost_monitoring.py # Our Spark Structured Streaming job
│   │   └── tariff_analysis_batch.py     # Our batch tariff analysis job
│   └── notebooks/
│       └── anomaly_detection.ipynb      # Notebook for anomaly detection (can be run as job)
├── databricks.yml                  # Main Databricks Asset Bundle definition
├── README.md
└── requirements.txt

This structure ensures that our Python and notebook code are logically grouped, and the databricks.yml at the root defines how these assets are deployed.

Step-by-Step Implementation

3.1. Initializing the Databricks Asset Bundle Project

First, let’s set up the basic structure for our Databricks Asset Bundle.

a) Setup/Configuration

Create a new directory for your project if you haven’t already, and navigate into it. We’ll start by initializing a Databricks Asset Bundle.

# Ensure you are in the root of your project, e.g., real-time-supply-chain-analytics/
mkdir -p src/dlt_pipelines src/jobs src/notebooks
# Initialize a Databricks Asset Bundle
databricks bundle init

This command will prompt you to choose a template. Select the default-python template for now, as we’ll customize the result extensively. It creates a databricks.yml file and a resources directory (which we’ll later restructure to match our src layout).

b) Core Implementation

Now, let’s move our existing DLT pipeline and Spark Structured Streaming job code into the src directory we just created. For demonstration, let’s assume you have:

  • A DLT pipeline definition in supply_chain_ingestion.py (from Chapter 2).
  • A Spark Structured Streaming job for logistics cost monitoring in logistics_cost_monitoring.py (from Chapter 9).
  • A batch job for tariff analysis in tariff_analysis_batch.py (from Chapter 6).
  • An anomaly detection notebook in anomaly_detection.ipynb (from Chapter 12).

File: src/dlt_pipelines/supply_chain_ingestion.py (This is a simplified example; your actual DLT code from Chapter 2 would go here.)

# src/dlt_pipelines/supply_chain_ingestion.py
import dlt
from pyspark.sql.functions import col, lit, current_timestamp, sha2, concat_ws, from_json

# Get environment-specific parameters (e.g., from DLT pipeline configuration)
# These will be passed via the databricks.yml bundle definition
@dlt.table(
    comment="Raw Kafka events for supply chain, ingested into Bronze layer."
)
def bronze_supply_chain_events():
    # In production, this table reads directly from Kafka, with broker and
    # topic details supplied as DLT pipeline configuration via the bundle
    # definition. For bundle deployment testing, we substitute an Auto Loader
    # (cloudFiles) source below so the pipeline code remains valid Python.
    kafka_source_topic = spark.conf.get("supply_chain.kafka_source_topic", "supply_chain_events_raw")

    # A real Kafka source would look like:
    # df = (spark.readStream.format("kafka")
    #       .option("kafka.bootstrap.servers", "your_kafka_brokers")
    #       .option("subscribe", kafka_source_topic)
    #       .load())
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "dbfs:/dlt_checkpoints/bronze_supply_chain_events_schema")
        .load(spark.conf.get("supply_chain.bronze_source_path", "/databricks-datasets/structured-streaming/events/"))
        .select(
            col("value").cast("string").alias("raw_json"),
            current_timestamp().alias("processing_timestamp"),
            lit(kafka_source_topic).alias("source_topic")
        )
    )
    return df

@dlt.table(
    comment="Cleaned and enriched supply chain events in Silver layer."
)
@dlt.expect_or_drop("has_valid_json", "raw_json IS NOT NULL")
def silver_supply_chain_events():
    # Read incrementally from the streaming bronze table
    bronze_df = dlt.read_stream("bronze_supply_chain_events")

    # Example: Parse JSON, add a unique ID, and select relevant fields
    silver_df = (
        bronze_df
        .withColumn("event_data", from_json(col("raw_json"), "SCHEMA_OF_YOUR_JSON_DATA")) # Replace SCHEMA_OF_YOUR_JSON_DATA
        .withColumn("event_id", sha2(concat_ws("-", col("raw_json"), col("processing_timestamp")), 256))
        .select(
            col("event_id"),
            col("processing_timestamp"),
            col("event_data.*") # Flatten the parsed event fields
        )
        # Example: Add data quality checks
        .filter(col("event_data.item_id").isNotNull()) # Ensure critical fields are present
    )
    return silver_df

# Additional DLT tables (e.g., gold layer, aggregated views) would follow here.
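The silver table above parses raw_json with a schema placeholder. As a sketch only, a DDL-style schema string suitable for from_json might look like the following; the field names here are hypothetical assumptions about the event payload, not the book’s actual schema:

```python
# Illustrative only: a hypothetical DDL schema string that could stand in for
# the SCHEMA_OF_YOUR_JSON_DATA placeholder. Field names are assumptions.
event_schema_ddl = (
    "item_id STRING, warehouse_id STRING, quantity INT, "
    "unit_cost DOUBLE, event_type STRING, event_time TIMESTAMP"
)

# from_json(col("raw_json"), event_schema_ddl) would then parse each payload
# into a struct whose fields can be flattened with col("event_data.*").
print(event_schema_ddl.split(", ")[0])  # item_id STRING
```

Passing the schema as a DDL string keeps the pipeline file self-contained; you could equally build a StructType programmatically.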

File: src/jobs/logistics_cost_monitoring.py (This is a simplified example; your actual Structured Streaming code from Chapter 9 would go here.)

# src/jobs/logistics_cost_monitoring.py
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, current_timestamp, lit
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

def create_spark_session(app_name):
    """Creates and returns a SparkSession."""
    return SparkSession.builder.appName(app_name).getOrCreate()

def define_schema():
    """Defines the schema for logistics cost events."""
    return StructType([
        StructField("shipment_id", StringType(), True),
        StructField("cost_type", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("currency", StringType(), True),
        StructField("timestamp", TimestampType(), True)
    ])

def process_logistics_costs(spark, kafka_bootstrap_servers, kafka_topic, checkpoint_location, output_table_name):
    """
    Reads logistics cost data from Kafka, processes it, and writes to a Delta table.
    """
    print(f"Starting logistics cost monitoring job for topic: {kafka_topic}")
    print(f"Kafka brokers: {kafka_bootstrap_servers}")
    print(f"Checkpoint location: {checkpoint_location}")
    print(f"Output table: {output_table_name}")

    schema = define_schema()

    # Read from Kafka
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic) \
        .option("startingOffsets", "latest") \
        .load()

    # Process streaming data
    parsed_df = df.selectExpr("CAST(value AS STRING) as json_payload", "timestamp as kafka_timestamp") \
                  .withColumn("data", from_json(col("json_payload"), schema)) \
                  .select(col("data.*"), col("kafka_timestamp"), current_timestamp().alias("processing_time"))

    # Add error handling for malformed JSON or missing fields
    # Example: Filter out rows where 'amount' is null after parsing
    validated_df = parsed_df.filter(col("amount").isNotNull())

    # Write to Delta table
    query = (
        validated_df
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(processingTime="60 seconds")  # Process every 60 seconds
        .toTable(output_table_name)
    )

    print(f"Streaming query started for {output_table_name}.")
    # For production, we would typically let the job run indefinitely.
    # For testing, you might add a .awaitTermination(timeout=...)
    query.awaitTermination()
    print(f"Streaming query for {output_table_name} terminated.")


if __name__ == "__main__":
    # Expecting arguments from the job configuration in databricks.yml
    if len(sys.argv) < 5:
        print("Usage: logistics_cost_monitoring.py <kafka_bootstrap_servers> <kafka_topic> <checkpoint_location> <output_table_name>")
        sys.exit(1)

    kafka_bootstrap_servers = sys.argv[1]
    kafka_topic = sys.argv[2]
    checkpoint_location = sys.argv[3]
    output_table_name = sys.argv[4]

    spark = create_spark_session("LogisticsCostMonitoring")

    # Configure logging for better visibility
    spark.sparkContext.setLogLevel("INFO")

    try:
        process_logistics_costs(spark, kafka_bootstrap_servers, kafka_topic, checkpoint_location, output_table_name)
    except Exception as e:
        print(f"Error in logistics cost monitoring job: {e}")
        # Log the full traceback for debugging
        import traceback
        traceback.print_exc()
        sys.exit(1) # Exit with an error code
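For reference, a payload that satisfies the schema in define_schema() might look like the following. This is a minimal sketch; the field values are invented, and only the field names mirror the StructFields above:

```python
import json
from datetime import datetime, timezone

# Illustrative only: a made-up logistics-cost event whose fields mirror the
# StructFields in define_schema(); the values are assumptions.
sample_event = json.dumps({
    "shipment_id": "SHP-1001",
    "cost_type": "fuel_surcharge",
    "amount": 142.50,
    "currency": "USD",
    "timestamp": datetime(2024, 1, 15, 8, 30, tzinfo=timezone.utc).isoformat(),
})

parsed = json.loads(sample_event)
required = {"shipment_id", "cost_type", "amount", "currency", "timestamp"}
missing = required - parsed.keys()
print(sorted(missing))  # [] — every schema field is present
```

Events missing amount would survive JSON parsing but be dropped by the isNotNull filter in process_logistics_costs.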

File: src/jobs/tariff_analysis_batch.py (This is a simplified example; your actual batch tariff analysis code would go here.)

# src/jobs/tariff_analysis_batch.py
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_date, avg, count, year, month, dayofmonth

def create_spark_session(app_name):
    """Creates and returns a SparkSession."""
    return SparkSession.builder.appName(app_name).getOrCreate()

def run_tariff_analysis(spark, input_table, output_table, analysis_date=None):
    """
    Performs historical tariff trend analysis and writes results to a Delta table.
    """
    print(f"Starting tariff analysis job for input: {input_table}, output: {output_table}")

    if analysis_date is None:
        # Note: current_date() returns a Spark Column, not a string, so we
        # default with Python's datetime instead.
        from datetime import date
        analysis_date = date.today().isoformat()

    try:
        # Read historical trade data (e.g., from a Delta table populated by DLT)
        trade_data_df = spark.read.table(input_table)

        # Example: Perform some tariff analysis (e.g., average tariff by HS code per month)
        tariff_trends_df = trade_data_df \
            .withColumn("analysis_year", year(col("import_date"))) \
            .withColumn("analysis_month", month(col("import_date"))) \
            .groupBy("hs_code", "analysis_year", "analysis_month") \
            .agg(
                avg("tariff_rate").alias("average_tariff_rate"),
                count("shipment_id").alias("total_shipments")
            ) \
            .withColumn("analysis_run_date", lit(analysis_date))

        # Write results to output Delta table
        # Using 'overwrite' for simplicity, but 'merge' is better for incremental updates
        tariff_trends_df.write \
            .format("delta") \
            .mode("overwrite") \
            .option("mergeSchema", "true") \
            .saveAsTable(output_table)

        print(f"Successfully completed tariff analysis. Results written to {output_table}.")

    except Exception as e:
        print(f"Error in tariff analysis job: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: tariff_analysis_batch.py <input_table> <output_table> [analysis_date]")
        sys.exit(1)

    input_table = sys.argv[1]
    output_table = sys.argv[2]
    analysis_date = sys.argv[3] if len(sys.argv) > 3 else None

    spark = create_spark_session("TariffAnalysisBatch")
    spark.sparkContext.setLogLevel("INFO")

    run_tariff_analysis(spark, input_table, output_table, analysis_date)
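Since analysis_date arrives as an optional command-line string, a small helper like the following (a sketch under the assumption of an ISO YYYY-MM-DD format, not code from earlier chapters) shows one way to validate it before the Spark work begins:

```python
from datetime import date, datetime
from typing import Optional

# Illustrative only: validate the optional analysis_date argument up front,
# assuming an ISO YYYY-MM-DD format.
def resolve_analysis_date(arg: Optional[str]) -> str:
    if arg is None:
        return date.today().isoformat()
    # Raises ValueError for malformed input, failing fast at submission time
    return datetime.strptime(arg, "%Y-%m-%d").date().isoformat()

print(resolve_analysis_date("2024-03-01"))  # 2024-03-01
```

Failing fast here surfaces a bad parameter in the job run's first seconds rather than after a full table scan.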

File: src/notebooks/anomaly_detection.ipynb (This would be your actual notebook content saved as .ipynb. For the bundle, we just need the file to exist.) You would save your notebook content from Chapter 12 as src/notebooks/anomaly_detection.ipynb.

# Placeholder for src/notebooks/anomaly_detection.ipynb content
# This is typically a mix of markdown and code cells.
# For simplicity, imagine it contains PySpark code for anomaly detection.
# Example:
# %python
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("AnomalyDetection").getOrCreate()
#
# input_table = spark.conf.get("anomaly_detection.input_table")
# output_table = spark.conf.get("anomaly_detection.output_table")
#
# df = spark.read.table(input_table)
# # ... apply anomaly detection logic ...
# anomalies_df.write.format("delta").mode("overwrite").saveAsTable(output_table)
# print(f"Anomaly detection completed. Results in {output_table}")

c) Testing This Component

At this stage, we are just organizing files. The individual components (DLT, Structured Streaming, batch jobs) should have been tested in their respective chapters. The next step will be to test the bundle definition itself.
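As a quick sanity check of the layout (a sketch assuming the directory structure shown earlier in this chapter), you can verify from the project root that the expected folders exist:

```shell
# Sanity-check the bundle layout from the project root; paths follow the
# structure shown earlier in this chapter
mkdir -p src/dlt_pipelines src/jobs src/notebooks
for d in src/dlt_pipelines src/jobs src/notebooks; do
  test -d "$d" || { echo "missing: $d"; exit 1; }
done
echo "bundle layout OK"
```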

3.2. Defining Databricks Assets in databricks.yml

Now, let’s configure the databricks.yml file to define our Databricks resources and how they should be deployed to different environments.

a) Setup/Configuration

Open the databricks.yml file created by databricks bundle init. We will modify it to suit our project.

b) Core Implementation

The databricks.yml file uses YAML syntax to define resources, environments, and deployment settings. We’ll define:

  • Bundle Name: A unique identifier for our project.
  • Target Environments: development, staging, production, each pointing to a different Databricks workspace (or configuration within one workspace).
  • Resources:
    • DLT Pipelines: For supply_chain_ingestion.py.
    • Jobs: For logistics_cost_monitoring.py and tariff_analysis_batch.py.
    • Notebooks: For anomaly_detection.ipynb (to be run as a job).

File: databricks.yml

# databricks.yml
# This file defines the Databricks Asset Bundle for our Real-time Supply Chain project.
# It declares all Databricks resources (DLT pipelines, jobs, notebooks) and
# their configurations, allowing for consistent deployment across environments.

bundle:
  name: supply-chain-analytics-bundle # A unique name for your bundle

environments:
  # Development environment configuration
  development:
    # Host URL for your Databricks development workspace
    # Replace with your actual Databricks workspace URL (e.g., https://adb-xxxxxxxxxxxxxxxx.xx.databricks.com)
    host: {{ env.DATABRICKS_HOST_DEV }}
    # Catalog and schema for development data. Using Unity Catalog best practices.
    catalog: supply_chain_dev_catalog
    schema: dev
    # Base path for storing checkpoints and other artifacts in DBFS
    base_path: /Users/{{ user.email }}/supply_chain_dev_bundle_artifacts
    # Cluster configuration for development jobs (smaller, cost-optimized)
    cluster_config: &dev_cluster_config
      node_type_id: Standard_DS3_v2 # Example: Smaller instance type
      num_workers: 2 # Example: Fewer workers
      spark_version: 13.3.x-scala2.12 # Use a recent stable Spark version
      autotermination_minutes: 30 # Auto-terminate after inactivity
      data_security_mode: SINGLE_USER # For dev, single user is often sufficient
      runtime_engine: STANDARD
      custom_tags:
        bundle_environment: development
        bundle_name: supply-chain-analytics-bundle

  # Staging environment configuration
  staging:
    host: {{ env.DATABRICKS_HOST_STAGING }}
    catalog: supply_chain_staging_catalog
    schema: staging
    base_path: /supply_chain_staging_bundle_artifacts
    cluster_config: &staging_cluster_config
      node_type_id: Standard_DS4_v2 # Slightly larger for staging
      num_workers: 4
      spark_version: 13.3.x-scala2.12
      autoscale:
        min_workers: 2
        max_workers: 6
      autotermination_minutes: 60
      data_security_mode: SINGLE_USER # Or USER_ISOLATION if Unity Catalog is enabled and shared
      runtime_engine: PHOTON # Use Photon for better performance
      custom_tags:
        bundle_environment: staging
        bundle_name: supply-chain-analytics-bundle

  # Production environment configuration
  production:
    host: {{ env.DATABRICKS_HOST_PROD }}
    catalog: supply_chain_prod_catalog
    schema: prod
    base_path: /supply_chain_prod_bundle_artifacts
    cluster_config: &prod_cluster_config
      node_type_id: Standard_DS5_v2 # Larger, more powerful for production
      num_workers: 8
      autoscale:
        min_workers: 4
        max_workers: 16
      spark_version: 13.3.x-scala2.12
      autotermination_minutes: 120 # Keep alive longer for critical jobs
      data_security_mode: USER_ISOLATION # Recommended for production with Unity Catalog
      runtime_engine: PHOTON
      custom_tags:
        bundle_environment: production
        bundle_name: supply-chain-analytics-bundle
      # Additional production-specific configurations, e.g., instance_pool_id

resources:
  # All resources defined here will be deployed to the target environment.
  # The 'target' property within a resource can override environment settings.

  dlt_pipelines:
    # DLT pipeline for real-time supply chain event ingestion
    supply_chain_ingestion_pipeline:
      name: "{{ bundle.name }}-{{ environment.schema }}-supply-chain-ingestion" # Unique name based on bundle and environment
      target: "{{ environment.catalog }}.{{ environment.schema }}" # Unity Catalog target schema
      configuration:
        # Example DLT pipeline specific configurations
        supply_chain.kafka_source_topic: "supply_chain_events_raw"
        supply_chain.bronze_source_path: "/databricks-datasets/structured-streaming/events/" # Placeholder for demo
        # Add actual Kafka broker configs here, often from secrets or environment variables
        # spark.kafka.brokers: "{{ env.KAFKA_BROKERS }}"
      clusters:
        - label: default
          num_workers: 3 # DLT manages cluster, but you can suggest size
          node_type_id: Standard_DS4_v2
          autoscale:
            min_workers: 1
            max_workers: 5
      # Define libraries for the DLT pipeline
      libraries:
        - file:
            path: ./src/dlt_pipelines/supply_chain_ingestion.py
      continuous: false # Set to true for continuous processing
      photon: true
      channel: CURRENT # Use CURRENT for latest DLT features
      development: # Overrides for development environment
        continuous: false # Often run as triggered in dev for cost savings
        photon: false # Disable photon in dev to save cost
        channel: PREVIEW # Test new DLT features
      production: # Overrides for production environment
        continuous: true # Continuous processing for real-time
        clusters:
          - label: default
            num_workers: 5
            autoscale:
              min_workers: 2
              max_workers: 10
        # Add notifications, alerts etc. for production
        notifications:
          - alert_type: ON_FAILURE
            email_recipients:
              - "devops@example.com"
              - "data-eng@example.com"

  jobs:
    # Spark Structured Streaming job for logistics cost monitoring
    logistics_cost_monitoring_job:
      name: "{{ bundle.name }}-{{ environment.schema }}-logistics-cost-monitoring"
      tasks:
        - task_key: monitor_costs
          spark_python_task:
            python_file: ./src/jobs/logistics_cost_monitoring.py
            parameters:
              - "{{ env.KAFKA_BOOTSTRAP_SERVERS }}" # Parameter 1: Kafka brokers
              - "logistics_cost_events"             # Parameter 2: Kafka topic
              - "{{ environment.base_path }}/checkpoints/logistics_cost_monitoring" # Parameter 3: Checkpoint
              - "{{ environment.catalog }}.{{ environment.schema }}.logistics_costs_silver" # Parameter 4: Output table
          new_cluster: *prod_cluster_config # Use production cluster config by default
          # Override cluster for dev/staging if needed
          development:
            new_cluster: *dev_cluster_config
          staging:
            new_cluster: *staging_cluster_config
          max_retries: 3 # Retry failed tasks
          timeout_seconds: 3600 # 1 hour timeout
          # Add schedule for production environment if it's not a continuous stream
          # schedule:
          #   quartz_cron_expression: "0 0 0 * * ?"
          #   timezone_id: "UTC"
      job_clusters:
        - job_cluster_key: base_cluster
          new_cluster: *prod_cluster_config # Define a base cluster for the job
          development:
            new_cluster: *dev_cluster_config
          staging:
            new_cluster: *staging_cluster_config

    # Batch job for historical tariff impact analysis
    tariff_analysis_batch_job:
      name: "{{ bundle.name }}-{{ environment.schema }}-tariff-analysis-batch"
      tasks:
        - task_key: analyze_tariffs
          spark_python_task:
            python_file: ./src/jobs/tariff_analysis_batch.py
            parameters:
              - "{{ environment.catalog }}.{{ environment.schema }}.trade_data_gold" # Input table
              - "{{ environment.catalog }}.{{ environment.schema }}.tariff_trends_gold" # Output table
              # Optional: Pass analysis_date as a parameter, or let job default
          new_cluster: *prod_cluster_config
          development:
            new_cluster: *dev_cluster_config
          staging:
            new_cluster: *staging_cluster_config
          max_retries: 1
          timeout_seconds: 1800
      schedule: # Schedule this batch job
        quartz_cron_expression: "0 0 2 * * ?" # Run daily at 2 AM UTC
        timezone_id: "UTC"
      email_notifications:
        on_failure:
          - "data-eng@example.com"

    # Notebook job for HS Code anomaly detection
    anomaly_detection_notebook_job:
      name: "{{ bundle.name }}-{{ environment.schema }}-anomaly-detection"
      tasks:
        - task_key: detect_anomalies
          notebook_task:
            notebook_path: ./src/notebooks/anomaly_detection.ipynb
            source: WORKSPACE # Deploy notebook to workspace
            base_parameters:
              input_table: "{{ environment.catalog }}.{{ environment.schema }}.hs_code_classification_silver"
              output_table: "{{ environment.catalog }}.{{ environment.schema }}.hs_code_anomalies_gold"
          new_cluster: *prod_cluster_config
          development:
            new_cluster: *dev_cluster_config
          staging:
            new_cluster: *staging_cluster_config
          max_retries: 1
          timeout_seconds: 1800
      schedule: # Schedule this notebook job
        quartz_cron_expression: "0 0 4 * * ?" # Run daily at 4 AM UTC
        timezone_id: "UTC"

Explanation of databricks.yml:

  • bundle.name: A unique identifier for your project bundle.
  • environments: Defines different deployment targets.
    • host: The Databricks workspace URL. We use {{ env.DATABRICKS_HOST_DEV }} etc. to pull this from environment variables, which is crucial for CI/CD.
    • catalog / schema: Specifies the Unity Catalog catalog and schema to use for tables created by this environment. This ensures data isolation.
    • base_path: A DBFS path for checkpoints, logs, and other artifacts.
    • cluster_config: Reusable cluster configurations for jobs. We use YAML anchors (&dev_cluster_config) and aliases (*dev_cluster_config) for cleaner syntax.
  • resources: This section defines the actual Databricks objects to be deployed.
    • dlt_pipelines: Defines Delta Live Tables pipelines.
      • target: The Unity Catalog schema where DLT tables will be created.
      • configuration: Key-value pairs passed as pipeline settings.
      • libraries: Points to our DLT Python script.
      • continuous: true for real-time, false for triggered.
      • development/production overrides: Allows specific settings for different environments (e.g., continuous can be false in development to save costs, but true in production).
    • jobs: Defines Databricks Jobs.
      • tasks: A list of tasks within a job.
      • spark_python_task: For deploying standalone Python scripts. python_file points to our script, parameters are command-line arguments. (A python_wheel_task would instead run an entry point from a packaged wheel.)
      • notebook_task: For deploying and running notebooks. notebook_path points to our notebook.
      • new_cluster: Specifies the cluster configuration for the task, referencing our environment-defined clusters.
      • schedule: Defines a cron schedule for batch jobs.
      • email_notifications: Configures alerts for job status.
  • Variable Substitution ({{ ... }}): Databricks Asset Bundles support powerful variable substitution:
    • {{ bundle.name }}: The name of the bundle.
    • {{ environment.schema }}: The schema defined for the current deployment environment.
    • {{ user.email }}: The email of the user running the bundle (useful for dev paths).
    • {{ env.YOUR_ENV_VAR }}: Accesses environment variables set in your shell or CI/CD system. This is crucial for sensitive data like Kafka brokers or API keys, and for host URLs.
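To build intuition for how these placeholders resolve, here is a toy resolver in plain Python. It is purely illustrative: the real substitution is performed by the Databricks CLI itself, and the context values below are examples taken from our bundle definition.

```python
import re

# Illustrative only: a toy resolver mimicking the style of bundle variable
# substitution. The Databricks CLI performs the real resolution.
def render(template: str, context: dict) -> str:
    """Replace {{ dotted.path }} placeholders using a nested context dict."""
    def lookup(match):
        value = context
        for part in match.group(1).strip().split("."):
            value = value[part]
        return str(value)
    return re.sub(r"\{\{\s*([^}]+?)\s*\}\}", lookup, template)

context = {
    "bundle": {"name": "supply-chain-analytics-bundle"},
    "environment": {"catalog": "supply_chain_dev_catalog", "schema": "dev"},
}
name = render("{{ bundle.name }}-{{ environment.schema }}-supply-chain-ingestion", context)
print(name)  # supply-chain-analytics-bundle-dev-supply-chain-ingestion
```

The same template thus yields a distinct, environment-qualified resource name per deployment target, which is what keeps dev, staging, and production resources from colliding.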

c) Testing This Component

Before setting up CI/CD, let’s validate and try a local deployment.

  1. Install Databricks CLI: If you haven’t already, ensure you have Databricks CLI v0.200 or higher. Note that the legacy databricks-cli package on PyPI (v0.18.x) does not support bundles; install the standalone CLI instead:
    curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh
    
  2. Configure Databricks CLI: Authenticate your local CLI with your Databricks workspace.
    databricks configure --token
    # Databricks Host (e.g., https://adb-xxxxxxxxxxxxxxxx.xx.databricks.com): <YOUR_DEV_DATABRICKS_HOST>
    # Token: <YOUR_DATABRICKS_PAT>
    
    • Important: Generate a Personal Access Token (PAT) from your Databricks workspace settings (User Settings -> Developer -> Access Tokens). Store it securely.
  3. Set Environment Variables: For a local test, you need to export the environment variables referenced in databricks.yml.
    export DATABRICKS_HOST_DEV="<YOUR_DEV_DATABRICKS_HOST>"
    export DATABRICKS_HOST_STAGING="<YOUR_STAGING_DATABRICKS_HOST>" # If you have one
    export DATABRICKS_HOST_PROD="<YOUR_PROD_DATABRICKS_HOST>"     # If you have one
    export KAFKA_BOOTSTRAP_SERVERS="<YOUR_KAFKA_BROKERS_HOST:PORT>" # e.g., pkc-xxxx.us-east-1.aws.confluent.cloud:9092
    
  4. Validate the Bundle:
    databricks bundle validate
    
    This command checks your databricks.yml for syntax errors and ensures all referenced files exist. It’s a critical step before deployment.
  5. Deploy to Development Environment:
    databricks bundle deploy --target development
    
    This command will deploy all resources defined under the development environment in your databricks.yml to your configured Databricks workspace.
    • Expected Behavior: You should see output indicating resource creation/update (DLT pipelines, jobs). If successful, navigate to your Databricks workspace UI (Workflows -> Delta Live Tables and Workflows -> Jobs) and verify that the pipelines and jobs have been created. You can then manually start them to ensure they run correctly.

3.3. Setting up CI/CD with GitHub Actions

Now that our bundle is defined and deployable locally, let’s automate the deployment using GitHub Actions.

a) Setup/Configuration

Create the GitHub Actions workflow file: .github/workflows/deploy.yml.

b) Core Implementation

This workflow will trigger on push events to specific branches (main for staging, release for production) and use the databricks bundle deploy command.

File: .github/workflows/deploy.yml

# .github/workflows/deploy.yml
name: Databricks Bundle CI/CD

on:
  push:
    branches:
      - main # Triggers for staging deployment
      - release # Triggers for production deployment
  workflow_dispatch: # Allows manual triggering of the workflow

env:
  # Common environment variables for all jobs
  DATABRICKS_CLI_VERSION: "0.210.0" # Use a stable and recent version of Databricks CLI

jobs:
  validate-bundle:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.10' # Ensure a compatible Python version

      - name: Install Databricks CLI
        uses: databricks/setup-cli@main
        with:
          version: ${{ env.DATABRICKS_CLI_VERSION }}

      - name: Validate Databricks Bundle
        # Note: depending on the bundle contents, validation may still need
        # workspace credentials to resolve variables and workspace paths
        run: databricks bundle validate -t development # Validate against a specific target's config

  deploy-staging:
    needs: validate-bundle # Only run if validation passes
    if: github.ref == 'refs/heads/main' # Only deploy to staging on push to 'main'
    runs-on: ubuntu-latest
    environment: staging # Specify GitHub environment for secrets management
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.10'

      - name: Install Databricks CLI
        uses: databricks/setup-cli@main
        with:
          version: ${{ env.DATABRICKS_CLI_VERSION }}

      - name: Deploy Databricks Bundle to Staging
        # The CLI authenticates via DATABRICKS_HOST/DATABRICKS_TOKEN; the
        # Kafka variable is available for variable substitution in databricks.yml
        env:
          DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST_STAGING }}
          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN_STAGING }}
          KAFKA_BOOTSTRAP_SERVERS: ${{ secrets.KAFKA_BOOTSTRAP_SERVERS_STAGING }}
        run: databricks bundle deploy --target staging

  deploy-production:
    needs: validate-bundle # A push to 'release' skips deploy-staging, so a dependency on it would cascade the skip; depend on validation only
    if: github.ref == 'refs/heads/release' # Only deploy to production on push to 'release'
    runs-on: ubuntu-latest
    environment: production # Specify GitHub environment for secrets management
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.10'

      - name: Install Databricks CLI
        uses: databricks/setup-cli@main
        with:
          version: ${{ env.DATABRICKS_CLI_VERSION }}

      - name: Deploy Databricks Bundle to Production
        # The CLI authenticates via DATABRICKS_HOST/DATABRICKS_TOKEN; the
        # Kafka variable is available for variable substitution in databricks.yml
        env:
          DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST_PROD }}
          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN_PROD }}
          KAFKA_BOOTSTRAP_SERVERS: ${{ secrets.KAFKA_BOOTSTRAP_SERVERS_PROD }}
        run: databricks bundle deploy --target production

c) Testing This Component

  1. GitHub Repository: Push all your code (src/, databricks.yml, .github/workflows/deploy.yml) to a GitHub repository.
  2. GitHub Secrets: In your GitHub repository settings, go to Settings -> Security -> Secrets and variables -> Actions. Create repository secrets for:
    • DATABRICKS_HOST_STAGING: URL of your staging Databricks workspace.
    • DATABRICKS_TOKEN_STAGING: PAT for the staging workspace.
    • KAFKA_BOOTSTRAP_SERVERS_STAGING: Kafka brokers for staging.
    • DATABRICKS_HOST_PROD: URL of your production Databricks workspace.
    • DATABRICKS_TOKEN_PROD: PAT for the production workspace.
    • KAFKA_BOOTSTRAP_SERVERS_PROD: Kafka brokers for production.
    • Security Best Practice: Use separate PATs (or Service Principals) for each environment with the least privilege necessary.
  3. Trigger Workflow:
    • Push a change to the main branch. This should trigger validate-bundle and then deploy-staging.
    • Once main is stable, create a release branch from main and push to release. This should trigger validate-bundle and then deploy-production.
    • Alternatively, use workflow_dispatch to manually trigger the workflow and select a branch.
  4. Monitor Workflow: Go to the “Actions” tab in your GitHub repository to monitor the workflow execution. Check logs for any errors.
  5. Verify Deployment: After successful deployment, check your Databricks Staging and Production workspaces. You should see the DLT pipelines and jobs created/updated under the “Workflows” section, corresponding to the names defined in databricks.yml.

Production Considerations

  1. Security:

    • Databricks Authentication: For production CI/CD, always prefer Service Principals over Personal Access Tokens (PATs). Service Principals offer better security, auditability, and lifecycle management. Configure your CI/CD to use Service Principal credentials (an OAuth client ID and client secret, plus a tenant ID on Azure) stored as secrets.
    • Least Privilege: Ensure the Service Principal or PAT used for deployment has only the necessary permissions (e.g., CAN_MANAGE on DLT pipelines and jobs, CAN_MANAGE on clusters if creating new ones, CAN_USE on Unity Catalog schemas).
    • Unity Catalog: Leverage Unity Catalog for fine-grained access control to your data. The bundle configuration uses catalog.schema for table targets, ensuring data isolation and governance.
    • Secrets Management: Never hardcode sensitive information (Kafka brokers, API keys, database credentials) in databricks.yml or your code. Use environment variables and CI/CD secrets (like GitHub Secrets) that are securely injected at runtime.
  2. Performance Optimization:

    • Cluster Sizing: Define appropriate cluster sizes (node_type_id, num_workers, autoscale) in databricks.yml for each environment. Production clusters should be scaled for performance and resilience, while development clusters can be smaller to save costs.
    • Photon Engine: Enable Photon in production environments for DLT and Spark jobs where applicable, as it significantly boosts query performance.
    • DLT Continuous vs. Triggered: For real-time requirements, DLT pipelines should be continuous: true in production. For less stringent latency, continuous: false with a frequent trigger can be more cost-effective.
    • Checkpoint Locations: Ensure checkpoint locations are on reliable, performant storage (e.g., cloud object storage, not local DBFS) and are unique per environment.
  3. Error Handling & Resilience:

    • DLT Error Handling: DLT pipelines inherently offer robust error handling with expectations (expect_or_drop, expect_or_fail). Configure notifications in databricks.yml to alert on pipeline failures.
    • Job Retries: Configure max_retries for Databricks Jobs in databricks.yml to handle transient failures gracefully.
    • Timeout: Set timeout_seconds for jobs to prevent them from running indefinitely due to issues.
    • DLT Channel: Use CURRENT for production DLT pipelines for stability, and PREVIEW in development/staging to test new features.
  4. Logging and Monitoring:

    • Databricks Logging: All job and DLT pipeline runs generate logs within Databricks. Ensure your Python scripts use standard logging practices (logging module) to output relevant information.
    • Notifications: Configure email notifications in databricks.yml for job failures or DLT pipeline updates.
    • Monitoring Tools: Integrate Databricks logs and metrics with external monitoring solutions (e.g., Datadog, Prometheus, Splunk) for centralized observability. (This will be covered in the next chapter).
  5. Deployment Strategies:

    • Blue-Green/Canary Deployments: While databricks bundle deploy performs an in-place update, you can implement more advanced strategies by defining multiple DLT pipelines or jobs within your bundle (e.g., supply-chain-ingestion-v1, supply-chain-ingestion-v2) and gradually shifting traffic, or deploying to a separate “blue” workspace and switching DNS. Bundles provide the IaC foundation for such strategies.
    • Rollback: Version control (Git) provides the primary rollback mechanism. If a deployment fails or introduces issues, you can revert to a previous commit and re-deploy.
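
The "standard logging practices" mentioned under Logging and Monitoring can be sketched as a small helper in a job script. This is an illustrative pattern, not code from the project; the logger name and format string are assumptions you would adapt:

```python
import logging

def get_job_logger(name: str = "logistics_cost_monitoring") -> logging.Logger:
    """Configure a named logger whose output lands in the Databricks
    job run logs (driver stdout/stderr)."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid duplicate handlers on re-import
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s - %(message)s"))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger

logger = get_job_logger()
logger.info("Starting logistics cost monitoring stream")
```

The `if not logger.handlers` guard matters in notebooks and re-imported modules, where re-running a cell would otherwise attach a second handler and duplicate every log line.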

Code Review Checkpoint

At this point, we have significantly enhanced our project’s operational readiness.

Summary of what was built:

  • Databricks Asset Bundle (databricks.yml): A declarative definition of our DLT pipelines, Spark Structured Streaming jobs, batch jobs, and notebook jobs, along with their environment-specific configurations (cluster sizes, target schemas, parameters).
  • Structured Codebase: Our Python scripts and notebooks are organized within the src/ directory, ready for bundle deployment.
  • GitHub Actions Workflow: An automated CI/CD pipeline that validates our bundle and deploys it to staging (on main branch push) and production (on release branch push).
  • Environment-Specific Configuration: Robust handling of different configurations for development, staging, and production using variable substitution and GitHub Secrets.

Files created/modified:

  • databricks.yml (New or heavily modified)
  • src/dlt_pipelines/supply_chain_ingestion.py (Existing, potentially refined for bundle parameters)
  • src/jobs/logistics_cost_monitoring.py (Existing, potentially refined for bundle parameters)
  • src/jobs/tariff_analysis_batch.py (Existing, potentially refined for bundle parameters)
  • src/notebooks/anomaly_detection.ipynb (Existing, potentially refined for bundle parameters)
  • .github/workflows/deploy.yml (New)

How it integrates with existing code: The Databricks Asset Bundle acts as the orchestration layer for our existing DLT and Spark code. It picks up the Python scripts and notebooks from the src directory, packages them, and deploys them as defined in databricks.yml. The parameters defined in the bundle (e.g., kafka_bootstrap_servers, checkpoint_location, output_table_name) are passed directly to our Python scripts, making them configurable per environment without code changes.
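
On the script side, a job entry point typically receives those bundle parameters as command-line arguments. The sketch below assumes parameter names matching the examples above (`kafka_bootstrap_servers`, `checkpoint_location`, `output_table_name`); the actual names must match whatever your `databricks.yml` defines:

```python
import argparse

def parse_job_args(argv=None) -> argparse.Namespace:
    """Parse the per-environment parameters that the bundle passes to a
    job task (names must mirror the parameters block in databricks.yml)."""
    parser = argparse.ArgumentParser(description="Logistics cost monitoring job")
    parser.add_argument("--kafka_bootstrap_servers", required=True)
    parser.add_argument("--checkpoint_location", required=True)
    parser.add_argument("--output_table_name", required=True)
    return parser.parse_args(argv)

# In the job entry point (illustrative):
# args = parse_job_args()
# df = (spark.readStream.format("kafka")
#       .option("kafka.bootstrap.servers", args.kafka_bootstrap_servers)
#       .load())
```

Because the script only sees plain arguments, the same file runs unchanged in development, staging, and production; only the values injected by the bundle differ.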

Common Issues & Solutions

  1. Bundle Validation Errors (databricks bundle validate):

    • Issue: databricks bundle validate fails with YAML syntax errors or file not found errors.
    • Solution: Carefully check your databricks.yml for correct YAML indentation and syntax. Ensure all file paths (e.g., ./src/dlt_pipelines/supply_chain_ingestion.py) are correct relative to the databricks.yml file.
    • Prevention: Use a good YAML linter in your IDE. Always run databricks bundle validate locally before committing.
  2. Permissions Issues During Deployment:

    • Issue: The CI/CD pipeline fails with PERMISSION_DENIED errors when deploying to Databricks.
    • Solution: This almost always means the Databricks PAT or Service Principal used by your CI/CD system lacks the necessary permissions.
      • Verify the PAT/Service Principal has CAN_MANAGE permissions on the target workspace’s DLT pipelines, Jobs, and potentially CAN_MANAGE on the clusters or instance pools if the bundle creates new ones.
      • For Unity Catalog, ensure the identity has USE CATALOG and CREATE SCHEMA (or USE SCHEMA) permissions on the target catalog/schema.
    • Debugging: Check the detailed error message in the CI/CD logs. It usually indicates which specific permission is missing.
  3. Environment Variable/Secret Configuration Problems:

    • Issue: databricks bundle deploy fails because an environment variable (e.g., KAFKA_BOOTSTRAP_SERVERS) is not found, or its value is incorrect.
    • Solution:
      • Local: Ensure you have exported all required environment variables in your shell before running databricks bundle deploy.
      • CI/CD: Double-check that all GitHub Secrets are correctly named, have the correct values, and are exposed to the workflow job using the env: block in the GitHub Actions YAML. Remember that secrets are case-sensitive.
    • Prevention: Use a consistent naming convention for secrets. Test local deployment with dummy environment variables first to ensure the bundle picks them up correctly.
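
A cheap way to catch the missing-variable problem early is a preflight check that runs before `databricks bundle deploy`. This is a hypothetical helper, and the variable names listed are examples to adapt to your own bundle:

```python
import os

# Example variable names; replace with whatever your bundle actually requires
REQUIRED_VARS = [
    "DATABRICKS_HOST",
    "DATABRICKS_TOKEN",
    "KAFKA_BOOTSTRAP_SERVERS",
]

def check_required_env(required=REQUIRED_VARS, environ=os.environ) -> list:
    """Return the names of required variables that are missing or empty."""
    return [name for name in required if not environ.get(name)]

# In a CI step or at the top of a deployment script:
# missing = check_required_env()
# if missing:
#     raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
```

Failing fast with an explicit list of missing names is far easier to debug than a cryptic substitution error deep inside bundle processing.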

Testing & Verification

To thoroughly test and verify the CI/CD setup:

  1. Trigger a Full Deployment:
    • Make a minor, innocuous change to one of your DLT Python files (e.g., add a comment).
    • Commit the change and push it to your main branch.
    • Observe the GitHub Actions workflow. It should trigger, run validate-bundle, and then deploy-staging.
    • Once deploy-staging is successful, create a release branch from main and push it. This should trigger deploy-production.
  2. Verify Databricks Resources:
    • Log in to your Databricks Staging workspace. Navigate to “Workflows” -> “Delta Live Tables” and “Workflows” -> “Jobs”.
    • Confirm that supply-chain-analytics-bundle-staging-supply-chain-ingestion DLT pipeline is present.
    • Confirm that supply-chain-analytics-bundle-staging-logistics-cost-monitoring and other jobs are present.
    • Repeat this verification for your Databricks Production workspace after the production deployment.
  3. Run and Monitor Pipelines/Jobs:
    • DLT Pipeline: Manually start the DLT pipeline in the staging workspace. Check the pipeline graph and event log for successful execution and data processing. Verify that data lands in the supply_chain_staging_catalog.staging schema.
    • Streaming Job: Manually run the logistics-cost-monitoring job in staging. If you have a Kafka producer configured for staging, send some test messages and observe if the job consumes them and writes to the logistics_costs_silver table.
    • Batch Jobs: Manually run the tariff-analysis-batch and anomaly-detection jobs. Verify their output tables.
    • Automated Runs: Ensure that scheduled batch jobs (tariff analysis, anomaly detection) are correctly scheduled in Databricks.
  4. Check Logs: Review the logs of the running pipelines/jobs in Databricks for any errors or warnings.
  5. Data Validation: Perform spot checks on the data generated in the target Delta tables in both staging and production to ensure correctness and completeness.
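
The data spot checks in step 5 can be semi-automated. In practice you would query the Delta table with Spark and collect a sample (e.g. `spark.table(...).limit(n).collect()`, converting rows to dicts); the pure-Python helper below is an illustrative sketch of the checks themselves, with hypothetical column names:

```python
def spot_check_rows(rows, required_cols, min_rows=1):
    """Basic completeness checks on a sample of rows (as dicts) pulled
    from a target Delta table. Returns a list of problem descriptions;
    an empty list means the sample passed."""
    problems = []
    if len(rows) < min_rows:
        problems.append(f"expected at least {min_rows} rows, got {len(rows)}")
    for i, row in enumerate(rows):
        for col in required_cols:
            if row.get(col) in (None, ""):
                problems.append(f"row {i}: column '{col}' is null/empty")
    return problems
```

Running the same checks against staging and production samples gives you a like-for-like comparison after each deployment, rather than relying on eyeballing the tables.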

Summary & Next Steps

Congratulations! You have successfully implemented a robust CI/CD pipeline for your Databricks data assets using Databricks Asset Bundles and GitHub Actions. This chapter has transformed our individual data pipelines into a deployable, version-controlled, and automated system, a critical step towards production readiness. We covered:

  • The importance of CI/CD for data engineering projects.
  • How to structure your Databricks project for bundle deployment.
  • Defining DLT pipelines, Spark Jobs, and Notebook Jobs in databricks.yml.
  • Parameterizing configurations for different environments using variable substitution.
  • Automating deployment with GitHub Actions, including secret management.
  • Key production considerations for security, performance, error handling, and deployment strategies.

With CI/CD in place, you can now confidently iterate on your data pipelines, knowing that every change will be validated and deployed consistently. This automation significantly reduces manual errors, accelerates development cycles, and ensures the reliability of our real-time supply chain analytics solution.

In the next and final chapter, we will focus on Monitoring, Alerting, and Observability for Real-time Data Pipelines. We’ll integrate Databricks with monitoring tools, set up custom alerts for pipeline health and data quality, and establish dashboards to provide real-time insights into our supply chain operations.