Source code for crate_anon.linkage.person

#!/usr/bin/env python



    Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (

    This file is part of CRATE.

    CRATE is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CRATE is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CRATE. If not, see <>.


**Person representations for fuzzy matching.**


# =============================================================================
# Imports
# =============================================================================

import json
import logging
import random
from typing import (

from cardinal_pythonlib.reprfunc import auto_repr

from crate_anon.linkage.comparison import bayes_compare, Comparison
from crate_anon.linkage.helpers import (
from crate_anon.linkage.identifiers import (
from crate_anon.linkage.matchconfig import MatchConfig

log = logging.getLogger(__name__)

# =============================================================================
# Person
# =============================================================================

[docs]class Person: """ A proper representation of a person that can do hashing and comparisons. The information may be incomplete or slightly wrong. Includes frequency information and requires a config. """ # ------------------------------------------------------------------------- # Class attributes # ------------------------------------------------------------------------- class PersonKey: LOCAL_ID = "local_id" # person ID within relevant DB (proband/sample) FORENAMES = "forenames" SURNAMES = "surnames" DOB = "dob" GENDER = "gender" POSTCODES = "postcodes" PERFECT_ID = "perfect_id" OTHER_INFO = "other_info" # anything the user may want to attach # [getattr(PersonKey, x) for x in vars(PersonKey)...] does not work here as # PersonKey is not in scope within a list comprehension here; see # and # But this works: ALL_PERSON_KEYS = [] # type: List[str] for tmp in vars(PersonKey): if not tmp.startswith("_"): ALL_PERSON_KEYS.append(getattr(PersonKey, tmp)) del tmp # For reading CSV: SEMICOLON_DELIMIT = [ PersonKey.FORENAMES, PersonKey.SURNAMES, PersonKey.POSTCODES, ] TEMPORAL_IDENTIFIERS = SEMICOLON_DELIMIT PLAINTEXT_CSV_FORMAT_HELP = ( f"(1) CSV format with header row. Columns: {ALL_PERSON_KEYS}. " f"(2) Semicolon-separated values are allowed within " f"{SEMICOLON_DELIMIT}. " f"(3) The fields {TEMPORAL_IDENTIFIERS} are in TemporalIdentifier " f"format. {Identifier.TEMPORAL_ID_FORMAT_HELP} " f"(4) {PersonKey.PERFECT_ID}, if specified, contains one or more " f"perfect person identifiers as key:value pairs, e.g. " f"'nhs:12345;ni:AB6789XY'. The keys will be forced to lower case; " f"values will be forced to upper case. " f"(5) {PersonKey.OTHER_INFO!r} is an arbitrary string for you to use " f"(e.g. for validation)." ) HASHED_JSONLINES_FORMAT_HELP = ( "File created by CRATE in JSON Lines (.jsonl) format. (You could use " "the 'jq' tool to inspect these.)" ) # ------------------------------------------------------------------------- # Creation # -------------------------------------------------------------------------
[docs] def __init__( self, cfg: MatchConfig, local_id: str = "", other_info: str = "", forenames: List[Union[None, str, TemporalIDHolder, Forename]] = None, surnames: List[Union[None, str, TemporalIDHolder, Surname]] = None, dob: Union[None, str, DateOfBirth] = "", gender: Union[None, str, Gender] = "", postcodes: List[Union[None, str, TemporalIDHolder, Postcode]] = None, perfect_id: Union[None, Dict[str, Any], PerfectID] = None, ) -> None: """ Args: cfg: The config object. local_id: Identifier within this person's local database (e.g. proband ID or sample ID). Typically a research pseudonym, not itself identifying. other_info: String containing any other attributes the user may wish to remember (e.g. in JSON). Only used for validation research (e.g. ensuring linkage is not biased by ethnicity). forenames: The person's forenames (given names, first/middle names), as strings or Forename objects. surnames: The person's surname(s), as strings or Surname or TemporalIDHolder objects. dob: The date of birth, in ISO-8061 "YYYY-MM-DD" string format, or as a DateOfBirth object, or None, or ''. gender: The gender: 'M', 'F', 'X', or '', or None, or a Gender object. postcodes: Any UK postcodes for this person, with optional associated dates. perfect_id: Any named person-unique identifiers (e.g. UK NHS numbers, UK National Insurance numbers), for non-fuzzy matching. Dictionary keys will be forced to lower case, and dictionary values to upper case. """ self._is_plaintext = None # type: Optional[bool] def chk_plaintext(new_identifier: Identifier) -> None: """ Ensure we don't mix plaintext and hashed data. """ new_plaintext = new_identifier.is_plaintext if self._is_plaintext is None: self._is_plaintext = new_plaintext elif new_plaintext != self._is_plaintext: new = self.plain_or_hashed_txt(new_plaintext) old = self.plain_or_hashed_txt(self._is_plaintext) raise ValueError( f"Trying to add {new} information to a Person containing " f"only {old} information; new data was " f"{new_identifier!r}; current is {self!r}" ) assert isinstance(cfg, MatchConfig) self.cfg = cfg self.baseline_log_odds_same_person = ( self.cfg.baseline_log_odds_same_person ) # for speed # local_id self.local_id = str(local_id) if local_id is not None else None if not self.local_id: raise ValueError(f"Bad local_id: {local_id!r}") # other_info self.other_info = other_info or "" if not isinstance(self.other_info, str): raise ValueError(f"Bad other_info: {self.other_info!r}") # gender gender = "" if gender is None else gender # DO NOT DO: gender = gender or "" # ... because bool(Gender(cfg, gender="")) == False. if isinstance(gender, Gender): self.gender = gender else: self.gender = Gender(cfg=cfg, gender=gender) chk_plaintext(self.gender) # forenames forenames = forenames or [] if not isinstance(forenames, list): raise ValueError(f"Bad forenames: {forenames!r}") self.forenames = [] # type: List[Forename] for f in forenames: if not f: # None or "" continue elif isinstance(f, str): f = Forename(cfg=cfg, name=f, gender=self.gender.gender_str) elif isinstance(f, TemporalIDHolder): f = Forename( cfg=cfg, name=f.identifier, start_date=f.start_date, end_date=f.end_date, ) elif not isinstance(f, Forename): raise ValueError(f"Bad forename: {f!r}") if not bool(f): continue # skip blank names not detected above chk_plaintext(f) self.forenames.append(f) # surnames surnames = surnames or [] if not isinstance(surnames, list): raise ValueError(f"Bad surnames: {surnames!r}") self.surnames = [] # type: List[Surname] for s in surnames: if not s: continue elif isinstance(s, str): s = Surname(cfg=cfg, name=s, gender=self.gender.gender_str) elif isinstance(s, TemporalIDHolder): s = Surname( cfg=cfg, name=s.identifier, start_date=s.start_date, end_date=s.end_date, ) elif not isinstance(s, Surname): raise ValueError(f"Bad surname: {s!r}") if not bool(s): continue # skip blank names not detected above chk_plaintext(s) self.surnames.append(s) # dob (NB highly desirable for real work, but not mandatory, and we # also want to be able to create Person objects without a DOB for # testing) dob = "" if dob is None else dob if isinstance(dob, DateOfBirth): self.dob = dob else: self.dob = DateOfBirth(cfg=cfg, dob=dob or "") chk_plaintext(self.dob) # postcodes postcodes = postcodes or [] if not isinstance(postcodes, list): raise ValueError(f"Bad postcodes: {postcodes!r}") self.postcodes = [] # type: List[Postcode] for p in postcodes: if not p: # None or "" continue elif isinstance(p, str): p = Postcode(cfg=cfg, postcode=p) elif isinstance(p, TemporalIDHolder): p = Postcode( cfg=cfg, postcode=p.identifier, start_date=p.start_date, end_date=p.end_date, ) elif not isinstance(p, Postcode): raise ValueError(f"Bad data structure for postcode: {p!r}") if not bool(p): continue # skip blanks not detected above chk_plaintext(p) self.postcodes.append(p) # perfect_id if isinstance(perfect_id, PerfectID): self.perfect_id = perfect_id else: self.perfect_id = PerfectID(cfg=cfg, identifiers=perfect_id) chk_plaintext(self.perfect_id)
[docs] @staticmethod def plain_or_hashed_txt(plaintext: bool) -> str: """ Used for error messages. """ return "plaintext" if plaintext else "hashed"
[docs] @classmethod def from_plaintext_csv( cls, cfg: MatchConfig, rowdict: Dict[str, str] ) -> "Person": """ Returns a :class:`Person` object from a CSV row. Args: cfg: a configuration object rowdict: a CSV row, read via :class:`csv.DictReader`. """ kwargs = {} # type: Dict[str, Any] for attr in cls.ALL_PERSON_KEYS: vstr = rowdict[attr] if attr in cls.SEMICOLON_DELIMIT: v = [x.strip() for x in vstr.split(";") if x] if attr == cls.PersonKey.PERFECT_ID: v = PerfectID.from_plaintext_str(cfg, vstr) elif attr in cls.TEMPORAL_IDENTIFIERS: v = [ TemporalIDHolder.from_plaintext_str(cfg, x) for x in v ] else: # All TEMPORAL_IDENTIFIERS are in SEMICOLON_DELIMIT assert attr not in cls.TEMPORAL_IDENTIFIERS v = vstr kwargs[attr] = v return Person(cfg=cfg, **kwargs)
[docs] @classmethod def from_json_dict( cls, cfg: MatchConfig, d: Dict[str, Any], hashed: bool = True ) -> "Person": """ Restore a hashed or plaintext version from a dictionary (which has been read from JSONL). """ def check_is_dict(d_: Any, name_: str) -> None: if not isinstance(d_, dict): raise ValueError( f"{name_} contains something that is not a dict: {d_!r}" ) pk = cls.PersonKey forenames = [] # type: List[Forename] for mnd in getdictval(d, pk.FORENAMES, list): check_is_dict(mnd, pk.FORENAMES) forenames.append(Forename.from_dict(cfg, mnd, hashed)) surnames = [] # type: List[Surname] for sur in getdictval(d, pk.SURNAMES, list): check_is_dict(sur, pk.SURNAMES) surnames.append(Surname.from_dict(cfg, sur, hashed)) postcodes = [] # type: List[Postcode] for pd in getdictval(d, pk.POSTCODES, list): check_is_dict(pd, pk.POSTCODES) postcodes.append(Postcode.from_dict(cfg, pd, hashed)) return Person( cfg=cfg, local_id=getdictval(d, pk.LOCAL_ID, str), other_info=getdictval(d, pk.OTHER_INFO, str, mandatory=False), forenames=forenames, surnames=surnames, dob=DateOfBirth.from_dict( cfg, getdictval(d, pk.DOB, dict), hashed ), gender=Gender.from_dict( cfg, getdictval(d, pk.GENDER, dict), hashed ), postcodes=postcodes, perfect_id=PerfectID.from_dict( cfg, getdictval(d, pk.PERFECT_ID, dict), hashed ), )
[docs] @classmethod def from_json_str(cls, cfg: MatchConfig, s: str) -> "Person": """ Restore a hashed version from a string representing JSON. """ d = json.loads(s) return cls.from_json_dict(cfg, d)
# ------------------------------------------------------------------------- # Equality, hashing -- local_id should be unique # ------------------------------------------------------------------------- # Be careful: # - # noqa # - [re "hashable"] # Here, we define equality based on local_id, which will not change. In # practice, nothing else will either. def __eq__(self, other: "Person") -> bool: return self.local_id == other.local_id def __hash__(self) -> int: return hash(self.local_id) # ------------------------------------------------------------------------- # Representation: string # ------------------------------------------------------------------------- def __repr__(self): return auto_repr(self) def __str__(self) -> str: if self.is_hashed(): return f"Person<HASHED, local_id={self.local_id!r}>" names = " ".join( [str(f) for f in self.forenames] + [str(s) for s in self.surnames] ) postcodes = " - ".join(str(x) for x in self.postcodes) k = self.PersonKey details = ", ".join( [ f"{k.LOCAL_ID}={self.local_id}", f"{k.PERFECT_ID}={self.perfect_id}", f"name={names}", f"{k.GENDER}={self.gender}", f"{k.DOB}={self.dob}", f"{k.POSTCODES}={postcodes}", f"{k.OTHER_INFO}={self.other_info!r}", ] ) return f"Person<{details}>" # ------------------------------------------------------------------------- # Representation: CSV # -------------------------------------------------------------------------
[docs] @classmethod def plaintext_csv_columns(cls) -> List[str]: """ CSV column names -- including user-specified "other" information. """ return cls.ALL_PERSON_KEYS
[docs] def plaintext_csv_dict(self) -> Dict[str, str]: """ Returns a dictionary suitable for :class:`csv.DictWriter`. This is for writing identifiable content. """ d = {} # type: Dict[str, str] for k in self.ALL_PERSON_KEYS: a = getattr(self, k) if k in self.SEMICOLON_DELIMIT and k != self.PersonKey.PERFECT_ID: v = ";".join(str(x) for x in a) else: v = str(a) d[k] = v return d
# ------------------------------------------------------------------------- # Representation: JSON # -------------------------------------------------------------------------
[docs] def as_dict( self, hashed: bool = True, include_frequencies: bool = True, include_other_info: bool = False, ) -> Dict[str, Any]: """ For JSON. Args: hashed: Create a hashed/encrypted version? include_frequencies: Include frequency information. If you don't, this makes the resulting file suitable for use as a sample, but not as a proband file. include_other_info: include the (potentially identifying) ``other_info`` data? Usually ``False``; may be ``True`` for validation. """ pk = self.PersonKey # This could be terser, but to be clear: if hashed: if self._is_plaintext: encrypt = True local_id = self.cfg.local_id_hash_fn(self.local_id) else: encrypt = False # already encrypted; don't do it twice local_id = self.local_id else: if self._is_plaintext: encrypt = False local_id = self.local_id else: raise AssertionError( "Can't create plaintext from hashed Person" ) d = { pk.LOCAL_ID: local_id, pk.FORENAMES: [ f.as_dict(encrypt, include_frequencies) for f in self.forenames ], pk.SURNAMES: [ s.as_dict(encrypt, include_frequencies) for s in self.surnames ], pk.DOB: self.dob.as_dict(encrypt, include_frequencies), pk.GENDER: self.gender.as_dict(encrypt, include_frequencies), pk.POSTCODES: [ p.as_dict(encrypt, include_frequencies) for p in self.postcodes ], pk.PERFECT_ID: self.perfect_id.as_dict(encrypt), } if include_other_info: d[pk.OTHER_INFO] = self.other_info return d
# ------------------------------------------------------------------------- # Copying # -------------------------------------------------------------------------
[docs] def copy(self) -> "Person": """ Returns a copy of this object. - :func:`copy.deepcopy` is incredibly slow, yet :func:`copy.copy` isn't enough when we want to mutate this object. - We did do it quasi-manually, copying attributes but using ``[copy.copy(x) for x in value]`` if the value was a list. - However, since we have functions to convert to/from a dict representation, we may as well use them. """ hashed = self.is_hashed() return self.from_json_dict( self.cfg, self.as_dict( hashed=hashed, include_frequencies=True, include_other_info=True, ), hashed=hashed, )
# ------------------------------------------------------------------------- # Created hashed version # -------------------------------------------------------------------------
[docs] def hashed( self, include_frequencies: bool = True, include_other_info: bool = False, ) -> "Person": """ Returns a :class:`Person` object but with all the elements hashed (if they are not blank). Note that you do NOT need to do this just to write a hashed version to disk. This function is primarily for comparing an entire sample of hashed people to plaintext people, or vice versa; we hash the plaintext version first. Args: include_frequencies: Include frequency information. If you don't, this makes the resulting file suitable for use as a sample, but not as a proband file. include_other_info: include the (potentially identifying) ``other_info`` data? Usually ``False``; may be ``True`` for validation. """ d = self.as_dict( hashed=True, include_frequencies=include_frequencies, include_other_info=include_other_info, ) return self.from_json_dict(self.cfg, d)
# ------------------------------------------------------------------------- # Main comparison function # -------------------------------------------------------------------------
[docs] def log_odds_same(self, candidate: "Person") -> float: """ Returns the log odds that ``self`` (the proband) and ``candidate`` are the same person. Args: candidate: another :class:`Person` object Returns: float: the log odds they're the same person """ # High speed function. return bayes_compare( log_odds=self.baseline_log_odds_same_person, comparisons=self._gen_comparisons(candidate), )
# ------------------------------------------------------------------------- # Comparison helper functions # ------------------------------------------------------------------------- def _gen_comparisons( self, candidate: "Person" ) -> Generator[Optional[Comparison], None, None]: """ Generates all relevant comparisons. Args: candidate: another :class:`Person` object. **Note** In general, frequency information is associated with the proband, not the candidate, so use ``self.thing.comparison(candidate.thing)``. """ # A perfect match would already have been tested for. The shortlisting # process may already have ensured a DOB partial match, or maybe not. # Regardless, there are no identifiers that will cause a complete # disqualification if they mismatch, so order here becomes unimportant # for speed. # Surnames yield from gen_best_comparisons( proband_identifiers=self.surnames, candidate_identifiers=candidate.surnames, ordered=False, ) # Forenames yield from gen_best_comparisons( proband_identifiers=self.forenames, candidate_identifiers=candidate.forenames, ordered=True, p_u=self.cfg.p_u_forename, ) # DOB (see above) # There is no special treatment of 29 Feb (since this DOB is # approximately 4 times less common than other birthdays, in principle # it does merit special treatment, but we ignore that). yield self.dob.comparison(candidate.dob) # Gender yield self.gender.comparison(candidate.gender) # Postcodes yield from gen_best_comparisons( proband_identifiers=self.postcodes, candidate_identifiers=candidate.postcodes, ordered=False, ) # ------------------------------------------------------------------------- # Info functions # -------------------------------------------------------------------------
[docs] def is_plaintext(self) -> bool: """ Is this a plaintext (identifiable) Person? """ return self._is_plaintext
[docs] def is_hashed(self) -> bool: """ Is this a hashed (de-identified) Person? """ return not self.is_plaintext()
[docs] def n_forenames(self) -> int: """ Number of forenames """ return len(self.forenames)
[docs] def has_dob(self) -> bool: """ Do we have a DOB? """ return bool(self.dob)
[docs] def n_postcodes(self) -> int: """ How many postcodes does this person have? """ return len(self.postcodes)
# ------------------------------------------------------------------------- # Validation # -------------------------------------------------------------------------
[docs] def ensure_valid_as_proband(self) -> None: """ Ensures this person has sufficient information to act as a proband, or raises :exc:`ValueError`. We previously required a DOB unless debugging, but no longer. """ for f in self.forenames: f.ensure_has_freq_info_if_id_present() for s in self.surnames: s.ensure_has_freq_info_if_id_present() self.dob.ensure_has_freq_info_if_id_present() self.gender.ensure_has_freq_info_if_id_present() for p in self.postcodes: p.ensure_has_freq_info_if_id_present()
[docs] def ensure_valid_as_candidate(self) -> None: """ Ensures this person has sufficient information to act as a candidate, or raises :exc:`AssertionError`. We previously required a DOB unless debugging, but no longer. """ pass
# ------------------------------------------------------------------------- # Debugging functions to check this object # -------------------------------------------------------------------------
[docs] def debug_gen_identifiers(self) -> Generator[Identifier, None, None]: """ Yield all identifiers. """ yield from self.forenames yield from self.surnames if self.dob: yield self.dob if self.gender: yield self.gender yield from self.postcodes
[docs] def debug_comparison_report( self, candidate: "Person", verbose: bool = True ) -> str: """ Compare a person with another, log every step of the way, and return the result as a string. """ lines = [] # type: List[str] def report(msg_: str) -> None: lines.append(f"{msg_} -> log_odds = {log_odds}") if verbose: spacer = " - " self_id = ( "\n".join( spacer + repr(i) for i in self.debug_gen_identifiers() ) + "\n" ) candidate_id = ( "\n".join( spacer + repr(i) for i in candidate.debug_gen_identifiers() ) + "\n" ) else: self_id = "" candidate_id = "" lines.append("VERBOSE COMPARISON:") lines.append(f"- self (proband) = {self}") lines.append(self_id) lines.append(f"- candidate = {candidate}") lines.append(candidate_id) lines.append(f"- self dict = {self.as_dict(hashed=False)}") lines.append(self_id) lines.append(f"- candidate dict = {candidate.as_dict(hashed=False)}") lines.append(candidate_id) log_odds = self.cfg.baseline_log_odds_same_person report("Baseline") for comp in self._gen_comparisons(candidate=candidate): if not comp: continue log_odds = comp.posterior_log_odds(log_odds) report(str(comp)) return "\n".join(filter(None, lines))
[docs] def debug_compare(self, candidate: "Person", verbose: bool = True) -> None: """ Compare a person with another, and log every step of the way. """, verbose=verbose))
# ------------------------------------------------------------------------- # Debugging functions to mutate this object # -------------------------------------------------------------------------
[docs] def debug_delete_something(self) -> None: """ Randomly delete one of: a forename, or a postcode. """ n_forenames = self.n_forenames() n_postcodes = self.n_postcodes() n_possibilities = n_forenames + n_postcodes if n_possibilities == 0: log.warning(f"Unable to delete info from {self}") return which = random.randint(0, n_possibilities - 1) if which < n_forenames: del self.forenames[which] return which -= n_forenames del self.postcodes[which]
[docs] def debug_mutate_something(self) -> None: """ Randomly mutate one of: a forename, or a postcode. """ n_forenames = self.n_forenames() n_postcodes = self.n_postcodes() n_possibilities = n_forenames + n_postcodes if n_possibilities == 0: log.warning(f"Unable to mutate info from {self}") return which = random.randrange(n_possibilities) cfg = self.cfg if which < n_forenames: oldname = self.forenames[which] assert oldname.is_plaintext self.forenames[which] = Forename( cfg, name=mutate_name(, gender=oldname.gender ) return which -= n_forenames oldpostcode = self.postcodes[which] assert oldpostcode.is_plaintext self.postcodes[which] = Postcode( cfg, postcode=mutate_postcode(oldpostcode.postcode_unit, cfg) )