Source code for crate_anon.anonymise.tests.anonymise_tests

"""
crate_anon/anonymise/tests/anonymise_tests.py

===============================================================================

    Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CRATE.

    CRATE is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CRATE is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CRATE. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

"""

# =============================================================================
# Imports
# =============================================================================

import logging
from typing import Any, Dict, Generator, List, Tuple, TYPE_CHECKING
from unittest import mock

from cardinal_pythonlib.hash import HmacMD5Hasher
from cardinal_pythonlib.sqlalchemy.schema import (
    execute_ddl,
    mssql_table_has_ft_index,
)
import factory
import pytest
from sortedcontainers import SortedSet
from sqlalchemy import (
    BigInteger,
    Boolean,
    Column,
    create_engine,
    DateTime,
    inspect,
    Integer,
    String,
    Text,
)

from crate_anon.anonymise.anonymise import (
    create_indexes,
    gen_opt_out_pids_from_database,
    process_patient_tables,
    process_table,
    validate_optouts,
)
from crate_anon.anonymise.altermethod import AlterMethod
from crate_anon.anonymise.constants import IndexType, ScrubMethod
from crate_anon.anonymise.models import PatientInfo
from crate_anon.anonymise.dd import ScrubSourceFieldInfo
from crate_anon.anonymise.ddr import DataDictionaryRow
from crate_anon.anonymise.tests.factories import PatientInfoFactory
from crate_anon.testing import AnonTestBase, SourceTestBase
from crate_anon.testing.classes import (
    DatabaseTestCase,
    SlowSecretDatabaseTestCase,
)
from crate_anon.testing.factories import (
    AnonTestBaseFactory,
    Fake,
    SourceTestBaseFactory,
)

if TYPE_CHECKING:
    from factory.builder import Resolver


# =============================================================================
# SQLAlchemy test tables
# =============================================================================


