Source code for crate_anon.nlp_manager.input_field_config

#!/usr/bin/env python



    Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (

    This file is part of CRATE.

    CRATE is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CRATE is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CRATE. If not, see <>.


**Class to define input fields for NLP.**


import logging

# import sys
from typing import Any, Dict, Generator, List, Optional, Tuple

from cardinal_pythonlib.datetimefunc import get_now_utc_notz_datetime
from cardinal_pythonlib.hash import hash64
from cardinal_pythonlib.sql.validation import (
from cardinal_pythonlib.sqlalchemy.core_query import count_star
from cardinal_pythonlib.sqlalchemy.schema import (
from cardinal_pythonlib.timing import MultiTimerContext, timer
from sqlalchemy import BigInteger, Column, DateTime, Index, String, Table
from sqlalchemy.engine.base import Engine
from sqlalchemy.orm.session import Session
from sqlalchemy.sql import and_, column, exists, null, or_, select, table
from sqlalchemy.sql.schema import MetaData

from crate_anon.common.parallel import is_my_job_by_hash_prehashed
from crate_anon.common.stringfunc import relevant_for_nlp
from crate_anon.nlp_manager.constants import (
from crate_anon.nlp_manager.constants import (
from crate_anon.nlp_manager.models import NlpRecord
from crate_anon.nlp_manager.nlp_definition import (
from crate_anon.version import CRATE_VERSION

log = logging.getLogger(__name__)

TIMING_GEN_TEXT_SQL_SELECT = "gen_text_sql_select"
TIMING_PROCESS_GEN_TEXT = "process_generated_text"
TIMING_PROGRESS_DB_SELECT = "progress_db_select"
TIMING_PROGRESS_DB_DELETE = "progress_db_delete"

# =============================================================================
# Input field definition
# =============================================================================

[docs]class InputFieldConfig: """ Class defining an input field for NLP (containing text). See the documentation for the :ref:`NLP config file <nlp_config>`. """
[docs] def __init__(self, nlpdef: NlpDefinition, cfg_input_name: str) -> None: """ Read config from a configparser section, and also associate with a specific NLP definition. Args: nlpdef: :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`, the master NLP definition, referring to the master config file etc. cfg_input_name: config section name for the input field definition """ = cfg_input_name cfg = nlpdef.get_config_section( full_sectionname(NlpConfigPrefixes.INPUT, cfg_input_name) ) self._nlpdef = nlpdef self._srcdb = cfg.opt_str(InputFieldConfigKeys.SRCDB) self._srctable = cfg.opt_str(InputFieldConfigKeys.SRCTABLE) self._srcpkfield = cfg.opt_str(InputFieldConfigKeys.SRCPKFIELD) self._srcfield = cfg.opt_str(InputFieldConfigKeys.SRCFIELD) self._srcdatetimefield = cfg.opt_str( InputFieldConfigKeys.SRCDATETIMEFIELD, required=False ) # ... new in v0.18.52 # Make these case-sensitive to avoid our failure in renaming SQLA # Column objects to be lower-case: self._copyfields = cfg.opt_multiline( InputFieldConfigKeys.COPYFIELDS ) # fieldnames self._indexed_copyfields = cfg.opt_multiline( InputFieldConfigKeys.INDEXED_COPYFIELDS ) self._debug_row_limit = cfg.opt_int( InputFieldConfigKeys.DEBUG_ROW_LIMIT, default=0 ) # self._fetch_sorted = opt_bool('fetch_sorted', default=True) ensure_valid_table_name(self._srctable) ensure_valid_field_name(self._srcpkfield) ensure_valid_field_name(self._srcfield) if self._srcdatetimefield: ensure_valid_field_name(self._srcdatetimefield) if len(set(self._indexed_copyfields)) != len(self._indexed_copyfields): raise ValueError( f"Redundant indexed_copyfields: {self._indexed_copyfields}" ) if len(set(self._copyfields)) != len(self._copyfields): raise ValueError(f"Redundant copyfields: {self._copyfields}") indexed_not_copied = set(self._indexed_copyfields) - set( self._copyfields ) if indexed_not_copied: raise ValueError( f"Fields in index_copyfields but not in " f"copyfields: {indexed_not_copied}" ) # allfields = [self._srcpkfield, self._srcfield] + self._copyfields # if len(allfields) != len(set(allfields)): # raise ValueError( # f"Field overlap in InputFieldConfig: {section}") # RE-THOUGHT: OK to copy source text fields etc. if desired. # It's fine in SQL to say SELECT a, a FROM mytable; self._db = nlpdef.get_database(self._srcdb)
@property def srcdb(self) -> str: """ Returns the name of the source database. """ return self._srcdb @property def srctable(self) -> str: """ Returns the name of the source table. """ return self._srctable @property def srcpkfield(self) -> str: """ Returns the name of the primary key (PK) field (column) in the source table. """ return self._srcpkfield @property def srcfield(self) -> str: """ Returns the name of the text field (column) in the source table. """ return self._srcfield @property def srcdatetimefield(self) -> str: # new in v0.18.52 """ Returns the name of the field (column) in the source table that defines the date/time of the source text. """ return self._srcdatetimefield @property def source_session(self) -> Session: """ Returns the SQLAlchemy ORM :class:`Session` for the source database. """ return self._db.session @property def _source_metadata(self) -> MetaData: """ Returns the SQLAlchemy :class:`MetaData` for the source database, used for reflection (inspection) of the source database structure. """ return self._db.metadata @property def _source_engine(self) -> Engine: """ Returns the SQLAlchemy Core :class:`Engine` for the source database. """ return self._db.engine @property def _progress_session(self) -> Session: """ Returns the SQLAlchemy ORM :class:`Session` for the progress database. """ return self._nlpdef.progressdb_session
[docs] @staticmethod def get_core_columns_for_dest() -> List[Column]: """ Returns the columns used NLP destination tables, primarily describing the source. See :ref:`Standard NLP output columns <standard_nlp_output_columns>`. Returns: a list of SQLAlchemy :class:`Column` objects """ return [ Column( FN_PK, BigInteger, primary_key=True, autoincrement=True, comment="Arbitrary primary key (PK) of output record", ), Column( FN_NLPDEF, SqlTypeDbIdentifier, comment="Name of the NLP definition producing this row", ), Column( FN_SRCDB, SqlTypeDbIdentifier, comment="Source database name (from CRATE NLP config)", ), Column( FN_SRCTABLE, SqlTypeDbIdentifier, comment="Source table name" ), Column( FN_SRCPKFIELD, SqlTypeDbIdentifier, comment="PK field (column) name in source table", ), Column( FN_SRCPKVAL, BigInteger, comment="PK of source record (or integer hash of PK if the " "PK is a string)", ), Column( FN_SRCPKSTR, String(MAX_STRING_PK_LENGTH), comment=f"NULL if the table has an integer PK, but the PK " f"if the PK was a string, to deal with hash " f"collisions. Max length: {MAX_STRING_PK_LENGTH}", ), Column( FN_SRCFIELD, SqlTypeDbIdentifier, comment="Field (column) name of source text", ), Column( FN_SRCDATETIMEFIELD, SqlTypeDbIdentifier, comment="Date/time field (column) name in source table", ), Column( FN_SRCDATETIMEVAL, DateTime, nullable=True, comment="Date/time of source field", ), Column( FN_CRATE_VERSION_FIELD, String(MAX_SEMANTIC_VERSION_STRING_LENGTH), nullable=True, comment="Version of CRATE that generated this NLP record.", ), Column( FN_WHEN_FETCHED, DateTime, nullable=True, comment="Date/time that the NLP processor fetched the " "record from the source database (in UTC).", ), ]
[docs] @staticmethod def get_core_indexes_for_dest() -> List[Index]: """ Returns the core indexes to be applied to the destination tables. Primarily, these are for columns that refer to the source. Returns: a list of SQLAlchemy :class:`Index` objects See - """ # noqa return [ Index( "_idx_srcref", # Remember, order matters; more to less specific # See also BaseNlpParser.delete_dest_record FN_SRCPKVAL, FN_NLPDEF, FN_SRCFIELD, FN_SRCTABLE, FN_SRCDB, FN_SRCPKSTR, ), Index("_idx_srcdate", FN_SRCDATETIMEVAL), Index( "_idx_deletion", # We sometimes delete just using the following; see # BaseNlpParser.delete_where_srcpk_not FN_NLPDEF, FN_SRCFIELD, FN_SRCTABLE, FN_SRCDB, ), ]
def _require_table_or_view_exists(self) -> None: """ Ensure that the source table exists, or raise :exc:`RuntimeError`. """ if not table_or_view_exists(self._source_engine, self._srctable): raise RuntimeError( f"Missing source table: {self._srcdb}.{self._srctable}" )
[docs] def get_copy_columns(self) -> List[Column]: """ Returns the columns that the user has requested to be copied from the source table to the NLP destination table. Returns: a list of SQLAlchemy :class:`Column` objects """ # We read the column type from the source database. self._require_table_or_view_exists() meta = self._source_metadata t = Table(self._srctable, meta, autoload=True) copy_columns = [] # type: List[Column] processed_copy_column_names = [] # type: List[str] for c in t.columns: # if in self._copyfields: if in self._copyfields: copied = c.copy() # Force lower case: # = # = quoted_name(, None) # ... this is not working properly. Keep getting an # "Unconsumed column names" error with e.g. a source field of # "Text". # Try making copyfields case-sensitive instead. copy_columns.append(copied) processed_copy_column_names.append( # Check all requested fields are present: missing = set(self._copyfields) - set(processed_copy_column_names) if missing: raise RuntimeError( f"The following fields were requested to be copied but are " f"absent from the source (NB case-sensitive): {missing}" ) # log.critical(copy_columns) return copy_columns
[docs] def get_copy_indexes(self) -> List[Index]: """ Returns indexes that should be made in the destination table for columns that the user has requested to be copied from the source. Returns: a list of SQLAlchemy :class:`Index` objects """ self._require_table_or_view_exists() meta = self._source_metadata t = Table(self._srctable, meta, autoload=True) copy_indexes = [] # type: List[Index] processed_copy_index_col_names = [] # type: List[str] for c in t.columns: # if in self._indexed_copyfields: if in self._indexed_copyfields: copied = c.copy() # See above re case. idx_name = f"idx_{}" copy_indexes.append(Index(idx_name, copied)) processed_copy_index_col_names.append( missing = set(self._indexed_copyfields) - set( processed_copy_index_col_names ) if missing: raise ValueError( f"The following fields were requested to be copied/indexed but" f" are absent from the source (NB case-sensitive): {missing}" ) return copy_indexes
[docs] def is_pk_integer(self) -> bool: """ Is the primary key (PK) of the source table an integer? """ pkcoltype = get_column_type( self._source_engine, self._srctable, self._srcpkfield ) if not pkcoltype: raise ValueError( f"Unable to get column type for column " f"{self._srctable}.{self._srcpkfield}" ) pk_is_integer = is_sqlatype_integer(pkcoltype) # log.debug(f"pk_is_integer: {pkcoltype!r} -> {pk_is_integer}") return pk_is_integer
[docs] def gen_text( self, tasknum: int = 0, ntasks: int = 1 ) -> Generator[Tuple[str, Dict[str, Any]], None, None]: """ Generate text strings from the source database, for NLP. Text fields that are NULL, empty, or contain only whitespace, are skipped. Yields: tuple: ``text, dict``, where ``text`` is the source text and ``dict`` is a column-to-value mapping for all other fields (source reference fields, copy fields). """ if 1 < ntasks <= tasknum: raise RuntimeError(f"Invalid tasknum {tasknum}; must be <{ntasks}") # --------------------------------------------------------------------- # Values that are constant to all items we will generate # (i.e. database/field *names*, plus CRATE version info) # --------------------------------------------------------------------- base_dict = { FN_SRCDB: self._srcdb, FN_SRCTABLE: self._srctable, FN_SRCPKFIELD: self._srcpkfield, FN_SRCFIELD: self._srcfield, FN_SRCDATETIMEFIELD: self._srcdatetimefield, FN_CRATE_VERSION_FIELD: CRATE_VERSION, } # --------------------------------------------------------------------- # Build a query # --------------------------------------------------------------------- session = self.source_session pkcol = column(self._srcpkfield) # ... don't use is_sqlatype_integer with this; it's a column clause, # not a full column definition. pk_is_integer = self.is_pk_integer() # Core columns colindex_pk = 0 colindex_text = 1 colindex_datetime = 2 colindex_remainder_start = 3 selectcols = [ pkcol, column(self._srcfield), column(self._srcdatetimefield) if self._srcdatetimefield else null(), # noqa ] # User-specified extra columns for extracol in self._copyfields: selectcols.append(column(extracol)) query = select(selectcols).select_from(table(self._srctable)) # not ordered... # if self._fetch_sorted: # query = query.order_by(pkcol) # --------------------------------------------------------------------- # Plan our parallel-processing approach # --------------------------------------------------------------------- distribute_by_hash = False if ntasks > 1: if pk_is_integer: # Integer PK, so we can be efficient and bake the parallel # processing work division into the SQL: query = query.where(pkcol % ntasks == tasknum) else: distribute_by_hash = True # --------------------------------------------------------------------- # Execute the query # --------------------------------------------------------------------- nrows_returned = 0 with MultiTimerContext(timer, TIMING_GEN_TEXT_SQL_SELECT): when_fetched = get_now_utc_notz_datetime() result = session.execute(query) for row in result: # ... "result" is a generator with MultiTimerContext(timer, TIMING_PROCESS_GEN_TEXT): # Get PK value pkval = row[colindex_pk] # Deal with non-integer PKs if pk_is_integer: hashed_pk = None # remove warning about reference before assignment # noqa else: hashed_pk = hash64(pkval) if ( distribute_by_hash and not is_my_job_by_hash_prehashed( hashed_pk, tasknum, ntasks ) ): continue # Optional debug limit on the number of rows if 0 < self._debug_row_limit <= nrows_returned: log.warning( f"Table {self._srcdb}.{self._srctable}: not " f"fetching more than {self._debug_row_limit} rows " f"(in total for this process) due to debugging " f"limits" ) result.close() # # noqa return # Get text text = row[colindex_text] # Skip text that is absent/empty/contains only whitespace: if not relevant_for_nlp(text): continue # We don't modify (e.g. strip) the text, because our NLP # processor may return relevant character positions, so we # want those to be correct with respect to the source. # Get everything else other_values = dict( zip(self._copyfields, row[colindex_remainder_start:]) ) if pk_is_integer: other_values[FN_SRCPKVAL] = pkval # an integer other_values[FN_SRCPKSTR] = None else: # hashed_pk will have been set above other_values[FN_SRCPKVAL] = hashed_pk # an integer other_values[FN_SRCPKSTR] = pkval # a string etc. other_values[FN_SRCDATETIMEVAL] = row[colindex_datetime] other_values[FN_WHEN_FETCHED] = when_fetched other_values.update(base_dict) if ( self._nlpdef.truncate_text_at and len(text) > self._nlpdef.truncate_text_at ): text = text[: self._nlpdef.truncate_text_at] other_values[TRUNCATED_FLAG] = True else: other_values[TRUNCATED_FLAG] = False # Yield the result yield text, other_values nrows_returned += 1
[docs] def get_count(self) -> int: """ Counts records in the source table. Used for progress monitoring. """ return count_star( session=self.source_session, tablename=self._srctable )
[docs] def get_progress_record( self, srcpkval: int, srcpkstr: str = None ) -> Optional[NlpRecord]: """ Fetch a progress record for the given source record, if one exists. Returns: :class:`crate_anon.nlp_manager.models.NlpRecord`, or ``None`` """ session = self._progress_session query = ( session.query(NlpRecord) .filter(NlpRecord.srcdb == self._srcdb) .filter(NlpRecord.srctable == self._srctable) .filter(NlpRecord.srcpkval == srcpkval) .filter(NlpRecord.srcfield == self._srcfield) .filter(NlpRecord.nlpdef == # Order not important (though the order of the index certainly # is; see NlpRecord.__table_args__). # # noqa ) if srcpkstr is not None: query = query.filter(NlpRecord.srcpkstr == srcpkstr) # log.critical(query) with MultiTimerContext(timer, TIMING_PROGRESS_DB_SELECT): # This was surprisingly slow under SQL Server testing. return query.one_or_none()
[docs] def gen_src_pks(self) -> Generator[Tuple[int, Optional[str]], None, None]: """ Generate integer PKs from the source table. For tables with an integer PK, yields tuples: ``pk_value, None``. For tables with a string PK, yields tuples: ``pk_hash, pk_value``. - Timing is subsumed under the timer named ``TIMING_DELETE_WHERE_NO_SOURCE``. """ session = self.source_session query = select([column(self._srcpkfield)]).select_from( table(self._srctable) ) result = session.execute(query) if self.is_pk_integer(): for row in result: yield row[0], None else: for row in result: pkval = row[0] yield hash64(pkval), pkval
[docs] def delete_progress_records_where_srcpk_not( self, temptable: Optional[Table] ) -> None: """ If ``temptable`` is None, deletes all progress records for this input field/NLP definition. If ``temptable`` is a table, deletes records from the progress database (from this input field/NLP definition) whose source PK is not in the temporary table. (Used for deleting NLP records when the source has subsequently been deleted.) """ progsession = self._progress_session log.debug( f"delete_progress_records_where_srcpk_not... " f"{self._srcdb}.{self._srctable} -> progressdb" ) prog_deletion_query = ( progsession.query(NlpRecord) .filter(NlpRecord.srcdb == self._srcdb) .filter(NlpRecord.srctable == self._srctable) . # unnecessary # filter(NlpRecord.srcpkfield == self._srcpkfield). filter(NlpRecord.nlpdef == ) if temptable is not None: log.debug("... deleting selectively") temptable_pkvalcol = temptable.columns[FN_SRCPKVAL] temptable_pkstrcol = temptable.columns[FN_SRCPKSTR] prog_deletion_query = prog_deletion_query.filter( ~exists().where( and_( NlpRecord.srcpkval == temptable_pkvalcol, or_( NlpRecord.srcpkstr == temptable_pkstrcol, and_( NlpRecord.srcpkstr.is_(None), temptable_pkstrcol.is_(None), ), ), ) ) ) else: log.debug("... deleting all") with MultiTimerContext(timer, TIMING_PROGRESS_DB_DELETE): prog_deletion_query.delete(synchronize_session=False) # # noqa self._nlpdef.commit(progsession)
[docs] def delete_all_progress_records(self) -> None: """ Deletes **all** records from the progress database for this NLP definition (across all source tables/columns). """ progsession = self._progress_session prog_deletion_query = progsession.query(NlpRecord).filter( NlpRecord.nlpdef == ) log.debug( f"delete_all_progress_records for NLP definition: " f"{}" ) with MultiTimerContext(timer, TIMING_PROGRESS_DB_DELETE): prog_deletion_query.delete(synchronize_session=False) self._nlpdef.commit(progsession)