Source code for crate_anon.preprocess.text_extractor

"""
crate_anon/preprocess/text_extractor.py

===============================================================================

    Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CRATE.

    CRATE is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CRATE is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CRATE. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Extract text from a document store prior to anonymisation.**

"""

from dataclasses import dataclass
import logging
import os
from pathlib import Path
import re
import traceback
from typing import Generator, Optional, Tuple

from cardinal_pythonlib.extract_text import (
    document_to_text,
    ext_map,
    TextProcessingConfig,
)
from cardinal_pythonlib.sqlalchemy.schema import (
    make_bigint_autoincrement_column,
)
from pendulum import DateTime as Pendulum
from sqlalchemy import (
    BigInteger,
    Column,
    Connection,
    CursorResult,
    DateTime,
    insert,
    Row,
    select,
    String,
    Table,
    UnicodeText,
    update,
)
from sqlalchemy.engine.base import Engine
from sqlalchemy.exc import IntegrityError, MultipleResultsFound, NoResultFound
from sqlalchemy.sql.schema import MetaData

from crate_anon.anonymise.constants import AnonymiseConfigDefaults
from crate_anon.common.sql import add_indexes, IndexCreationInfo

from crate_anon.preprocess.constants import (
    CRATE_COL_FILE_PATH,
    CRATE_COL_PK,
    CRATE_COL_TEXT,
    CRATE_COL_TEXT_LAST_EXTRACTED,
    CRATE_IDX_PREFIX,
    CRATE_TABLE_EXTRACTED_TEXT,
)
from crate_anon.preprocess.systmone_ddgen import (
    contextual_tablename,
    S1GenericCol,
    S1Table,
    SystmOneContext,
)

log = logging.getLogger(__name__)


