Skip to content

Full Example of FastAPI with Aync Database and Endpoints

You can find this in the examples folder of the repository.

Install dependencies

pip install dsg_lib[all] tqdm

Make App

Copy the fastapi code below after installing. (assumption is main.py)

# -*- coding: utf-8 -*-
"""
Author: Mike Ryan
Date: 2024/05/16
License: MIT
"""
import datetime
import secrets
import time
from contextlib import asynccontextmanager

from fastapi import Body, FastAPI, Query
from fastapi.responses import RedirectResponse
# from loguru import logger
# import logging as logger
from . import logger
from pydantic import BaseModel, EmailStr
from sqlalchemy import Column, ForeignKey, Select, String
from sqlalchemy.orm import relationship
from tqdm import tqdm

from dsg_lib.async_database_functions import (
    async_database,
    base_schema,
    database_config,
    database_operations,
)
from dsg_lib.common_functions import logging_config
from dsg_lib.fastapi_functions import system_health_endpoints  # , system_tools_endpoints

logging_config.config_log(
    logging_level="INFO", log_serializer=False, log_name="log.log"
)
# Create a DBConfig instance
config = {
    # "database_uri": "postgresql+asyncpg://postgres:postgres@postgresdb/postgres",
    "database_uri": "sqlite+aiosqlite:///:memory:?cache=shared",
    "echo": False,
    "future": True,
    # "pool_pre_ping": True,
    # "pool_size": 10,
    # "max_overflow": 10,
    "pool_recycle": 3600,
    # "pool_timeout": 30,
}

# create database configuration
db_config = database_config.DBConfig(config)
# Create an AsyncDatabase instance
async_db = async_database.AsyncDatabase(db_config)

# Create a DatabaseOperations instance
db_ops = database_operations.DatabaseOperations(async_db)


class User(base_schema.SchemaBaseSQLite, async_db.Base):
    """
    User table storing user details like first name, last name, and email
    """

    __tablename__ = "users"
    __table_args__ = {
        "comment": "User table storing user details like first name, last name, and email"
    }

    first_name = Column(String(50), unique=False, index=True)  # First name of the user
    last_name = Column(String(50), unique=False, index=True)  # Last name of the user
    email = Column(
        String(200), unique=True, index=True, nullable=True
    )  # Email of the user, must be unique
    addresses = relationship(
        "Address", order_by="Address.pkid", back_populates="user"
    )  # Relationship to the Address class


class Address(base_schema.SchemaBaseSQLite, async_db.Base):
    """
    Address table storing address details like street, city, and zip code
    """

    __tablename__ = "addresses"
    __table_args__ = {
        "comment": "Address table storing address details like street, city, and zip code"
    }

    street = Column(String(200), unique=False, index=True)  # Street of the address
    city = Column(String(200), unique=False, index=True)  # City of the address
    zip = Column(String(50), unique=False, index=True)  # Zip code of the address
    user_id = Column(
        String(36), ForeignKey("users.pkid")
    )  # Foreign key to the User table
    user = relationship(
        "User", back_populates="addresses"
    )  # Relationship to the User class

@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("starting up")
    # Create the tables in the database
    await async_db.create_tables()

    create_users = True
    if create_users:
        await create_a_bunch_of_users(single_entry=2000, many_entries=20000)
    yield
    logger.info("shutting down")
    await async_db.disconnect()
    logger.info("database disconnected")
    print("That's all folks!")



# Create an instance of the FastAPI class
app = FastAPI(
    title="FastAPI Example",  # The title of the API
    description="This is an example of a FastAPI application using the DevSetGo Toolkit.",  # A brief description of the API
    version="0.1.0",  # The version of the API
    docs_url="/docs",  # The URL where the API documentation will be served
    redoc_url="/redoc",  # The URL where the ReDoc documentation will be served
    openapi_url="/openapi.json",  # The URL where the OpenAPI schema will be served
    debug=True,  # Enable debug mode
    middleware=[],  # A list of middleware to include in the application
    routes=[],  # A list of routes to include in the application
    lifespan=lifespan,  # this is the replacement for the startup and shutdown events
)


@app.get("/")
async def root():
    """
    Root endpoint of API
    Returns:
        Redrects to openapi document
    """
    # redirect to openapi docs
    logger.info("Redirecting to OpenAPI docs")
    response = RedirectResponse(url="/docs")
    return response


config_health = {
    "enable_status_endpoint": True,
    "enable_uptime_endpoint": True,
    "enable_heapdump_endpoint": True,
}
app.include_router(
    system_health_endpoints.create_health_router(config=config_health),
    prefix="/api/health",
    tags=["system-health"],
)




async def create_a_bunch_of_users(single_entry=0, many_entries=0):
    logger.info(f"single_entry: {single_entry}")
    await async_db.create_tables()
    # Create a list to hold the user data

    # Create a loop to generate user data

    for _ in tqdm(range(single_entry), desc="executing one"):
        value = secrets.token_hex(16)
        user = User(
            first_name=f"First{value}",
            last_name=f"Last{value}",
            email=f"user{value}@example.com",
        )
        logger.info(f"created_users: {user}")
        await db_ops.create_one(user)

    users = []
    # Create a loop to generate user data
    for i in tqdm(range(many_entries), desc="executing many"):
        value_one = secrets.token_hex(4)
        value_two = secrets.token_hex(8)
        user = User(
            first_name=f"First{value_one}{i}{value_two}",
            last_name=f"Last{value_one}{i}{value_two}",
            email=f"user{value_one}{i}{value_two}@example.com",
        )
        logger.info(f"created_users: {user.first_name}")
        users.append(user)

    # Use db_ops to add the users to the database
    await db_ops.create_many(users)


