log_example Example
This module demonstrates advanced logging configurations and usage in Python. It integrates both the standard logging module and loguru for robust logging, and it showcases multi-threading and multi-processing for concurrent execution while logging messages and handling exceptions.
Features

- Logging Configuration: Configures logging with options for log rotation, retention, backtrace, and serialization (see the loguru sketch after this list).
- Exception Handling: Demonstrates exception handling with logging for ZeroDivisionError.
- Concurrent Execution:
  - Multi-threading: Executes tasks concurrently using threads.
  - Multi-processing: Executes tasks concurrently using processes.
- Large Message Logging: Logs large messages repeatedly to test logging performance.
- Progress Tracking: Uses tqdm to display progress bars for threads and processes.
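For reference, these options map roughly onto a plain loguru sink configured as below. This is a minimal sketch assuming config_log ultimately wraps loguru's logger.add; the sink path and exact wiring are illustrative, not dsg_lib's actual implementation.

from loguru import logger

# Hypothetical stand-alone equivalent of the config_log options used in this example
logger.add(
    "log/log.json",       # assumed sink path
    level="DEBUG",
    rotation="100 MB",    # start a new file once the current one reaches 100 MB
    retention="10 days",  # delete rotated files older than 10 days
    backtrace=True,       # extend tracebacks beyond the catch point
    diagnose=True,        # annotate tracebacks with variable values
    serialize=True,       # emit each record as one JSON object per line
    enqueue=True,         # route records through a process-safe queue
)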
Functions

div_zero(x, y)

Attempts to divide x by y and logs any ZeroDivisionError encountered.

div_zero_two(x, y)

Similar to div_zero; attempts to divide x by y and logs any ZeroDivisionError encountered.

log_big_string(lqty=100, size=256)

Logs a large string multiple times, demonstrating both standard logging and loguru logging.

worker(wqty=1000, lqty=100, size=256)

Executes the log_big_string function repeatedly, simulating a worker process or thread.

main(wqty, lqty, size, workers, thread_test, process_test)

Main entry point for the module. Configures and starts either multi-threading or multi-processing based on the provided arguments.
Usage

Run the module directly to test its functionality:

python log_example.py

You can customize the parameters for workers, logging quantity, and message size by modifying the main() call in the __main__ block (see the example below).
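For example, to exercise the threading path with smaller messages, the __main__ block could call main() like this (the values are illustrative):

if __name__ == "__main__":
    # 8 threads, 10 iterations per worker, 50 messages per iteration, 128-byte tokens
    main(wqty=10, lqty=50, size=128, workers=8, thread_test=True, process_test=False)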
Dependencies

- logging
- loguru
- multiprocessing
- threading
- secrets
- tqdm
- dsg_lib.common_functions
Notes

- Ensure the dsg_lib library is installed and accessible (install command below).
- Adjust the logging configuration as needed for your application.
- Use the process_test or thread_test flags to toggle between multi-processing and multi-threading.
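If the library is missing, it can be installed from PyPI (the distribution name below is assumed from the project name):

pip install devsetgo_lib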
License
This module is licensed under the MIT License.
# from loguru import logger
import logging
import logging as logger  # alias: the loguru-style 'logger' calls below go through standard logging
import multiprocessing
import secrets
import threading
from tqdm import tqdm
from dsg_lib.common_functions import logging_config
# Configure logging through dsg_lib's logging_config helper
logging_config.config_log(
logging_directory="log",
log_name="log",
logging_level="DEBUG",
log_rotation="100 MB",
log_retention="10 days",
log_backtrace=True,
log_serializer=True,
log_diagnose=True,
# app_name='my_app',
# append_app_name=True,
intercept_standard_logging=True,
enqueue=True,
)
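
# With intercept_standard_logging=True above, records emitted through the
# standard logging module are captured and re-routed into loguru's sinks.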
def div_zero(x: float, y: float) -> float | None:
"""
Safely divide x by y and log ZeroDivisionError if encountered.
Args:
x (float): Numerator.
y (float): Denominator.
Returns:
float | None: Quotient or None on error.
"""
try:
return x / y
except ZeroDivisionError as e:
        logger.error(f"{e}")  # 'logger' is aliased standard logging here
        logging.error(f"{e}")  # both calls are intercepted and handled by loguru
def div_zero_two(x: float, y: float) -> float | None:
"""
Mirror of div_zero demonstrating identical error handling.
"""
try:
return x / y
except ZeroDivisionError as e:
logger.error(f"{e}")
logging.error(f"{e}")
def log_big_string(lqty: int = 100, size: int = 256) -> None:
"""
Generate a large random string and log various messages repeatedly.
Args:
lqty (int): Number of log iterations.
        size (int): Number of random bytes used to build each string (via token_urlsafe).
"""
big_string = secrets.token_urlsafe(size) # create URL-safe token
for _ in range(lqty):
logging.debug(f"Lets make this a big message {big_string}") # standard debug
div_zero(1, 0) # trigger/log ZeroDivisionError
div_zero_two(1, 0)
        # loguru-style messages (sent through the 'logger' alias)
logger.debug("This is a loguru debug message")
logger.info("This is a loguru info message")
logger.warning("This is a loguru warning message")
logger.error("This is a loguru error message")
logger.critical("This is a loguru critical message")
# continued standard logging
logging.info("This is a standard logging info message")
logging.warning("This is a standard logging warning message")
logging.error("This is a standard logging error message")
logging.critical("This is a standard logging critical message")
def worker(wqty: int = 1000, lqty: int = 100, size: int = 256) -> None:
"""
Worker routine performing log_big_string in a progress loop.
Args:
wqty (int): Number of outer iterations.
lqty (int): Messages per iteration.
size (int): Random string length.
"""
for _ in tqdm(range(wqty), ascii=True, leave=True):
log_big_string(lqty=lqty, size=size)
def main(
wqty: int = 100,
lqty: int = 10,
size: int = 256,
workers: int = 16,
thread_test: bool = False,
process_test: bool = False,
) -> None:
"""
Configure and launch concurrent logging workers.
Args:
wqty (int): Iterations per worker.
lqty (int): Logs per iteration.
size (int): Random string size.
workers (int): Thread/process count.
thread_test (bool): Run threads if True.
process_test (bool): Run processes if True.
"""
if process_test:
processes = []
# Create worker processes
for _ in tqdm(range(workers), desc="Multi-Processing Start", leave=True):
p = multiprocessing.Process(
target=worker,
args=(
wqty,
lqty,
size,
),
)
processes.append(p)
p.start()
        for p in tqdm(processes, desc="Multi-Processing Gather", leave=False):
p.join(timeout=60) # Timeout after 60 seconds
if p.is_alive():
logger.error(f"Process {p.name} is hanging. Terminating.")
p.terminate()
p.join()
if thread_test:
threads = []
for _ in tqdm(
range(workers), desc="Threading Start", leave=True
): # Create worker threads
t = threading.Thread(
target=worker,
args=(
wqty,
lqty,
size,
),
)
threads.append(t)
t.start()
for t in tqdm(threads, desc="Threading Gather", leave=False):
t.join()
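        # Note: thread joins run without a timeout; unlike processes, Python
        # threads cannot be forcibly terminated from outside.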
if __name__ == "__main__":
from time import time
start = time()
main(wqty=5, lqty=50, size=64, workers=8, thread_test=False, process_test=True)
print(f"Execution time: {time()-start:.2f} seconds")