14.5.19. crate_anon.nlp_manager.nlp_manager
crate_anon/nlp_manager/nlp_manager.py
Copyright (C) 2015, University of Cambridge, Department of Psychiatry. Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CRATE.
CRATE is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
CRATE is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with CRATE. If not, see <https://www.gnu.org/licenses/>.
Manage natural-language processing (NLP) via internal and external tools.
Speed testing:
- 8 processes, extracting person and location from a mostly-text database
- commit off during full (non-incremental) processing (much faster)
- needs lots of RAM; e.g. the Java subprocess uses 1.4 Gb per process on average (rises from ~250 Mb to ~1.4 Gb and falls; a steady rise means a memory leak!); tested on a 16 Gb machine. See also the max_external_prog_uses parameter.
from __future__ import division
test_size_mb = 1887
n_person_tags_found =
n_locations_tags_found =
time_s = 10333  # 10333 s for the main bit; 10465 s including indexing; i.e. ~2.9 hours
speed_mb_per_s = test_size_mb / time_s
… gives 0.18 Mb/s, and note that’s 1.9 Gb of text, not of attachments.
With the incremental option and nothing to do, the same run took 18 s.
During the main run, snapshot CPU usage:
- java: about 81% across all processes, or 75-85% × 8 [from top]; using about 12 Gb RAM in total
- mysqld: about 18% [from top]
- nlp_manager.py: about 4-5% × 8 [from top]
- everything else: close to 0
- class crate_anon.nlp_manager.nlp_manager.DbInfo(session: Session | None = None, engine: Engine | None = None, metadata: MetaData | None = None, db: DatabaseHolder | None = None, temptable: Table | None = None)[source]
Simple object carrying information about a database. Used by delete_where_no_source().
- __init__(session: Session | None = None, engine: Engine | None = None, metadata: MetaData | None = None, db: DatabaseHolder | None = None, temptable: Table | None = None) None [source]
- crate_anon.nlp_manager.nlp_manager.cancel_request(nlpdef: NlpDefinition, cancel_all: bool = False) None [source]
Delete pending requests from the server’s queue.
- crate_anon.nlp_manager.nlp_manager.delete_where_no_source(nlpdef: NlpDefinition, ifconfig: InputFieldConfig, report_every: int = 100000, chunksize: int = 100000) None [source]
Delete destination records where source records no longer exist.
- Parameters:
nlpdef – crate_anon.nlp_manager.nlp_definition.NlpDefinition
ifconfig – crate_anon.nlp_manager.input_field_config.InputFieldConfig
report_every – report to the log every n source rows
chunksize – insert into the SQLAlchemy session every n records
Development thoughts:
Can’t do this in a single SQL command, since the engine can’t necessarily see both databases.
Can’t use a single temporary table, since the progress database isn’t necessarily the same as any of the destination database(s).
Can’t do this in a multiprocess way, because we’re trying to do a DELETE WHERE NOT IN.
So my first attempt was: fetch all source PKs (which, by definition, do exist), stash them in memory, and do a DELETE WHERE NOT IN based on those specified values (or, if there are no PKs in the source, delete everything from the destination).
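That first attempt can be sketched with SQLAlchemy Core. This is a hypothetical illustration, not CRATE's actual implementation: the table and column names (src, dst, pk, srcpk) are invented, and an in-memory SQLite database stands in for the real source and destination.

```python
# Hypothetical sketch of the first attempt: fetch all source PKs into
# memory, then DELETE WHERE NOT IN on the destination.
from sqlalchemy import (
    Column, Integer, MetaData, Table, create_engine, delete, insert, select
)

engine = create_engine("sqlite://")  # stand-in for the real databases
meta = MetaData()
src = Table("src", meta, Column("pk", Integer, primary_key=True))
dst = Table("dst", meta, Column("srcpk", Integer, primary_key=True))
meta.create_all(engine)

with engine.begin() as con:
    con.execute(insert(src), [{"pk": 1}, {"pk": 3}])              # source rows
    con.execute(insert(dst), [{"srcpk": n} for n in (1, 2, 3)])   # stale dest

with engine.begin() as con:
    # Stash all source PKs in memory (they exist, by definition):
    pks = [row[0] for row in con.execute(select(src.c.pk))]
    if pks:
        # One bound parameter per PK: this is what eventually exhausts
        # the driver's SQL parameter slots on massive tables.
        con.execute(delete(dst).where(dst.c.srcpk.not_in(pks)))
    else:
        con.execute(delete(dst))  # no source rows: clear the destination
```

Destination row 2 has no source counterpart, so it is the one deleted; rows 1 and 3 survive.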
Problems with that:
This is IMPERFECT if we have string source PKs and there are hash collisions (e.g. PKs for records X and Y both hash to the same thing; record X is deleted; then its processed version might not be).
With massive tables, we might run out of memory or (much more likely) SQL parameter slots. This is now happening; the error looks like: pyodbc.ProgrammingError: (‘The SQL contains 30807 parameter markers, but 2717783 parameters were supplied’, ‘HY000’)
A better way might be:
for each table, make a temporary table in the same database
populate that table with (source PK integer/hash, source PK string) pairs
delete where pairs don’t match – is that portable SQL? See https://stackoverflow.com/questions/7356108/sql-query-for-deleting-rows-with-not-in-using-2-columns
More efficient would be to make one table per destination database.
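The population step of that plan can be sketched as follows. The names (_src_pks, pk, pk_str) and the chunking helper are hypothetical illustrations, assuming the same chunksize idea as the parameter above; the key point is that executemany-style inserts in fixed-size chunks never exceed the driver's parameter limit.

```python
# Hypothetical sketch: build a temporary table of (integer/hash PK,
# string PK) pairs in the destination database, inserting in chunks so
# we never exceed the driver's SQL parameter limit.
from sqlalchemy import (
    BigInteger, Column, MetaData, String, Table, create_engine, insert, select
)

engine = create_engine("sqlite://")  # stand-in for a destination database
meta = MetaData()
temptable = Table(
    "_src_pks", meta,
    Column("pk", BigInteger),      # integer or hashed source PK
    Column("pk_str", String(64)),  # string source PK (may be NULL)
)
meta.create_all(engine)

CHUNKSIZE = 100000  # cf. the chunksize parameter above


def populate_temp_table(source_pairs) -> None:
    """Insert (pk, pk_str) pairs in chunks of at most CHUNKSIZE rows."""
    chunk = []
    with engine.begin() as con:
        for pk, pk_str in source_pairs:
            chunk.append({"pk": pk, "pk_str": pk_str})
            if len(chunk) >= CHUNKSIZE:
                con.execute(insert(temptable), chunk)  # executemany
                chunk = []
        if chunk:
            con.execute(insert(temptable), chunk)


populate_temp_table([(1, "a"), (2, None), (3, "c")])
```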
On the “delete where multiple fields don’t match”:
Single field syntax is
DELETE FROM a WHERE a1 NOT IN (SELECT b1 FROM b)
Multiple field syntax is
DELETE FROM a WHERE NOT EXISTS ( SELECT 1 FROM b WHERE a.a1 = b.b1 AND a.a2 = b.b2 )
In SQLAlchemy, use exists().
Furthermore, in SQL, NULL = NULL is false (it’s null), and NULL <> NULL is also false (it’s null), so we have to do an explicit null check; in SQLAlchemy, you do that with field == None. See https://stackoverflow.com/questions/21668606. We’re aiming, therefore, for:
DELETE FROM a WHERE NOT EXISTS ( SELECT 1 FROM b WHERE a.a1 = b.b1 AND ( a.a2 = b.b2 OR (a.a2 IS NULL AND b.b2 IS NULL) ) )
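That target statement can be expressed with SQLAlchemy's exists() plus explicit IS NULL checks. The sketch below uses the hypothetical tables a (a1, a2) and b (b1, b2) from the SQL fragment, with b playing the role of the temporary table of source PK pairs; it is an illustration under those assumptions, not CRATE's actual code.

```python
# Sketch of DELETE FROM a WHERE NOT EXISTS (...) with a null-safe
# two-column comparison, using SQLAlchemy Core.
from sqlalchemy import (
    Column, Integer, MetaData, String, Table, and_, create_engine,
    delete, exists, insert, or_, select
)

engine = create_engine("sqlite://")
meta = MetaData()
a = Table("a", meta, Column("a1", Integer), Column("a2", String(16)))
b = Table("b", meta, Column("b1", Integer), Column("b2", String(16)))
meta.create_all(engine)

with engine.begin() as con:
    con.execute(insert(a), [{"a1": 1, "a2": "x"},
                            {"a1": 1, "a2": None},
                            {"a1": 2, "a2": "y"}])
    con.execute(insert(b), [{"b1": 1, "b2": "x"},
                            {"b1": 1, "b2": None}])

# NULL = NULL is not true in SQL, so match NULLs explicitly:
match = exists().where(
    and_(
        a.c.a1 == b.c.b1,
        or_(a.c.a2 == b.c.b2,
            and_(a.c.a2.is_(None), b.c.b2.is_(None))),
    )
)
with engine.begin() as con:
    # DELETE FROM a WHERE NOT EXISTS (SELECT ... FROM b WHERE ...):
    # only (2, 'y') has no matching b row, so only it is deleted.
    con.execute(delete(a).where(~match))
```

Note that (1, NULL) survives only because of the explicit IS NULL branch; with a bare a.a2 = b.b2 comparison it would (wrongly) be deleted too.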
- crate_anon.nlp_manager.nlp_manager.drop_remake(nlpdef: NlpDefinition, incremental: bool = False, skipdelete: bool = False, report_every: int = 100000, chunksize: int = 100000) None [source]
Drop output tables and recreate them.
- Parameters:
nlpdef – crate_anon.nlp_manager.nlp_definition.NlpDefinition
incremental – incremental processing mode?
skipdelete – for incremental updates, skip deletion of rows present in the destination but not in the source
report_every – report to the log every n source rows
chunksize – insert into the SQLAlchemy session every n records
- crate_anon.nlp_manager.nlp_manager.inner_main() None [source]
Indirect command-line entry point. See command-line help.
- crate_anon.nlp_manager.nlp_manager.print_cloud_processors(nlpdef: NlpDefinition, indent: int = 4, sort_keys: bool = True) None [source]
Print remote processor definitions to the screen.
- crate_anon.nlp_manager.nlp_manager.process_cloud_nlp(crinfo: CloudRunInfo, incremental: bool = False, report_every: int = 500) None [source]
Process text by sending it off to the cloud processors in queued mode.
- crate_anon.nlp_manager.nlp_manager.process_cloud_now(crinfo: CloudRunInfo, incremental: bool = False, report_every: int = 500) None [source]
Process text by sending it off to the cloud processors in non-queued mode.
- crate_anon.nlp_manager.nlp_manager.process_nlp(nlpdef: NlpDefinition, incremental: bool = False, report_every: int = 500, tasknum: int = 0, ntasks: int = 1) None [source]
Main NLP processing function. Fetch text, send it to the NLP processor(s), storing the results, and make a note in the progress database.
- Parameters:
nlpdef – crate_anon.nlp_manager.nlp_definition.NlpDefinition
incremental – Incremental processing (skip previously processed records).
report_every – Report to the log every n source rows.
tasknum – Which task number am I?
ntasks – How many tasks are there in total?
- crate_anon.nlp_manager.nlp_manager.retrieve_nlp_data(crinfo: CloudRunInfo, incremental: bool = False) None [source]
Try to retrieve the data from the cloud processors.
- crate_anon.nlp_manager.nlp_manager.show_cloud_queue(nlpdef: NlpDefinition) None [source]
Get list of the user’s queued requests and print to screen.
- crate_anon.nlp_manager.nlp_manager.show_dest_counts(nlpdef: NlpDefinition) None [source]
Print (to stdout) the number of records in all destination tables.
- Parameters:
nlpdef – crate_anon.nlp_manager.nlp_definition.NlpDefinition
- crate_anon.nlp_manager.nlp_manager.show_source_counts(nlpdef: NlpDefinition) None [source]
Print (to stdout) the number of records in all source tables.
- Parameters:
nlpdef – crate_anon.nlp_manager.nlp_definition.NlpDefinition
- crate_anon.nlp_manager.nlp_manager.test_nlp_stdin(nlpdef: NlpDefinition) None [source]
Tests NLP processor(s) by sending stdin to it/them.
- Parameters:
nlpdef – crate_anon.nlp_manager.nlp_definition.NlpDefinition