# Source code for crate_anon.nlp_manager.cloud_parser

"""
crate_anon/nlp_manager/cloud_parser.py

===============================================================================

    Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CRATE.

    CRATE is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CRATE is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CRATE. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

Send text to a cloud-based NLPRP server for processing.

.. todo:: cloud_parser: handle new ``tabular_schema`` info from server

"""

import logging
from typing import Any, Dict, List, Optional, Type

from cardinal_pythonlib.lists import chunks
from sqlalchemy.schema import Column, Index
from sqlalchemy import types as sqlatypes

from crate_anon.nlp_manager.nlp_definition import NlpDefinition
from crate_anon.nlp_manager.constants import ProcessorConfigKeys, NlpDefValues
from crate_anon.nlp_manager.output_user_config import OutputUserConfig
from crate_anon.nlprp.constants import NlprpKeys as NKeys, NlprpValues
from crate_anon.nlp_manager.base_nlp_parser import TableMaker
from crate_anon.nlp_webserver.server_processor import ServerProcessor

log = logging.getLogger(__name__)


# =============================================================================
# Cloud class for cloud-based processors
# =============================================================================


class Cloud(TableMaker):
    """
    EXTERNAL.

    Abstract NLP processor that passes information to a remote (cloud-based)
    NLP system via the NLPRP protocol. The processor at the other end might
    be of any kind.
    """

    # Class-level flag: marks this TableMaker subclass as a cloud (remote)
    # processor, as opposed to a local one.
    _is_cloud_processor = True
