"""
crate_anon/nlp_manager/tests/cloud_request_process_tests.py
===============================================================================
Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CRATE.
CRATE is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CRATE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CRATE. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
Unit tests.
Reminder: to enable logging, use e.g. pytest -k [testname] --log-cli-level=INFO
"""
import json
import logging
import os
from pathlib import Path
import sys
from unittest import mock, TestCase
from tempfile import TemporaryDirectory
from typing import Any, Dict
from cardinal_pythonlib.httpconst import HttpStatus
from sqlalchemy.engine import create_engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.schema import Column
from sqlalchemy.sql.expression import text
from crate_anon.nlp_manager.all_processors import (
register_all_crate_python_processors_with_serverprocessor,
)
from crate_anon.nlp_manager.cloud_parser import Cloud
from crate_anon.nlp_manager.cloud_request import (
CloudRequest,
CloudRequestListProcessors,
CloudRequestProcess,
)
from crate_anon.nlp_manager.cloud_run_info import CloudRunInfo
from crate_anon.nlp_manager.constants import (
CloudNlpConfigKeys,
DatabaseConfigKeys,
InputFieldConfigKeys,
NLP_CONFIG_ENV_VAR,
NlpConfigPrefixes,
NlpDefConfigKeys,
NlpDefValues,
NlpOutputConfigKeys,
ProcessorConfigKeys,
)
from crate_anon.nlp_manager.nlp_definition import NlpDefinition
from crate_anon.nlp_manager.nlp_manager import drop_remake, process_cloud_now
from crate_anon.nlp_webserver.server_processor import ServerProcessor
import crate_anon.nlp_webserver.tasks
from crate_anon.nlp_webserver.views import NlpWebViews
from crate_anon.nlprp.constants import NlprpKeys, NlprpValues
# Module-level logger, named after this module. See the module docstring for
# how to make its output visible when running under pytest.
log = logging.getLogger(__name__)
# =============================================================================
# CloudRequestProcessTests
# =============================================================================
class CloudRequestProcessTests(TestCase):
    """
    Tests for :class:`CloudRequestProcess`, with mocks standing in for the NLP
    definition, the processor and the destination database session.

    Two areas are covered: ``process_all()`` (inserting NLP output and
    notifying the NLP definition of each transaction) and
    ``check_if_ready()`` (interpreting the server's reply when polling).
    """

    def setUp(self) -> None:
        """
        Create the mocks and the :class:`CloudRequestProcess` under test.
        """
        self.mock_execute_method = mock.Mock()
        self.mock_session = mock.Mock(execute=self.mock_execute_method)
        self.mock_db = mock.Mock(session=self.mock_session)

        # can't set name attribute in constructor here as it has special
        # meaning
        mock_column = mock.Mock()
        mock_column.name = "fruit"  # so set it here

        # table.insert().values(...) is the call chain we want to observe.
        self.mock_values_method = mock.Mock()
        mock_insert_object = mock.Mock(values=self.mock_values_method)
        mock_insert_method = mock.Mock(return_value=mock_insert_object)
        mock_sqla_table = mock.Mock(
            columns=[mock_column], insert=mock_insert_method
        )
        mock_get_table_method = mock.Mock(return_value=mock_sqla_table)
        self.mock_processor = mock.Mock(
            get_table=mock_get_table_method, dest_session=self.mock_session
        )

        self.mock_notify_transaction_method = mock.Mock()
        self.mock_nlpdef = mock.Mock(
            notify_transaction=self.mock_notify_transaction_method
        )
        self.mock_nlpdef.name = "fruitdef"

        self.process = CloudRequestProcess(nlpdef=self.mock_nlpdef)

    # -------------------------------------------------------------------------
    # Helpers
    # -------------------------------------------------------------------------

    def _patched_nlp_values(self, nlp_values: Any) -> Any:
        """
        Return a context manager that makes ``self.process.gen_nlp_values()``
        yield the supplied ``(tablename, values, processor)`` tuples.
        """
        return mock.patch.multiple(
            self.process,
            gen_nlp_values=mock.Mock(return_value=iter(nlp_values)),
        )

    def _assert_transaction_notified(self, row: Dict[str, Any]) -> None:
        """
        Assert that the NLP definition was notified of a one-row transaction
        whose byte count is ``sys.getsizeof(row)``.
        """
        self.mock_notify_transaction_method.assert_any_call(
            self.mock_session,
            n_rows=1,
            n_bytes=sys.getsizeof(row),
            force_commit=mock.ANY,
        )

    def _ready_given_response(self, response: Any) -> bool:
        """
        Pretend a request has been queued, make ``_try_fetch()`` return
        ``response``, and return the result of ``check_if_ready()``.
        """
        self.process.queue_id = "queue_0001"
        with mock.patch.object(
            self.process, "_try_fetch", return_value=response
        ):
            return self.process.check_if_ready()

    # -------------------------------------------------------------------------
    # process_all()
    # -------------------------------------------------------------------------

    def test_process_all_inserts_values(self) -> None:
        """
        Every row of NLP output is inserted, executed and notified once.
        """
        rows = [{"fruit": fruit} for fruit in ("apple", "banana", "fig")]
        nlp_values = [("output", row, self.mock_processor) for row in rows]

        with self._patched_nlp_values(nlp_values):
            self.process.process_all()

        for row in rows:
            self.mock_values_method.assert_any_call(row)
        self.assertEqual(self.mock_values_method.call_count, 3)
        self.assertEqual(self.mock_execute_method.call_count, 3)

        for row in rows:
            self._assert_transaction_notified(row)
        self.assertEqual(self.mock_notify_transaction_method.call_count, 3)

    def test_process_all_handles_failed_insert(self) -> None:
        """
        A failing insert is logged as an error; the transaction is still
        notified.
        """
        nlp_values = [
            ("output", {"fruit": "apple"}, self.mock_processor),
        ]
        self.mock_execute_method.side_effect = OperationalError(
            "Insert failed", None, None, None
        )

        with self.assertLogs(level=logging.ERROR) as logging_cm:
            with self._patched_nlp_values(nlp_values):
                self.process.process_all()

        self._assert_transaction_notified({"fruit": "apple"})

        logger_name = "crate_anon.nlp_manager.cloud_request"
        self.assertIn(f"ERROR:{logger_name}", logging_cm.output[0])
        self.assertIn("Insert failed", logging_cm.output[0])

    # -------------------------------------------------------------------------
    # check_if_ready()
    # -------------------------------------------------------------------------

    def test_not_ready_if_queue_id_is_none(self) -> None:
        """
        With no queue ID, the request is not ready and a warning is logged.
        """
        self.process.queue_id = None

        with self.assertLogs(level=logging.WARNING) as logging_cm:
            ready = self.process.check_if_ready()

        self.assertFalse(ready)
        self.assertIn(
            "Tried to fetch from queue before sending request.",
            logging_cm.output[0],
        )

    def test_not_ready_if_fetched(self) -> None:
        """
        A request flagged as already fetched is reported as not ready.
        """
        self.process.queue_id = "queue_0001"
        self.process._fetched = True

        ready = self.process.check_if_ready()

        self.assertFalse(ready)

    def test_not_ready_if_no_response(self) -> None:
        """
        No response from the fetch attempt means not ready.
        """
        self.assertFalse(self._ready_given_response(None))

    def test_ready_for_status_ok(self) -> None:
        """
        HTTP 200 (OK) means ready.
        """
        response = {
            NlprpKeys.STATUS: HttpStatus.OK,
            NlprpKeys.VERSION: "0.3.0",
        }
        self.assertTrue(self._ready_given_response(response))

    def test_not_ready_when_old_server_status_processing(self) -> None:
        """
        A version 0.2.0 server replying "processing" means not ready.
        """
        response = {
            NlprpKeys.STATUS: HttpStatus.PROCESSING,
            NlprpKeys.VERSION: "0.2.0",
        }
        self.assertFalse(self._ready_given_response(response))

    def test_not_ready_when_new_server_status_accepted(self) -> None:
        """
        A version 0.3.0 server replying "accepted" means not ready.
        """
        response = {
            NlprpKeys.STATUS: HttpStatus.ACCEPTED,
            NlprpKeys.VERSION: "0.3.0",
        }
        self.assertFalse(self._ready_given_response(response))

    def test_not_ready_when_server_status_not_found(self) -> None:
        """
        HTTP 404 means not ready, with a warning naming the status code.
        """
        response = {
            NlprpKeys.STATUS: HttpStatus.NOT_FOUND,
            NlprpKeys.VERSION: "0.3.0",
        }
        with self.assertLogs(level=logging.WARNING) as logging_cm:
            ready = self._ready_given_response(response)

        self.assertFalse(ready)
        self.assertIn("Got HTTP status code 404", logging_cm.output[0])

    def test_not_ready_when_server_status_anything_else(self) -> None:
        """
        Any other status (here 403) means not ready, with a warning naming the
        status code.
        """
        response = {
            NlprpKeys.STATUS: HttpStatus.FORBIDDEN,
            NlprpKeys.VERSION: "0.3.0",
        }
        with self.assertLogs(level=logging.WARNING) as logging_cm:
            ready = self._ready_given_response(response)

        self.assertFalse(ready)
        self.assertIn("Got HTTP status code 403", logging_cm.output[0])
