
Reference

dsg_lib.async_database_functions.database_config

This module provides classes and functions for managing asynchronous database operations using SQLAlchemy and asyncio.

The main classes are DBConfig, which manages the database configuration and creates a SQLAlchemy engine and a MetaData instance, and AsyncDatabase, which uses an instance of DBConfig to perform asynchronous database operations.

The module also provides a function, import_sqlalchemy, which tries to import SQLAlchemy and its components, and raises an ImportError if SQLAlchemy is not installed or if the installed version is not compatible.
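The guarded-import behaviour described for import_sqlalchemy can be pictured with a generic helper. This is a minimal sketch under stated assumptions, not the library's implementation: `import_checked` and its `min_version` parameter are hypothetical names, and the version check only compares leading numeric components of `__version__`.

```python
import importlib
from types import ModuleType
from typing import Tuple


def import_checked(name: str, min_version: Tuple[int, ...] = ()) -> ModuleType:
    """Hypothetical helper: import `name`, enforcing an optional minimum version."""
    try:
        module = importlib.import_module(name)
    except ImportError as error:
        raise ImportError(f"{name} is required but is not installed") from error

    if min_version:
        # Keep only the leading numeric components, e.g. "2.0.30" -> [2, 0, 30].
        raw = str(getattr(module, "__version__", "0"))
        found = []
        for part in raw.split("."):
            if not part.isdigit():
                break
            found.append(int(part))
        # Pad so that "2" compares equal to a required (2, 0).
        found += [0] * (len(min_version) - len(found))
        if tuple(found) < min_version:
            raise ImportError(f"{name} {raw} is older than the required {min_version}")
    return module


# Usage with a standard-library module, which needs no version check.
math_module = import_checked("math")
print(math_module.sqrt(9.0))  # 3.0
```

Raising ImportError in both failure cases lets callers handle a missing package and an outdated package with a single except clause.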

The module uses the dsg_lib logger for logging and the time module for working with times. It also uses contextlib to create context managers and typing for type hints.

The BASE variable is a base class for declarative database models. It is created using the declarative_base function from sqlalchemy.orm.

This module is part of the dsg_lib package, which provides utilities for working with databases in Python.

Example:

import asyncio

from dsg_lib.async_database_functions import database_config

# Define your database configuration
config = {
    "database_uri": "postgresql+asyncpg://user:password@localhost/dbname",
    "echo": True, "future": True, "pool_pre_ping": True, "pool_size": 5,
    "max_overflow": 10, "pool_recycle": 3600, "pool_timeout": 30,
}

# Create a DBConfig instance
db_config = database_config.DBConfig(config)


async def main():
    # Use the DBConfig instance to get a database session
    async with db_config.get_db_session() as session:
        # Perform your database operations here
        pass


asyncio.run(main())

Author: Mike Ryan
Date: 2024/05/16
License: MIT

DBConfig

A class used to manage the database configuration and create a SQLAlchemy engine.

Attributes:

Name      Type      Description
config    dict      A dictionary containing the database configuration parameters.
engine    Engine    The SQLAlchemy engine created with the database URI from the config.
metadata  MetaData  The SQLAlchemy MetaData instance.

Create Engine Support Functions by Database Type

Confirmed by testing: SQLite, PostgreSQL. To be tested: MySQL, Oracle, MSSQL; these should be considered experimental.

Option         SQLite  PostgreSQL  MySQL  Oracle  MSSQL
echo           Yes     Yes         Yes    Yes     Yes
future         Yes     Yes         Yes    Yes     Yes
pool_pre_ping  Yes     Yes         Yes    Yes     Yes
pool_size      No      Yes         Yes    Yes     Yes
max_overflow   No      Yes         Yes    Yes     Yes
pool_recycle   Yes     Yes         Yes    Yes     Yes
pool_timeout   No      Yes         Yes    Yes     Yes

Note: SUPPORTED_PARAMETERS in the class source lists only echo, future, and pool_recycle for SQLite and has no entries for MySQL, Oracle, or MSSQL, so DBConfig raises an Exception for any other option passed with those engine types.
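Because DBConfig rejects options that are not listed for the engine type, one way to reuse a single base configuration across databases is to filter it before constructing DBConfig. The sketch below is an illustration only and is not part of dsg_lib: `SUPPORTED` copies the two entries of `DBConfig.SUPPORTED_PARAMETERS` from the source, and `filter_config` is a hypothetical helper name.

```python
from typing import Any, Dict

# Copied from DBConfig.SUPPORTED_PARAMETERS in the source; other engine
# types have no entry there.
SUPPORTED = {
    "sqlite": {"echo", "future", "pool_recycle"},
    "postgresql": {
        "echo", "future", "pool_pre_ping", "pool_size",
        "max_overflow", "pool_recycle", "pool_timeout",
    },
}


def filter_config(config: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: drop options the engine type does not list."""
    # Same engine-type rule as DBConfig.__init__: the text before the first "+".
    engine_type = config["database_uri"].split("+")[0]
    allowed = SUPPORTED.get(engine_type, set()) | {"database_uri"}
    return {key: value for key, value in config.items() if key in allowed}


base = {
    "database_uri": "sqlite+aiosqlite:///:memory:",
    "echo": True,
    "pool_size": 5,  # not listed for sqlite, so it is dropped
    "pool_recycle": 3600,
}
sqlite_config = filter_config(base)
print(sorted(sqlite_config))  # ['database_uri', 'echo', 'pool_recycle']
```

Filtering silently discards options, so it suits shared defaults better than user-supplied settings, where the Exception raised by DBConfig is the more informative outcome.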

Example:

import asyncio

from dsg_lib.async_database_functions import database_config

# Define your database configuration
config = {
    "database_uri": "postgresql+asyncpg://user:password@localhost/dbname",
    "echo": True, "future": True, "pool_pre_ping": True, "pool_size": 5,
    "max_overflow": 10, "pool_recycle": 3600, "pool_timeout": 30,
}

# Create a DBConfig instance
db_config = database_config.DBConfig(config)


async def main():
    # Use the DBConfig instance to get a database session
    async with db_config.get_db_session() as session:
        # Perform your database operations here
        pass


asyncio.run(main())

Source code in dsg_lib/async_database_functions/database_config.py
class DBConfig:
    """
    A class used to manage the database configuration and create a SQLAlchemy
    engine.

    Attributes:
        config (dict): A dictionary containing the database configuration
            parameters.
        engine (Engine): The SQLAlchemy engine created with the database URI
            from the config.
        metadata (MetaData): The SQLAlchemy MetaData instance.


    Create Engine Support Functions by Database Type

    Confirmed by testing: SQLite, PostgreSQL.
    To be tested: MySQL, Oracle, MSSQL (should be considered experimental).

    Option          SQLite  PostgreSQL  MySQL  Oracle  MSSQL
    echo            Yes     Yes         Yes    Yes     Yes
    future          Yes     Yes         Yes    Yes     Yes
    pool_pre_ping   Yes     Yes         Yes    Yes     Yes
    pool_size       No      Yes         Yes    Yes     Yes
    max_overflow    No      Yes         Yes    Yes     Yes
    pool_recycle    Yes     Yes         Yes    Yes     Yes
    pool_timeout    No      Yes         Yes    Yes     Yes

    Example:
    ```python

    import asyncio

    from dsg_lib.async_database_functions import database_config

    # Define your database configuration
    config = {
        "database_uri": "postgresql+asyncpg://user:password@localhost/dbname",
        "echo": True, "future": True, "pool_pre_ping": True, "pool_size": 5,
        "max_overflow": 10, "pool_recycle": 3600, "pool_timeout": 30,
    }

    # Create a DBConfig instance
    db_config = database_config.DBConfig(config)


    async def main():
        # Use the DBConfig instance to get a database session
        async with db_config.get_db_session() as session:
            # Perform your database operations here
            pass


    asyncio.run(main())
    ```

    """

    SUPPORTED_PARAMETERS = {
        "sqlite": {"echo", "future", "pool_recycle"},
        "postgresql": {
            "echo",
            "future",
            "pool_pre_ping",
            "pool_size",
            "max_overflow",
            "pool_recycle",
            "pool_timeout",
        },
        # Add other engines here...
    }

    def __init__(self, config: Dict):
        """
        Initializes the DBConfig instance with the given database configuration.

        The configuration should be a dictionary with the following keys:

        - "database_uri": The URI for the database.
        - "echo": If True, the engine will log all statements as well as a
          `repr()` of their parameter lists to the engine's logger, which
          defaults to sys.stdout.
        - "future": If True, use the future version of SQLAlchemy, which
          supports asyncio.
        - "pool_pre_ping": If True, the pool will test the connection for
          liveness upon each checkout.
        - "pool_size": The size of the connection pool to be maintained.
        - "max_overflow": The number of connections that can be opened above
          the `pool_size` setting, when all other connections are in use.
        - "pool_recycle": The number of seconds after which a connection is
          automatically recycled. This is required for MySQL, which removes
          connections after 8 hours idle by default.
        - "pool_timeout": The number of seconds to wait before giving up on
          getting a connection from the pool.

        Args:
            config (Dict): A dictionary containing the database configuration
                parameters.

        Raises:
            Exception: If there are unsupported parameters for the database
                engine type.

        Example:
        ```python

        from dsg_lib.async_database_functions import database_config

        # Define your database configuration
        config = {
            "database_uri": "postgresql+asyncpg://user:password@localhost/dbname",
            "echo": True, "future": True, "pool_pre_ping": True, "pool_size": 5,
            "max_overflow": 10, "pool_recycle": 3600, "pool_timeout": 30,
        }

        # Create a DBConfig instance
        db_config = database_config.DBConfig(config)
        ```
        """
        self.config = config
        engine_type = self.config["database_uri"].split("+")[0]
        supported_parameters = self.SUPPORTED_PARAMETERS.get(engine_type, set())
        unsupported_parameters = (
            set(config.keys()) - supported_parameters - {"database_uri"}
        )
        if unsupported_parameters:
            error_message = (
                f"Unsupported parameters for {engine_type}: {unsupported_parameters}"
            )
            logger.error(error_message)
            raise Exception(error_message)

        engine_parameters = {
            param: self.config.get(param)
            for param in supported_parameters
            if self.config.get(param) is not None
        }
        self.engine = create_async_engine(
            self.config["database_uri"], **engine_parameters
        )
        self.metadata = MetaData()

    @asynccontextmanager
    async def get_db_session(self):
        """
        This method returns a context manager that provides a new database
        session.

        The session is created using the SQLAlchemy engine from the DBConfig
        instance, and it does not expire on commit. The session is of type
        AsyncSession.

        This method should be used with the `async with` statement.

        Yields:
            AsyncSession: A new SQLAlchemy asynchronous session.

        Raises:
            SQLAlchemyError: If a database error occurs.

        Example:
        ```python

        import asyncio

        from dsg_lib.async_database_functions import database_config

        # Define your database configuration
        config = {
            "database_uri": "postgresql+asyncpg://user:password@localhost/dbname",
            "echo": True, "future": True, "pool_pre_ping": True, "pool_size": 5,
            "max_overflow": 10, "pool_recycle": 3600, "pool_timeout": 30,
        }

        # Create a DBConfig instance
        db_config = database_config.DBConfig(config)


        async def main():
            # Use the DBConfig instance to get a database session
            async with db_config.get_db_session() as session:
                # Perform your database operations here
                pass


        asyncio.run(main())
        ```
        """
        logger.debug("Creating new database session")
        try:
            # Create a new database session
            async with sessionmaker(
                self.engine, expire_on_commit=False, class_=AsyncSession
            )() as session:
                # Yield the session to the context manager
                yield session
        except SQLAlchemyError as e:  # pragma: no cover
            # Log the error and raise it
            logger.error(f"Database error occurred: {str(e)}")  # pragma: no cover
            raise  # pragma: no cover
        finally:  # pragma: no cover
            # Log the end of the database session
            logger.debug("Database session ended")  # pragma: no cover

__init__(config)

Initializes the DBConfig instance with the given database configuration.

The configuration should be a dictionary with the following keys:

- "database_uri": The URI for the database.
- "echo": If True, the engine will log all statements as well as a repr() of their parameter lists to the engine's logger, which defaults to sys.stdout.
- "future": If True, use the future version of SQLAlchemy, which supports asyncio.
- "pool_pre_ping": If True, the pool will test the connection for liveness upon each checkout.
- "pool_size": The size of the connection pool to be maintained.
- "max_overflow": The number of connections that can be opened above the pool_size setting, when all other connections are in use.
- "pool_recycle": The number of seconds after which a connection is automatically recycled. This is required for MySQL, which removes connections after 8 hours idle by default.
- "pool_timeout": The number of seconds to wait before giving up on getting a connection from the pool.

Parameters:

Name    Type  Description                                                     Default
config  Dict  A dictionary containing the database configuration parameters.  required

Raises:

Type       Description
Exception  If there are unsupported parameters for the database engine type.

Example:

from dsg_lib.async_database_functions import database_config

# Define your database configuration
config = {
    "database_uri": "postgresql+asyncpg://user:password@localhost/dbname",
    "echo": True, "future": True, "pool_pre_ping": True, "pool_size": 5,
    "max_overflow": 10, "pool_recycle": 3600, "pool_timeout": 30,
}

# Create a DBConfig instance
db_config = database_config.DBConfig(config)
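To see which configurations trigger the Exception, the check in `__init__` can be reproduced on its own. The sketch below mirrors the logic shown in the class source without creating an engine; `unsupported_parameters` is a hypothetical function name used only for this illustration.

```python
from typing import Any, Dict, Set

# Copied from DBConfig.SUPPORTED_PARAMETERS in the source.
SUPPORTED_PARAMETERS = {
    "sqlite": {"echo", "future", "pool_recycle"},
    "postgresql": {
        "echo", "future", "pool_pre_ping", "pool_size",
        "max_overflow", "pool_recycle", "pool_timeout",
    },
}


def unsupported_parameters(config: Dict[str, Any]) -> Set[str]:
    """Hypothetical name: return the keys that would make DBConfig raise."""
    # The engine type is everything before the first "+" in the URI.
    engine_type = config["database_uri"].split("+")[0]
    supported = SUPPORTED_PARAMETERS.get(engine_type, set())
    return set(config) - supported - {"database_uri"}


accepted = unsupported_parameters(
    {"database_uri": "postgresql+asyncpg://user:password@localhost/dbname", "pool_size": 5}
)
rejected = unsupported_parameters(
    {"database_uri": "sqlite+aiosqlite:///:memory:", "pool_size": 5}
)
no_driver = unsupported_parameters({"database_uri": "sqlite:///app.db", "echo": True})

print(accepted)   # set()
print(rejected)   # {'pool_size'}
print(no_driver)  # {'echo'}
```

The last case shows a consequence of splitting on "+": a URI written without a driver suffix yields the whole URI as the engine type, which matches no entry, so every option other than database_uri is reported as unsupported.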


get_db_session() async

This method returns a context manager that provides a new database session.

The session is created using the SQLAlchemy engine from the DBConfig instance, and it does not expire on commit. The session is of type AsyncSession.

This method should be used with the async with statement.

Yields:

Type          Description
AsyncSession  A new SQLAlchemy asynchronous session.

Raises:

Type             Description
SQLAlchemyError  If a database error occurs.

Example:

import asyncio

from dsg_lib.async_database_functions import database_config

# Define your database configuration
config = {
    "database_uri": "postgresql+asyncpg://user:password@localhost/dbname",
    "echo": True, "future": True, "pool_pre_ping": True, "pool_size": 5,
    "max_overflow": 10, "pool_recycle": 3600, "pool_timeout": 30,
}

# Create a DBConfig instance
db_config = database_config.DBConfig(config)


async def main():
    # Use the DBConfig instance to get a database session
    async with db_config.get_db_session() as session:
        # Perform your database operations here
        pass


asyncio.run(main())
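The order in which get_db_session logs, closes the session, and re-raises can be traced with a standard-library stand-in. This is a sketch of the same try/except/finally pattern around `yield`, not the library code: `FakeSession`, `get_session`, and the `events` list are hypothetical, and RuntimeError stands in for SQLAlchemyError.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # records what happened, in place of the library's logger


class FakeSession:
    """Hypothetical stand-in for AsyncSession that supports `async with`."""

    async def __aenter__(self):
        events.append("session opened")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        events.append("session closed")
        return False  # do not swallow exceptions


@asynccontextmanager
async def get_session():
    events.append("creating")
    try:
        async with FakeSession() as session:
            yield session
    except RuntimeError as error:  # stands in for SQLAlchemyError
        events.append(f"error: {error}")
        raise
    finally:
        events.append("ended")


async def main():
    async with get_session():
        pass  # normal use
    try:
        async with get_session():
            raise RuntimeError("boom")
    except RuntimeError:
        pass  # the error still reaches the caller after being recorded


asyncio.run(main())
print(events)
```

In the failing case the session is closed first, then the error is recorded and re-raised, and the final "ended" entry is written last; the caller still has to handle the exception.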
