Source code for crate_anon.nlp_manager.parse_gate

"""
crate_anon/nlp_manager/parse_gate.py

===============================================================================

    Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CRATE.

    CRATE is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CRATE is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CRATE. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**NLP handler for external GATE NLP tools.**

The pipe encoding (Python -> Java stdin, Java stdout -> Python) is fixed to be
UTF-8, here and in the Java code.

"""

import logging
import os
import shlex
import subprocess
from typing import Any, Dict, Generator, List, Tuple

from cardinal_pythonlib.cmdline import cmdline_quote
from cardinal_pythonlib.dicts import (
    rename_keys_in_dict,
    set_null_values_in_dict,
)
from cardinal_pythonlib.lists import chunks
from cardinal_pythonlib.tsv import tsv_pairs_to_dict
from sqlalchemy import Column, Index

from crate_anon.nlp_manager.base_nlp_parser import (
    BaseNlpParser,
    TextProcessingFailed,
)
from crate_anon.nlp_manager.constants import (
    MAX_SQL_FIELD_LEN,
    ProcessorConfigKeys,
    GateFieldNames as GateFN,
)
from crate_anon.nlp_manager.nlp_definition import (
    NlpDefinition,
)
from crate_anon.nlprp.constants import NlprpKeys, NlprpValues
from crate_anon.nlp_manager.output_user_config import OutputUserConfig

log = logging.getLogger(__name__)


# =============================================================================
# Process handling
# =============================================================================
# Have Python host the client process, communicating with stdin/stdout?
#   http://eyalarubas.com/python-subproc-nonblock.html
#   https://stackoverflow.com/questions/2715847/python-read-streaming-input-from-subprocess-communicate  # noqa
# Java process could be a network server.
#   http://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html
#   http://www.tutorialspoint.com/java/java_networking.htm
# OK, first one works; that's easier.