# =============================================================================
# CloudRequestListProcessorsTests
# =============================================================================
# Processor description taken from a real server reply that was not being
# handled correctly (2024-12-16). Keys are expressed via NlprpKeys constants
# and JSON booleans are written as Python booleans.
TEST_REMOTE_TABLE_SMOKING = "Smoking:Smoking"
TEST_SMOKING_PROC_NAME = "smoking"
TEST_SMOKING_PROC_VERSION = "0.1"
TEST_SMOKING_PROCINFO = {
    NlprpKeys.DESCRIPTION: "A description",
    NlprpKeys.IS_DEFAULT_VERSION: True,
    NlprpKeys.NAME: TEST_SMOKING_PROC_NAME,
    NlprpKeys.SCHEMA_TYPE: NlprpValues.TABULAR,
    NlprpKeys.SQL_DIALECT: "mssql",
    NlprpKeys.TABULAR_SCHEMA: {
        # One remote table; each tuple below is
        # (column name, column type, data type, nullable?).
        TEST_REMOTE_TABLE_SMOKING: [
            {
                NlprpKeys.COLUMN_NAME: column_name,
                NlprpKeys.COLUMN_TYPE: column_type,
                NlprpKeys.DATA_TYPE: data_type,
                NlprpKeys.IS_NULLABLE: nullable,
            }
            for column_name, column_type, data_type, nullable in (
                ("start_", "BIGINT", "BIGINT", False),
                ("end_", "BIGINT", "BIGINT", False),
                ("who", "NVARCHAR(255)", "NVARCHAR", True),
                ("rule", "VARCHAR(50)", "VARCHAR", True),
                ("status", "VARCHAR(10)", "VARCHAR", True),
            )
        ]
    },
    NlprpKeys.TITLE: "Smoking Status Annotator",
    NlprpKeys.VERSION: TEST_SMOKING_PROC_VERSION,
}
class CloudRequestListProcessorsTests(TestCase):
    """
    Tests for :class:`CloudRequestListProcessors`: validation of the server's
    "list processors" reply by ``get_remote_processors()``.

    The HTTP exchange is replaced by patching ``_post_get_json`` on the
    request object so that it returns a canned reply.
    """

    def setUp(self) -> None:
        """
        Create the request object under test around a mock NLP definition.
        """
        self.mock_nlpdef = mock.Mock(name="mock_nlpdef")
        self.mock_nlpdef.name = "testlistprocdef"
        self.process = CloudRequestListProcessors(nlpdef=self.mock_nlpdef)
        self.test_version = "0.3.0"

    def _get_processors_given(self, response: Dict[str, Any]) -> Any:
        """
        Make ``_post_get_json()`` return ``response`` and return whatever
        ``get_remote_processors()`` then produces (or let it raise).
        """
        with mock.patch.object(
            self.process, "_post_get_json", return_value=response
        ):
            return self.process.get_remote_processors()

    def test_processors_key_missing(self) -> None:
        """
        A reply without the "processors" key raises KeyError.
        """
        response = {
            NlprpKeys.STATUS: HttpStatus.ACCEPTED,
            NlprpKeys.VERSION: self.test_version,
            # Missing: NKeys.PROCESSORS
        }
        with self.assertRaises(KeyError):
            self._get_processors_given(response)

    def test_processors_not_list(self) -> None:
        """
        A "processors" value that is not a list raises ValueError.
        """
        response = {
            NlprpKeys.STATUS: HttpStatus.ACCEPTED,
            NlprpKeys.VERSION: self.test_version,
            NlprpKeys.PROCESSORS: "XXX",  # not a list
        }
        with self.assertRaises(ValueError):
            self._get_processors_given(response)

    def test_procinfo_not_dict(self) -> None:
        """
        A processor entry that is not a dictionary raises ValueError.
        """
        procinfo = "xxx"  # not a dict
        response = {
            NlprpKeys.STATUS: HttpStatus.ACCEPTED,
            NlprpKeys.VERSION: self.test_version,
            NlprpKeys.PROCESSORS: [procinfo],
        }
        with self.assertRaises(ValueError):
            self._get_processors_given(response)

    def test_procinfo_missing_keys(self) -> None:
        """
        Omitting any one of the mandatory processor keys raises KeyError.
        """
        mandatory_keys = (
            NlprpKeys.NAME,
            NlprpKeys.TITLE,
            NlprpKeys.VERSION,
            NlprpKeys.DESCRIPTION,
        )
        base_procinfo = {k: "x" for k in mandatory_keys}
        for key in mandatory_keys:
            # subTest: report which key failed, and keep checking the others.
            with self.subTest(missing_key=key):
                procinfo = base_procinfo.copy()
                del procinfo[key]
                response = {
                    NlprpKeys.STATUS: HttpStatus.ACCEPTED,
                    NlprpKeys.VERSION: self.test_version,
                    NlprpKeys.PROCESSORS: [procinfo],
                }
                with self.assertRaises(KeyError):
                    self._get_processors_given(response)

    def test_procinfo_smoking(self) -> None:
        """
        The real-world "smoking" processor description is accepted without
        error.
        """
        response = {
            NlprpKeys.STATUS: HttpStatus.ACCEPTED,
            NlprpKeys.VERSION: self.test_version,
            NlprpKeys.PROCESSORS: [TEST_SMOKING_PROCINFO],
        }
        self._get_processors_given(response)
        # Should be happy.
        # Clean up:
        ServerProcessor.debug_remove_processor(
            name=TEST_SMOKING_PROC_NAME, version=TEST_SMOKING_PROC_VERSION
        )