class TextExtractor:
    """
    Base class for extracting text from files beneath a root directory and
    storing it in the extracted-text database table.

    Derived classes must implement :meth:`process_files`,
    :meth:`get_table_definition` and ``indexed_column_names``.
    """

    def __init__(
        self,
        engine: Engine,
        metadata: MetaData,
        context: SystmOneContext,
        root_directory: str,
        drop_table: bool = False,
        plain: bool = AnonymiseConfigDefaults.EXTRACT_TEXT_PLAIN,
        width: int = AnonymiseConfigDefaults.EXTRACT_TEXT_WIDTH,
    ) -> None:
        """
        Args:
            engine:
                SQLAlchemy engine used to create, drop and index the table.
            metadata:
                SQLAlchemy metadata in which the extracted-text table is
                looked up and (re)defined.
            context:
                SystmOne context; stored for use by derived classes.
            root_directory:
                Directory that is walked (recursively) for files.
            drop_table:
                If true, an existing extracted-text table is dropped and
                recreated by :meth:`create_table`.
            plain:
                Passed to ``TextProcessingConfig`` as ``plain``.
            width:
                Passed to ``TextProcessingConfig`` as ``width``.
        """
        self.engine = engine
        self.metadata = metadata
        self.context = context
        self.root_directory = root_directory
        self.drop_table = drop_table
        self.plain = plain
        self.width = width

        # Set by create_table(); initialised here so that the attribute
        # always exists.
        self.extracted_text_table: Optional[Table] = None

        # All keys of ext_map except None. Filtering (rather than
        # list.remove) does not fail if ext_map has no None key.
        self.extensions = [ext for ext in ext_map if ext is not None]

    def extract_all(self) -> None:
        """
        Ensure the destination table exists, then process all files.
        """
        self.create_table()
        self.process_files()

    def create_table(self) -> None:
        """
        Create (and index) the extracted-text table if it is not already
        known to the metadata, or if ``drop_table`` was requested, in which
        case any existing table is dropped first.
        """
        existing_table = self.metadata.tables.get(CRATE_TABLE_EXTRACTED_TEXT)
        self.extracted_text_table = existing_table

        if existing_table is not None and self.drop_table:
            # Table.drop() needs an explicit bind.
            existing_table.drop(self.engine, checkfirst=True)
            # Forget the old definition; otherwise declaring a table of the
            # same name in this MetaData again is rejected by SQLAlchemy.
            self.metadata.remove(existing_table)
            self.extracted_text_table = None

        if self.extracted_text_table is None:
            self.extracted_text_table = self.get_table_definition()
            self.extracted_text_table.create(self.engine, checkfirst=True)
            self.index_table()

    def generate_filenames(self) -> Generator[Tuple[str, str], None, None]:
        """
        Walk ``root_directory`` and yield ``(dirpath, filename)`` for every
        file found.
        """
        log.info(f"Extracting text from {self.root_directory}...")
        for dirpath, _dirnames, filenames in os.walk(self.root_directory):
            log.debug(f"Processing {dirpath}")
            for filename in filenames:
                yield dirpath, filename

    def index_table(self) -> None:
        """
        Add a non-unique index for every column of the extracted-text table
        whose name appears in ``indexed_column_names``.
        """
        for column in self.extracted_text_table.columns:
            colname = column.name
            if colname not in self.indexed_column_names:
                continue

            add_indexes(
                self.engine,
                self.extracted_text_table,
                [
                    IndexCreationInfo(
                        index_name=f"{CRATE_IDX_PREFIX}_{colname}",
                        column=colname,
                        unique=False,
                    )
                ],
            )

    def process_files(self) -> None:
        """
        Process every file in the document store. Must be implemented by
        derived classes.
        """
        raise NotImplementedError(
            "Implement 'process_files()' in derived class!"
        )

    @property
    def indexed_column_names(self) -> list[str]:
        """
        Names of the columns to index. Must be provided by derived classes.
        """
        raise NotImplementedError(
            "Implement 'indexed_column_names' property in derived class!"
        )

    def get_table_definition(self) -> Table:
        """
        Return the definition of the extracted-text table. Must be
        implemented by derived classes.
        """
        raise NotImplementedError(
            "Implement 'get_table_definition()' in derived class!"
        )

    def extract_text_from_file(
        self, full_path: str, extension: str
    ) -> Tuple[Optional[str], Optional[Pendulum]]:
        """
        Extract the text from a single file.

        Args:
            full_path:
                Path of the file to read.
            extension:
                File extension, compared against ``self.extensions``.

        Returns:
            tuple: ``(text, last_extracted)``. Both are ``None`` if the
            extension is unsupported or if extraction raised an exception;
            otherwise ``last_extracted`` is the time at which extraction
            completed.
        """
        if extension not in self.extensions:
            log.info(f"... unsupported file extension '{extension}'.")
            return None, None

        log.info("... extracting text...")
        text = None
        try:
            config = TextProcessingConfig(width=self.width, plain=self.plain)
            text = document_to_text(filename=full_path, config=config)
            log.info("... extracted.")
        except Exception as e:
            # Deliberately broad: one unreadable document must not abort
            # the whole run. Report it and carry on.
            traceback.print_exc()
            log.error(f"... caught exception from document_to_text: {e}")

        last_extracted = Pendulum.now() if text is not None else None

        return text, last_extracted


@dataclass
class SystmOneDocumentInfo:
    """
    Details of a single file in the document store.

    After construction, ``relative_path`` holds the last two components of
    ``full_path`` (or fewer, if ``full_path`` has fewer components) joined
    as a path string. It is set in ``__post_init__`` and is not a dataclass
    field, so it is not a constructor argument.
    """

    full_path: str  # path of the file
    row_identifier: int  # row identifier associated with the file
    document_uid: str  # document UID associated with the file
    extension: str  # file extension

    def __post_init__(self) -> None:
        # Keep only the trailing two path components.
        trailing_parts = Path(self.full_path).parts[-2:]
        self.relative_path = str(Path(*trailing_parts))
