#!/usr/bin/env python

r"""
crate_anon/preprocess/preprocess_systmone.py

===============================================================================

    Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CRATE.

    CRATE is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CRATE is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CRATE. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Preprocess a copy of a SystmOne database -- primarily to index it.**

"""

import argparse
import logging
from typing import List, TYPE_CHECKING

from cardinal_pythonlib.enumlike import keys_descriptions_from_enum
from cardinal_pythonlib.logs import main_only_quicksetup_rootlogger
from cardinal_pythonlib.sqlalchemy.schema import (
    hack_in_mssql_xml_type,
    make_bigint_autoincrement_column,
)
from rich_argparse import RawDescriptionRichHelpFormatter
from sqlalchemy import create_engine, MetaData
from sqlalchemy.engine.base import Engine

from crate_anon.anonymise.constants import CHARSET
from crate_anon.common.sql import (
    add_columns,
    add_indexes,
    create_view,
    drop_columns,
    drop_indexes,
    drop_view,
    ensure_columns_present,
    IndexCreationInfo,
    set_print_not_execute,
)
from crate_anon.preprocess.constants import (
    CRATE_COL_PK,
    CRATE_IDX_PREFIX,
    DEFAULT_GEOG_COLS,
    ONSPD_TABLE_POSTCODE,
)
from crate_anon.preprocess.postcodes import COL_POSTCODE_VARIABLE_LENGTH_SPACE
from crate_anon.preprocess.systmone_ddgen import (
    contextual_tablename,
    core_tablename,
    CrateS1ViewCol,
    CrateView,
    DEFAULT_SYSTMONE_CONTEXT,
    is_in_re,
    is_mpid,
    is_pid,
    is_pk,
    S1AddressCol,
    S1GenericCol,
    S1PatientCol,
    S1Table,
    SystmOneContext,
    TABLES_REQUIRING_CRATE_PK_REGEX,
)

if TYPE_CHECKING:
    from sqlalchemy.schema import Column, Table

log = logging.getLogger(__name__)


# =============================================================================
# Preprocessing
# =============================================================================


def add_postcode_geography_view(
    engine: Engine,
    address_table: str,
    postcode_db: str,
    geog_cols: List[str],
    view_name: str,
) -> None:
    """
    Creates a source view to add geography columns to an address table
    including postcodes, linking in e.g. LSOA/IMD information from an ONS
    postcode table (e.g. imported by CRATE; see postcodes.py).

    Args:
        engine:
            An SQLAlchemy Engine.
        address_table:
            The name of the address table in the source SystmOne database.
        postcode_db:
            The name of the database (and, for SQL Server, the schema) in
            which ONS postcode information is stored.
        geog_cols:
            Columns to merge in from the postcode database.
        view_name:
            The name of the view to create in the source SystmOne database.

    Re SystmOne postcode encoding:

    - CPFT creates PostCode_NoSpaces.
    - However

      .. code-block:: sql

        SELECT COUNT(*) FROM S1_PatientAddress
        WHERE CHARINDEX(' ', FullPostcode) > 0;
        -- ... gives lots of hits (e.g. 593k); so they mostly have spaces in.

        SELECT COUNT(*) FROM S1_PatientAddress
        WHERE CHARINDEX(' ', FullPostcode) = 0;
        -- ... a few (e.g. 20); all are just the first halves, and none are a
        -- full postcode with space missing.

        SELECT DISTINCT LEN(FullPostcode) FROM S1_PatientAddress
        ORDER BY LEN(FullPostcode);
        -- NULL, 4, 5, 6, 7, 8

    So, with a bit more testing, we conclude that SystmOne uses STANDARD
    VARIABLE-LENGTH FORMAT WITH SPACES.
    """
    a = "A"  # alias for address table
    p = "P"  # alias for postcode table
    geog_col_specs = [
        f"{p}.{col}" for col in sorted(geog_cols, key=lambda x: x.lower())
    ]
    s1_postcode_col = S1AddressCol.POSTCODE
    ons_postcode_col = COL_POSTCODE_VARIABLE_LENGTH_SPACE
    ensure_columns_present(
        engine, tablename=address_table, column_names=[s1_postcode_col]
    )
    colsep = ",\n            "
    select_sql = f"""
        SELECT
            -- Original columns that are not identifying:
            -- ... PK and patient ID:
            {a}.{S1GenericCol.ROW_ID},
            {a}.{S1GenericCol.PATIENT_ID},
            -- ... Admin fields:
            {a}.{S1GenericCol.EVENT_OCCURRED_WHEN},
            {a}.{S1GenericCol.EVENT_RECORDED_WHEN},
            -- [not in CPFT version] {a}.IDDoneBy,
            -- [not in CPFT version] {a}.IDEvent,
            -- [not in CPFT version] {a}.IDOrganisation,
            -- [not in CPFT version] {a}.IDOrganisationDoneAt,
            -- [not in CPFT version] {a}.IDOrganisationRegisteredAt,
            -- [not in CPFT version] {a}.IDOrganisationVisibleTo,
            -- [not in CPFT version] {a}.IDProfileEnteredBy,
            -- [not in CPFT version] {a}.TextualEventDoneBy,
            -- ... Non-identifying information about the address:
            {a}.{S1AddressCol.ADDRESS_TYPE},
            {a}.{S1AddressCol.CCG_OF_RESIDENCE},
            {a}.{S1AddressCol.DATE_TO},
            -- Geography columns (with nothing too specific):
            {colsep.join(geog_col_specs)}
        FROM {address_table} AS {a}
        INNER JOIN {postcode_db}.{ONSPD_TABLE_POSTCODE} AS {p}
            ON {p}.{ons_postcode_col} = {a}.{s1_postcode_col}
    """
    create_view(engine, view_name, select_sql)
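

# A minimal usage sketch only; CRATE's normal route is via
# preprocess_systmone() below, which supplies the contextual table and view
# names itself. This shows a direct call against a CPFT-style address table.
# The address table name comes from the docstring above; the postcode
# database/schema and the view name are hypothetical placeholders.


def _example_add_geography_view(engine: Engine) -> None:
    """
    Illustrative only: create a geography view by hand for a CPFT Data
    Warehouse address table, using the default geography columns.
    """
    add_postcode_geography_view(
        engine=engine,
        address_table="S1_PatientAddress",  # CPFT DW name, as per docstring
        postcode_db="ONS_PD.dbo",  # hypothetical database.schema
        geog_cols=list(DEFAULT_GEOG_COLS),
        view_name="vw_example_address_geography",  # hypothetical view name
    )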


def add_testpatient_view(
    engine: Engine, patient_table: str, view_name: str
) -> None:
    """
    Creates a source view to find extra test patients, where they have not
    been correctly identified by the "official" method or caught by
    additional local filters.

    Args:
        engine:
            An SQLAlchemy Engine.
        patient_table:
            The name of the patient table in the source SystmOne database.
        view_name:
            The name of the view to create in the source SystmOne database.

    Test with:

    .. code-block:: sql

        SELECT t.IDPatient, p.FirstName, p.Surname
        FROM vw_crate_FindExtraTestPatients t
        INNER JOIN S1_Patient p ON p.RowIdentifier = t.RowIdentifier
    """
    select_sql = f"""
        SELECT
            {S1GenericCol.ROW_ID},
            {S1GenericCol.PATIENT_ID},
            1 AS {CrateS1ViewCol.IS_TEST_PATIENT}
        FROM {patient_table}
        WHERE
            {S1PatientCol.FORENAME} LIKE '%test%'
            AND {S1PatientCol.SURNAME} LIKE '%test%'
    """
    create_view(engine, view_name, select_sql)


def preprocess_systmone(
    engine: Engine,
    context: SystmOneContext,
    allow_unprefixed_tables: bool = False,
    drop_danger_drop: bool = False,
    postcode_db_name: str = None,
    geog_cols: List[str] = None,
) -> None:
    """
    Add indexes to a SystmOne source database. Without this, anonymisation is
    very slow. Also adds pseudo-PK columns to selected tables.
    """
    geog_cols = geog_cols or []  # type: List[str]

    log.info("Reflecting (inspecting) database...")
    metadata = MetaData()
    metadata.bind = engine
    metadata.reflect(engine)
    log.info("... inspection complete")

    # Tables
    for table in sorted(
        metadata.tables.values(), key=lambda t: t.name.lower()
    ):  # type: Table
        ct = core_tablename(
            table.name,
            from_context=context,
            allow_unprefixed=allow_unprefixed_tables,
        )
        if not ct:
            log.debug(f"Skipping table: {table.name}")
            continue
        table_needs_pk = is_in_re(ct, TABLES_REQUIRING_CRATE_PK_REGEX)

        # If creating, (1) create pseudo-PK if necessary, (2) create indexes.
        # If dropping, (1) drop indexes, (2) drop pseudo-PK if necessary.

        # Create step #1
        if not drop_danger_drop and table_needs_pk:
            crate_pk_col = make_bigint_autoincrement_column(
                CRATE_COL_PK, engine.dialect
            )
            # SQL Server requires Table-bound columns in order to generate
            # DDL:
            table.append_column(crate_pk_col)
            add_columns(engine, table, [crate_pk_col])

        # Create step #2 or drop step #1
        # noinspection PyTypeChecker
        for column in table.columns:  # type: Column
            colname = column.name
            idxname = f"{CRATE_IDX_PREFIX}_{colname}"
            if (
                column.primary_key
                or is_pk(ct, colname, context)
                or is_pid(colname, context)
                or is_mpid(colname, context)
            ):
                # It's too much faff to work out reliably if the source table
                # should have a UNIQUE index, particularly when local (CPFT)
                # tables use the "RowIdentifier" column name in a non-unique
                # way. It's not critical, as we're only reading, so a
                # non-unique index is fine (even if we move to a unique index
                # on the destination).
                if drop_danger_drop:
                    drop_indexes(engine, table, [idxname])
                else:
                    add_indexes(
                        engine,
                        table,
                        [
                            IndexCreationInfo(
                                index_name=idxname,
                                column=colname,
                                unique=False,
                            )
                        ],
                    )

        # Drop step #2
        if drop_danger_drop and table_needs_pk:
            drop_columns(engine, table, [CRATE_COL_PK])

    # Views
    if drop_danger_drop:
        drop_view(engine, CrateView.TESTPATIENT_VIEW)
        if postcode_db_name:
            drop_view(engine, CrateView.GEOGRAPHY_VIEW)
    else:
        add_testpatient_view(
            engine=engine,
            patient_table=contextual_tablename(S1Table.PATIENT, context),
            view_name=CrateView.TESTPATIENT_VIEW,
        )
        if postcode_db_name:
            add_postcode_geography_view(
                engine=engine,
                postcode_db=postcode_db_name,
                address_table=contextual_tablename(
                    S1Table.ADDRESS_HISTORY, context
                ),
                view_name=CrateView.GEOGRAPHY_VIEW,
                geog_cols=geog_cols,
            )
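

# Another minimal sketch: driving preprocess_systmone() programmatically
# rather than via the command line (main() below). Everything used here is
# already imported at the top of this module; only the SQLAlchemy URL/DSN and
# the ONS postcode schema name are hypothetical placeholders.


def _example_preprocess_systmone() -> None:
    """
    Illustrative only: index a copy of a SystmOne database using the default
    context, adding the geography view from a hypothetical ONS postcode
    database.
    """
    engine = create_engine(
        "mssql+pyodbc://@SYSTMONE_SOURCE_DSN"  # hypothetical ODBC DSN
    )
    preprocess_systmone(
        engine,
        context=DEFAULT_SYSTMONE_CONTEXT,
        postcode_db_name="ONS_PD.dbo",  # hypothetical; cf. --postcodedb help
        geog_cols=list(DEFAULT_GEOG_COLS),
    )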


# =============================================================================
# Main
# =============================================================================


def main() -> None:
    """
    Command-line parser. See command-line help.
    """
    parser = argparse.ArgumentParser(
        formatter_class=RawDescriptionRichHelpFormatter,
        description=r"Indexes a SystmOne database to be suitable for CRATE.",
    )
    parser.add_argument("--url", required=True, help="SQLAlchemy database URL")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose")
    parser.add_argument(
        "--print",
        action="store_true",
        help="Print SQL but do not execute it. (You can redirect the printed "
        "output to create an SQL script.)",
    )
    parser.add_argument("--echo", action="store_true", help="Echo SQL")

    context_k, context_d = keys_descriptions_from_enum(
        SystmOneContext, keys_to_lower=True
    )
    parser.add_argument(
        "--systmone_context",
        type=str,
        choices=context_k,
        default=DEFAULT_SYSTMONE_CONTEXT.name.lower(),
        help="Context of the SystmOne database that you are reading. "
        f"[{context_d}]",
    )
    parser.add_argument(
        "--systmone_allow_unprefixed_tables",
        action="store_true",
        help="Permit tables that don't start with the expected prefix "
        "(which is e.g. 'SR' for the TPP SRE context, 'S1_' for the CPFT "
        "Data Warehouse context). May add helpful content, but you may "
        "get odd tables and views.",
    )
    parser.add_argument(
        "--postcodedb",
        help="Specify database (schema) name for ONS Postcode Database (as "
        "imported by CRATE) to link to addresses as a view. With SQL "
        "Server, you will have to specify the schema as well as the "
        'database; e.g. "--postcodedb ONS_PD.dbo"',
    )
    parser.add_argument(
        "--geogcols",
        nargs="*",
        default=DEFAULT_GEOG_COLS,
        help=f"List of geographical information columns to link in from ONS "
        f"Postcode Database. BEWARE that you do not specify anything too "
        f"identifying. Default: {' '.join(DEFAULT_GEOG_COLS)}",
    )
    parser.add_argument(
        "--drop_danger_drop",
        action="store_true",
        help="REMOVES new columns and indexes, rather than creating them. "
        "(There's not very much danger; no real information is lost, but "
        "it might take a while to recalculate it.)",
    )
    args = parser.parse_args()

    main_only_quicksetup_rootlogger(
        level=logging.DEBUG if args.verbose else logging.INFO
    )
    set_print_not_execute(args.print)

    hack_in_mssql_xml_type()
    engine = create_engine(args.url, echo=args.echo, encoding=CHARSET)
    log.info(f"Database: {engine.url!r}")  # ... repr (!r) hides p/w
    log.debug(f"Dialect: {engine.dialect.name}")

    preprocess_systmone(
        engine,
        context=SystmOneContext[args.systmone_context],
        allow_unprefixed_tables=args.systmone_allow_unprefixed_tables,
        drop_danger_drop=args.drop_danger_drop,
        postcode_db_name=args.postcodedb,
        geog_cols=args.geogcols,
    )


if __name__ == "__main__":
    main()
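

# Example invocations (illustrative only; the DSN is a placeholder). All
# options shown are defined in main() above, and the module can be run via
# "python -m" because of the __main__ guard.
#
# Preview the SQL without executing it (redirect to build a script):
#
#   python -m crate_anon.preprocess.preprocess_systmone \
#       --url "mssql+pyodbc://@SYSTMONE_SOURCE_DSN" \
#       --print > preprocess_systmone.sql
#
# Create the indexes, pseudo-PK columns, and views, linking in the ONS
# Postcode Database (for SQL Server, give database plus schema):
#
#   python -m crate_anon.preprocess.preprocess_systmone \
#       --url "mssql+pyodbc://@SYSTMONE_SOURCE_DSN" \
#       --postcodedb ONS_PD.dbo
#
# Remove everything this tool added:
#
#   python -m crate_anon.preprocess.preprocess_systmone \
#       --url "mssql+pyodbc://@SYSTMONE_SOURCE_DSN" \
#       --drop_danger_drop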