# Chapter 10: Anomaly Detection for Trade Data and Logistics Costs

## Chapter Introduction
In the intricate world of supply chain management, unexpected deviations can lead to significant financial losses, operational inefficiencies, and compliance risks. Identifying these anomalies in real-time is paramount for proactive decision-making. This chapter focuses on building robust anomaly detection mechanisms for two critical areas: HS Code classifications within trade data and real-time logistics costs. We will leverage Databricks’ powerful ecosystem, including Delta Lake for reliable data storage, PySpark for scalable data processing, and MLflow for managing the end-to-end machine learning lifecycle, from experimentation to model deployment.
By the end of this chapter, you will have implemented a system capable of automatically detecting unusual patterns in your trade and logistics data. This includes identifying potentially misclassified HS codes, fraudulent activities, or sudden, unexplained spikes or drops in logistics expenses. We will ensure the models are tracked and managed using MLflow, adhering to best practices for production-ready machine learning pipelines.
This chapter builds upon the data pipelines established in previous sections, particularly the `customs_trade.hs_code_classification_gold` table from our Customs Trade Data Lakehouse and the `logistics.cost_monitoring_gold` table from the Streaming Logistics Cost Monitoring pipeline. Our expected outcome is a set of trained anomaly detection models registered in MLflow, along with a process to apply these models to incoming data, writing any detected anomalies to dedicated Delta Lake tables for further investigation and alerting.
### Planning & Design

#### Component Architecture
The anomaly detection system will integrate seamlessly with our existing Databricks Lakehouse architecture.
- Data Sources:
  - `customs_trade.hs_code_classification_gold`: Contains validated and enriched HS code classification data.
  - `logistics.cost_monitoring_gold`: Contains real-time and historical logistics cost data, including tariff and fuel price correlations.
- Data Preparation & Feature Engineering: PySpark will be used to load data from Delta tables, perform any necessary feature engineering (e.g., scaling, one-hot encoding for categorical features) to prepare it for anomaly detection models.
- Anomaly Detection Models: We will implement two distinct anomaly detection models, one for trade data (HS Codes) and one for logistics costs. For simplicity and broad applicability, we’ll start with the Isolation Forest algorithm, known for its effectiveness in high-dimensional datasets and its ability to handle various data types.
- MLflow for Model Management:
  - Experiment Tracking: MLflow will log model parameters (e.g., `contamination`), evaluation metrics (e.g., number of anomalies detected), and artifacts (the trained model itself) for each training run.
  - Model Registry: Once a model performs satisfactorily, it will be registered in the MLflow Model Registry, allowing for version control, stage transitions (Staging, Production), and easy deployment.
- Inference & Anomaly Output: The registered models will be loaded for inference. When new data arrives, the models will predict anomaly scores. Data points identified as anomalies will be enriched with metadata (e.g., anomaly score, model version) and written to dedicated Delta Lake tables.
- Anomaly Sink:
  - `anomalies.detected_hs_code_anomalies`: Stores anomalies related to HS code classifications.
  - `anomalies.detected_logistics_cost_anomalies`: Stores anomalies related to logistics costs.
  - `anomalies.all_detected_anomalies`: A unified view or table combining all detected anomalies for a holistic overview.
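Both detectors rely on the same Isolation Forest scoring conventions, so it helps to see them on a toy example first. This standalone scikit-learn sketch uses synthetic data (the values are illustrative): `decision_function` assigns lower scores to more isolated points, and `predict` returns -1 for outliers.

```python
import numpy as np
from sklearn.ensemble import IsolationForest

# Synthetic "costs": a tight cluster plus one obvious outlier.
rng = np.random.RandomState(42)
normal = rng.normal(loc=100.0, scale=5.0, size=(200, 1))
X = np.vstack([normal, [[500.0]]])

model = IsolationForest(contamination=0.01, random_state=42).fit(X)

scores = model.decision_function(X)  # lower score = more anomalous
labels = model.predict(X)            # -1 = outlier, 1 = inlier

assert scores[-1] == scores.min()    # the planted outlier scores lowest
assert labels[-1] == -1              # and is flagged as an outlier
```

This sign convention is why the pipelines in this chapter flag rows with `anomaly_score < 0`.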
#### Database Schema (New Tables)
We will introduce new Delta tables under a new `anomalies` schema in Unity Catalog to store the detected anomalies.
**Schema: `anomalies`**
1. **`anomalies.detected_hs_code_anomalies`**
* `anomaly_id` (STRING): Unique identifier for the anomaly.
* `transaction_id` (STRING): Original transaction ID from `hs_code_classification_gold`.
* `hs_code` (STRING): The HS code identified as anomalous.
* `product_name` (STRING): Product name associated with the anomaly.
* `country_of_origin` (STRING): Country of origin.
* `destination_country` (STRING): Destination country.
* `transaction_value` (DOUBLE): Transaction value.
* `quantity` (DOUBLE): Quantity.
    * `anomaly_score` (DOUBLE): The score assigned by the anomaly detection model (lower scores indicate stronger anomalies).
* `is_anomaly` (BOOLEAN): Flag indicating if it's an anomaly.
* `detection_timestamp` (TIMESTAMP): When the anomaly was detected.
* `model_version` (STRING): Version of the MLflow model used for detection.
* `model_name` (STRING): Name of the MLflow model.
* `details` (STRING): JSON string for additional context/features at time of detection.
2. **`anomalies.detected_logistics_cost_anomalies`**
* `anomaly_id` (STRING): Unique identifier for the anomaly.
* `shipment_id` (STRING): Original shipment ID from `cost_monitoring_gold`.
* `route_id` (STRING): Route ID.
* `total_cost_usd` (DOUBLE): Total cost of the shipment.
* `cost_per_unit_usd` (DOUBLE): Cost per unit.
* `fuel_price_usd_per_liter` (DOUBLE): Fuel price at the time.
* `tariff_impact_usd` (DOUBLE): Tariff impact at the time.
* `anomaly_score` (DOUBLE): The score assigned by the anomaly detection model.
* `is_anomaly` (BOOLEAN): Flag indicating if it's an anomaly.
* `detection_timestamp` (TIMESTAMP): When the anomaly was detected.
* `model_version` (STRING): Version of the MLflow model used for detection.
* `model_name` (STRING): Name of the MLflow model.
* `details` (STRING): JSON string for additional context/features.
3. **`anomalies.all_detected_anomalies`** (View or Union Table)
This will be a view or a Delta table created by `UNION`ing the two anomaly tables, adding an `anomaly_type` column for distinction. For simplicity, we'll use a view for now.
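The `anomaly_id` column will be generated in the pipeline with Spark's `sha2(concat_ws(...), 256)`. The pure-Python sketch below illustrates the same scheme with `hashlib`; the helper name and the ISO timestamp formatting are our own choices here, so the hex digests will not match Spark's output byte-for-byte.

```python
import hashlib
from datetime import datetime, timezone

def make_anomaly_id(source_id: str, detected_at: datetime) -> str:
    # Join the source record ID and detection time with "_" (as concat_ws
    # does), then hash with SHA-256 for a stable 64-char hex identifier.
    payload = f"{source_id}_{detected_at.isoformat()}"
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

ts = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)

assert len(make_anomaly_id("TXN-0001", ts)) == 64               # SHA-256 hex length
assert make_anomaly_id("TXN-0001", ts) == make_anomaly_id("TXN-0001", ts)  # deterministic
assert make_anomaly_id("TXN-0001", ts) != make_anomaly_id("TXN-0002", ts)  # ID-sensitive
```

Because the timestamp participates in the hash, re-detecting the same transaction in a later run yields a new `anomaly_id`, so repeated appends never collide.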
#### File Structure
We will implement this within a single Databricks notebook for clarity in a tutorial setting, but in a production environment, these would be separate Python modules or notebooks orchestrated by Databricks Workflows (Jobs).
```
├── notebooks/
│   ├── 10_anomaly_detection_mlflow/
│   │   └── anomaly_detection_pipeline.py  (Databricks notebook, written in Python)
├── conf/
│   └── pipeline_config.py  (Existing configuration file)
```
### Step-by-Step Implementation
We will perform all steps within a Databricks Notebook. Ensure your cluster has access to the necessary libraries (e.g., `scikit-learn`, `mlflow`). Databricks Runtime typically includes these.
#### 1. Setup/Configuration
First, we'll set up our Databricks environment, import necessary libraries, and define global configurations.
**File:** `notebooks/10_anomaly_detection_mlflow/anomaly_detection_pipeline.py`
```python
# Databricks Notebook: anomaly_detection_pipeline.py
# --- 1. Setup/Configuration ---
import logging
import uuid
import json
from datetime import datetime
import mlflow
import mlflow.sklearn
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp, sha2, concat_ws, col, to_json, struct
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline as SparkPipeline
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score # For evaluation if labels were available
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Initialize Spark Session (already available in Databricks notebooks)
spark = SparkSession.builder.appName("AnomalyDetectionPipeline").getOrCreate()
# Unity Catalog configuration
CATALOG_NAME = "main" # Replace with your Unity Catalog name
SCHEMA_ANOMALIES = "anomalies"
SCHEMA_CUSTOMS = "customs_trade"
SCHEMA_LOGISTICS = "logistics"
# Input Tables
HS_CODE_GOLD_TABLE = f"{CATALOG_NAME}.{SCHEMA_CUSTOMS}.hs_code_classification_gold"
LOGISTICS_COST_GOLD_TABLE = f"{CATALOG_NAME}.{SCHEMA_LOGISTICS}.cost_monitoring_gold"
# Output Tables
DETECTED_HS_CODE_ANOMALIES_TABLE = f"{CATALOG_NAME}.{SCHEMA_ANOMALIES}.detected_hs_code_anomalies"
DETECTED_LOGISTICS_COST_ANOMALIES_TABLE = f"{CATALOG_NAME}.{SCHEMA_ANOMALIES}.detected_logistics_cost_anomalies"
# MLflow Experiment Names
MLFLOW_EXPERIMENT_HS_CODE = "/Shared/anomaly_detection/hs_code_anomalies"
MLFLOW_EXPERIMENT_LOGISTICS_COST = "/Shared/anomaly_detection/logistics_cost_anomalies"
# Create schemas if they don't exist (ensure you have necessary permissions)
try:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG_NAME}.{SCHEMA_ANOMALIES}")
    logger.info(f"Schema {CATALOG_NAME}.{SCHEMA_ANOMALIES} ensured.")
except Exception as e:
    logger.error(f"Failed to create schema {CATALOG_NAME}.{SCHEMA_ANOMALIES}: {e}")
    # In a real production system, this might be handled via IaC like Databricks Asset Bundles.
# Define output table DDLs
# DDL for HS Code Anomalies
hs_code_anomalies_ddl = f"""
CREATE TABLE IF NOT EXISTS {DETECTED_HS_CODE_ANOMALIES_TABLE} (
anomaly_id STRING,
transaction_id STRING,
hs_code STRING,
product_name STRING,
country_of_origin STRING,
destination_country STRING,
transaction_value DOUBLE,
quantity DOUBLE,
anomaly_score DOUBLE,
is_anomaly BOOLEAN,
detection_timestamp TIMESTAMP,
model_version STRING,
model_name STRING,
details STRING
) USING DELTA
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
"""
# DDL for Logistics Cost Anomalies
logistics_cost_anomalies_ddl = f"""
CREATE TABLE IF NOT EXISTS {DETECTED_LOGISTICS_COST_ANOMALIES_TABLE} (
anomaly_id STRING,
shipment_id STRING,
route_id STRING,
total_cost_usd DOUBLE,
cost_per_unit_usd DOUBLE,
fuel_price_usd_per_liter DOUBLE,
tariff_impact_usd DOUBLE,
anomaly_score DOUBLE,
is_anomaly BOOLEAN,
detection_timestamp TIMESTAMP,
model_version STRING,
model_name STRING,
details STRING
) USING DELTA
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
"""
try:
    spark.sql(hs_code_anomalies_ddl)
    spark.sql(logistics_cost_anomalies_ddl)
    logger.info("Anomaly output tables ensured.")
except Exception as e:
    logger.error(f"Failed to create anomaly output tables: {e}")
```

**Explanation:**
- We import the necessary Python and PySpark libraries, including `mlflow` and `sklearn.ensemble.IsolationForest`.
- Logging is configured to provide informative messages.
- The SparkSession is initialized (though it is usually implicitly available in Databricks notebooks).
- Unity Catalog and schema names are defined as constants for easy modification and clarity.
- Input and output table names are set.
- MLflow experiment names are defined to organize our model runs.
- We explicitly create the `anomalies` schema and the Delta tables for storing detected anomalies, including `TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')` for potential future CDC (Change Data Capture) applications. Error handling is included for schema and table creation.
#### 2. Core Implementation
We will implement two main functions: one for HS Code anomaly detection and another for logistics cost anomaly detection. Each function will handle data loading, feature engineering, model training, MLflow tracking, and inference.
##### 2.1. HS Code Anomaly Detection
This section focuses on identifying unusual HS code classifications based on features like transaction value, quantity, and origin/destination countries.
**File:** `notebooks/10_anomaly_detection_mlflow/anomaly_detection_pipeline.py` (append to existing content)
```python
# --- 2.1. HS Code Anomaly Detection ---
def train_and_detect_hs_code_anomalies(
    input_table_name: str,
    output_table_name: str,
    mlflow_experiment_name: str,
    model_name: str,
    contamination: float = 0.01,
    train_data_limit: int = 100000
):
    """
    Trains an Isolation Forest model for HS Code anomaly detection,
    logs it to MLflow, and applies it to new data.

    Args:
        input_table_name (str): Full path to the HS Code gold Delta table.
        output_table_name (str): Full path to the output Delta table for anomalies.
        mlflow_experiment_name (str): MLflow experiment name.
        model_name (str): Name to register the model under in the MLflow Model Registry.
        contamination (float): The expected proportion of outliers in the data set.
        train_data_limit (int): Limit on training rows to prevent OOM errors.
    """
    logger.info(f"Starting HS Code anomaly detection pipeline for experiment: {mlflow_experiment_name}")
    mlflow.set_experiment(mlflow_experiment_name)

    with mlflow.start_run(run_name="HS_Code_Anomaly_Training"):
        mlflow.log_param("contamination", contamination)
        mlflow.log_param("train_data_limit", train_data_limit)

        try:
            # 1. Load Data for Training
            df_hs_code = spark.table(input_table_name)

            # For simplicity, we'll train on a sampled batch. In production,
            # consider a larger, representative dataset or scheduled retraining.
            feature_cols = [
                "transaction_value",
                "quantity",
                "tariff_rate"
            ]

            # Categorical features (country_of_origin, destination_country) must
            # be encoded. We use PySpark ML for preprocessing before converting
            # to Pandas for IsolationForest; this handles large datasets more
            # efficiently.

            # Drop rows with nulls in critical features for this model
            df_hs_code_cleaned = df_hs_code.select(
                "transaction_id", "hs_code", "product_name",
                "country_of_origin", "destination_country",
                "transaction_value", "quantity", "tariff_rate"
            ).na.drop(subset=feature_cols)

            # String indexers for categorical features
            country_indexer = StringIndexer(inputCol="country_of_origin", outputCol="country_of_origin_indexed", handleInvalid="keep")
            dest_indexer = StringIndexer(inputCol="destination_country", outputCol="destination_country_indexed", handleInvalid="keep")

            # One-hot encoders for the indexed categorical features
            country_ohe = OneHotEncoder(inputCol="country_of_origin_indexed", outputCol="country_of_origin_vec")
            dest_ohe = OneHotEncoder(inputCol="destination_country_indexed", outputCol="destination_country_vec")

            # Assemble all features into a single vector
            assembler = VectorAssembler(
                inputCols=feature_cols + ["country_of_origin_vec", "destination_country_vec"],
                outputCol="features_raw"
            )

            # Scale the assembled features
            scaler = StandardScaler(inputCol="features_raw", outputCol="features", withStd=True, withMean=True)

            # Chain the transformations into a PySpark ML Pipeline
            preprocessing_pipeline = SparkPipeline(stages=[
                country_indexer,
                dest_indexer,
                country_ohe,
                dest_ohe,
                assembler,
                scaler
            ])

            # Fit the preprocessing pipeline on a sample of the data to
            # prevent OOM on large datasets during the initial fit.
            logger.info(f"Fitting preprocessing pipeline on {input_table_name}...")
            df_train_sample = df_hs_code_cleaned.sample(fraction=0.1, seed=42).limit(train_data_limit)
            fitted_pipeline = preprocessing_pipeline.fit(df_train_sample)

            # Transform the entire dataset (or a recent batch for training)
            df_processed = fitted_pipeline.transform(df_hs_code_cleaned).cache()  # Cache for repeated use

            # Take a sample of the processed features for training the Isolation Forest model.
            pandas_df_train = df_processed.select("features").sample(fraction=0.1, seed=42).limit(train_data_limit).toPandas()
            # The 'features' column holds Spark ML vectors; convert each to a
            # dense array so scikit-learn receives a plain 2-D matrix.
            X_train = pd.DataFrame(pandas_df_train['features'].apply(lambda v: v.toArray()).tolist())

            if X_train.empty:
                logger.warning("No data available for HS Code model training. Skipping.")
                mlflow.log_param("status", "skipped_no_data")
                return

            # 2. Train Isolation Forest Model
            logger.info(f"Training Isolation Forest model with {X_train.shape[0]} samples...")
            model = IsolationForest(contamination=contamination, random_state=42)
            model.fit(X_train)

            # 3. Log Model with MLflow
            mlflow.sklearn.log_model(
                sk_model=model,
                artifact_path="hs_code_anomaly_model",
                registered_model_name=model_name,
                conda_env={
                    "channels": ["conda-forge"],
                    "dependencies": [
                        "python=3.9",
                        "pip",
                        {"pip": ["mlflow==2.10.1", "scikit-learn==1.3.2", "pandas==2.1.4", "pyspark==3.5.0"]}
                    ]
                }
            )
            logger.info(f"HS Code Anomaly Model logged to MLflow and registered as '{model_name}'.")

            # 4. Perform Inference on recent data (or a batch)
            logger.info("Performing inference on HS Code data...")

            # Get the latest version of the registered model for inference
            latest_model_version = mlflow.tracking.MlflowClient().get_latest_versions(model_name, stages=["None"])[0].version
            logged_model_uri = f"models:/{model_name}/{latest_model_version}"
            loaded_model = mlflow.sklearn.load_model(logged_model_uri)
            logger.info(f"Loaded MLflow model '{model_name}' version {latest_model_version} for inference.")

            # Apply the same preprocessing pipeline to the data for inference
            df_inference_processed = fitted_pipeline.transform(df_hs_code_cleaned)

            # Convert to Pandas for inference. For this tutorial we take a batch;
            # in a real-time scenario this would be new incoming data. For
            # production scale, consider a Spark pandas UDF wrapping the loaded
            # model for distributed scoring, or a PySpark-native equivalent.
            pandas_df_inference = df_inference_processed.select(
                "transaction_id", "hs_code", "product_name",
                "country_of_origin", "destination_country",
                "transaction_value", "quantity", "tariff_rate",
                "features"
            ).toPandas()

            if pandas_df_inference.empty:
                logger.info("No data for HS Code inference. Skipping anomaly writing.")
                return

            X_inference = pd.DataFrame(pandas_df_inference['features'].apply(lambda v: v.toArray()).tolist())

            # decision_function returns anomaly scores; scores below 0 indicate outliers
            pandas_df_inference['anomaly_score'] = loaded_model.decision_function(X_inference)
            pandas_df_inference['is_anomaly'] = pandas_df_inference['anomaly_score'] < 0

            # Filter for anomalies
            anomalies_df = pandas_df_inference[pandas_df_inference['is_anomaly']]

            if not anomalies_df.empty:
                # Prepare anomalies for writing to Delta Lake; drop the Spark ML
                # vector column, which the target schema does not include.
                spark_anomalies_df = spark.createDataFrame(anomalies_df.drop(columns=["features"])) \
                    .withColumn("anomaly_id", sha2(concat_ws("_", col("transaction_id"), current_timestamp()), 256)) \
                    .withColumn("detection_timestamp", current_timestamp()) \
                    .withColumn("model_version", lit(str(latest_model_version))) \
                    .withColumn("model_name", lit(model_name)) \
                    .withColumn("details", to_json(struct(
                        col("hs_code").alias("original_hs_code"),  # Store original hs_code
                        col("country_of_origin"),
                        col("destination_country")
                    )))

                # Select and reorder columns to match the target schema
                final_anomalies_df = spark_anomalies_df.select(
                    "anomaly_id",
                    "transaction_id",
                    "hs_code",
                    "product_name",
                    "country_of_origin",
                    "destination_country",
                    "transaction_value",
                    "quantity",
                    "anomaly_score",
                    "is_anomaly",
                    "detection_timestamp",
                    "model_version",
                    "model_name",
                    "details"
                )

                # Write anomalies to the Delta table
                final_anomalies_df.write \
                    .format("delta") \
                    .mode("append") \
                    .option("mergeSchema", "true") \
                    .saveAsTable(output_table_name)

                anomaly_count = final_anomalies_df.count()
                logger.info(f"Detected {anomaly_count} HS Code anomalies and wrote to {output_table_name}.")
                mlflow.log_metric("num_anomalies_detected_hs_code", anomaly_count)
            else:
                logger.info("No HS Code anomalies detected in the current batch.")
                mlflow.log_metric("num_anomalies_detected_hs_code", 0)

            mlflow.log_param("status", "completed")
        except Exception as e:
            logger.error(f"Error in HS Code anomaly detection: {e}", exc_info=True)
            mlflow.log_param("status", "failed")
            mlflow.log_param("error_message", str(e))
            raise  # Re-raise so the job fails on critical errors
```
**Explanation:**
- `train_and_detect_hs_code_anomalies` function: Encapsulates the entire workflow.
- MLflow Setup: Sets the experiment and starts a new run, logging the `contamination` and `train_data_limit` parameters.
- Data Loading: Reads from `hs_code_classification_gold`.
- Feature Engineering (PySpark ML Pipeline):
  - `StringIndexer` and `OneHotEncoder` are used for the categorical features (`country_of_origin`, `destination_country`). This is crucial for numerical ML models.
  - `VectorAssembler` combines all features (numerical and encoded categorical) into a single vector.
  - `StandardScaler` normalizes the features, which is often beneficial for distance-based or tree-based algorithms like Isolation Forest.
  - A `SparkPipeline` is constructed to chain these transformations, making the preprocessing reusable and robust.
  - The pipeline is `fit` on a sample of the data to handle large datasets efficiently.
- Model Training (Isolation Forest):
  - The preprocessed Spark DataFrame is sampled and converted to a Pandas DataFrame for scikit-learn's `IsolationForest`. In a large-scale production system, consider Spark UDFs for distributed inference, or a native PySpark MLlib equivalent if available and suitable.
  - The `IsolationForest` model is initialized with a `contamination` parameter (the expected proportion of outliers) and a `random_state` for reproducibility.
  - The model is `fit` on the prepared training data.
- MLflow Logging: `mlflow.sklearn.log_model` saves the trained scikit-learn model to MLflow, registers it in the Model Registry, and specifies a `conda_env` for reproducibility.
- Inference:
  - The latest version of the registered model is loaded using `mlflow.sklearn.load_model`.
  - The same preprocessing pipeline is applied to the data intended for inference.
  - The transformed data is converted to Pandas for `loaded_model.decision_function()`, which provides anomaly scores.
  - The `is_anomaly` flag is set based on the score threshold (scores below 0 indicate anomalies for Isolation Forest).
- Anomaly Output:
  - Detected anomalies are converted back to a Spark DataFrame.
  - An `anomaly_id` is generated using a SHA-256 hash of `transaction_id` and `current_timestamp` for uniqueness.
  - `detection_timestamp`, `model_version`, `model_name`, and `details` (a JSON string of relevant original features) are added.
  - The final DataFrame is written to `anomalies.detected_hs_code_anomalies` using `append` mode and `mergeSchema` for flexibility.
- Error Handling & Logging: Comprehensive `try`/`except` blocks catch errors, log them, and update the MLflow run status.
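To make the encode-assemble-scale-detect flow concrete without a Spark cluster, here is a local scikit-learn analogue of the PySpark pipeline described above: a `ColumnTransformer` plays the role of `StringIndexer`/`OneHotEncoder`/`VectorAssembler`/`StandardScaler`. The column names mirror the gold table, but the rows are made up for illustration.

```python
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import IsolationForest
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Tiny stand-in for hs_code_classification_gold.
df = pd.DataFrame({
    "transaction_value": [100.0, 110.0, 95.0, 105.0, 9000.0],
    "quantity":          [10.0, 12.0, 9.0, 11.0, 1.0],
    "country_of_origin": ["CN", "CN", "DE", "DE", "CN"],
})

# Scale numeric columns, one-hot encode the categorical one.
preprocess = ColumnTransformer([
    ("num", StandardScaler(), ["transaction_value", "quantity"]),
    ("cat", OneHotEncoder(handle_unknown="ignore"), ["country_of_origin"]),
])

# Chain preprocessing and the detector, as the Spark pipeline does.
pipeline = Pipeline([
    ("preprocess", preprocess),
    ("detector", IsolationForest(contamination=0.2, random_state=42)),
])
pipeline.fit(df)

scores = pipeline.decision_function(df)
assert scores[-1] == scores.min()  # the overpriced single-unit row scores lowest
```

Bundling preprocessing and the detector in one object also avoids training/serving skew: the exact fitted transformations travel with the model.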
##### 2.2. Logistics Cost Anomaly Detection
This section focuses on detecting unusual fluctuations in logistics costs, potentially correlated with fuel prices or tariffs.
**File:** `notebooks/10_anomaly_detection_mlflow/anomaly_detection_pipeline.py` (append to existing content)
```python
# --- 2.2. Logistics Cost Anomaly Detection ---
def train_and_detect_logistics_cost_anomalies(
    input_table_name: str,
    output_table_name: str,
    mlflow_experiment_name: str,
    model_name: str,
    contamination: float = 0.005,  # Logistics costs might have fewer anomalies
    train_data_limit: int = 100000
):
    """
    Trains an Isolation Forest model for Logistics Cost anomaly detection,
    logs it to MLflow, and applies it to new data.

    Args:
        input_table_name (str): Full path to the logistics cost gold Delta table.
        output_table_name (str): Full path to the output Delta table for anomalies.
        mlflow_experiment_name (str): MLflow experiment name.
        model_name (str): Name to register the model under in the MLflow Model Registry.
        contamination (float): The expected proportion of outliers in the data set.
        train_data_limit (int): Limit on training rows to prevent OOM errors.
    """
    logger.info(f"Starting Logistics Cost anomaly detection pipeline for experiment: {mlflow_experiment_name}")
    mlflow.set_experiment(mlflow_experiment_name)

    with mlflow.start_run(run_name="Logistics_Cost_Anomaly_Training"):
        mlflow.log_param("contamination", contamination)
        mlflow.log_param("train_data_limit", train_data_limit)

        try:
            # 1. Load Data for Training
            df_logistics = spark.table(input_table_name)
            feature_cols = [
                "distance_km",
                "fuel_price_usd_per_liter",
                "tariff_impact_usd",
                "total_cost_usd",
                "cost_per_unit_usd"
            ]

            # Drop rows with nulls in critical features
            df_logistics_cleaned = df_logistics.select(
                "shipment_id", "route_id", "vehicle_type",
                "distance_km", "fuel_price_usd_per_liter",
                "tariff_impact_usd", "total_cost_usd", "cost_per_unit_usd"
            ).na.drop(subset=feature_cols)

            # String indexer and one-hot encoder for the categorical feature
            vehicle_type_indexer = StringIndexer(inputCol="vehicle_type", outputCol="vehicle_type_indexed", handleInvalid="keep")
            vehicle_type_ohe = OneHotEncoder(inputCol="vehicle_type_indexed", outputCol="vehicle_type_vec")

            # Assemble all features into a single vector
            assembler = VectorAssembler(
                inputCols=feature_cols + ["vehicle_type_vec"],
                outputCol="features_raw"
            )

            # Scale the assembled features
            scaler = StandardScaler(inputCol="features_raw", outputCol="features", withStd=True, withMean=True)

            # Chain the transformations into a PySpark ML Pipeline
            preprocessing_pipeline = SparkPipeline(stages=[
                vehicle_type_indexer,
                vehicle_type_ohe,
                assembler,
                scaler
            ])

            # Fit the preprocessing pipeline on a sample of the data
            logger.info(f"Fitting preprocessing pipeline on {input_table_name}...")
            df_train_sample = df_logistics_cleaned.sample(fraction=0.1, seed=42).limit(train_data_limit)
            fitted_pipeline = preprocessing_pipeline.fit(df_train_sample)
            df_processed = fitted_pipeline.transform(df_logistics_cleaned).cache()

            pandas_df_train = df_processed.select("features").sample(fraction=0.1, seed=42).limit(train_data_limit).toPandas()
            # Convert Spark ML vectors to dense arrays for scikit-learn
            X_train = pd.DataFrame(pandas_df_train['features'].apply(lambda v: v.toArray()).tolist())

            if X_train.empty:
                logger.warning("No data available for Logistics Cost model training. Skipping.")
                mlflow.log_param("status", "skipped_no_data")
                return

            # 2. Train Isolation Forest Model
            logger.info(f"Training Isolation Forest model with {X_train.shape[0]} samples...")
            model = IsolationForest(contamination=contamination, random_state=42)
            model.fit(X_train)

            # 3. Log Model with MLflow
            mlflow.sklearn.log_model(
                sk_model=model,
                artifact_path="logistics_cost_anomaly_model",
                registered_model_name=model_name,
                conda_env={
                    "channels": ["conda-forge"],
                    "dependencies": [
                        "python=3.9",
                        "pip",
                        {"pip": ["mlflow==2.10.1", "scikit-learn==1.3.2", "pandas==2.1.4", "pyspark==3.5.0"]}
                    ]
                }
            )
            logger.info(f"Logistics Cost Anomaly Model logged to MLflow and registered as '{model_name}'.")

            # 4. Perform Inference
            logger.info("Performing inference on Logistics Cost data...")
            latest_model_version = mlflow.tracking.MlflowClient().get_latest_versions(model_name, stages=["None"])[0].version
            logged_model_uri = f"models:/{model_name}/{latest_model_version}"
            loaded_model = mlflow.sklearn.load_model(logged_model_uri)
            logger.info(f"Loaded MLflow model '{model_name}' version {latest_model_version} for inference.")

            df_inference_processed = fitted_pipeline.transform(df_logistics_cleaned)
            pandas_df_inference = df_inference_processed.select(
                "shipment_id", "route_id", "vehicle_type",
                "total_cost_usd", "cost_per_unit_usd",
                "fuel_price_usd_per_liter", "tariff_impact_usd",
                "features"
            ).toPandas()

            if pandas_df_inference.empty:
                logger.info("No data for Logistics Cost inference. Skipping anomaly writing.")
                return

            X_inference = pd.DataFrame(pandas_df_inference['features'].apply(lambda v: v.toArray()).tolist())
            pandas_df_inference['anomaly_score'] = loaded_model.decision_function(X_inference)
            pandas_df_inference['is_anomaly'] = pandas_df_inference['anomaly_score'] < 0
            anomalies_df = pandas_df_inference[pandas_df_inference['is_anomaly']]

            if not anomalies_df.empty:
                # Drop the Spark ML vector column before round-tripping to Spark
                spark_anomalies_df = spark.createDataFrame(anomalies_df.drop(columns=["features"])) \
                    .withColumn("anomaly_id", sha2(concat_ws("_", col("shipment_id"), current_timestamp()), 256)) \
                    .withColumn("detection_timestamp", current_timestamp()) \
                    .withColumn("model_version", lit(str(latest_model_version))) \
                    .withColumn("model_name", lit(model_name)) \
                    .withColumn("details", to_json(struct(
                        col("vehicle_type"),
                        col("route_id")
                    )))

                final_anomalies_df = spark_anomalies_df.select(
                    "anomaly_id",
                    "shipment_id",
                    "route_id",
                    "total_cost_usd",
                    "cost_per_unit_usd",
                    "fuel_price_usd_per_liter",
                    "tariff_impact_usd",
                    "anomaly_score",
                    "is_anomaly",
                    "detection_timestamp",
                    "model_version",
                    "model_name",
                    "details"
                )

                final_anomalies_df.write \
                    .format("delta") \
                    .mode("append") \
                    .option("mergeSchema", "true") \
                    .saveAsTable(output_table_name)

                anomaly_count = final_anomalies_df.count()
                logger.info(f"Detected {anomaly_count} Logistics Cost anomalies and wrote to {output_table_name}.")
                mlflow.log_metric("num_anomalies_detected_logistics_cost", anomaly_count)
            else:
                logger.info("No Logistics Cost anomalies detected in the current batch.")
                mlflow.log_metric("num_anomalies_detected_logistics_cost", 0)

            mlflow.log_param("status", "completed")
        except Exception as e:
            logger.error(f"Error in Logistics Cost anomaly detection: {e}", exc_info=True)
            mlflow.log_param("status", "failed")
            mlflow.log_param("error_message", str(e))
            raise
```
**Explanation:**

The `train_and_detect_logistics_cost_anomalies` function follows a very similar structure to the HS Code function, but it is tailored to the features relevant for logistics costs (`distance_km`, `fuel_price_usd_per_liter`, `tariff_impact_usd`, `total_cost_usd`, `cost_per_unit_usd`, and `vehicle_type`). It uses `StringIndexer` and `OneHotEncoder` for the `vehicle_type` categorical feature, and its `contamination` parameter is adjusted to reflect a potentially different expected anomaly rate for logistics costs.
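The different `contamination` values (0.01 for trade data, 0.005 here) matter because the parameter directly calibrates the decision threshold: Isolation Forest flags roughly that fraction of training points even when the data contains no true anomalies. A small synthetic demonstration:

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.RandomState(0)
X = rng.normal(size=(1000, 2))  # purely "normal" synthetic data

for contamination in (0.005, 0.05):
    model = IsolationForest(contamination=contamination, random_state=0)
    flagged = (model.fit_predict(X) == -1).mean()
    # The threshold sits at the contamination-quantile of training scores,
    # so the flagged fraction tracks the parameter closely.
    assert abs(flagged - contamination) < 0.02
```

This is why `contamination` should encode a domain expectation of the anomaly rate (ideally validated against a labeled sample) rather than an arbitrary default.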
##### 2.3. Orchestration & Execution
Now, we’ll create the main execution block to call our anomaly detection functions.
**File:** `notebooks/10_anomaly_detection_mlflow/anomaly_detection_pipeline.py` (append to existing content)
```python
# --- 2.3. Orchestration & Execution ---
if __name__ == "__main__":
    logger.info("Starting main anomaly detection pipeline execution.")

    # Call HS Code Anomaly Detection
    logger.info("--- Running HS Code Anomaly Detection ---")
    try:
        train_and_detect_hs_code_anomalies(
            input_table_name=HS_CODE_GOLD_TABLE,
            output_table_name=DETECTED_HS_CODE_ANOMALIES_TABLE,
            mlflow_experiment_name=MLFLOW_EXPERIMENT_HS_CODE,
            model_name="hs_code_anomaly_detector_model",
            contamination=0.01  # 1% expected anomalies
        )
        logger.info("HS Code Anomaly Detection completed successfully.")
    except Exception as e:
        logger.error(f"HS Code Anomaly Detection failed: {e}")

    # Call Logistics Cost Anomaly Detection
    logger.info("--- Running Logistics Cost Anomaly Detection ---")
    try:
        train_and_detect_logistics_cost_anomalies(
            input_table_name=LOGISTICS_COST_GOLD_TABLE,
            output_table_name=DETECTED_LOGISTICS_COST_ANOMALIES_TABLE,
            mlflow_experiment_name=MLFLOW_EXPERIMENT_LOGISTICS_COST,
            model_name="logistics_cost_anomaly_detector_model",
            contamination=0.005  # 0.5% expected anomalies
        )
        logger.info("Logistics Cost Anomaly Detection completed successfully.")
    except Exception as e:
        logger.error(f"Logistics Cost Anomaly Detection failed: {e}")

    # --- Create a unified view for all anomalies ---
    try:
        all_anomalies_view_ddl = f"""
        CREATE OR REPLACE VIEW {CATALOG_NAME}.{SCHEMA_ANOMALIES}.all_detected_anomalies AS
        SELECT
            anomaly_id,
            transaction_id AS source_id,  -- Use source_id for generic reference
            'HS_CODE_ANOMALY' AS anomaly_type,
            hs_code AS anomalous_item_id,
            product_name AS anomalous_item_name,
            country_of_origin,
            destination_country,
            transaction_value AS primary_value,
            quantity AS secondary_value,
            anomaly_score,
            is_anomaly,
            detection_timestamp,
            model_version,
            model_name,
            details
        FROM {DETECTED_HS_CODE_ANOMALIES_TABLE}
        UNION ALL
        SELECT
            anomaly_id,
            shipment_id AS source_id,
            'LOGISTICS_COST_ANOMALY' AS anomaly_type,
            route_id AS anomalous_item_id,
            vehicle_type AS anomalous_item_name,
            NULL AS country_of_origin,    -- Not directly applicable
            NULL AS destination_country,  -- Not directly applicable
            total_cost_usd AS primary_value,
            cost_per_unit_usd AS secondary_value,
            anomaly_score,
            is_anomaly,
            detection_timestamp,
            model_version,
            model_name,
            details
        FROM {DETECTED_LOGISTICS_COST_ANOMALIES_TABLE}
        """
        spark.sql(all_anomalies_view_ddl)
        logger.info(f"Unified view {CATALOG_NAME}.{SCHEMA_ANOMALIES}.all_detected_anomalies created/updated.")
    except Exception as e:
        logger.error(f"Failed to create unified anomaly view: {e}")

    logger.info("Main anomaly detection pipeline execution finished.")
```
Explanation:
- The `if __name__ == "__main__":` block ensures this code runs when the notebook is executed directly.
- It sequentially calls the `train_and_detect_hs_code_anomalies` and `train_and_detect_logistics_cost_anomalies` functions.
- Each call is wrapped in a `try`-`except` block so that if one pipeline fails, the other can still attempt to run, and the overall job can log the failure.
- Finally, a Unity Catalog view `all_detected_anomalies` is created or replaced, consolidating results from both anomaly detection pipelines into a single, queryable interface. This view standardizes column names for easier querying across different anomaly types and adds an `anomaly_type` column for distinction.
Testing This Component
To test this component, you’ll need data in your `customs_trade.hs_code_classification_gold` and `logistics.cost_monitoring_gold` tables. For a quick test, you can create some dummy data if these tables are not yet populated from previous chapters.
1. Create Dummy Data (if needed):
Run this in a separate Databricks cell before running the main anomaly detection pipeline.
# --- Dummy Data Generation (Run if your gold tables are empty) ---
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from datetime import datetime, timedelta
import random

# Dummy HS Code Data
hs_code_schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("product_name", StringType(), True),
    StructField("product_description", StringType(), True),
    StructField("hs_code", StringType(), True),
    StructField("country_of_origin", StringType(), True),
    StructField("destination_country", StringType(), True),
    StructField("transaction_value", DoubleType(), True),
    StructField("quantity", DoubleType(), True),
    StructField("tariff_rate", DoubleType(), True),
    StructField("timestamp", TimestampType(), True)
])

hs_code_data = []
for i in range(100):
    val = round(random.uniform(100, 10000), 2)
    qty = float(random.randint(1, 500))  # cast to float: the schema declares quantity as DoubleType
    hs_code_data.append((
        f"TXN{i:03d}",
        f"Product {i % 10}",
        f"Description for Product {i % 10}",
        f"0101{random.randint(10, 99)}",  # Normal HS codes
        random.choice(["USA", "CAN", "MEX"]),
        random.choice(["GER", "FRA", "JPN"]),
        val,
        qty,
        random.uniform(0.01, 0.15),
        datetime.now() - timedelta(days=random.randint(1, 30))
    ))

# Introduce a few anomalies for HS Code (quantities as floats to match the schema)
hs_code_data.append(("TXN_ANOMALY_1", "Rare Product", "Very rare item", "999999", "USA", "CHN", 1000000.0, 1.0, 0.5, datetime.now()))
hs_code_data.append(("TXN_ANOMALY_2", "Low Value High Qty", "Common item, but weird values", "010120", "USA", "GER", 5.0, 50000.0, 0.02, datetime.now()))

df_hs_code_dummy = spark.createDataFrame(hs_code_data, schema=hs_code_schema)
df_hs_code_dummy.write.format("delta").mode("overwrite").saveAsTable(HS_CODE_GOLD_TABLE)
print(f"Dummy HS Code data written to {HS_CODE_GOLD_TABLE}. Count: {df_hs_code_dummy.count()}")

# Dummy Logistics Cost Data
logistics_schema = StructType([
    StructField("shipment_id", StringType(), True),
    StructField("route_id", StringType(), True),
    StructField("vehicle_type", StringType(), True),
    StructField("distance_km", DoubleType(), True),
    StructField("fuel_price_usd_per_liter", DoubleType(), True),
    StructField("tariff_impact_usd", DoubleType(), True),
    StructField("total_cost_usd", DoubleType(), True),
    StructField("cost_per_unit_usd", DoubleType(), True),
    StructField("event_timestamp", TimestampType(), True)
])

logistics_data = []
for i in range(100):
    dist = random.uniform(100, 5000)
    fuel = random.uniform(0.8, 1.5)
    tariff = random.uniform(10, 500)
    total_cost = dist * random.uniform(0.1, 0.5) + tariff + fuel * 100  # Simple cost model
    cost_per_unit = total_cost / random.randint(10, 1000)
    logistics_data.append((
        f"SHP{i:03d}",
        f"RTE{i % 5}",
        random.choice(["Truck", "Ship", "Air Cargo"]),
        dist,
        fuel,
        tariff,
        total_cost,
        cost_per_unit,
        datetime.now() - timedelta(hours=random.randint(1, 72))
    ))

# Introduce a few anomalies for Logistics Costs
logistics_data.append(("SHP_ANOMALY_1", "RTE_CRITICAL", "Air Cargo", 100.0, 5.0, 10000.0, 50000.0, 500.0, datetime.now()))  # High tariff/cost
logistics_data.append(("SHP_ANOMALY_2", "RTE_LONG", "Truck", 10000.0, 0.1, 10.0, 100.0, 0.01, datetime.now()))  # Very low cost for long distance

df_logistics_dummy = spark.createDataFrame(logistics_data, schema=logistics_schema)
df_logistics_dummy.write.format("delta").mode("overwrite").saveAsTable(LOGISTICS_COST_GOLD_TABLE)
print(f"Dummy Logistics Cost data written to {LOGISTICS_COST_GOLD_TABLE}. Count: {df_logistics_dummy.count()}")
2. Run the Anomaly Detection Pipeline:
Execute the main Python code block in your Databricks notebook.
3. Verification:
- Check Databricks Logs: Observe the output in the notebook cells for `INFO` and `WARNING` messages. Look for confirmation that models were trained, logged, and anomalies were written.
- Check MLflow UI:
  - Navigate to the MLflow UI (usually accessible from the left sidebar in Databricks).
  - You should see two new experiments: `/Shared/anomaly_detection/hs_code_anomalies` and `/Shared/anomaly_detection/logistics_cost_anomalies`.
  - Inside each experiment, you’ll find a run (e.g., “HS_Code_Anomaly_Training”). Click on it to see logged parameters (`contamination`, `train_data_limit`, `status`), metrics (`num_anomalies_detected_hs_code`, `num_anomalies_detected_logistics_cost`), and artifacts (the `hs_code_anomaly_model` or `logistics_cost_anomaly_model` artifact path).
  - Check the MLflow Model Registry to confirm that `hs_code_anomaly_detector_model` and `logistics_cost_anomaly_detector_model` are registered with at least one version.
- Query Output Tables:
  - Run SQL queries in a new Databricks cell to inspect the anomaly tables:
    SELECT * FROM main.anomalies.detected_hs_code_anomalies;
    SELECT * FROM main.anomalies.detected_logistics_cost_anomalies;
    SELECT * FROM main.anomalies.all_detected_anomalies;
  - You should see the dummy anomalies you injected, along with their `anomaly_score` and `is_anomaly` flags.
  - Verify that `model_version` and `model_name` are correctly populated.
  - Check the `details` column for the JSON string containing additional context.
This structured testing approach ensures that each part of the pipeline (data loading, model training, MLflow logging, inference, and data writing) functions as expected.
Production Considerations
Deploying anomaly detection systems to production requires careful planning beyond just the code.
- Orchestration and Scheduling:
- Databricks Workflows (Jobs): The notebook should be scheduled as a Databricks Job to run periodically (e.g., daily for retraining, hourly for inference on new data). Separate jobs for training and inference are recommended.
- Incremental Processing: For inference, instead of reprocessing all historical data, implement logic to read only the latest incremental data from the source Delta tables (e.g., using Delta table versioning/Change Data Feed or `event_timestamp` filters) to reduce computation and cost.
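The incremental-processing point above is essentially a watermark filter: persist the newest timestamp you have processed and, on the next run, read only rows beyond it. In the pipeline this would be a `.filter(col("event_timestamp") > lit(watermark))` on the gold Delta table; the logic can be sketched in plain Python (the record layout here is a hypothetical stand-in):

```python
from datetime import datetime

def incremental_batch(records, watermark):
    """Return only records strictly newer than the last processed timestamp,
    plus the new watermark to persist for the next run."""
    fresh = [r for r in records if r["event_timestamp"] > watermark]
    new_watermark = max((r["event_timestamp"] for r in fresh), default=watermark)
    return fresh, new_watermark

rows = [
    {"shipment_id": "SHP001", "event_timestamp": datetime(2024, 1, 1, 10)},
    {"shipment_id": "SHP002", "event_timestamp": datetime(2024, 1, 1, 12)},
]
# Only SHP002 is newer than the stored watermark of 11:00.
fresh, wm = incremental_batch(rows, datetime(2024, 1, 1, 11))
print([r["shipment_id"] for r in fresh], wm)
```

The watermark itself can be stored in a small Delta table or derived from `max(detection_timestamp)` in the anomaly output table.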
- Model Retraining Strategy:
- Frequency: Anomaly detection models can suffer from concept drift. Schedule regular retraining (e.g., weekly, monthly) using fresh historical data.
- Automated Retraining: Automate the retraining process via Databricks Jobs. Consider setting up alerts for model performance degradation.
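A scheduled retraining cadence can be expressed directly in a Databricks Workflows job definition. The fragment below is a hedged sketch of a Jobs API 2.1 payload; the job name, notebook path, cluster key, and cron expression are placeholders to adapt, not values from this project:

```json
{
  "name": "anomaly-model-weekly-retrain",
  "schedule": {
    "quartz_cron_expression": "0 0 2 ? * MON",
    "timezone_id": "UTC",
    "pause_status": "UNPAUSED"
  },
  "tasks": [
    {
      "task_key": "retrain_anomaly_models",
      "notebook_task": {
        "notebook_path": "/Repos/<your-repo>/notebooks/10_anomaly_detection_mlflow/anomaly_detection_pipeline"
      },
      "job_cluster_key": "retrain_cluster"
    }
  ],
  "email_notifications": { "on_failure": ["data-platform-alerts@example.com"] }
}
```

The quartz expression above fires at 02:00 UTC every Monday; pairing `on_failure` notifications with the job gives you the retraining-failure alerting mentioned above for free.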
- Performance Optimization:
- Cluster Sizing: Use appropriately sized Databricks clusters. Leverage Photon-enabled clusters for faster query performance on Delta tables.
- Data Sampling: For training, sample large datasets to reduce training time, as long as the sample remains representative.
- Distributed Inference: For very large inference datasets, convert the `scikit-learn` model to a PySpark UDF or explore `mlflow.pyfunc.spark_udf()` to distribute inference across the Spark cluster, avoiding `toPandas()` operations on massive DataFrames.
- Delta Lake Optimizations: Ensure Delta tables are optimized regularly (`OPTIMIZE table_name ZORDER BY (col1, col2)`) for faster reads.
- Security Considerations:
  - Unity Catalog: Leverage Unity Catalog for fine-grained access control (`GRANT SELECT ON TABLE ...`, `GRANT CREATE TABLE ON SCHEMA ...`) to source data tables, anomaly output tables, and MLflow experiments/models.
  - Least Privilege: Ensure Databricks service principals or users running the jobs have only the necessary permissions.
  - Data Masking/Tokenization: If sensitive data is used in features, ensure it’s masked or tokenized before model training and inference.
- Logging and Monitoring:
- Structured Logging: Use structured logging (e.g., JSON logs) for easier parsing and analysis in external monitoring systems.
- Databricks Monitoring: Monitor job success/failure, cluster utilization, and execution times directly within Databricks.
- MLflow Tracking: Continuously monitor MLflow experiments for model performance metrics (e.g., number of anomalies detected, model drift indicators).
- Alerting: Set up alerts (e.g., via Databricks Alerts, PagerDuty, Slack) for high anomaly counts, model training failures, or significant drops in model performance.
- Data Quality Checks: Implement data quality checks (e.g., using Delta Live Tables expectations or Great Expectations) on both input data and anomaly output to ensure data integrity.
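The structured-logging recommendation above can be met with a small `logging.Formatter` that emits one JSON object per record. This is a minimal sketch; the field names are a suggestion, not a Databricks requirement:

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object, which external
    monitoring systems can parse without regexes."""
    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("anomaly_pipeline")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Emits a machine-readable JSON line such as:
# {"level": "INFO", "logger": "anomaly_pipeline", "message": "42 anomalies written"}
logger.info("%d anomalies written", 42)
```

Extending the dictionary with run IDs, table names, or anomaly counts makes downstream alerting queries straightforward.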
- Model Versioning and Rollback:
- MLflow Model Registry: Use the MLflow Model Registry to manage model versions and stages (Staging, Production). This allows for easy deployment of new versions and quick rollbacks to previous stable versions if issues arise.
- CI/CD Integration: Integrate MLflow model deployment into your CI/CD pipeline, allowing automated testing and promotion of models.
Code Review Checkpoint
At this point, you have implemented a robust anomaly detection pipeline within Databricks:
- Files Created/Modified:
  - `notebooks/10_anomaly_detection_mlflow/anomaly_detection_pipeline.py`: Contains all the Python code for setting up, training, and inferring anomalies for both HS codes and logistics costs, integrated with MLflow.
- Key Functionality:
- Data loading and preprocessing (feature engineering, scaling, encoding) using PySpark ML pipelines.
- Anomaly detection model training using `scikit-learn`’s `IsolationForest`.
- Comprehensive MLflow integration for experiment tracking (parameters, metrics, artifacts) and model registration.
- Inference logic to apply trained models and identify anomalies.
- Writing detected anomalies to dedicated Delta Lake tables in Unity Catalog.
- Creation of a unified view for all detected anomalies.
- Integration:
- This pipeline integrates with the `customs_trade.hs_code_classification_gold` and `logistics.cost_monitoring_gold` tables created in previous chapters.
- It leverages Unity Catalog for schema and table management, and MLflow for robust model lifecycle management.
The code emphasizes production readiness with detailed logging, error handling, and modular functions.
Common Issues & Solutions
Issue: `Py4JJavaError: An error occurred while calling o76.toPandas.` (Out-of-Memory during `toPandas()`):
- Problem: Converting a very large Spark DataFrame to a Pandas DataFrame (`.toPandas()`) can exhaust the driver’s memory, especially when dealing with millions of rows. This is common during training or inference if not handled carefully.
- Solution:
  - Sampling: For training, use `.sample()` and `.limit()` on the Spark DataFrame before `.toPandas()` to train on a representative subset of the data, as demonstrated in this chapter.
  - Distributed Inference: For inference on large datasets, avoid `.toPandas()`. Instead, use `mlflow.pyfunc.spark_udf()` to wrap your `scikit-learn` model as a Spark UDF. This allows Spark to distribute the inference logic across worker nodes.
  - Increase Driver Memory: As a temporary workaround, you can increase the Spark driver’s memory in the cluster configuration, but this is not a scalable solution for very large datasets.
  - PySpark MLlib Alternatives: Where possible, use native PySpark MLlib models designed for distributed processing (e.g., `KMeans` for clustering-based anomaly detection; note that `LocalOutlierFactor` is a `scikit-learn` algorithm and does not distribute).
Issue: Model Drift / Poor Anomaly Detection Performance Over Time:
- Problem: The characteristics of “normal” data can change over time (e.g., new HS codes become common, logistics costs fluctuate due to market changes), causing the model to misclassify or miss actual anomalies.
- Solution:
- Regular Retraining: Implement a scheduled job (e.g., weekly) to retrain your anomaly detection models using the most recent and relevant historical data.
- Monitoring Model Performance: Track metrics related to anomaly detection (e.g., number of anomalies, distribution of anomaly scores) and set up alerts for significant deviations that might indicate drift.
- Adaptive Models: Explore more adaptive anomaly detection algorithms or techniques like online learning, though these are more complex to implement.
- Human Feedback Loop: Incorporate a mechanism for human analysts to review detected anomalies and provide feedback, which can then be used to fine-tune model parameters or retrain.
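One lightweight drift signal mentioned above, watching the anomaly rate itself, needs no extra ML: compare the current window's rate against a trailing baseline and alert on large deviations. A plain-Python sketch with an illustrative threshold (the 3x ratio is an assumption, tune it to your data):

```python
def anomaly_rate_alert(baseline_flags, current_flags, max_ratio=3.0):
    """Flag possible drift when the current anomaly rate exceeds
    `max_ratio` times the baseline rate (threshold is illustrative)."""
    baseline_rate = sum(baseline_flags) / max(len(baseline_flags), 1)
    current_rate = sum(current_flags) / max(len(current_flags), 1)
    drifted = baseline_rate > 0 and current_rate > max_ratio * baseline_rate
    return drifted, baseline_rate, current_rate

# Baseline window: 1% anomalies; current window: 5% -> alert fires.
baseline = [1] * 10 + [0] * 990
current = [1] * 50 + [0] * 950
drifted, b_rate, c_rate = anomaly_rate_alert(baseline, current)
print(drifted, b_rate, c_rate)
```

In the pipeline, the flag lists would come from the `is_anomaly` column of the output tables, aggregated per time window.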
Issue: Permissions Errors with Unity Catalog or MLflow:
- Problem: You encounter `PERMISSION_DENIED` errors when creating schemas/tables, reading/writing data, or interacting with MLflow.
- Solution:
  - Verify Unity Catalog Permissions: Ensure the user or service principal running the Databricks job has the necessary `CREATE SCHEMA`, `CREATE TABLE`, `SELECT`, and `MODIFY` (for appends) permissions on the catalog and schema. For MLflow, ensure `READ_METADATA`, `CREATE_RUN`, and `REGISTER_MODEL` permissions.
  - Workspace Permissions: Check that the user has appropriate permissions to create/manage MLflow experiments in the `/Shared/` path.
  - Admin Console: A Unity Catalog administrator can grant these permissions using SQL commands (e.g., `GRANT CREATE SCHEMA ON CATALOG main TO <user_or_group>`).
Testing & Verification
To fully test and verify the chapter’s work, follow these steps:
- Execute the Full Pipeline: Run the `anomaly_detection_pipeline.py` notebook. Observe the logs in the Databricks output.
- Inspect MLflow Experiments:
  - Go to the MLflow UI.
  - Confirm that two new experiments, “hs_code_anomalies” and “logistics_cost_anomalies”, exist under the `/Shared/anomaly_detection/` path.
  - For each experiment, verify that a run was completed successfully, parameters like `contamination` were logged, and metrics like `num_anomalies_detected_hs_code` (or `num_anomalies_detected_logistics_cost`) are present.
  - Ensure the `hs_code_anomaly_model` and `logistics_cost_anomaly_model` artifacts are correctly stored within their respective runs.
- Check MLflow Model Registry:
  - Navigate to the “Models” section in the MLflow UI.
  - Confirm that `hs_code_anomaly_detector_model` and `logistics_cost_anomaly_detector_model` are registered.
  - Verify that at least one version exists for each model and that its status is “Ready.”
- Query Anomaly Tables:
  - Open a new Databricks notebook cell and run the following SQL queries:
    SELECT * FROM main.anomalies.detected_hs_code_anomalies ORDER BY detection_timestamp DESC;
    SELECT * FROM main.anomalies.detected_logistics_cost_anomalies ORDER BY detection_timestamp DESC;
    SELECT * FROM main.anomalies.all_detected_anomalies ORDER BY detection_timestamp DESC;
  - Expected Behavior:
    - You should see rows in both `detected_hs_code_anomalies` and `detected_logistics_cost_anomalies` tables.
    - If you used the dummy data, the manually injected anomalies (e.g., “TXN_ANOMALY_1”, “SHP_ANOMALY_1”) should be present, with `is_anomaly` set to `TRUE` and a negative `anomaly_score`.
    - The `model_version`, `model_name`, and `details` columns should be correctly populated.
    - The `all_detected_anomalies` view should combine results from both tables, with the correct `anomaly_type` set.
- Simulate New Data with Anomalies: Modify the dummy data generation script to inject more anomalies, then rerun the anomaly detection pipeline to ensure new anomalies are picked up. This simulates continuous operation.
By completing these steps, you can confidently verify that your anomaly detection models are trained, managed, and deployed correctly, and that the system effectively identifies and records anomalies in your trade and logistics data.
Summary & Next Steps
In this chapter, we successfully implemented a robust anomaly detection system for our real-time supply chain. We leveraged Databricks, PySpark, scikit-learn’s Isolation Forest, and MLflow to:
- Train and deploy anomaly detection models for HS Code classifications and logistics costs.
- Integrate MLflow for comprehensive experiment tracking, parameter logging, metric recording, and model registration.
- Write detected anomalies to dedicated Delta Lake tables, ensuring data quality and traceability.
- Create a unified view for all detected anomalies, providing a single point of access for downstream analytics and alerting.
This capability significantly enhances our supply chain intelligence by providing early warnings for potential issues like misclassified goods, fraudulent activities, or unexpected cost fluctuations.
The next logical step in our project is to act on these detected anomalies. In Chapter 11: Real-time Alerting and Dashboards, we will build a real-time alerting system using Databricks SQL Alerts and integrate these anomaly insights into interactive dashboards for operational monitoring and business intelligence. We will also explore how to integrate with external notification services to ensure stakeholders are informed promptly.