r"""
crate_anon/nlp_webserver/tasks.py
===============================================================================
Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CRATE.
CRATE is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CRATE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CRATE. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
Tasks to process text for an NLPRP server (and be scheduled via Celery).
"""
import datetime
import json
import logging
from typing import TYPE_CHECKING
from cardinal_pythonlib.httpconst import HttpStatus
from cardinal_pythonlib.json.typing_helpers import (
JsonArrayType,
JsonObjectType,
)
from celery import Celery
# from celery.app.task import Task # see "def delay", "def apply_async"
from cryptography.fernet import Fernet
import requests
from sqlalchemy import engine_from_config
from sqlalchemy.orm import scoped_session
from sqlalchemy.exc import SQLAlchemyError
import transaction
from crate_anon.common.constants import JSON_SEPARATORS_COMPACT
from crate_anon.nlp_manager.constants import GateApiKeys, GateResultKeys
from crate_anon.nlp_webserver.models import Session, DocProcRequest
from crate_anon.nlp_webserver.server_processor import ServerProcessor
from crate_anon.nlp_webserver.constants import (
NlpServerConfigKeys,
PROCTYPE_GATE,
SQLALCHEMY_COMMON_OPTIONS,
)
from crate_anon.nlp_webserver.security import decrypt_password
from crate_anon.nlp_webserver.settings import SETTINGS
from crate_anon.nlprp.api import NlprpKeys
if TYPE_CHECKING:
from typing import Optional
log = logging.getLogger(__name__)
# =============================================================================
# SQLAlchemy and Celery setup
# =============================================================================
TaskSession = scoped_session(Session)
engine = engine_from_config(
SETTINGS,
NlpServerConfigKeys.SQLALCHEMY_PREFIX,
**SQLALCHEMY_COMMON_OPTIONS,
)
TaskSession.configure(bind=engine)
try:
broker_url = SETTINGS[NlpServerConfigKeys.BROKER_URL]
except KeyError:
log.error(
f"{NlpServerConfigKeys.BROKER_URL} value " f"missing from config file."
)
raise
backend_url = SETTINGS.get(NlpServerConfigKeys.BACKEND_URL) or None
key = SETTINGS[NlpServerConfigKeys.ENCRYPTION_KEY]
# Turn key into bytes object
key = key.encode()
CIPHER_SUITE = Fernet(key)
# Set expiry time to 90 days in seconds
expiry_time = 60 * 60 * 24 * 90
celery_app = Celery(
"tasks", broker=broker_url, backend=backend_url, result_expires=expiry_time
)
celery_app.conf.database_engine_options = SQLALCHEMY_COMMON_OPTIONS
# =============================================================================
# Helper functions
# =============================================================================
[docs]def nlprp_processor_dict(
success: bool,
processor: ServerProcessor = None,
results: JsonArrayType = None,
errcode: int = None,
errmsg: str = None,
) -> JsonObjectType:
"""
Returns a dictionary suitable for use as one of the elements of the
``response["results"]["processors"]`` array; see :ref:`NLPRP <nlprp>`.
Args:
success:
did the request succeed?
processor:
a :class:`crate_anon.nlp_webserver.procs.Processor`, or ``None``
results:
a JSON array of results
errcode:
(if not ``success``) an integer error code
errmsg:
(if not ``success``) an error message
Returns:
a JSON object in NLPRP format
"""
proc_dict = {NlprpKeys.RESULTS: results or [], NlprpKeys.SUCCESS: success}
if processor:
proc_dict[NlprpKeys.NAME] = processor.name
proc_dict[NlprpKeys.TITLE] = processor.title
proc_dict[NlprpKeys.VERSION] = processor.version
if not success:
proc_dict[NlprpKeys.ERRORS] = [
{
NlprpKeys.CODE: errcode,
NlprpKeys.MESSAGE: errmsg,
NlprpKeys.DESCRIPTION: errmsg,
}
]
return proc_dict
[docs]def internal_error(
msg: str, processor: ServerProcessor = None
) -> JsonObjectType:
"""
Log an error message, and raise a corresponding :exc:`NlprpError` for
an internal server error.
Args:
msg: the error message
processor: the :class:`Processor` object to be used
"""
log.error(msg)
return nlprp_processor_dict(
success=False,
processor=processor,
errcode=HttpStatus.INTERNAL_SERVER_ERROR,
errmsg=f"Internal Server Error: {msg}",
)
[docs]def gate_api_error(
msg: str, processor: ServerProcessor = None
) -> JsonObjectType:
"""
Return a "GATE failed" error.
Args:
msg: description of the error
processor: the :class:`Processor` object to be used
"""
log.error(f"GATE API error: {msg}")
return nlprp_processor_dict(
success=False,
processor=processor,
errcode=HttpStatus.BAD_GATEWAY,
errmsg=f"Bad Gateway: {msg}",
)
# =============================================================================
# Convert GATE JSON results (from GATE's own API) to our internal format
# =============================================================================
[docs]def get_gate_results(results_dict: JsonObjectType) -> JsonArrayType:
"""
Convert results in GATE JSON format to results in our internal format.
Args:
results_dict: see :class:`crate_anon.nlp_manager.constants.GateApiKeys`
or https://cloud.gate.ac.uk/info/help/online-api.html
Returns:
list of dictionaries; see
:class:`crate_anon.nlp_manager.constants.GateApiKeys`
"""
results = [] # type: JsonArrayType
entities = results_dict[GateApiKeys.ENTITIES]
for annottype, values in entities.items():
for features in values:
start, end = features[GateApiKeys.INDICES]
del features[GateApiKeys.INDICES]
results.append(
{
GateResultKeys.TYPE: annottype,
GateResultKeys.START: start,
GateResultKeys.END: end,
GateResultKeys.SET: None, # CHECK WHAT THIS SHOULD BE!!
GateResultKeys.FEATURES: features,
}
)
return results
# =============================================================================
# Task session management
# =============================================================================
[docs]def start_task_session() -> None:
"""
Starts a session for the tasks. To be called at the start of a web request.
"""
TaskSession()
# =============================================================================
# NLP server processing functions
# =============================================================================
# noinspection PyUnusedLocal
@celery_app.task(bind=True, name="tasks.process_nlp_text")
def process_nlp_text(
self, docprocrequest_id: str, username: str = "", crypt_pass: str = ""
) -> None:
"""
Task to process a single ``DocProcRequest`` by sending text to the relevant
processor.
Args:
self:
the :class:`celery.Task`
docprocrequest_id:
the :class:`crate_anon.nlp_webserver.models.DocProcRequest` ID
username:
username in use
crypt_pass:
encrypted password
"""
# noinspection PyUnresolvedReferences
dpr = TaskSession.query(DocProcRequest).get(
docprocrequest_id
) # type: Optional[DocProcRequest]
if not dpr:
log.error(f"DocProcRequest {docprocrequest_id} does not exist")
if dpr.done:
log.error(f"DocProcRequest {docprocrequest_id} already processed")
return
# Turn the password back into bytes and decrypt
password = decrypt_password(crypt_pass.encode(), CIPHER_SUITE)
text = dpr.doctext
# Get the processor
processor_id = dpr.processor_id
try:
# Fetch the processor
try:
processor = ServerProcessor.processors[processor_id] # may raise
# Run the NLP
results = process_nlp_text_immediate(
text, processor, username, password
)
except KeyError:
results = internal_error(f"No such processor: {processor_id!r}")
dpr.done = True
dpr.when_done_utc = datetime.datetime.utcnow()
dpr.results = json.dumps(results, separators=JSON_SEPARATORS_COMPACT)
transaction.commit()
except SQLAlchemyError:
# noinspection PyUnresolvedReferences
TaskSession.rollback()
[docs]def process_nlp_text_immediate(
text: str,
processor: ServerProcessor,
username: str = "",
password: str = "",
) -> JsonObjectType:
"""
Function to send text immediately to the relevant processor.
Args:
text:
text to run the NLP over
processor:
NLP processor; a class:`crate_anon.nlp_webserver.procs.Processor`
username:
username in use
password:
plaintext password
Returns:
a :class:`NlpServerResult`
"""
if processor.proctype == PROCTYPE_GATE:
return process_nlp_gate(text, processor, username, password)
else:
if not processor.parser:
processor.set_parser()
return process_nlp_internal(text=text, processor=processor)
[docs]def process_nlp_gate(
text: str, processor: ServerProcessor, username: str, password: str
) -> JsonObjectType:
"""
Send text to a chosen GATE processor (via an HTTP connection, using the
GATE JSON API; see https://cloud.gate.ac.uk/info/help/online-api.html).
Args:
text:
text to run the NLP over
processor:
NLP processor; a class:`crate_anon.nlp_webserver.procs.Processor`
username:
username in use
password:
plaintext password
Returns:
a :class:`NlpServerResult`
API failure is handled by returning a failure code/message to our client.
"""
headers = {
"Content-Type": "text/plain",
"Accept": "application/gate+json",
# Content-Encoding: gzip?,
"Expect": "100-continue",
# ... see https://cloud.gate.ac.uk/info/help/online-api.html
"charset": "utf8",
}
try:
response = requests.post(
processor.base_url + "/" + processor.name,
data=text.encode("utf-8"),
headers=headers,
auth=(username, password),
) # basic auth
except requests.exceptions.RequestException as e:
return gate_api_error(
f"The GATE processor returned the error: {e.response.reason} "
f"(with status code {e.response.status_code})",
processor=processor,
)
if response.status_code != HttpStatus.OK:
return gate_api_error(
f"The GATE processor returned the error: {response.reason} "
f"(with status code {response.status_code})",
processor=processor,
)
try:
json_response = response.json()
except json.decoder.JSONDecodeError:
return gate_api_error(
"Bad Gateway: The GATE processor did not return JSON",
processor=processor,
)
results = get_gate_results(json_response)
return nlprp_processor_dict(
success=True, processor=processor, results=results
)
[docs]def process_nlp_internal(
text: str, processor: ServerProcessor
) -> JsonObjectType:
"""
Send text to a chosen CRATE Python NLP processor and return a
:class:`NlpServerResult`.
"""
parser = processor.parser
try:
tablename_valuedict_generator = parser.parse(text)
except AttributeError:
return internal_error(
f"parser is not a CRATE Python NLP parser; is {parser!r}"
)
# Get second element of each element in parsed text as first is tablename
# which will have no meaning here
return nlprp_processor_dict(
True, processor, results=[x[1] for x in tablename_valuedict_generator]
)