r"""
crate_anon/linkage/helpers.py
===============================================================================
Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CRATE.
CRATE is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CRATE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CRATE. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
**Helper functions for linkage tools.**
Avoid using pickle for caching; it is insecure (arbitrary code execution).
"""
# =============================================================================
# Imports
# =============================================================================
from argparse import ArgumentTypeError
from contextlib import contextmanager, ExitStack
from io import StringIO, TextIOWrapper
import logging
from math import log as math_ln
import os
import random
import re
import string
from typing import (
Any,
Dict,
Generator,
List,
Optional,
Set,
Tuple,
Type,
TYPE_CHECKING,
Union,
)
import unicodedata
from zipfile import ZipFile
import regex
from cardinal_pythonlib.datetimefunc import coerce_to_pendulum_date
from cardinal_pythonlib.fileops import mkdir_p
from fuzzy import DMetaphone
from numba import jit
from pendulum import Date
from pendulum.parsing.exceptions import ParserError
from crate_anon.anonymise.anonregex import get_uk_postcode_regex_string
from crate_anon.common.logfunc import warn_once
from crate_anon.common.regex_helpers import anchor
from crate_anon.linkage.constants import (
FuzzyDefaults,
MANGLE_PRETRANSLATE,
MINUS_INFINITY,
NONE_TYPE,
SAFE_UPPER_PRETRANSLATE,
SIMPLIFY_PUNCTUATION_WHITESPACE_TRANS,
)
if TYPE_CHECKING:
from crate_anon.linkage.matchconfig import MatchConfig
log = logging.getLogger(__name__)
# =============================================================================
# Notes
# =============================================================================
_ = """
Geography
---------
[COVERED IN THE PAPER. FURTHER DETAIL HERE.]
UK postcodes have this format ("A" denotes a letter, "9" a digit), with
concrete examples from Wikipedia
(https://en.wikipedia.org/wiki/Postcodes_in_the_United_Kingdom#Formatting):
+------------+----------------+------------+----------+
| Postcode                                             |
+-----------------------------+-----------------------+
| Outward code                | Inward code           |
+------------+----------------+------------+----------+
| Area (1-2) | District (1-2) | Sector (1) | Unit (2) |
| (A, AA)    | (9, 99, 9A)    | (9)        | (AA)     |
+------------+----------------+------------+----------+
| AA         | 9A             | 9          | AA       |
| SW         | 1W             | 0          | NY       |
| EC         | 9A             | 9          | AA       |
+------------+----------------+------------+----------+
| A          | 9A             | 9          | AA       |
| W          | 1A             | 0          | AX       |
+------------+----------------+------------+----------+
| A          | 9              | 9          | AA       |
| M          | 1              | 1          | AE       |
+------------+----------------+------------+----------+
| A          | 99             | 9          | AA       |
| B          | 33             | 8          | TH       |
+------------+----------------+------------+----------+
| AA         | 9              | 9          | AA       |
| CR         | 2              | 6          | XH       |
+------------+----------------+------------+----------+
| AA         | 99             | 9          | AA       |
| DN         | 55             | 1          | PT       |
+------------+----------------+------------+----------+
Then there are "special cases" that don't fit, but they are mostly
extraterritorial. See
https://en.wikipedia.org/wiki/Postcodes_in_the_United_Kingdom#Special_cases.
UK census geography is described at
https://www.ons.gov.uk/methodology/geography/ukgeographies/censusgeography.
The most important unit for our purposes is the Output Area (OA), the smallest
unit, which is made up of an integer number of postcode units.
So an OA is bigger than a postcode unit. But is it bigger or smaller than a
postcode sector? Smaller, I think.
- https://data.gov.uk/dataset/7f4e1818-4305-4962-adc4-e4e3effd7784/output-area-to-postcode-sector-december-2011-lookup-in-england-and-wales
- this allows you to look up *from* output area *to* postcode sector, implying
that postcode sectors must be larger.
""" # noqa: E501
# =============================================================================
# Metaphones
# =============================================================================
dmeta = DMetaphone()
_ = """
For a sense of metaphones:
>>> dmeta("Rudolf")
[b'RTLF', None]
>>> dmeta("Cardinal")
[b'KRTN', None]
>>> dmeta("Supercalifragilistic")
[b'SPRK', None]
>>> dmeta("Christopher")
[b'KRST', None]
>>> dmeta("Chris")
[b'KRS', None]
>>> dmeta("C")
[b'K', None]
>>> dmeta("Philip")
[b'FLP', None]
>>> dmeta("Phil")
[b'FL', None]
>>> dmeta("Phi")
[b'F', None]
>>> dmeta("Knuth") # https://stackabuse.com/phonetic-similarity-of-words-a-vectorized-approach-in-python/
[b'N0', b'NT']
>>> dmeta("Clérambault") # raises UnicodeEncodeError
""" # noqa: E501
# =============================================================================
# For caching
# =============================================================================
def mkdir_for_filename(filename: str) -> None:
"""
Ensures that a directory exists for the filename.
"""
assert filename
mkdir_p(os.path.dirname(filename))
# =============================================================================
# Reading from file or zipped file
# =============================================================================
@contextmanager
def open_even_if_zipped(
filename: str,
) -> Generator[TextIOWrapper, None, None]:
"""
Yields (as a context manager) a text file, opened directly or through a
ZIP file (distinguished by its extension) containing that file.
"""
is_zip = os.path.splitext(filename)[1].lower() == ".zip"
with ExitStack() as stack:
if is_zip:
log.info(f"Reading ZIP file: {filename}")
z = stack.enter_context(ZipFile(filename)) # type: ZipFile
contents = z.infolist()
if not contents:
raise ValueError("ZIP file is empty")
first_file = contents[0]
log.info(f"Within ZIP, reading: {first_file.filename}")
# noinspection PyTypeChecker
binary_file = stack.enter_context(z.open(first_file))
f = TextIOWrapper(binary_file)
else:
log.info(f"Reading file: {filename}")
# noinspection PyTypeChecker
f = stack.enter_context(open(filename, "rt"))
yield f
log.debug(f"... finished reading: {filename}")
# =============================================================================
# Name manipulation
# =============================================================================
REMOVE_PUNCTUATION_SPACE_TABLE = str.maketrans("", "", string.punctuation)
# ... the three-argument version of str.maketrans maps any character in the
# third argument to None, i.e. removes it. The object returned is a dictionary
# mapping Unicode ordinals (integers) to replacement ordinals, strings, or
# None.
REMOVE_PUNCTUATION_SPACE_TABLE[ord(" ")] = None # also remove spaces
NONWORD_REGEX = regex.compile(r"\W")
ONE_OR_MORE_SPACE_REGEX = regex.compile(r"\s+")
def mangle_unicode_to_ascii(s: Any) -> str:
"""
Mangle unicode to ASCII, losing accents etc. in the process.
This is a slightly different version from that in cardinal_pythonlib, because
the Eszett gets a rough ride:
.. code-block:: python
unicodedata.normalize("NFKD", "Straße Clérambault").encode("ascii", "ignore")
# ... gives b'Strae Clerambault': the Eszett is dropped entirely.
So we add the ``MANGLE_PRETRANSLATE`` step.
"""
if s is None:
return ""
if not isinstance(s, str):
s = str(s)
return (
unicodedata.normalize("NFKD", s)
.translate(MANGLE_PRETRANSLATE)
.encode("ascii", "ignore") # gets rid of accents
.decode("ascii") # back to a string
)
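_ = """
For a sense of mangle_unicode_to_ascii (illustrative; the Eszett result
assumes that MANGLE_PRETRANSLATE maps ß to "ss"):
>>> mangle_unicode_to_ascii("Clérambault")
'Clerambault'
>>> mangle_unicode_to_ascii("Straße")
'Strasse'
>>> mangle_unicode_to_ascii(None)
''
"""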
def safe_upper(name: str) -> str:
"""
Convert to upper case, but don't mess up a few specific accents. Note that:
- 'ß'.upper() == 'SS' but 'ẞ'.upper() == 'ẞ'
... here, we will use an upper-case Eszett, and the "SS" will be dealt with
through transliteration.
"""
return name.translate(SAFE_UPPER_PRETRANSLATE).upper()
def remove_redundant_whitespace(x: str) -> str:
"""
Strips whitespace at the edges and replaces any internal run of whitespace
with a single space.
"""
return ONE_OR_MORE_SPACE_REGEX.sub(" ", x).strip()
def simplify_punctuation_whitespace(x: str) -> str:
"""
Simplify punctuation and whitespace, e.g. curly to straight quotes, tab to
space, en dash to hyphen, etc.
"""
return x.translate(SIMPLIFY_PUNCTUATION_WHITESPACE_TRANS)
def standardize_name(name: str) -> str:
"""
Converts a name to a standard form: upper case (will also e.g. translate
Eszett to SS), no spaces, no punctuation.
This is the format used by the US surname database: e.g. ACOSTAPEREZ for
(probably) Acosta Perez, and PEREZ rather than PÉREZ (accents are stripped).
We use this for our name frequency databases. For other purposes, we use
a more sophisticated approach; see e.g. surname_alternative_fragments().
Examples: see unit tests.
"""
return mangle_unicode_to_ascii(
name.upper().translate(REMOVE_PUNCTUATION_SPACE_TABLE)
)
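_ = """
For a sense of standardize_name (illustrative; the unit tests are
authoritative):
>>> standardize_name("Acosta Perez")
'ACOSTAPEREZ'
>>> standardize_name("Pérez")
'PEREZ'
>>> standardize_name("O'Brien-Smith")
'OBRIENSMITH'
"""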
def _gen_name_versions(
x: str,
accent_transliterations: Dict[
int, Union[str, int, None]
] = FuzzyDefaults.ACCENT_TRANSLITERATIONS_TRANS,
) -> Generator[str, None, None]:
"""
Generate the string itself and accent-mangled and accent-transliterated
versions thereof. We assume that either nothing happens, mangling
happens, or transliteration happens, but not some nasty combination.
"""
x = x.strip()
if not x:
return
# The string:
yield x
# Mangled, e.g. Ü to U:
yield mangle_unicode_to_ascii(x)
# Transliterated, e.g. Ü to UE.
yield x.translate(accent_transliterations)
def surname_alternative_fragments(
surname: str,
accent_transliterations: Dict[
int, Union[str, int, None]
] = FuzzyDefaults.ACCENT_TRANSLITERATIONS_TRANS,
nonspecific_name_components: Set[
str
] = FuzzyDefaults.NONSPECIFIC_NAME_COMPONENTS,
) -> List[str]:
"""
Return a list of fragments that may occur as substitutes for the name
(including the name itself). Those fragments include:
- Parts of double-barrelled surnames.
- ASCII-mangled versions of accents (e.g. Ü to U).
- Transliterated versions of accents (e.g. Ü to UE).
Upper case will be used throughout.
Args:
surname:
The name to process. This should contain all original accents,
spacing, and punctuation (i.e. should NOT have been standardized as
above). Case is unimportant (we will use upper case internally).
accent_transliterations:
A mapping from accents to potential transliterated versions, in the
form of a Python string translation table.
nonspecific_name_components:
Name fragments that should not be produced in their own right, e.g.
nobiliary particles such as "van" in "van Beethoven".
Returns:
A list of fragments: full name first, then other fragments in
alphabetical order.
"""
if not surname:
# No name, nothing to do.
return []
# Very basic standardization first: upper case, sort out punctuation.
surname = safe_upper(simplify_punctuation_whitespace(surname))
# Split into word chunks:
chunks = list(filter(None, NONWORD_REGEX.split(surname)))
# Filtering is required, e.g. "hello ' world" -> ['hello', '', '', 'world']
# Now make a standardized version of the name where punctuation/spaces have
# been removed (as per the US name databases also):
surname = "".join(chunks)
# Create a set of fragments. The set will de-duplicate.
fragments = set() # type: Set[str]
# The name itself, and any accent-modified versions:
fragments.update(_gen_name_versions(surname, accent_transliterations))
# Components, and any accent-modified versions:
for chunk in chunks:
# All should be pre-stripped and none should be blank
if chunk in nonspecific_name_components:
continue
fragments.update(_gen_name_versions(chunk, accent_transliterations))
# This process may well have worked through duplicates, but the set will
# take care of those.
# Return the (standardized) name first.
return [surname] + sorted(fragments - {surname})
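_ = """
Illustrative examples, assuming that "VAN" is in the default nonspecific name
components and that the default transliteration table maps Ü to UE (the unit
tests are authoritative):
    surname_alternative_fragments("van Beethoven")
    # -> ['VANBEETHOVEN', 'BEETHOVEN']
    surname_alternative_fragments("Müller-Smith")
    # -> ['MÜLLERSMITH', 'MUELLER', 'MUELLERSMITH', 'MULLER',
    #     'MULLERSMITH', 'MÜLLER', 'SMITH']
"""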
def get_first_two_char(x: str) -> str:
"""
Returns the first two characters of a string. Having this as a function is
slight overkill.
"""
return x[:2]
# =============================================================================
# Postcode manipulation
# =============================================================================
POSTCODE_REGEX = re.compile(
anchor(get_uk_postcode_regex_string(at_word_boundaries_only=False))
# We don't want at_word_boundaries_only=True, since that would match e.g.
# "VALID_POSTCODE JUNK"; we use anchor() instead, requiring a full match.
)
def standardize_postcode(postcode_unit_or_sector: str) -> str:
"""
Standardizes postcodes to "no space" format.
"""
return postcode_unit_or_sector.upper().translate(
REMOVE_PUNCTUATION_SPACE_TABLE
)
def get_postcode_sector(
postcode_unit: str, prestandardized: bool = False
) -> str:
"""
Returns the postcode sector (area + district + sector) from a full postcode
unit. For example, "AB12 3CD" gives "AB123" (the input is standardized to
no-space format first, unless ``prestandardized`` is set, in which case
"AB12 3CD" gives "AB12 3").
While the format and length of the first part (area + district) varies (2-4
characters), the format of the second part (sector + unit) is fixed, as
"9AA" (3 characters);
https://en.wikipedia.org/wiki/Postcodes_in_the_United_Kingdom#Formatting.
So to get the sector, we chop off the last two characters.
"""
if not prestandardized:
postcode_unit = standardize_postcode(postcode_unit)
return postcode_unit[:-2]
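_ = """
For a sense of the postcode helpers (illustrative):
>>> standardize_postcode("cb2 0qq")
'CB20QQ'
>>> get_postcode_sector("CB2 0QQ")
'CB20'
>>> get_postcode_sector("CB20QQ", prestandardized=True)
'CB20'
"""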
# noinspection HttpUrlsUsage
_ = """
PSEUDO_POSTCODES = set(standardize_postcode(p) for p in (
"ZZ99 3VZ", # No fixed abode [1, 2]
"ZZ99 3WZ", # Address not known [2]
"ZZ99 3CZ", # England/U.K, not otherwise specified [1, 3] (*)
# ... or "Z99 3CZ"? [2] (*).
"ZZ99 3GZ", # Wales, not otherwise specified [1, 2]
"ZZ99 1WZ", # Scotland, not otherwise specified [1, 2]
"ZZ99 2WZ", # Northern Ireland, not otherwise specified [1, 2]
# Also: ZZ99 <nnn>, where <nnn> is a country code -- so that's a large
# range.
# [1] http://www.datadictionary.wales.nhs.uk/index.html#!WordDocuments/postcode.htm
# [2] https://www.england.nhs.uk/wp-content/uploads/2021/03/commissioner-assignment-method-2122-guidance-v1.1.pdf
# [3] https://afyonluoglu.org/PublicWebFiles/Reports-TR/Veri%20Sozlugu/international/2017-HES%20Admitted%20Patient%20Care%20Data%20Dictionary.pdf
# (*) [2] uses "Z99 3CZ" (page 6); [1, 3] use "ZZ99 3CZ".
))
PSEUDO_POSTCODE_SECTORS = set(get_postcode_sector(p) for p in PSEUDO_POSTCODES)
""" # noqa: E501
PSEUDO_POSTCODE_START = "ZZ99"
PSEUDOPOSTCODE_NFA = "ZZ993VZ" # no fixed abode
def is_pseudopostcode(
postcode_unit: str, prestandardized: bool = False
) -> bool:
"""
Is this a pseudopostcode?
"""
if not prestandardized:
postcode_unit = standardize_postcode(postcode_unit)
return postcode_unit.startswith(PSEUDO_POSTCODE_START)
def is_nfa_postcode(postcode_unit: str, prestandardized: bool = False) -> bool:
"""
Is this the pseudopostcode meaning "no fixed abode"?
"""
if not prestandardized:
postcode_unit = standardize_postcode(postcode_unit)
return postcode_unit == PSEUDOPOSTCODE_NFA
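_ = """
Illustrative, from the definitions above:
>>> is_pseudopostcode("ZZ99 3VZ")  # no fixed abode
True
>>> is_pseudopostcode("ZZ99 3WZ")  # address not known
True
>>> is_pseudopostcode("CB2 0QQ")
False
>>> is_nfa_postcode("ZZ99 3VZ")
True
>>> is_nfa_postcode("ZZ99 3WZ")
False
"""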
# =============================================================================
# Functions to introduce errors (for testing)
# =============================================================================
def mutate_name(name: str) -> str:
"""
Introduces typos into a (standardized, capitalized,
no-space-no-punctuation) name.
"""
n = len(name)
a = ord("A")
z = ord("Z")
which = random.randrange(n)
start_ord = ord(name[which])
while True:
replacement_ord = random.randint(a, z)
if replacement_ord != start_ord:
break
return name[:which] + chr(replacement_ord) + name[which + 1 :]
def mutate_postcode(postcode: str, cfg: "MatchConfig") -> str:
"""
Introduces typos into a UK postcode, keeping the letter/digit format.
Args:
postcode: the postcode to alter
cfg: the main :class:`MatchConfig` object
"""
n = len(postcode)
a = ord("A")
z = ord("Z")
zero = ord("0")
nine = ord("9")
while True:
while True:
which = random.randrange(n)
if postcode[which] != " ":
break
# noinspection PyUnboundLocalVariable
start_ord = ord(postcode[which])
replacement_ord = start_ord
if postcode[which].isdigit():
while replacement_ord == start_ord:
replacement_ord = random.randint(zero, nine)
else:
while replacement_ord == start_ord:
replacement_ord = random.randint(a, z)
mutated = (
postcode[:which] + chr(replacement_ord) + postcode[which + 1 :]
)
if cfg.is_valid_postcode(mutated):
return mutated
# =============================================================================
# Faster maths
# =============================================================================
@jit(nopython=True)
def ln(x: float) -> float:
"""
Version of :func:`math.log` that treats log(0) as ``-inf``, rather than
crashing with ``ValueError: math domain error``.
Args:
x: the value whose natural logarithm is required
Returns:
float: ln(x), the natural logarithm of x
"""
# noinspection PyBroadException
try:
return math_ln(x)
except Exception: # numba.jit can only cope with Exception
if x < 0:
raise ValueError("Can't take log of a negative number")
# Either x > 0 but causing problems anyway (unlikely), or x == 0.
return MINUS_INFINITY
@jit(nopython=True)
def log_posterior_odds_from_pdh_pdnh(
log_prior_odds: float, p_d_given_h: float, p_d_given_not_h: float
) -> float:
r"""
Calculates posterior odds.
Fast implementation.
Args:
log_prior_odds:
log prior odds of H, :math:`ln(\frac{ P(H) }{ P(\neg H) })`
p_d_given_h:
:math:`P(D | H)`
p_d_given_not_h:
:math:`P(D | \neg H)`
Returns:
float:
log posterior odds of H,
:math:`ln(\frac{ P(H | D) }{ P(\neg H | D) })`
"""
return log_prior_odds + ln(p_d_given_h) - ln(p_d_given_not_h)
@jit(nopython=True)
def log_likelihood_ratio_from_p(
p_d_given_h: float, p_d_given_not_h: float
) -> float:
r"""
Calculates the log likelihood ratio.
Fast implementation.
Args:
p_d_given_h:
:math:`P(D | H)`
p_d_given_not_h:
:math:`P(D | \neg H)`
Returns:
float:
log likelihood ratio,
:math:`ln(\frac{ P(D | H) }{ P(D | \neg H) })`
"""
return ln(p_d_given_h) - ln(p_d_given_not_h)
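_ = """
A worked example of the Bayesian update implemented above. With log prior odds
of 0 (prior odds 1, prior probability 0.5), P(D | H) = 0.9 and
P(D | not H) = 0.1:
    log_likelihood_ratio_from_p(0.9, 0.1)
    # = ln(0.9) - ln(0.1) = ln(9) ~= 2.197
    log_posterior_odds_from_pdh_pdnh(0.0, 0.9, 0.1)
    # = 0 + ln(9) ~= 2.197; posterior odds ~= 9, posterior probability 0.9
    log_posterior_odds_from_pdh_pdnh(0.0, 0.0, 0.1)
    # = -inf: the data are impossible under H
"""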
# =============================================================================
# Read and check the type of dictionary values
# =============================================================================
def getdictval(
d: Dict[str, Any],
key: str,
type_: Type,
mandatory: bool = False,
default: Any = None,
) -> Any:
"""
Returns a value from a dictionary, or raises ValueError.
- If ``mandatory`` is True, the key must be present, and the value must not
be ``None`` or a blank string.
- If ``mandatory`` is False and the key is absent, ``default`` is returned.
- The value must be of type `type_` (or ``None`` if permitted).
"""
try:
v = d[key]
except KeyError:
if mandatory:
raise ValueError(f"Missing key: {key}")
else:
return default
if mandatory and (v is None or v == ""):
raise ValueError(f"Missing or blank value: {key}")
if not isinstance(v, (type_, NONE_TYPE)):
raise ValueError(
f"Value for {key!r} should be of type {type_} "
f"but was of type {type(v)}; was {v!r}"
)
return v
def validate_prob(p: float, description: str) -> None:
"""
Checks a probability is in the range [0, 1] or raises :exc:`ValueError`.
"""
if not 0 <= p <= 1:
raise ValueError(
f"Bad probability for {description}: {p} "
f"-- must be in range [0, 1]"
)
def validate_uncertain_prob(p: float, description: str) -> None:
"""
Checks a probability is in the range (0, 1) or raises :exc:`ValueError`.
"""
if not 0 < p < 1:
raise ValueError(
f"Bad probability for {description}: {p} "
f"-- must be in range (0, 1)"
)
def getdictprob(
d: Dict[str, Any],
key: str,
mandatory: bool = False,
default: Optional[float] = None,
) -> Optional[float]:
"""
As for :func:`getdictval` but returns a probability and checks that it is
in range. By default the key is optional, returning ``None`` if absent.
"""
v = getdictval(d, key, float, mandatory=mandatory, default=default)
if v is None:
return None
validate_prob(v, key)
return v
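_ = """
Illustrative use of getdictval()/getdictprob(); the keys shown are arbitrary
examples, not ones used elsewhere in CRATE:
    d = {"name": "Alice", "p_f": 0.99}
    getdictval(d, "name", str, mandatory=True)  # -> 'Alice'
    getdictval(d, "dob", str)  # -> None (absent, not mandatory)
    getdictval(d, "name", int)  # raises ValueError (wrong type)
    getdictprob(d, "p_f")  # -> 0.99
    getdictprob({"p_f": 1.5}, "p_f")  # raises ValueError (not in [0, 1])
"""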
# =============================================================================
# Dates
# =============================================================================
ISO_DATE_REGEX = re.compile(
# yyyy-MM-dd, from the year 0000 onwards.
r"^\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12][0-9]|3[01])$"
# year: \d{4}; month: 01-12; day: 01-31
)
# Also: https://stackoverflow.com/questions/3143070
def age_years(dob: Optional[Date], when: Optional[Date]) -> Optional[int]:
"""
A person's age in years when something happened, or ``None`` if either
DOB or the index date is unknown.
"""
if dob and when:
return (when - dob).in_years()
return None
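_ = """
Illustrative (Date here is the pendulum Date imported above):
    age_years(Date(2000, 1, 29), Date(2020, 6, 30))  # -> 20
    age_years(Date(2000, 1, 29), None)  # -> None (index date unknown)
"""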
def mk_blurry_dates(d: Union[Date, str]) -> Tuple[str, str, str]:
"""
Returns MONTH_DAY, YEAR_DAY, and YEAR_MONTH versions in a standard form.
"""
# ISO format is %Y-%m-%d; see
# https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes # noqa: E501
# Here we want the shortest full representation; these are not intended to
# be human-legible.
d = coerce_to_pendulum_date(d)
dob_md = d.strftime("%m%d")
dob_yd = d.strftime("%Y%d")
dob_ym = d.strftime("%Y%m")
return dob_md, dob_yd, dob_ym
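_ = """
Illustrative:
    mk_blurry_dates("1987-06-05")
    # -> ('0605', '198705', '198706')
    #    i.e. (month+day, year+day, year+month)
"""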
# =============================================================================
# argparse helpers
# =============================================================================
def optional_int(value: str) -> Optional[int]:
"""
``argparse`` argument type that checks that its value is an integer or the
value ``None``.
"""
if value.lower() == "none":
return None
try:
return int(value)
except (AssertionError, TypeError, ValueError):
raise ArgumentTypeError(f"{value!r} is an invalid optional int")
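_ = """
Illustrative argparse use ("--max_rows" is just an example option name):
    parser.add_argument("--max_rows", type=optional_int, default=None)
    # "--max_rows None" -> None
    # "--max_rows 5" -> 5
    # "--max_rows five" -> argparse error via ArgumentTypeError
"""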
# =============================================================================
# Identity function
# =============================================================================
def identity(x: Any) -> Any:
"""
Returns its input.
"""
return x
# =============================================================================
# Perfect identifiers
# =============================================================================
def dict_from_str(x: str) -> Dict[str, str]:
"""
Reads a dictionary like {'a': 'x', 'b': 'y'} from a string like "{a:x,
b:y}".
"""
if not x:
return {}
w = x.strip() # working
if w[0] != "{" or w[-1] != "}":
raise ValueError(f"Bad dict string: {x!r}")
w = w[1:-1].strip()
d = {}
for pair_str in w.split(","):
if pair_str.count(":") != 1:
raise ValueError(f"Bad dict string: {x!r}")
k, v = pair_str.split(":")
d[k.strip()] = v.strip()
return d
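_ = """
Illustrative, from the docstring above:
>>> dict_from_str("{a:x, b:y}")
{'a': 'x', 'b': 'y'}
>>> dict_from_str("")
{}
>>> dict_from_str("a:x")  # no enclosing braces: raises ValueError
"""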
def standardize_perfect_id_key(k: str) -> str:
"""
Keys are compared case-insensitively, in lower case.
"""
return k.strip().lower()
def standardize_perfect_id_value(k: Any) -> str:
"""
Values are forced to strings and compared case-insensitively, in upper case.
"""
return str(k).strip().upper()
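_ = """
Illustrative:
>>> standardize_perfect_id_key(" NHSNumber ")
'nhsnumber'
>>> standardize_perfect_id_value(1234567890)
'1234567890'
>>> standardize_perfect_id_value(" ab-123 ")
'AB-123'
"""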