# =============================================================================
# CloudRequestDataTests
# =============================================================================
[docs]class CloudRequestDataTests(TestCase):
def setUp(self) -> None:
    """
    Build a self-contained client/server fixture:

    - a temporary on-disk SQLite database holding some source text;
    - an in-memory "dummy" SQLite engine;
    - a temporary NLP config file describing two cloud processors;
    - an :class:`NlpWebViews` server with authentication and request-body
      parsing mocked out;
    - client objects whose HTTP exchange is short-circuited straight to
      that server.

    Process-wide changes (the ``CloudRequest._post_get_json`` override and
    the NLP config environment variable) are applied via patchers and
    undone through ``addCleanup``, so they do not leak into other tests.
    """
    # On-disk database
    self.tempdir = TemporaryDirectory()  # will be deleted automatically
    self.dbfilepath = Path(self.tempdir.name, "test.sqlite")
    log.info(f"Using temporary database: {self.dbfilepath}")
    self.dburl = f"sqlite:///{self.dbfilepath.absolute()}"
    self.txttable = "notes"
    self.pkcol = "pk"
    self.txtcol = "note"
    self.echo = False
    self._mk_test_data()

    # Dummy database
    self.dummy_engine = create_engine("sqlite://", future=True)
    self.addCleanup(self.dummy_engine.dispose)

    # Config file
    self.nlpdefname = "mynlp"
    self.dbsectionname = "mydb"
    self.cloudconfigname = "mycloud"
    self.inputname = "myinput"
    self.cloudclassname = "Cloud"
    self.cloudproc_crp = "proc_crp"
    self.cloudproc_alcohol = "proc_alcohol"
    self.output_crp = "crp_output"
    self.output_alcohol = "alcohol_output"
    self.configfilepath = Path(self.tempdir.name, "crate_test_nlp.ini")
    with open(self.configfilepath, "wt") as f:
        configtext = self._mk_nlp_config()
        log.debug(configtext)
        f.write(configtext)

    # Server side
    register_all_crate_python_processors_with_serverprocessor()
    self.mock_pyramid_request = mock.Mock(name="mock_pyramid_request")
    self.server = NlpWebViews(request=self.mock_pyramid_request)
    self.server._authenticate = mock.Mock()
    self.server._set_body_json_from_request = mock.Mock(
        name="mock_set_body_json_from_request"
    )

    # Rather than modify the instances, let's try to modify the class. This
    # is because process_now() does its own instance creation.
    # (CloudRequest is the base class of CloudRequestProcess and
    # CloudRequestListProcessors.)
    # Use a patcher, not plain assignment, so the real method is restored
    # once this test finishes.
    post_patcher = mock.patch.object(
        CloudRequest, "_post_get_json", self._get_server_response
    )
    post_patcher.start()
    self.addCleanup(post_patcher.stop)

    # Client side #1
    self.mock_nlpdef = mock.Mock()
    self.mock_nlpdef.name = "testdef"
    self.listprocclient = CloudRequestListProcessors(
        nlpdef=self.mock_nlpdef
    )

    # Client side #2
    # Point the NLP config environment variable at our temporary file for
    # the duration of this test only.
    env_patcher = mock.patch.dict(
        os.environ,
        {NLP_CONFIG_ENV_VAR: str(self.configfilepath.absolute())},
    )
    env_patcher.start()
    self.addCleanup(env_patcher.stop)
    self.nlpdef = NlpDefinition(self.nlpdefname)  # loads the config
    self.crinfo = CloudRunInfo(nlpdef=self.nlpdef)
