Source code for crate_anon.preprocess.tests.text_extractor_tests

"""
crate_anon/preprocess/tests/text_extractor_tests.py

===============================================================================

    Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CRATE.

    CRATE is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CRATE is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CRATE. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Test text extraction from documents.**

We test text extraction in cardinal_pythonlib, and in fact some of the tests
there just check that the correct tool is invoked with the expected arguments.

The tests here don't actually do any text extraction or touch the database. We
just mock the appropriate SQL Alchemny functions and ensure they are being
called as expected.

"""

import logging
import os
from pathlib import Path
import shutil
import tempfile
from unittest import mock

from faker.providers import BaseProvider
from faker_file.storages.filesystem import FileSystemStorage
from sqlalchemy.engine.base import Engine
from sqlalchemy.exc import IntegrityError, MultipleResultsFound, NoResultFound

from crate_anon.preprocess.constants import (
    CRATE_COL_PK,
    CRATE_COL_TEXT_LAST_EXTRACTED,
    CRATE_IDX_PREFIX,
    CRATE_TABLE_EXTRACTED_TEXT,
)
from crate_anon.preprocess.systmone_ddgen import S1GenericCol, SystmOneContext
from crate_anon.preprocess.text_extractor import SystmOneTextExtractor
from crate_anon.testing import mock_connection_exit
from crate_anon.testing.classes import CrateTestCase


class RowIdentifierProvider(BaseProvider):
    def row_identifier(self) -> int:
        return self.generator.pyint(1_000_000_000, 9_000_000_000)


class DocumentUidProvider(BaseProvider):
    def document_uid(self) -> int:
        return self.generator.pyint(
            0x1000_0000_0000_0000, 0xFFFF_FFFF_FFFF_FFFF
        )


class SubfolderProvider(BaseProvider):
    def subfolder(self) -> int:
        return self.generator.pyint(1, 4)


class IndexProvider(BaseProvider):
    def index(self) -> int:
        return self.generator.pyint(0, 9)


class PatientIdProvider(BaseProvider):
    def patient_id(self) -> int:
        return self.generator.pyint(1, 10_000_000)


