Skip to content

Reference

dsg_lib.async_database_functions.async_database

async_database.py.

This module provides classes for managing asynchronous database operations using SQLAlchemy and asyncio.

Classes:

Name Description
- DBConfig

Manages the database configuration.

- AsyncDatabase

Manages the asynchronous database operations.

The DBConfig class initializes the database configuration and creates a SQLAlchemy engine and a MetaData instance.

The AsyncDatabase class uses an instance of DBConfig to perform asynchronous database operations. It provides methods to get a database session and to create tables in the database.

This module uses the logger from the dsg_lib.common_functions for logging.

Example:

from dsg_lib.async_database_functions import (
    async_database,
    base_schema,
    database_config,
    database_operations,
)

# Create a DBConfig instance
config = {
    "database_uri": "sqlite+aiosqlite:///:memory:?cache=shared",
    "echo": False,
    "future": True,
    "pool_recycle": 3600,
}

# create database configuration
db_config = database_config.DBConfig(config)

# Create an AsyncDatabase instance
async_db = async_database.AsyncDatabase(db_config)

# Create a DatabaseOperations instance
db_ops = database_operations.DatabaseOperations(async_db)

Author: Mike Ryan Date: 2024/05/16 License: MIT

AsyncDatabase

A class used to manage the asynchronous database operations.

Attributes

db_config : DBConfig an instance of DBConfig class containing the database configuration Base : Base the declarative base model for SQLAlchemy

Methods

get_db_session(): Returns a context manager that provides a new database session. create_tables(): Asynchronously creates all tables in the database.

Source code in dsg_lib/async_database_functions/async_database.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class AsyncDatabase:
    """
    A class used to manage the asynchronous database operations.

    Attributes
    ----------
    db_config : DBConfig
        an instance of DBConfig class containing the database configuration
    Base : Base
        the declarative base model for SQLAlchemy

    Methods
    -------
    get_db_session():
        Returns a context manager that provides a new database session.
    create_tables():
        Asynchronously creates all tables in the database.
    """

    def __init__(self, db_config: DBConfig):
        """Initialize the AsyncDatabase class with an instance of DBConfig.

        Parameters:
        db_config (DBConfig): An instance of DBConfig class containing the
        database configuration.

        Returns: None
        """
        self.db_config = db_config
        self.Base = BASE
        logger.debug("AsyncDatabase initialized")

    def get_db_session(self):
        """This method returns a context manager that provides a new database
        session.

        Parameters: None

        Returns: contextlib._GeneratorContextManager: A context manager that
        provides a new database session.
        """
        logger.debug("Getting database session")
        return self.db_config.get_db_session()

    async def create_tables(self):
        """This method asynchronously creates all tables in the database.

        Parameters: None

        Returns: None
        """
        logger.debug("Creating tables")
        try:
            # Bind the engine to the metadata of the base class
            self.Base.metadata.bind = self.db_config.engine

            # Begin a new transaction
            async with self.db_config.engine.begin() as conn:
                # Run a function in a synchronous manner
                await conn.run_sync(self.Base.metadata.create_all)
            logger.info("Tables created successfully")
        except Exception as ex:  # pragma: no cover
            # Log the error and raise it
            logger.error(f"Error creating tables: {ex}")  # pragma: no cover
            raise  # pragma: no cover

    async def disconnect(self):  # pragma: no cover
        """
        This method asynchronously disconnects the database engine.

        Parameters: None

        Returns: None
        """
        logger.debug("Disconnecting from database")
        try:
            await self.db_config.engine.dispose()
            logger.info("Disconnected from database")
        except Exception as ex:  # pragma: no cover
            logger.error(f"Error disconnecting from database: {ex}")  # pragma: no cover
            raise  # pragma: no cover

__init__(db_config)

Initialize the AsyncDatabase class with an instance of DBConfig.

Parameters: db_config (DBConfig): An instance of DBConfig class containing the database configuration.

Returns: None

Source code in dsg_lib/async_database_functions/async_database.py
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(self, db_config: DBConfig):
    """Initialize the AsyncDatabase class with an instance of DBConfig.

    Parameters:
    db_config (DBConfig): An instance of DBConfig class containing the
    database configuration.

    Returns: None
    """
    self.db_config = db_config
    self.Base = BASE
    logger.debug("AsyncDatabase initialized")

create_tables() async

This method asynchronously creates all tables in the database.

Parameters: None

Returns: None

Source code in dsg_lib/async_database_functions/async_database.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
async def create_tables(self):
    """This method asynchronously creates all tables in the database.

    Parameters: None

    Returns: None
    """
    logger.debug("Creating tables")
    try:
        # Bind the engine to the metadata of the base class
        self.Base.metadata.bind = self.db_config.engine

        # Begin a new transaction
        async with self.db_config.engine.begin() as conn:
            # Run a function in a synchronous manner
            await conn.run_sync(self.Base.metadata.create_all)
        logger.info("Tables created successfully")
    except Exception as ex:  # pragma: no cover
        # Log the error and raise it
        logger.error(f"Error creating tables: {ex}")  # pragma: no cover
        raise  # pragma: no cover

disconnect() async

This method asynchronously disconnects the database engine.

Parameters: None

Returns: None

Source code in dsg_lib/async_database_functions/async_database.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
async def disconnect(self):  # pragma: no cover
    """
    This method asynchronously disconnects the database engine.

    Parameters: None

    Returns: None
    """
    logger.debug("Disconnecting from database")
    try:
        await self.db_config.engine.dispose()
        logger.info("Disconnected from database")
    except Exception as ex:  # pragma: no cover
        logger.error(f"Error disconnecting from database: {ex}")  # pragma: no cover
        raise  # pragma: no cover

get_db_session()

This method returns a context manager that provides a new database session.

Parameters: None

Returns: contextlib._GeneratorContextManager: A context manager that provides a new database session.

Source code in dsg_lib/async_database_functions/async_database.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def get_db_session(self):
    """This method returns a context manager that provides a new database
    session.

    Parameters: None

    Returns: contextlib._GeneratorContextManager: A context manager that
    provides a new database session.
    """
    logger.debug("Getting database session")
    return self.db_config.get_db_session()