[docs]class Gate(BaseNlpParser): """ EXTERNAL. Abstract NLP processor controlling an external process, typically our Java interface to GATE programs, ``CrateGatePipeline.java`` (but it could be any external program). We send text to it, it parses the text, and it sends us back results, which we return as dictionaries. The specific text sought depends on the configuration file and the specific GATE program used. For details of GATE, see https://www.gate.ac.uk/. """ _ = """ Notes: - PROBLEM when attempting to use KConnect (Bio-YODIE): its source code is full of direct calls to ``System.out.println()``. POTENTIAL SOLUTIONS: - named pipes: - ``os.mkfifo()`` - Unix only. - ``win32pipe`` - https://stackoverflow.com/questions/286614 - ZeroMQ with some sort of security - ``pip install zmq`` - some sort of Java binding (``jzmq``, ``jeromq``...) - redirect ``stdout`` in our Java handler - ``System.setOut()``... yes, that works. - Implemented and exposed as ``--suppress_gate_stdout``. """ uses_external_tool = True
[docs] def __init__( self, nlpdef: NlpDefinition, cfg_processor_name: str, commit: bool = False, ) -> None: """ Args: nlpdef: a :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition` cfg_processor_name: the name of a CRATE NLP config file section (from which we may choose to get extra config information) commit: force a COMMIT whenever we insert data? You should specify this in multiprocess mode, or you may get database deadlocks. """ super().__init__( nlpdef=nlpdef, cfg_processor_name=cfg_processor_name, commit=commit, friendly_name="GATE", ) if not nlpdef and not cfg_processor_name: # Debugging only self._debug_mode = True self._max_external_prog_uses = 0 self._input_terminator = "input_terminator" self._output_terminator = "output_terminator" typepairs = [] # type: List[str] self._progenvsection = "" progargs = "" logtag = "" else: self._debug_mode = False self._max_external_prog_uses = self._cfgsection.opt_int_positive( ProcessorConfigKeys.MAX_EXTERNAL_PROG_USES, default=0 ) self._input_terminator = self._cfgsection.opt_str( ProcessorConfigKeys.INPUT_TERMINATOR, required=True ) self._output_terminator = self._cfgsection.opt_str( ProcessorConfigKeys.OUTPUT_TERMINATOR, required=True ) typepairs = self._cfgsection.opt_strlist( ProcessorConfigKeys.OUTPUTTYPEMAP, required=True, lower=False ) self._progenvsection = self._cfgsection.opt_str( ProcessorConfigKeys.PROGENVSECTION ) progargs = self._cfgsection.opt_str( ProcessorConfigKeys.PROGARGS, required=True ) logtag = nlpdef.logtag or "." self._outputtypemap = {} # type: Dict[str, OutputUserConfig] self._type_to_tablename = {} # type: Dict[str, str] for annottype, outputsection in chunks(typepairs, 2): annottype = annottype.lower() c = OutputUserConfig(nlpdef.parser, outputsection) self._outputtypemap[annottype] = c self._type_to_tablename[annottype] = c.dest_tablename if self._progenvsection: # noinspection PyTypeChecker self._env = nlpdef.get_env_dict(self._progenvsection, os.environ) else: self._env = os.environ.copy() self._env["NLPLOGTAG"] = logtag # ... We have ensured that this is not empty for real use, because # passing a "-lt" switch with no parameter will make # CrateGatePipeline.java complain and stop. The environment variable # is read via the "progargs" config argument, as follows. formatted_progargs = progargs.format(**self._env) self._progargs = shlex.split(formatted_progargs) self._n_uses = 0 self._pipe_encoding = "utf8" self._p = None # the subprocess self._started = False # Sanity checks for ty, tn in self._type_to_tablename.items(): assert ( len(tn) <= MAX_SQL_FIELD_LEN ), f"Table name too long (max {MAX_SQL_FIELD_LEN} characters)"
# ------------------------------------------------------------------------- # External process control # ------------------------------------------------------------------------- def _start(self) -> None: """ Launch the external process, with stdin/stdout connections to it. """ if self._started: return args = self._progargs log.info(f"Launching command: {cmdline_quote(args)}") self._p = subprocess.Popen( args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, # stderr=subprocess.PIPE, shell=False, bufsize=1, ) # ... don't ask for stderr to be piped if you don't want it; firstly, # there's a risk that if you don't consume it, something hangs, and # secondly if you don't consume it, you see it on the console, which is # helpful. self._started = True def _encode_to_subproc_stdin(self, text: str) -> None: """ Send text to the external program (via its stdin), encoding it in the process (typically to UTF-8). """ log.debug("SENDING: " + text) bytes_ = text.encode(self._pipe_encoding) self._p.stdin.write(bytes_) def _flush_subproc_stdin(self) -> None: """ Flushes what we're sending to the external program via its stdin. """ self._p.stdin.flush() def _decode_from_subproc_stdout(self) -> str: """ Decode what we've received from the external program's stdout, from its specific encoding (usually UTF-8) to a Python string. """ bytes_ = self._p.stdout.readline() text = bytes_.decode(self._pipe_encoding) log.debug("RECEIVING: " + repr(text)) return text def _finish(self) -> None: """ Close down the external process. """ if not self._started: return # close p.stdout, wait for the subprocess to exit self._p.communicate() self._started = False self._n_uses = 0 def _restart(self) -> None: """ Close down the external process and restart it. """ self._finish() self._start() # ------------------------------------------------------------------------- # Input processing # -------------------------------------------------------------------------
[docs] def parse( self, text: str ) -> Generator[Tuple[str, Dict[str, Any]], None, None]: """ - Send text to the external process, and receive the result. - Note that associated data is not passed into this function, and is kept in the Python environment, so we can't run into any problems with the transfer to/from the Java program garbling important data. All we send to the subprocess is the text (and an input_terminator). Then, we may receive MULTIPLE sets of data back ("your text contains the following 7 people/drug references/whatever"), followed eventually by the output_terminator, at which point this set is complete. """ self._start() # ensure started try: # Send log.debug("writing: " + text) self._encode_to_subproc_stdin(text) self._encode_to_subproc_stdin(os.linesep) self._encode_to_subproc_stdin(self._input_terminator + os.linesep) self._flush_subproc_stdin() # required in the Python 3 system # Receive for line in iter( self._decode_from_subproc_stdout, self._output_terminator + os.linesep, ): # ... iterate until the sentinel output_terminator is received line = line.rstrip("\n") # ... remove trailing newline, but NOT TABS # ... if you strip tabs, you get superfluous # "Bad chunk, not of length 2" messages. log.debug("stdout received: " + line) d = tsv_pairs_to_dict(line) log.debug(f"dictionary received: {d}") try: annottype = d[GateFN.TYPE].lower() except KeyError: raise ValueError("_type information not in data received") if annottype not in self._type_to_tablename: log.warning( f"Unknown annotation type, skipping: {annottype}" ) continue c = self._outputtypemap[annottype] rename_keys_in_dict(d, c.renames) set_null_values_in_dict(d, c.null_literals) yield self._type_to_tablename[annottype], d self._n_uses += 1 # Restart subprocess? if 0 < self._max_external_prog_uses <= self._n_uses: log.info(f"relaunching app after {self._n_uses} uses") self._restart() except BrokenPipeError: log.error("Broken pipe; relaunching app") self._restart() raise TextProcessingFailed()
# ------------------------------------------------------------------------- # Test # -------------------------------------------------------------------------
[docs] def test(self, verbose: bool = False) -> None: """ Test the :func:`send` function. """ if self._debug_mode: return self.test_parser( ["Bob Hope visited Seattle.", "James Joyce wrote Ulysses."] )
# ------------------------------------------------------------------------- # Database structure # -------------------------------------------------------------------------
[docs] def dest_tables_columns(self) -> Dict[str, List[Column]]: # docstring in superclass tables = {} # type: Dict[str, List[Column]] for anottype, otconfig in self._outputtypemap.items(): tables[ otconfig.dest_tablename ] = self._standard_gate_columns() + otconfig.get_columns( self.dest_engine ) return tables
[docs] def dest_tables_indexes(self) -> Dict[str, List[Index]]: # docstring in superclass tables = {} # type: Dict[str, List[Index]] for anottype, otconfig in self._outputtypemap.items(): tables[otconfig.dest_tablename] = ( self._standard_gate_indexes() + otconfig.indexes ) return tables
[docs] def nlprp_name(self) -> str: # docstring in superclass if self._debug_mode: return super().nlprp_name() else: return self._nlpdef.name
[docs] def nlprp_schema_info(self, sql_dialect: str = None) -> Dict[str, Any]: # We do not absolutely need to override nlprp_schema_info(). Although # CRATE's GATE processor doesn't automatically know its schema, it is # told by the config (i.e. by the user) and can pass on that # information. However, for debug mode, it's helpful to override. if self._debug_mode: return {NlprpKeys.SCHEMA_TYPE: NlprpValues.UNKNOWN} else: return super().nlprp_schema_info(sql_dialect)