r"""
crate_anon/nlp_manager/parse_medex.py
===============================================================================
Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CRATE.
CRATE is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CRATE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CRATE. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
**NLP handler for the external MedEx-UIMA tool, to find references to
drugs (medication).**
- MedEx-UIMA
- can't find Python version of MedEx (which preceded MedEx-UIMA)
- paper on Python version is
https://www.ncbi.nlm.nih.gov/pmc/articles/PMC2995636/; uses Python NLTK
- see notes in Documents/CRATE directory
- MedEx-UIMA is in Java, and resolutely uses a file-based processing system;
``Main.java`` calls ``MedTagger.java`` (``MedTagger.run_batch_medtag``),
and even in its core ``MedTagger.medtagging()`` function it's making files
in directories; that's deep in the core of its NLP thinking so we can't
change that behaviour without creating a fork. So the obvious way to turn
this into a proper "live" pipeline would be for the calling code to
- fire up a receiving process - Python launching custom Java
- create its own temporary directory - Python
- receive data - Python
- stash it on disk - Python
- call the MedEx function - Python -> stdout -> custom Java -> MedEx
- return the results - custom Java signals "done" -> Python reads stdin?
- and clean up - Python
Not terribly elegant, but might be fast enough (and almost certainly much
faster than reloading Java regularly!).
- output comes from its ``MedTagger.print_result()`` function
- would need a per-process-unique temporary directory, since it scans all
files in the input directory (and similarly one output directory); would do
that in Python
MedEx-UIMA is firmly (and internally) wedded to a file-based processing
system. So we need to:
- create a process-specific pair of temporary directories;
- fire up a receiving process
- pass data (1) to file and (2) signal that there's data available;
- await a "data ready" reply and read the data from disk;
- clean up (delete files) in readiness for next data chunk.
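The steps above can be sketched in Python as follows. This is only an
illustration, not CRATE's implementation: the child process here is a small
stand-in Python script rather than the Java/MedEx pipeline, and the signal
strings, the file name, and the ``run_once`` helper are all hypothetical.

```python
import os
import subprocess
import sys
import tempfile

DATA_READY = "DATA_READY"  # hypothetical signal strings
RESULTS_READY = "RESULTS_READY"

# Stand-in for the Java side (an assumption for this sketch): wait for the
# signal, process every file in the input directory, write same-named output
# files, then reply on stdout.
CHILD = '''
import os, sys
indir, outdir = sys.argv[1], sys.argv[2]
for line in sys.stdin:
    if line.strip() != "DATA_READY":
        continue
    for name in os.listdir(indir):
        with open(os.path.join(indir, name), encoding="utf8") as f:
            text = f.read()
        with open(os.path.join(outdir, name), "w", encoding="utf8") as f:
            f.write(text.upper())  # pretend "NLP"
    print("RESULTS_READY", flush=True)
'''


def run_once(text: str, filename: str = "chunk.txt") -> str:
    # Process-specific pair of temporary directories, removed on exit.
    with tempfile.TemporaryDirectory() as indir:
        with tempfile.TemporaryDirectory() as outdir:
            proc = subprocess.Popen(
                [sys.executable, "-c", CHILD, indir, outdir],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                universal_newlines=True,
            )
            try:
                infile = os.path.join(indir, filename)
                with open(infile, "w", encoding="utf8") as f:
                    f.write(text)  # (1) data to file
                proc.stdin.write(DATA_READY + "\n")  # (2) signal
                proc.stdin.flush()
                while True:  # ignore any other chatter on stdout
                    line = proc.stdout.readline()
                    if not line:
                        raise RuntimeError("child exited unexpectedly")
                    if line.strip() == RESULTS_READY:
                        break
                outfile = os.path.join(outdir, filename)
                with open(outfile, encoding="utf8") as f:
                    result = f.read()
                os.remove(infile)  # ready for the next chunk
                return result
            finally:
                proc.stdin.close()
                proc.wait()
                proc.stdout.close()
```

In real use the child would be kept alive across many chunks (which is the
whole point of avoiding repeated Java start-up); the sketch launches it once
per call only to stay short.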
NOTE ALSO that MedEx's ``MedTagger`` class writes to ``stdout`` (though not
``stderr``). Option 1: move our logs to ``stdout`` and use ``stderr`` for
signalling. Option 2: keep things as they are and just use a ``stdout`` signal
that's not used by MedEx. Went with option 2; simpler and more consistent esp.
for logging.
How do we clean up the temporary directories?
- ``__del__`` is not the opposite of ``__init__``;
https://www.algorithm.co.il/blogs/programming/python-gotchas-1-__del__-is-not-the-opposite-of-__init__/
- https://eli.thegreenplace.net/2009/06/12/safely-using-destructors-in-python
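One way to avoid relying on ``__del__`` is to let
``tempfile.TemporaryDirectory`` own the directories and to call its
``cleanup()`` method explicitly (or use a context manager). A minimal sketch;
the ``Worker`` class is hypothetical and stands in for whatever object owns
the directories:

```python
import os
import tempfile


class Worker:
    # Hypothetical owner of a pair of per-process temporary directories.

    def __init__(self) -> None:
        self.inputdir = tempfile.TemporaryDirectory()
        self.outputdir = tempfile.TemporaryDirectory()

    def close(self) -> None:
        # Explicit, deterministic cleanup of both directories.
        self.inputdir.cleanup()
        self.outputdir.cleanup()

    def __enter__(self) -> "Worker":
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()


with Worker() as w:
    paths = [w.inputdir.name, w.outputdir.name]
    still_there = all(os.path.isdir(p) for p in paths)
gone_after = not any(os.path.exists(p) for p in paths)
```

If ``close()`` is never called, ``TemporaryDirectory`` still removes the
directory when the object is garbage-collected or at interpreter exit, so the
explicit call is about timing rather than correctness.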
PROBLEMS:
- NLP works fine, but UK-style abbreviations e.g. "qds" not recognized where
"q.i.d." is. US abbreviations: e.g.
https://www.d.umn.edu/medweb/Modules/Prescription/Abbreviations.html
- Places to look, and things to try adding:
.. code-block:: none
resources/TIMEX/norm_patterns/NormFREQword
qds=>R1P6H
resources/TIMEX/rules/frequency_rules
//QID ( 4 times a day
expression="[Qq]\.?[Ii]\.?[Dd]\.?[ ]*\((.*?)\)",val="R1P6H"
// RNC: qds
expression="[Qq]\.?[Dd]\.?[Ss]\.?[ ]*\((.*?)\)",val="R1P6H"
... looked like it was correct, but not working
... are these files compiled in, rather than being read live?
... do I have the user or the developer version?
... not there yet.
Probably need to recompile. See MedEx's Readme.txt
- reference to expression/val (as in frequency_rules):
.. code-block:: none
TIMEX.Rule._add_rule()
... from TIMEX.Rule.Rule via a directory walker
... from TIMEX.ProcessingEngine.ProcessingEngine()
... via semi-hardcoded file location relative to class's location
... via rule_dir, set to .../TIMEX/rules
- Detect a file being accessed:
.. code-block:: bash
sudo apt install inotify-tools
inotifywait -m FILE
... frequency_rules IS opened.
- OVERALL SEQUENCE:
.. code-block:: none
org.apache.medex.Main [OR: CrateMedexPipeline.java]
org.apache.medex.MedTagger.run_batch_medtag
... creates an org.apache.NLPTools.Document
... not obviously doing frequency stuff, or drug recognition
... then runs org.apache.medex.MedTagger.medtagging(doc)
... this does most of the heavy lifting, I think
... uses ProcessingEngine freq_norm_engine
... org.apache.TIMEX.ProcessingEngine
... but it may be that this just does frequency NORMALIZATION, not frequency finding
... uses SemanticRuleEngine rule_engine
... which is org.apache.medex.SemanticRuleEngine
... see all the regexlist.put(..., "FREQ") calls
... note double-escaping \\ for Java's benefit
- Rebuilding MedEx:
.. code-block:: bash
export MEDEX_DIR=~/dev/MedEx_UIMA_1.3.6 # or similar
cd ${MEDEX_DIR}
# OPTIONAL # find . -name "*.class" -exec rm {} \; # remove old compiled files
javac \
-classpath "${MEDEX_DIR}/src:${MEDEX_DIR}/lib/*" \
src/org/apache/medex/Main.java \
-d bin
# ... will also compile dependencies
See build_medex_itself.py
- YES. If you add to ``org.apache.medex.SemanticRuleEngine``, with extra
entries in the ``regexlist.put(...)`` sequence, new frequencies appear in the
output.
To get them normalized as well, add them to frequency_rules.
Specifics:
(a) SemanticRuleEngine.java
.. code-block:: java
// EXTRA FOR UK FREQUENCIES (see https://www.evidence.nhs.uk/formulary/bnf/current/general-reference/latin-abbreviations)
// NB case-insensitive regexes in SemanticRuleEngine.java, so ignore case here
regexlist.put("^(q\\.?q\\.?h\\.?)( |$)", "FREQ"); // qqh, quarta quaque hora (RNC)
regexlist.put("^(q\\.?d\\.?s\\.?)( |$)", "FREQ"); // qds, quater die sumendum (RNC); must go before existing competing expression: regexlist.put("^q(\\.|)\\d+( |$)","FREQ");
regexlist.put("^(t\\.?d\\.?s\\.?)( |$)", "FREQ"); // tds, ter die sumendum (RNC)
regexlist.put("^(b\\.?d\\.?)( |$)", "FREQ"); // bd, bis die (RNC)
regexlist.put("^(o\\.?d\\.?)( |$)", "FREQ"); // od, omni die (RNC)
regexlist.put("^(mane)( |$)", "FREQ"); // mane (RNC)
regexlist.put("^(o\\.?m\\.?)( |$)", "FREQ"); // om, omni mane (RNC)
regexlist.put("^(nocte)( |$)", "FREQ"); // nocte (RNC)
regexlist.put("^(o\\.?n\\.?)( |$)", "FREQ"); // on, omni nocte (RNC)
regexlist.put("^(fortnightly)( |$)", "FREQ"); // fortnightly (RNC)
regexlist.put("^((?:2|two)\\s+weekly)\\b", "FREQ"); // two-weekly, i.e. fortnightly (RNC); note double-escaping for Java
regexlist.put("argh", "FREQ"); // fortnightly (RNC)
// ALREADY IMPLEMENTED BY MedEx: tid (ter in die)
// NECESSITY, NOT FREQUENCY: prn (pro re nata)
// TIMING, NOT FREQUENCY: ac (ante cibum); pc (post cibum)
(b) frequency_rules
.. code-block:: none
// EXTRA FOR UK FREQUENCIES (see https://www.evidence.nhs.uk/formulary/bnf/current/general-reference/latin-abbreviations)
// NB case-sensitive regexes in Rule.java, so offer upper- and lower-case alternatives here
// qqh, quarta quaque hora (RNC)
expression="\b[Qq]\.?[Qq]\.?[Hh]\.?\b",val="R1P4H"
// qds, quater die sumendum (RNC); MUST BE BEFORE COMPETING "qd" (= per day) expression: expression="[Qq]\.?[ ]?[Dd]\.?",val="R1P24H"
expression="\b[Qq]\.?[Dd]\.?[Ss]\.?\b",val="R1P6H"
// tds, ter die sumendum (RNC)
expression="\b[Tt]\.?[Dd]\.?[Ss]\.?\b",val="R1P8H"
// bd, bis die (RNC)
expression="\b[Bb]\.?[Dd]\.?\b",val="R1P12H"
// od, omni die (RNC)
expression="\b[Oo]\.?[Dd]\.?\b",val="R1P24H"
// mane (RNC)
expression="\b[Mm][Aa][Nn][Ee]\b",val="R1P24H"
// om, omni mane (RNC)
expression="\b[Oo]\.?[Mm]\.?\b",val="R1P24H"
// nocte (RNC)
expression="\b[Nn][Oo][Cc][Tt][Ee]\b",val="R1P24H"
// on, omni nocte (RNC)
expression="\b[Oo]\.?[Nn]\.?\b",val="R1P24H"
// fortnightly and variants (RNC); unsure if TIMEX3 format is right
expression="\b[Ff][Oo][Rr][Tt][Nn][Ii][Gg][Hh][Tt][Ll][Yy]\b",val="R1P2WEEK"
expression="\b(?:2|[Tt][Ww][Oo])\s+[Ww][Ee][Ee][Kk][Ll][Yy]\b",val="R1P2WEEK"
// monthly (RNC)
expression="\b[Mm][Oo][Nn][Tt][Hh][Ll][Yy]\b",val="R1P1MONTH"
//
// ALREADY IMPLEMENTED BY MedEx: tid (ter in die)
// NECESSITY, NOT FREQUENCY: prn (pro re nata)
// TIMING, NOT FREQUENCY: ac (ante cibum); pc (post cibum)
(c) source:
- https://www.evidence.nhs.uk/formulary/bnf/current/general-reference/latin-abbreviations
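As a quick sanity check of the patterns above, independent of MedEx, the
recognition and normalization steps can be mimicked in Python. This is only a
sketch: Python's ``re`` is not Java's regex engine (these simple patterns are
expected to behave the same way in both), the two tables hold just a few of
the entries listed above, "first match wins" is an assumption drawn from the
ordering notes, and ``find_frequency`` is a hypothetical helper.

```python
import re
from typing import Optional, Tuple

# Recognition patterns, in priority order (cf. SemanticRuleEngine.java);
# matched case-insensitively at the start of the text.
RECOGNIZERS = [
    r"^(q\.?d\.?s\.?)( |$)",
    r"^(t\.?d\.?s\.?)( |$)",
    r"^(b\.?d\.?)( |$)",
    r"^(o\.?d\.?)( |$)",
    r"^(nocte)( |$)",
]

# Normalization rules (cf. frequency_rules); case-sensitive, tried in order.
NORMALIZERS = [
    (r"\b[Qq]\.?[Dd]\.?[Ss]\.?\b", "R1P6H"),
    (r"\b[Tt]\.?[Dd]\.?[Ss]\.?\b", "R1P8H"),
    (r"\b[Bb]\.?[Dd]\.?\b", "R1P12H"),
    (r"\b[Oo]\.?[Dd]\.?\b", "R1P24H"),
    (r"\b[Nn][Oo][Cc][Tt][Ee]\b", "R1P24H"),
]


def find_frequency(text: str) -> Optional[Tuple[str, Optional[str]]]:
    # Returns (frequency_text, timex) or None if nothing is recognized.
    for pattern in RECOGNIZERS:
        m = re.match(pattern, text, flags=re.IGNORECASE)
        if not m:
            continue
        found = m.group(1)
        for rule, timex in NORMALIZERS:
            if re.search(rule, found):
                return found, timex
        return found, None  # recognized, but no normalization rule
    return None
```

For example, ``find_frequency("qds for pain")`` gives ``("qds", "R1P6H")``,
whereas ``find_frequency("daily")`` gives ``None`` because no recognizer in
this reduced table matches.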
- How about routes of administration?
.. code-block:: none
MedTagger.printResult()
route is in FStr_list[5]
... called from MedTagger.medtagging()
route is in FStr_list_final[5]
before that, is in FStr (separated by \n)
... from formatDruglist
...
... from logs, appears first next to "input for tagger" at
which point it's in
sent_token_array[j] (e.g. "po")
sent_tag_array[j] (e.g. "RUT" = route)
... from tag_dict
... from filter_tags
... from (Document) doc.filtered_drug_tag()
...
... ?from MedTagger.medtagging() calling doc.add_drug_tag()
... no, not really; is in this bit:
SuffixArray sa = new SuffixArray(...);
Vector<SuffixArrayResult> result = sa.search();
... and then each element of result has a "semantic_type"
member that can be "RUT"
... SuffixArray.search()
semantic_type=this.lex.sem_list().get(i);
... where lex comes from MedTagger:
this.lex = new Lexicon(this.lex_fname);
... Lexicon.sem_list() returns Lexicon.semantic_list
... Lexicon.Lexicon() constructs using MedTagger's this.lex_fname
... which is lexicon.cfg
... aha! There it is. If a line in lexicon.cfg has a RUT tag, it'll
appear as a route. So:
grep "RUT$" lexicon.cfg | sort # and replace tabs with spaces
bedside RUT
by mouth RUT
drip RUT
gt RUT
g tube RUT
g-tube RUT
gtube RUT
im injection RUT
im RUT
inhalation RUT
inhalatn RUT
inhaled RUT
intramuscular RUT
intravenously RUT
intravenous RUT
iv RUT
j tube RUT
j-tube RUT
jtube RUT
nare RUT
nares RUT
naris RUT
neb RUT
nostril RUT
orally RUT
oral RUT
ou RUT
patch DDF-DOSEUNIT-RUT
per gt RUT
per mouth RUT
per os RUT
per rectum RUT
per tube RUT
p. g RUT
pgt RUT
png RUT
pnj RUT
p.o RUT
po RUT
sc RUT
sl RUT
sq RUT
subc RUT
subcu RUT
subcutaneously RUT
subcutaneous RUT
subcut RUT
subling RUT
sublingual RUT
sub q RUT
subq RUT
swallow RUT
swish and spit RUT
sw&spit RUT
sw&swall RUT
topically RUT
topical RUT
topical tp RUT
trans RUT
with spacer RUT
Looks like these are not using synonyms. Note also that the format is ``route\tRUT``.
Note also that the first element is always forced to lower case (in
Lexicon.Lexicon()), so presumably it's case-insensitive.
There's no specific comment format (though any line that doesn't resolve to
two items when split on a tab looks like it's ignored).
So we might want to add more; use
.. code-block:: bash
build_medex_itself.py --extraroutes >> lexicon.cfg
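The lexicon format described above (tab-separated term and tag, term forced
to lower case, lines that do not split into exactly two items ignored) can be
mimicked in Python to check candidate additions before appending them. This
is a sketch of the behaviour as described in these notes, not a port of the
MedEx ``Lexicon`` class; ``parse_lexicon_lines`` is a hypothetical helper.

```python
from typing import Dict, Iterable


def parse_lexicon_lines(lines: Iterable[str]) -> Dict[str, str]:
    lexicon = {}  # type: Dict[str, str]
    for line in lines:
        parts = line.rstrip("\n").split("\t")
        if len(parts) != 2:
            continue  # e.g. blank lines or free-text notes
        term, tag = parts
        lexicon[term.lower()] = tag
    return lexicon


sample = [
    "by mouth\tRUT\n",
    "PO\tRUT\n",
    "patch\tDDF-DOSEUNIT-RUT\n",
    "this line has no tab and is skipped\n",
]
lexicon = parse_lexicon_lines(sample)
# Equivalent of: grep "RUT$" lexicon.cfg | sort
routes = sorted(term for term, tag in lexicon.items() if tag.endswith("RUT"))
```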
- Note that all frequencies and routes must be in the lexicon.
And all frequencies must be in ``SemanticRuleEngine.java`` (and, to be
normalized, frequency_rules).
- USEFUL BIT FOR CHECKING RESULTS:
.. code-block:: sql
SELECT
sentence_text,
drug, generic_name,
form, strength, dose_amount,
route, frequency, frequency_timex3,
duration, necessity
FROM anonymous_output.drugs;
- ENCODING
- Pipe encoding (to Java's ``stdin``, from Java's ``stdout``) is the less
important, as we're only likely to send/receive ASCII. It's hard-coded
to UTF-8.
- File encoding is vital and is hard-coded to UTF-8 here and in the
receiving Java.
- We have no direct influence over the MedTagger code for output (unless we
modify it). The output function is ``MedTagger.print_result()``, which
(line 2040 of ``MedTagger.java``) calls ``out.write(stuff)``.
The out variable is set by
.. code-block:: java
this.out = new BufferedWriter(new FileWriter(output_dir
+ File.separator + doc.fname()));
That form of the FileWriter constructor, ``FileWriter(String fileName)``,
uses the "default character encoding", as per
https://docs.oracle.com/javase/7/docs/api/java/io/FileWriter.html
That default is given by ``System.getProperty("file.encoding")``. However,
we don't have to do something daft like asking the Java to report its file
encoding to Python through a pipe; instead, we can set the Java default
encoding. It can't be done dynamically, but it can be done at JVM launch:
https://stackoverflow.com/questions/361975/setting-the-default-java-character-encoding.
Therefore, we should have a Java parameter specified in the config file as
``-Dfile.encoding=UTF-8``.
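To illustrate why the file encoding matters, here is a small sketch of what
happens when the writer and the reader disagree, and of where the JVM option
sits on a command line. The sample text, the classpath, and the class name
``SomePipeline`` are placeholders, not CRATE's actual configuration.

```python
import os
import shlex
import tempfile

text = "na\u00efve patient on 5 \u00b5g"  # two non-ASCII characters

with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "sample.txt")
    with open(path, "w", encoding="utf8") as f:
        f.write(text)
    with open(path, encoding="utf8") as f:
        same = f.read()  # round-trips correctly
    with open(path, encoding="latin-1") as f:
        garbled = f.read()  # each non-ASCII character becomes two

# Placeholder command line; the point is only that JVM options such as
# -Dfile.encoding go before the main class.
progargs = "java -Dfile.encoding=UTF-8 -classpath /some/path SomePipeline"
args = shlex.split(progargs)
```

Here ``same == text``, while ``garbled`` is two characters longer than
``text``, which is the kind of silent corruption that a mismatched default
encoding on the Java side would produce.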
""" # noqa
import logging
import os
import shlex
import subprocess
import tempfile
from typing import Any, Dict, Generator, List, Optional, Tuple
from cardinal_pythonlib.cmdline import cmdline_quote
from cardinal_pythonlib.fileops import mkdir_p
from sqlalchemy import Column, Index, Integer, String, Text
from crate_anon.nlp_manager.base_nlp_parser import (
BaseNlpParser,
TextProcessingFailed,
)
from crate_anon.nlp_manager.constants import (
MEDEX_DATA_READY_SIGNAL,
MEDEX_RESULTS_READY_SIGNAL,
ProcessorConfigKeys,
)
from crate_anon.nlp_manager.nlp_definition import (
NlpDefinition,
)
log = logging.getLogger(__name__)
# =============================================================================
# Constants
# =============================================================================
DATA_FILENAME = "crate_medex.txt"
DATA_FILENAME_KEEP = "crate_medex_{}.txt"
USE_TEMP_DIRS = True
# ... True for production; False to see e.g. logs afterwards, by keeping
# everything in a subdirectory of the user's home directory (see hard-coded
# nastiness -- for debugging only)
SKIP_IF_NO_GENERIC = True
# ... Probably should be True. MedEx returns hits for drug "Thu" with no
# generic drug; this from its weekday lexicon, I think.
# -----------------------------------------------------------------------------
# Maximum field lengths
# -----------------------------------------------------------------------------
# https://phekb.org/sites/phenotype/files/MedEx_UIMA_eMERGE_short.pdf
#
# RxNorm: https://www.nlm.nih.gov/research/umls/rxnorm/overview.html
#
# UMLS: https://www.nlm.nih.gov/research/umls/new_users/glossary.html
# UMLS CUI max length: https://www.nlm.nih.gov/research/umls/knowledge_sources/metathesaurus/release/columns_data_elements.html # noqa
UMLS_CUI_MAX_LENGTH = 8 # definite
# TIMEX3:
# - http://www.timeml.org/tempeval2/tempeval2-trial/guidelines/timex3guidelines-072009.pdf # noqa
# - http://www.timeml.org/publications/timeMLdocs/timeml_1.2.1.html#timex3 # noqa
TIMEX3_MAX_LENGTH = 50 # guess
# Drug length:
# There are long ones, like
# "influenza virus vaccine, inactivated a-brisbane-59-2007, ivr-148 (h1n1) strain" (78) # noqa
# See e.g. resources/rxcui_generic.cfg, and:
# $ wc -L filename # shows length of longest line
# $ egrep -n "^.{$(wc -L < filename)}$" filename # shows longest line
# ... possibly this gets a bit confused by tabs, can also put a length in:
# $ egrep -n "^.{302}$" filename # shows lines of length 302
# And we find a drug of length 286:
# 1-oxa-7-azacyclopentadecan-15-one,13-((2,6-dideoy-3-c-methyl-3-o-methyl-alpha-l-ribo-hexopyranosyl)oxy)-2-ethyl-3,4,10-trihydroxy 3,5,8,10,12,14-hexamethyl-7-propyl-11-((3,4,6-trideoxy-3-(dimethylamino)-beta-d-xylo-hexopyranosyl)oxy)-, ((2r*, 3s*,4r*,5s*,8r*,10r*,11r*,12s*,13s*,14r*))- # noqa
# Then there are multivitamin things in brand_generic with length >600.
# So we should use an unlimited field; SQLAlchemy helpfully seems to translate
# Text to VARCHAR(MAX) under SQL Server, which is the more efficient:
# https://stackoverflow.com/questions/834788/using-varcharmax-vs-text-on-sql-server # noqa
MEDEX_MAX_FORM_LENGTH = 255 # guess; "Powder For Oral Suspension" (26) is one
MEDEX_MAX_STRENGTH_LENGTH = 50 # guess
MEDEX_MAX_DOSE_AMOUNT_LENGTH = 50 # guess
MEDEX_MAX_ROUTE_LENGTH = 50 # guess
MEDEX_MAX_FREQUENCY_LENGTH = 50 # guess
MEDEX_MAX_DURATION_LENGTH = 50 # guess
MEDEX_MAX_NECESSITY_LENGTH = 50 # guess
# =============================================================================
# Medex
# =============================================================================
class PseudoTempDir:
"""
This class exists so that a TemporaryDirectory and a manually specified
directory can be addressed via the same (very simple!) interface.
"""
def __init__(self, name: str) -> None:
self.name = name
class Medex(BaseNlpParser):
"""
EXTERNAL.
Class controlling a Medex-UIMA external process, via our custom Java
interface, ``CrateMedexPipeline.java``.
MedEx-UIMA is a medication-finding tool:
https://www.ncbi.nlm.nih.gov/pubmed/25954575.
"""
uses_external_tool = True
def __init__(
self,
nlpdef: NlpDefinition,
cfg_processor_name: str,
commit: bool = False,
) -> None:
"""
Args:
nlpdef:
a :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
cfg_processor_name:
the name of a CRATE NLP config file section (from which we may
choose to get extra config information)
commit:
force a COMMIT whenever we insert data? You should specify this
in multiprocess mode, or you may get database deadlocks.
"""
super().__init__(
nlpdef=nlpdef,
cfg_processor_name=cfg_processor_name,
commit=commit,
friendly_name="MedEx",
)
if nlpdef is None: # only None for debugging!
self._debug_mode = True
self._tablename = self.classname().lower()
self._max_external_prog_uses = 1
self._progenvsection = ""
self._env = {} # type: Dict[str, str]
progargs = ""
else:
self._debug_mode = False
self._tablename = self._cfgsection.opt_str(
ProcessorConfigKeys.DESTTABLE, required=True
)
self._max_external_prog_uses = self._cfgsection.opt_int_positive(
ProcessorConfigKeys.MAX_EXTERNAL_PROG_USES, default=0
)
self._progenvsection = self._cfgsection.opt_str(
ProcessorConfigKeys.PROGENVSECTION
)
if self._progenvsection:
# noinspection PyTypeChecker
self._env = nlpdef.get_env_dict(
self._progenvsection, os.environ
)
else:
self._env = os.environ.copy()
self._env["NLPLOGTAG"] = nlpdef.logtag or "."
# ... because passing a "-lt" switch with no parameter will make
# CrateMedexPipeline.java complain and stop
progargs = self._cfgsection.opt_str(
ProcessorConfigKeys.PROGARGS, required=True
)
if USE_TEMP_DIRS:
self._inputdir = tempfile.TemporaryDirectory()
self._outputdir = tempfile.TemporaryDirectory()
self._workingdir = tempfile.TemporaryDirectory()
# ... these are autodeleted when the object goes out of scope; see
# https://docs.python.org/3/library/tempfile.html
# ... which manages it using weakref.finalize
else:
homedir = os.path.expanduser("~")
self._inputdir = PseudoTempDir(
os.path.join(homedir, "medextemp", "input")
)
mkdir_p(self._inputdir.name)
self._outputdir = PseudoTempDir(
os.path.join(homedir, "medextemp", "output")
)
mkdir_p(self._outputdir.name)
self._workingdir = PseudoTempDir(
os.path.join(homedir, "medextemp", "working")
)
mkdir_p(self._workingdir.name)
formatted_progargs = progargs.format(**self._env)
self._progargs = shlex.split(formatted_progargs)
self._progargs.extend(
[
"-data_ready_signal",
MEDEX_DATA_READY_SIGNAL,
"-results_ready_signal",
MEDEX_RESULTS_READY_SIGNAL,
"-i",
self._inputdir.name,
"-o",
self._outputdir.name,
]
)
self._n_uses = 0
self._pipe_encoding = "utf8"
self._file_encoding = "utf8"
self._p = None # the subprocess
self._started = False
# -------------------------------------------------------------------------
# External process control
# -------------------------------------------------------------------------
def _start(self) -> None:
"""
Launch the external process. We will save and retrieve data via files,
and send signals ("data ready", "results ready") via stdin/stdout.
"""
if self._started or self._debug_mode:
return
args = self._progargs
# Nasty MedEx hacks
cwd = os.getcwd()
log.info(
f"For MedEx's benefit, changing to directory: "
f"{self._workingdir.name}"
)
os.chdir(self._workingdir.name)
sentsdir = os.path.join(self._workingdir.name, "sents")
log.info(f"Making temporary sentences directory: {sentsdir}")
mkdir_p(sentsdir)
logdir = os.path.join(self._workingdir.name, "log")
log.info(f"Making temporary log directory: {logdir}")
mkdir_p(logdir)
log.info(f"Launching command: {cmdline_quote(args)}")
self._p = subprocess.Popen(
args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
# stderr=subprocess.PIPE,
shell=False,
bufsize=1,
)
# ... don't ask for stderr to be piped if you don't want it; firstly,
# there's a risk that if you don't consume it, something hangs, and
# secondly if you don't consume it, you see it on the console, which is
# helpful.
self._started = True
log.info(f"Returning to working directory {cwd}")
os.chdir(cwd)
def _encode_to_subproc_stdin(self, text: str) -> None:
"""
Send text to the external program (via its stdin), encoding it in
the process (typically to UTF-8).
"""
log.debug("SENDING: " + text)
bytes_ = text.encode(self._pipe_encoding)
self._p.stdin.write(bytes_)
def _flush_subproc_stdin(self) -> None:
"""
Flushes what we're sending to the external program via its stdin.
"""
self._p.stdin.flush()
def _decode_from_subproc_stdout(self) -> str:
"""
Decode what we've received from the external program's stdout,
from its specific encoding (usually UTF-8) to a Python string.
"""
bytes_ = self._p.stdout.readline()
text = bytes_.decode(self._pipe_encoding)
log.debug("RECEIVING: " + repr(text))
return text
def _finish(self) -> None:
"""
Close down the external process.
"""
if not self._started:
return
self._p.communicate() # close p.stdout, wait for the subprocess to exit # noqa: E501
self._started = False
def _signal_data_ready(self) -> bool:
"""
Signals to the child process that we have written data to files, and
it's now ready for reading by MedEx.
Returns: OK?
"""
if self._finished():
return False
self._encode_to_subproc_stdin(MEDEX_DATA_READY_SIGNAL + os.linesep)
self._flush_subproc_stdin()
return True
def _await_results_ready(self) -> bool:
"""
Waits until MedEx has signalled us that results are ready.
Returns: OK?
"""
while True:
if self._finished():
return False
line = self._decode_from_subproc_stdout()
if line == MEDEX_RESULTS_READY_SIGNAL + os.linesep:
return True
def _finished(self) -> bool:
"""
Has MedEx finished?
"""
if not self._started:
return True
self._p.poll()
finished = self._p.returncode is not None
if finished:
self._started = False
return finished
def _restart(self) -> None:
"""
Close down the external process and restart it.
"""
self._finish()
self._start()
# -------------------------------------------------------------------------
# Input processing
# -------------------------------------------------------------------------
def parse(
self, text: str
) -> Generator[Tuple[str, Dict[str, Any]], None, None]:
"""
- Send text to the external process, and receive the result.
- Note that associated data is not passed into this function, and is
kept in the Python environment, so we can't run into any problems
with the transfer to/from the Java program garbling important data.
The text itself is written to a file in the input directory; all we
send down the pipe is the "data ready" signal. We then wait for the
"results ready" signal and read MedEx's output file, which may contain
MULTIPLE result lines ("your text contains the following 7 drug
references"), each of which is yielded as a separate result.
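
Each result line read back from MedEx is pipe-delimited, but the sentence
text may itself contain ``|`` characters, so the fields are taken from the
right. A minimal standalone sketch of that idea follows; the helper name
``split_result_line`` and the sample values are made up for illustration.

```python
from typing import List, Tuple

N_TRAILING_FIELDS = 13  # drug, brand, ..., generic_code, generic_name


def split_result_line(line: str) -> Tuple[str, str, List[str]]:
    # Returns (index_text, sentence_text, trailing_fields).
    fields = line.rstrip().split("|")
    if len(fields) < N_TRAILING_FIELDS + 1:
        raise ValueError("Bad result line: {!r}".format(line))
    trailing = fields[-N_TRAILING_FIELDS:]
    # Whatever is left of the trailing fields is "index<TAB>sentence"; rejoin
    # it so that any "|" inside the sentence is restored.
    start_bit = "|".join(fields[:-N_TRAILING_FIELDS])
    index_text, sentence = start_bit.split("\t", maxsplit=1)
    return index_text, sentence, trailing


# Made-up example (placeholder codes); the sentence itself contains a "|".
trailing_in = [
    "aspirin[0,7]",  # drug
    "",  # brand
    "",  # dose form
    "75mg[8,12]",  # strength
    "",  # dose amount
    "",  # route
    "od(R1P24H)[15,17]",  # frequency, with TIMEX3 in parentheses
    "",  # duration
    "",  # necessity
    "C0000000",  # UMLS code (placeholder)
    "123",  # RxNorm code (placeholder)
    "123",  # generic code (placeholder)
    "aspirin",  # generic name
]
line = "1\tAspirin 75mg | od|" + "|".join(trailing_in) + "\n"
index_text, sentence, trailing_out = split_result_line(line)
```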
"""
self._n_uses += 1
self._start() # ensure started
if USE_TEMP_DIRS:
basefilename = DATA_FILENAME
else:
basefilename = DATA_FILENAME_KEEP.format(self._n_uses)
inputfilename = os.path.join(self._inputdir.name, basefilename)
outputfilename = os.path.join(self._outputdir.name, basefilename)
# ... MedEx gives output files the SAME NAME as input files.
try:
with open(
inputfilename, mode="w", encoding=self._file_encoding
) as infile:
# log.info(f"text: {text!r}")
infile.write(text)
if (
not self._signal_data_ready()  # send
or not self._await_results_ready()  # receive
):
log.critical("Subprocess terminated unexpectedly")
os.remove(inputfilename)
# We were using "log.critical()" and "return", but if the Medex
# processor is misconfigured, the failed processor can be run
# over thousands of records over many hours before the failure
# is obvious. Changed 2017-03-17.
raise ValueError(
"Java interface to Medex failed - misconfigured?"
)
with open(
outputfilename, mode="r", encoding=self._file_encoding
) as infile:
resultlines = infile.readlines()
for line in resultlines:
# log.critical(f"received: {line}")
# Output code, from MedTagger.print_result():
# out.write(
# index + 1 + "\t" + sent_text + "|" +
# drug + "|" + brand + "|" + dose_form + "|" +
# strength + "|" + dose_amt + "|" +
# route + "|" + frequency + "|" + duration + "|" +
# necessity + "|" +
# umls_code + "|" + rx_code + "|" + generic_code + "|" +
# generic_name + "\n");
# NOTE that the text can contain | characters. So work from the
# right.
line = line.rstrip() # remove any trailing newline
fields = line.split("|")
if len(fields) < 14:
log.warning(f"Bad result received: {line!r}")
continue
generic_name = self.str_or_none(fields[-1])
if not generic_name and SKIP_IF_NO_GENERIC:
continue
generic_code = self.int_or_none(fields[-2])
rx_code = self.int_or_none(fields[-3])
umls_code = self.str_or_none(fields[-4])
(
necessity,
necessity_startpos,
necessity_endpos,
) = self.get_text_start_end(fields[-5])
(
duration,
duration_startpos,
duration_endpos,
) = self.get_text_start_end(fields[-6])
(
_freq_text,
frequency_startpos,
frequency_endpos,
) = self.get_text_start_end(fields[-7])
frequency, frequency_timex = self.frequency_and_timex(
_freq_text
)
(
route,
route_startpos,
route_endpos,
) = self.get_text_start_end(fields[-8])
(
dose_amount,
dose_amount_startpos,
dose_amount_endpos,
) = self.get_text_start_end(fields[-9])
(
strength,
strength_startpos,
strength_endpos,
) = self.get_text_start_end(fields[-10])
(form, form_startpos, form_endpos) = self.get_text_start_end(
fields[-11]
)
(
brand,
brand_startpos,
brand_endpos,
) = self.get_text_start_end(fields[-12])
(drug, drug_startpos, drug_endpos) = self.get_text_start_end(
fields[-13]
)
_start_bit = "|".join(fields[0:-13])
_index_text, sent_text = _start_bit.split("\t", maxsplit=1)
index = self.int_or_none(_index_text)
yield self._tablename, {
"sentence_index": index,
"sentence_text": sent_text,
"drug": drug,
"drug_startpos": drug_startpos,
"drug_endpos": drug_endpos,
"brand": brand,
"brand_startpos": brand_startpos,
"brand_endpos": brand_endpos,
"form": form,
"form_startpos": form_startpos,
"form_endpos": form_endpos,
"strength": strength,
"strength_startpos": strength_startpos,
"strength_endpos": strength_endpos,
"dose_amount": dose_amount,
"dose_amount_startpos": dose_amount_startpos,
"dose_amount_endpos": dose_amount_endpos,
"route": route,
"route_startpos": route_startpos,
"route_endpos": route_endpos,
"frequency": frequency,
"frequency_startpos": frequency_startpos,
"frequency_endpos": frequency_endpos,
"frequency_timex3": frequency_timex,
"duration": duration,
"duration_startpos": duration_startpos,
"duration_endpos": duration_endpos,
"necessity": necessity,
"necessity_startpos": necessity_startpos,
"necessity_endpos": necessity_endpos,
"umls_code": umls_code,
"rx_code": rx_code,
"generic_code": generic_code,
"generic_name": generic_name,
}
# Since MedEx scans all files in the input directory, then if we're
# not using temporary directories (and are therefore using a new
# filename per item), we should remove the old one.
os.remove(inputfilename)
# Restart subprocess?
if (
self._max_external_prog_uses > 0
and self._n_uses % self._max_external_prog_uses == 0
):
log.info(
f"relaunching app after "
f"{self._max_external_prog_uses} uses"
)
self._restart()
except BrokenPipeError:
log.error("Broken pipe; relaunching app")
self._restart()
raise TextProcessingFailed()
@staticmethod
def get_text_start_end(
medex_str: Optional[str],
) -> Tuple[Optional[str], Optional[int], Optional[int]]:
"""
MedEx returns "drug", "strength", etc. as ``aspirin[7,14]``, where the
text is followed by the start position (zero-indexed) and the end
position (one beyond the last character) (zero-indexed). This function
converts a string like ``aspirin[7,14]`` to a tuple like ``"aspirin",
7, 14``.
Args:
medex_str: string from MedEx
Returns:
tuple: ``text, start_pos, end_pos``; values may be ``None``
"""
if not medex_str:
return None, None, None
lbracket = medex_str.rfind("[") # -1 for not found
comma = medex_str.rfind(",")
rbracket = medex_str.rfind("]")
try:
if lbracket == -1 or not (lbracket < comma < rbracket):
raise ValueError()
text = medex_str[:lbracket]
lpos = int(medex_str[lbracket + 1 : comma])
rpos = int(medex_str[comma + 1 : rbracket])
return text, lpos, rpos
except (TypeError, ValueError):
log.warning(f"Bad string[left, right] format: {medex_str!r}")
return None, None, None
@staticmethod
def int_or_none(text: Optional[str]) -> Optional[int]:
"""
Takes text and returns an integer version or ``None``.
"""
try:
return int(text)
except (TypeError, ValueError):
return None
@staticmethod
def str_or_none(text: Optional[str]) -> Optional[str]:
"""
If the string is non-empty, return the string; otherwise return
``None``.
"""
return None if not text else text
@staticmethod
def frequency_and_timex(text: str) -> Tuple[Optional[str], Optional[str]]:
"""
Splits a MedEx frequency/TIMEX string into its frequency and TIMEX
parts; e.g. splits ``b.i.d.(R1P12H)`` to ``"b.i.d.", "R1P12H"``.
"""
if not text:
return None, None
lbracket = text.rfind("(")
rbracket = text.rfind(")")
if (
lbracket == -1
or not (lbracket < rbracket)
or rbracket != len(text) - 1
):
return None, None
return text[0:lbracket], text[lbracket + 1 : rbracket]
# -------------------------------------------------------------------------
# Test
# -------------------------------------------------------------------------
def test(self, verbose: bool = False) -> None:
"""
Test the send function.
"""
if self._debug_mode:
return
self.test_parser(
[
"Bob Hope visited Seattle and took venlafaxine M/R 375mg od.",
"James Joyce wrote Ulysses whilst taking aspirin 75mg mane.",
]
)
# -------------------------------------------------------------------------
# Database structure
# -------------------------------------------------------------------------
def dest_tables_columns(self) -> Dict[str, List[Column]]:
# docstring in superclass
startposdef = "Start position (zero-based) of "
endposdef = (
"End position (zero-based index of one beyond last character) of "
)
return {
self._tablename: [
Column(
"sentence_index",
Integer,
comment="One-based index of sentence in text",
),
Column(
"sentence_text",
Text,
comment="Text recognized as a sentence by MedEx",
),
Column("drug", Text, comment="Drug name, as in the text"),
Column("drug_startpos", Integer, comment=startposdef + "drug"),
Column("drug_endpos", Integer, comment=endposdef + "drug"),
Column(
"brand",
Text,
comment="Drug brand name (?lookup ?only if given)",
),
Column(
"brand_startpos", Integer, comment=startposdef + "brand"
),
Column("brand_endpos", Integer, comment=endposdef + "brand"),
Column(
"form",
String(MEDEX_MAX_FORM_LENGTH),
comment="Drug/dose form (e.g. 'tablet')",
),
Column("form_startpos", Integer, comment=startposdef + "form"),
Column("form_endpos", Integer, comment=endposdef + "form"),
Column(
"strength",
String(MEDEX_MAX_STRENGTH_LENGTH),
comment="Strength (e.g. '75mg')",
),
Column(
"strength_startpos",
Integer,
comment=startposdef + "strength",
),
Column(
"strength_endpos", Integer, comment=endposdef + "strength"
),
Column(
"dose_amount",
String(MEDEX_MAX_DOSE_AMOUNT_LENGTH),
comment="Dose amount (e.g. '2 tablets')",
),
Column(
"dose_amount_startpos",
Integer,
comment=startposdef + "dose_amount",
),
Column(
"dose_amount_endpos",
Integer,
comment=endposdef + "dose_amount",
),
Column(
"route",
String(MEDEX_MAX_ROUTE_LENGTH),
comment="Route (e.g. 'by mouth')",
),
Column(
"route_startpos", Integer, comment=startposdef + "route"
),
Column("route_endpos", Integer, comment=endposdef + "route"),
Column(
"frequency",
String(MEDEX_MAX_FREQUENCY_LENGTH),
comment="Frequency (e.g. 'b.i.d.')",
),
Column(
"frequency_startpos",
Integer,
comment=startposdef + "frequency",
),
Column(
"frequency_endpos",
Integer,
comment=endposdef + "frequency",
),
Column(
"frequency_timex3",
String(TIMEX3_MAX_LENGTH),
comment=(
"Normalized frequency in TIMEX3 format "
"(e.g. 'R1P12H')"
),
),
Column(
"duration",
String(MEDEX_MAX_DURATION_LENGTH),
comment="Duration (e.g. 'for 10 days')",
),
Column(
"duration_startpos",
Integer,
comment=startposdef + "duration",
),
Column(
"duration_endpos", Integer, comment=endposdef + "duration"
),
Column(
"necessity",
String(MEDEX_MAX_NECESSITY_LENGTH),
comment="Necessity (e.g. 'prn')",
),
Column(
"necessity_startpos",
Integer,
comment=startposdef + "necessity",
),
Column(
"necessity_endpos",
Integer,
comment=endposdef + "necessity",
),
Column(
"umls_code",
String(UMLS_CUI_MAX_LENGTH),
comment="UMLS CUI",
),
Column("rx_code", Integer, comment="RxNorm RxCUI for drug"),
Column(
"generic_code",
Integer,
comment="RxNorm RxCUI for generic name",
),
Column(
"generic_name",
Text,
comment="Generic drug name (associated with RxCUI code)",
),
]
}
def dest_tables_indexes(self) -> Dict[str, List[Index]]:
# docstring in superclass
return {}
# return {
# self._tablename: [
# Index('idx_generic_name', 'generic_name'),
# ]
# }