"""
crate_anon/anonymise/altermethod.py
===============================================================================
Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CRATE.
CRATE is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CRATE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CRATE. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
**The AlterMethod class.**
"""
import datetime
import html
import logging
import os
import traceback
from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING
from cardinal_pythonlib.datetimefunc import (
coerce_to_date,
truncate_date_to_first_of_month,
)
from cardinal_pythonlib.extract_text import (
document_to_text,
TextProcessingConfig,
)
import regex
# don't import config: circular dependency would have to be sorted out
from crate_anon.anonymise.constants import AlterMethodType
if TYPE_CHECKING:
from cardinal_pythonlib.hash import GenericHasher
from crate_anon.anonymise.config import Config
from crate_anon.anonymise.ddr import DataDictionaryRow
# import patient to avoid circular import when generating docs
from crate_anon.anonymise import patient
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# =============================================================================
# Constants
# =============================================================================
# Matches one HTML-style tag: a "<", then any run of characters that are not
# ">", then the closing ">". Used by AlterMethod._html_untag_func to strip
# tags. This is a purely lexical match, not an HTML parser.
HTML_TAG_RE = regex.compile("<[^>]*>")
# =============================================================================
# AlterMethod
# =============================================================================
class AlterMethod:
    """
    Implements a SINGLE transformation of source data on its way to the
    destination database.

    Knows how to represent itself as a text element in the relevant column of
    a data dictionary row, and how to create itself from one of those text
    elements.

    A :class:`crate_anon.anonymise.ddr.DataDictionaryRow` may include multiple
    instances of :class:`crate_anon.anonymise.altermethod.AlterMethod` in a
    sequence.
    """

    def __init__(
        self,
        config: "Config",
        text_value: Optional[str] = None,
        scrub: bool = False,
        truncate_date: bool = False,
        extract_from_filename: bool = False,
        extract_from_file_format: bool = False,  # new in v0.18.18
        file_format_str: str = "",  # new in v0.18.18
        extract_from_blob: bool = False,
        skip_if_text_extract_fails: bool = False,
        extract_ext_field: str = "",
        hash_: bool = False,
        hash_config_section: str = "",
        # html_escape: bool = False,
        html_unescape: bool = False,
        html_untag: bool = False,
    ) -> None:
        """
        Args:
            config:
                a :class:`crate_anon.anonymise.config.Config`
            text_value:
                string (from the data dictionary) to parse via
                :func:`set_from_text`; if given, it overrides the individual
                method flags below
            scrub:
                Boolean; "the source field contains sensitive text; scrub it"
            truncate_date:
                Boolean; "the source is a date; truncate it to the first of the
                month"
            extract_from_filename:
                Boolean; "the source is a filename; extract the text from it"
            extract_from_file_format:
                Boolean; "the source is a partial filename; combine it with
                ``file_format_str`` to calculate the full filename, then
                extract the text from it"
            file_format_str:
                format string for use with ``extract_from_file_format``
            extract_from_blob:
                Boolean; "the source is binary (the database contains a BLOB);
                extract text from it". See also ``extract_ext_field``.
            skip_if_text_extract_fails:
                Boolean: "if text extraction fails, skip the record entirely"
            extract_ext_field:
                For when the database contains a BLOB: this parameter indicates
                a database column (field) name, in the same row, that contains
                the file's extension, to help identify the BLOB.
            hash_:
                Boolean. If true, transform the source by hashing it.
            hash_config_section:
                If ``hash_`` is true, this specifies the config section in
                which the hash is defined.
            html_unescape:
                Boolean: "transform the source by HTML-unescaping it". For
                example, this would convert ``&lt;`` to ``<``.
            html_untag:
                Boolean: "transform the source by removing HTML tags". For
                example, this would convert ``hello <b>bold</b> world`` to
                ``hello bold world``.

        Raises:
            ValueError: if the resulting object does not represent exactly
                one transformation, or if ``text_value`` cannot be parsed.
        """
        self.config = config
        self.scrub = scrub
        self.truncate_date = truncate_date
        self.extract_from_blob = extract_from_blob
        self.extract_from_filename = extract_from_filename
        self.extract_from_file_format = extract_from_file_format
        self.file_format_str = file_format_str
        self.skip_if_text_extract_fails = skip_if_text_extract_fails
        self.extract_ext_field = extract_ext_field
        self.hash = hash_
        self.hash_config_section = hash_config_section
        self.hasher = None  # type: Optional[GenericHasher]
        # self.html_escape = html_escape
        self.html_unescape = html_unescape
        self.html_untag = html_untag

        # Umbrella flag: any of the three text-extraction variants.
        self.extract_text = (
            extract_from_filename
            or extract_from_file_format
            or extract_from_blob
        )

        if text_value is not None:
            self.set_from_text(text_value)
        if hash_:
            self.hasher = self.config.get_extra_hasher(
                self.hash_config_section
            )
        self._assert_valid()

    # -------------------------------------------------------------------------
    # Text representations
    # -------------------------------------------------------------------------

    def set_from_text(self, value: str) -> None:
        """
        Take the string from the ``alter_method`` field of the data dictionary,
        and use it to set a bunch of internal attributes.

        All method flags are cleared first, so that afterwards the object
        represents only the method described by ``value``.

        To get the configuration string back, see :attr:`as_text`.

        Args:
            value: a single alter-method fragment; either a bare keyword or
                ``keyword=parameter``

        Raises:
            ValueError: if ``value`` is not a recognized alter method, or a
                required ``=parameter`` part is missing
        """
        # Reset EVERY method flag (including the HTML ones and any cached
        # hasher), so nothing from a previous state can linger and leave two
        # methods active at once.
        self.scrub = False
        self.truncate_date = False
        self.extract_text = False
        self.extract_from_blob = False
        self.extract_from_file_format = False
        self.file_format_str = ""
        self.extract_from_filename = False
        self.skip_if_text_extract_fails = False
        self.extract_ext_field = ""
        self.hash = False
        self.hash_config_section = ""
        self.hasher = None
        self.html_unescape = False
        self.html_untag = False

        def get_second_part(missing_description: str) -> str:
            """
            Return whatever follows the first "=" in ``value``; complain if
            there is no "=" or nothing after it.
            """
            _, equals, secondhalf = value.partition("=")
            if not equals:
                raise ValueError(f"Bad format for alter method: {value}")
            if not secondhalf:
                raise ValueError(
                    f"Missing {missing_description} in alter method: {value}"
                )
            return secondhalf

        if value == AlterMethodType.TRUNCATEDATE.value:
            self.truncate_date = True
        elif value == AlterMethodType.SCRUBIN.value:
            self.scrub = True
        elif value.startswith(AlterMethodType.BINARY_TO_TEXT.value):
            self.extract_text = True
            self.extract_from_blob = True
            self.extract_ext_field = get_second_part(
                "filename/extension field"
            )
        elif value.startswith(AlterMethodType.FILENAME_FORMAT_TO_TEXT.value):
            self.extract_text = True
            self.extract_from_file_format = True
            self.file_format_str = get_second_part("filename format field")
        elif value == AlterMethodType.FILENAME_TO_TEXT.value:
            self.extract_text = True
            self.extract_from_filename = True
        elif value == AlterMethodType.SKIP_IF_TEXT_EXTRACT_FAILS.value:
            self.skip_if_text_extract_fails = True
        elif value.startswith(AlterMethodType.HASH.value):
            self.hash = True
            self.hash_config_section = get_second_part("hash config section")
            self.hasher = self.config.get_extra_hasher(
                self.hash_config_section
            )
        # elif value == ALTERMETHOD.HTML_ESCAPE:
        #     self.html_escape = True
        elif value == AlterMethodType.HTML_UNESCAPE.value:
            self.html_unescape = True
        elif value == AlterMethodType.HTML_UNTAG.value:
            self.html_untag = True
        else:
            raise ValueError(f"Bad alter_method part: {value}")

    @property
    def as_text(self) -> str:
        """
        Return the ``alter_method`` fragment from the working fields;
        effectively the reverse of :func:`set_from_text`.

        Returns an empty string if no method flag is set.
        """

        def two_part(altermethod: str, parameter: str) -> str:
            return altermethod + "=" + parameter

        if self.truncate_date:
            return AlterMethodType.TRUNCATEDATE.value
        if self.scrub:
            return AlterMethodType.SCRUBIN.value
        if self.extract_text:
            if self.extract_from_blob:
                return two_part(
                    AlterMethodType.BINARY_TO_TEXT.value,
                    self.extract_ext_field,
                )
            elif self.extract_from_file_format:
                return two_part(
                    AlterMethodType.FILENAME_FORMAT_TO_TEXT.value,
                    self.file_format_str,
                )
            else:  # plain filename
                return AlterMethodType.FILENAME_TO_TEXT.value
        if self.skip_if_text_extract_fails:
            return AlterMethodType.SKIP_IF_TEXT_EXTRACT_FAILS.value
        if self.hash:
            return two_part(
                AlterMethodType.HASH.value, self.hash_config_section
            )
        # if self.html_escape:
        #     return ALTERMETHOD.HTML_ESCAPE.value
        if self.html_unescape:
            return AlterMethodType.HTML_UNESCAPE.value
        if self.html_untag:
            return AlterMethodType.HTML_UNTAG.value
        return ""

    # -------------------------------------------------------------------------
    # Validation
    # -------------------------------------------------------------------------

    def _assert_valid(self) -> None:
        """
        Raises :exc:`ValueError` if the method is invalid (e.g. representing
        more than one transformation, or none at all).
        """
        methods_map = {
            "scrub": self.scrub,
            "truncate_date": self.truncate_date,
            "extract_text": self.extract_text,
            "hash": self.hash,
            "html_unescape": self.html_unescape,
            "html_untag": self.html_untag,
            "skip_if_text_extract_fails": self.skip_if_text_extract_fails,
        }
        n_methods = sum(int(v) for v in methods_map.values())
        if n_methods != 1:
            raise ValueError(
                f"AlterMethod: should be exactly one method, but "
                f"there are {n_methods}: {methods_map}"
            )

    # -------------------------------------------------------------------------
    # Perform the transformation: master method
    # -------------------------------------------------------------------------

    def alter(
        self,
        value: Any,
        ddr: "DataDictionaryRow",  # corresponding DataDictionaryRow
        row: List[Any],  # all values in row
        ddrows: List["DataDictionaryRow"],  # all of them
        patient: Optional["patient.Patient"] = None,
    ) -> Tuple[Any, bool]:
        """
        Performs the alteration.

        Args:
            value:
                source value of interest
            ddr:
                corresponding
                :class:`crate_anon.anonymise.ddr.DataDictionaryRow`
            row:
                all values in the same source row
            ddrows:
                all data dictionary rows
            patient:
                :class:`crate_anon.anonymise.patient.Patient` object

        Returns:
            tuple: ``newvalue, skiprow``

        If multiple transformations are specified within one
        :class:`AlterMethod`, only one is performed, and in the following
        order:

        #. scrub
        #. truncate_date
        #. extract_text
        #. hash
        #. html_unescape
        #. html_untag
        #. skip_if_text_extract_fails

        A ``None`` value is passed through unchanged by the scrub,
        html_unescape and html_untag methods.

        However, multiple alteration methods can be specified for one field.
        See :func:`crate_anon.anonymise.anonymise.process_table` and
        :class:`crate_anon.anonymise.ddr.DataDictionaryRow`.
        """
        if self.scrub:
            return self._scrub_func(value, patient), False

        if self.truncate_date:
            return self._truncate_date_func(value), False

        if self.extract_text:
            value, extracted = self._extract_text_func(value, row, ddrows)
            if not extracted and ddr.skip_row_if_extract_text_fails:
                log.debug("Skipping row as text extraction failed")
                return None, True
            return value, False

        if self.hash:
            assert self.hasher is not None
            return self.hasher.hash(value), False

        # if alter_method.html_escape:
        #     return html.escape(value), False

        if self.html_unescape:
            # html.unescape() requires a string; pass NULLs through.
            if value is None:
                return None, False
            return html.unescape(value), False

        if self.html_untag:
            # The regex substitution requires a string; pass NULLs through.
            if value is None:
                return None, False
            return self._html_untag_func(value), False

        if self.skip_if_text_extract_fails:
            # Modifies other alter methods; doesn't do anything itself.
            # In particular, it must NOT ask for the row to be skipped: that
            # decision is made in the extract_text branch above.
            return value, False

        # No method flag set (should be prevented by _assert_valid()); honour
        # the declared return type by leaving the value alone.
        return value, False

    # -------------------------------------------------------------------------
    # Transformation internals
    # -------------------------------------------------------------------------

    @staticmethod
    def _scrub_func(value: Any, patient: "patient.Patient") -> Optional[str]:
        """
        Takes a source value and scrubs it.

        **Main point of anonymisation within CRATE.**

        Args:
            value: source data
            patient: :class:`crate_anon.anonymise.patient.Patient` object

        Returns:
            scrubbed data, or ``None`` if ``value`` is ``None``
        """
        if value is None:
            return None
        return patient.scrub(str(value))

    @staticmethod
    def _truncate_date_func(value: Any) -> Optional[datetime.date]:
        """
        Truncates a date-like object to the first of the month.

        Returns ``None`` (and logs a warning) if coercion or truncation
        raises :exc:`ValueError` or :exc:`OverflowError`.
        """
        try:
            value = coerce_to_date(value)
            return truncate_date_to_first_of_month(value)
        except (ValueError, OverflowError):
            log.warning(
                f"Invalid date received to "
                f"{AlterMethodType.TRUNCATEDATE} method: {value}"
            )
            return None

    @staticmethod
    def _html_untag_func(text: str) -> str:
        """
        Removes HTML tags.
        """
        # Lots of ways...
        # -- xml.etree, for well-formed XML
        #    https://stackoverflow.com/questions/9662346
        #    return ''.join(xml.etree.ElementTree.fromstring(text).itertext())
        # -- html.parser
        #    https://stackoverflow.com/questions/753052
        # -- lxml (but needs source build on Windows):
        #    http://www.neuraladvance.com/removing-html-from-python-strings.html
        #    http://lxml.de/
        # -- regex/re
        #    https://stackoverflow.com/questions/3662142
        #    ... as below.
        return HTML_TAG_RE.sub("", text)

    def _extract_text_func(
        self, value: Any, row: List[Any], ddrows: List["DataDictionaryRow"]
    ) -> Tuple[Optional[str], bool]:
        """
        Take a field's value and return extracted text, for file-related
        fields, where the DD row indicated that this field contains a filename
        or a BLOB.

        Args:
            value: source field contents
            row: all values in the same source row
            ddrows: all data dictionary rows

        Returns:
            tuple: ``value, extracted``; ``(None, False)`` if there is no
            filename, the extension is not permitted, the file does not
            exist, or the extraction itself raises

        Raises:
            ValueError: in BLOB mode, if no data dictionary row has a source
                field matching ``extract_ext_field``
        """
        use_filename = False
        filename = None
        blob = None
        extension = None

        # Work out either a full filename, or a BLOB.
        # Set either use_filename + filename + extension, or blob + extension.
        if self.extract_from_filename:
            # The database contains a plain and full filename.
            use_filename = True
            filename = value

        elif self.extract_from_file_format:
            # The database contains a filename. However, it may not be a full
            # path. For example, in RiO, we have fields like
            #   dbo.ClientDocument.Path, e.g. '1-1-20121023-1000001-LET.pdf'
            #   dbo.ClientDocument.ClientID, e.g. '1000001-LET.pdf'
            # and the disk file might be
            #   C:\some_base_directory\1000001\Docs\1-1-20121023-1000001-LET.pdf
            # We could specify this as a file spec:
            #   "C:\some_base_directory\{ClientID}\{Path}".
            # In principle, this might need to be field-specific, so it could
            # go in the data dictionary (rather than as a setting that's
            # constant across an entire anonymisation run).
            # Let's introduce ALTERMETHOD.FILENAME_FORMAT_TO_TEXT, in v0.18.18.
            #
            # Create a dictionary of column name -> value
            ffdict = {
                ddr.src_field: row[i] for i, ddr in enumerate(ddrows)
            }  # type: Dict[str, Any]
            # Use that dictionary with the format string to make the filename
            log.debug(
                f"extract_text: file_format_str={self.file_format_str!r}, "
                f"ffdict={ffdict!r}"
            )
            use_filename = True
            filename = self.file_format_str.format(**ffdict)

        else:
            # The database contains the BLOB itself. However, we'd also like to
            # know the file type, here from its extension. We look for another
            # field that contains the extension, marked as such using
            # alter_method.extract_ext_field in the data dictionary.
            blob = value
            extindex = next(
                (
                    i
                    for i, ddr in enumerate(ddrows)
                    if ddr.src_field == self.extract_ext_field
                ),
                None,
            )
            if extindex is None:
                # Configuration error
                raise ValueError(
                    f"Bug: missing extension field for "
                    f"alter_method={self.as_text}"
                )
            extension = row[extindex]
            log.info(f"extract_text: database BLOB, extension={extension}")

        if use_filename:
            # Check for a missing filename BEFORE handing it to os.path
            # functions: a NULL from the database would otherwise raise
            # TypeError in os.path.splitext() instead of being skipped.
            if not filename:
                log.error("No filename; skipping")
                return None, False
            _, extension = os.path.splitext(filename)
            log.info(f"extract_text: disk file, filename={filename!r}")

        # Is it a permissible file type?
        if not self.config.extract_text_extension_permissible(extension):
            log.info(f"Extension {extension!r} not permissible; skipping")
            return None, False

        if use_filename and not os.path.isfile(filename):
            log.error(f"Filename {filename!r} is not a file; skipping")
            return None, False

        # Extract text from the file (given its filename), or from a BLOB.
        try:
            textconfig = TextProcessingConfig(
                plain=self.config.extract_text_plain,
                width=self.config.extract_text_width,
            )
            value = document_to_text(
                filename=filename,
                blob=blob,
                extension=extension,
                config=textconfig,
            )
        except Exception as e:
            # Runtime error. Deliberately broad: one unreadable document
            # should not stop the whole run.
            traceback.print_exc()  # full details, please
            log.error(f"Caught exception from document_to_text: {e}")
            return None, False
        return value, True