Source code for crate_anon.anonymise.launch_multiprocess_anonymiser

#!/usr/bin/env python

"""
crate_anon/anonymise/launch_multiprocess_anonymiser.py

===============================================================================

    Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CRATE.

    CRATE is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CRATE is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CRATE. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Launch the CRATE anonymiser in multiprocess mode.**

Previous bash version: see

- https://stackoverflow.com/questions/356100
- https://stackoverflow.com/questions/1644856
- https://stackoverflow.com/questions/8903239
- https://stackoverflow.com/questions/1951506
- Note: ``$!`` is the process ID of last process launched in background
- https://stackoverflow.com/questions/59895

Python version: see

- https://stackoverflow.com/questions/23611396/python-execute-cat-subprocess-in-parallel
- https://stackoverflow.com/questions/320232/ensuring-subprocesses-are-dead-on-exiting-python-program
- https://stackoverflow.com/questions/641420/how-should-i-log-while-using-multiprocessing-in-python
"""  # noqa

import argparse
import logging
import multiprocessing
import os
import sys
import time
from typing import List

from cardinal_pythonlib.logs import configure_logger_for_colour
from cardinal_pythonlib.subproc import (
    check_call_process,
    run_multiple_processes,
)
from rich_argparse import ArgumentDefaultsRichHelpFormatter

from crate_anon.common.constants import EnvVar
from crate_anon.version import CRATE_VERSION, CRATE_VERSION_DATE

log = logging.getLogger(__name__)

ANONYMISER = "crate_anon.anonymise.anonymise_cli"

if EnvVar.GENERATING_CRATE_DOCS in os.environ:
    CPUCOUNT = 8
else:
    CPUCOUNT = multiprocessing.cpu_count()


# =============================================================================
# Main
# =============================================================================


[docs]def main() -> None: """ Command-line processor. See command-line help. """ version = f"Version {CRATE_VERSION} ({CRATE_VERSION_DATE})" description = ( f"Runs the CRATE anonymiser in parallel. {version}. " f"Note that all arguments not specified here are passed to the " f"underlying script (see crate_anonymise --help)." ) parser = argparse.ArgumentParser( description=description, formatter_class=ArgumentDefaultsRichHelpFormatter, ) parser.add_argument( "--nproc", "-n", nargs="?", type=int, default=CPUCOUNT, help="Number of processes " "(default is the number of CPUs on this machine)", ) parser.add_argument( "--verbose", "-v", action="store_true", help="Be verbose" ) args, unknownargs = parser.parse_known_args() loglevel = logging.DEBUG if args.verbose else logging.INFO rootlogger = logging.getLogger() configure_logger_for_colour(rootlogger, level=loglevel) common_launcher = [sys.executable, "-m", ANONYMISER] common_options = ["-v"] * (1 if args.verbose else 0) + unknownargs log.debug(f"common_options: {common_options}") nprocesses_patient = args.nproc nprocesses_nonpatient = args.nproc nprocesses_index = args.nproc # ------------------------------------------------------------------------- # Setup # ------------------------------------------------------------------------- # Start. time_start = time.time() # ------------------------------------------------------------------------- # Clean/build the tables. Only run one copy of this! # ------------------------------------------------------------------------- # CALL USING "python -m my.module"; DO NOT run the script as an executable. # If you run a Python script as an executable, it gets added to the # PYTHONPATH. Then, when your script says "import regex" (meaning the # system module), it might import "regex.py" from the same directory (which # it wouldn't normally do, because Python 3 uses absolute not relative # imports). procargs = ( common_launcher + ["--dropremake", "--processcluster=STRUCTURE"] + common_options ) check_call_process(procargs) # ------------------------------------------------------------------------- # Build opt-out lists. Only run one copy of this! # ------------------------------------------------------------------------- procargs = ( common_launcher + ["--optout", "--processcluster=OPTOUT", "--skip_dd_check"] + common_options ) check_call_process(procargs) # ------------------------------------------------------------------------- # Now run lots of things simultaneously: # ------------------------------------------------------------------------- # It'd be less confusing if we have a single numbering system across all, # rather than numbering separately for patient and non-patient processes. # However, each group divides its work by its process number, so that # won't fly (for n processes it wants to see processes numbered from 0 to # n - 1 inclusive). # (a) patient tables args_list = [] # type: List[List[str]] for procnum in range(nprocesses_patient): procargs = ( common_launcher + [ "--patienttables", "--processcluster=PATIENT", f"--nprocesses={nprocesses_patient}", f"--process={procnum}", "--skip_dd_check", ] + common_options ) args_list.append(procargs) # (b) non-patient tables for procnum in range(nprocesses_nonpatient): procargs = ( common_launcher + [ "--nonpatienttables", "--processcluster=NONPATIENT", f"--nprocesses={nprocesses_nonpatient}", f"--process={procnum}", "--skip_dd_check", ] + common_options ) args_list.append(procargs) run_multiple_processes(args_list) # Wait for them all to finish # todo: fix cardinal_pythonlib to allow requesting e.g. n processes in # total but only n/2 at once time_middle = time.time() # ------------------------------------------------------------------------- # Now do the indexing, if nothing else failed. # (Always fastest to index last.) # ------------------------------------------------------------------------- args_list = [ common_launcher + [ "--index", "--processcluster=INDEX", f"--nprocesses={nprocesses_index}", f"--process={procnum}", "--skip_dd_check", ] + common_options for procnum in range(nprocesses_index) ] run_multiple_processes(args_list) # ------------------------------------------------------------------------- # Finished. # ------------------------------------------------------------------------- time_end = time.time() main_dur = time_middle - time_start index_dur = time_end - time_middle total_dur = time_end - time_start print( f"Time taken: main {main_dur} s, indexing {index_dur} s, " f"total {total_dur} s" )
if __name__ == "__main__": main()