log_example Example¶
Log Example Module¶
This module demonstrates advanced logging configurations and usage in Python. It integrates both the logging module and loguru for robust logging capabilities. The module also showcases multi-threading and multi-processing for concurrent execution, while logging messages and handling exceptions.
Features¶
- Logging Configuration: Configures logging with options for log rotation, retention, backtrace, and serialization.
- Exception Handling: Demonstrates exception handling with logging for
ZeroDivisionError. - Concurrent Execution:
- Multi-threading: Executes tasks concurrently using threads.
- Multi-processing: Executes tasks concurrently using processes.
- Large Message Logging: Logs large messages repeatedly to test logging performance.
- Progress Tracking: Uses
tqdmto display progress bars for threads and processes.
Functions¶
div_zero(x, y)¶
Attempts to divide x by y and logs any ZeroDivisionError encountered.
div_zero_two(x, y)¶
Similar to div_zero, attempts to divide x by y and logs any ZeroDivisionError encountered.
log_big_string(lqty=100, size=256)¶
Logs a large string multiple times, demonstrating both standard logging and loguru logging.
worker(wqty=1000, lqty=100, size=256)¶
Executes the log_big_string function repeatedly, simulating a worker process or thread.
main(wqty, lqty, size, workers, thread_test, process_test)¶
Main entry point for the module. Configures and starts either multi-threading or multi-processing based on the provided arguments.
Usage¶
Run the module directly to test its functionality. Example:
python log_example.py
You can customize the parameters for workers, logging quantity, and message size by modifying the main function call in the __main__ block.
Dependencies¶
logginglogurumultiprocessingthreadingsecretstqdmdsg_lib.common_functions
Notes¶
- Ensure the
dsg_liblibrary is installed and accessible. - Adjust the logging configuration as needed for your application.
- Use the
process_testorthread_testflags to toggle between multi-processing and multi-threading.
License¶
This module is licensed under the MIT License.
# from loguru import logger
import logging
import logging as logger
import multiprocessing
import secrets
import threading
from tqdm import tqdm
from dsg_lib.common_functions import logging_config
# Configure logging as before
logging_config.config_log(
logging_directory="log",
log_name="log",
logging_level="DEBUG",
log_rotation="100 MB",
log_retention="10 days",
log_backtrace=True,
log_serializer=True,
log_diagnose=True,
# app_name='my_app',
# append_app_name=True,
intercept_standard_logging=True,
enqueue=True,
)
def div_zero(x: float, y: float) -> float | None:
"""
Safely divide x by y and log ZeroDivisionError if encountered.
Args:
x (float): Numerator.
y (float): Denominator.
Returns:
float | None: Quotient or None on error.
"""
try:
return x / y
except ZeroDivisionError as e:
logger.error(f"{e}") # log via loguru
logging.error(f"{e}") # log via standard logging
def div_zero_two(x: float, y: float) -> float | None:
"""
Mirror of div_zero demonstrating identical error handling.
"""
try:
return x / y
except ZeroDivisionError as e:
logger.error(f"{e}")
logging.error(f"{e}")
def log_big_string(lqty: int = 100, size: int = 256) -> None:
"""
Generate a large random string and log various messages repeatedly.
Args:
lqty (int): Number of log iterations.
size (int): Length of each random string.
"""
big_string = secrets.token_urlsafe(size) # create URL-safe token
for _ in range(lqty):
logging.debug(f"Lets make this a big message {big_string}") # standard debug
div_zero(1, 0) # trigger/log ZeroDivisionError
div_zero_two(1, 0)
# loguru messages
logger.debug("This is a loguru debug message")
logger.info("This is a loguru info message")
logger.warning("This is a loguru warning message")
logger.error("This is a loguru error message")
logger.critical("This is a loguru critical message")
# continued standard logging
logging.info("This is a standard logging info message")
logging.warning("This is a standard logging warning message")
logging.error("This is a standard logging error message")
logging.critical("This is a standard logging critical message")
def worker(wqty: int = 1000, lqty: int = 100, size: int = 256) -> None:
"""
Worker routine performing log_big_string in a progress loop.
Args:
wqty (int): Number of outer iterations.
lqty (int): Messages per iteration.
size (int): Random string length.
"""
for _ in tqdm(range(wqty), ascii=True, leave=True):
log_big_string(lqty=lqty, size=size)
def main(
wqty: int = 100,
lqty: int = 10,
size: int = 256,
workers: int = 16,
thread_test: bool = False,
process_test: bool = False,
) -> None:
"""
Configure and launch concurrent logging workers.
Args:
wqty (int): Iterations per worker.
lqty (int): Logs per iteration.
size (int): Random string size.
workers (int): Thread/process count.
thread_test (bool): Run threads if True.
process_test (bool): Run processes if True.
"""
if process_test:
processes = []
# Create worker processes
for _ in tqdm(range(workers), desc="Multi-Processing Start", leave=True):
p = multiprocessing.Process(
target=worker,
args=(
wqty,
lqty,
size,
),
)
processes.append(p)
p.start()
for p in tqdm((processes), desc="Multi-Processing Start", leave=False):
p.join(timeout=60) # Timeout after 60 seconds
if p.is_alive():
logger.error(f"Process {p.name} is hanging. Terminating.")
p.terminate()
p.join()
if thread_test:
threads = []
for _ in tqdm(
range(workers), desc="Threading Start", leave=True
): # Create worker threads
t = threading.Thread(
target=worker,
args=(
wqty,
lqty,
size,
),
)
threads.append(t)
t.start()
for t in tqdm(threads, desc="Threading Gather", leave=False):
t.join()
if __name__ == "__main__":
from time import time
start = time()
main(wqty=5, lqty=50, size=64, workers=8, thread_test=False, process_test=True)
print(f"Execution time: {time()-start:.2f} seconds")