class SystmOneTextExtractor(TextExtractor):
    """
    Text extractor for a SystmOne document store.

    Files whose names match the expected pattern are looked up in the
    SystmOne documents table by row identifier; their text is extracted and
    written, with details from that row, to the extracted-text table.
    """

    indexed_column_names = [
        CRATE_COL_PK,
        S1GenericCol.ROW_ID,
        S1GenericCol.PATIENT_ID,
    ]

    # Filename pattern, compiled once. Groups:
    # 1: RowIdentifier
    # 2: DocumentUID (sometimes incorrectly set to IDOrganisation)
    # 3: Subfolder 1-4
    # 4: Index where document split across files
    # 5: Extension, mixed case
    _FILENAME_REGEX = re.compile(r"(\d+)_([0-9a-f]+)_(\d+)_(\d+)(\.\S+)")

    def __init__(self, *args, **kwargs) -> None:
        """
        Takes the same arguments as :class:`TextExtractor`. Additionally
        looks up the documents table in the metadata; raises ``KeyError``
        if it is not present.
        """
        super().__init__(*args, **kwargs)

        self.documents_table = self.metadata.tables[
            contextual_tablename(S1Table.DOCUMENTS, self.context)
        ]

    def get_table_definition(self) -> Table:
        """
        Return the definition of the extracted-text table.
        """
        return Table(
            CRATE_TABLE_EXTRACTED_TEXT,
            self.metadata,
            make_bigint_autoincrement_column(CRATE_COL_PK),
            Column(
                S1GenericCol.ROW_ID,
                BigInteger,
                comment="FK to S1_Documents",
                nullable=False,
            ),
            Column(
                S1GenericCol.PATIENT_ID,
                BigInteger,
                comment="Patient ID from S1_Documents",
                nullable=False,
            ),
            Column(
                "DocumentUID",
                String(16),
                comment="Unique ID of document",
                nullable=False,
            ),
            Column(
                S1GenericCol.EVENT_OCCURRED_WHEN,
                DateTime,
                comment="DateEvent from S1_Documents",
                nullable=False,
            ),
            Column(
                S1GenericCol.EVENT_RECORDED_WHEN,
                DateTime,
                comment="DateEventRecorded from S1_Documents",
                nullable=False,
            ),
            Column(
                CRATE_COL_FILE_PATH,
                String(255),
                comment="Path relative to docstore",
                unique=True,
            ),
            Column(
                CRATE_COL_TEXT,
                UnicodeText,
                comment="Extracted text from file",
            ),
            Column(
                CRATE_COL_TEXT_LAST_EXTRACTED,
                DateTime,
                comment="Date/time text was last extracted",
            ),
        )

    def process_files(self) -> None:
        """
        Process every matching file in the document store, using a single
        database connection.
        """
        with self.engine.connect() as connection:
            for doc_info in self.generate_matches():
                self.process_file(connection, doc_info)
                # Commit after each file so that work already done is kept
                # if the run is interrupted.
                connection.commit()

    def process_file(
        self, connection: Connection, doc_info: SystmOneDocumentInfo
    ) -> None:
        """
        Extract and store the text of one file, unless that has already
        been done or the file has no usable row in the documents table.
        """
        log.info(f"Processing {doc_info.full_path}...")

        if self.already_extracted(connection, doc_info):
            log.info("... already extracted.")
            return

        row = self.get_documents_table_row(connection, doc_info)
        if row is None:
            # get_documents_table_row() has already logged the reason.
            return

        mapping = row._mapping
        self.extract_text_into_database(
            connection,
            doc_info,
            mapping[S1GenericCol.PATIENT_ID],
            mapping[S1GenericCol.EVENT_OCCURRED_WHEN],
            mapping[S1GenericCol.EVENT_RECORDED_WHEN],
        )

    def already_extracted(
        self, connection: Connection, doc_info: SystmOneDocumentInfo
    ) -> bool:
        """
        Return true if the extracted-text table has a row for this file
        with a non-null "last extracted" time.
        """
        row = self.get_extracted_text_table_row(connection, doc_info)
        if row is None:
            return False

        return row._mapping[CRATE_COL_TEXT_LAST_EXTRACTED] is not None

    def get_extracted_text_table_row(
        self, connection: Connection, doc_info: SystmOneDocumentInfo
    ) -> Optional[Row]:
        """
        Return the extracted-text row whose file path equals the file's
        relative path, or ``None`` if there is no such row.
        """
        statement = select(self.extracted_text_table).where(
            self.extracted_text_table.c.crate_file_path
            == doc_info.relative_path
        )

        return connection.execute(statement).one_or_none()

    def extract_text_into_database(
        self,
        connection: Connection,
        doc_info: SystmOneDocumentInfo,
        patient_id: int,
        date_event: DateTime,
        date_event_recorded: DateTime,
    ) -> None:
        """
        Extract the text from a file and store it in the extracted-text
        table, inserting a new row or, if the insert violates a constraint,
        updating the existing row for the same file path.

        Args:
            connection: database connection to execute statements on
            doc_info: details of the file
            patient_id: patient ID read from the documents table
            date_event: event date read from the documents table
            date_event_recorded:
                event-recorded date read from the documents table
        """
        text, last_extracted = self.extract_text_from_file(
            doc_info.full_path, doc_info.extension
        )

        values = dict(
            RowIdentifier=doc_info.row_identifier,
            DocumentUID=doc_info.document_uid,
            IDPatient=patient_id,
            DateEvent=date_event,
            DateEventRecorded=date_event_recorded,
            crate_file_path=doc_info.relative_path,
            crate_text=text,
            crate_text_last_extracted=last_extracted,
        )

        insert_statement = insert(self.extracted_text_table).values(**values)
        try:
            connection.execute(insert_statement)
        except IntegrityError:
            # The file path column is unique, so this is expected when a
            # row for this file exists from an earlier attempt. Overwrite
            # that row instead.
            # NOTE(review): some backends may refuse further statements in
            # a transaction after an error - confirm for the databases in
            # use.
            update_statement = (
                update(self.extracted_text_table)
                .values(**values)
                .where(
                    self.extracted_text_table.c.crate_file_path
                    == doc_info.relative_path
                )
            )
            connection.execute(update_statement)

    def get_documents_table_row(
        self, connection: Connection, doc_info: SystmOneDocumentInfo
    ) -> Optional[Row]:
        """
        Return the single documents-table row whose ``RowIdentifier``
        matches the file. If there is no such row, or more than one, log an
        error and return ``None``.
        """
        row_identifier = doc_info.row_identifier
        statement = select(self.documents_table).where(
            self.documents_table.c.RowIdentifier == row_identifier
        )

        try:
            return connection.execute(statement).one()
        except NoResultFound:
            log.error(f"... no row found for RowIdentifier: {row_identifier}")
        except MultipleResultsFound:
            log.error(
                "... multiple rows found with RowIdentifier: "
                f"{row_identifier}"
            )

        return None

    def generate_matches(
        self,
    ) -> Generator[SystmOneDocumentInfo, None, None]:
        """
        Yield a :class:`SystmOneDocumentInfo` for every file beneath the
        root directory whose name matches the expected pattern. Other files
        are logged and skipped.
        """
        for dirpath, filename in self.generate_filenames():
            file_path = os.path.join(dirpath, filename)

            if m := self._FILENAME_REGEX.match(filename):
                yield SystmOneDocumentInfo(
                    full_path=file_path,
                    row_identifier=int(m.group(1)),
                    document_uid=m.group(2),
                    extension=m.group(5).lower(),
                )
            else:
                log.info(f"Completely ignoring {file_path}")