While we introduced basic user registration and login in Chapter 4, this chapter focuses on refining these critical endpoints. We’ll ensure robust data validation, provide clear error messages, and integrate the user management more tightly with our database. This is about building a solid, production-ready authentication foundation.

Purpose of this Chapter

By the end of this chapter, you will:

  • Have dedicated endpoints for user registration and login.
  • Understand data validation for incoming user data.
  • Implement proper password hashing and verification.
  • Ensure that authenticated users are correctly identified and used throughout the application.

Concepts Explained: Data Validation & Error Handling

Data Validation: Ensuring that incoming data (like username and password during registration) meets expected criteria. FastAPI leverages Pydantic for this, which allows you to define strict schemas for your request bodies.

Error Handling: Providing clear and informative error responses when something goes wrong (e.g., duplicate username, incorrect password, invalid token). FastAPI’s HTTPException is designed for this, automatically returning appropriate HTTP status codes and JSON error details.

Step-by-Step Tasks

We’ve largely set up the core logic in app/auth.py and initial endpoints in app/main.py. This chapter primarily involves reviewing and ensuring these pieces are robust.

1. Review app/schemas.py for User Models

Ensure your UserCreate and UserResponse Pydantic models are in app/schemas.py:

# app/schemas.py (review)

from pydantic import BaseModel
from datetime import datetime

class UserBase(BaseModel):
    username: str

class UserCreate(UserBase):
    password: str

class UserResponse(UserBase):
    id: int

    class Config:
        from_attributes = True # Pydantic v2.x
        # orm_mode = True # Pydantic v1.x

# ... (other Message models) ...
  • UserBase: A common base for user attributes.
  • UserCreate: Adds the password field, which should only be present during creation/login and not in public responses.
  • UserResponse: Omits the password or hashed_password for security when sending user data back to clients.

2. Review app/models.py for User Database Model

Confirm your User model is correct in app/models.py:

# app/models.py (review)

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship

from .database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True, nullable=False) # Ensure username is not null
    hashed_password = Column(String, nullable=False) # Ensure hashed_password is not null

    messages = relationship("Message", back_populates="owner")

# ... (Message model) ...
  • nullable=False: Added to username and hashed_password to enforce non-null values at the database level.

3. Review app/auth.py for Password Hashing and JWT Logic

Confirm that your Hasher class and JWT functions in app/auth.py are correctly implemented for security.

# app/auth.py (review)

# ... (imports) ...

SECRET_KEY = "your-super-secret-key" # MAKE THIS AN ENV VAR IN PRODUCTION
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class Hasher:
    @staticmethod
    def verify_password(plain_password: str, hashed_password: str) -> bool:
        return pwd_context.verify(plain_password, hashed_password)

    @staticmethod
    def get_password_hash(password: str) -> str:
        return pwd_context.hash(password)

# ... (Token, TokenData models and create_access_token) ...