[docs]class TestBoolOptOut(SourceTestBase): __tablename__ = "test_opt_out_bool" pid = Column(Integer, primary_key=True, comment="Patient ID") mpid = Column(Integer, comment="Master patient ID") opt_out = Column(Boolean, comment="Opt out?")
[docs]class TestBoolOptOutFactory(SourceTestBaseFactory): class Meta: model = TestBoolOptOut pid = factory.Sequence(lambda n: n + 1) mpid = factory.Sequence(lambda n: n + 1)
[docs]class TestStringOptOut(SourceTestBase): __tablename__ = "test_opt_out_string" pid = Column(Integer, primary_key=True, comment="Patient ID") mpid = Column(Integer, comment="Master patient ID") opt_out = Column(String(4), comment="Opt out?")
[docs]class TestStringOptOutFactory(SourceTestBaseFactory): class Meta: model = TestStringOptOut pid = factory.Sequence(lambda n: n + 1) mpid = factory.Sequence(lambda n: n + 1)
[docs]class TestAnonNote(AnonTestBase): __tablename__ = "test_anon_note" note_id = Column(Integer, primary_key=True, comment="Note ID") note1 = Column(Text, comment="Text of note 1") note2 = Column(Text, comment="Text of note 2")
[docs]class TestPatient(SourceTestBase): __tablename__ = "test_patient" pid = Column(Integer, primary_key=True, comment="Patient ID") forename = Column(String(50), comment="Forename") surname = Column(String(50), comment="Surname") @property def name(self) -> str: return f"{self.forename} {self.surname}"
[docs]class TestPatientFactory(SourceTestBaseFactory): class Meta: model = TestPatient forename = factory.LazyFunction(Fake.en_gb.first_name) surname = factory.LazyFunction(Fake.en_gb.last_name)
[docs]class TestPatientWithStringMPID(SourceTestBase): __tablename__ = "test_patient_with_string_mpid" pid = Column(Integer, primary_key=True, comment="Patient ID") nhsnum = Column(String(10), comment="NHS Number")
[docs]class TestPatientWithStringMPIDFactory(SourceTestBaseFactory): class Meta: model = TestPatientWithStringMPID pid = factory.Sequence(lambda n: n + 1) @factory.lazy_attribute def nhsnum(obj: "Resolver") -> str: return str(Fake.en_gb.nhs_number())
[docs]class TestRecord(SourceTestBase): __tablename__ = "test_record" pk = Column(Integer, primary_key=True, comment="PK") pid = Column(Integer, comment="Patient ID") row_identifier = Column(Integer, comment="Row ID") third_party_pid = Column(Integer, comment="Third party PID") nhsnum = Column(BigInteger, comment="NHS Number") other = Column(String(50), comment="Other column")
[docs]class TestRecordFactory(SourceTestBaseFactory): class Meta: model = TestRecord pk = factory.Sequence(lambda n: n + 1) row_identifier = factory.Sequence(lambda n: n + 10000) nhsnum = factory.LazyFunction(Fake.en_gb.nhs_number) third_party_pid = factory.Sequence(lambda n: n + 1000)
[docs]class TestAnonRecord(AnonTestBase): __tablename__ = "test_anon_record" row_identifier = Column(Integer, primary_key=True, comment="Row ID") nhshash = Column(String(32)) third_party_pid_hash = Column(String(32)) other = Column(String(50), comment="Other column") _src_hash = Column(String(32)) _when_processed_utc = Column(DateTime())
[docs]class TestAnonRecordFactory(AnonTestBaseFactory): class Meta: model = TestAnonRecord
[docs]class TestPidAsPkRecord(SourceTestBase): __tablename__ = "test_pid_as_pk_record" pid = Column(Integer, primary_key=True, comment="Patient ID") other = Column(String(50), comment="Other column")
[docs]class TestPidAsPkRecordFactory(SourceTestBaseFactory): class Meta: model = TestPidAsPkRecord
[docs]class TestAnonPidAsPkRecord(AnonTestBase): __tablename__ = "test_anon_pid_as_pk_record" rid = Column(String(32), primary_key=True, comment="Research ID") trid = Column(Integer) mrid = Column(String(32)) _src_hash = Column(String(32)) _when_processed_utc = Column(DateTime())
[docs]class TestAnonPidAsPkRecordFactory(AnonTestBaseFactory): class Meta: model = TestAnonPidAsPkRecord
# ============================================================================= # Unit tests # ============================================================================= class AnonymiseTestMixin: def mock_dd_row( self, omit: bool = False, skip_row_by_value: mock.Mock = None, primary_pid: bool = False, master_pid: bool = False, third_party_pid: bool = False, alter_methods: list[AlterMethod] = None, add_src_hash: bool = False, **kwargs, ) -> mock.Mock: if skip_row_by_value is None: skip_row_by_value = mock.Mock(return_value=False) if alter_methods is None: alter_methods = [] return mock.Mock( omit=omit, skip_row_by_value=skip_row_by_value, primary_pid=primary_pid, master_pid=master_pid, third_party_pid=third_party_pid, alter_methods=alter_methods, add_src_hash=add_src_hash, **kwargs, )
[docs]class GenOptOutPidsFromDatabaseTests(DatabaseTestCase): def test_string_in_optout_col_values_ignored_for_boolean_column( self, ) -> None: optout_defining_fields = mock.Mock( return_value=[ ( "db", "test_opt_out_bool", "opt_out", "pid", "mpid", ) ] ) mock_dd = mock.Mock(get_optout_defining_fields=optout_defining_fields) mock_sources = { "db": mock.Mock( session=self.source_dbsession, engine=self.source_engine, metadata=SourceTestBase.metadata, ), } opt_out_1 = TestBoolOptOutFactory(opt_out=True) opt_out_2 = TestBoolOptOutFactory(opt_out=True) opt_out_3 = TestBoolOptOutFactory(opt_out=True) opt_out_4 = TestBoolOptOutFactory(opt_out=False) self.source_dbsession.flush() with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources=mock_sources, optout_col_values=[True, 1, "1"], ): pids = list(gen_opt_out_pids_from_database()) self.assertIn(opt_out_1.pid, pids) self.assertIn(opt_out_2.pid, pids) self.assertIn(opt_out_3.pid, pids) self.assertNotIn(opt_out_4.pid, pids) def test_invalid_boolean_optout_col_value_logged( self, ) -> None: optout_defining_fields = mock.Mock( return_value=[ ( "db", "test_opt_out_bool", "opt_out", "pid", "mpid", ) ] ) mock_dd = mock.Mock(get_optout_defining_fields=optout_defining_fields) mock_sources = { "db": mock.Mock( session=self.source_dbsession, engine=self.source_engine, metadata=SourceTestBase.metadata, ), } TestBoolOptOutFactory(opt_out=True) self.source_dbsession.flush() with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources=mock_sources, optout_col_values=["1"], ): with self.assertLogs(level=logging.INFO) as logging_cm: list(gen_opt_out_pids_from_database()) self.assert_logged( "crate_anon.anonymise.anonymise", logging.INFO, ( "... ignoring non-boolean value (1), type 'str' " "for boolean column 'opt_out'" ), logging_cm, ) def test_string_in_optout_col_values_valid_for_string_column( self, ) -> None: optout_defining_fields = mock.Mock( return_value=[ ( "db", "test_opt_out_string", "opt_out", "pid", "mpid", ) ] ) mock_dd = mock.Mock(get_optout_defining_fields=optout_defining_fields) mock_sources = { "db": mock.Mock( session=self.source_dbsession, engine=self.source_engine, metadata=SourceTestBase.metadata, ), } opt_out_1 = TestStringOptOutFactory(opt_out="yes") opt_out_2 = TestStringOptOutFactory(opt_out="1") opt_out_3 = TestStringOptOutFactory(opt_out="no") opt_out_4 = TestStringOptOutFactory(opt_out="0") self.source_dbsession.flush() with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources=mock_sources, optout_col_values=["yes", "1"], ): pids = list(gen_opt_out_pids_from_database()) self.assertIn(opt_out_1.pid, pids) self.assertIn(opt_out_2.pid, pids) self.assertNotIn(opt_out_3.pid, pids) self.assertNotIn(opt_out_4.pid, pids)
[docs]class ValidateOptoutsTests(DatabaseTestCase): def test_error_reported_if_no_valid_optout_fields(self) -> None: optout_defining_fields = mock.Mock( return_value=[ ( "db", "test_opt_out_bool", "opt_out", "pid", "mpid", ) ] ) mock_dd = mock.Mock( get_optout_defining_fields=optout_defining_fields, ) mock_sources = { "db": mock.Mock( session=self.source_dbsession, engine=self.source_engine, metadata=SourceTestBase.metadata, ), } TestBoolOptOutFactory(opt_out=True) TestBoolOptOutFactory(opt_out=False) self.source_dbsession.flush() with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources=mock_sources, optout_col_values=[3.14159, "1"], ): with self.assertRaises(ValueError) as cm: validate_optouts() self.assertEqual( str(cm.exception), "No valid opt-out values for column 'opt_out'", )
[docs]class CreateIndexesTests(DatabaseTestCase):
[docs] def setUp(self) -> None: super().setUp() self._engine_outside_transaction = None
def test_full_text_index_created_with_mysql(self) -> None: if self.anon_engine.dialect.name != "mysql": pytest.skip("Skipping MySQL-only test") if self._get_mysql_anon_note_table_full_text_indexes(): self._drop_mysql_full_text_indexes() indexes = self._get_mysql_anon_note_table_full_text_indexes() self.assertEqual(len(indexes), 0) self._make_full_text_index() indexes = self._get_mysql_anon_note_table_full_text_indexes() self.assertEqual(len(indexes), 2) self.assertEqual(indexes["note1"]["type"], "FULLTEXT") self.assertEqual(indexes["note2"]["type"], "FULLTEXT") def _drop_mysql_full_text_indexes(self) -> None: execute_ddl( self.anon_engine, sql="DROP INDEX _idxft_note1 ON test_anon_note" ) execute_ddl( self.anon_engine, sql="DROP INDEX _idxft_note2 ON test_anon_note" ) def _get_mysql_anon_note_table_full_text_indexes( self, ) -> Dict[str, List[Dict[str, Any]]]: return { i["column_names"][0]: i for i in inspect(self.anon_engine).get_indexes("test_anon_note") } def test_full_text_index_created_with_mssql(self) -> None: if self.anon_engine.dialect.name != "mssql": pytest.skip("Skipping mssql-only test") self._drop_mssql_full_text_indexes() self.assertFalse(self._mssql_anon_note_table_has_full_text_index()) self._make_full_text_index() self.assertTrue(self._mssql_anon_note_table_has_full_text_index()) def _mssql_anon_note_table_has_full_text_index(self) -> bool: return mssql_table_has_ft_index( self.engine_outside_transaction, "test_anon_note", "dbo" ) def _drop_mssql_full_text_indexes(self) -> None: # SQL Server only. Need to be outside a transaction to drop indexes sql = """ IF EXISTS ( SELECT fti.object_id FROM sys.fulltext_indexes fti WHERE fti.object_id = OBJECT_ID(N'[dbo].[test_anon_note]') ) DROP FULLTEXT INDEX ON [dbo].[test_anon_note] """ execute_ddl(self.engine_outside_transaction, sql) @property def engine_outside_transaction(self) -> None: if self._engine_outside_transaction is None: self._engine_outside_transaction = create_engine( self.anon_engine.url, connect_args={"autocommit": True}, # for pyodbc future=True, ) return self._engine_outside_transaction def _make_full_text_index(self) -> None: mock_config = None # noinspection PyUnusedLocal def index_row_sets( tasknum: int = 0, ntasks: int = 1 ) -> Generator[Tuple[str, List[DataDictionaryRow]], None, None]: note1_row = DataDictionaryRow(mock_config) note1_row.dest_field = "note1" note1_row.index = IndexType.FULLTEXT note2_row = DataDictionaryRow(mock_config) note2_row.dest_field = "note2" note2_row.index = IndexType.FULLTEXT for set_ in [ ("TestAnonNote", [note1_row, note2_row]), ]: yield set_ mock_dd = mock.Mock( get_dest_sqla_table=mock.Mock(return_value=TestAnonNote.__table__) ) with mock.patch.multiple( "crate_anon.anonymise.anonymise", gen_index_row_sets_by_table=index_row_sets, ): with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, _destination_database_url=self.anon_engine.url, ) as mock_config: create_indexes()
[docs]class ProcessPatientTablesMPidTests( SlowSecretDatabaseTestCase, AnonymiseTestMixin ):
[docs] def setUp(self) -> None: super().setUp() self.mock_admindb = mock.Mock(session=self.secret_dbsession) mock_srccfg = mock.Mock(debug_limited_tables=[]) self.mock_sourcedb = mock.Mock( session=self.source_dbsession, srccfg=mock_srccfg, engine=self.source_engine, metadata=SourceTestBase.metadata, ) self.mock_get_scrub_from_rows_as_fieldinfo = mock.Mock( return_value=[ ScrubSourceFieldInfo( is_mpid=True, is_patient=False, recurse=False, required_scrubber=False, scrub_method=ScrubMethod.NUMERIC, signature=None, value_fieldname="nhsnum", ), ] ) self.mock_get_scrub_from_db_table_pairs = mock.Mock( return_value=[ ("source1", "test_patient_with_string_mpid"), ] ) self.mock_get_pid_name = mock.Mock(return_value="pid") self.mock_estimate_count_patients = mock.Mock(return_value=1) self.mock_opting_out_pid = mock.Mock(return_value=False) mock_row = self.mock_dd_row( src_field="row_identifier", dest_field="row_identifier", ) mock_rows_for_src_table = mock.Mock(return_value=[mock_row]) self.mock_dd = mock.Mock( get_scrub_from_db_table_pairs=( self.mock_get_scrub_from_db_table_pairs ), get_scrub_from_rows_as_fieldinfo=( self.mock_get_scrub_from_rows_as_fieldinfo ), get_pid_name=self.mock_get_pid_name, get_mandatory_scrubber_sigs=mock.Mock(return_value=set()), get_source_databases=mock.Mock( return_value=SortedSet(["source1"]) ), get_patient_src_tables_with_active_dest=mock.Mock( return_value=SortedSet(["test_record"]) ), get_rows_for_src_table=mock_rows_for_src_table, )
def test_patient_saved_in_secret_database(self) -> None: patient = TestPatientWithStringMPIDFactory() self.source_dbsession.commit() pids = [patient.pid] with mock.patch.multiple( "crate_anon.anonymise.anonymise", estimate_count_patients=self.mock_estimate_count_patients, opting_out_pid=self.mock_opting_out_pid, ): with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=self.mock_dd, _destination_database_url=self.anon_engine.url, admindb=self.mock_admindb, sources={"source1": self.mock_sourcedb}, ): process_patient_tables(specified_pids=pids) patient_info = self.secret_dbsession.query(PatientInfo).one() self.assertEqual(patient_info.pid, patient.pid) self.assertEqual(str(patient_info.mpid), patient.nhsnum) def test_patient_mpid_updated_in_secret_database(self) -> None: patient = TestPatientWithStringMPIDFactory() self.source_dbsession.commit() patient_info = self.secret_dbsession.query(PatientInfo).one_or_none() self.assertIsNone(patient_info) patient_info = PatientInfoFactory(pid=patient.pid, mpid=None) self.secret_dbsession.commit() pids = [patient.pid] with mock.patch.multiple( "crate_anon.anonymise.anonymise", estimate_count_patients=self.mock_estimate_count_patients, opting_out_pid=self.mock_opting_out_pid, ): with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=self.mock_dd, _destination_database_url=self.anon_engine.url, admindb=self.mock_admindb, sources={"source1": self.mock_sourcedb}, ): process_patient_tables(specified_pids=pids) patient_info = self.secret_dbsession.query(PatientInfo).one() self.assertEqual(patient_info.pid, patient.pid) self.assertEqual(str(patient_info.mpid), patient.nhsnum) def test_patient_with_invalid_mpid_skipped(self) -> None: if self.source_engine.dialect.name == "sqlite": pytest.skip( "Skipping test because SQLite would allow non-integer values " "in an integer field" ) patient_info = self.secret_dbsession.query(PatientInfo).one_or_none() self.assertIsNone(patient_info) patient = TestPatientWithStringMPIDFactory(nhsnum="ABC123") self.source_dbsession.commit() pid = patient.pid pids = [pid] with mock.patch.multiple( "crate_anon.anonymise.anonymise", estimate_count_patients=self.mock_estimate_count_patients, opting_out_pid=self.mock_opting_out_pid, ): with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=self.mock_dd, _destination_database_url=self.anon_engine.url, admindb=self.mock_admindb, sources={"source1": self.mock_sourcedb}, ): with self.assertLogs(level=logging.WARNING) as logging_cm: process_patient_tables(specified_pids=pids) self.assertIsNone( self.secret_dbsession.query(PatientInfo).one_or_none() ) self.assert_logged( "crate_anon.anonymise.anonymise", logging.WARNING, ( f"Skipping patient with PID={pid} because the record could " "not be saved to the secret_map table" ), logging_cm, ) def test_valid_patients_added_when_invalid_mpid_skipped(self) -> None: if self.source_engine.dialect.name == "sqlite": pytest.skip( "Skipping test because SQLite would allow non-integer values " "in an integer field" ) patient_info = self.secret_dbsession.query(PatientInfo).one_or_none() self.assertIsNone(patient_info) invalid_patient = TestPatientWithStringMPIDFactory(nhsnum="ABC123") self.source_dbsession.commit() valid_patient1 = TestPatientWithStringMPIDFactory() self.source_dbsession.commit() valid_patient2 = TestPatientWithStringMPIDFactory() self.source_dbsession.commit() invalid_pid = invalid_patient.pid valid_pid1 = valid_patient1.pid valid_pid2 = valid_patient2.pid pids = [valid_pid1, invalid_pid, valid_pid2] with mock.patch.multiple( "crate_anon.anonymise.anonymise", estimate_count_patients=self.mock_estimate_count_patients, opting_out_pid=self.mock_opting_out_pid, ): with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=self.mock_dd, _destination_database_url=self.anon_engine.url, admindb=self.mock_admindb, sources={"source1": self.mock_sourcedb}, ): process_patient_tables(specified_pids=pids) pids = [p.pid for p in self.secret_dbsession.query(PatientInfo)] self.assertIn(valid_patient1.pid, pids) self.assertIn(valid_patient2.pid, pids) # For some reason these end up being a mixture of strings and ints nhsnums = [ int(p.mpid) for p in self.secret_dbsession.query(PatientInfo) ] self.assertIn(int(valid_patient1.nhsnum), nhsnums) self.assertIn(int(valid_patient2.nhsnum), nhsnums)
[docs]class ProcessPatientTablesPKTests(DatabaseTestCase, AnonymiseTestMixin):
[docs] def setUp(self) -> None: super().setUp() self.mock_admindb = mock.Mock(session=self.secret_dbsession) self.mock_destdb = mock.Mock( session=self.anon_dbsession, engine=self.anon_engine, metadata=AnonTestBase.metadata, ) mock_srccfg = mock.Mock(debug_limited_tables=[]) self.mock_sourcedb = mock.Mock( session=self.source_dbsession, srccfg=mock_srccfg, engine=self.source_engine, metadata=SourceTestBase.metadata, ) self.mock_get_scrub_from_rows_as_fieldinfo = mock.Mock( return_value=[ ScrubSourceFieldInfo( is_mpid=True, is_patient=False, recurse=False, required_scrubber=False, scrub_method=ScrubMethod.NUMERIC, signature=None, value_fieldname="nhsnum", ), ] ) self.mock_get_scrub_from_db_table_pairs = mock.Mock( return_value=[ ("source1", "test_patient_with_string_mpid"), ] ) self.mock_get_pid_name = mock.Mock(return_value="pid") self.mock_estimate_count_patients = mock.Mock(return_value=1) self.mock_opting_out_pid = mock.Mock(return_value=False) mock_row = self.mock_dd_row( src_field="row_identifier", dest_field="row_identifier", ) mock_rows_for_src_table = mock.Mock(return_value=[mock_row]) self.mock_dd = mock.Mock( get_scrub_from_db_table_pairs=( self.mock_get_scrub_from_db_table_pairs ), get_scrub_from_rows_as_fieldinfo=( self.mock_get_scrub_from_rows_as_fieldinfo ), get_pid_name=self.mock_get_pid_name, get_mandatory_scrubber_sigs=mock.Mock(return_value=set()), get_source_databases=mock.Mock( return_value=SortedSet(["source1"]) ), get_patient_src_tables_with_active_dest=mock.Mock( return_value=SortedSet(["test_record"]) ), get_rows_for_src_table=mock_rows_for_src_table, get_dest_sqla_table=mock.Mock( return_value=TestAnonRecord.__table__ ), )
def test_duplicate_primary_key_skipped(self) -> None: # row_identifier is the primary key in the destination # database but not in the source # MySQL supports ON DUPLICATE KEY UPDATE if self.anon_engine.dialect.name == "mysql": pytest.skip("Skipping different behaviour for MySQL") patient = TestPatientWithStringMPIDFactory() record = TestRecordFactory(pid=patient.pid) TestRecordFactory( pid=patient.pid, row_identifier=record.row_identifier ) self.source_dbsession.commit() pids = [patient.pid] with mock.patch.multiple( "crate_anon.anonymise.anonymise", estimate_count_patients=self.mock_estimate_count_patients, opting_out_pid=self.mock_opting_out_pid, ): with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=self.mock_dd, _destination_database_url=self.anon_engine.url, admindb=self.mock_admindb, destdb=self.mock_destdb, sources={"source1": self.mock_sourcedb}, rows_inserted_per_table={("source1", "test_record"): 0}, timefield=None, ): with self.assertLogs(level=logging.WARNING) as logging_cm: process_patient_tables(specified_pids=pids) self.assert_logged( "crate_anon.anonymise.anonymise", logging.WARNING, "Skipping record due to IntegrityError", logging_cm, ) self.assertEqual(self.anon_dbsession.query(TestAnonRecord).count(), 1)
[docs]class ProcessTableTests(DatabaseTestCase, AnonymiseTestMixin):
[docs] def setUp(self) -> None: super().setUp() self.patient = TestPatientFactory() self.source_dbsession.commit() # Passphrases match those in get_demo_config() self.pid_hasher = HmacMD5Hasher("SOME_PASSPHRASE_REPLACE_ME") self.mpid_hasher = HmacMD5Hasher("SOME_OTHER_PASSPHRASE_REPLACE_ME") self.change_hasher = HmacMD5Hasher("YETANOTHER") mock_srccfg = mock.Mock(debug_limited_tables=[]) self.mock_sourcedb = mock.Mock( session=self.source_dbsession, srccfg=mock_srccfg, engine=self.source_engine, metadata=SourceTestBase.metadata, ) self.mock_destdb = mock.Mock( session=self.anon_dbsession, engine=self.anon_engine, metadata=AnonTestBase.metadata, )
def test_record_anonymised(self) -> None: TestRecordFactory(pid=self.patient.pid, other="Personal information") self.source_dbsession.commit() mock_alter_method = mock.Mock( alter=mock.Mock(return_value=("ANONYMISED", False)) ) mock_rows = [ self.mock_dd_row( omit=True, src_field="pk", dest_table="test_anon_record", dest_field="pk", ), self.mock_dd_row( omit=True, src_field="pid", dest_table="test_anon_record", dest_field="pid", ), self.mock_dd_row( src_field="row_identifier", dest_table="test_anon_record", dest_field="row_identifier", ), self.mock_dd_row( src_field="other", dest_table="test_anon_record", dest_field="other", alter_methods=[mock_alter_method], ), ] mock_rows_for_src_table = mock.Mock(return_value=mock_rows) mock_dd = mock.Mock( get_rows_for_src_table=mock_rows_for_src_table, get_dest_sqla_table=mock.Mock( return_value=TestAnonRecord.__table__ ), ) with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources={"source": self.mock_sourcedb}, _destination_database_url=self.anon_engine.url, destdb=self.mock_destdb, rows_inserted_per_table={("source", "test_record"): 0}, ): process_table("source", "test_record") anon_record = self.anon_dbsession.query(TestAnonRecord).one() self.assertEqual(anon_record.other, "ANONYMISED") def test_primary_pid_altered_to_patient_rid(self) -> None: TestPidAsPkRecordFactory(pid=self.patient.pid, other="Other") self.source_dbsession.commit() mock_row = self.mock_dd_row( src_field="pid", primary_pid=True, dest_table="test_anon_pid_as_pk_record", dest_field="rid", add_src_hash=True, ) mock_rows_for_src_table = mock.Mock(return_value=[mock_row]) mock_dd = mock.Mock( get_rows_for_src_table=mock_rows_for_src_table, get_dest_sqla_table=mock.Mock( return_value=TestAnonPidAsPkRecord.__table__ ), get_pid_name=mock.Mock(return_value="pid"), ) mock_patient = mock.Mock( pid=self.patient.pid, rid="not-a-real-rid", trid=123456, mrid="not-a-real-mrid", ) with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources={"source": self.mock_sourcedb}, _destination_database_url=self.anon_engine.url, destdb=self.mock_destdb, rows_inserted_per_table={("source", "test_pid_as_pk_record"): 0}, add_mrid_wherever_rid_added=True, master_research_id_fieldname="mrid", ): process_table( "source", "test_pid_as_pk_record", patient=mock_patient, ) anon_record = self.anon_dbsession.query(TestAnonPidAsPkRecord).one() self.assertEqual(anon_record.rid, mock_patient.rid) self.assertEqual(anon_record.mrid, mock_patient.mrid) def test_master_pid_encrypted(self) -> None: test_record = TestRecordFactory(pid=self.patient.pid) self.source_dbsession.commit() mock_row = self.mock_dd_row( src_field="nhsnum", dest_table="test_anon_record", dest_field="nhshash", master_pid=True, ) mock_rows_for_src_table = mock.Mock(return_value=[mock_row]) mock_dd = mock.Mock( get_rows_for_src_table=mock_rows_for_src_table, get_dest_sqla_table=mock.Mock( return_value=TestAnonRecord.__table__ ), ) with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources={"source": self.mock_sourcedb}, _destination_database_url=self.anon_engine.url, destdb=self.mock_destdb, rows_inserted_per_table={("source", "test_record"): 0}, ): process_table( "source", "test_record", ) anon_record = self.anon_dbsession.query(TestAnonRecord).one() expected_hash = self.mpid_hasher.hash(test_record.nhsnum) self.assertEqual(anon_record.nhshash, expected_hash) def test_third_party_pid_encrypted(self) -> None: test_record = TestRecordFactory(pid=self.patient.pid) self.source_dbsession.commit() mock_row = self.mock_dd_row( src_field="third_party_pid", dest_table="test_anon_record", dest_field="third_party_pid_hash", third_party_pid=True, ) mock_rows_for_src_table = mock.Mock(return_value=[mock_row]) mock_dd = mock.Mock( get_rows_for_src_table=mock_rows_for_src_table, get_dest_sqla_table=mock.Mock( return_value=TestAnonRecord.__table__ ), ) with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources={"source": self.mock_sourcedb}, _destination_database_url=self.anon_engine.url, destdb=self.mock_destdb, rows_inserted_per_table={("source", "test_record"): 0}, ): process_table( "source", "test_record", ) anon_record = self.anon_dbsession.query(TestAnonRecord).one() expected_hash = self.pid_hasher.hash(test_record.third_party_pid) self.assertEqual(anon_record.third_party_pid_hash, expected_hash) def test_row_skipped_by_alter_method(self) -> None: TestRecordFactory(pid=self.patient.pid, other="Personal information") self.source_dbsession.commit() mock_alter_method = mock.Mock( alter=mock.Mock(return_value=(None, True)) ) mock_rows = [ self.mock_dd_row( omit=True, src_field="pk", dest_table="test_anon_record", dest_field="pk", ), self.mock_dd_row( omit=True, src_field="pid", dest_table="test_anon_record", dest_field="pid", ), self.mock_dd_row( src_field="row_identifier", dest_table="test_anon_record", dest_field="row_identifier", ), self.mock_dd_row( src_field="other", dest_table="test_anon_record", dest_field="other", alter_methods=[mock_alter_method], ), ] mock_rows_for_src_table = mock.Mock(return_value=mock_rows) mock_dd = mock.Mock( get_rows_for_src_table=mock_rows_for_src_table, get_dest_sqla_table=mock.Mock( return_value=TestAnonRecord.__table__ ), ) with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources={"source": self.mock_sourcedb}, _destination_database_url=self.anon_engine.url, destdb=self.mock_destdb, rows_inserted_per_table={("source", "test_record"): 0}, ): process_table("source", "test_record") self.assertIsNone( self.anon_dbsession.query(TestAnonRecord).one_or_none() ) def test_skipped_by_free_text_limit(self) -> None: TestRecordFactory(pid=self.patient.pid) self.source_dbsession.commit() mock_rows = [ self.mock_dd_row( src_field="other", dest_table="test_anon_record", dest_field="other", src_textlength=100, ), ] mock_rows_for_src_table = mock.Mock(return_value=mock_rows) mock_dd = mock.Mock( get_rows_for_src_table=mock_rows_for_src_table, get_dest_sqla_table=mock.Mock( return_value=TestAnonRecord.__table__ ), ) with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources={"source": self.mock_sourcedb}, _destination_database_url=self.anon_engine.url, destdb=self.mock_destdb, rows_inserted_per_table={("source", "test_record"): 0}, ): process_table("source", "test_record", free_text_limit=50) self.assertIsNone( self.anon_dbsession.query(TestAnonRecord).one_or_none() ) def test_skipped_when_scrubbed_excluded(self) -> None: TestRecordFactory(pid=self.patient.pid) self.source_dbsession.commit() mock_rows = [ self.mock_dd_row( src_field="other", dest_table="test_anon_record", dest_field="other", src_is_textual=True, being_scrubbed=True, ), ] mock_rows_for_src_table = mock.Mock(return_value=mock_rows) mock_dd = mock.Mock( get_rows_for_src_table=mock_rows_for_src_table, get_dest_sqla_table=mock.Mock( return_value=TestAnonRecord.__table__ ), ) with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources={"source": self.mock_sourcedb}, _destination_database_url=self.anon_engine.url, destdb=self.mock_destdb, rows_inserted_per_table={("source", "test_record"): 0}, ): process_table( "source", "test_record", exclude_scrubbed_fields=True ) self.assertIsNone( self.anon_dbsession.query(TestAnonRecord).one_or_none() ) def test_unchanged_record_matching_hash_with_plain_rid_skipped( self, ) -> None: test_record = TestRecordFactory(pid=self.patient.pid) self.source_dbsession.commit() TestAnonRecordFactory( row_identifier=test_record.row_identifier, _src_hash=self.change_hasher.hash( repr([test_record.row_identifier]) ), ) self.anon_dbsession.commit() mock_row = self.mock_dd_row( src_field="row_identifier", dest_table="test_anon_record", dest_field="row_identifier", add_src_hash=True, ) mock_rows_for_src_table = mock.Mock(return_value=[mock_row]) mock_dd = mock.Mock( get_rows_for_src_table=mock_rows_for_src_table, get_dest_sqla_table=mock.Mock( return_value=TestAnonRecord.__table__ ), ) with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources={"source": self.mock_sourcedb}, _destination_database_url=self.anon_engine.url, destdb=self.mock_destdb, rows_inserted_per_table={("source", "test_record"): 0}, ): with self.assertLogs(level=logging.DEBUG) as logging_cm: process_table("source", "test_record", incremental=True) self.assert_logged( "crate_anon.anonymise.anonymise", logging.DEBUG, "... ... skipping unchanged record (identical by hash): ", logging_cm, ) def test_unchanged_record_matching_hash_with_hashed_rid_skipped( self, ) -> None: test_record = TestPidAsPkRecordFactory( pid=self.patient.pid, other="Other" ) self.source_dbsession.commit() TestAnonPidAsPkRecordFactory( rid=self.pid_hasher.hash(self.patient.pid), _src_hash=self.change_hasher.hash(repr([test_record.pid])), ) self.anon_dbsession.commit() mock_row = self.mock_dd_row( src_field="pid", primary_pid=True, dest_table="test_anon_pid_as_pk_record", dest_field="rid", add_src_hash=True, ) mock_rows_for_src_table = mock.Mock(return_value=[mock_row]) mock_dd = mock.Mock( get_rows_for_src_table=mock_rows_for_src_table, get_dest_sqla_table=mock.Mock( return_value=TestAnonPidAsPkRecord.__table__ ), get_pid_name=mock.Mock(return_value="pid"), ) mock_patient = mock.Mock(pid=self.patient.pid) with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources={"source": self.mock_sourcedb}, _destination_database_url=self.anon_engine.url, destdb=self.mock_destdb, rows_inserted_per_table={("source", "test_pid_as_pk_record"): 0}, ): with self.assertLogs(level=logging.DEBUG) as logging_cm: process_table( "source", "test_pid_as_pk_record", patient=mock_patient, incremental=True, ) self.assert_logged( "crate_anon.anonymise.anonymise", logging.DEBUG, "... ... skipping unchanged record (identical by hash): ", logging_cm, ) def test_constant_record_matching_pk_skipped( self, ) -> None: test_record = TestRecordFactory(pid=self.patient.pid) self.source_dbsession.commit() TestAnonRecordFactory( row_identifier=test_record.row_identifier, ) self.anon_dbsession.commit() mock_row = self.mock_dd_row( src_field="row_identifier", dest_table="test_anon_record", dest_field="row_identifier", constant=True, ) mock_rows_for_src_table = mock.Mock(return_value=[mock_row]) mock_dd = mock.Mock( get_rows_for_src_table=mock_rows_for_src_table, get_dest_sqla_table=mock.Mock( return_value=TestAnonRecord.__table__ ), ) with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources={"source": self.mock_sourcedb}, _destination_database_url=self.anon_engine.url, destdb=self.mock_destdb, rows_inserted_per_table={("source", "test_record"): 0}, ): with self.assertLogs(level=logging.DEBUG) as logging_cm: process_table("source", "test_record", incremental=True) self.assert_logged( "crate_anon.anonymise.anonymise", logging.DEBUG, ( "... ... skipping unchanged record (identical by PK and " "marked as constant): " ), logging_cm, ) def test_does_nothing_if_all_ddrows_omitted(self) -> None: TestRecordFactory(pid=self.patient.pid) self.source_dbsession.commit() mock_rows = [ self.mock_dd_row( omit=True, src_field="pk", dest_table="test_anon_record", dest_field="pk", add_src_hash=True, ), self.mock_dd_row( omit=True, src_field="pid", dest_table="test_anon_record", dest_field="pid", add_src_hash=True, ), self.mock_dd_row( omit=True, src_field="row_identifier", dest_table="test_anon_record", dest_field="row_identifier", add_src_hash=True, ), ] mock_rows_for_src_table = mock.Mock(return_value=mock_rows) mock_dd = mock.Mock( get_rows_for_src_table=mock_rows_for_src_table, get_dest_sqla_table=mock.Mock( return_value=TestAnonRecord.__table__ ), ) with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources={"source": self.mock_sourcedb}, _destination_database_url=self.anon_engine.url, destdb=self.mock_destdb, rows_inserted_per_table={("source", "test_record"): 0}, ): with self.assertLogs(level=logging.DEBUG) as logging_cm: process_table("source", "test_record") self.assert_logged( "crate_anon.anonymise.anonymise", logging.DEBUG, "... ... all columns omitted", logging_cm, ) self.assertIsNone( self.anon_dbsession.query(TestAnonRecord).one_or_none() ) def test_row_skipped_by_value(self) -> None: TestRecordFactory(pid=self.patient.pid) self.source_dbsession.commit() mock_rows = [ self.mock_dd_row( src_field="row_identifier", dest_table="test_anon_record", dest_field="row_identifier", skip_row_by_value=mock.Mock(return_value=True), ), ] mock_rows_for_src_table = mock.Mock(return_value=mock_rows) mock_dd = mock.Mock( get_rows_for_src_table=mock_rows_for_src_table, get_dest_sqla_table=mock.Mock( return_value=TestAnonRecord.__table__ ), ) with mock.patch.multiple( "crate_anon.anonymise.anonymise.config", dd=mock_dd, sources={"source": self.mock_sourcedb}, _destination_database_url=self.anon_engine.url, destdb=self.mock_destdb, rows_inserted_per_table={("source", "test_record"): 0}, ): with self.assertLogs(level=logging.DEBUG) as logging_cm: process_table("source", "test_record") self.assert_logged( "crate_anon.anonymise.anonymise", logging.DEBUG, "... ... skipping row based on inclusion/exclusion values", logging_cm, ) self.assertIsNone( self.anon_dbsession.query(TestAnonRecord).one_or_none() )