"""
crate_anon/nlp_manager/cloud_request_sender.py
===============================================================================
Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CRATE.
CRATE is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CRATE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CRATE. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
**CloudRequestSender class.**
"""
# =============================================================================
# Imports
# =============================================================================
from enum import auto, Enum
import logging
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
Generator,
TYPE_CHECKING,
)
from crate_anon.nlp_manager.constants import (
DEFAULT_REPORT_EVERY_NLP,
)
from crate_anon.nlp_manager.input_field_config import (
InputFieldConfig,
FN_SRCDB,
FN_SRCTABLE,
FN_SRCPKFIELD,
FN_SRCPKVAL,
FN_SRCPKSTR,
FN_SRCFIELD,
)
from crate_anon.nlp_manager.models import FN_SRCHASH
from crate_anon.nlp_manager.cloud_request import (
CloudRequestProcess,
RecordNotPrintable,
RecordsPerRequestExceeded,
RequestTooLong,
)
from crate_anon.nlp_manager.cloud_run_info import CloudRunInfo
if TYPE_CHECKING:
from http.cookiejar import CookieJar
log = logging.getLogger(__name__)
# =============================================================================
# CloudRequestSender
# =============================================================================
class CloudRequestSender:
    """
    Class to encapsulate an NLP request outbound to a cloud NLP server.

    Reads records from a text generator, packs them into
    :class:`crate_anon.nlp_manager.cloud_request.CloudRequestProcess`
    objects, and sends those requests, returning control to the caller
    whenever the configured ``limit_before_commit`` is reached or the input
    is exhausted.
    """

    class State(Enum):
        """
        Request state.

        - ``BUILDING_REQUEST``: adding records to the current request.
        - ``SENDING_REQUEST``: the current request is ready to be sent.
        - ``FINISHED``: nothing more to do in this call to
          :meth:`send_requests`.
        """

        BUILDING_REQUEST = auto()
        SENDING_REQUEST = auto()
        FINISHED = auto()

    def __init__(
        self,
        text_generator: Generator[Tuple[str, Dict[str, Any]], None, None],
        crinfo: CloudRunInfo,
        ifconfig: InputFieldConfig,
        report_every: int = DEFAULT_REPORT_EVERY_NLP,
        incremental: bool = False,
        queue: bool = True,
    ) -> None:
        """
        Initialise class

        Args:
            text_generator:
                Generator that generates ``(text, other_values)`` tuples
                from the source database. See
                :meth:`crate_anon.nlp_manager.input_field_config.InputFieldConfig.gen_text`.
            crinfo:
                A :class:`crate_anon.nlp_manager.cloud_run_info.CloudRunInfo`
                object.
            ifconfig:
                An
                :class:`crate_anon.nlp_manager.input_field_config.InputFieldConfig`
                object.
            report_every:
                Report to the log every *n* records read (0 or ``None``
                disables this reporting).
            incremental:
                Process in incremental mode (ignoring source records that have
                not changed since last time)?
            queue:
                Queue the requests for back-end processing (rather than waiting
                for an immediate reply)?
        """
        self._text_generator = text_generator
        self._crinfo = crinfo
        self._ifconfig = ifconfig
        self._report_every = report_every
        self._incremental = incremental
        self._queue = queue
        self._global_recnum = -1
        self._requests = []  # type: List[CloudRequestProcess]
        self._cookies = None  # type: Optional[CookieJar]
        # 1-based sequence number of the next request to be sent within the
        # current block (used for logging); reset by send_requests().
        self._request_count = 1
        self._text = None  # type: Optional[str]
        self._other_values = None  # type: Optional[Dict[str, Any]]
        self._request_is_empty = True
        # False when the current record did not fit into the request just
        # built and must be retried in a fresh request.
        self._need_new_record = True
        self._need_new_request = True
        self._num_recs_processed = 0
        self._state = self.State.BUILDING_REQUEST
        self._request = None  # type: Optional[CloudRequestProcess]

    def send_requests(
        self, global_recnum: int
    ) -> Tuple[List[CloudRequestProcess], bool, int]:
        """
        Sends off a series of cloud requests and returns them as a list.
        ``self._queue`` determines whether these are queued requests or not.

        Stops when the text generator is exhausted, or when the number of
        records processed reaches the configured ``limit_before_commit``, so
        that the caller can process and commit the results (or write queued
        request details to file) before calling again.

        Args:
            global_recnum:
                Running record counter from previous calls; incremented once
                for each record read from the text generator.

        Returns:
            tuple: ``requests, some_records_processed, global_recnum``, where
            ``requests`` is the list of requests that were sent without
            failing, ``some_records_processed`` is whether at least one
            record was read and not skipped as unchanged (in incremental
            mode), and ``global_recnum`` is the updated record counter.
            If no remote processors are available, returns
            ``[], False, global_recnum`` without reading anything.
        """
        self._global_recnum = global_recnum
        self._requests = []
        self._cookies = None
        self._request_count = 1
        self._text = None
        self._other_values = None
        self._request_is_empty = True
        self._need_new_record = True
        self._need_new_request = True

        # Check processors are available
        available_procs = self._crinfo.get_remote_processors()
        if not available_procs:
            return [], False, self._global_recnum

        self._num_recs_processed = 0
        self._state = self.State.BUILDING_REQUEST

        # If we've reached the limit of records before commit, return to
        # outer function in order to process and commit (or write to file if
        # it's a queued request)
        while self._state != self.State.FINISHED:
            if self._state == self.State.BUILDING_REQUEST:
                self._build_request()
            if self._state == self.State.SENDING_REQUEST:
                self._send_request()

        return (
            self._requests,
            self._num_recs_processed > 0,
            self._global_recnum,
        )

    def _build_request(self) -> None:
        """
        Adds another record to the outbound request, until the request is
        fully built. Updates our state to reflect what needs to happen next.

        If the previous record did not fit into the request just sent, that
        same record (still held in ``self._text`` / ``self._other_values``)
        is retried in a fresh request instead of reading a new one.
        """
        if self._need_new_record:
            try:
                self._get_next_record()
            except StopIteration:
                self._update_state_for_no_more_records()
                return

            hasher = self._crinfo.nlpdef.hash
            srchash = hasher(self._text)

            if self._incremental and self._record_already_processed(srchash):
                # Unchanged since last time; try the next record.
                return

            # Hash and count each record once only -- not again when it is
            # being retried -- so the commit limit isn't reached early.
            self._num_recs_processed += 1
            self._other_values[FN_SRCHASH] = srchash

        if self._need_new_request:
            self._request = self._get_new_cloud_request()
            self._request_is_empty = True
            self._need_new_request = False

        # Assume this record is dealt with; cleared below if it must be
        # retried in a fresh request.
        self._need_new_record = True

        # Add the text to the cloud request with the appropriate metadata
        try:
            self._request.add_text(self._text, self._other_values)
            # added OK, request now has some text
            self._request_is_empty = False
        except RecordNotPrintable:
            # Text contained no printable characters. Skip it.
            pass
        except (RecordsPerRequestExceeded, RequestTooLong) as e:
            if isinstance(e, RequestTooLong) and self._request_is_empty:
                # Too long even for an empty request, so it can never be
                # sent. Get some new text next time.
                log.warning("Skipping text that's too long to send")
            else:
                # Request is full: send it, then try the same text again
                # with a fresh request.
                self._need_new_record = False
                self._state = self.State.SENDING_REQUEST

        if (
            self._state == self.State.BUILDING_REQUEST
            and self._record_limit_reached()
        ):
            # Time to hand back to the caller for a COMMIT. If every record
            # offered to this request was skipped, there is nothing to send.
            if self._request_is_empty:
                self._state = self.State.FINISHED
            else:
                self._state = self.State.SENDING_REQUEST

    def _get_new_cloud_request(self) -> CloudRequestProcess:
        """
        Creates and returns a new
        :class:`crate_anon.nlp_manager.cloud_request.CloudRequestProcess`
        object.
        """
        return CloudRequestProcess(self._crinfo)

    def _update_state_for_no_more_records(self) -> None:
        """
        No more input records are available. This means either (a) we've sent
        all our requests and have finished, or (b) we're building our last
        request and we need to send it. Set the state accordingly.
        """
        if self._request_is_empty or self._need_new_request:
            # Nothing more to send
            self._state = self.State.FINISHED
            return
        # Send last request
        self._state = self.State.SENDING_REQUEST

    def _record_already_processed(self, srchash: str) -> bool:
        """
        Has this source record (identified by its PK and its hash) already been
        processed? (If so, then in incremental mode, we can skip it.)

        Args:
            srchash: hash of the current record's text.

        Returns:
            ``True`` if a progress record exists for this PK with the same
            hash; ``False`` if the record is new or has changed.
        """
        pkval = self._other_values[FN_SRCPKVAL]
        # 'ifconfig.get_progress_record' expects pkstr to be None if it's
        # empty.
        pkstr = self._other_values[FN_SRCPKSTR] or None
        progrec = self._ifconfig.get_progress_record(pkval, pkstr)
        if progrec is not None:
            if progrec.srchash == srchash:
                log.debug("Record previously processed; skipping")
                return True
            log.debug("Record has changed")
        else:
            log.debug("Record is new")
        return False

    def _record_limit_reached(self) -> bool:
        """
        Have we processed as many records as we're allowed before we should
        COMMIT to the database?
        """
        limit_before_commit = self._crinfo.cloudcfg.limit_before_commit
        return self._num_recs_processed >= limit_before_commit

    def _get_next_record(self) -> None:
        """
        Reads the next text record and metadata into ``self._text`` and
        ``self._other_values``, increments ``self._global_recnum``, and logs
        progress every ``self._report_every`` records.

        Raises:
            :exc:`StopIteration` if there are no more records
        """
        self._text, self._other_values = next(self._text_generator)
        self._global_recnum += 1

        if (
            self._report_every
            and self._global_recnum % self._report_every == 0
        ):
            pkval = self._other_values[FN_SRCPKVAL]
            pkstr = self._other_values[FN_SRCPKSTR]
            # total number of records in table
            totalcount = self._ifconfig.get_count()
            log.info(
                "Processing {db}.{t}.{c}, PK: {pkf}={pkv} "
                "(record {g_recnum}/{totalcount})".format(
                    db=self._other_values[FN_SRCDB],
                    t=self._other_values[FN_SRCTABLE],
                    c=self._other_values[FN_SRCFIELD],
                    pkf=self._other_values[FN_SRCPKFIELD],
                    pkv=pkstr if pkstr else pkval,
                    g_recnum=self._global_recnum,
                    totalcount=totalcount,
                )
            )

    def _send_request(self) -> None:
        """
        Send a pending request to the remote NLP server.
        Update the state afterwards.

        Only requests that did not fail are added to ``self._requests``.
        """
        self._request.send_process_request(
            queue=self._queue,
            cookies=self._cookies,
            include_text_in_reply=self._crinfo.cloudcfg.has_gate_processors,
        )
        # If there's a connection error, we only get this far if we
        # didn't choose to stop at failure
        if self._request.request_failed:
            log.warning("Continuing after failed request.")
        else:
            if self._request.cookies:
                self._cookies = self._request.cookies
            log.info(
                f"Sent request to be processed: #{self._request_count} "
                f"of this block"
            )
            self._request_count += 1
            self._requests.append(self._request)

        # If a record that didn't fit into the request just sent is still
        # pending (self._need_new_record is False), it has already been
        # counted towards the commit limit. Stopping now would lose it,
        # because the next send_requests() call starts by reading a new
        # record. So only finish when nothing is pending.
        if self._record_limit_reached() and self._need_new_record:
            self._state = self.State.FINISHED
            return

        self._state = self.State.BUILDING_REQUEST
        self._need_new_request = True