[docs] def __init__( self, nlpdef: Optional[NlpDefinition], cfg_processor_name: Optional[str], commit: bool = False, ) -> None: """ Args: nlpdef: :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition` cfg_processor_name: the config section for the processor commit: force a COMMIT whenever we insert data? You should specify this in multiprocess mode, or you may get database deadlocks. """ super().__init__( nlpdef, cfg_processor_name, commit, friendly_name="Cloud" ) self.remote_processor_info = None # type: Optional[ServerProcessor] self.schema_type = None self.sql_dialect = None self.schema = None # type: Optional[Dict[str, Any]] self.available_remotely = False # update later if available # Output section - bit of repetition from the 'Gate' parser self._outputtypemap = {} # type: Dict[str, OutputUserConfig] self._type_to_tablename = {} # type: Dict[str, str] self.tablename = None if not nlpdef and not cfg_processor_name: # Debugging only self.procname = "" self.procversion = "" self.format = "" else: self.procname = self._cfgsection.opt_str( ProcessorConfigKeys.PROCESSOR_NAME, required=True ) self.procversion = self._cfgsection.opt_str( ProcessorConfigKeys.PROCESSOR_VERSION, default=None ) # Made format required so people are less likely to make mistakes self.format = self._cfgsection.opt_str( ProcessorConfigKeys.PROCESSOR_FORMAT, required=True ) # Output section - bit of repetition from the 'Gate' parser typepairs = self._cfgsection.opt_strlist( ProcessorConfigKeys.OUTPUTTYPEMAP, required=True, lower=False ) # If typepairs is empty the following block won't execute for output_type, outputsection in chunks(typepairs, 2): output_type = output_type.lower() c = OutputUserConfig( nlpdef.parser, outputsection, schema_required=False ) self._outputtypemap[output_type] = c self._type_to_tablename[output_type] = c.dest_tablename if output_type == '""': self.tablename = c.dest_tablename
[docs] @staticmethod def get_coltype_parts(coltype_str: str) -> List[str]: """ Get root column type and parameter, i.e. for VARCHAR(50) root column type is VARCHAR and parameter is 50. """ parts = [x.strip() for x in coltype_str.replace(")", "").split("(")] if len(parts) == 1: col_str = parts[0] parameter = "" else: try: col_str, parameter = parts except ValueError: log.error(f"Invalid column type in response: {coltype_str}") raise try: # Turn the parameter into an integer if it's supposed to be one parameter = int(parameter) except ValueError: pass return [col_str, parameter]
[docs] @staticmethod def str_to_coltype_general(coltype_str: str) -> Type[sqlatypes.TypeEngine]: """ Get the sqlalchemy column type class which fits with the column type. """ coltype = getattr(sqlatypes, coltype_str) # Check if 'coltype' is really an sqlalchemy column type if issubclass(coltype, sqlatypes.TypeEngine): return coltype
[docs] def is_tabular(self) -> bool: """ Is the format of the schema information given by the remote processor tabular? """ return self.schema_type == NlprpValues.TABULAR
def get_tablename_from_type(self, output_type: str) -> str: return self._type_to_tablename[output_type] def get_otconf_from_type(self, output_type: str) -> OutputUserConfig: return self._outputtypemap[output_type] def _standard_columns_if_gate(self) -> List[Column]: """ Returns standard columns for GATE output if ``self.format`` is GATE. """ if self.format == NlpDefValues.FORMAT_GATE: return self._standard_gate_columns() else: return [] def _standard_indexes_if_gate(self) -> List[Index]: """ Returns standard indexes for GATE output if ``self.format`` is GATE. """ if self.format == NlpDefValues.FORMAT_GATE: return self._standard_gate_indexes() else: return [] def _confirm_available(self, available: bool = True) -> None: """ Set the attribute 'available_remotely', which indicates whether a requested processor is actually available from the specified server. """ self.available_remotely = available
[docs] def set_procinfo_if_correct( self, remote_processor: ServerProcessor ) -> None: """ Checks if a processor dictionary, with all the nlprp specified info a processor should have, belongs to this processor. If it does, then we add the information from the procesor dictionary. """ if self.procname != remote_processor.name: return # if ((self.procversion is None and # processor_dict[NKeys.IS_DEFAULT_VERSION]) or if ( self.procversion is None and remote_processor.is_default_version ) or (self.procversion == remote_processor.version): self._set_processor_info(remote_processor)
def _set_processor_info(self, remote_processor: ServerProcessor) -> None: """ Add the information from a processor dictionary. If it contains table information, this allows us to create the correct tables when the time comes. """ # This won't be called unless the remote processor is available self._confirm_available() self.remote_processor_info = remote_processor # self.name = processor_dict[NKeys.NAME] self.schema_type = remote_processor.schema_type if self.is_tabular(): self.schema = remote_processor.tabular_schema self.sql_dialect = remote_processor.sql_dialect # Check that, by this stage, we either have a tabular schema from # the processor, or we have user-specified destfields assert self.is_tabular or all( x.destfields for x in self._outputtypemap.values() ), ( "You haven't specified a table structure and the processor hasn't " "provided one." ) def _str_to_coltype(self, data_type_str: str) -> sqlatypes.TypeEngine: """ This is supposed to get column types depending on the sql dialect used by the server, but it's not implemented yet. 
""" raise NotImplementedError # if self.sql_dialect == SqlDialects.MSSQL: # return self._str_to_coltype_mssql(data_type_str) # elif self.sql_dialect == SqlDialects.MYSQL: # return self._str_to_coltype_mysql(data_type_str) # elif self.sql_dialect == SqlDialects.ORACLE: # return self._str_to_coltype_oracle(data_type_str) # elif self.sql_dialect == SqlDialects.POSTGRES: # return self._str_to_coltype_postgres(data_type_str) # elif self.sql_dialect == SqlDialects.SQLITE: # return self._str_to_coltype_sqlite(data_type_str) # else: # pass def _dest_tables_columns_user(self) -> Dict[str, List[Column]]: tables = {} # type: Dict[str, List[Column]] for output_type, otconfig in self._outputtypemap.items(): tables[ otconfig.dest_tablename ] = self._standard_columns_if_gate() + otconfig.get_columns( self.dest_engine ) return tables def _dest_tables_indexes_user(self) -> Dict[str, List[Index]]: tables = {} # type: Dict[str, List[Index]] for output_type, otconfig in self._outputtypemap.items(): tables[otconfig.dest_tablename] = ( self._standard_indexes_if_gate() + otconfig.indexes ) return tables def _dest_tables_columns_auto(self) -> Dict[str, List[Column]]: """ Gets the destination tables and their columns using the remote processor information. """ tables = {} for table, columns in self.schema.items(): column_objects = ( self._standard_columns_if_gate() ) # type: List[Column] if self.tablename: tablename = self.tablename else: tablename = self.get_tablename_from_type(table) # ... 
might be empty list for column in columns: col_str, parameter = self.get_coltype_parts( column[NKeys.COLUMN_TYPE] ) data_type_str = column[NKeys.DATA_TYPE] coltype = self.str_to_coltype_general(data_type_str) column_objects.append( Column( column[NKeys.COLUMN_NAME], coltype if not parameter else coltype(parameter), comment=column[NKeys.COLUMN_COMMENT], nullable=column[NKeys.IS_NULLABLE], ) ) tables[tablename] = column_objects return tables def _dest_tables_indexes_auto(self) -> Dict[str, List[Index]]: if self.format != NlpDefValues.FORMAT_GATE: return {} # indexes can't be returned by the server tables = {} # type: Dict[str, List[Index]] for table in self.schema: tables[table] = self._standard_gate_indexes() return tables
[docs] def dest_tables_indexes(self) -> Dict[str, List[Index]]: # Docstring in superclass if self._outputtypemap and all( x.destfields for x in self._outputtypemap.values() ): return self._dest_tables_indexes_user() elif self.is_tabular(): return self._dest_tables_indexes_auto() else: raise ValueError( "You haven't specified a table structure and " "the processor hasn't provided one." )
[docs] def dest_tables_columns(self) -> Dict[str, List[Column]]: # Docstring in superclass if self._outputtypemap and all( x.destfields for x in self._outputtypemap.values() ): return self._dest_tables_columns_user() elif self.is_tabular(): # Must have processor-defined schema because we already checked # for it return self._dest_tables_columns_auto() else: raise ValueError( "You haven't specified a table structure and " "the processor hasn't provided one." )