r"""
crate_anon/linkage/frequencies.py
===============================================================================
Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CRATE.
CRATE is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CRATE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CRATE. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
**Frequency classes for linkage tools.**
These record and calculate frequencies of real-world things (names, postcodes)
from publicly available data.
"""
# =============================================================================
# Imports
# =============================================================================
from collections import Counter, defaultdict
import csv
import json
import logging
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple
from cardinal_pythonlib.reprfunc import auto_repr
import jsonlines
from crate_anon.common.logfunc import warn_once
from crate_anon.linkage.constants import UK_POPULATION_2017
from crate_anon.linkage.helpers import (
get_first_two_char,
get_metaphone,
get_postcode_sector,
is_pseudopostcode,
mkdir_for_filename,
open_even_if_zipped,
standardize_name,
standardize_postcode,
)
# Module-level logger, named after this module; used by the classes below.
log = logging.getLogger(__name__)
# =============================================================================
# BasicNameFreqInfo
# =============================================================================
class BasicNameFreqInfo:
    """
    Used for calculating P(share F2C but not name or metaphone).

    Holds, for one name (optionally within one gender), the population
    frequency of the name itself, of its metaphone, and of its first two
    characters (F2C), plus the derived "matches at this level but not at a
    more specific level" probabilities.

    Note that the metaphone can be "", e.g. if the name is "W". But we can
    still calculate the frequency of those metaphones cumulatively across all
    our names.
    """

    # Dictionary keys used by as_dict() / from_dict().
    KEY_NAME = "name"
    KEY_P_NAME = "p_f"
    KEY_GENDER = "gender"
    KEY_METAPHONE = "metaphone"
    KEY_P_METAPHONE = "p_p1"
    KEY_P_METAPHONE_NOT_NAME = "p_p1nf"
    KEY_F2C = "f2c"
    KEY_P_F2C = "p_p2"
    KEY_P_F2C_NOT_NAME_METAPHONE = "p_p2np1"

    def __init__(
        self,
        name: str,
        p_name: float,
        gender: str = "",
        metaphone: str = "",
        p_metaphone: float = 0.0,
        p_metaphone_not_name: float = 0.0,
        f2c: str = "",
        p_f2c: float = 0.0,
        p_f2c_not_name_metaphone: float = 0.0,
        synthetic: bool = False,
    ) -> None:
        """
        The constructor allows initialization with just a name and its
        frequency (with other probabilities being set later), or from a saved
        representation with full details.

        The name is passed through ``standardize_name()`` before storage. If
        ``metaphone`` or ``f2c`` is blank, it is derived from the standardized
        name.

        Args:
            name:
                Name.
            p_name:
                Population probability (frequency) of this name, within the
                specified gender if there is one.
            gender:
                Specified gender, or a blank string for non-gender-associated
                names.
            metaphone:
                "Sounds-like" representation as the first part of a double
                metaphone.
            p_metaphone:
                Population frequency (probability) of the metaphone.
            p_metaphone_not_name:
                Probability that someone in the population shares this
                metaphone, but not this name. Usually this is ``p_metaphone -
                p_name``, but you may choose to impose a minimum frequency.
            f2c:
                First two characters (F2C) of the name.
            p_f2c:
                Population probability of the F2C.
            p_f2c_not_name_metaphone:
                Probability that someone in the population shares this F2C, but
                not this name or metaphone.
            synthetic:
                Is this record made up (e.g. an unknown name, or a mean of two
                other records)?
        """
        name = standardize_name(name)
        self.name = name
        self.gender = gender
        self.p_name = p_name
        # A blank metaphone/F2C argument means "work it out from the name".
        self.metaphone = metaphone or get_metaphone(name)
        self.p_metaphone = p_metaphone
        self.p_metaphone_not_name = p_metaphone_not_name
        self.f2c = f2c or get_first_two_char(name)
        self.p_f2c = p_f2c  # not important! For info only.
        self.p_f2c_not_name_metaphone = p_f2c_not_name_metaphone
        self.synthetic = synthetic

    def __repr__(self) -> str:
        return auto_repr(self, sort_attrs=False)

    @property
    def p_no_match(self) -> float:
        """
        Probability that another person matches on neither name, metaphone,
        nor F2C.

        Requires ``p_metaphone`` to have been set (it must be at least
        ``p_name``, since it includes it).
        """
        assert (
            self.p_metaphone >= self.p_name
        ), "Set p_metaphone before using p_no_match"
        # p_metaphone includes p_name
        return 1 - self.p_metaphone - self.p_f2c_not_name_metaphone

    def as_dict(self) -> Dict[str, Any]:
        """
        Returns a JSON representation (a dictionary using the ``KEY_*``
        constants as keys). The ``synthetic`` flag is not saved.
        """
        return {
            self.KEY_NAME: self.name,
            self.KEY_GENDER: self.gender,
            self.KEY_P_NAME: self.p_name,
            self.KEY_METAPHONE: self.metaphone,
            self.KEY_P_METAPHONE: self.p_metaphone,
            self.KEY_P_METAPHONE_NOT_NAME: self.p_metaphone_not_name,
            self.KEY_F2C: self.f2c,
            self.KEY_P_F2C: self.p_f2c,
            self.KEY_P_F2C_NOT_NAME_METAPHONE: self.p_f2c_not_name_metaphone,
        }

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "BasicNameFreqInfo":
        """
        Create from JSON representation (as produced by :meth:`as_dict`).

        Raises:
            KeyError: if an expected key is missing from ``d``.
        """
        # Use cls, not a hard-coded class name, so subclasses round-trip too.
        return cls(
            name=d[cls.KEY_NAME],
            gender=d[cls.KEY_GENDER],
            p_name=d[cls.KEY_P_NAME],
            metaphone=d[cls.KEY_METAPHONE],
            p_metaphone=d[cls.KEY_P_METAPHONE],
            p_metaphone_not_name=d[cls.KEY_P_METAPHONE_NOT_NAME],
            f2c=d[cls.KEY_F2C],
            p_f2c=d[cls.KEY_P_F2C],
            p_f2c_not_name_metaphone=d[cls.KEY_P_F2C_NOT_NAME_METAPHONE],
        )

    @staticmethod
    def weighted_mean(
        objects: Sequence["BasicNameFreqInfo"], weights: Sequence[float]
    ) -> "BasicNameFreqInfo":
        """
        Returns an object with the weighted probabilities across the objects
        specified. Used for gender weighting.

        The result is marked synthetic and takes its name (and hence its
        derived metaphone and F2C) from the first object. Weights are applied
        exactly as given; they are not normalized here.

        Args:
            objects:
                Frequency records to combine; must not be empty.
            weights:
                One weight per object, in the same order.
        """
        assert len(objects) == len(weights) > 0
        first = objects[0]
        result = BasicNameFreqInfo(name=first.name, p_name=0.0, synthetic=True)
        for obj, w in zip(objects, weights):
            result.p_name += w * obj.p_name
            # Weight each object's own metaphone frequency (previously this
            # mistakenly re-used p_name, understating p_metaphone and so
            # overstating p_no_match).
            result.p_metaphone += w * obj.p_metaphone
            result.p_metaphone_not_name += w * obj.p_metaphone_not_name
            result.p_f2c += w * obj.p_f2c
            result.p_f2c_not_name_metaphone += w * obj.p_f2c_not_name_metaphone
        return result