def _mk_nlp_config(self) -> str:
    """
    Returns a test NLP config file, as text.

    The file defines one NLP definition (two cloud processors: CRP and
    alcohol units), one input field, one output table per processor, one
    database (used as source, destination and progress database) and one
    cloud server section. Names and URLs come from attributes set in
    ``setUp()``.

    Multi-line values have their continuation lines indented: INI
    continuation lines must be indented more deeply than the key, or they
    are read as new keys.
    """
    return f"""
# NLP definitions
[{NlpConfigPrefixes.NLPDEF}:{self.nlpdefname}]
{NlpDefConfigKeys.INPUTFIELDDEFS} =
    {self.inputname}
{NlpDefConfigKeys.PROCESSORS} =
    {self.cloudclassname} {self.cloudproc_crp}
    {self.cloudclassname} {self.cloudproc_alcohol}
{NlpDefConfigKeys.PROGRESSDB} = {self.dbsectionname}
{NlpDefConfigKeys.HASHPHRASE} = blah
{NlpDefConfigKeys.CLOUD_CONFIG} = {self.cloudconfigname}
{NlpDefConfigKeys.CLOUD_REQUEST_DATA_DIR} = {self.tempdir.name}
# Inputs
[{NlpConfigPrefixes.INPUT}:{self.inputname}]
{InputFieldConfigKeys.SRCDB} = {self.dbsectionname}
{InputFieldConfigKeys.SRCTABLE} = {self.txttable}
{InputFieldConfigKeys.SRCPKFIELD} = {self.pkcol}
{InputFieldConfigKeys.SRCFIELD} = {self.txtcol}
# Processors
# - CRP
[{NlpConfigPrefixes.PROCESSOR}:{self.cloudproc_crp}]
{ProcessorConfigKeys.PROCESSOR_NAME} = crate_anon.nlp_manager.parse_biochemistry.Crp
{ProcessorConfigKeys.PROCESSOR_FORMAT} = {NlpDefValues.FORMAT_STANDARD}
{ProcessorConfigKeys.OUTPUTTYPEMAP} =
    crp {self.output_crp}
{ProcessorConfigKeys.DESTDB} = {self.dbsectionname}
# - Alcohol units
[{NlpConfigPrefixes.PROCESSOR}:{self.cloudproc_alcohol}]
{ProcessorConfigKeys.PROCESSOR_NAME} = crate_anon.nlp_manager.parse_substance_misuse.AlcoholUnits
{ProcessorConfigKeys.PROCESSOR_FORMAT} = {NlpDefValues.FORMAT_STANDARD}
{ProcessorConfigKeys.OUTPUTTYPEMAP} =
    alcoholunits {self.output_alcohol}
{ProcessorConfigKeys.DESTDB} = {self.dbsectionname}
# Output sections
# - CRP
[{NlpConfigPrefixes.OUTPUT}:{self.output_crp}]
{NlpOutputConfigKeys.DESTTABLE} = nlp_crp
# - Alcohol units
[{NlpConfigPrefixes.OUTPUT}:{self.output_alcohol}]
{NlpOutputConfigKeys.DESTTABLE} = nlp_alcohol
# Databases
[{NlpConfigPrefixes.DATABASE}:{self.dbsectionname}]
{DatabaseConfigKeys.URL} = {self.dburl}
{DatabaseConfigKeys.ECHO} = {self.echo}
# Cloud servers
[{NlpConfigPrefixes.CLOUD}:{self.cloudconfigname}]
{CloudNlpConfigKeys.CLOUD_URL} = https://dummy_url
"""  # noqa: E501
def _mk_test_data(self) -> None:
    """
    Creates the source text table in the temporary on-disk database and
    inserts some test data into it.

    Primary keys are assigned sequentially from 1. The engine used here is
    private to this method and is disposed of before returning, so no
    connection to the temporary database file is left open.
    """
    texts = ["Current CRP 7. Teetotal. Non-smoker."]
    # Table/column names are fixture constants set in setUp(), not external
    # input, so interpolating them into the SQL is safe here; the data
    # values themselves go in as bound parameters.
    sql_create = text(
        f"""
        CREATE TABLE {self.txttable} (
            {self.pkcol} INTEGER,
            {self.txtcol} TEXT
        )
        """
    )
    sql_insert = text(
        f"""
        INSERT INTO {self.txttable}
            ({self.pkcol}, {self.txtcol})
        VALUES(:a, :b)
        """
    )
    engine = create_engine(self.dburl, future=True)
    try:
        with engine.begin() as con:
            con.execute(sql_create)
            for i, txt in enumerate(texts, start=1):
                con.execute(sql_insert, dict(a=i, b=txt))
    finally:
        # Release pooled connections to the temporary database file.
        engine.dispose()
