A real chat application needs to store messages to provide chat history. This chapter will guide you through setting up a SQLite database and integrating it into our FastAPI application using SQLAlchemy, a powerful SQL toolkit and Object-Relational Mapper (ORM).
Purpose of this Chapter
By the end of this chapter, you will:
- Understand the basics of ORM and why we use SQLAlchemy.
- Set up a SQLite database connection.
- Define database models for users and chat messages.
- Implement methods to store new messages and retrieve chat history.
- Update the WebSocket endpoint to save messages.
Concepts Explained: SQLAlchemy and ORM
Object-Relational Mapping (ORM) is a technique that lets you query and manipulate data from a database using an object-oriented paradigm. Instead of writing raw SQL, you interact with database tables as Python classes and objects.
SQLAlchemy is Python’s most popular SQL toolkit and ORM. It provides:
- Declarative mapping: Allows defining database tables and models using Python classes.
- Session management: Handles interactions with the database (add, commit, rollback).
- Abstraction: Works with various database backends (SQLite, PostgreSQL, MySQL, etc.), allowing us to easily switch later if needed.
SQLite is a C library that provides a lightweight, embedded relational database management system. It’s file-based, meaning the entire database is stored in a single file, making it excellent for development and simple deployments.
Step-by-Step Tasks
1. Install SQLAlchemy and Async Drivers
Stop your Uvicorn server (Ctrl+C). We need to install SQLAlchemy for ORM and aiosqlite for asynchronous SQLite database access.
pipenv install sqlalchemy aiosqlite
2. Create app/database.py for Database Setup
Create a new file app/database.py to handle our database engine, session, and base for declarative models.
# app/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
# Define the database URL
# For SQLite, this points to a file. For production, you'd use PostgreSQL, etc.
DATABASE_URL = "sqlite:///./chat.db"
# Create the SQLAlchemy engine
# connect_args are specific to SQLite and ensure only one thread can communicate
# if using synchronous operations (though we'll use async)
engine = create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}, echo=False # Set echo=True for SQL logs
)
# Create a SessionLocal class
# This will be used to create session instances for database interactions
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Base class for our ORM models
Base = declarative_base()
# Dependency to get a database session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
Code Explanation (app/database.py):
DATABASE_URL: Specifies the connection string for our SQLite database file namedchat.db.create_engine: Creates the SQLAlchemy engine.echo=Trueis useful during development to see the SQL queries being executed.SessionLocal: A factory for session objects.autocommit=Falsemeans we manuallycommit()changes,autoflush=Falseprevents flushing before commits.Base = declarative_base(): All our ORM models will inherit from thisBaseclass.get_db(): A FastAPI dependency function that provides a database session, ensuring it’s properly closed after use.
3. Create app/models.py for Database Models
Create a new file app/models.py to define our User and Message models.
# app/models.py
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
hashed_password = Column(String)
messages = relationship("Message", back_populates="owner")
class Message(Base):
__tablename__ = "messages"
id = Column(Integer, primary_key=True, index=True)
content = Column(String, index=True)
timestamp = Column(DateTime, default=datetime.utcnow)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="messages")
Code Explanation (app/models.py):
UserModel:__tablename__: Specifies the database table name.id,username,hashed_password: Define the columns with their types and constraints (e.g.,primary_key=True,unique=True,index=True).messages = relationship(...): Defines a relationship with theMessagemodel, indicating a user can have multiple messages.back_populateslinks it to theownerrelationship inMessage.
MessageModel:id,content,timestamp: Columns for message ID, content, and creation timestamp.owner_id = Column(Integer, ForeignKey("users.id")): Creates a foreign key linking messages to users.owner = relationship(...): Defines the reverse relationship, linking a message back to its owner.
4. Create Database Tables
Before running the application, we need to create the database tables. This is typically done once.
Add a small script to app/main.py (temporarily) or a dedicated create_db.py file to do this. For simplicity, let’s add it to app/main.py and then remove it after execution.
# app/main.py (TEMPORARY ADDITION for database creation)
# Add this BEFORE app = FastAPI() line
from .database import Base, engine # New import
# This will create the database tables defined in models.py
Base.metadata.create_all(bind=engine)
# (rest of your app/main.py content)
Now, run app/main.py once to create the chat.db file and tables.
pipenv shell
python -c "from app.database import Base, engine; from app import models; Base.metadata.create_all(bind=engine)"
After this command, you should see a chat.db file created in your realtime-chat-app directory. You can now remove or comment out the Base.metadata.create_all(bind=engine) line from app/main.py if you added it there.
5. Update Authentication to Use Database Users
Now, let’s modify app/main.py to use our User model for user authentication instead of the fake_users_db. We also need Pydantic models for request/response.
Create a new file app/schemas.py:
# app/schemas.py
from pydantic import BaseModel
from datetime import datetime
# Pydantic models for data validation and serialization
class UserCreate(BaseModel):
username: str
password: str
class UserResponse(BaseModel):
id: int
username: str
class Config:
from_attributes = True # Or orm_mode = True for Pydantic v1.x
class MessageCreate(BaseModel):
content: str
class MessageResponse(BaseModel):
id: int
content: str
timestamp: datetime
owner_id: int
owner: UserResponse # Include owner details
class Config:
from_attributes = True # Or orm_mode = True for Pydantic v1.x
Update app/main.py to use these models and interact with the database.
# app/main.py (further updated)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from typing import Dict, List
from sqlalchemy.orm import Session # New import
from .auth import Hasher, create_access_token, get_current_user, ACCESS_TOKEN_EXPIRE_MINUTES, Token
from .connections import ConnectionManager
from .database import get_db # New import
from . import models, schemas # New import
from datetime import timedelta
app = FastAPI()
manager = ConnectionManager()
# --- User Registration Endpoint ---
@app.post("/register", response_model=schemas.UserResponse, status_code=status.HTTP_201_CREATED)
async def register_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = db.query(models.User).filter(models.User.username == user.username).first()
if db_user:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered")
hashed_password = Hasher.get_password_hash(user.password)
db_user = models.User(username=user.username, hashed_password=hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = db.query(models.User).filter(models.User.username == form_data.username).first()
if not user or not Hasher.verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# Update get_current_user to return the actual User model, not just username
from .auth import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, TokenData
async def get_current_user_db(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = db.query(models.User).filter(models.User.username == token_data.username).first()
if user is None:
raise credentials_exception
return user
@app.get("/users/me", response_model=schemas.UserResponse)
async def read_users_me(current_user: models.User = Depends(get_current_user_db)):
return current_user
# --- Chat History Endpoint ---
@app.get("/messages", response_model=List[schemas.MessageResponse])
async def get_chat_history(db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_db), skip: int = 0, limit: int = 100):
messages = db.query(models.Message).order_by(models.Message.timestamp.desc()).offset(skip).limit(limit).all()
# Eagerly load owner to avoid N+1 queries if owner details are needed
return messages
# Updated WebSocket endpoint to save messages
@app.websocket("/ws/{client_id}") # Renamed for clarity, original was fine too
async def websocket_chat(websocket: WebSocket, client_id: str, db: Session = Depends(get_db)): # Add db dependency
# NOTE: For production, authenticate WebSocket connection with JWT too.
# This example skips that for simplicity in message persistence, but it's crucial.
# You would parse a token from a query parameter or header and validate it here.
await manager.connect(websocket)
welcome_message = f"Client #{client_id} joined the chat."
await manager.broadcast(welcome_message)
try:
while True:
data = await websocket.receive_text()
full_message = f"Client #{client_id} says: {data}"
# Save message to database
new_message = models.Message(content=data, owner_id=1) # For now, hardcode owner_id=1 (e.g., testuser)
# In next chapter, we'll use actual authenticated user
db.add(new_message)
db.commit()
db.refresh(new_message)
await manager.broadcast(full_message)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat.")
Note on get_current_user_db and WebSocket Authentication:
The current websocket_chat endpoint does not actually authenticate the WebSocket connection using JWT. It takes a client_id directly. For a secure application, you would need to modify the WebSocket endpoint to:
- Expect a JWT in a query parameter (e.g.,
ws://localhost:8000/ws?token=YOUR_JWT). - Validate that JWT inside the WebSocket endpoint using a similar mechanism to
get_current_user_db. We will address securing WebSocket connections in a later chapter. For now, the focus is on message persistence.
6. Run and Test
Start the server:
pipenv shell uvicorn app.main:app --reloadRegister a user: Go to
http://127.0.0.1:8000/docs, use the/registerendpoint to create a user (e.g., username:chatuser, password:chatpassword).Login to get a token: Use the
/tokenendpoint with the new user’s credentials to get an access token.Authorize in Swagger UI: Use the obtained token to authorize in Swagger.
Test chat history: Access
http://127.0.0.1:8000/messages(ensure you are authorized). Initially, it will be empty.Use
client.htmlfor chat: Openclient.htmland connect. Send a few messages. Inapp/main.py, theowner_idfor new messages is currently hardcoded to1. In the database, the first registered user (chatuserif you followed step 2) will likely haveid=1.Check chat history again: Refresh
http://127.0.0.1:8000/messages(authorized). You should now see the messages you sent, stored in the database!If you restart the server, the chat history will still be there, demonstrating persistence.
Tips/Challenges/Errors
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: users: This means you didn’t run theBase.metadata.create_all(bind=engine)step or it failed. Ensurechat.dbexists and contains the tables.- Pydantic
Config.orm_mode/from_attributes: If you are using Pydantic v1.x, useorm_mode = True. For Pydantic v2.x, usefrom_attributes = True. FastAPI documentation usually defaults to v2.x. - Database Session Management: It’s crucial to use
Depends(get_db)for every endpoint that interacts with the database to ensure proper session handling (opening and closing connections).
Summary/Key Takeaways
You’ve successfully integrated a SQLite database using SQLAlchemy, allowing for persistent storage of user data and chat messages. You’ve learned how to define models, create tables, and perform basic database operations (add, commit, query). Our chat application can now remember messages! In the next chapter, we’ll enhance the chat functionality by introducing the concept of chat rooms.