14.5.19. crate_anon.nlp_manager.nlp_manager

crate_anon/nlp_manager/nlp_manager.py


Copyright (C) 2015, University of Cambridge, Department of Psychiatry. Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

This file is part of CRATE.

CRATE is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

CRATE is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with CRATE. If not, see <https://www.gnu.org/licenses/>.


Manage natural-language processing (NLP) via internal and external tools.

Speed testing:

  • 8 processes, extracting person and location entities from a mostly-text database

  • commit off during full (non-incremental) processing (much faster)

  • needs lots of RAM; e.g. the Java subprocess uses about 1.4 GB per process on average (rising from ~250 MB to ~1.4 GB and falling back; a steady rise would indicate a memory leak!); tested on a 16 GB machine. See also the max_external_prog_uses parameter.

    from __future__ import division  # the test predates Python 3
    test_size_mb = 1887
    # n_person_tags_found and n_locations_tags_found were not recorded here
    time_s = 10333  # 10333 s for the main bit; 10465 s including indexing; about 2.9 hours
    speed_mb_per_s = test_size_mb / time_s

… which gives about 0.18 MB/s, and note that’s 1.9 GB of text, not of attachments.

  • With the incremental option and nothing to do, the same run took 18 s.

  • During the main run, snapshot CPU usage:

    java about 81% across all processes, everything else close to 0
        (using about 12 Gb RAM total)
    ... or 75-85% * 8 [from top]
    mysqld about 18% [from top]
    nlp_manager.py about 4-5% * 8 [from top]
    
class crate_anon.nlp_manager.nlp_manager.DbInfo(session: Session | None = None, engine: Engine | None = None, metadata: MetaData | None = None, db: DatabaseHolder | None = None, temptable: Table | None = None)[source]

Simple object carrying information about a database. Used by delete_where_no_source().

__init__(session: Session | None = None, engine: Engine | None = None, metadata: MetaData | None = None, db: DatabaseHolder | None = None, temptable: Table | None = None) None[source]
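
For orientation, here is a minimal sketch (not CRATE's own code) of assembling a DbInfo from standard SQLAlchemy objects; the database URL is illustrative and the remaining slots are simply left empty until needed:

    # A minimal sketch, not CRATE's own code; the SQLite URL is a placeholder.
    from sqlalchemy import MetaData, create_engine
    from sqlalchemy.orm import Session

    from crate_anon.nlp_manager.nlp_manager import DbInfo

    engine = create_engine("sqlite:///nlp_dest.sqlite")
    dbinfo = DbInfo(
        session=Session(engine),
        engine=engine,
        metadata=MetaData(),
        db=None,         # a crate_anon DatabaseHolder, if one is available
        temptable=None,  # temporary table slot, filled in when one is in use
    )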
crate_anon.nlp_manager.nlp_manager.cancel_request(nlpdef: NlpDefinition, cancel_all: bool = False) None[source]

Delete pending requests from the server’s queue.

crate_anon.nlp_manager.nlp_manager.delete_where_no_source(nlpdef: NlpDefinition, ifconfig: InputFieldConfig, report_every: int = 100000, chunksize: int = 100000) None[source]

Delete destination records where source records no longer exist.

Parameters:
  • nlpdef – a crate_anon.nlp_manager.nlp_definition.NlpDefinition

  • ifconfig – a crate_anon.nlp_manager.input_field_config.InputFieldConfig

  • report_every – report to the log every n source rows

  • chunksize – insert into the SQLAlchemy session every n records

Development thoughts:

  • Can’t do this in a single SQL command, since the engine can’t necessarily see both databases.

  • Can’t use a single temporary table, since the progress database isn’t necessarily the same as any of the destination database(s).

  • Can’t do this in a multiprocess way, because we’re trying to do a DELETE WHERE NOT IN.

  • So my first attempt was: fetch all source PKs (which, by definition, do exist), stash them in memory, and do a DELETE WHERE NOT IN based on those specified values (or, if there are no PKs in the source, delete everything from the destination).

Problems with that:

  • This is IMPERFECT if we have string source PKs and there are hash collisions (e.g. PKs for records X and Y both hash to the same thing; record X is deleted; then its processed version might not be).

  • With massive tables, we might run out of memory or (much more likely) SQL parameter slots. This is now happening; the error looks like: pyodbc.ProgrammingError: (‘The SQL contains 30807 parameter markers, but 2717783 parameters were supplied’, ‘HY000’)

A better way might be to create one temporary table per destination database, populate it with the surviving source PKs, and delete destination rows that don’t match; this also raises the question of how to “delete where multiple fields don’t match” portably. A sketch of this approach is shown below.
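
The sketch below illustrates that temporary-table idea; it is not CRATE's implementation, and the destination column name srcpkval is an assumption for illustration. The point is that the NOT IN is evaluated server-side against the temporary table, so neither client memory nor the SQL parameter-slot limit constrains the number of source PKs.

    # Sketch only (not CRATE's implementation). Assumes the destination table
    # records its source PK in a column named "srcpkval" (assumed name).
    from sqlalchemy import Column, Integer, MetaData, Table, select
    from sqlalchemy.engine import Engine

    def delete_dest_rows_with_no_source(dest_engine: Engine,
                                        dest_table: Table,
                                        source_pks,
                                        chunksize: int = 100000) -> None:
        metadata = MetaData()
        temptable = Table(
            "_source_pks_tmp", metadata,
            Column("pk", Integer, primary_key=True),
            prefixes=["TEMPORARY"],
        )
        with dest_engine.begin() as conn:
            temptable.create(conn)
            # Insert the surviving source PKs in chunks, bounding both memory
            # use and the number of bound parameters per INSERT.
            batch = []
            for pk in source_pks:
                batch.append({"pk": pk})
                if len(batch) >= chunksize:
                    conn.execute(temptable.insert(), batch)
                    batch = []
            if batch:
                conn.execute(temptable.insert(), batch)
            # The NOT IN is evaluated by the database server, so no client-side
            # parameter-slot limit applies.
            conn.execute(
                dest_table.delete().where(
                    dest_table.c.srcpkval.notin_(select(temptable.c.pk))
                )
            )
            temptable.drop(conn)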

crate_anon.nlp_manager.nlp_manager.drop_remake(nlpdef: NlpDefinition, incremental: bool = False, skipdelete: bool = False, report_every: int = 100000, chunksize: int = 100000) None[source]

Drop output tables and recreate them.

Parameters:
  • nlpdef – a crate_anon.nlp_manager.nlp_definition.NlpDefinition

  • incremental – incremental processing mode?

  • skipdelete – For incremental updates, skip deletion of rows present in the destination but not the source

  • report_every – report to the log every n source rows

  • chunksize – insert into the SQLAlchemy session every n records
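
As a hedged usage sketch (assuming nlpdef is an existing NlpDefinition, built elsewhere from a CRATE NLP config): a full run recreates the destination tables, whereas an incremental run keeps existing rows and, unless skipdelete is set, also removes rows whose source records have disappeared.

    # Sketch: "nlpdef" is assumed to be an existing NlpDefinition instance.
    from crate_anon.nlp_manager.nlp_manager import drop_remake

    drop_remake(nlpdef, incremental=False)  # full run: drop and recreate tables
    drop_remake(nlpdef, incremental=True,   # incremental run: keep existing rows,
                skipdelete=True)            # ... skipping the delete-where-no-source pass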

crate_anon.nlp_manager.nlp_manager.inner_main() None[source]

Indirect command-line entry point. See command-line help.

crate_anon.nlp_manager.nlp_manager.main() None[source]

Command-line entry point.

crate_anon.nlp_manager.nlp_manager.print_cloud_processors(nlpdef: NlpDefinition, indent: int = 4, sort_keys: bool = True) None[source]

Print remote processor definitions to the screen.

crate_anon.nlp_manager.nlp_manager.process_cloud_nlp(crinfo: CloudRunInfo, incremental: bool = False, report_every: int = 500) None[source]

Process text by sending it off to the cloud processors in queued mode.

crate_anon.nlp_manager.nlp_manager.process_cloud_now(crinfo: CloudRunInfo, incremental: bool = False, report_every: int = 500) None[source]

Process text by sending it off to the cloud processors in non-queued mode.

crate_anon.nlp_manager.nlp_manager.process_nlp(nlpdef: NlpDefinition, incremental: bool = False, report_every: int = 500, tasknum: int = 0, ntasks: int = 1) None[source]

Main NLP processing function. Fetch text, send it to the NLP processor(s), store the results, and make a note in the progress database.

Parameters:
  • nlpdef – a crate_anon.nlp_manager.nlp_definition.NlpDefinition

  • incremental – Incremental processing (skip previously processed records).

  • report_every – Report to the log every n source rows.

  • tasknum – Which task number am I?

  • ntasks – How many tasks are there in total?
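
A sketch of how these functions fit together for a local run follows. The NlpDefinition constructor argument shown is an assumption (a definition name from the CRATE NLP config file), and in practice parallel workers are normally launched as separate OS processes by the crate_nlp front end rather than called in sequence like this:

    # Sketch only. "my_nlp_definition" is a hypothetical NLP definition name.
    from crate_anon.nlp_manager.nlp_definition import NlpDefinition
    from crate_anon.nlp_manager.nlp_manager import drop_remake, process_nlp

    nlpdef = NlpDefinition("my_nlp_definition")

    drop_remake(nlpdef, incremental=True)  # prepare the destination tables

    # Split the work across two tasks; each call would normally run in its own
    # process:
    process_nlp(nlpdef, incremental=True, tasknum=0, ntasks=2)
    process_nlp(nlpdef, incremental=True, tasknum=1, ntasks=2)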

crate_anon.nlp_manager.nlp_manager.retrieve_nlp_data(crinfo: CloudRunInfo, incremental: bool = False) None[source]

Try to retrieve the data from the cloud processors.
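
Together with process_cloud_nlp() (above), show_cloud_queue() and cancel_request(), this supports a queued workflow: submit text, inspect the queue, and fetch results later. A sketch, assuming a CloudRunInfo (crinfo) and its NlpDefinition (nlpdef) have been built elsewhere:

    # Sketch of the queued cloud workflow, using the functions documented here.
    # "crinfo" (CloudRunInfo) and "nlpdef" (NlpDefinition) are assumed to exist.
    from crate_anon.nlp_manager.nlp_manager import (
        cancel_request,
        process_cloud_nlp,
        retrieve_nlp_data,
        show_cloud_queue,
    )

    process_cloud_nlp(crinfo, incremental=True)  # submit text to the queue
    show_cloud_queue(nlpdef)                     # list our pending requests
    retrieve_nlp_data(crinfo, incremental=True)  # fetch any results now ready
    # ... or abandon pending requests instead:
    # cancel_request(nlpdef, cancel_all=True)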

crate_anon.nlp_manager.nlp_manager.show_cloud_queue(nlpdef: NlpDefinition) None[source]

Get the list of the user’s queued requests and print it to the screen.

crate_anon.nlp_manager.nlp_manager.show_dest_counts(nlpdef: NlpDefinition) None[source]

Print (to stdout) the number of records in all destination tables.

Parameters:

nlpdef – a crate_anon.nlp_manager.nlp_definition.NlpDefinition

crate_anon.nlp_manager.nlp_manager.show_source_counts(nlpdef: NlpDefinition) None[source]

Print (to stdout) the number of records in all source tables.

Parameters:

nlpdef – a crate_anon.nlp_manager.nlp_definition.NlpDefinition

crate_anon.nlp_manager.nlp_manager.test_nlp_stdin(nlpdef: NlpDefinition) None[source]

Test the NLP processor(s) by sending text from stdin to them.

Parameters:

nlpdef – a crate_anon.nlp_manager.nlp_definition.NlpDefinition
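
As a final sketch (the definition name is a placeholder and the constructor usage is an assumption), a small script calling this function can be fed text on stdin:

    # Hypothetical helper script; pipe text in, e.g.:
    #   echo "Some text to process." | python test_stdin_example.py
    from crate_anon.nlp_manager.nlp_definition import NlpDefinition
    from crate_anon.nlp_manager.nlp_manager import test_nlp_stdin

    nlpdef = NlpDefinition("my_nlp_definition")  # assumed constructor usage
    test_nlp_stdin(nlpdef)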