# =============================================================================
# NameFrequencyInfo
# =============================================================================
class NameFrequencyInfo:
    """
    Holds frequencies of a class of names (e.g. first names or surnames), and
    also of their fuzzy (metaphone) versions.

    We keep these frequency representations entirely here (source) and with
    the probands (storage); the config doesn't get involved except to define
    min_frequency at creation. We need to scan across all names for an estimate
    of the empty ("") metaphone, which does arise in our standard data. There
    is a process for obtaining default frequency information for any names not
    encountered in our name definitions, of course, but that is then stored
    with the (hashed) name representations and nothing needs to be recalculated
    at comparison time. (Compare postcodes, where further geographical
    adjustments may be required, depending on the comparison population.)
    """

    def __init__(
        self,
        csv_filename: str,
        cache_filename: str,
        by_gender: bool = False,
        min_frequency: float = 0,
    ) -> None:
        """
        Initializes the object, from the cache file if it exists, or
        otherwise from a CSV file (after which the cache is written).

        Uses standardize_name().

        If either filename is blank, a "dummy" object with no name data is
        created.

        Args:
            csv_filename:
                CSV file, with no header, of "name, frequency" rows (or
                "name, gender, frequency" rows if ``by_gender`` is set).
            cache_filename:
                File in which to cache information, for faster loading.
            by_gender:
                Is the source data split by gender?
            min_frequency:
                Minimum frequency to allow; see command-line help.

        Raises:
            KeyError, ValueError: if the cache file is malformed (re-raised
                after logging a request to delete it).
        """
        self._csv_filename = csv_filename
        self._cache_filename = cache_filename
        self._min_frequency = min_frequency
        self.by_gender = by_gender
        self.infolist = []  # type: List[BasicNameFreqInfo]

        # We key the following by (name, gender), even if gender is "".
        # This makes the code much simpler.
        self.name_gender_idx = (
            {}
        )  # type: Dict[Tuple[str, str], BasicNameFreqInfo]
        self.metaphone_freq = {}  # type: Dict[Tuple[str, str], float]
        self.f2c_freq = {}  # type: Dict[Tuple[str, str], float]
        self.f2c_to_infolist = defaultdict(
            list
        )  # type: Dict[Tuple[str, str], List[BasicNameFreqInfo]]

        if not csv_filename or not cache_filename:
            log.debug("Using dummy NameFrequencyInfo")
            return

        try:
            self._load_from_cache(cache_filename)
        except (KeyError, ValueError):
            # KeyError: a cached record lacks an expected key (see
            # BasicNameFreqInfo.from_dict). Reported in the same way as other
            # bad-cache errors, matching PostcodeFrequencyInfo.
            log.critical(f"Bad cache: please delete {cache_filename}")
            raise
        except FileNotFoundError:
            # No cache yet: build from the source data, then cache it.
            self._load_from_csv(csv_filename)
            self._save_to_cache(cache_filename)

    def _load_from_cache(self, cache_filename: str) -> None:
        """
        Loads from a JSONL cache, then rebuilds the indexes (without
        recalculating the per-name probabilities, which the cache contains).
        """
        log.info(f"Reading from cache: {cache_filename}")
        with jsonlines.open(cache_filename) as reader:
            self.infolist = [BasicNameFreqInfo.from_dict(d) for d in reader]
        log.debug(f"... finished reading from: {cache_filename}")
        self._index(update_infolist=False)

    def _save_to_cache(self, cache_filename: str) -> None:
        """
        Saves to a JSONL cache (one name record per line). Does nothing if the
        filename is blank.
        """
        if not cache_filename:
            return
        log.info(f"Writing to cache: {cache_filename}")
        mkdir_for_filename(cache_filename)
        with jsonlines.open(cache_filename, mode="w") as writer:
            for info in self.infolist:
                writer.write(info.as_dict())
        log.debug(f"... finished writing to cache: {cache_filename}")

    def _load_from_csv(self, csv_filename: str) -> None:
        """
        Read from the original data, then index it and calculate the
        additional per-name probabilities.

        Expected columns are name, [gender,] frequency. Name frequencies
        below ``min_frequency`` are raised to it. Blank rows are skipped.
        """
        log.info(f"Reading source data: {csv_filename}")
        by_gender = self.by_gender
        min_frequency = self._min_frequency
        self.infolist = []
        with open_even_if_zipped(csv_filename) as f:
            for row in csv.reader(f):
                if not row:
                    # csv.reader yields [] for a blank line; skip it rather
                    # than crash with IndexError.
                    continue
                if by_gender:
                    gender = row[1]
                    freq_str = row[2]
                else:
                    gender = ""
                    freq_str = row[1]
                self.infolist.append(
                    BasicNameFreqInfo(
                        name=row[0],
                        p_name=max(min_frequency, float(freq_str)),
                        gender=gender,
                    )
                )
        log.debug(f"... finished reading from: {csv_filename}")
        self._index(update_infolist=True)

    def _index(self, update_infolist: bool) -> None:
        """
        Build our internal indexes, having loaded `self.infolist`.

        Args:
            update_infolist:
                Also calculate (slowly) and write back to each name record
                its metaphone/F2C probabilities? Used when loading from the
                source CSV; not needed when loading from the cache.

        Example for thinking (with fictional metaphones; these might be
        wrong!):

        .. code-block:: none

            #   name        p       metaphone   f2c
            1   SMITH       0.2     SMT         SM
            2   SMYTHE      0.05    SMT         SM
            3   SCHMITH     0.01    SMT         SC
            4   SMALL       0.04    SML         SM
            5   JONES       0.2     JNS         JO
            6   JOPLIN      0.1     JPL         JO
            7   WALKER      0.2     WLK         WA
            8   ZEBRA       0.2     ZBR         ZE

        With respect to a proband called SMITH:

        - P(another person's name is SMITH) = 0.2 [1];
        - P(another person's metaphone is SMT) = 0.26 [1, 2, 3];
        - P(another person's metaphone is SMT but their name is not SMITH) =
          0.06 [2, 3], being the preceding minus [1];
        - P(another person's F2C is SM) = 0.29 [1, 2, 4];
        - P(another person's F2C is SM but their metaphone is not SMT and their
          name is not SMITH) = 0.04 [4].

        With respect to a proband called SMALL:

        - P(another person's name is SMALL) = 0.04 [4];
        - P(... metaphone SML) = 0.04 [4];
        - P(... metaphone SML, name not SMALL) = 0, being the preceding minus
          [4];
        - P(... F2C SM) = 0.29 [1, 2, 4];
        - P(... F2C SM but metaphone not SML and name not SMALL) = 0.25 [1, 2].

        This makes it apparent that:

        - P(another person matches on name) = P(name in the population).
        - Since names have a one-to-one or many-to-one relationship with
          metaphones (one name can only have one metaphone but two names can
          share a metaphone), P(metaphone match but not name match) is
          P(metaphone match) minus P(name match).
        - There is obviously a quantity P(F2C) that is constant for every F2C.
          Also, the relationship between names and F2C is one-to-one or
          many-to-one, as for metaphones. However, if F2C are second in the
          hierarchy, such that we need to calculate P(F2C match but not name OR
          METAPHONE match), it becomes relevant that the relationship between
          metaphones and F2C is many-to-many [see examples 1-4 above].
          THEREFORE, P(F2C match but not name or metaphone match) is SPECIFIC
          TO A NAME.
        """
        log.debug("Indexing name frequency info...")

        # Reset
        self.name_gender_idx = {}
        self.metaphone_freq = {}
        self.f2c_freq = {}
        self.f2c_to_infolist = defaultdict(list)

        # For extra speed:
        min_frequency = self._min_frequency
        name_gender_idx = self.name_gender_idx
        metaphone_freq = self.metaphone_freq
        f2c_freq = self.f2c_freq
        f2c_to_infolist = self.f2c_to_infolist
        meta_to_infolist = defaultdict(
            list
        )  # type: Dict[Tuple[str, str], List[BasicNameFreqInfo]]

        for i in self.infolist:
            name_key = i.name, i.gender
            metaphone_key = i.metaphone, i.gender
            f2c_key = i.f2c, i.gender
            p_name = i.p_name

            # Enable rapid lookup by name/gender
            name_gender_idx[name_key] = i

            # Calculate metaphone frequency (maybe for writing back to name
            # info objects, but certainly for frequency information relating to
            # unknown names with known metaphones).
            metaphone_freq[metaphone_key] = (
                metaphone_freq.get(metaphone_key, 0) + p_name
            )

            # Calculate F2C frequency (not very important!).
            f2c_freq[f2c_key] = f2c_freq.get(f2c_key, 0) + p_name

            # Enable lookup by F2C
            f2c_to_infolist[f2c_key].append(i)

            if update_infolist:
                # Enable temporary lookup by metaphone
                meta_to_infolist[metaphone_key].append(i)

        if update_infolist:
            log.info("... calculating additional frequency info (slow)...")

            # Store metaphone frequency for each name.
            for metaphone_key, metaphone_infolist in meta_to_infolist.items():
                p_meta = metaphone_freq[metaphone_key]
                for i in metaphone_infolist:  # type: BasicNameFreqInfo
                    i.p_metaphone = max(min_frequency, p_meta)
                    i.p_metaphone_not_name = max(
                        min_frequency, p_meta - i.p_name
                    )

            # This is not very important, but... store F2C frequency.
            for f2c_key, f2c_infolist in f2c_to_infolist.items():
                p_f2c = max(min_frequency, f2c_freq[f2c_key])
                for i in f2c_infolist:  # type: BasicNameFreqInfo
                    i.p_f2c = p_f2c

            # Calculate P(F2C match but not name or metaphone match).
            # This is name-specific; see above.
            for i in self.infolist:
                f2c_key = i.f2c, i.gender
                i.p_f2c_not_name_metaphone = 0.0
                # Every key used here was populated in the first loop, so
                # this lookup never creates a new defaultdict entry.
                for other in f2c_to_infolist[f2c_key]:  # ... same F2C...
                    if other.name != i.name and other.metaphone != i.metaphone:
                        # ... but different name and metaphone...
                        i.p_f2c_not_name_metaphone += other.p_name
                i.p_f2c_not_name_metaphone = max(
                    min_frequency, i.p_f2c_not_name_metaphone
                )

        log.debug("... finished indexing name frequency info")

    def name_frequency_info(
        self, name: str, gender: str = "", prestandardized: bool = True
    ) -> "BasicNameFreqInfo":
        """
        Look up frequency information for a name (with gender, optionally).

        Args:
            name: the name to check
            gender: the gender, if created with ``by_gender=True``
            prestandardized: was the name pre-standardized in format?

        Returns:
            the stored record for a known name, or a freshly made synthetic
            record for an unknown one
        """
        if not prestandardized:
            name = standardize_name(name)
        known = self.name_gender_idx.get((name, gender))
        if known is not None:
            return known
        return self._unknown_name_info(name, gender)

    def _unknown_name_info(
        self, name: str, gender: str = ""
    ) -> "BasicNameFreqInfo":
        """
        Return a default set of information for unknown names. We do not alter
        our saved information.

        It's possible that an unknown name has a known metaphone or F2C,
        though, so we account for that.
        """
        min_frequency = self._min_frequency
        result = BasicNameFreqInfo(
            name=name,
            p_name=min_frequency,
            gender=gender,
            synthetic=True,
        )

        metaphone = result.metaphone
        meta_key = metaphone, gender
        result.p_metaphone = max(
            min_frequency, self.metaphone_freq.get(meta_key, min_frequency)
        )
        result.p_metaphone_not_name = max(
            min_frequency, result.p_metaphone - result.p_name
        )

        f2c_key = result.f2c, gender
        result.p_f2c = max(
            min_frequency, self.f2c_freq.get(f2c_key, min_frequency)
        )

        # Use .get(), not [...]: f2c_to_infolist is a defaultdict, so plain
        # indexing would insert an empty list for every unknown F2C, altering
        # (and steadily growing) our saved information.
        same_f2c = self.f2c_to_infolist.get(f2c_key, ())
        # Same F2C but not same metaphone -- and by definition not the same
        # name, or we wouldn't be here.
        p_f2c_not_name_metaphone = sum(
            (i.p_name for i in same_f2c if i.metaphone != metaphone), 0.0
        )
        result.p_f2c_not_name_metaphone = max(
            min_frequency, p_f2c_not_name_metaphone
        )
        return result

    def name_frequency(
        self, name: str, gender: str = "", prestandardized: bool = True
    ) -> float:
        """
        Returns the frequency of a name.

        Args:
            name: the name to check
            gender: the gender, if created with ``by_gender=True``
            prestandardized: was the name pre-standardized in format?

        Returns:
            the name's frequency in the population
        """
        return self.name_frequency_info(
            name, gender, prestandardized=prestandardized
        ).p_name

    def first_two_char_frequency(self, f2c: str, gender: str = "") -> float:
        """
        Returns the frequency of the first two characters of a name, or the
        minimum frequency if that F2C is unknown.

        This one isn't very important; we want a more refined probability.
        """
        key = f2c, gender
        return self.f2c_freq.get(key, self._min_frequency)
