Skip to content

Examples

Here are a few examples of how to use the database functions

Asyncio Script Example

Example of how to use in a script

import asyncio
from sqlalchemy import select
from dsg_lib.async_database_functions import database_config, async_database, database_operations

# Configuration
config = {
    "database_uri": "sqlite+aiosqlite:///:memory:?cache=shared",
    "echo": False,
    "future": True,
    "pool_recycle": 3600,
}

# Create a DBConfig instance
db_config = database_config.DBConfig(config)

# Create an AsyncDatabase instance
async_db = async_database.AsyncDatabase(db_config)

# Create a DatabaseOperations instance
db_ops = database_operations.DatabaseOperations(async_db)

# User class
class User(async_db.Base):
    __tablename__ = "users"
    first_name = Column(String, unique=False, index=True)
    last_name = Column(String, unique=False, index=True)
    email = Column(String, unique=True, index=True, nullable=True)

# Async function to get all users
async def get_all_users():
    # Create a select query
    query = select(User)

    # Execute the query and fetch all results
    users = await db_ops.read_query(query)

    # Print the users
    for user in users:
        print(f"User: {user.first_name} {user.last_name}, Email: {user.email}")

# Run the async function
asyncio.run(get_all_users())

FastAPI Example

# -*- coding: utf-8 -*-

from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.responses import RedirectResponse
from loguru import logger
from tqdm import tqdm

from dsg_lib import logging_config

logging_config.config_log(
    logging_level="Debug", log_serializer=False, log_name="log.log"
)


@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("starting up")
    # Create the tables in the database
    await async_db.create_tables()

    create_users = True
    if create_users:
        await create_a_bunch_of_users(single_entry=23, many_entries=100)
    yield
    logger.info("shutting down")


# Create an instance of the FastAPI class
app = FastAPI(
    title="FastAPI Example",  # The title of the API
    description="This is an example of a FastAPI application using the DevSetGo Toolkit.",  # A brief description of the API
    version="0.1.0",  # The version of the API
    docs_url="/docs",  # The URL where the API documentation will be served
    redoc_url="/redoc",  # The URL where the ReDoc documentation will be served
    openapi_url="/openapi.json",  # The URL where the OpenAPI schema will be served
    debug=True,  # Enable debug mode
    middleware=[],  # A list of middleware to include in the application
    routes=[],  # A list of routes to include in the application
    lifespan=lifespan,
)


@app.get("/")
async def root():
    """
    Root endpoint of API
    Returns:
        Redrects to openapi document
    """
    # redirect to openapi docs
    logger.info("Redirecting to OpenAPI docs")
    response = RedirectResponse(url="/docs")
    return response

from sqlalchemy import Column, Delete, Select, String, Update

from dsg_lib import (
    async_database,
    base_schema,
    database_config,
    database_operations,
)

# Create a DBConfig instance
config = {
    # "database_uri": "postgresql+asyncpg://postgres:postgres@postgresdb/postgres",
    "database_uri": "sqlite+aiosqlite:///:memory:?cache=shared",
    "echo": False,
    "future": True,
    # "pool_pre_ping": True,
    # "pool_size": 10,
    # "max_overflow": 10,
    "pool_recycle": 3600,
    # "pool_timeout": 30,
}

db_config = database_config.DBConfig(config)
# Create an AsyncDatabase instance
async_db = async_database.AsyncDatabase(db_config)

# Create a DatabaseOperations instance
db_ops = database_operations.DatabaseOperations(async_db)


# User class inherits from SchemaBase and async_db.Base
# This class represents the User table in the database
class User(base_schema.SchemaBase, async_db.Base):
    __tablename__ = "users"  # Name of the table in the database

    # Define the columns of the table
    first_name = Column(String, unique=False, index=True)  # First name of the user
    last_name = Column(String, unique=False, index=True)  # Last name of the user
    email = Column(
        String, unique=True, index=True, nullable=True
    )  # Email of the user, must be unique

async def create_a_bunch_of_users(single_entry=0, many_entries=0):
    logger.info(f"single_entry: {single_entry}")
    await async_db.create_tables()
    # Create a list to hold the user data

    # Create a loop to generate user data

    for i in tqdm(range(single_entry), desc="executing one"):
        value = secrets.token_hex(16)
        user = User(
            first_name=f"First{value}",
            last_name=f"Last{value}",
            email=f"user{value}@example.com",
        )
        logger.info(f"created_users: {user}")
        await db_ops.create_one(user)

    users = []
    # Create a loop to generate user data
    for i in tqdm(range(many_entries), desc="executing many"):
        value_one = secrets.token_hex(4)
        value_two = secrets.token_hex(8)
        user = User(
            first_name=f"First{value_one}{i}{value_two}",
            last_name=f"Last{value_one}{i}{value_two}",
            email=f"user{value_one}{i}{value_two}@example.com",
        )
        logger.info(f"created_users: {user.first_name}")
        users.append(user)

    # Use db_ops to add the users to the database
    await db_ops.create_many(users)


@app.get("/database/get-count")
async def get_count():
    count = await db_ops.count_query(Select(User))
    return {"count": count}


# endpoint to get list of user
@app.get("/database/get-all")
async def get_all(offset: int = 0, limit: int = 100):
    records = await db_ops.read_query(Select(User).offset(offset).limit(limit))
    return {"records": records}


@app.get("/database/get-primary-key")
async def table_primary_key():
    pk = await db_ops.get_primary_keys(User)
    return {"pk": pk}


@app.get("/database/get-column-details")
async def table_column_details():
    columns = await db_ops.get_columns_details(User)
    return {"columns": columns}


@app.get("/database/get-tables")
async def table_table_details():
    tables = await db_ops.get_table_names()
    return {"table_names": tables}


@app.get("/database/get-one-record")
async def get_one_record(record_id: str):
    record = await db_ops.get_one_record(Select(User).where(User.pkid == record_id))
    return {"record": record}

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=5000)

Configuration Examples

# SQLite in memory database
config = {
    "database_uri": "sqlite+aiosqlite:///:memory:?cache=shared",
    "echo": False,
    "future": True,
    "pool_recycle": 3600,
}
# PostgreSQL database
config = {
    "database_uri": "postgresql+asyncpg://postgres:postgres@postgresdb/postgres",
    "echo": False,
    "future": True,
    "pool_recycle": 3600,
}
# MySQL database
config = {
    "database_uri": "mysql+aiomysql://root:root@localhost/test",
    "echo": False,
    "future": True,
    "pool_recycle": 3600,
}
# SQL Server database
config = {
    "database_uri": "mssql+aiomssql://sa:yourStrong(!)Password@localhost:1433/master",
    "echo": False,
    "future": True,
    "pool_recycle": 3600,
}
# Oracle database
config = {
    "database_uri": "oracle+oracledb_async://scott:tiger@localhost/?service_name=XEPDB1",
    "echo": False,
    "future": True,
    "pool_recycle": 3600,
}