dsg_lib.common_functions.file_mover

Module: file_mover

Detailed file processing flow that continuously monitors and processes files from a source directory, optionally compresses them, and then moves them to a final destination. Ensures no files are lost during transfer.

Functions:

Name Description

process_files_flow(source_dir: str, temp_dir: str, final_dir: str, file_pattern: str, compress: bool = False, max_iterations: Optional[int] = None) -> None

Continuously monitors the source directory for files matching the given pattern, moves them to a temporary directory, optionally compresses them, and then transfers them to the final directory.

_process_file(file: Path, temp_path: Path, final_path: Path, compress: bool) -> None

Handles the internal logic of moving and optionally compressing a single file.

Usage Example:

from dsg_lib.common_functions.file_mover import process_files_flow

process_files_flow(
    source_dir="/some/source",
    temp_dir="/some/temp",
    final_dir="/some/final",
    file_pattern="*.txt",
    compress=True
)

process_files_flow(source_dir, temp_dir, final_dir, file_pattern, compress=False, max_iterations=None)

Continuously monitors a source directory for files. Moves files matching file_pattern to a temporary directory, optionally compresses them, then moves them to a final destination directory.

Parameters:

Name            Type           Description                                       Default
source_dir      str            Path to the source directory to watch.           required
temp_dir        str            Path to the temporary directory for processing.  required
final_dir       str            Path to the final destination directory.         required
file_pattern    str            Glob pattern for matching files (e.g. "*.txt").  required
compress        bool           If True, compress files before moving.           False
max_iterations  Optional[int]  Limit iterations in watch loop.                  None

Returns:

Type  Description
None  None

Raises:

Type       Description
Exception  Propagated if file operations fail.

Example

process_files_flow("/source", "/temp", "/final", "*.pdf", compress=True)

Source code in dsg_lib/common_functions/file_mover.py
def process_files_flow(
    source_dir: str,
    temp_dir: str,
    final_dir: str,
    file_pattern: str,
    compress: bool = False,
    max_iterations: Optional[int] = None,
) -> None:
    """
    Continuously monitors a source directory for files. Moves files matching
    file_pattern to a temporary directory, optionally compresses them, then
    moves them to a final destination directory.

    Args:
        source_dir (str): Path to the source directory to watch.
        temp_dir (str): Path to the temporary directory for processing.
        final_dir (str): Path to the final destination directory.
        file_pattern (str): Glob pattern for matching files (e.g. "*.txt").
        compress (bool, optional): If True, compress files before moving. Defaults to False.
        max_iterations (Optional[int], optional): Limit iterations in watch loop. Defaults to None.

    Returns:
        None

    Raises:
        Exception: Propagated if file operations fail.

    Example:
        process_files_flow("/source", "/temp", "/final", "*.pdf", compress=True)
    """
    temp_path: Path = Path(temp_dir)
    final_path: Path = Path(final_dir)
    source_path: Path = Path(source_dir)

    # Ensure temporary and final directories exist.
    for path in (temp_path, final_path):
        path.mkdir(parents=True, exist_ok=True)

    # Process existing files in the source directory at startup
    logger.info(f"Processing existing files in source directory: {source_dir}")
    for file in source_path.glob(file_pattern):
        if file.is_file():
            try:
                logger.info(f"Processing existing file: {file}")
                _process_file(file, temp_path, final_path, compress)
            except Exception as e:
                logger.error(f"Error processing existing file '{file}': {e}")
                raise

    # The clear_source deletion block has been removed so that files remain in the source directory
    # if they have not already been processed.

    logger.info(
        f"Starting file processing flow: monitoring '{source_dir}' for pattern '{file_pattern}'."
    )

    # Monitor the source directory for changes
    changes_generator: Generator[Set[Tuple[int, str]], None, None] = watch(source_dir)
    if max_iterations is not None:
        changes_generator = islice(changes_generator, max_iterations)

    for changes in changes_generator:
        logger.debug(f"Detected changes: {changes}")
        for _change_type, file_str in changes:
            file_path: Path = Path(file_str)
            # Only process files matching the pattern and that are files
            if file_path.is_file() and file_path.match(file_pattern):
                try:
                    logger.info(f"Detected file for processing: {file_path}")
                    _process_file(file_path, temp_path, final_path, compress)
                except Exception as e:
                    logger.error(f"Error processing file '{file_path}': {e}")
                    raise
        sleep(1)  # Small delay to minimize CPU usage
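
The excerpt above starts partway into the module, so its imports are not shown. Based on the names used, they are presumably along these lines (the exact import list in dsg_lib may differ):

from itertools import islice
from pathlib import Path
from time import sleep
from typing import Generator, Optional, Set, Tuple

from loguru import logger        # assumed logging backend
from watchfiles import watch     # assumed source of the change-set generator

The _process_file helper is likewise not shown on this page. As a rough sketch of the behavior the docstrings describe (move into temp, optionally compress, move to final), something like the following would work; this is illustrative only, not dsg_lib's actual implementation, and it assumes gzip compression:

import gzip
import shutil
from pathlib import Path

def _process_file(file: Path, temp_path: Path, final_path: Path, compress: bool) -> None:
    # Move out of the source directory first so the watcher does not
    # re-detect the file while it is being processed.
    temp_file = temp_path / file.name
    shutil.move(str(file), str(temp_file))

    if compress:
        # Illustrative choice: gzip the file, then drop the uncompressed copy.
        compressed = temp_file.with_suffix(temp_file.suffix + ".gz")
        with temp_file.open("rb") as src, gzip.open(compressed, "wb") as dst:
            shutil.copyfileobj(src, dst)
        temp_file.unlink()
        temp_file = compressed

    # Final hop into the destination directory.
    shutil.move(str(temp_file), str(final_path / temp_file.name))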