@app.get("/database/get-primary-key", tags=["Database Examples"])
async def table_primary_key():
    logger.info("Getting primary key of User table")
    pk = await db_ops.get_primary_keys(User)
    logger.info(f"Primary key of User table: {pk}")
    return {"pk": pk}


@app.get("/database/get-column-details", tags=["Database Examples"])
async def table_column_details():
    logger.info("Getting column details of User table")
    columns = await db_ops.get_columns_details(User)
    logger.info(f"Column details of User table: {columns}")
    return {"columns": columns}


@app.get("/database/get-tables", tags=["Database Examples"])
async def table_table_details():
    logger.info("Getting table names")
    tables = await db_ops.get_table_names()
    logger.info(f"Table names: {tables}")
    return {"table_names": tables}


@app.get("/database/get-count", tags=["Database Examples"])
async def get_count():
    logger.info("Getting count of users")
    count = await db_ops.count_query(Select(User))
    logger.info(f"Count of users: {count}")
    return {"count": count}


@app.get("/database/get-all", tags=["Database Examples"])
async def get_all(offset: int = 0, limit: int = Query(100, le=100000, ge=1)):
    logger.info(f"Getting all users with offset {offset} and limit {limit}")
    records = await db_ops.read_query(Select(User).offset(offset).limit(limit))
    logger.info(f"Retrieved {len(records)} users")
    return {"records": records}


@app.get("/database/get-one-record", tags=["Database Examples"])
async def read_one_record(record_id: str):
    logger.info(f"Reading one record with id {record_id}")
    record = await db_ops.read_one_record(Select(User).where(User.pkid == record_id))
    logger.info(f"Record with id {record_id}: {record}")
    return record


class UserBase(BaseModel):
    first_name: str
    last_name: str
    email: EmailStr


class UserCreate(UserBase):
    pass


@app.post("/database/create-one-record", status_code=201, tags=["Database Examples"])
async def create_one_record(new_user: UserCreate):
    logger.info(f"Creating one record: {new_user}")
    user = User(**new_user.dict())
    record = await db_ops.create_one(user)
    logger.info(f"Created record: {record}")
    return record


@app.post("/database/create-many-records", status_code=201, tags=["Database Examples"])
async def create_many_records(number_of_users: int = Query(100, le=1000, ge=1)):
    logger.info(f"Creating {number_of_users} records")
    t0 = time.time()
    users = []
    # Create a loop to generate user data
    for i in tqdm(range(number_of_users), desc="executing many"):
        value_one = secrets.token_hex(4)
        value_two = secrets.token_hex(8)
        user = User(
            first_name=f"First{value_one}{i}{value_two}",
            last_name=f"Last{value_one}{i}{value_two}",
            email=f"user{value_one}{i}{value_two}@example.com",
        )
        logger.info(f"Created user: {user.first_name}")
        users.append(user)

    # Use db_ops to add the users to the database
    await db_ops.create_many(users)
    t1 = time.time()
    process_time = format(t1 - t0, ".4f")
    logger.info(f"Created {number_of_users} records in {process_time} seconds")
    return {"number_of_users": number_of_users, "process_time": process_time}


@app.put("/database/update-one-record", status_code=200, tags=["Database Examples"])
async def update_one_record(
    id: str = Body(
        ...,
        description="UUID to update",
        examples=["6087cce8-0bdd-48c2-ba96-7d557dae843e"],
    ),
    first_name: str = Body(..., examples=["Agent"]),
    last_name: str = Body(..., examples=["Smith"]),
    email: str = Body(..., examples=["jim@something.com"]),
):
    logger.info(f"Updating one record with id {id}")
    # adding date_updated to new_values as it is not supported in sqlite \
    # and other database may not either.
    new_values = {
        "first_name": first_name,
        "last_name": last_name,
        "email": email,
        "date_updated": datetime.datetime.now(datetime.timezone.utc),
    }
    record = await db_ops.update_one(table=User, record_id=id, new_values=new_values)
    logger.info(f"Updated record with id {id}")
    return record


@app.delete("/database/delete-one-record", status_code=200, tags=["Database Examples"])
async def delete_one_record(record_id: str = Body(...)):
    logger.info(f"Deleting one record with id {record_id}")
    record = await db_ops.delete_one(table=User, record_id=record_id)
    logger.info(f"Deleted record with id {record_id}")
    return record


@app.delete(
    "/database/delete-many-records-aka-this-is-a-bad-idea",
    status_code=201,
    tags=["Database Examples"],
)
async def delete_many_records(
    id_values: list = Body(...), id_column_name: str = "pkid"
):
    logger.info(f"Deleting many records with ids {id_values}")
    record = await db_ops.delete_many(
        table=User, id_column_name="pkid", id_values=id_values
    )
    logger.info(f"Deleted records with ids {id_values}")
    return record


@app.get(
    "/database/get-list-of-records-to-paste-into-delete-many-records",
    tags=["Database Examples"],
)
async def read_list_of_records(
    offset: int = Query(0, le=1000, ge=0), limit: int = Query(100, le=10000, ge=1)
):
    logger.info(f"Reading list of records with offset {offset} and limit {limit}")
    records = await db_ops.read_query(Select(User), offset=offset, limit=limit)
    records_list = []
    for record in records:
        records_list.append(record.pkid)
    logger.info(f"Read list of records: {records_list}")
    return records_list


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=5000)

Run Code

In the console (linux) run the code below. Open browser to http://127.0.0.1:5000 to see app.

python3 main.py