# noinspection PyUnusedLocal
def _get_server_response(
    self, request_json_str: str, may_fail: bool = None
) -> Dict[str, Any]:
    """
    Stand-in for the client's HTTP round trip.

    Decodes the client's JSON request string, gives it to the mock server
    as its request body, and returns the dictionary that the server's
    ``index()`` view produces. ``may_fail`` is accepted for signature
    compatibility and ignored.
    """
    request_json = json.loads(request_json_str)
    log.debug(f"{request_json=}")

    server = self.server
    server.body = request_json  # as if parsed from the incoming request
    response_json = server.index()

    log.debug(f"-> {response_json=}")
    return response_json
def test_get_remote_processor_columns(self) -> None:
    """
    Check that a client can request processor definitions from the server
    (testing both ends, just simplifying some communication between them,
    e.g. removing authentication). Check that the client can synthesise
    SQLAlchemy Column objects from the results.

    Each processor is checked in its own sub-test, so a failure names the
    processor concerned and does not stop the remaining ones being
    checked.
    """
    # Fetch the data
    processors = self.listprocclient.get_remote_processors()

    # Check it
    self.assertIsInstance(processors, list)
    for sp in processors:
        self.assertIsInstance(sp, ServerProcessor)
        with self.subTest(processor=sp.name):
            log.debug(f"+++ Trying {sp.name=}")

            # We won't go so far as to set up a mock database in full. But
            # check that Column object are created.
            # (a) setup
            c = Cloud(nlpdef=None, cfg_processor_name=None)
            c.procname = sp.name
            c._destdb = mock.Mock(name="mock_destdb")
            c._destdb.engine = self.dummy_engine
            c.set_procinfo_if_correct(sp)
            log.debug(f"--- {c.schema=}")
            for tablename in c.schema.keys():
                ouc = mock.Mock(name=f"mock_ouc_{tablename}")
                ouc.get_columns = lambda _engine: []
                ouc.renames = {}
                ouc.dest_tablename = tablename  # usually a property
                ouc.destfields = []
                # ... simulating OutputUserConfig
                c._outputtypemap[tablename] = ouc
                c._type_to_tablename[tablename] = tablename

            # (b) test
            self.assertTrue(c.is_tabular())
            table_columns = c.dest_tables_columns()
            # assertGreater reports the actual length on failure.
            self.assertGreater(len(table_columns), 0)
            for tablename, columns in table_columns.items():
                log.debug(f"--- {sp.name=}: {tablename=}; {columns=}")
                self.assertGreater(len(columns), 0)
                for col in columns:
                    self.assertIsInstance(col, Column)
    # To explore the database manually: import pdb; pdb.set_trace()