Introduction
Welcome back, intrepid tester! In the previous chapters, you mastered the art of using Testcontainers to bring real databases into your tests. That was a huge step up from in-memory fakes, but what about the broader landscape of modern applications? Many microservices don’t just talk to databases; they communicate through message brokers, call other APIs, and integrate with external services.
This chapter is your passport to confidently testing those complex interactions. We’re going to tackle two crucial areas:
- Message Brokers: Specifically, we’ll dive into Kafka, a cornerstone of many asynchronous microservice architectures. You’ll learn how to spin up a fully functional Kafka cluster for your tests, allowing your applications to produce and consume messages against a real instance.
- Web Service Interactions: We’ll explore how to test scenarios where your application needs to communicate with other HTTP-based services. This means bringing those dependent services to life within containers for your tests.
By the end of this chapter, you’ll be equipped to test your microservices’ interactions with external systems with unparalleled realism, ensuring robust and reliable applications. Remember our guiding principles: baby steps, practical application, and true understanding. Let’s get started!
Core Concepts: Why Real Message Brokers and Web Services?
Before we jump into code, let’s understand why Testcontainers shines brightest when testing these types of dependencies.
The Challenge of Distributed System Testing
Modern applications, especially those built with microservices, are inherently distributed. They rely on networks, message queues, API calls, and shared state. Testing such systems presents unique challenges:
- Mocks and Fakes Fall Short: While mocks are great for unit testing isolated components, they often fail to replicate the subtle nuances, latency, error conditions, and protocol quirks of real systems. A mock Kafka might work fine, but what if your application misinterprets a specific message header, or struggles with the broker’s acknowledgment semantics?
- Integration Gaps: Even if individual services work, their integration points (how they talk to each other) are often the source of bugs. These bugs are incredibly hard to catch without testing against the actual integration mechanisms.
- Environment Drift: Using a separate “integration testing environment” can lead to differences between your test setup and production, making tests less reliable. Testcontainers aims to bridge this gap.
The Testcontainers Solution: Realism and Isolation
Testcontainers addresses these challenges head-on:
- Real Dependencies: Instead of fakes, Testcontainers spins up actual instances of Kafka, Redis, PostgreSQL, or even your custom microservices in Docker containers. This ensures your application interacts with the real thing, catching integration bugs early.
- Isolated Environments: Each test run gets its own fresh, clean set of containers. No shared state, no pollution from previous tests, no “it worked on my machine” excuses. This is crucial for reliable and repeatable tests.
- Disposable and Fast: Containers are started quickly and torn down automatically after tests, leaving no trace. This keeps your development and CI/CD environments clean.
Testing message brokers and web services with Testcontainers allows you to simulate your production environment’s critical communication pathways, giving you high confidence that your distributed application truly works as intended.
Kafka in Microservices: A Quick Refresher
Kafka is a distributed streaming platform that acts as a central nervous system for many microservice architectures. Services produce events to Kafka topics, and other services consume those events. This asynchronous communication pattern is powerful but also adds complexity to testing. We need a real Kafka broker to ensure our producers correctly serialize messages and our consumers correctly deserialize and process them, including error handling, retries, and exactly-once processing semantics.
Kafka with Testcontainers: A Practical Guide
Let’s get our hands dirty by spinning up a Kafka broker using Testcontainers across different languages. We’ll simulate a simple producer and consumer scenario.
Kafka Version Note: As of 2026-02-14, confluentinc/cp-kafka images are widely used and stable. We’ll use version 7.6.0 in our examples.
1. Java (JUnit 5 + Testcontainers-Kafka)
First, make sure you have your build tool (Maven or Gradle) configured with the Testcontainers dependencies.
pom.xml (Maven) / build.gradle (Gradle) Additions:
<!-- Maven pom.xml -->
<dependencies>
<!-- Testcontainers core -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.19.4</version> <!-- Latest stable as of Feb 2026 -->
<scope>test</scope>
</dependency>
<!-- JUnit Jupiter for testing -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.10.1</version> <!-- Latest stable as of Feb 2026 -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.10.1</version>
<scope>test</scope>
</dependency>
<!-- Testcontainers Kafka Module -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.4</version> <!-- Match core Testcontainers version -->
<scope>test</scope>
</dependency>
<!-- Kafka client for interaction -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version> <!-- Matches the Apache Kafka version in Confluent Platform 7.6 -->
<scope>test</scope>
</dependency>
<!-- SLF4J for logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.11</version> <!-- Latest stable as of Feb 2026 -->
<scope>test</scope>
</dependency>
</dependencies>
// Gradle build.gradle
dependencies {
// Testcontainers core
testImplementation 'org.testcontainers:testcontainers:1.19.4' // Latest stable as of Feb 2026
// JUnit Jupiter for testing
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.1' // Latest stable as of Feb 2026
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.1'
// Testcontainers Kafka Module
testImplementation 'org.testcontainers:kafka:1.19.4' // Match core Testcontainers version
// Kafka client for interaction
testImplementation 'org.apache.kafka:kafka-clients:3.6.1' // Matches the Apache Kafka version in Confluent Platform 7.6
// SLF4J for logging
testImplementation 'org.slf4j:slf4j-simple:2.0.11' // Latest stable as of Feb 2026
}
test {
useJUnitPlatform()
}
Now, let’s write a simple Java test that uses a Kafka container:
// src/test/java/com/example/KafkaIntegrationTest.java
package com.example;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.*;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class KafkaIntegrationTest {
// Step 1: Declare the KafkaContainer
// We'll use the Confluent Platform Kafka image for robustness.
// As of Feb 2026, version 7.6.0 is a stable choice.
// Note: no @Testcontainers/@Container annotations here; we manage the lifecycle manually below.
private static KafkaContainer kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));
// Step 2: Start the container once for all tests in this class
// This is a common optimization for faster test suites.
@BeforeAll
static void startKafka() {
kafka.start();
}
// Step 3: Stop the container after all tests
@AfterAll
static void stopKafka() {
kafka.stop();
}
@Test
void shouldProduceAndConsumeMessage() throws ExecutionException, InterruptedException {
String topic = "my-test-topic-" + UUID.randomUUID(); // Unique topic for isolation
String message = "Hello, Testcontainers Kafka!";
// Configure Producer
Properties producerProps = new Properties();
// Get Kafka's bootstrap servers dynamically from the container
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Configure Consumer
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + UUID.randomUUID()); // Unique consumer group
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start reading from the beginning
// Step 4: Create a Kafka Producer
try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
// Step 5: Send a message
producer.send(new ProducerRecord<>(topic, "test-key", message)).get(); // .get() makes it synchronous
producer.flush(); // Ensure message is sent
}
// Step 6: Create a Kafka Consumer
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
consumer.subscribe(Collections.singletonList(topic)); // Subscribe to our test topic
// Step 7: Poll for messages
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10)); // Poll for 10 seconds
// Assertions
assertFalse(records.isEmpty(), "Should have received at least one message");
assertEquals(1, records.count(), "Should have received exactly one message");
assertEquals(message, records.iterator().next().value(), "Received message should match sent message");
}
}
}
Explanation:
- `private static KafkaContainer kafka = ...`: declares our Kafka container and specifies the Docker image name. Testcontainers automatically handles downloading, starting, and stopping Kafka, including its Zookeeper dependency.
- `@BeforeAll static void startKafka() { kafka.start(); }`: the `start()` method launches the container before any tests run. `static` ensures it’s done once for the entire test class.
- `@AfterAll static void stopKafka() { kafka.stop(); }`: the `stop()` method gracefully shuts down the container after all tests complete.
- `kafka.getBootstrapServers()`: this is the magic! Testcontainers dynamically provides the correct host and mapped port for your application to connect to the Kafka broker inside the container. This removes hardcoding and ensures portability.
- The rest of the code sets up standard Kafka producer and consumer clients using the provided bootstrap servers, then sends and receives a message, asserting its content. We use `UUID.randomUUID()` for topic and consumer group IDs to ensure test isolation even if multiple tests run concurrently.
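If several test classes connect to the same container, it helps to centralize client configuration. The sketch below is a hypothetical helper of our own (the class and method names are not part of Testcontainers or the Kafka client): it builds producer and consumer `Properties` from whatever address `kafka.getBootstrapServers()` returns, using the literal config key strings so it compiles even without kafka-clients on the classpath.

```java
import java.util.Properties;

public class KafkaTestProps {
    // Build producer properties from whatever bootstrap address the container reports.
    public static Properties producerProps(String bootstrapServers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    // Build consumer properties; each test should pass a unique group id for isolation.
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("group.id", groupId);
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("auto.offset.reset", "earliest");
        return p;
    }

    public static void main(String[] args) {
        Properties pp = producerProps("localhost:9092");
        Properties cp = consumerProps("localhost:9092", "group-1");
        if (!"localhost:9092".equals(pp.get("bootstrap.servers"))) throw new AssertionError("bootstrap not set");
        if (!"group-1".equals(cp.get("group.id"))) throw new AssertionError("group id not set");
        System.out.println("props OK");
    }
}
```

In the test above, the producer setup would then shrink to `new KafkaProducer<>(KafkaTestProps.producerProps(kafka.getBootstrapServers()))`.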
2. Python (Pytest + Testcontainers-Python)
For Python, we’ll leverage pytest and the testcontainers-python library.
Installation:
pip install pytest "testcontainers[kafka]" kafka-python
Note: installing testcontainers with the kafka extra provides the KafkaContainer module for Testcontainers-Python. kafka-python is a client library to interact with Kafka.
As of 2026-02-14, testcontainers-python 4.14.1 is a recent stable version.
# tests/test_kafka_integration.py
import pytest
from testcontainers.kafka import KafkaContainer  # provided by the testcontainers "kafka" extra
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import NoBrokersAvailable
import json
import time
import uuid

# Define a fixture for the Kafka container.
# This fixture starts the Kafka container before tests that use it
# and stops it afterwards. scope="session" means it runs once for all tests.
@pytest.fixture(scope="session")
def kafka_container():
    # Using a stable Confluent Platform Kafka image.
    # As of Feb 2026, version 7.6.0 is a stable choice.
    with KafkaContainer("confluentinc/cp-kafka:7.6.0") as kafka:
        yield kafka.get_bootstrap_server()

# Fixture to provide a Kafka producer
@pytest.fixture
def kafka_producer(kafka_container):
    # Retry the initial connection: the broker can take a moment to accept
    # clients even after the container starts.
    producer = None
    retries = 5
    for i in range(retries):
        try:
            producer = KafkaProducer(
                bootstrap_servers=kafka_container,
                value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                acks='all',
                retries=3,
                retry_backoff_ms=200,  # small delay between client-level retries
            )
            break
        except NoBrokersAvailable:
            if i == retries - 1:
                raise  # re-raise if all retries fail
            print(f"Kafka producer connection failed, retrying... ({i + 1}/{retries})")
            time.sleep(2)  # wait a bit before the next attempt
    yield producer
    producer.close()  # ensure the producer is closed after the test

# Fixture to provide a Kafka consumer
@pytest.fixture
def kafka_consumer(kafka_container):
    group_id = f"test-group-{uuid.uuid4()}"  # unique group ID for isolation
    consumer = KafkaConsumer(
        bootstrap_servers=kafka_container,
        group_id=group_id,
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        consumer_timeout_ms=5000,  # wait up to 5 seconds for messages
    )
    yield consumer
    consumer.close()  # ensure the consumer is closed after the test

# Now, let's write our test!
def test_should_produce_and_consume_message_python(kafka_producer, kafka_consumer):
    topic = f"my-python-test-topic-{uuid.uuid4()}"
    message_payload = {"id": 123, "data": "Hello from Python Testcontainers!"}
    # Step 1: Subscribe the consumer to the topic before producing
    kafka_consumer.subscribe([topic])
    # Step 2: Produce a message
    future = kafka_producer.send(topic, message_payload)
    record_metadata = future.get(timeout=10)  # wait for the message to be sent
    assert record_metadata.topic == topic
    print(f"Message sent to topic {record_metadata.topic}, "
          f"partition {record_metadata.partition}, offset {record_metadata.offset}")
    # Step 3: Consume messages
    received_message = None
    for msg in kafka_consumer:
        if msg.topic == topic and msg.value == message_payload:
            received_message = msg
            break  # found our message
    # Assertions
    assert received_message is not None, "Did not receive the expected message"
    assert received_message.value == message_payload, "Received message content mismatch"
    assert received_message.topic == topic, "Received message topic mismatch"
Explanation:
- `@pytest.fixture(scope="session") def kafka_container():`: Pytest fixtures are powerful for managing resources. `scope="session"` ensures the Kafka container is started once and shared across all tests in the session, improving performance. The `yield kafka.get_bootstrap_server()` passes the dynamically determined Kafka connection string to tests.
- `KafkaContainer("confluentinc/cp-kafka:7.6.0")`: similar to Java, this instantiates our Kafka container.
- The `kafka_producer` and `kafka_consumer` fixtures create Kafka client instances, configured with the bootstrap servers from our `kafka_container` fixture. The connection retry loop (together with `retry_backoff_ms`) lets the producer gracefully handle the brief startup time of Kafka.
- `consumer.subscribe([topic])`: the consumer needs to subscribe to a topic before it can receive messages.
- `producer.send(topic, message_payload)`: sends the message. `.get(timeout=10)` makes it block until the message is acknowledged by Kafka, which is useful in tests.
- `for msg in kafka_consumer:`: the consumer polls for messages. We loop until we find our specific message or `consumer_timeout_ms` is reached.
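The “loop until we see our message” pattern above is worth factoring out. Here is a small, hypothetical helper of our own (not part of kafka-python): it scans any iterable of records, such as a `KafkaConsumer` with `consumer_timeout_ms` set, until a predicate matches or a deadline passes.

```python
import time

def wait_for_matching_message(records, predicate, timeout_s=10.0):
    """Scan an iterable of Kafka records (e.g. a KafkaConsumer with
    consumer_timeout_ms set) until one satisfies `predicate` or `timeout_s`
    elapses. Returns the matching record, or None on timeout."""
    deadline = time.monotonic() + timeout_s
    for record in records:
        if predicate(record):
            return record
        if time.monotonic() > deadline:
            break
    return None
```

With this helper, the consumption loop in the test collapses to `received_message = wait_for_matching_message(kafka_consumer, lambda m: m.topic == topic and m.value == message_payload)`.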
3. JavaScript/TypeScript (Node.js + Testcontainers-Node)
For Node.js, we’ll use testcontainers with kafkajs (a popular Kafka client for Node.js).
Installation:
npm install --save-dev testcontainers @testcontainers/kafka jest @types/jest ts-jest kafkajs uuid @types/uuid
# Or yarn:
# yarn add -D testcontainers @testcontainers/kafka jest @types/jest ts-jest kafkajs uuid @types/uuid
Note: As of 2026-02-14, testcontainers (Node.js) is in a stable release, e.g., 10.9.0 or higher. Since v10, the Kafka module ships as the separate @testcontainers/kafka package.
// tests/kafka.test.ts
import { KafkaContainer, StartedKafkaContainer } from '@testcontainers/kafka';
import { Kafka, Producer, Consumer } from 'kafkajs';
import { v4 as uuidv4 } from 'uuid';
describe('Kafka Integration with Testcontainers', () => {
let kafkaContainer: StartedKafkaContainer;
let kafka: Kafka;
let producer: Producer;
let consumer: Consumer;
let topic: string;
let messages: any[] = [];
// Before all tests, start the Kafka container and set up clients
beforeAll(async () => {
// Step 1: Start the Kafka Container
// As of Feb 2026, version 7.6.0 is a stable choice for Confluent Platform Kafka.
kafkaContainer = await new KafkaContainer('confluentinc/cp-kafka:7.6.0')
.withExposedPorts(9093)
.start();
// Step 2: Initialize KafkaJS client with the dynamically mapped broker address
kafka = new Kafka({
brokers: [`${kafkaContainer.getHost()}:${kafkaContainer.getMappedPort(9093)}`],
clientId: `my-app-${uuidv4()}`
});
// Step 3: Create a producer
producer = kafka.producer();
await producer.connect();
// Step 4: Create a consumer
consumer = kafka.consumer({ groupId: `test-group-${uuidv4()}` });
await consumer.connect();
// Create a unique topic for this test run
topic = `node-test-topic-${uuidv4()}`;
// Step 5: Subscribe the consumer to the topic
await consumer.subscribe({ topic, fromBeginning: true });
// Set up message collection for the consumer
await consumer.run({
eachMessage: async ({ message }) => {
if (message.value) {
messages.push(JSON.parse(message.value.toString()));
}
},
});
// Give the consumer a moment to subscribe and be ready
await new Promise(resolve => setTimeout(resolve, 2000));
}, 60000); // Increased timeout for container startup
// After all tests, stop the producer, consumer, and container
afterAll(async () => {
await producer.disconnect();
await consumer.disconnect();
await kafkaContainer.stop();
});
// Before each test, clear previous messages
beforeEach(() => {
messages = [];
});
test('should produce and consume a message from Kafka', async () => {
const testMessage = { id: 1, text: 'Hello from Node.js Testcontainers!' };
// Step 6: Produce a message
await producer.send({
topic: topic,
messages: [{ value: JSON.stringify(testMessage) }],
});
// Step 7: Wait for the consumer to process the message
// In a real app, this would be an assertion on the side effect
// or a mock of the next service in the chain. For testing, we poll.
let retries = 0;
const maxRetries = 10;
while (messages.length === 0 && retries < maxRetries) {
await new Promise(resolve => setTimeout(resolve, 500)); // Wait for 500ms
retries++;
}
// Assertions
expect(messages.length).toBe(1);
expect(messages[0]).toEqual(testMessage);
});
});
Explanation:
- `import { KafkaContainer } from '@testcontainers/kafka';`: imports the `KafkaContainer` class (in testcontainers-node v10+, the Kafka module lives in its own package).
- `kafkaContainer = await new KafkaContainer('confluentinc/cp-kafka:7.6.0')...start();`: instantiates and starts the Kafka container. `await` is used because starting a container is an asynchronous operation.
- `kafka = new Kafka({ brokers: [...], ... });`: initializes the `kafkajs` client, passing the dynamically retrieved broker address.
- `producer.connect()`, `consumer.connect()`, `consumer.subscribe()`: standard `kafkajs` client setup.
- `consumer.run({ eachMessage: async ({ message }) => { ... } });`: sets up a listener for messages. Messages are pushed into a `messages` array for later assertion.
- `producer.send()`: sends the message to the topic.
- Polling loop: the `while` loop waits for the message to be processed by our consumer. In a real integration test, you’d likely assert on the side effects of message processing (e.g., data updated in a database, another service receiving an API call), rather than directly consuming the message in the test itself. This approach shows direct consumption for demonstration.
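That ad-hoc retry loop can be extracted into a reusable polling utility. The sketch below is our own helper (not part of testcontainers or kafkajs): it re-evaluates a condition until it returns true or a deadline passes, so test bodies shrink to a single `await waitFor(() => messages.length === 1)`.

```typescript
// Polling helper: repeatedly evaluate a (possibly async) condition until it
// returns true or the timeout elapses; throws if the deadline passes first.
async function waitFor(
  condition: () => boolean | Promise<boolean>,
  timeoutMs = 5000,
  intervalMs = 100,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await condition()) return;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error(`Condition not met within ${timeoutMs}ms`);
}
```

Failing fast with a descriptive error also makes timeouts far easier to diagnose than an assertion on an empty array.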
Testing Web Service Interactions: A Client-Server Scenario
Many microservices act as clients to other HTTP APIs. Testcontainers can spin up those dependent APIs in containers, allowing you to test your client service against a real, running instance of its dependency.
Let’s imagine you have a UserService that needs to fetch user details from an AuthService. We’ll spin up a mock AuthService in a container and test our UserService against it.
For this, we’ll use GenericContainer, which allows us to run any Docker image. We’ll simulate a simple HTTP server within a container.
1. Java: Testing a Client Service against a Containerized API
First, let’s create a very simple “AuthService” that runs on a specific port and returns a JSON response. We’ll use a plain Java HTTP server for simplicity, but in a real scenario, this would be your actual Spring Boot/Micronaut/Quarkus application.
AuthService.java (This would be your actual dependent service code, placed somewhere accessible to Docker, or a pre-built image):
// src/main/java/com/example/AuthService.java
package com.example;
import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpExchange;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
// This is a minimal HTTP server for demonstration.
// In a real-world scenario, this would be your dependent microservice.
public class AuthService {
public static void main(String[] args) throws IOException {
String portEnv = System.getenv("PORT"); // containers pass PORT as an environment variable
int port = (portEnv != null) ? Integer.parseInt(portEnv) : 8080;
HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
server.createContext("/user/123", new UserHandler());
server.setExecutor(null); // creates a default executor
server.start();
System.out.println("AuthService started on port " + port);
}
static class UserHandler implements HttpHandler {
@Override
public void handle(HttpExchange exchange) throws IOException {
String response = "{\"id\": 123, \"username\": \"testuser\", \"email\": \"test@example.com\"}";
byte[] body = response.getBytes();
exchange.getResponseHeaders().set("Content-Type", "application/json");
exchange.sendResponseHeaders(200, body.length); // content length in bytes, not characters
OutputStream os = exchange.getResponseBody();
os.write(body);
os.close();
}
}
}
To run this in a Testcontainers GenericContainer, we need a Dockerfile that builds and runs it.
Dockerfile (in src/test/resources/auth-service/Dockerfile):
# src/test/resources/auth-service/Dockerfile
FROM eclipse-temurin:21-jre-jammy
WORKDIR /app
COPY com/example/AuthService.class /app/com/example/
COPY com/example/AuthService$UserHandler.class /app/com/example/
CMD ["java", "-cp", "/app", "com.example.AuthService"]
EXPOSE 8080
Now, let’s write our UserService client and its test:
UserService.java (Our client service that depends on AuthService):
// src/main/java/com/example/UserService.java
package com.example;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
// A simple client service that fetches user data from an external API
public class UserService {
private final String authServiceBaseUrl;
private final HttpClient httpClient;
public UserService(String authServiceBaseUrl) {
this.authServiceBaseUrl = authServiceBaseUrl;
this.httpClient = HttpClient.newHttpClient();
}
public String getUserDetails(String userId) throws IOException, InterruptedException {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(authServiceBaseUrl + "/user/" + userId))
.GET()
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
return response.body();
} else {
throw new RuntimeException("Failed to get user details: " + response.statusCode());
}
}
}
UserServiceIntegrationTest.java (The Testcontainers test):
// src/test/java/com/example/UserServiceIntegrationTest.java
package com.example;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.ImageFromDockerfile;
import java.io.File;
import java.io.IOException;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class UserServiceIntegrationTest {
// Step 1: Declare the GenericContainer for our AuthService
// We'll build an image from a Dockerfile located in resources.
// Note: no @Testcontainers/@Container annotations; the lifecycle is managed manually below.
private static GenericContainer<?> authService = new GenericContainer<>(
new ImageFromDockerfile()
.withDockerfile(new File("src/test/resources/auth-service/Dockerfile").toPath())
.withFileFromClasspath("com/example/AuthService.class", "com/example/AuthService.class")
.withFileFromClasspath("com/example/AuthService$UserHandler.class", "com/example/AuthService$UserHandler.class")
)
.withExposedPorts(8080) // The port our AuthService listens on
.waitingFor(Wait.forHttp("/user/123").forPort(8080).forStatusCode(200)); // Wait until /user/123 returns 200 OK
private static UserService userService;
// Step 2: Start the container and initialize our client service
@BeforeAll
static void startAuthServiceAndInitClient() {
authService.start();
// Dynamically get the mapped host and port to connect our client
String authServiceBaseUrl = String.format("http://%s:%d",
authService.getHost(),
authService.getMappedPort(8080));
userService = new UserService(authServiceBaseUrl);
System.out.println("AuthService running at: " + authServiceBaseUrl);
}
// Step 3: Stop the container
@AfterAll
static void stopAuthService() {
authService.stop();
}
@Test
void shouldGetUserDetailsFromAuthService() throws IOException, InterruptedException {
// Step 4: Call our client service, which in turn calls the containerized AuthService
String userDetailsJson = userService.getUserDetails("123");
// Assertions
assertNotNull(userDetailsJson);
assertTrue(userDetailsJson.contains("testuser"));
assertTrue(userDetailsJson.contains("test@example.com"));
System.out.println("Received user details: " + userDetailsJson);
}
}
Explanation:
- `ImageFromDockerfile`: this is a powerful feature! Testcontainers can build a Docker image on the fly from a `Dockerfile` and local files. Here, we’re copying our compiled `AuthService` class files into the build context.
- `withExposedPorts(8080)`: tells Testcontainers that port 8080 inside the container should be accessible from the host. Testcontainers will dynamically map this to an available port on your host machine.
- `waitingFor(Wait.forHttp("/user/123")...)`: this is a crucial wait strategy. It tells Testcontainers not to consider the container “started” until an HTTP GET request to `/user/123` on port 8080 returns a 200 OK status. This ensures our `AuthService` is fully ready to accept requests before our test tries to connect.
- `authService.getHost()` and `authService.getMappedPort(8080)`: we use these methods to get the actual host and port that Testcontainers has exposed for the `AuthService`. This dynamically configures our `UserService` client.
- The test then simply calls our `UserService`, which internally makes an HTTP call to the containerized `AuthService`, and we assert on the response.
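Before reaching for Docker at all, you can sanity-check the client logic against an in-process `com.sun.net.httpserver` instance bound to an ephemeral port. This is a rough sketch (the class name and JSON body are ours); it complements, rather than replaces, the containerized test.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class InProcessClientCheck {
    public static void main(String[] args) throws Exception {
        // Bind to port 0 so the OS picks a free ephemeral port.
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
        server.createContext("/user/123", exchange -> {
            byte[] body = "{\"id\": 123, \"username\": \"testuser\"}".getBytes();
            exchange.getResponseHeaders().set("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
        try {
            String baseUrl = "http://localhost:" + server.getAddress().getPort();
            HttpResponse<String> resp = HttpClient.newHttpClient().send(
                    HttpRequest.newBuilder().uri(URI.create(baseUrl + "/user/123")).GET().build(),
                    HttpResponse.BodyHandlers.ofString());
            if (resp.statusCode() != 200 || !resp.body().contains("testuser")) {
                throw new AssertionError("unexpected response: " + resp.statusCode() + " " + resp.body());
            }
            System.out.println("in-process check OK: " + resp.body());
        } finally {
            server.stop(0);
        }
    }
}
```

The in-process variant runs in milliseconds, so it is a good first line of defense; the Testcontainers version still earns its keep by exercising real networking, port mapping, and the actual service image.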
2. Python: Testing a Client Service against a Containerized API
We’ll use a similar approach in Python, creating a simple Flask application for our mock AuthService and then testing a Python client against it.
auth_service_app.py (our dependent Flask service, needs to be in a directory with a Dockerfile):
# auth-service-py/auth_service_app.py
from flask import Flask, jsonify

app = Flask(__name__)

@app.route("/user/<user_id>", methods=["GET"])
def get_user(user_id):
    if user_id == "123":
        return jsonify({"id": 123, "username": "pythonuser", "email": "python@example.com"})
    return jsonify({"error": "User not found"}), 404

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)
Dockerfile (in auth-service-py/Dockerfile):
# auth-service-py/Dockerfile
FROM python:3.10-slim-buster
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY auth_service_app.py .
CMD ["python", "auth_service_app.py"]
EXPOSE 8080
requirements.txt (in auth-service-py/requirements.txt):
Flask==2.3.3
user_service_client.py (Our Python client service):
# user_service_client.py
import requests

class UserService:
    def __init__(self, auth_service_base_url: str):
        self.auth_service_base_url = auth_service_base_url

    def get_user_details(self, user_id: str) -> dict:
        url = f"{self.auth_service_base_url}/user/{user_id}"
        response = requests.get(url, timeout=5)  # add a timeout for robustness
        response.raise_for_status()  # raise an HTTPError for bad responses (4xx or 5xx)
        return response.json()
tests/test_web_service_integration.py (The Pytest Testcontainers test):
# tests/test_web_service_integration.py
import os
import time

import pytest
import requests
from testcontainers.core.container import DockerContainer
from testcontainers.core.image import DockerImage

from user_service_client import UserService

# Define a fixture for the AuthService container.
# Testcontainers-Python builds the image from our local Dockerfile via DockerImage,
# then runs it with DockerContainer (the generic container class).
@pytest.fixture(scope="session")
def auth_service_container():
    # Construct the path to the Dockerfile directory relative to the test file
    current_dir = os.path.dirname(os.path.abspath(__file__))
    dockerfile_dir = os.path.join(current_dir, "../auth-service-py")
    with DockerImage(path=dockerfile_dir, tag="test-auth-service-py:latest"):
        with DockerContainer("test-auth-service-py:latest").with_exposed_ports(8080) as container:
            host = container.get_container_host_ip()
            port = container.get_exposed_port(8080)
            base_url = f"http://{host}:{port}"
            # Testcontainers-Python has no built-in HTTP wait strategy, so poll
            # the endpoint ourselves until the Flask app is ready.
            deadline = time.monotonic() + 30
            while time.monotonic() < deadline:
                try:
                    if requests.get(f"{base_url}/user/123", timeout=1).status_code == 200:
                        break
                except requests.exceptions.ConnectionError:
                    pass
                time.sleep(0.5)
            else:
                pytest.fail("AuthService container did not become ready in time")
            yield base_url

# Fixture for our UserService client
@pytest.fixture
def user_service_client(auth_service_container):
    return UserService(auth_service_container)

# The integration test
def test_should_get_user_details_from_auth_service_py(user_service_client):
    user_id = "123"
    user_details = user_service_client.get_user_details(user_id)
    assert user_details is not None
    assert user_details["id"] == 123
    assert user_details["username"] == "pythonuser"
    assert user_details["email"] == "python@example.com"
    print(f"Received user details: {user_details}")
    # Test for a non-existent user
    with pytest.raises(requests.exceptions.HTTPError) as excinfo:
        user_service_client.get_user_details("999")
    assert excinfo.value.response.status_code == 404
Explanation:
- The `auth_service_container` fixture builds the `AuthService` image from the local `Dockerfile` and its context directory. The image tag (`test-auth-service-py`) is just a temporary name assigned during the build.
- `with_exposed_ports(8080)`: exposes the internal port 8080 so that Testcontainers maps it to a free host port.
- Readiness wait: before yielding, the fixture polls `GET /user/123` until it returns a 200 status code, confirming the web service is ready to accept requests.
- The fixture yields `f"http://{host}:{port}"`, the dynamically generated base URL for the `AuthService`, which the `user_service_client` fixture uses.
- The test then calls the `UserService` and asserts on the JSON response, including the 404 error path for a non-existent user.
3. JavaScript/TypeScript: Testing a Client Service against a Containerized API
We’ll follow the pattern again, creating a simple Express.js application for our mock AuthService.
auth-service-node/app.ts (our dependent Express service):
// auth-service-node/app.ts
import express from 'express';
const app = express();
const port = process.env.PORT || 8080;
app.get('/user/:userId', (req, res) => {
const userId = req.params.userId;
if (userId === '123') {
res.status(200).json({ id: 123, username: 'nodeuser', email: 'node@example.com' });
} else {
res.status(404).json({ error: 'User not found' });
}
});
app.listen(port, () => {
console.log(`AuthService Node.js running on http://localhost:${port}`);
});
auth-service-node/package.json:
{
"name": "auth-service-node",
"version": "1.0.0",
"description": "",
"main": "dist/app.js",
"scripts": {
"start": "tsc && node dist/app.js",
"build": "tsc"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"express": "^4.19.2"
},
"devDependencies": {
"@types/express": "^4.17.21",
"@types/node": "^20.11.17",
"typescript": "^5.3.3"
}
}
auth-service-node/tsconfig.json:
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"outDir": "./dist",
"strict": true,
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true
},
"include": ["app.ts"]
}
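The test further below builds this service from a `Dockerfile` in the `auth-service-node` directory, but the chapter doesn't show one. Here is a plausible minimal sketch (the base image tag and layer ordering are assumptions; adjust to your environment):

```dockerfile
# Hypothetical Dockerfile for auth-service-node
FROM node:20-alpine
WORKDIR /app
# Install dependencies (including the TypeScript dev dependency) first for layer caching
COPY package*.json tsconfig.json ./
RUN npm install
# Compile the app to dist/app.js
COPY app.ts ./
RUN npm run build
ENV PORT=8080
EXPOSE 8080
CMD ["node", "dist/app.js"]
```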
user-service-client.ts (Our Node.js client service):
// user-service-client.ts
import fetch from 'node-fetch'; // Node 18+ ships a global fetch; on older versions, install and import node-fetch as shown here
interface UserDetails {
id: number;
username: string;
email: string;
}
export class UserService {
private authServiceBaseUrl: string;
constructor(authServiceBaseUrl: string) {
this.authServiceBaseUrl = authServiceBaseUrl;
}
public async getUserDetails(userId: string): Promise<UserDetails> {
const response = await fetch(`${this.authServiceBaseUrl}/user/${userId}`);
if (!response.ok) {
throw new Error(`Failed to fetch user details: ${response.status} ${response.statusText}`);
}
return response.json() as Promise<UserDetails>;
}
}
tests/web-service.test.ts (The Jest Testcontainers test):
// tests/web-service.test.ts
import { GenericContainer, StartedTestContainer, Wait } from 'testcontainers';
import { UserService } from '../user-service-client';
import path from 'path';
// Note: Testcontainers-Node uses `Node.js` fetch which is built-in in recent Node versions.
// For older versions, you might need `npm install node-fetch` and import it.
describe('Web Service Integration with Testcontainers', () => {
let authServiceContainer: StartedTestContainer;
let userService: UserService;
let authServiceBaseUrl: string;
beforeAll(async () => {
// Step 1: Build the AuthService image from a local Dockerfile, then start the container
// We assume the Dockerfile is in the 'auth-service-node' directory relative to this test.
const dockerfileContext = path.resolve(__dirname, '../auth-service-node');
const authServiceImage = await GenericContainer.fromDockerfile(dockerfileContext).build();
authServiceContainer = await authServiceImage
.withExposedPorts(8080) // Port the Express app listens on
.withWaitStrategy(Wait.forHttp('/user/123', 8080).forStatusCode(200)) // Wait until ready
.start();
// Step 2: Get the dynamically mapped URL for our client
authServiceBaseUrl = `http://${authServiceContainer.getHost()}:${authServiceContainer.getMappedPort(8080)}`;
userService = new UserService(authServiceBaseUrl);
console.log(`AuthService Node.js running at: ${authServiceBaseUrl}`);
}, 60000); // Increased timeout for container build and startup
afterAll(async () => {
await authServiceContainer.stop();
});
test('should get user details from containerized AuthService', async () => {
const userId = '123';
const userDetails = await userService.getUserDetails(userId);
expect(userDetails).toBeDefined();
expect(userDetails.id).toBe(123);
expect(userDetails.username).toBe('nodeuser');
expect(userDetails.email).toBe('node@example.com');
console.log(`Received user details: ${JSON.stringify(userDetails)}`);
});
test('should handle non-existent user correctly', async () => {
const userId = '999';
await expect(userService.getUserDetails(userId)).rejects.toThrow('Failed to fetch user details: 404 Not Found');
});
});
Explanation:
- `GenericContainer.fromDockerfile(dockerfileContext).build()`: Testcontainers-Node takes a path to a directory containing a `Dockerfile`, builds the image locally, and returns a container you can configure and start.
- `.withExposedPorts(8080)`: Exposes the internal port 8080.
- `.withWaitStrategy(Wait.forHttp('/user/123', 8080).forStatusCode(200))`: Uses a built-in HTTP wait strategy to ensure the Express service is up and responsive before the test attempts to connect.
- `authServiceContainer.getHost()` and `authServiceContainer.getMappedPort(8080)`: Retrieve the dynamic host and port for our `UserService` client.
- The tests then interact with the `UserService` and assert the responses, including error handling.
Mini-Challenge: Extend the Kafka Producer/Consumer
You’ve seen how to produce and consume a single message. Now, it’s your turn to add some complexity!
Challenge: Modify one of the Kafka examples (Java, Python, or Node.js) to:
- Produce multiple messages (e.g., 5-10 messages with unique IDs or data).
- Consume all produced messages and verify that you received exactly the number of messages sent and that their contents are correct.
Hints:
- For producers, a simple loop around the `producer.send()` call will work. Remember `producer.flush()` if sending multiple messages quickly and expecting them immediately.
- For consumers, you’ll need to poll multiple times or increase the `poll` duration (Java/Python), or refine your message collection logic (Node.js), to ensure all messages are captured. A `while` loop with a timeout is a robust way to collect all expected messages.
- Consider using a `List` or `Array` to store received messages and then check its size and contents.
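The consumer-side hint (collect in a loop until everything arrives or an overall timeout expires) can be factored into a small, broker-agnostic helper. Here is a Python sketch; the name `collect_until` and its signature are invented for illustration, not part of any Testcontainers or Kafka API:

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def collect_until(poll: Callable[[], List[T]], expected: int,
                  timeout: float = 10.0, interval: float = 0.2) -> List[T]:
    """Repeatedly call `poll()` (which returns any newly received items) and
    accumulate results until `expected` items arrive or `timeout` elapses."""
    received: List[T] = []
    deadline = time.monotonic() + timeout
    while len(received) < expected and time.monotonic() < deadline:
        received.extend(poll())
        if len(received) < expected:
            time.sleep(interval)
    if len(received) < expected:
        raise TimeoutError(f"expected {expected} messages, got {len(received)}")
    return received
```

In a Kafka test, `poll` would wrap your consumer's poll call (e.g., returning the values from one `consumer.poll(...)` batch), and the final assertion checks the returned list's length and contents.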
What to Observe/Learn:
- How to handle batching messages and ensure they are all processed.
- The importance of consumer polling strategies and timeouts when dealing with asynchronous message flows in tests.
- Confirming message order (if your Kafka setup guarantees it and your test validates it).
Common Pitfalls & Troubleshooting
Working with message brokers and web services in containers can introduce some unique challenges. Here’s how to navigate them:
Kafka Startup Timeouts:
- Symptom: `NoBrokersAvailable` errors, `TimeoutException`, or tests failing because Kafka isn’t ready. Kafka (and its ZooKeeper dependency) can take a moment to initialize.
- Fix:
  - Increase the Testcontainers wait-strategy timeout: if using `KafkaContainer`, Testcontainers has built-in wait strategies, but you can explicitly increase the startup timeout with `.withStartupTimeout(Duration.ofSeconds(120))` (Java).
  - Implement client-side retries: as shown in the Python Kafka producer example, clients should ideally have retry logic with backoff. This makes your application more resilient and your tests more stable.
  - Check Docker logs: inspect the container logs (`kafka.getLogs()` or `docker logs <container_id>`) for underlying issues.
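The client-side retry advice can be captured in a small generic helper using nothing beyond the standard library. This is a sketch; the name `with_retries` is invented for illustration:

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_retries(action: Callable[[], T], attempts: int = 5,
                 initial_delay: float = 0.5, backoff: float = 2.0,
                 retry_on: Tuple[Type[BaseException], ...] = (Exception,)) -> T:
    """Call `action()` until it succeeds, sleeping with exponential backoff
    between failures; re-raise the last error once attempts are exhausted."""
    delay = initial_delay
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except retry_on:
            if attempt == attempts:
                raise
            time.sleep(delay)
            delay *= backoff
```

For example, you might wrap producer construction as `with_retries(lambda: KafkaProducer(bootstrap_servers=servers), retry_on=(NoBrokersAvailable,))`, so a broker that is still booting doesn't fail the test immediately.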
Web Service Reachability Issues (Port Mapping/Host):
- Symptom: `ConnectionRefused`, `ConnectException`, or `UnknownHostException` when your client tries to connect to the containerized service.
- Fix:
  - Verify `withExposedPorts()`: ensure you’ve correctly exposed the port your service listens on within the container.
  - Use `getMappedPort()` and `getHost()`: always retrieve the connection details dynamically from Testcontainers. Never hardcode `localhost` and a fixed port for container communication, as the port is dynamically assigned on the host.
  - Check container logs: make sure your web service inside the container actually started successfully and is listening on the expected port.
Asynchronous Assertions (Waiting for Events):
- Symptom: tests failing because they assert too quickly, before an asynchronous event (like a Kafka message being processed or an API call completing) has occurred.
- Fix:
  - Polling with a timeout: as demonstrated in the Node.js Kafka consumer, a `while` loop that periodically checks a condition (e.g., `messages.length > 0`) with a `setTimeout` or `Thread.sleep` and an overall timeout is a robust pattern.
  - Awaitability libraries: for more complex asynchronous scenarios, consider libraries that help with awaiting conditions, like Awaitility for Java, or specific `async`/`await` patterns in JavaScript and Python.
  - Don’t over-rely on `Thread.sleep()`: while useful for simple demonstrations, excessive use of `Thread.sleep()` leads to slow and flaky tests. Prefer explicit wait strategies or polling loops with sensible timeouts.
Summary
Fantastic work! You’ve successfully navigated the complexities of integration testing with message brokers and web services using Testcontainers. Here’s a quick recap of what you’ve achieved:
- Understanding the “Why”: You now grasp why testing against real instances of Kafka and dependent web services is crucial for distributed systems, moving beyond the limitations of mocks.
- Kafka Mastery: You’ve learned how to spin up disposable Kafka containers, configure producers and consumers, and verify message flow across Java, Python, and Node.js.
- Web Service Integration: You’ve mastered using `GenericContainer` and `ImageFromDockerfile` to bring custom or dependent microservices into your test environment, allowing your client services to interact with them realistically.
- Dynamic Connectivity: You’re adept at using Testcontainers’ dynamic port mapping and host retrieval to ensure your applications connect correctly to these ephemeral services.
- Robust Testing: You’ve started implementing wait strategies and handling asynchronous assertions, making your integration tests reliable.
You’re now ready to tackle even more sophisticated integration scenarios in your microservice landscape.
In the next chapter, we’ll shift our focus to integrating Testcontainers into your CI/CD pipelines, ensuring your robust tests run automatically and efficiently with every code change. Get ready to automate!
References
- Testcontainers Official Documentation: https://testcontainers.com/
- Testcontainers-Java Kafka Module Documentation: https://java.testcontainers.org/modules/kafka/
- Testcontainers-Python Documentation: https://python.testcontainers.org/
- Testcontainers-Node Documentation: https://node.testcontainers.org/
- Apache Kafka Documentation: https://kafka.apache.org/documentation/
- Confluent Platform Docker Images: https://hub.docker.com/r/confluentinc/cp-kafka