Chapter 2: Simulating Real-time Supply Chain Events with Kafka
Welcome to Chapter 2 of our comprehensive guide! In this chapter, we’re laying the foundation for our real-time supply chain analytics platform by simulating the very events that drive it. We will build a robust Kafka producer application that generates realistic supply chain events, such as shipment updates, inventory changes, and order status modifications, and publishes them to a Kafka topic.
This step is crucial because it provides the continuous, streaming data source that our entire analytics pipeline will process. Instead of relying on static datasets or complex integrations with live operational systems at this stage, we’ll create a controlled, repeatable environment for generating high-fidelity event data. This allows us to develop and test our downstream data ingestion and processing components effectively, ensuring they can handle real-world data volumes and velocity.
As a prerequisite, you should have completed Chapter 1, which covered the initial environment setup, including Docker, Python, and a local Kafka/Zookeeper cluster. By the end of this chapter, you will have a fully operational Kafka producer continuously publishing simulated supply chain events to a designated Kafka topic, ready for consumption by our Databricks Delta Live Tables pipelines in the subsequent chapters.
Planning & Design
Before we dive into coding, let’s outline the architecture and design considerations for our Kafka event simulation.
Component Architecture
Our event simulation setup will consist of two main components:
- Kafka Cluster: This will be our message broker, comprising Zookeeper (for coordination) and Kafka Brokers (for message storage and delivery). We assume this is already running from Chapter 1’s setup.
- Python Kafka Producer Application: A Python script that generates simulated supply chain events and publishes them to a Kafka topic. This application will be designed for continuous operation, simulating a constant stream of updates from various supply chain systems.
Kafka Topic Design
We will use a single Kafka topic named supply_chain_events for simplicity and to centralize all event types for initial ingestion. Each message published to this topic will be a JSON string representing a single supply chain event.
Event Schema (JSON example):
{
  "event_id": "uuid",
  "event_type": "SHIPMENT_UPDATE | INVENTORY_CHANGE | ORDER_STATUS_UPDATE",
  "timestamp": "ISO 8601 string",
  "payload": {
    "shipment_id": "string",
    "tracking_number": "string",
    "status": "IN_TRANSIT | DELIVERED | DELAYED | EXCEPTION",
    "location": {
      "latitude": "float",
      "longitude": "float",
      "city": "string",
      "country": "string"
    },
    "estimated_delivery_date": "ISO 8601 string",
    "product_id": "string",
    "quantity": "integer",
    "warehouse_id": "string",
    "order_id": "string",
    "customer_id": "string",
    "order_status": "PENDING | PROCESSING | SHIPPED | DELIVERED | CANCELLED",
    "items": [
      {
        "product_id": "string",
        "quantity": "integer",
        "price": "float"
      }
    ]
  },
  "source_system": "string",
  "correlation_id": "uuid"
}
This schema is designed to be flexible, allowing for different event_type values to carry distinct payload structures, while maintaining common metadata like event_id, timestamp, source_system, and correlation_id. We will use Python’s Pydantic library to enforce this schema and ensure data quality at the source.
File Structure
We will organize our producer application within a dedicated directory:
.
├── src/
│   └── kafka_producer/
│       ├── __init__.py
│       ├── config.py     # Configuration settings for Kafka
│       ├── models.py     # Pydantic models for event schemas
│       ├── generator.py  # Logic for generating simulated events
│       └── producer.py   # Main Kafka producer application
└── requirements.txt
Step-by-Step Implementation
a) Setup/Configuration
First, let’s ensure our environment is ready and set up the basic project structure.
1. Verify Kafka Cluster Status:
Ensure your Kafka and Zookeeper containers are running. If not, navigate to your docker-compose.yaml (from Chapter 1) and run:
docker-compose up -d
You should see output indicating that zookeeper and kafka services are up.
2. Create Project Directory and requirements.txt:
Create the src/kafka_producer directory and define Python dependencies.
mkdir -p src/kafka_producer
touch requirements.txt
Add the following to requirements.txt:
confluent-kafka==2.3.0
Faker==23.0.0
pydantic==2.5.3
pydantic-settings==2.1.0
python-dotenv==1.0.0
Why these dependencies?
- confluent-kafka: The official, highly performant Python client for Apache Kafka. It’s built on librdkafka, a C/C++ library, offering excellent performance and reliability.
- Faker: A fantastic library for generating realistic-looking fake data, which is perfect for our simulation.
- pydantic: For defining data schemas. It provides data validation and settings management using Python type hints, ensuring our generated events conform to a strict structure. (Its companion package, pydantic-settings, supplies the BaseSettings class our config.py uses.)
- python-dotenv: To manage environment variables for configuration, keeping sensitive information or environment-specific settings out of the codebase.
3. Install Dependencies: It’s good practice to use a virtual environment.
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
4. Create Configuration File (.env and src/kafka_producer/config.py):
We’ll use environment variables for Kafka connection details.
Create a .env file at the root of your project:
# .env
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_TOPIC=supply_chain_events
Now, create src/kafka_producer/config.py to load these variables using python-dotenv and Pydantic for robust configuration management.
# src/kafka_producer/config.py
import os

from pydantic_settings import BaseSettings, SettingsConfigDict

# Get the directory of the current script
current_dir = os.path.dirname(os.path.abspath(__file__))
# Get the project root directory (two levels up from src/kafka_producer)
project_root = os.path.dirname(os.path.dirname(current_dir))

class KafkaConfig(BaseSettings):
    """
    Configuration settings for Kafka.
    Loads from the .env file and environment variables.
    """
    model_config = SettingsConfigDict(env_file=os.path.join(project_root, '.env'), extra='ignore')

    KAFKA_BOOTSTRAP_SERVERS: str = "localhost:9092"
    KAFKA_TOPIC: str = "supply_chain_events"
    KAFKA_CLIENT_ID: str = "supply-chain-producer"
    KAFKA_ACKS: str = "all"  # Ensure all in-sync replicas acknowledge the message
    KAFKA_RETRIES: int = 5  # Number of times to retry sending a message
    KAFKA_COMPRESSION_TYPE: str = "snappy"  # Use snappy compression for efficiency
    KAFKA_LINGER_MS: int = 100  # Batch messages for up to 100 ms
    KAFKA_BATCH_SIZE_BYTES: int = 1048576  # 1 MB batch size

    # Optional: security configuration for production
    KAFKA_SECURITY_PROTOCOL: str = "PLAINTEXT"  # "SASL_SSL" for production
    KAFKA_SASL_MECHANISM: str = "PLAIN"  # e.g., "SCRAM-SHA-256"
    KAFKA_SASL_USERNAME: str | None = None
    KAFKA_SASL_PASSWORD: str | None = None
    KAFKA_SSL_CA_LOCATION: str | None = None
    KAFKA_SSL_CERT_LOCATION: str | None = None
    KAFKA_SSL_KEY_LOCATION: str | None = None

    def get_producer_config(self) -> dict:
        """Returns a dictionary of producer-specific configurations."""
        config = {
            'bootstrap.servers': self.KAFKA_BOOTSTRAP_SERVERS,
            'client.id': self.KAFKA_CLIENT_ID,
            'acks': self.KAFKA_ACKS,
            'retries': self.KAFKA_RETRIES,
            'compression.type': self.KAFKA_COMPRESSION_TYPE,
            'linger.ms': self.KAFKA_LINGER_MS,
            'batch.size': self.KAFKA_BATCH_SIZE_BYTES,
            'enable.idempotence': True,  # Exactly-once delivery semantics per producer
        }
        if self.KAFKA_SECURITY_PROTOCOL != "PLAINTEXT":
            config.update({
                'security.protocol': self.KAFKA_SECURITY_PROTOCOL,
                'sasl.mechanism': self.KAFKA_SASL_MECHANISM,
                'sasl.username': self.KAFKA_SASL_USERNAME,
                'sasl.password': self.KAFKA_SASL_PASSWORD,
                'ssl.ca.location': self.KAFKA_SSL_CA_LOCATION,
                'ssl.certificate.location': self.KAFKA_SSL_CERT_LOCATION,
                'ssl.key.location': self.KAFKA_SSL_KEY_LOCATION,
            })
        return config

kafka_config = KafkaConfig()
Explanation:
- pydantic_settings.BaseSettings: Automatically loads environment variables. SettingsConfigDict(env_file=...) points to our .env file.
- KAFKA_BOOTSTRAP_SERVERS and KAFKA_TOPIC: Essential for connecting to Kafka and publishing.
- KAFKA_ACKS, KAFKA_RETRIES, KAFKA_COMPRESSION_TYPE, KAFKA_LINGER_MS, KAFKA_BATCH_SIZE_BYTES: These are crucial for production readiness. acks='all' ensures high durability, retries handles transient network issues, compression.type reduces network bandwidth usage, and linger.ms and batch.size optimize throughput by batching messages.
- enable.idempotence=True: Guarantees exactly-once delivery semantics for an individual producer, preventing duplicate messages on retries.
- Security parameters (KAFKA_SECURITY_PROTOCOL, etc.): Placeholders for secure Kafka configurations (SASL/SSL), which are critical in production but often simplified for local development. We default to PLAINTEXT but provide the structure for SASL_SSL.
b) Core Implementation
Now, let’s build the Python components for generating and producing events.
1. Define Event Models (src/kafka_producer/models.py):
Using Pydantic, we define the structure of our supply chain events.
# src/kafka_producer/models.py
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime

class Location(BaseModel):
    """Represents geographical coordinates and location details."""
    latitude: float = Field(..., description="Latitude coordinate")
    longitude: float = Field(..., description="Longitude coordinate")
    city: str = Field(..., description="City name")
    country: str = Field(..., description="Country name")

class ProductItem(BaseModel):
    """Represents a single product item in an order or shipment."""
    product_id: str = Field(..., description="Unique identifier for the product")
    quantity: int = Field(..., gt=0, description="Quantity of the product")
    price: float = Field(..., gt=0, description="Unit price of the product")

class ShipmentPayload(BaseModel):
    """Payload for SHIPMENT_UPDATE events."""
    shipment_id: str = Field(..., description="Unique identifier for the shipment")
    tracking_number: str = Field(..., description="Tracking number for the shipment")
    status: str = Field(..., pattern="^(IN_TRANSIT|DELIVERED|DELAYED|EXCEPTION)$", description="Current status of the shipment")
    location: Location = Field(..., description="Current location of the shipment")
    estimated_delivery_date: datetime = Field(..., description="Estimated delivery date")
    carrier: str = Field(..., description="Shipping carrier name")

class InventoryPayload(BaseModel):
    """Payload for INVENTORY_CHANGE events."""
    product_id: str = Field(..., description="Unique identifier for the product")
    warehouse_id: str = Field(..., description="Unique identifier for the warehouse")
    quantity_change: int = Field(..., description="Change in quantity (positive for increase, negative for decrease)")
    current_quantity: int = Field(..., ge=0, description="Current stock quantity after change")
    reason: str = Field(..., description="Reason for inventory change (e.g., 'SALE', 'RESTOCK', 'RETURN')")

class OrderPayload(BaseModel):
    """Payload for ORDER_STATUS_UPDATE events."""
    order_id: str = Field(..., description="Unique identifier for the order")
    customer_id: str = Field(..., description="Unique identifier for the customer")
    order_status: str = Field(..., pattern="^(PENDING|PROCESSING|SHIPPED|DELIVERED|CANCELLED)$", description="Current status of the order")
    items: List[ProductItem] = Field(..., min_length=1, description="List of items in the order")
    total_amount: float = Field(..., gt=0, description="Total monetary amount of the order")

class SupplyChainEvent(BaseModel):
    """Base model for all supply chain events."""
    event_id: str = Field(..., description="Unique identifier for the event")
    event_type: str = Field(..., pattern="^(SHIPMENT_UPDATE|INVENTORY_CHANGE|ORDER_STATUS_UPDATE)$", description="Type of the supply chain event")
    timestamp: datetime = Field(default_factory=datetime.utcnow, description="UTC timestamp of when the event occurred")
    payload: dict = Field(..., description="Dynamic payload based on event_type")
    source_system: str = Field(..., description="System that generated the event (e.g., 'WMS', 'TMS', 'OMS')")
    correlation_id: Optional[str] = Field(None, description="Identifier to link related events across systems")

    # Pydantic's post-init hook, used here as a custom validator to
    # ensure the payload matches the event_type
    def model_post_init(self, __context):
        if self.event_type == "SHIPMENT_UPDATE":
            self.payload = ShipmentPayload(**self.payload).model_dump()
        elif self.event_type == "INVENTORY_CHANGE":
            self.payload = InventoryPayload(**self.payload).model_dump()
        elif self.event_type == "ORDER_STATUS_UPDATE":
            self.payload = OrderPayload(**self.payload).model_dump()
        else:
            raise ValueError(f"Unknown event_type: {self.event_type}")

    # For serialization to JSON
    def to_json_string(self) -> str:
        """Converts the Pydantic model to a JSON string, handling datetimes."""
        # model_dump_json serializes directly to JSON with datetime handling
        return self.model_dump_json(by_alias=True)
Explanation:
- We define separate Pydantic models for Location, ProductItem, and the specific payloads (ShipmentPayload, InventoryPayload, OrderPayload). This provides strong typing and validation for nested structures.
- SupplyChainEvent is the main event model. It includes common fields and a dynamic payload field.
- Field(..., pattern="^...$"): Uses regular expressions for string validation, ensuring event types and statuses conform to expected values.
- default_factory=datetime.utcnow: Automatically sets the timestamp to the current UTC time if not provided, a good practice for event data.
- model_post_init: A Pydantic hook used here as a custom validator to ensure the payload dictionary matches the schema of the event_type. This is a powerful way to enforce conditional schema validation.
- to_json_string(): A utility method to serialize the Pydantic model to a JSON string, which is the format Kafka expects. model_dump_json handles datetime objects correctly.
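To see this conditional validation in action, here is a trimmed-down, self-contained sketch of the same pattern (the MiniShipmentPayload and MiniEvent models below are hypothetical stand-ins for the full models.py, kept small for illustration):

```python
# Trimmed-down illustration of the payload-vs-event_type check; the real
# models live in src/kafka_producer/models.py.
from pydantic import BaseModel, Field, ValidationError

class MiniShipmentPayload(BaseModel):
    shipment_id: str
    status: str = Field(..., pattern="^(IN_TRANSIT|DELIVERED|DELAYED|EXCEPTION)$")

class MiniEvent(BaseModel):
    event_type: str = Field(..., pattern="^(SHIPMENT_UPDATE)$")
    payload: dict

    def model_post_init(self, __context):
        # Re-validate the free-form dict against the payload schema.
        if self.event_type == "SHIPMENT_UPDATE":
            self.payload = MiniShipmentPayload(**self.payload).model_dump()

# A valid event passes validation at construction time...
ok = MiniEvent(event_type="SHIPMENT_UPDATE",
               payload={"shipment_id": "S-1", "status": "IN_TRANSIT"})

# ...while a payload with an invalid status is rejected.
try:
    MiniEvent(event_type="SHIPMENT_UPDATE",
              payload={"shipment_id": "S-2", "status": "LOST"})
    caught = False
except ValidationError:
    caught = True
```

The key point is that a malformed payload never makes it into a constructed event object, so invalid data is stopped before it can reach Kafka.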
2. Event Generation Logic (src/kafka_producer/generator.py):
This script will use Faker to generate diverse and realistic event data.
# src/kafka_producer/generator.py
import uuid
import random
from datetime import datetime
from faker import Faker

from src.kafka_producer.models import SupplyChainEvent, ShipmentPayload, InventoryPayload, OrderPayload, Location, ProductItem

fake = Faker()

def generate_location() -> Location:
    """Generates a random location."""
    return Location(
        latitude=float(fake.latitude()),
        longitude=float(fake.longitude()),
        city=fake.city(),
        country=fake.country()
    )

def generate_product_item() -> ProductItem:
    """Generates a random product item."""
    # Note: fake.unique draws from a finite pool and raises once exhausted;
    # call fake.unique.clear() periodically for very long-running simulations.
    return ProductItem(
        product_id=f"PROD-{fake.unique.random_int(min=1000, max=9999)}",
        quantity=random.randint(1, 10),
        price=round(random.uniform(10.0, 500.0), 2)
    )

def generate_shipment_update() -> SupplyChainEvent:
    """Generates a simulated SHIPMENT_UPDATE event."""
    shipment_id = str(uuid.uuid4())
    tracking_number = fake.unique.bothify(text='TRK-##########')
    status = random.choice(["IN_TRANSIT", "DELIVERED", "DELAYED", "EXCEPTION"])
    location = generate_location()
    estimated_delivery_date = fake.date_time_between(start_date="now", end_date="+30d", tzinfo=None)
    payload = ShipmentPayload(
        shipment_id=shipment_id,
        tracking_number=tracking_number,
        status=status,
        location=location,
        estimated_delivery_date=estimated_delivery_date,
        carrier=fake.company() + " Logistics"
    )
    return SupplyChainEvent(
        event_id=str(uuid.uuid4()),
        event_type="SHIPMENT_UPDATE",
        timestamp=datetime.utcnow(),
        payload=payload.model_dump(),  # Use model_dump to get a dict
        source_system="TMS",  # Transportation Management System
        correlation_id=shipment_id  # Link to shipment_id
    )

def generate_inventory_change() -> SupplyChainEvent:
    """Generates a simulated INVENTORY_CHANGE event."""
    product_id = f"PROD-{fake.unique.random_int(min=1000, max=9999)}"
    warehouse_id = f"WH-{fake.unique.random_int(min=100, max=999)}"
    quantity_change = random.randint(-50, 50)
    current_quantity = max(0, 1000 + quantity_change)  # Simulate a base quantity
    reason = random.choice(["SALE", "RESTOCK", "RETURN", "DAMAGE", "ADJUSTMENT"])
    payload = InventoryPayload(
        product_id=product_id,
        warehouse_id=warehouse_id,
        quantity_change=quantity_change,
        current_quantity=current_quantity,
        reason=reason
    )
    return SupplyChainEvent(
        event_id=str(uuid.uuid4()),
        event_type="INVENTORY_CHANGE",
        timestamp=datetime.utcnow(),
        payload=payload.model_dump(),
        source_system="WMS",  # Warehouse Management System
        correlation_id=product_id  # Link to product_id
    )

def generate_order_status_update() -> SupplyChainEvent:
    """Generates a simulated ORDER_STATUS_UPDATE event."""
    order_id = f"ORD-{fake.unique.random_int(min=10000, max=99999)}"
    customer_id = f"CUST-{fake.unique.random_int(min=1000, max=9999)}"
    order_status = random.choice(["PENDING", "PROCESSING", "SHIPPED", "DELIVERED", "CANCELLED"])
    items = [generate_product_item() for _ in range(random.randint(1, 3))]
    total_amount = round(sum(item.quantity * item.price for item in items), 2)
    payload = OrderPayload(
        order_id=order_id,
        customer_id=customer_id,
        order_status=order_status,
        items=items,
        total_amount=total_amount
    )
    return SupplyChainEvent(
        event_id=str(uuid.uuid4()),
        event_type="ORDER_STATUS_UPDATE",
        timestamp=datetime.utcnow(),
        payload=payload.model_dump(),
        source_system="OMS",  # Order Management System
        correlation_id=order_id  # Link to order_id
    )

def generate_random_event() -> SupplyChainEvent:
    """Generates a random supply chain event of any type."""
    event_generators = [
        generate_shipment_update,
        generate_inventory_change,
        generate_order_status_update
    ]
    return random.choice(event_generators)()
Explanation:
- Faker: Initialized to generate various types of fake data (names, addresses, dates, etc.).
- uuid.uuid4(): Generates unique identifiers for events and entities.
- random.choice(): Used to randomly select event types, statuses, and other categorical data.
- Each generate_* function creates a specific type of SupplyChainEvent by populating its payload with the corresponding Pydantic model (ShipmentPayload, etc.) and then converting it to a dictionary using .model_dump().
- generate_random_event(): A convenience function that randomly picks one of the specific event generators, ensuring a mix of event types in our stream.
3. Main Kafka Producer Application (src/kafka_producer/producer.py):
This script orchestrates the generation and sending of events to Kafka.
# src/kafka_producer/producer.py
import time
import logging

from confluent_kafka import Producer, KafkaException

from src.kafka_producer.config import kafka_config
from src.kafka_producer.generator import generate_random_event

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def delivery_report(err, msg):
    """
    Callback function to report the delivery status of a message.
    This is crucial for understanding whether messages were successfully sent.
    """
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
        # In a real-world scenario, you might route persistently failing
        # messages to a dead-letter queue or trigger an alert.
    else:
        logger.debug(f"Message delivered to topic '{msg.topic()}' partition [{msg.partition()}] "
                     f"at offset {msg.offset()} (key: {msg.key()}, "
                     f"value: {msg.value()[:50].decode('utf-8', errors='replace')}...)")

def create_kafka_producer() -> Producer:
    """
    Initializes and returns a Confluent Kafka Producer instance.
    Includes robust error handling for initialization.
    """
    producer_config = kafka_config.get_producer_config()
    logger.info(f"Initializing Kafka producer with config: {producer_config['bootstrap.servers']}")
    try:
        producer = Producer(producer_config)
        logger.info("Kafka producer initialized successfully.")
        return producer
    except KafkaException as e:
        logger.critical(f"Failed to create Kafka producer: {e}")
        raise

def produce_events(producer: Producer, num_events: int = -1, delay_sec: float = 1.0):
    """
    Continuously generates and produces supply chain events to Kafka.

    :param producer: The Kafka producer instance.
    :param num_events: Number of events to produce; -1 for infinite.
    :param delay_sec: Delay between producing events in seconds.
    """
    topic = kafka_config.KAFKA_TOPIC
    logger.info(f"Starting to produce events to topic: {topic}")
    event_count = 0
    while num_events == -1 or event_count < num_events:
        try:
            event = generate_random_event()
            event_json = event.to_json_string()
            key = event.event_id.encode('utf-8')  # Use event_id as key for partitioning

            # Asynchronously produce the message
            producer.produce(
                topic=topic,
                value=event_json.encode('utf-8'),
                key=key,
                callback=delivery_report
            )
            # Poll for any delivered/failed messages to invoke callbacks.
            # This is crucial for non-blocking production and error handling.
            producer.poll(0)

            event_count += 1
            if event_count % 100 == 0:
                logger.info(f"Produced {event_count} events. Last event type: {event.event_type}")
            time.sleep(delay_sec)
        except BufferError:
            # The internal queue is full: flush and check whether it drained.
            # Note: the event that triggered this exception was never enqueued;
            # production code should re-produce it rather than drop it.
            logger.warning("Producer buffer full. Flushing and retrying...")
            if producer.flush(timeout=5) > 0:
                logger.error("Failed to clear producer buffer after flush. Exiting to prevent data loss.")
                break  # Or implement more sophisticated retry logic
            time.sleep(delay_sec)  # Wait before the next attempt
        except KafkaException as e:
            logger.error(f"Kafka error while producing: {e}. Attempting to recover...")
            # Depending on the error, you might need to re-initialize the producer or exit
            time.sleep(5)  # Wait before retrying
        except Exception as e:
            logger.critical(f"An unexpected error occurred: {e}", exc_info=True)
            break  # Exit on unexpected errors

    logger.info(f"Finished producing {event_count} events.")
    # Ensure all outstanding messages are delivered before exiting
    remaining_messages = producer.flush(timeout=10)
    if remaining_messages > 0:
        logger.warning(f"{remaining_messages} messages still in queue after flush timeout.")

if __name__ == "__main__":
    try:
        producer = create_kafka_producer()
        # Produce events indefinitely with a 0.1-second delay between each
        produce_events(producer, num_events=-1, delay_sec=0.1)
    except Exception as main_exception:
        logger.critical(f"Main producer application failed: {main_exception}")
Explanation:
- Logging: Configured with logging.basicConfig for clear output. logger.info, logger.error, and logger.critical are used for different severity levels.
- delivery_report(err, msg): This callback is essential for asynchronous producers. It’s invoked when a message is successfully delivered or fails. In production, failed messages might be sent to a Dead-Letter Queue (DLQ) for further analysis and reprocessing.
- create_kafka_producer(): Initializes the confluent_kafka.Producer with settings from kafka_config, wrapped in a try-except block for robust initialization error handling.
- produce_events():
  - while num_events == -1 or event_count < num_events: Controls event generation, either infinite or for a specific count.
  - event.to_json_string().encode('utf-8'): Converts the Pydantic event object into a JSON string and then encodes it to bytes, as Kafka messages are byte arrays.
  - key = event.event_id.encode('utf-8'): Using a message key is a best practice. Kafka uses the key to determine the partition where the message will be stored. Messages with the same key go to the same partition, which is crucial for maintaining order for related events (e.g., all updates for a specific shipment_id).
  - producer.produce(...): Asynchronously sends the message. The callback is crucial for handling delivery results.
  - producer.poll(0): This is critical for confluent-kafka. It allows the producer to invoke the delivery_report callback for any messages that have completed their delivery cycle (either successfully or with an error). Without poll(), callbacks won’t be triggered and the internal buffer may fill up.
  - BufferError handling: If the producer’s internal buffer is full (meaning messages aren’t being sent fast enough), we flush() to force sending. This prevents the producer from blocking indefinitely and potentially losing data.
  - producer.flush(timeout=10): At the end, ensures all buffered messages are sent before the program exits.
- if __name__ == "__main__": Standard Python entry point.
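The per-key ordering guarantee is worth internalizing, and it can be illustrated without a broker. The sketch below mimics keyed partitioning with a toy hash (Kafka actually uses murmur2 on the key bytes, so the partition assignments here are illustrative only):

```python
from collections import defaultdict

NUM_PARTITIONS = 3

def toy_partition(key: bytes) -> int:
    """Illustrative stand-in for Kafka's partitioner (the real client
    hashes the key with murmur2)."""
    return sum(key) % NUM_PARTITIONS

partitions = defaultdict(list)  # partition index -> ordered messages

# Three updates for shipment A, interleaved with an update for shipment B.
for key, value in [(b"ship-A", "IN_TRANSIT"), (b"ship-B", "IN_TRANSIT"),
                   (b"ship-A", "DELAYED"), (b"ship-A", "DELIVERED")]:
    partitions[toy_partition(key)].append((key, value))

# All of ship-A's updates land in one partition, preserving their order.
ship_a_events = [v for k, v in partitions[toy_partition(b"ship-A")] if k == b"ship-A"]
```

Because all events for one key share a partition, a consumer reading that partition sees the shipment's status transitions in the order they were produced, which is exactly why we key messages by event_id (or, for strict per-entity ordering, you might key by shipment_id instead).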
c) Testing This Component
Let’s test our Kafka producer to ensure it’s generating and publishing events correctly.
1. Run the Kafka Producer:
Open your terminal, activate your virtual environment, and run the producer script:
source .venv/bin/activate
python src/kafka_producer/producer.py
You should see log messages indicating the producer is initializing and then continuously producing events:
2025-12-20 10:00:00,123 - INFO - Initializing Kafka producer with config: localhost:9092
2025-12-20 10:00:00,125 - INFO - Kafka producer initialized successfully.
2025-12-20 10:00:00,126 - INFO - Starting to produce events to topic: supply_chain_events
2025-12-20 10:00:00,227 - DEBUG - Message delivered to topic 'supply_chain_events' partition [0] at offset 0 (key: b'uuid...', value: b'{"event_id": "uuid", "event_type": "SHIPMENT_UPDATE", ...')
...
2025-12-20 10:00:10,123 - INFO - Produced 100 events. Last event type: INVENTORY_CHANGE
...
2. Verify Messages with Kafka Console Consumer:
While the producer is running, open a new terminal and use the kafka-console-consumer tool (available with your Kafka installation, typically in bin/ directory if you installed Kafka directly, or accessible within the Docker container).
If using the Docker setup from Chapter 1, you can access the Kafka container’s shell:
docker exec -it kafka-broker bash
Then, run the consumer inside the container:
kafka-console-consumer --bootstrap-server localhost:9092 --topic supply_chain_events --from-beginning
You should see a continuous stream of JSON messages, each representing a simulated supply chain event. This confirms that your producer is successfully publishing messages to the supply_chain_events topic.
{"event_id": "1a2b3c4d-...", "event_type": "SHIPMENT_UPDATE", "timestamp": "2025-12-20T10:00:00.123456", "payload": {"shipment_id": "...", "status": "IN_TRANSIT", ...}, "source_system": "TMS", ...}
{"event_id": "5e6f7a8b-...", "event_type": "INVENTORY_CHANGE", "timestamp": "2025-12-20T10:00:00.234567", "payload": {"product_id": "...", "warehouse_id": "...", "quantity_change": 5, ...}, "source_system": "WMS", ...}
...
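Beyond eyeballing the console output, you can sanity-check a captured message programmatically. A minimal standard-library sketch (the sample message below is illustrative, abbreviated from real output):

```python
import json

# A captured message (illustrative sample with placeholder values).
raw = ('{"event_id": "1a2b3c4d-0000-0000-0000-000000000000", '
       '"event_type": "SHIPMENT_UPDATE", '
       '"timestamp": "2025-12-20T10:00:00.123456", '
       '"payload": {"shipment_id": "S-1", "status": "IN_TRANSIT"}, '
       '"source_system": "TMS", "correlation_id": null}')

event = json.loads(raw)

# Every event must carry the common envelope fields.
required = {"event_id", "event_type", "timestamp", "payload", "source_system"}
missing = required - event.keys()

valid_envelope = not missing and event["event_type"] in {
    "SHIPMENT_UPDATE", "INVENTORY_CHANGE", "ORDER_STATUS_UPDATE"
}
```

The same check could be run against lines piped out of the console consumer, giving you a quick automated smoke test of the envelope schema.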
Press Ctrl+C in both terminals to stop the consumer and producer.
Production Considerations
Building a robust Kafka producer for production goes beyond basic functionality. Here are key considerations:
Error Handling & Durability:
- Delivery Callbacks: The delivery_report callback is essential. In production, messages that fail persistently should be logged, sent to a Dead-Letter Queue (DLQ) topic, or trigger alerts. This prevents silent data loss and provides visibility into issues.
- Retries: The retries configuration (KAFKA_RETRIES) in config.py is vital for handling transient network failures.
- Idempotence: enable.idempotence=True (set in config.py) ensures that messages are written to Kafka exactly once, even if the producer retries sending due to network errors. This is a critical feature for data integrity.
- Acks: acks='all' ensures that the leader broker waits for all in-sync replicas to acknowledge the message before considering it committed, maximizing durability.
Performance Optimization:
- Batching: linger.ms and batch.size (configured in config.py) are crucial. Instead of sending each message individually, the producer batches messages, reducing network overhead and increasing throughput.
- Compression: compression.type='snappy' (or 'lz4', 'zstd') compresses message batches, further reducing network bandwidth usage and improving throughput, especially for large messages.
- Asynchronous Production: producer.produce() is asynchronous. The producer.poll(0) call in a loop allows the producer to efficiently manage its internal buffer and send messages in the background without blocking the main application thread.
Security Considerations:
- Authentication and Authorization: In a production environment, Kafka access should be secured. This typically involves:
  - SASL (Simple Authentication and Security Layer): For authenticating clients. Mechanisms like PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 are common.
  - SSL/TLS: For encrypting data in transit between clients and brokers.
  - ACLs (Access Control Lists): For authorizing specific users/services to produce to or consume from certain topics.
- Our config.py includes placeholders for these settings (KAFKA_SECURITY_PROTOCOL, KAFKA_SASL_USERNAME, etc.). You would populate these with credentials and certificate paths in a secure environment.
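As an illustration, a production .env overriding our PLAINTEXT defaults might look like the fragment below. Every hostname, credential, and path here is a placeholder, not a real value:

```
# .env (production example -- placeholder values only)
KAFKA_BOOTSTRAP_SERVERS=broker-1.example.com:9093
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=SCRAM-SHA-256
KAFKA_SASL_USERNAME=supply-chain-producer
KAFKA_SASL_PASSWORD=change-me
KAFKA_SSL_CA_LOCATION=/etc/kafka/certs/ca.pem
```

Because KafkaConfig reads these from the environment, switching from local PLAINTEXT to a secured cluster requires no code changes, only configuration.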
Logging and Monitoring:
- Structured Logging: Use libraries like structlog or configure Python’s logging module to output JSON logs. This makes it easier for log aggregation tools (e.g., ELK Stack, Splunk, Datadog) to parse and analyze logs.
- Metrics: Monitor key producer metrics:
  - Message throughput: messages/sec, bytes/sec.
  - Latency: time from the produce() call to the delivery_report() callback.
  - Buffer usage: how full the internal producer buffer is.
  - Error rates: delivery failures, network errors.
- Tools like Prometheus/Grafana or cloud-native monitoring services can collect and visualize these metrics.
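confluent-kafka can emit librdkafka's internal metrics as a JSON blob if you set statistics.interval.ms and register a stats_cb in the producer config. A minimal sketch of such a callback follows; the sample stats payload at the bottom is abbreviated and illustrative, though txmsgs, msg_cnt, and msg_size are real top-level fields in librdkafka's statistics output:

```python
import json

def stats_cb(stats_json_str: str) -> dict:
    """Parse the librdkafka statistics blob and pull out a few headline
    numbers. Registered via the producer config, e.g.:
    {'statistics.interval.ms': 10000, 'stats_cb': stats_cb}."""
    stats = json.loads(stats_json_str)
    return {
        "outbound_msgs": stats.get("txmsgs", 0),  # messages transmitted
        "queued_msgs": stats.get("msg_cnt", 0),   # messages in the buffer
        "queued_bytes": stats.get("msg_size", 0), # bytes in the buffer
    }

# Exercising the callback with an abbreviated, illustrative stats payload:
sample = '{"txmsgs": 1200, "msg_cnt": 3, "msg_size": 4096}'
metrics = stats_cb(sample)
```

In practice you would forward these numbers to your metrics backend (e.g., a Prometheus gauge) rather than returning them.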
Code Review Checkpoint
At this point, you have successfully implemented a real-time event simulation layer using Kafka.
Summary of what was built:
- A Python application that leverages confluent-kafka and Faker to generate realistic supply chain events.
- Pydantic models for strict schema definition and validation of event data.
- Robust Kafka producer configuration with production best practices like batching, compression, idempotence, and asynchronous delivery.
- Logging and error handling mechanisms for reliable event production.
Files created/modified:
- requirements.txt: Added confluent-kafka, Faker, pydantic, and python-dotenv.
- .env: Configured KAFKA_BOOTSTRAP_SERVERS and KAFKA_TOPIC.
- src/kafka_producer/config.py: Defined Kafka configuration settings using Pydantic.
- src/kafka_producer/models.py: Defined Pydantic models for SupplyChainEvent and its various payloads.
- src/kafka_producer/generator.py: Implemented event generation logic using Faker.
- src/kafka_producer/producer.py: The main script to initialize the Kafka producer and continuously send events.
How it integrates with existing code:
This chapter builds upon the Kafka and Docker setup from Chapter 1. The producer connects to the Kafka broker running in Docker and publishes to the supply_chain_events topic. This data stream will be the input for our Databricks Delta Live Tables pipeline in the next chapter.
Common Issues & Solutions
Developers often encounter a few common issues when working with Kafka producers.
Kafka Broker Connection Refused:
- Error Message: KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Failed to connect to broker: ... Connection refused"}
- Cause: The Kafka broker is not running, or the KAFKA_BOOTSTRAP_SERVERS address is incorrect or inaccessible.
- Solution:
  - Ensure your Docker containers for Kafka and Zookeeper are running (docker ps to verify).
  - Verify KAFKA_BOOTSTRAP_SERVERS in your .env file and src/kafka_producer/config.py matches the Kafka service’s exposed port (e.g., localhost:9092).
  - Check Docker logs for Kafka: docker logs kafka-broker.
  - If running Kafka on a different machine or in the cloud, ensure network connectivity (firewalls, security groups).
Messages Not Appearing in Consumer / Producer Buffer Full:
- Cause: The producer is sending messages faster than Kafka can receive them, or the producer.poll(0) call is missing or infrequent, preventing delivery callbacks and buffer management.
- Solution:
  - Ensure producer.poll(0) is called frequently in your produce_events loop.
  - Check the BufferError handling in producer.py; ensure producer.flush() is called when the buffer is full.
  - Verify the Kafka consumer is connected to the correct topic and bootstrap servers.
  - Increase Kafka broker capacity (in production) or reduce the producer message rate (delay_sec).
Serialization Errors (e.g., value must be bytes):
- Error Message: TypeError: value must be bytes or similar when producing messages.
- Cause: Kafka messages must be byte arrays; you’re trying to send a string or an object directly without encoding.
- Solution:
  - Ensure your message value and key are explicitly encoded to bytes using .encode('utf-8') before passing them to producer.produce(). Our event.to_json_string().encode('utf-8') handles this correctly.
  - Verify your Pydantic model’s to_json_string method correctly outputs a string.
Testing & Verification
To thoroughly test and verify the work done in this chapter:
1. Start the Kafka Cluster: run docker-compose up -d and confirm both the zookeeper and kafka-broker services are running.
2. Run the Kafka Producer:
   source .venv/bin/activate
   python src/kafka_producer/producer.py
   Observe the console output for INFO messages indicating events are being produced, and DEBUG messages (if debug logging is enabled) for successful delivery reports.
3. Consume Messages and Inspect Data: open another terminal and use kafka-console-consumer to inspect the messages:
   docker exec -it kafka-broker bash -c "kafka-console-consumer --bootstrap-server localhost:9092 --topic supply_chain_events --from-beginning"
   - Verify continuous flow: messages should be streaming continuously.
   - Verify JSON format: each message should be a valid JSON object.
   - Verify schema: check that event_id, event_type, timestamp, source_system, and the nested payload fields are present and conform to the expected structure (e.g., SHIPMENT_UPDATE events have shipment_id, status, location, etc.).
   - Verify data realism: the Faker-generated data should look plausible (e.g., valid city names, reasonable quantities).
4. Check Producer Robustness:
   - Temporarily stop the Kafka broker (docker stop kafka-broker) while the producer is running. The producer should log connection errors and retry.
   - Restart the Kafka broker (docker start kafka-broker). The producer should eventually reconnect and resume producing messages, demonstrating its resilience.
By following these steps, you can confidently verify that your real-time supply chain event simulation with Kafka is working as expected and is robust enough for our development needs.
Summary & Next Steps
In this chapter, we successfully built a production-ready Kafka producer application to simulate real-time supply chain events. We established a robust event schema using Pydantic, implemented dynamic data generation with Faker, and configured our Kafka producer with essential best practices for performance, reliability, and error handling. This component now serves as the critical data ingress point for our entire supply chain analytics pipeline.
This simulated data stream is the raw material for our data lakehouse. Having a consistent and controlled source of streaming data allows us to focus on building the downstream processing logic without worrying about external system dependencies.
In the next chapter, we will take these raw events and ingest them into our Databricks Lakehouse using Delta Live Tables (DLT). We will explore how DLT simplifies the creation of reliable, maintainable, and testable data pipelines, transforming our raw Kafka events into structured, high-quality Delta tables ready for analysis.