[docs]class SystmOneTextExtractorTests(CrateTestCase):
[docs] def setUp(self) -> None: super().setUp() self.register_providers() self.patient_id = self.fake.patient_id() self.date_event = self.fake.past_datetime() self.date_event_recorded = self.fake.past_datetime() self.valid_document_row = mock.Mock( _mapping={ S1GenericCol.PATIENT_ID: self.patient_id, S1GenericCol.EVENT_OCCURRED_WHEN: self.date_event, S1GenericCol.EVENT_RECORDED_WHEN: self.date_event_recorded, } ) self.mock_document_row = mock.Mock() self.mock_extracted_text_row = mock.Mock() self.mock_result = mock.Mock( one=self.mock_document_row, one_or_none=self.mock_extracted_text_row, ) self.mock_execute = mock.Mock(return_value=self.mock_result) self.mock_connection = mock.Mock(execute=self.mock_execute) self.mock_connect_cm = mock.Mock() self.mock_connect_cm.__enter__ = mock.Mock( return_value=self.mock_connection ) self.mock_connect_cm.__exit__ = mock_connection_exit self.mock_connect = mock.Mock(return_value=self.mock_connect_cm) self.mock_engine = mock.Mock(connect=self.mock_connect, spec=Engine) self.mock_s1_documents_table = mock.Mock() self.mock_drop = mock.Mock() self.mock_extracted_text_table = mock.Mock(drop=self.mock_drop) self.mock_metadata = mock.Mock( tables={ "S1_Documents": self.mock_s1_documents_table, CRATE_TABLE_EXTRACTED_TEXT: self.mock_extracted_text_table, } ) self.context = SystmOneContext["cpft_dw"] self.root_directory = tempfile.mkdtemp() self.storage = FileSystemStorage( root_path=self.root_directory, rel_path="tmp", ) self.mock_table_class = mock.Mock() self.extractor = SystmOneTextExtractor( self.mock_engine, self.mock_metadata, self.context, self.root_directory, ) self.mock_select_object = mock.Mock() self.mock_select_fn = mock.Mock(return_value=self.mock_select_object) self.mock_document_to_text = mock.Mock() self.mock_last_extracted = self.fake.past_datetime() self.mock_insert_values = mock.Mock() self.mock_insert_result = mock.Mock(values=self.mock_insert_values) self.mock_insert = mock.Mock(return_value=self.mock_insert_result) self.mock_update_values = mock.Mock() self.mock_update_result = mock.Mock(values=self.mock_update_values) self.mock_update = mock.Mock(return_value=self.mock_update_result)
def register_providers(self) -> None: self.fake.add_provider(RowIdentifierProvider) self.fake.add_provider(DocumentUidProvider) self.fake.add_provider(SubfolderProvider) self.fake.add_provider(IndexProvider) self.fake.add_provider(PatientIdProvider)
[docs] def tearDown(self) -> None: shutil.rmtree(self.root_directory)
def generate_filename( self, extension: str, row_identifier: int = None, document_uid: int = None, subfolder: int = None, index: int = None, ) -> str: if row_identifier is None: row_identifier = self.fake.row_identifier() if document_uid is None: document_uid = self.fake.document_uid() if subfolder is None: subfolder = self.fake.subfolder() if index is None: index = self.fake.index() return f"{row_identifier}_{document_uid:x}_{subfolder}_{index}.{extension}" # noqa: E501 def test_invalid_filename_skipped(self) -> None: filename = os.path.join(self.root_directory, "test.txt") content = self.fake.paragraph(nb_sentences=10) self.storage.write_text(filename, content) with self.assertLogs(level=logging.INFO) as logging_cm: self.extractor.extract_all() self.assert_logged( "crate_anon.preprocess.text_extractor", logging.INFO, f"Completely ignoring {filename}", logging_cm, ) def test_unknown_row_identifier_skipped(self) -> None: content = self.fake.paragraph(nb_sentences=10) row_identifier = self.fake.row_identifier() filename = os.path.join( self.root_directory, self.generate_filename("txt", row_identifier=row_identifier), ) self.storage.write_text(filename, content) self.mock_extracted_text_row.return_value = None with mock.patch.multiple( "crate_anon.preprocess.text_extractor", select=self.mock_select_fn, ): self.mock_document_row.side_effect = NoResultFound() with self.assertLogs(level=logging.ERROR) as logging_cm: self.extractor.extract_all() self.assert_logged( "crate_anon.preprocess.text_extractor", logging.ERROR, f"... no row found for RowIdentifier: {row_identifier}", logging_cm, ) def test_multiple_results_skipped(self) -> None: # Not seen in the real world but theoretically possible. content = self.fake.paragraph(nb_sentences=10) row_identifier = self.fake.row_identifier() filename = os.path.join( self.root_directory, self.generate_filename("txt", row_identifier=row_identifier), ) self.storage.write_text(filename, content) self.mock_extracted_text_row.return_value = None with mock.patch.multiple( "crate_anon.preprocess.text_extractor", select=self.mock_select_fn, ): self.mock_document_row.side_effect = MultipleResultsFound() with self.assertLogs(level=logging.ERROR) as logging_cm: self.extractor.extract_all() self.assert_logged( "crate_anon.preprocess.text_extractor", logging.ERROR, ( "... multiple rows found with RowIdentifier: " f"{row_identifier}" ), logging_cm, ) def test_row_inserted_into_table(self) -> None: content = self.fake.paragraph(nb_sentences=10) row_identifier = self.fake.row_identifier() document_uid = self.fake.document_uid() filename = os.path.join( self.root_directory, self.generate_filename( "txt", row_identifier=row_identifier, document_uid=document_uid ), ) self.storage.write_text(filename, content) self.mock_extracted_text_row.return_value = None self.mock_document_row.return_value = self.valid_document_row self.mock_document_to_text.return_value = content with mock.patch.multiple( "crate_anon.preprocess.text_extractor", select=self.mock_select_fn, document_to_text=self.mock_document_to_text, Pendulum=mock.Mock( now=mock.Mock(return_value=self.mock_last_extracted) ), insert=self.mock_insert, ): self.extractor.extract_all() values = dict( RowIdentifier=row_identifier, DocumentUID=f"{document_uid:x}", IDPatient=self.patient_id, DateEvent=self.date_event, DateEventRecorded=self.date_event_recorded, crate_file_path=str(Path(*Path(filename).parts[-2:])), crate_text=content, crate_text_last_extracted=self.mock_last_extracted, ) self.mock_insert_values.assert_called_once_with(**values) def test_row_not_inserted_when_already_extracted(self) -> None: content = self.fake.paragraph(nb_sentences=10) filename = os.path.join( self.root_directory, self.generate_filename("txt"), ) self.storage.write_text(filename, content) self.mock_extracted_text_row.return_value = mock.Mock( _mapping={ CRATE_COL_TEXT_LAST_EXTRACTED: self.fake.past_datetime(), } ) with mock.patch.multiple( "crate_anon.preprocess.text_extractor", select=self.mock_select_fn, insert=self.mock_insert, ): with self.assertLogs(level=logging.INFO) as logging_cm: self.extractor.extract_all() self.mock_insert_values.assert_not_called() self.assert_logged( "crate_anon.preprocess.text_extractor", logging.INFO, "... already extracted.", logging_cm, ) def test_null_text_inserted_when_extension_not_supported(self) -> None: content = self.fake.paragraph(nb_sentences=10) filename = os.path.join( self.root_directory, self.generate_filename("tex"), ) self.storage.write_text(filename, content) self.mock_extracted_text_row.return_value = None self.mock_document_row.return_value = self.valid_document_row self.mock_document_to_text.return_value = content with mock.patch.multiple( "crate_anon.preprocess.text_extractor", select=self.mock_select_fn, document_to_text=self.mock_document_to_text, insert=self.mock_insert, ): with self.assertLogs(level=logging.INFO) as logging_cm: self.extractor.extract_all() self.assert_logged( "crate_anon.preprocess.text_extractor", logging.INFO, "... unsupported file extension '.tex'.", logging_cm, ) args, kwargs = self.mock_insert_values.call_args self.assertIsNone(kwargs["crate_text"]) self.assertIsNone(kwargs["crate_text_last_extracted"]) def test_row_updated_in_table(self) -> None: content = self.fake.paragraph(nb_sentences=10) row_identifier = self.fake.row_identifier() document_uid = self.fake.document_uid() filename = os.path.join( self.root_directory, self.generate_filename( "txt", row_identifier=row_identifier, document_uid=document_uid ), ) self.storage.write_text(filename, content) self.mock_extracted_text_row.return_value = None self.mock_document_row.return_value = self.valid_document_row self.mock_document_to_text.return_value = content self.mock_execute.side_effect = [ self.mock_result, # check existing text self.mock_result, # check documents table IntegrityError(None, None, None), # insert ] with mock.patch.multiple( "crate_anon.preprocess.text_extractor", select=self.mock_select_fn, document_to_text=self.mock_document_to_text, Pendulum=mock.Mock( now=mock.Mock(return_value=self.mock_last_extracted) ), insert=self.mock_insert, update=self.mock_update, ): self.extractor.extract_all() values = dict( RowIdentifier=row_identifier, DocumentUID=f"{document_uid:x}", IDPatient=self.patient_id, DateEvent=self.date_event, DateEventRecorded=self.date_event_recorded, crate_file_path=str(Path(*Path(filename).parts[-2:])), crate_text=content, crate_text_last_extracted=self.mock_last_extracted, ) self.mock_update_values.assert_called_once_with(**values) def test_exception_from_text_conversion_handled(self) -> None: content = self.fake.paragraph(nb_sentences=10) row_identifier = self.fake.row_identifier() document_uid = self.fake.document_uid() filename = os.path.join( self.root_directory, self.generate_filename( "txt", row_identifier=row_identifier, document_uid=document_uid ), ) self.storage.write_text(filename, content) self.mock_extracted_text_row.return_value = None self.mock_document_row.return_value = self.valid_document_row self.mock_document_to_text.side_effect = Exception( "Something bad happened" ) with mock.patch.multiple( "crate_anon.preprocess.text_extractor", select=self.mock_select_fn, document_to_text=self.mock_document_to_text, insert=self.mock_insert, ): with self.assertLogs(level=logging.ERROR) as logging_cm: self.extractor.extract_all() self.assert_logged( "crate_anon.preprocess.text_extractor", logging.ERROR, ( "... caught exception from document_to_text: " "Something bad happened" ), logging_cm, ) args, kwargs = self.mock_insert_values.call_args self.assertIsNone(kwargs["crate_text"]) self.assertIsNone(kwargs["crate_text_last_extracted"]) def test_table_dropped(self) -> None: self.extractor.drop_table = True self.mock_table_class.return_value = self.mock_extracted_text_table self.mock_extracted_text_table.columns = [] with mock.patch.multiple( "crate_anon.preprocess.text_extractor", Table=self.mock_table_class, ): self.extractor.extract_all() self.mock_drop.assert_called_once_with(checkfirst=True) def test_columns_indexed(self) -> None: self.extractor.drop_table = True self.mock_table_class.return_value = self.mock_extracted_text_table mock_pk_column = mock.Mock() mock_pk_column.name = CRATE_COL_PK mock_row_id_column = mock.Mock() mock_row_id_column.name = S1GenericCol.ROW_ID mock_patient_id_column = mock.Mock() mock_patient_id_column.name = S1GenericCol.PATIENT_ID self.mock_extracted_text_table.columns = [ mock_pk_column, mock_row_id_column, mock_patient_id_column, ] mock_add_indexes = mock.Mock() mock_pk_info = mock.Mock() mock_row_id_info = mock.Mock() mock_patient_id_info = mock.Mock() mock_add_indexes = mock.Mock() mock_index_creation_info = mock.Mock( side_effect=[ mock_pk_info, mock_row_id_info, mock_patient_id_info, ] ) with mock.patch.multiple( "crate_anon.preprocess.text_extractor", Table=self.mock_table_class, add_indexes=mock_add_indexes, IndexCreationInfo=mock_index_creation_info, ): self.extractor.extract_all() mock_add_indexes.assert_any_call( self.mock_engine, self.mock_extracted_text_table, [mock_pk_info], ) mock_add_indexes.assert_any_call( self.mock_engine, self.mock_extracted_text_table, [mock_row_id_info], ) mock_add_indexes.assert_any_call( self.mock_engine, self.mock_extracted_text_table, [mock_patient_id_info], ) mock_index_creation_info.assert_any_call( index_name=f"{CRATE_IDX_PREFIX}_{CRATE_COL_PK}", column=CRATE_COL_PK, unique=False, ) mock_index_creation_info.assert_any_call( index_name=f"{CRATE_IDX_PREFIX}_{S1GenericCol.ROW_ID}", column=S1GenericCol.ROW_ID, unique=False, ) mock_index_creation_info.assert_any_call( index_name=f"{CRATE_IDX_PREFIX}_{S1GenericCol.PATIENT_ID}", column=S1GenericCol.PATIENT_ID, unique=False, )