# OAuth2PasswordBearer instance
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Dependency for general HTTP endpoints (returns User model from DB)
async def get_current_user_db(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = db.query(models.User).filter(models.User.username == username).first()
    if user is None:
        raise credentials_exception
    return user # Return the full User model

# Dependency for WebSocket endpoints (returns User model from DB)
# (Copied from main.py for better organization, ensure to remove from main.py if moved here)
# For now, keeping it in main.py for simpler flow, but for large apps, move here.
# async def get_current_active_user_ws(websocket: WebSocket, db: Session = Depends(get_db)):
#     # ... (WebSocket specific token extraction from query params or headers) ...
#     return user
  • SECRET_KEY: Reiterate that this needs to be an environment variable.
  • get_current_user_db: This dependency is now returning the actual models.User object retrieved from the database, which is more useful than just the username string. This allows downstream dependencies to easily access user ID, etc.

4. Review app/main.py Endpoints

Ensure your /register and /token (login) endpoints in app/main.py correctly use the database and hashing logic.

# app/main.py (review and ensure consistency)

# ... (imports) ...
from .auth import Hasher, create_access_token, get_current_user_db, ACCESS_TOKEN_EXPIRE_MINUTES, Token, oauth2_scheme, SECRET_KEY, ALGORITHM # Added SECRET_KEY, ALGORITHM
# ... (rest of imports) ...

app = FastAPI()

manager = ConnectionManager()

AVAILABLE_ROOMS = ["general", "python", "frontend", "random"]

# Dependency to authenticate user for WebSocket connection
# This ensures SECRET_KEY is accessible. For a cleaner approach,
# get_current_active_user_ws could be in auth.py and SECRET_KEY passed or globally accessible.
async def get_current_active_user_ws(websocket: WebSocket, db: Session = Depends(get_db)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials for WebSocket",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Expect token in query parameter, e.g., ws://localhost:8000/ws/general?token=YOUR_JWT
        token_from_ws = websocket.query_params.get("token")
        if not token_from_ws:
            raise credentials_exception

        payload = jwt.decode(token_from_ws, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        user = db.query(models.User).filter(models.User.username == username).first()
        if user is None:
            raise credentials_exception
        return user # Return the full User model
    except JWTError:
        raise credentials_exception
    except HTTPException:
        raise
    except Exception as e:
        print(f"WebSocket authentication error: {e}")
        # Raising a generic 500 might leak less info than 401 on unexpected errors
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal authentication error")


@app.post("/register", response_model=schemas.UserResponse, status_code=status.HTTP_201_CREATED)
async def register_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = db.query(models.User).filter(models.User.username == user.username).first()
    if db_user:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered")

    hashed_password = Hasher.get_password_hash(user.password)
    db_user = models.User(username=user.username, hashed_password=hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    user = db.query(models.User).filter(models.User.username == form_data.username).first()
    if not user or not Hasher.verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me", response_model=schemas.UserResponse)
async def read_users_me(current_user: models.User = Depends(get_current_user_db)):
    return current_user

@app.get("/messages/{room_name}", response_model=List[schemas.MessageResponse])
async def get_chat_history_for_room(
    room_name: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_db), # Protected endpoint
    skip: int = 0,
    limit: int = 100
):
    if room_name not in AVAILABLE_ROOMS:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Room not found")

    messages = db.query(models.Message)\
                 .filter(models.Message.room_name == room_name)\
                 .order_by(models.Message.timestamp.desc())\
                 .offset(skip)\
                 .limit(limit)\
                 .all()
    return messages

@app.get("/rooms")
async def get_rooms():
    return {"rooms": AVAILABLE_ROOMS}

@app.websocket("/ws/{room_name}")
async def websocket_room_endpoint(
    websocket: WebSocket,
    room_name: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_active_user_ws) # Authenticated User via WS
):
    if room_name not in AVAILABLE_ROOMS:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Invalid chat room")
        return

    await manager.connect(websocket, room_name)
    welcome_message = f"User '{current_user.username}' joined room '{room_name}'."
    await manager.broadcast(welcome_message, room_name)

    try:
        while True:
            data = await websocket.receive_text()
            full_message = f"[{room_name}] {current_user.username}: {data}"

            new_message = models.Message(
                content=data,
                owner_id=current_user.id,
                timestamp=datetime.utcnow(),
                room_name=room_name
            )
            db.add(new_message)
            db.commit()
            db.refresh(new_message)

            await manager.broadcast(full_message, room_name)
    except WebSocketDisconnect:
        manager.disconnect(websocket, room_name)
        await manager.broadcast(f"User '{current_user.username}' left room '{room_name}'.", room_name)
    except HTTPException as e:
        print(f"WebSocket authentication failed for user {current_user.username if current_user else 'unknown'}: {e.detail}")
    except Exception as e:
        print(f"Unexpected error in WebSocket for user '{current_user.username}' in room '{room_name}': {e}")
        manager.disconnect(websocket, room_name)
        await manager.broadcast(f"User '{current_user.username}' left room '{room_name}' due to an error.", room_name)

5. Testing the Flow

  1. Ensure database is created/migrated (if you updated models.py).
  2. Start the server: pipenv shell then uvicorn app.main:app --reload
  3. Open client.html in your browser.
  4. Test Registration: Try to register a new user (testuser2 / password1234). If successful, you’ll see a message. Try registering the same username again to see the 400 Bad Request error.
  5. Test Login: Log in with one of your registered users. You should get an access token.
  6. Test Protected Endpoint: Try to fetch /users/me or /messages/general (after authorizing with the token). It should work.
  7. Test WebSocket Connection: Connect to a room (e.g., general) with your authenticated user. The client.html now passes the token automatically if logged in. You should see a successful connection and be able to chat.
  8. Test WebSocket Authentication Failure: Try to connect to a WebSocket room without logging in first or by manually tampering with the token in the URL. The connection should fail with an authentication error. Check your server logs for details (WebSocket authentication error).

Pitfalls and Best Practices

  • Error Messages: Ensure error messages are informative to the client but don’t leak sensitive server-side details.
  • Rate Limiting: For production, implement rate limiting on registration and login endpoints to prevent brute-force attacks.
  • Password Policies: Enforce strong password policies (min length, complexity) during registration.
  • HTTPS/WSS: Always use HTTPS for HTTP endpoints and WSS for WebSocket connections in production to encrypt traffic and prevent man-in-the-middle attacks.

Summary/Key Takeaways

You’ve reinforced the user registration and login flow, making it more robust with proper data validation and error handling. You’ve also fully integrated the authenticated user into both HTTP and WebSocket endpoints, using the actual User database model instead of just a string username. This ensures that actions, like saving messages, are correctly attributed to the responsible user.

The next critical aspect for production-readiness is robust error handling and comprehensive logging, which we will cover in the next chapter.