# =============================================================================
# PostcodeFrequencyInfo
# =============================================================================
class PostcodeFrequencyInfo:
    """
    Holds frequencies of UK postcodes, and also their hashed versions.
    Handles pseudo-postcodes somewhat separately.

    Frequencies are national estimates for known real postcodes. Any local
    correction or correction for unknown postcodes is done separately.

    We return explicit "don't know" values for unknown postcodes (including
    pseudopostcodes) since those values may be handled differently, in a way
    that is set at comparison time.
    """

    # Top-level keys of the JSON cache file.
    KEY_POSTCODE_UNIT_FREQ = "postcode_unit_freq"
    KEY_POSTCODE_SECTOR_FREQ = "postcode_sector_freq"

    def __init__(
        self,
        csv_filename: str,
        cache_filename: str,
        report_every: int = 10000,
    ) -> None:
        """
        Initializes the object, from the cache file if it exists, or
        otherwise from a CSV file (after which the cache is written).

        If either filename is blank, a "dummy" object with no postcode data
        is created.

        Args:
            csv_filename:
                CSV file from the UK Office of National Statistics, e.g.
                ``ONSPD_MAY_2022_UK.csv``. Columns include "pcds" (one of the
                postcode formats) and "oa11" (Output Area from the 2011
                Census). A ZIP file containing a single CSV file is also
                permissible (distinguished by filename extension).
            cache_filename:
                Filename to hold JSON format cached data, because the CSV
                read process is slow (it's a 1.4 Gb CSV).
            report_every:
                How often (in rows) to report progress during loading. Zero
                or a negative number disables progress reports.

        Raises:
            KeyError, ValueError: if the cache file is malformed (re-raised
                after logging a request to delete it).
        """
        self._csv_filename = csv_filename
        self._cache_filename = cache_filename
        self._postcode_unit_freq = {}  # type: Dict[str, float]
        self._postcode_sector_freq = {}  # type: Dict[str, float]

        if not csv_filename or not cache_filename:
            log.debug("Using dummy PostcodeFrequencyInfo")
            return

        try:
            self._load_from_cache(cache_filename)
        except (KeyError, ValueError):
            log.critical(f"Bad cache: please delete {cache_filename}")
            raise
        except FileNotFoundError:
            # No cache yet: build from the source data, then cache it.
            self._load_from_csv(
                csv_filename,
                report_every=report_every,
            )
            self._save_to_cache(cache_filename)

    def _load_from_cache(self, cache_filename: str) -> None:
        """
        Loads from a JSON cache.

        May raise KeyError, ValueError. Our frequency dictionaries are only
        replaced once the whole cache has been validated.
        """
        log.info(f"Reading from cache: {cache_filename}")
        with open(cache_filename, encoding="utf-8") as file:
            d = json.load(file)
        if not isinstance(d, dict):
            # Without this check, indexing e.g. a list by a string key would
            # raise TypeError, bypassing the caller's bad-cache handling.
            raise ValueError(
                f"Bad cache: top-level JSON value is of wrong type {type(d)}"
            )

        # May raise KeyError:
        unit_freq = d[self.KEY_POSTCODE_UNIT_FREQ]
        sector_freq = d[self.KEY_POSTCODE_SECTOR_FREQ]
        if not isinstance(unit_freq, dict):
            raise ValueError(
                f"Bad cache: {self.KEY_POSTCODE_UNIT_FREQ} is of wrong type "
                f"{type(unit_freq)}"
            )
        if not isinstance(sector_freq, dict):
            raise ValueError(
                f"Bad cache: {self.KEY_POSTCODE_SECTOR_FREQ} is of wrong type "
                f"{type(sector_freq)}"
            )

        self._postcode_unit_freq = unit_freq
        self._postcode_sector_freq = sector_freq
        log.debug(f"... finished reading from: {cache_filename}")

    def _save_to_cache(self, cache_filename: str) -> None:
        """
        Saves to a JSON cache. Does nothing if the filename is blank.
        """
        if not cache_filename:
            return
        log.info(f"Writing to cache: {cache_filename}")
        mkdir_for_filename(cache_filename)
        d = {
            self.KEY_POSTCODE_UNIT_FREQ: self._postcode_unit_freq,
            self.KEY_POSTCODE_SECTOR_FREQ: self._postcode_sector_freq,
        }
        with open(cache_filename, mode="w") as file:
            json.dump(d, file)
        log.debug(f"... finished writing to cache: {cache_filename}")

    def _load_from_csv(self, csv_filename: str, report_every: int) -> None:
        """
        Read from the original data.

        Frequencies are expressed in units of Output Areas (OAs), each OA
        being weighted equally:

        - a postcode unit's frequency is its share of its OA (one divided by
          the number of units in that OA), divided by the total number of
          OAs;
        - a postcode sector's frequency is the number of distinct OAs
          containing any of its units, divided by the total number of OAs.

        Args:
            csv_filename:
                Source CSV (or zipped CSV) with "pcds" and "oa11" columns.
            report_every:
                Log progress every this many rows; zero or negative means
                never.
        """
        log.info(f"Reading source data: {csv_filename}")
        self._postcode_unit_freq = {}
        self._postcode_sector_freq = {}
        oa_unit_counter = Counter()  # OA -> number of postcode units in it
        unit_to_oa = {}  # type: Dict[str, str]
        sector_to_oas = defaultdict(set)  # type: Dict[str, Set[str]]

        # Load data
        with open_even_if_zipped(csv_filename) as f:
            csvreader = csv.DictReader(f)
            for rownum, row in enumerate(csvreader, start=1):
                unit = standardize_postcode(row["pcds"])
                sector = get_postcode_sector(unit)
                oa = row["oa11"]
                # Guard against report_every == 0 (division by zero).
                if report_every > 0 and rownum % report_every == 0:
                    log.debug(
                        f"Row# {rownum}: postcode unit {unit}, "
                        f"postcode sector {sector}, Output Area {oa}"
                    )
                unit_to_oa[unit] = oa
                oa_unit_counter[oa] += 1  # one more unit for this OA
                sector_to_oas[sector].add(oa)

        # Calculate. The absolute value of the population size of an OA is
        # irrelevant as it cancels out.
        log.info("Calculating population frequencies for postcodes...")
        unit_freq = self._postcode_unit_freq
        sector_freq = self._postcode_sector_freq
        total_n_oas = len(oa_unit_counter)
        log.info(f"Number of Output Areas: {total_n_oas}")
        for unit, oa in unit_to_oa.items():
            n_units_in_this_oa = oa_unit_counter[oa]
            unit_n_oas = 1 / n_units_in_this_oa  # this unit's share of its OA
            unit_freq[unit] = unit_n_oas / total_n_oas
        for sector, oas in sector_to_oas.items():
            sector_n_oas = len(oas)
            sector_freq[sector] = sector_n_oas / total_n_oas
        log.debug(f"... finished reading from: {csv_filename}")

    def postcode_unit_sector_frequency(
        self, postcode_unit: str, prestandardized: bool = False
    ) -> Tuple[Optional[float], Optional[float]]:
        """
        Returns the frequency of a postcode unit and its associated sector.
        Performs an important check that the sector frequency is at least as
        big as the unit frequency.

        Args:
            postcode_unit: the postcode unit to check
            prestandardized: was the postcode pre-standardized in format?

        Returns:
            tuple: unit_frequency, sector_frequency. Both are ``None`` if
            either the unit or its sector is unknown (which includes
            pseudopostcodes).
        """
        unit = (
            postcode_unit
            if prestandardized
            else standardize_postcode(postcode_unit)
        )
        sector = get_postcode_sector(unit)
        try:
            unit_freq = self._postcode_unit_freq[unit]
            sector_freq = self._postcode_sector_freq[sector]
        except KeyError:
            # Unknown. Pseudopostcodes are expected to be unknown, so only
            # mention (once each) those that are not pseudopostcodes.
            if not is_pseudopostcode(unit, prestandardized=True):
                warn_once(
                    f"Unknown postcode: {unit}", log, level=logging.DEBUG
                )
            return None, None
        assert unit_freq <= sector_freq, (
            f"Postcodes: unit_freq = {unit_freq}, "
            f"sector_freq = {sector_freq}, but should have "
            f"unit_freq <= sector_freq, "
            f"for unit = {unit}, sector = {sector}"
        )
        return unit_freq, sector_freq

    def debug_is_valid_postcode(
        self, postcode_unit: str, prestandardized: bool = False
    ) -> bool:
        """
        Is this a valid postcode? That is: is it a postcode unit that we know
        about, or a pseudopostcode?

        Args:
            postcode_unit: the postcode unit to check
            prestandardized: was the postcode pre-standardized in format?
        """
        if not prestandardized:
            postcode_unit = standardize_postcode(postcode_unit)
        return postcode_unit in self._postcode_unit_freq or is_pseudopostcode(
            postcode_unit, prestandardized=True
        )

    def debug_postcode_unit_population(
        self,
        postcode_unit: str,
        prestandardized: bool = False,
        total_population: int = UK_POPULATION_2017,
    ) -> Optional[float]:
        """
        Returns the calculated population of a postcode unit (its frequency
        multiplied by the national population), or ``None`` if its frequency
        is unknown.

        Args:
            postcode_unit: the postcode unit to check
            prestandardized: was the postcode pre-standardized in format?
            total_population: national population
        """
        unit_freq, _ = self.postcode_unit_sector_frequency(
            postcode_unit, prestandardized
        )
        if unit_freq is None:
            return None
        return unit_freq * total_population

    def debug_postcode_sector_population(
        self,
        postcode_sector: str,
        prestandardized: bool = False,
        total_population: int = UK_POPULATION_2017,
    ) -> Optional[float]:
        """
        Returns the calculated population of a postcode sector (its frequency
        multiplied by the national population), or ``None`` if the sector is
        unknown.

        Args:
            postcode_sector: the postcode sector to check
            prestandardized: was the sector pre-standardized in format?
            total_population: national population
        """
        sector = (
            postcode_sector
            if prestandardized
            else standardize_postcode(postcode_sector)
        )
        sector_freq = self._postcode_sector_freq.get(sector)
        if sector_freq is None:
            return None
        return sector_freq * total_population