Source code for crate_anon.anonymise.ddr

"""
crate_anon/anonymise/ddr.py

===============================================================================

    Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CRATE.

    CRATE is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CRATE is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CRATE. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Data dictionary rows.**

"""

# =============================================================================
# Imports
# =============================================================================

import ast
import logging
from typing import Any, List, Dict, Iterable, Optional, TYPE_CHECKING, Union

from cardinal_pythonlib.convert import convert_to_int
from cardinal_pythonlib.lists import count_bool
from cardinal_pythonlib.sql.validation import (
    ensure_valid_field_name,
    ensure_valid_table_name,
    is_sqltype_valid,
)
from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName
from cardinal_pythonlib.sqlalchemy.schema import (
    convert_sqla_type_for_dialect,
    does_sqlatype_merit_fulltext_index,
    does_sqlatype_require_index_len,
    giant_text_sqltype,
    get_sqla_coltype_from_dialect_str,
    is_sqlatype_binary,
    is_sqlatype_date,
    is_sqlatype_numeric,
    is_sqlatype_text_of_length_at_least,
    is_sqlatype_text_over_one_char,
)
from sqlalchemy import Column
from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.sql.sqltypes import TypeEngine

from crate_anon.anonymise.altermethod import AlterMethod
from crate_anon.anonymise.constants import (
    AlterMethodType,
    AnonymiseConfigKeys,
    Decision,
    DEFAULT_INDEX_LEN,
    IndexType,
    MYSQL_MAX_IDENTIFIER_LENGTH,
    ODD_CHARS_TRANSLATE,
    ScrubMethod,
    ScrubSrc,
    SQLSERVER_MAX_IDENTIFIER_LENGTH,
    SrcFlag,
)
from crate_anon.common.sql import (
    coltype_length_if_text,
    is_sql_column_type_textual,
    matches_fielddef,
    matches_tabledef,
    SQLTYPE_DATE,
)

if TYPE_CHECKING:
    from crate_anon.anonymise.config import Config, DatabaseSafeConfig

log = logging.getLogger(__name__)


# =============================================================================
# Constants for reporting
# =============================================================================


class DDRLabels:
    BEING_SCRUBBED = "Scrubbed free text."
    DEFINES_PRIMARY_PID = (
        "PRINCIPAL PATIENT-DEFINING COLUMN IN THE WHOLE DATABASE."
    )
    MRID = "MRID."
    NOTHING = "–"  # en dash
    RID = "RID."
    SOURCE_HASH = "Hash of source row to detect changes."
    THIRD_PARTY_RID = "Third-party RID (e.g. relative)."
    TRID = "TRID."
    UNKNOWN = "?"


# =============================================================================
# Helper functions
# =============================================================================


[docs]def warn_if_identifier_long( table: str, column: str, dest_dialect: Optional[str] ) -> None: """ Warns about identifiers that are too long for specific database engines. """ prettyname_dialectname_maxlen = ( ("MySQL", SqlaDialectName.MYSQL, MYSQL_MAX_IDENTIFIER_LENGTH), ("SQL Server", SqlaDialectName.MSSQL, SQLSERVER_MAX_IDENTIFIER_LENGTH), ) description_value = ( ("Table", table), ("Column", column), ) for prettyname, dialect_name, maxlen in prettyname_dialectname_maxlen: if dest_dialect is not None and dest_dialect != dialect_name: # We know our destination dialect and it's not the one we're # considering. continue for description, value in description_value: if len(value) > maxlen: log.warning( f"{description} name in {table!r}.{column!r} " f"is too long for {prettyname} " f"({len(value)} characters > {maxlen} maximum)" )
# ============================================================================= # DataDictionaryRow # =============================================================================
[docs]class DataDictionaryRow: """ Class representing a single row of a data dictionary (a DDR). """ # For attribute/config references: SRC_DB = "src_db" SRC_TABLE = "src_table" SRC_FIELD = "src_field" SRC_DATAFILE = "src_datatype" SRC_FLAGS = "src_flags" SCRUB_SRC = "scrub_src" SCRUB_METHOD = "scrub_method" DECISION = "decision" INCLUSION_VALUES = "inclusion_values" EXCLUSION_VALUES = "exclusion_values" ALTER_METHOD = "alter_method" DEST_TABLE = "dest_table" DEST_FIELD = "dest_field" DEST_DATATYPE = "dest_datatype" INDEX = "index" INDEXLEN = "indexlen" COMMENT = "comment" ROWNAMES = [ SRC_DB, SRC_TABLE, SRC_FIELD, SRC_DATAFILE, SRC_FLAGS, SCRUB_SRC, SCRUB_METHOD, DECISION, INCLUSION_VALUES, EXCLUSION_VALUES, ALTER_METHOD, DEST_TABLE, DEST_FIELD, DEST_DATATYPE, INDEX, INDEXLEN, COMMENT, ] ENUM_ROWNAMES = (INDEX, SCRUB_SRC, SCRUB_METHOD)
[docs] def __init__(self, config: "Config") -> None: """ Set up basic defaults. Args: config: :class:`crate_anon.anonymise.config.Config` """ self.config = config # In the order of ROWNAMES: self.src_db = None # type: Optional[str] self.src_table = None # type: Optional[str] self.src_field = None # type: Optional[str] self.src_datatype = None # type: Optional[str] # in SQL string format # src_flags: a property; see below self.scrub_src = None # type: Optional[str] self.scrub_method = None # type: Optional[str] # decision: a property; see below # inclusion_values: a property; see below # exclusion_values: a property; see below # alter_method: a property; see below self.dest_table = None # type: Optional[str] self.dest_field = None # type: Optional[str] self.dest_datatype = None # type: Optional[str] self.index = IndexType.NONE # type: IndexType self.indexlen = None # type: Optional[int] self.comment = "" # For src_flags: self._pk = False self._not_null = False self._add_src_hash = False self._primary_pid = False self._defines_primary_pids = False self._master_pid = False self._constant = False self._addition_only = False self._opt_out_info = False self._required_scrubber = False # Other: self.omit = False # in the DD file, this corresponds to 'decision' self._from_file = False self._src_override_dialect = None # type: Optional[Dialect] self._src_sqla_coltype = None # type: Optional[str] self._inclusion_values = [] # type: List[Any] self._exclusion_values = [] # type: List[Any] self._alter_methods = [] # type: List[AlterMethod]
# ------------------------------------------------------------------------- # Properties: Relating to whole databases # ------------------------------------------------------------------------- @property def src_db_lowercase(self) -> str: """ Returns the source database name, in lower case. """ return self.src_db.lower() @property def src_dialect(self) -> Dialect: """ Returns the SQLAlchemy :class:`Dialect` (e.g. MySQL, SQL Server...) for the source database. """ return self._src_override_dialect or self.config.get_src_dialect( self.src_db ) @property def dest_dialect(self) -> Dialect: """ Returns the SQLAlchemy :class:`Dialect` (e.g. MySQL, SQL Server...) for the destination database. """ return self.config.dest_dialect @property def dest_dialect_name(self) -> str: """ Returns the SQLAlchemy dialect name for the destination database. """ return self.config.dest_dialect_name # ------------------------------------------------------------------------- # Properties: Relating to database columns # ------------------------------------------------------------------------- @property def src_table_lowercase(self) -> str: """ Returns the source table name, in lower case. """ return self.src_table.lower() @property def src_field_lowercase(self) -> str: """ Returns the source field (column) name, in lower case. """ return self.src_field.lower() @property def pk(self) -> bool: """ Is the source field (and the destination field, for that matter) a primary key (PK)? """ return self._pk @property def not_null(self) -> bool: """ Defaults to False. But if the DD row was created by database reflection, and the source field was set NOT NULL, will return True. """ return self._not_null @property def src_is_textual(self) -> bool: """ Is the source column textual? """ return is_sql_column_type_textual(self.src_datatype) @property def src_textlength(self) -> Optional[int]: """ If the source column is textual, returns its length (or ``None``) for unlimited. Also returns ``None`` if the source is not textual. """ if not self.src_is_textual: return None dialect = self.src_dialect # Get length of field if text field (otherwise this remains 'None') # noinspection PyUnresolvedReferences return coltype_length_if_text(self.src_datatype, dialect.name) # ------------------------------------------------------------------------- # Properties: CRATE # ------------------------------------------------------------------------- @property def add_src_hash(self) -> bool: """ Should we add a column to the destination that contains a hash of the contents of the whole source row (all fields)? """ return self._add_src_hash @property def primary_pid(self) -> bool: """ Does the source field contain the primary patient ID (PID)? (A typical example of a PID: "hospital number".) """ return self._primary_pid @property def defines_primary_pids(self) -> bool: """ Is this the field -- usually one in the entire source database -- that *defines* primary PIDs? Usually this is true for the "ID" column of the master patient table. """ return self._defines_primary_pids @property def master_pid(self) -> bool: """ Does this field contain the master patient ID (MPID)? (A typical example of an MPID: "NHS number".) """ return self._master_pid @property def contains_patient_scrub_src_info(self) -> bool: """ Does this field contain scrub-source information about the patient? """ return self.scrub_src == ScrubSrc.PATIENT @property def contains_third_party_info_directly(self) -> bool: """ Does this field contain (identifiable) information about a third party, directly? """ return self.scrub_src == ScrubSrc.THIRDPARTY @property def third_party_pid(self) -> bool: """ Does this field contain the PID of a different (e.g. related) patient? """ return self.scrub_src == ScrubSrc.THIRDPARTY_XREF_PID @property def contains_third_party_info(self) -> bool: """ Does this field contain (identifiable) information about a third party, either directly or via a third-party PID? """ return self.third_party_pid or self.contains_third_party_info_directly @property def has_special_alter_method(self) -> bool: """ Fields for which the alter method is fixed. """ return self.primary_pid or self.master_pid or self.third_party_pid @property def constant(self) -> bool: """ Is the source field guaranteed not to change (for a given PK)? """ return self._constant @property def addition_only(self) -> bool: """ May we assume that records can only be added to this table, not deleted? This is a flag that may be applied to a PK row only. """ return self._addition_only @property def opt_out_info(self) -> bool: """ Does the field contain information about whether the patient wishes to opt out entirely from the anonymised database? (Whether the contents of the field means "opt out" or "don't opt out" depends on ``optout_col_values`` in the :class:`crate_anon.anonymise.config.Config`.) """ return self._opt_out_info @property def src_flags(self) -> str: """ Returns a string representation of the source flags. """ return "".join( str(x) for x in ( SrcFlag.PK if self._pk else "", SrcFlag.NOT_NULL if self._not_null else "", SrcFlag.ADD_SRC_HASH if self._add_src_hash else "", SrcFlag.PRIMARY_PID if self._primary_pid else "", ( SrcFlag.DEFINES_PRIMARY_PIDS if self._defines_primary_pids else "" ), SrcFlag.MASTER_PID if self._master_pid else "", SrcFlag.CONSTANT if self._constant else "", SrcFlag.ADDITION_ONLY if self._addition_only else "", SrcFlag.OPT_OUT if self._opt_out_info else "", SrcFlag.REQUIRED_SCRUBBER if self._required_scrubber else "", ) ) @src_flags.setter def src_flags(self, flags: str) -> None: """ Takes a string representation of the source flags, and sets our internal flags accordingly. """ self._pk = SrcFlag.PK.value in flags self._not_null = SrcFlag.NOT_NULL.value in flags self._add_src_hash = SrcFlag.ADD_SRC_HASH.value in flags self._primary_pid = SrcFlag.PRIMARY_PID.value in flags self._defines_primary_pids = ( SrcFlag.DEFINES_PRIMARY_PIDS.value in flags ) self._master_pid = SrcFlag.MASTER_PID.value in flags self._constant = SrcFlag.CONSTANT.value in flags self._addition_only = SrcFlag.ADDITION_ONLY.value in flags self._opt_out_info = SrcFlag.OPT_OUT.value in flags self._required_scrubber = SrcFlag.REQUIRED_SCRUBBER.value in flags @property def inclusion_values(self) -> List[Any]: """ Returns a list of inclusion values (or an empty string if there are no such values). This slightly curious output format is used to create a TSV row (see :func:`get_tsv`) or to check in a "truthy" way whether we have inclusion values (see :func:`crate_anon.anonymise.anonymise.process_table`). """ return self._inclusion_values or "" # for TSV output @inclusion_values.setter def inclusion_values(self, value: str) -> None: """ Set the inclusion values. Args: value: either something that is "falsy" (to set the inclusion values to an empty list) or something that evaluates to a Python iterable (e.g. list or tuple) via :func:`ast.literal_eval`. For example: ``[None, 0]``, or ``[True, 1, 'yes', 'true', 'Yes', 'True']``. """ if value: self._inclusion_values = ( ast.literal_eval(value) or [] ) # type: List[Any] else: self._inclusion_values = [] # type: List[Any] @property def exclusion_values(self) -> Union[List[Any], str]: """ Returns a list of exclusion values (or an empty string if there are no such values). This slightly curious output format is used to create a TSV row (see :func:`get_tsv`) or to check in a "truthy" way whether we have exclusion values (see :func:`crate_anon.anonymise.anonymise.process_table`). """ return self._exclusion_values or "" # for TSV output @exclusion_values.setter def exclusion_values(self, value: str) -> None: """ Set the exclusion values. Args: value: either something that is "falsy" (to set the inclusion values to an empty list) or something that evaluates to a Python iterable (e.g. list or tuple) via :func:`ast.literal_eval`. For example: ``[None, 0]``, or ``[True, 1, 'yes', 'true', 'Yes', 'True']``. """ if value: self._exclusion_values = ( ast.literal_eval(value) or [] ) # type: List[Any] else: self._exclusion_values = [] # type: List[Any] @property def required_scrubber(self) -> bool: """ Is this a "required scrubber" field? A "required scrubber" is a field that must provide at least one non-NULL value for each patient, or the patient won't get processed. (For example, you might want to omit a patient if you can't be certain about their surname for anonymisation.) """ return self._required_scrubber @property def alter_method(self) -> str: """ Return the ``alter_method`` string from the working fields. """ return ",".join(filter(None, (x.as_text for x in self._alter_methods))) # This removes any AlterMethod objects that are doing nothing, because # they return blank strings. @alter_method.setter def alter_method(self, value: str) -> None: """ Convert the ``alter_method`` string (from the data dictionary) to a bunch of Boolean/simple fields internally. """ # Get the list of elements in the user's order. self._alter_methods = [] # type: List[AlterMethod] elements = [x.strip() for x in value.split(",") if x] methods = [] # type: List[AlterMethod] for e in elements: methods.append(AlterMethod(config=self.config, text_value=e)) # Now establish order. Text extraction first; everything else in order. text_extraction_indices = [] # type: List[int] for i, am in enumerate(methods): if am.extract_text: text_extraction_indices.append(i) for index in sorted(text_extraction_indices, reverse=True): # Go in reverse order of index. self._alter_methods.append(methods[index]) del methods[index] self._alter_methods.extend(methods) # Now, checks: have_text_extraction = False have_truncate_date = False for am in self._alter_methods: if not am.truncate_date and have_truncate_date: raise ValueError( f"Date truncation must stand alone in " f"alter_method: {value}" ) if am.extract_text and have_text_extraction: raise ValueError( f"Can only have one text extraction method " f"in {value}" ) if am.truncate_date: have_truncate_date = True if am.extract_text: have_text_extraction = True
[docs] def set_alter_methods_directly(self, methods: List[AlterMethod]) -> None: """ For internal use: setting from a list directly. """ # Calls the alter_method setter. self.alter_method = ",".join(m.as_text for m in methods)
@property def from_file(self) -> bool: """ Was this DDR loaded from a file (rather than, say, autogenerated from a database)? """ return self._from_file @property def decision(self) -> str: """ Should we include the field in the destination? Returns: ``"OMIT"`` or ``"include``. """ return Decision.OMIT.value if self.omit else Decision.INCLUDE.value @decision.setter def decision(self, value: Union[str, Decision]) -> None: """ Sets the internal ``omit`` flag from the input (usually taken from the data dictionary file). Args: value: ``"OMIT"`` or ``"include``. """ try: if isinstance(value, Decision): e = value else: e = Decision.lookup(value) self.omit = e is Decision.OMIT except ValueError: raise ValueError( "decision was {}; must be one of {}".format( value, [Decision.OMIT.value, Decision.INCLUDE.value] ) ) @property def include(self) -> bool: """ Is this row being included (not omitted)? """ return not self.omit @property def is_table_comment(self) -> bool: """ Is this a table comment, free of column information? """ return bool(self.comment and not self.src_field) # ------------------------------------------------------------------------- # Comparisons # ------------------------------------------------------------------------- def __lt__(self, other: "DataDictionaryRow") -> bool: """ Defines an order of DDRs based on their source field's signature. """ return self.src_signature < other.src_signature
[docs] def matches_tabledef(self, tabledef: Union[str, List[str]]) -> bool: """ Does our source table match the wildcard-based table definition? Args: tabledef: ``fnmatch``-style pattern (e.g. ``"patient_address_table_*"``), or list of them """ return matches_tabledef(self.src_table, tabledef)
[docs] def matches_fielddef(self, fielddef: Union[str, List[str]]) -> bool: """ Does our source table/field match the wildcard-based field definition? Args: fielddef: ``fnmatch``-style pattern (e.g. ``"system_table.*"`` or ``"*.nhs_number"``), or list of them """ return matches_fielddef(self.src_table, self.src_field, fielddef)
# ------------------------------------------------------------------------- # Representations # ------------------------------------------------------------------------- def __str__(self) -> str: """ Returns a string representation of the DDR. """ return ", ".join( [f"{a}: {getattr(self, a)!r}" for a in DataDictionaryRow.ROWNAMES] ) @property def src_signature(self) -> str: """ Returns a signature based on the source database/table/field, in the format ``db.table.column``. """ if self.is_table_comment: return f"{self.src_db}.{self.src_table}" return f"{self.src_db}.{self.src_table}.{self.src_field}" @property def dest_signature(self) -> str: """ Returns a signature based on the destination table/field, in the format ``table.column``. """ if self.is_table_comment: return f"{self.dest_table}" return f"{self.dest_table}.{self.dest_field}" @property def offender_description(self) -> str: """ Get a string used to describe this DDR (in terms of its source/destination fields) if it does something wrong. """ offenderdest = "" if not self.omit else f" -> {self.dest_signature}" return f"{self.src_signature}{offenderdest}"
[docs] @classmethod def header_row(cls) -> List[str]: """ Returns a header row (a list of headings) for use in spreadsheet formats. """ return list(cls.ROWNAMES)
[docs] def as_row(self) -> List[Any]: """ Returns a data row (a list of values whose order matches :meth:`header_row`) for use in spreadsheet formats. """ row = [] # type: List[Any] for k in self.ROWNAMES: v = getattr(self, k) if v is None: # some spreadsheet handlers (e.g. pyexcel_ods) choke on None v = "" elif k in self.ENUM_ROWNAMES: # convert enum to str v = str(v) row.append(v) return row
[docs] def report_dest_annotation(self) -> str: """ Returns information useful for a researcher looking at the destination database, in simple string form. - Therefore: does not include fields like "constant", "addition_only", which are primarily for database managers; we're trying to keep this terse. - Relates to DESTINATION fields, e.g. a source PID becomes a destination RID. """ items = [] # type: List[str] if self.primary_pid: items.append(DDRLabels.RID) if self.defines_primary_pids: items.append(DDRLabels.DEFINES_PRIMARY_PID) if self.master_pid: items.append(DDRLabels.MRID) if self.third_party_pid: items.append(DDRLabels.THIRD_PARTY_RID) if self.being_scrubbed: items.append(DDRLabels.BEING_SCRUBBED) if not items: return DDRLabels.NOTHING return " ".join(items)
# ------------------------------------------------------------------------- # Setting # -------------------------------------------------------------------------
[docs] def set_from_dict( self, valuedict: Dict[str, Any], override_dialect: Dialect = None ) -> None: """ Set internal fields from a dict of elements representing a row from the TSV data dictionary file. Also sets the "loaded from file" indicator, since that is the context in which we use this function. Args: valuedict: Dictionary mapping row headings (or attribute names) to values. override_dialect: SQLAlchemy SQL dialect to enforce (e.g. for interpreting textual column types in the source database). By default, the source database's own dialect is used. """ self.src_db = valuedict["src_db"] self.src_table = valuedict["src_table"] self.src_field = valuedict["src_field"] self.src_datatype = valuedict["src_datatype"].upper() self._src_override_dialect = override_dialect # noinspection PyAttributeOutsideInit self.src_flags = valuedict["src_flags"] # a property self.scrub_src = ScrubSrc.lookup( valuedict["scrub_src"], allow_none=True ) self.scrub_method = ScrubMethod.lookup( valuedict["scrub_method"], allow_none=True ) # noinspection PyAttributeOutsideInit self.decision = valuedict["decision"] # a property; sets self.omit # noinspection PyAttributeOutsideInit self.inclusion_values = valuedict["inclusion_values"] # a property # noinspection PyAttributeOutsideInit self.exclusion_values = valuedict["exclusion_values"] # a property # noinspection PyAttributeOutsideInit self.alter_method = valuedict["alter_method"] # a property self.dest_table = valuedict["dest_table"] self.dest_field = valuedict["dest_field"] self.dest_datatype = valuedict["dest_datatype"].upper() self.index = IndexType.lookup(valuedict["index"], allow_none=True) self.indexlen = convert_to_int(valuedict["indexlen"]) self.comment = valuedict["comment"] self._from_file = True
# ------------------------------------------------------------------------- # Anonymisation decisions # ------------------------------------------------------------------------- @property def being_scrubbed(self) -> bool: """ Is the field being scrubbed as it passes from source to destination? (Only true if the field is being included, not omitted.) """ return not self.omit and any(am.scrub for am in self._alter_methods) @property def contains_patient_info(self) -> bool: """ Does the field contain patient information? That means any of: - primary PID - MPID - scrub-source (sensitive) information """ return self.primary_pid or self.master_pid or bool(self.scrub_src) @property def contains_scrub_src(self) -> bool: """ Does the field contain scrub-source information (sensitive information used for de-identification)? """ return bool(self.scrub_src) @property def contains_vital_patient_info(self) -> bool: """ Does the field contain vital patient information? That means: - scrub-source (sensitive) information """ return self.contains_scrub_src @property def required(self) -> bool: """ Is the field required? That means any of: - chosen by the user to be translated into the destination - contains vital patient information (scrub-source information) """ # return not self.omit or self.contains_patient_info return not self.omit or self.contains_vital_patient_info
[docs] def skip_row_by_value(self, value: Any) -> bool: """ Should we skip this row, because the value is one of the row's exclusion values, or the row has inclusion values and the value isn't one of them? Args: value: value to test """ if self._inclusion_values and value not in self._inclusion_values: # log.debug("skipping row based on inclusion_values") return True if value in self._exclusion_values: # log.debug("skipping row based on exclusion_values") return True return False
@property def alter_methods(self) -> List[AlterMethod]: """ Return all alteration methods to be applied. Returns: list of :class:`crate_anon.anonymise.altermethod.AlterMethod` objects """ return self._alter_methods @property def skip_row_if_extract_text_fails(self) -> bool: """ Should we skip the row if processing the row involves extracting text and that process fails? """ return any(x.skip_if_text_extract_fails for x in self._alter_methods) @property def extracting_text_altermethods(self) -> List[AlterMethod]: """ Return all alteration methods that involve text extraction. Returns: list of :class:`crate_anon.anonymise.altermethod.AlterMethod` objects """ return [am for am in self._alter_methods if am.extract_text]
[docs] def remove_scrub_from_alter_methods(self) -> None: """ Prevent this row from being scrubbed, by removing any "scrub" method from among its alteration methods. """ # log.debug( # f"remove_scrub_from_alter_methods " # f"[used for non-patient tables]: {self.src_signature}") for sm in self._alter_methods: sm.scrub = False
# ------------------------------------------------------------------------- # Other decisions # ------------------------------------------------------------------------- @property def using_fulltext_index(self) -> bool: """ Should the destination field have a full-text index? """ return self.index == IndexType.FULLTEXT # ------------------------------------------------------------------------- # SQLAlchemy types # ------------------------------------------------------------------------- @property def src_sqla_coltype(self) -> TypeEngine: """ Returns the SQLAlchemy column type of the source column. """ return self._src_sqla_coltype or get_sqla_coltype_from_dialect_str( self.src_datatype, self.config.get_src_dialect(self.src_db) )
[docs] def set_src_sqla_coltype(self, sqla_coltype: TypeEngine) -> None: """ Sets the SQLAlchemy column type of the source column. """ self._src_sqla_coltype = sqla_coltype
@property def dest_should_be_encrypted_pid_type(self) -> bool: """ Should the destination column (if included) be of the encrypted PID/MPID type? """ return self.primary_pid or self.third_party_pid or self.master_pid @property def dest_sqla_coltype(self) -> TypeEngine: """ Returns the SQLAlchemy column type of the destination column. Note that this doesn't include nullable status. An SQLAlchemy column looks like Column(String(50), nullable=False) -- the type that we're fetching here is, for example, the String(50) part. For the full column, see ``dest_sqla_column`` below. """ assert not self.is_table_comment if self.dest_datatype: # User (or our autogeneration process) wants to override # the type. return get_sqla_coltype_from_dialect_str( self.dest_datatype, self.dest_dialect ) else: # Destination data type is not explicitly specified. # Is it a special type of field? if self.dest_should_be_encrypted_pid_type: return self.config.sqltype_encrypted_pid else: # Otherwise: return the SQLAlchemy column type class determined # from the source database by reflection. Will be autoconverted # to the destination dialect, with some exceptions, addressed # as below: return convert_sqla_type_for_dialect( coltype=self.src_sqla_coltype, dialect=self.dest_dialect, expand_for_scrubbing=self.being_scrubbed, ) @property def dest_sqla_column(self) -> Column: """ Returns an SQLAlchemy :class:`sqlalchemy.sql.schema.Column` for the destination column. """ assert not self.is_table_comment name = self.dest_field coltype = self.dest_sqla_coltype comment = self.comment or "" kwargs = { "doc": comment, # Python side "comment": comment, # SQL side; supported from SQLAlchemy 1.2: # https://docs.sqlalchemy.org/en/14/core/metadata.html#sqlalchemy.schema.Column.params.comment # noqa } if self.pk: kwargs["primary_key"] = True kwargs["autoincrement"] = False if self.not_null or self.primary_pid: kwargs["nullable"] = False return Column(name, coltype, **kwargs)
[docs] def make_dest_datatype_explicit(self) -> None: """ By default, when autocreating a data dictionary, the ``dest_datatype`` field is not populated explicit, just implicitly. This option makes them explicit by instantiating those values. Primarily for debugging. """ if self.is_table_comment: return if not self.dest_datatype: self.dest_datatype = str(self.dest_sqla_coltype)
# ------------------------------------------------------------------------- # Validation # -------------------------------------------------------------------------
[docs] def check_valid(self) -> None: """ Check internal validity and complain if invalid, showing the source of the problem. Raises: :exc:`AssertionError`, :exc:`ValueError` """ try: self._check_valid() except (AssertionError, ValueError): log.exception( f"Offending DD row [{self.offender_description}]: " f"{str(self)}" ) raise
[docs] def check_prohibited_fieldnames( self, prohibited_fieldnames: Iterable[str] ) -> None: """ Check that the destination field isn't a prohibited one. Args: prohibited_fieldnames: list of prohibited fieldnames Raises: :exc:`ValueError` if there's a problem. """ if self.dest_field in prohibited_fieldnames: log.exception( f"Offending DD row [{self.offender_description}]: " f"{str(self)}" ) raise ValueError("Prohibited dest_field name")
def _check_valid(self) -> None: """ Check internal validity and complain if invalid. Raises: :exc:`AssertionError`, :exc:`ValueError` """ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Check source database/table is OK # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ assert self.src_db, "Need src_db" if self.src_db not in self.config.source_db_names: raise ValueError( "Data dictionary row references non-existent source " "database" ) srccfg = self.config.sources[self.src_db].srccfg assert self.src_table, "Need src_table" ensure_valid_table_name(self.src_table) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Check destination table is OK # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if not self.omit: assert self.dest_table, "Need dest_table" ensure_valid_table_name(self.dest_table) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Check for conflicting or missing flags # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if self.defines_primary_pids and not self.primary_pid: raise ValueError( f"All fields with " f"{self.SRC_FLAGS}={SrcFlag.DEFINES_PRIMARY_PIDS} " f"set must have {self.SRC_FLAGS}={SrcFlag.PRIMARY_PID} set" ) if self.opt_out_info and not self.config.optout_col_values: raise ValueError( f"Fields with {self.SRC_FLAGS}={SrcFlag.OPT_OUT} " f"exist, but config's {AnonymiseConfigKeys.OPTOUT_COL_VALUES} " f"setting is empty" ) if ( count_bool( [ self.primary_pid, self.master_pid, self.third_party_pid, bool(self.alter_method), ] ) > 1 ): raise ValueError( f"Field can be any ONE of: " f"{self.SRC_FLAGS}={SrcFlag.PRIMARY_PID}, " f"{self.SRC_FLAGS}={SrcFlag.MASTER_PID}, " f"{self.SCRUB_SRC}={ScrubSrc.THIRDPARTY_XREF_PID}, or " f"{self.ALTER_METHOD} " f"(because those flags all imply a certain " f"{self.ALTER_METHOD})" ) if self.required_scrubber and not self.scrub_src: raise ValueError( f"If you specify " f"{self.SRC_FLAGS}={SrcFlag.REQUIRED_SCRUBBER}, " f"you must specify {self.SCRUB_SRC}" ) if self.add_src_hash: if not self.pk: raise ValueError( f"{self.SRC_FLAGS}={SrcFlag.ADD_SRC_HASH} " f"can only be set on " f"{self.SRC_FLAGS}={SrcFlag.PK} fields" ) # Don't exclude the self.omit and SrcFlag.ADD_SRC_HASH here. That # is done as a table-level check in dd.py instead, in # DataDictionary.check_valid(), because if the whole table is being # omitted, this is OK. if self.index != IndexType.UNIQUE: raise ValueError( f"{self.SRC_FLAGS}={SrcFlag.ADD_SRC_HASH} fields require " f"{self.INDEX}=={IndexType.UNIQUE}" ) if self.constant: raise ValueError( f"cannot mix {SrcFlag.ADD_SRC_HASH} flag with " f"{SrcFlag.CONSTANT} flag" ) if self.constant: if not self.pk: raise ValueError( f"{self.SRC_FLAGS}={SrcFlag.CONSTANT} can only be set on " f"{self.SRC_FLAGS}={SrcFlag.PK} fields" ) if self.index != IndexType.UNIQUE: raise ValueError( f"{self.SRC_FLAGS}={SrcFlag.CONSTANT} fields require " f"{self.INDEX}=={IndexType.UNIQUE}" ) if self.addition_only: if not self.pk: raise ValueError( f"{self.SRC_FLAGS}={SrcFlag.ADDITION_ONLY} " f"can only be set on " f"{self.SRC_FLAGS}={SrcFlag.PK} fields" ) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # No more checks required if field will be omitted. # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if self.omit: return # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Table comment? If so, and it's OK, then we'll end here. # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if not self.src_field: assert self.comment, ( "src_field can only be missing for table comments, " "but there is no comment" ) assert ( not self.src_datatype and not self.dest_datatype ), "Table comments cannot have datatypes" assert not self.src_flags, "Table comments cannot have any flags" assert not self.scrub_src, "Table comments cannot be scrub sources" assert not self.scrub_method, "Table comments cannot be scrubbed" assert ( not self.inclusion_values and not self.exclusion_values ), "Table comments cannot have inclusion/exclusion values" assert not self.dest_field, "Table comments cannot have dest_field" assert ( self.index == IndexType.NONE ), "Table comments cannot be indexed" return # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Check other source field information # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ensure_valid_field_name(self.src_field) assert self.src_datatype, "Need src_datatype" # REMOVED 2016-06-04; fails with complex SQL Server types, which can # look like 'NVARCHAR(10) COLLATE "Latin1_General_CI_AS"'. # # if not is_sqltype_valid(self.src_datatype): # raise ValueError( # "Field has invalid source data type: {}".format( # self.src_datatype)) src_sqla_coltype = self.src_sqla_coltype # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Check destination table/field/datatype # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Table assert self.dest_field, "Need dest_field" ensure_valid_table_name(self.dest_table) if self.dest_table == self.config.temporary_tablename: raise ValueError( f"Destination tables can't be named " f"{self.config.temporary_tablename}, as that's the name set " f"in the config's {AnonymiseConfigKeys.TEMPORARY_TABLENAME} " f"variable" ) # Field ensure_valid_field_name(self.dest_field) if self.dest_field == self.config.source_hash_fieldname: raise ValueError( f"Destination fields can't be named " f"{self.config.source_hash_fieldname}, as that's the name set " f"in the config's {AnonymiseConfigKeys.SOURCE_HASH_FIELDNAME} " f"variable" ) elif self.dest_field == self.config.trid_fieldname: raise ValueError( f"Destination fields can't be named " f"{self.config.trid_fieldname}, as that's the name set " f"in the config's {AnonymiseConfigKeys.TRID_FIELDNAME} " f"variable" ) # Ensure the destination table/column names are OK for the dialect. warn_if_identifier_long( self.dest_table, self.dest_field, self.dest_dialect_name ) # Datatype if self.dest_datatype and not is_sqltype_valid(self.dest_datatype): raise ValueError( f"Field has invalid {self.DEST_DATATYPE}: " f"{self.dest_datatype}" ) dest_sqla_coltype = self.dest_sqla_coltype # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Check destination flags/special fields # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # PID/RID if self.matches_fielddef(srccfg.ddgen_per_table_pid_field): if not self.primary_pid: raise ValueError( f"All fields with {self.SRC_FIELD}={self.src_field!r} " f"used in output should have " f"{self.SRC_FLAGS}={SrcFlag.PRIMARY_PID} set" ) if self.dest_field != self.config.research_id_fieldname: raise ValueError( f"Primary PID field should have {self.DEST_FIELD} = " f"{self.config.research_id_fieldname}" ) # MPID/MRID if ( self.matches_fielddef(srccfg.ddgen_master_pid_fieldname) and not self.master_pid ): raise ValueError( f"All fields with {self.SRC_FIELD} = " f"{srccfg.ddgen_master_pid_fieldname} used in output should " f"have {self.SRC_FLAGS}={SrcFlag.MASTER_PID} set" ) # Anything that is hashed (but not self._add_src_hash -- added # separately): if ( self.dest_should_be_encrypted_pid_type and self.dest_datatype and self.dest_datatype != self.config.sqltype_encrypted_pid_as_sql ): raise ValueError( f"All {self.SRC_FLAGS}={SrcFlag.PRIMARY_PID}/" f"{self.SRC_FLAGS}={SrcFlag.MASTER_PID}/" f"{self.SRC_FLAGS}={SrcFlag.ADD_SRC_HASH} " f"fields used in output must have {self.DEST_DATATYPE} = " f"{self.config.sqltype_encrypted_pid_as_sql} " f"(determined by the config parameter " f"{AnonymiseConfigKeys.HASH_METHOD!r})" ) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Check alteration methods # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if self.has_special_alter_method and self._alter_methods: raise ValueError( f"Don't specify {self.ALTER_METHOD} " f"for PID/MPID/third-party PID fields; " f"these are handled specially" ) for am in self._alter_methods: if am.truncate_date: if not ( is_sqlatype_date(src_sqla_coltype) or is_sqlatype_text_over_one_char(src_sqla_coltype) ): raise ValueError( f"Can't set {AlterMethodType.TRUNCATEDATE.value} " f"for non-date/non-text field" ) if am.extract_from_filename: if not is_sqlatype_text_over_one_char(src_sqla_coltype): raise ValueError( f"For {self.ALTER_METHOD} = " f"{AlterMethodType.FILENAME_TO_TEXT}, " f"source field must contain a filename and therefore " f"must be text type of >1 character" ) if am.extract_from_blob: if not is_sqlatype_binary(src_sqla_coltype): raise ValueError( f"For {self.ALTER_METHOD} = " f"{AlterMethodType.BINARY_TO_TEXT}, " f"source field must be of binary type" ) # This error/warning too hard to be sure of with SQL Server odd # string types: # if [RENAMED: self._scrub] and not self._extract_text: # if not is_sqltype_text_over_one_char(self.src_datatype): # raise ValueError("Can't scrub in non-text field or " # "single-character text field") # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # Check indexing # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if ( self.index in (IndexType.NORMAL, IndexType.UNIQUE) and self.indexlen is None and does_sqlatype_require_index_len(dest_sqla_coltype) ): raise ValueError( f"Must specify {self.INDEXLEN} " f"to index a TEXT or BLOB field" ) # ------------------------------------------------------------------------- # Other stuff requiring config or database info # -------------------------------------------------------------------------
[docs] def set_from_src_db_info( self, src_db: str, src_table: str, src_field: str, src_datatype_sqltext: str, src_sqla_coltype: TypeEngine, dbconf: "DatabaseSafeConfig", comment: str = None, nullable: bool = True, primary_key: bool = False, table_has_explicit_pk: bool = False, table_has_candidate_pk: bool = False, ) -> None: """ Set up this DDR from a field in the source database, using options set in the config file. Used to draft a data dictionary. This is the first-draft classification of a given column, which the administrator should review and may then wish to edit. Args: src_db: Source database name. src_table: Source table name. src_field: Source field (column) name. src_datatype_sqltext: Source string SQL type, e.g. ``"VARCHAR(100)"``. src_sqla_coltype: Source SQLAlchemy column type, e.g. ``Integer()``. dbconf: A :class:`crate_anon.anonymise.config.DatabaseSafeConfig`. comment: Textual comment. nullable: Whether the source is can be NULL (True) or is NOT NULL (False). primary_key: Whether the source is marked as a primary key. table_has_explicit_pk: Whether the source database knows of a formal PK field for this table, whether or not this is it. table_has_candidate_pk: Whether the source table contains a field that matches CRATE's name detection criteria, whether or not this is it. """ self.src_db = src_db self.src_table = src_table self.src_field = src_field self.src_datatype = src_datatype_sqltext self._src_sqla_coltype = src_sqla_coltype self._not_null = not nullable self._pk = False self._add_src_hash = False self._primary_pid = False self._defines_primary_pids = False self._master_pid = False self._constant = False self._addition_only = False self.comment = comment self._from_file = False # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ddgen: Is the field special, such as a PK? # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ resembles_pk = self.matches_fielddef(dbconf.ddgen_pk_fields) if table_has_explicit_pk: if dbconf.ddgen_prefer_original_pk: # Use the database's PK. treat_as_pk = primary_key else: # The table has an explicit PK. However, we prefer to use our # name-based rules, if something matches that. if table_has_candidate_pk: # There's a name-based match in the table. Use it. treat_as_pk = resembles_pk else: # Nothing matches. So it's the original PK or nothing. treat_as_pk = primary_key else: # The table doesn't have an explicit PK, so we just follow our # name-based rules. treat_as_pk = resembles_pk if treat_as_pk: # Table primary key (e.g. arbitrary integer). self._pk = True self._constant = ( dbconf.ddgen_constant_content or self.matches_tabledef(dbconf.ddgen_constant_content_tables) ) and not self.matches_tabledef( dbconf.ddgen_nonconstant_content_tables ) self._add_src_hash = not self._constant self._addition_only = ( dbconf.ddgen_addition_only or self.matches_tabledef(dbconf.ddgen_addition_only_tables) ) and not self.matches_tabledef( dbconf.ddgen_deletion_possible_tables ) if self.matches_fielddef(dbconf.ddgen_per_table_pid_field): # PID, e.g. local hospital number. self._primary_pid = True if self.matches_tabledef(dbconf.ddgen_table_defines_pids): self._defines_primary_pids = True if self.matches_fielddef(dbconf.ddgen_master_pid_fieldname): # MPID, e.g. NHS number. self._master_pid = True if self.matches_fielddef(dbconf.ddgen_pid_defining_fieldnames): # The PID in the "chief" patient table. self._defines_primary_pids = True # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ddgen: Does it indicate the patient wishes to opt out entirely? # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if self.matches_fielddef(dbconf.ddgen_patient_opt_out_fields): self._opt_out_info = True # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ddgen: Does the field contain sensitive data? # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if ( self.master_pid or self.defines_primary_pids or ( self.primary_pid and dbconf.ddgen_add_per_table_pids_to_scrubber ) or self.matches_fielddef(dbconf.ddgen_scrubsrc_patient_fields) ): self.scrub_src = ScrubSrc.PATIENT elif self.matches_fielddef(dbconf.ddgen_scrubsrc_thirdparty_fields): self.scrub_src = ScrubSrc.THIRDPARTY elif self.matches_fielddef( dbconf.ddgen_scrubsrc_thirdparty_xref_pid_fields ): self.scrub_src = ScrubSrc.THIRDPARTY_XREF_PID else: self.scrub_src = None # type: Optional[str] # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ddgen: Is it a mandatory scrubbing field? # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if self.matches_fielddef(dbconf.ddgen_required_scrubsrc_fields): self._required_scrubber = True # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ddgen: What kind of sensitive data? Date, text, number, code? # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if not self.scrub_src: self.scrub_method = "" elif ( self.scrub_src is ScrubSrc.THIRDPARTY_XREF_PID or is_sqlatype_numeric(src_sqla_coltype) or self.matches_fielddef(dbconf.ddgen_per_table_pid_field) or self.matches_fielddef(dbconf.ddgen_master_pid_fieldname) or self.matches_fielddef(dbconf.ddgen_scrubmethod_number_fields) ): self.scrub_method = ScrubMethod.NUMERIC elif is_sqlatype_date(src_sqla_coltype) or self.matches_fielddef( dbconf.ddgen_scrubmethod_date_fields ): self.scrub_method = ScrubMethod.DATE elif self.matches_fielddef(dbconf.ddgen_scrubmethod_code_fields): self.scrub_method = ScrubMethod.CODE elif self.matches_fielddef(dbconf.ddgen_scrubmethod_phrase_fields): self.scrub_method = ScrubMethod.PHRASE else: self.scrub_method = ScrubMethod.WORDS # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ddgen: Do we want to change the destination fieldname? # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if self.primary_pid: self.dest_field = self.config.research_id_fieldname elif self.master_pid: self.dest_field = self.config.master_research_id_fieldname else: self.dest_field = src_field if dbconf.ddgen_force_lower_case: self.dest_field = self.dest_field.lower() if dbconf.ddgen_convert_odd_chars_to_underscore: self.dest_field = str(self.dest_field) # if this fails, # there's a Unicode problem self.dest_field = self.dest_field.translate(ODD_CHARS_TRANSLATE) # ... this will choke on a Unicode string # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ddgen: Do we want to change the destination field SQL type? # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if self.dest_should_be_encrypted_pid_type: self.dest_datatype = self.config.sqltype_encrypted_pid_as_sql else: self.dest_datatype = "" # ... and see also potential changes made below # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ddgen: How should we manipulate the destination? # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ extracting_text = False if self.matches_fielddef(dbconf.ddgen_truncate_date_fields): # Date truncation self._alter_methods.append( AlterMethod(config=self.config, truncate_date=True) ) # If we're truncating a date, we should also encourage a DATE # destination (as opposed to e.g. a DATETIME). self.dest_datatype = SQLTYPE_DATE elif self.matches_fielddef(dbconf.ddgen_filename_to_text_fields): # Read filename from database, read file, convert to text self._alter_methods.append( AlterMethod(config=self.config, extract_from_filename=True) ) self.dest_datatype = giant_text_sqltype(self.dest_dialect) extracting_text = True elif self.matches_fielddef(dbconf.bin2text_dict.keys()): # Read binary data from database, convert to text for binfielddef, extfield in dbconf.bin2text_dict.items(): if self.matches_fielddef(binfielddef): self._alter_methods.append( AlterMethod( config=self.config, extract_from_blob=True, extract_ext_field=extfield, ) ) self.dest_datatype = giant_text_sqltype(self.dest_dialect) extracting_text = True elif ( not self.primary_pid and not self.master_pid and not self.matches_fielddef( dbconf.ddgen_safe_fields_exempt_from_scrubbing ) and dbconf.ddgen_min_length_for_scrubbing >= 1 and is_sqlatype_text_of_length_at_least( src_sqla_coltype, dbconf.ddgen_min_length_for_scrubbing ) ): # Text field meeting the criteria to scrub self._alter_methods.append( AlterMethod(config=self.config, scrub=True) ) if extracting_text: # Scrub all extract-text fields, unless asked not to if not self.matches_fielddef( dbconf.ddgen_safe_fields_exempt_from_scrubbing ): self._alter_methods.append( AlterMethod(config=self.config, scrub=True) ) # Set skip_if_text_extract_fails flag? if self.matches_fielddef( dbconf.ddgen_skip_row_if_extract_text_fails_fields ): self._alter_methods.append( AlterMethod( config=self.config, skip_if_text_extract_fails=True ) ) for fieldspec, cfg_section in dbconf.ddgen_extra_hash_fields.items(): # Hash something using an "extra" hasher. if self.matches_fielddef(fieldspec): self._alter_methods.append( AlterMethod( config=self.config, hash_=True, hash_config_section=cfg_section, ) ) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ddgen: Manipulate the destination table name? # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # https://stackoverflow.com/questions/10017147 self.dest_table = src_table if dbconf.ddgen_force_lower_case: self.dest_table = self.dest_table.lower() if dbconf.ddgen_convert_odd_chars_to_underscore: self.dest_table = str(self.dest_table) # ... if this fails, there's a Unicode problem self.dest_table = self.dest_table.translate(ODD_CHARS_TRANSLATE) for suffix in dbconf.ddgen_rename_tables_remove_suffixes: if self.dest_table.endswith(suffix): self.dest_table = self.dest_table[: -len(suffix)] # remove it break # only remove one suffix! # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ddgen: Should we index the destination? # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ dest_sqla_type = self.dest_sqla_coltype if self._pk: self.index = IndexType.UNIQUE elif ( self.primary_pid or self.master_pid or self.defines_primary_pids or self.dest_field == self.config.research_id_fieldname ): self.index = IndexType.NORMAL elif ( dbconf.ddgen_allow_fulltext_indexing and does_sqlatype_merit_fulltext_index( src_sqla_coltype, min_length=dbconf.ddgen_freetext_index_min_length, ) ): self.index = IndexType.FULLTEXT elif self.matches_fielddef(dbconf.ddgen_index_fields): self.index = IndexType.NORMAL else: self.index = IndexType.NONE self.indexlen = ( DEFAULT_INDEX_LEN if ( self.index != IndexType.NONE and self.index != IndexType.FULLTEXT and does_sqlatype_require_index_len(dest_sqla_type) ) else None ) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ddgen: Should we omit it (at least until a human has checked the DD)? # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # In descending order of priority: if self.matches_fielddef(dbconf.ddgen_omit_fields): # explicit # Explicit omission trumps everything else # (There are rare occasions with "additional" databases where we # may want to omit a PK/PID/MPID field.) self.omit = True elif self._pk or self.primary_pid or self.master_pid: # We always want PKs, and the translated PID/MPID (RID+TRID or # MRID respectively). self.omit = False elif bool(self.scrub_src): # Scrub-source fields are generally sensitive and therefore worthy # of omission, EXCEPT that if a date is marked for truncation, the # user probably wants it (truncated) to come through! if any(am.truncate_date for am in self._alter_methods): self.omit = False else: self.omit = True elif self.matches_fielddef(dbconf.ddgen_include_fields): # explicit # Explicit inclusion next. self.omit = False else: self.omit = dbconf.ddgen_omit_by_default
[docs] def set_as_table_comment( self, src_db: str, src_table: str, comment: str ) -> None: """ Set up this DDR as a special table-comment row. (Used in data dictionary drafting.) Args: src_db: Source database name. src_table: Source table name. comment: Textual comment. """ self.src_db = src_db self.src_table = src_table self.src_field = "" self.dest_table = src_table self.comment = comment self.omit = False self._from_file = False