Source code for crate_anon.crateweb.research.views

"""
crate_anon/crateweb/research/views.py

===============================================================================

    Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CRATE.

    CRATE is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CRATE is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CRATE. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**CRATE views on the research database.**

"""

import copy
import datetime
import json
import logging
from os.path import basename
from typing import Any, Dict, Iterable, List, Sequence, Type, Union, Optional

from cardinal_pythonlib.typing_helpers import Pep249DatabaseCursorType
from cardinal_pythonlib.dbfunc import get_fieldnames_from_cursor
from cardinal_pythonlib.django.function_cache import django_cache_function
from cardinal_pythonlib.django.serve import file_response, serve_file
from cardinal_pythonlib.exceptions import recover_info_from_exception
from cardinal_pythonlib.hash import hash64
from cardinal_pythonlib.httpconst import ContentType
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName
from cardinal_pythonlib.psychiatry.drugs import Drug, all_drugs_where
from django import forms
from django.conf import settings
from django.contrib.auth.decorators import user_passes_test
from django.core.exceptions import (
    ObjectDoesNotExist,
    ValidationError,
)
from django.db import DatabaseError, ProgrammingError
from django.db.models import Q, QuerySet
from django.http.response import (
    HttpResponse,
    HttpResponseBase,
    HttpResponseBadRequest,
    HttpResponseRedirect,
)
from django.http.request import HttpRequest
from django.shortcuts import get_object_or_404, redirect, render
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils.html import escape
from django.views.decorators.cache import cache_control
from mako.exceptions import TemplateLookupException
from pyparsing import ParseException

from crate_anon.common.constants import JSON_SEPARATORS_COMPACT

# from crate_anon.common.profiling import do_cprofile
from crate_anon.common.sql import (
    ColumnId,
    escape_sql_string_literal,
    escape_sql_string_or_int_literal,
    SQL_OPS_MULTIPLE_VALUES,
    SQL_OPS_VALUE_UNNECESSARY,
    TableId,
    toggle_distinct,
    WhereCondition,
)
from crate_anon.crateweb.config.constants import UrlNames, UrlKeys
from crate_anon.crateweb.core.utils import (
    guess_mimetype,
    is_clinician,
    is_superuser,
    paginate,
)
from crate_anon.crateweb.research.archive_backend import (
    archive_attachment_url,
    ARCHIVE_CONTEXT,
    ARCHIVE_IS_CONFIGURED,
    archive_mako_lookup,
    archive_misconfigured_response,
    archive_root_url,
    archive_static_url,
    archive_template_url,
    ArchiveContextKeys,
    audit_archive_attachment,
    audit_archive_template,
    CACHE_CONTROL_MAX_AGE_ARCHIVE_ATTACHMENTS,
    CACHE_CONTROL_MAX_AGE_ARCHIVE_STATIC,
    CACHE_CONTROL_MAX_AGE_ARCHIVE_TEMPLATES,
    DEFAULT_GUESS_CONTENT_TYPE,
    get_archive_attachment_filepath,
    get_archive_static_filepath,
)
from crate_anon.crateweb.research.errors import DatabaseStructureNotUnderstood
from crate_anon.crateweb.research.forms import (
    AddHighlightForm,
    AddQueryForm,
    ClinicianAllTextFromPidForm,
    DatabasePickerForm,
    DEFAULT_MIN_TEXT_FIELD_LENGTH,
    FieldPickerInfo,
    ManualPeQueryForm,
    PidLookupForm,
    QueryBuilderForm,
    RidLookupForm,
    SQLHelperTextAnywhereForm,
    SQLHelperFindAnywhereForm,
    SQLHelperDrugTypeForm,
)
from crate_anon.crateweb.research.html_functions import (
    highlight_text,
    HtmlElementCounter,
    make_result_element,
    make_collapsible_sql_query,
    N_CSS_HIGHLIGHT_CLASSES,
    prettify_sql_css,
    prettify_sql_html,
    prettify_sql_and_args,
)
from crate_anon.crateweb.research.models import (
    get_executed_researchdb_cursor,
    get_executed_researchdb_cursor_qmark_placeholders,
    Highlight,
    PatientExplorer,
    PatientMultiQuery,
    PidLookup,
    Query,
    SitewideQuery,
)
from crate_anon.crateweb.research.research_db_info import (
    PatientFieldPythonTypes,
    research_database_info,
    SingleResearchDatabase,
)
from crate_anon.crateweb.research.sql_writer import (
    add_to_select,
    SelectElement,
)
from crate_anon.crateweb.userprofile.models import (
    get_patients_per_page,
    UserProfile,
)

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Constants
# =============================================================================

# Maximum number of characters to show of a query in html
MAX_LEN_SHOW = 20000


# Prefix for inline pid and mpid conversion
PID_PREFIX = "~pid"
MPID_PREFIX = "~mpid"


# Fieldnames for CRATE NLP table
FN_NLPDEF = "_nlpdef"
FN_SRCDB = "_srcdb"
FN_SRCTABLE = "_srctable"
FN_SRCFIELD = "_srcfield"
FN_SRCPKFIELD = "_srcpkfield"
FN_SRCPKVAL = "_srcpkval"
FN_SRCPKSTR = "_srcpkstr"


# =============================================================================
# Helper functions
# =============================================================================


[docs]def validate_blank_form(request: HttpRequest) -> None: """ Checks that the request is (a) a POST request, and (b) passes CRSF validation. Args: request: the :class:`django.http.request.HttpRequest` Raises: :exc:`django.core.exceptions.ValidationError` if it fails """ if request.method != "POST": raise ValidationError("Use HTTP POST, not HTTP GET or other methods") form = forms.Form(request.POST) if not form.is_valid(): # checks CSRF raise ValidationError("Form failed validation")
[docs]def query_context(request: HttpRequest) -> Dict[str, Any]: """ Query context dictionary used for (nearly?) *every* request. Args: request: the :class:`django.http.request.HttpRequest` Returns: dict: a dictionary with core information about the request, like the currently selected query/Patient Explorer ID for the user. Notes: - Try to minimize SQL here, as these calls will be used for EVERY request. - This problem can be circumvented with a per-request cache; see https://stackoverflow.com/questions/3151469/per-request-cache-in-django """ query_id = Query.get_active_query_id_or_none(request) pe_id = PatientExplorer.get_active_pe_id_or_none(request) return { "query_selected": query_id is not None, "current_query_id": query_id, "pe_selected": pe_id is not None, "current_pe_id": pe_id, }
[docs]def datetime_iso_for_filename() -> str: """ Returns a date/time as a string formatted for filenames. """ dtnow = datetime.datetime.now() return dtnow.strftime("%Y%m%d_%H%M%S")
# ============================================================================= # Errors # =============================================================================
[docs]def generic_error(request: HttpRequest, error: str) -> HttpResponse: """ Returns a generic error response. Args: request: the :class:`django.http.request.HttpRequest` error: the error text Returns: a :class:`django.http.response.HttpResponse` """ context = { "error": error, } return render(request, "generic_error.html", context)
# ============================================================================= # Queries # ============================================================================= @django_cache_function(timeout=None) # @lru_cache(maxsize=None) def get_db_structure_json() -> str: """ Returns the research database structure in JSON format. """ log.debug("get_db_structure_json") colinfolist = research_database_info.get_colinfolist() if not colinfolist: log.warning("get_db_structure_json(): colinfolist is empty") info = [] # type: List[Dict[str, Any]] for dbinfo in research_database_info.dbinfolist: log.info(f"get_db_structure_json: schema {dbinfo.schema_identifier}") if not dbinfo.eligible_for_query_builder: log.debug( f"Skipping schema={dbinfo.schema_identifier}: " f"not eligible for query builder" ) continue schema_cil = [ x for x in colinfolist if x.table_catalog == dbinfo.database and x.table_schema == dbinfo.schema_name ] table_info = [] # type: List[Dict[str, Any]] for table in sorted(set(x.table_name for x in schema_cil)): table_cil = [x for x in schema_cil if x.table_name == table] if not any( x for x in table_cil if x.column_name == dbinfo.trid_field ): # This table doesn't contain a TRID, so we will skip it. log.debug( f"... skipping table {table}: " f"no TRID [{dbinfo.trid_field}]" ) continue if not any( x for x in table_cil if x.column_name == dbinfo.rid_field ): # This table doesn't contain a RID, so we will skip it. log.debug( f"... skipping table {table}: " f"no RID [{dbinfo.rid_field}]" ) continue column_info = [] # type: List[Dict[str, str]] for ci in sorted(table_cil, key=lambda x: x.column_name): column_info.append( { "colname": ci.column_name, "coltype": ci.querybuilder_type, "rawtype": ci.column_type, "comment": ci.column_comment or "", } ) if column_info: table_info.append( { "table": table, "columns": column_info, } ) log.debug(f"... using table {table}: {len(column_info)} columns") if table_info: info.append( { "database": dbinfo.database, "schema": dbinfo.schema_name, "tables": table_info, } ) json_result = json.dumps(info, separators=JSON_SEPARATORS_COMPACT) log.debug( f"... get_db_structure_json returning string of size " f"{len(json_result)}" ) return json_result
[docs]def query_build(request: HttpRequest) -> HttpResponse: """ Assisted query builder, based on the data structure read from the research database. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ # NOTES FOR FIRST METHOD, with lots (and lots) of forms. # - In what follows, we want a normal template but we want to include a # large chunk of raw HTML. I was doing this with # {{ builder_html | safe }} within the template, but it was very slow # (e.g. 500ms on my machine; 50s on the CPFT "sandpit" server, # 2016-06-28). The delay was genuinely in the template rendering, it # seems, based on profiling and manual log calls. # - A simple string replacement, as below, was about 7% of the total time # (e.g. 3300ms instead of 50s). # - Other alternatives might include the Jinja2 template system, which is # apparently faster than the Django default, but we may not need further # optimization. # - Another, potentially better, solution, is not to send dozens or # hundreds of forms, but to write some Javascript to make this happen # mostly on the client side. Might look better, too. (Yes, it does.) # NB: first "submit" button takes the Enter key, so place WHERE # before SELECT so users can hit enter in the WHERE value fields. # - If you provide the "request=request" argument to # render_to_string it gives you the CSRF token. # - Another way is to ignore "request" and use render_to_string # with a manually crafted context including 'csrf_token'. # (This avoids the global context processors.) # - Note that the CSRF token prevents simple caching of the forms. # - But we can't cache anyway if we're going to have some forms # (differentially) non-collapsed at the start, e.g. on form POST. # - Also harder work to do this HTML manually (rather than with # template rendering), because the csrf_token ends up like: # <input type='hidden' name='csrfmiddlewaretoken' value='RGN5UZnTVkLFAVNtXRpJwn5CclBRAdLr' /> # noqa # noinspection PyUnresolvedReferences profile = request.user.profile # type: UserProfile parse_error = "" default_database = research_database_info.get_default_database_name() default_schema = research_database_info.get_default_schema_name() with_database = research_database_info.uses_database_level() form = None if request.method == "POST": grammar = research_database_info.grammar try: if "global_clear" in request.POST: profile.sql_scratchpad = "" profile.save() elif "global_toggle_distinct" in request.POST: profile.sql_scratchpad = toggle_distinct( profile.sql_scratchpad, grammar=grammar ) profile.save() elif "global_save" in request.POST: return query_submit(request, profile.sql_scratchpad, run=False) elif "global_run" in request.POST: return query_submit(request, profile.sql_scratchpad, run=True) else: form = QueryBuilderForm(request.POST, request.FILES) if form.is_valid(): database = ( form.cleaned_data["database"] if with_database else "" ) schema = form.cleaned_data["schema"] table = form.cleaned_data["table"] column = form.cleaned_data["column"] column_id = ColumnId( db=database, schema=schema, table=table, column=column ) table_id = column_id.table_id if "submit_select" in request.POST: profile.sql_scratchpad = add_to_select( profile.sql_scratchpad, select_elements=[ SelectElement(column_id=column_id) ], magic_join=True, grammar=grammar, ) elif "submit_select_star" in request.POST: select_elements = [ SelectElement(column_id=c.column_id) for c in research_database_info.all_columns( table_id ) ] profile.sql_scratchpad = add_to_select( profile.sql_scratchpad, select_elements=select_elements, magic_join=True, grammar=grammar, ) elif "submit_where" in request.POST: datatype = form.cleaned_data["datatype"] op = form.cleaned_data["where_op"] # Value if op in SQL_OPS_MULTIPLE_VALUES: value = form.file_values_list elif op in SQL_OPS_VALUE_UNNECESSARY: value = None else: value = form.get_cleaned_where_value() # WHERE fragment wherecond = WhereCondition( column_id=column_id, op=op, datatype=datatype, value_or_values=value, ) profile.sql_scratchpad = add_to_select( profile.sql_scratchpad, where_type="AND", where_conditions=[wherecond], magic_join=True, grammar=grammar, ) else: raise ValueError("Bad form command!") profile.save() else: pass except (ParseException, DatabaseStructureNotUnderstood) as e: parse_error = str(e) if form is None: form = QueryBuilderForm() starting_values_dict = { "database": form.data.get("database", "") if with_database else "", "schema": form.data.get("schema", ""), "table": form.data.get("table", ""), "column": form.data.get("column", ""), "op": form.data.get("where_op", ""), "date_value": form.data.get("date_value", ""), # Impossible to set file_value programmatically. (See querybuilder.js.) "float_value": form.data.get("float_value", ""), "int_value": form.data.get("int_value", ""), "string_value": form.data.get("string_value", ""), "offer_where": bool(profile.sql_scratchpad), # existing SELECT? "form_errors": "<br>".join( f"{k}: {v}" for k, v in form.errors.items() ), "default_database": default_database, "default_schema": default_schema, "with_database": with_database, } context = { "nav_on_querybuilder": True, "sql": prettify_sql_html(profile.sql_scratchpad), "parse_error": parse_error, "database_structure": get_db_structure_json(), "starting_values": json.dumps( starting_values_dict, separators=JSON_SEPARATORS_COMPACT ), "sql_dialect": settings.RESEARCH_DB_DIALECT, "dialect_mysql": settings.RESEARCH_DB_DIALECT == SqlaDialectName.MYSQL, "dialect_mssql": settings.RESEARCH_DB_DIALECT == SqlaDialectName.MSSQL, "sql_highlight_css": prettify_sql_css(), } context.update(query_context(request)) return render(request, "query_build.html", context)
[docs]def get_all_queries(request: HttpRequest) -> QuerySet: """ Return all database queries for the current user. Args: request: the :class:`django.http.request.HttpRequest` Returns: request: a :class:`django.db.models.QuerySet` for :class:`crate_anon.crateweb.research.models.Query` objects """ return Query.objects.filter(user=request.user, deleted=False).order_by( "-active", "-created" )
[docs]def get_all_sitewide_queries() -> QuerySet: """ Returns all site-wide queries. Returns: request: a :class:`django.db.models.QuerySet` for :class:`crate_anon.crateweb.research.models.SitewideQuery` objects """ return SitewideQuery.objects.filter(deleted=False).order_by("-created")
[docs]def get_identical_queries( request: HttpRequest, sql: str, sitewide: bool = False ) -> List[Query]: """ Returns all queries that are identical to the SQL provided. This saves us creating a new query when one exists already that's identical. We check by hash. Args: request: the :class:`django.http.request.HttpRequest` sql: SQL text sitewide: check sitewide, rather than user-specific, queries? Returns: list: :class:`crate_anon.crateweb.research.models.Query` objects """ if sitewide: all_queries = get_all_sitewide_queries() else: all_queries = get_all_queries(request) # identical_queries = all_queries.filter(sql=sql) # # - 2017-02-03: we had a problem here, in which the parameter was sent to # SQL Server as type NTEXT, but the field "sql" is NVARCHAR(MAX), leading # to "The data types nvarchar(max) and ntext are incompatible in the # equal to operator." # - The Django field type TextField is converted to NVARCHAR(MAX) by # django-pyodbc-azure, in sql_server/pyodbc/base.py, also at [1]. # - That seems fine; NVARCHAR(MAX) seems more capable than NTEXT. # NTEXT is deprecated. # - Error is reproducible with # ... WHERE sql = CAST('hello' AS NTEXT) ... # - The order of the types in the error message matches the order in the # SQL statement. # - A solution would be to cast the parameter as # CAST(some_parameter AS NVARCHAR(MAX)) # - Fixed by upgrading pyodbc from 3.1.1 to 4.0.3 # - Added to FAQ # - WARNING: the problem came back with pyodbc==4.0.6, but not fixed again # by downgrading to 4.0.3 # - See also [2]. # - An alternative solution would not be to compare on the long text, but # store and compare on a hash of it. # - The problem is that either pyodbc or ODBC itself, somehow, is sending # the string parameter as NTEXT. # Similar Perl problem: [3]. # # - In pyodbc, the key functions are: # cursor.cpp: static PyObject* execute(...) # -> params.cpp: bool PrepareAndBind(...) # -> GetParameterInfo // THIS ONE # Parameter will be of type str. # This will fail for PyBytes_Check [4]. # This will match for PyUnicode_Check [5]. # Thus: # -> GetUnicodeInfo # ... and depending on the string length of the # parameter, this returns either # SQL_WVARCHAR -> NVARCHAR on SQL Server [6], for short strings # noqa # SQL_WLONGVARCHAR -> NTEXT on SQL Server [6], for long strings # noqa # ... and the length depends on # -> connection.h: cur->cnxn->GetMaxLength(info.ValueType); # noqa # -> BindParameter # in cursor.cpp # # - Now we also have pyodbc docs: [7]. # # - Anyway, the upshot is that there is some unpredictabilty in sending # very long parameters... the intermittency would be explained by some # dependency on string length. # - Empirically, it fails somewhere around 1,900 characters. # # - Could switch away from pyodbc, e.g. to Django-mssql [8, 9]. # But, as per the CRATE manual, there were version incompatibilities # here. Tried again with v1.8, but it gave configuration errors # (ADODB.Connection; Provider cannot be found. It may not be properly # installed.) Anyway, pyodbc is good enough for SQLAlchemy. # # [1] https://github.com/michiya/django-pyodbc-azure/blob/azure-1.10/sql_server/pyodbc/base.py # noqa # [2] https://github.com/mkleehammer/pyodbc/blob/master/tests2/informixtests.py # noqa # [3] https://stackoverflow.com/questions/13090907 # [4] https://docs.python.org/3/c-api/bytes.html # [5] https://docs.python.org/3/c-api/unicode.html # [6] https://documentation.progress.com/output/DataDirect/DataDirectCloud/index.html#page/queries/microsoft-sql-server-data-types.html # noqa # [7] https://github.com/mkleehammer/pyodbc/wiki/Data-Types # [8] https://docs.djangoproject.com/en/1.10/ref/databases/#using-a-3rd-party-database-backend # noqa # [9] https://django-mssql.readthedocs.io/en/latest/ # Screw it, let's use a hash. We can use our hash64() function and # a Django BigIntegerField. identical_queries = all_queries.filter(sql_hash=hash64(sql)) # Now eliminate any chance of errors via hash collisions by double-checking # the Python objects: return [q for q in identical_queries if q.sql == sql]
# noinspection PyUnusedLocal
[docs]@user_passes_test(is_clinician) def parse_privileged_sql(request: HttpRequest, sql: str) -> List[Any]: """ Parses clinicians' queries to find rid from pid. SQL should be, e.g.: 'SELECT * FROM tablename WHERE ~pid:dbname = pidnumber' or 'SELECT * FROM tablename WHERE ~mpid:dbname IN (value1, value2, ...)' where dbname has the secret lookup table the user wants to use. Args: request: the :class:`django.http.request.HttpRequest` sql: SQL text Returns: [bool, str] where the bool is success (0) or failure (1). If success, the str is the new sql and if failure it's the error message. """ sql_components = sql.split() new_sql = "" i = 0 while i < len(sql_components): split_component = sql_components[i].split(":") if len(split_component) == 2 and ( split_component[0] == PID_PREFIX or split_component[0] == MPID_PREFIX ): id_type, dbname = split_component try: dbinfo = research_database_info.get_dbinfo_by_name(dbname) except ValueError: return [1, f"No such database with name '{dbname}'"] if not dbinfo.secret_lookup_db: return [1, f"Database '{dbname}' has no pid to rid lookup"] rid_field = dbinfo.rid_field i += 1 try: operator = sql_components[i] except IndexError: return [1, "No operator given"] if operator.upper() == "IN": i += 1 try: if not sql_components[i].startswith("("): return [ 1, "When using the operator 'IN', values " "must be enclosed with brackets", ] except IndexError: return [1, "Missing values in clause"] values = [] at_end = False while not at_end: try: current = sql_components[i] except IndexError: return [1, "Final bracket missing in list of values"] values.extend( [ x for x in current.replace("(", "") .replace(")", "") .split(",") if x ] ) if ")" in current or i >= len(sql_components): at_end = True i += 1 if id_type == MPID_PREFIX: lookups = PidLookup.objects.using( dbinfo.secret_lookup_db ).filter(Q(mpid__in=values)) else: lookups = PidLookup.objects.using( dbinfo.secret_lookup_db ).filter(Q(pid__in=values)) rids = [lk.rid for lk in lookups] if rids: rids = [f"'{rid}'" for rid in rids] extra_sql = ",".join(rids) new_sql += f"{rid_field} IN ({extra_sql}) " else: new_sql += f"{rid_field} = ''" elif operator == "=": i += 1 try: value = sql_components[i] except IndexError: return [1, "Missing value in clause"] i += 1 if id_type == MPID_PREFIX: lookup = ( PidLookup.objects.using(dbinfo.secret_lookup_db) .filter(mpid=value) .first() ) else: lookup = ( PidLookup.objects.using(dbinfo.secret_lookup_db) .filter(pid=value) .first() ) rid = "" if not lookup else lookup.rid new_sql += f"{rid_field} {operator} '{rid}' " else: return [ 1, "pid and mpid conversion does not work with " f"operator '{operator}', only with operators '=' " "and 'IN'.", ] else: new_sql += f"{sql_components[i]} " i += 1 # Remove trailing space new_sql = new_sql.strip() return [0, new_sql]
[docs]def query_submit( request: HttpRequest, sql: str, run: bool = False, filter_display: bool = False, ) -> HttpResponse: """ Ancillary function to add a query, and redirect to the editing or run page. Args: request: the :class:`django.http.request.HttpRequest` sql: SQL text run: execute the query and show the results? Otherwise, save the query and return to the editing page filter_display: after saving the query, redirect to the filter page? Returns: a :class:`django.http.response.HttpResponse` """ if is_clinician(request.user): parsed_sql = parse_privileged_sql(request, sql) if not parsed_sql[0]: sql = parsed_sql[1] else: return generic_error(request, parsed_sql[1]) elif PID_PREFIX in sql or MPID_PREFIX in sql: return generic_error( request, "Only clinicians are authorised to use " "pid to rid conversion", ) identical_queries = get_identical_queries(request, sql) if identical_queries: identical_queries[0].activate() query_id = identical_queries[0].id else: query = Query(sql=sql, raw=True, user=request.user, active=True) query.save() query_id = query.id # redirect to a new URL: if run: return redirect(UrlNames.RESULTS, query_id) elif filter_display: return redirect(UrlNames.EDIT_DISPLAY, query_id) else: return redirect(UrlNames.QUERY)
def show_query(request: HttpRequest, query_id: str) -> HttpResponse: query = get_object_or_404(Query, id=query_id) context = { "query": query, "sql_highlight_css": prettify_sql_css(), } return render(request, "query_show.html", context) # @do_cprofile
[docs]def query_edit_select(request: HttpRequest) -> HttpResponse: """ View to edit SQL for the current ``SELECT`` query (and/or run it). Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ # log.debug("query") # if this is a POST request we need to process the form data if request.method == "POST": # create a form instance and populate it with data from the request: form = AddQueryForm(request.POST) # check whether it's valid: if form.is_valid(): cmd_run = "submit_run" in request.POST cmd_add = "submit_add" in request.POST cmd_builder = "submit_builder" in request.POST cmd_filter = "submit_filter" in request.POST # process the data in form.cleaned_data as required sql = form.cleaned_data["sql"] if cmd_add or cmd_run: run = "submit_run" in request.POST return query_submit(request, sql, run) elif cmd_builder: # noinspection PyUnresolvedReferences profile = request.user.profile # type: UserProfile profile.sql_scratchpad = sql profile.save() return redirect(UrlNames.BUILD_QUERY) elif cmd_filter: # If filtering, also add the query return query_submit(request, sql, filter_display=True) else: raise ValueError("Bad command!") # if a GET (or any other method) we'll create a blank form values = {} # type: Dict[str, Any] all_queries = get_all_queries(request) active_queries = all_queries.filter(active=True) if active_queries: values["sql"] = active_queries[0].get_original_sql() form = AddQueryForm(values) queries = paginate(request, all_queries) # noinspection PyUnresolvedReferences profile = request.user.profile # type: UserProfile element_counter = HtmlElementCounter() for q in queries: # Format sql only if it hasn't been done already if not q.get_formatted_sql(): q.save() # calls 'set_formatted_sql' q.formatted_query_safe = make_collapsible_sql_query( q.get_formatted_sql(), element_counter=element_counter, collapse_at_n_lines=profile.collapse_at_n_lines, ) sql = q.get_original_sql() if len(sql) < MAX_LEN_SHOW: q.truncated_sql = None else: # Have to use plain sql for this (not coloured) in case it cuts it # off after an html start tag but before the end tag q.truncated_sql = sql[:50] context = { "form": form, "queries": queries, "nav_on_query": True, "dialect_mysql": settings.RESEARCH_DB_DIALECT == SqlaDialectName.MYSQL, "dialect_mssql": settings.RESEARCH_DB_DIALECT == SqlaDialectName.MSSQL, "sql_highlight_css": prettify_sql_css(), "dbinfolist": ( None if not is_clinician(request.user) else research_database_info.dbinfolist ), } context.update(query_context(request)) return render(request, "query_edit_select.html", context)
[docs]@user_passes_test(is_superuser) def query_add_sitewide(request: HttpRequest) -> HttpResponse: """ Superuser view to add or edit sitewide queries and their descriptions. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ if "submit_add" in request.POST: sql = request.POST["sql"] description = request.POST["description"] identical_queries = get_identical_queries(request, sql, sitewide=True) # noinspection PyUnresolvedReferences descriptions = [query.description for query in identical_queries] if not identical_queries: query = SitewideQuery(sql=sql, description=description, raw=True) query.save() elif description not in descriptions: # noinspection PyUnresolvedReferences identical_queries[0].description = description identical_queries[0].save() all_queries = get_all_sitewide_queries() queries = paginate(request, all_queries) # noinspection PyUnresolvedReferences profile = request.user.profile # type: UserProfile element_counter = HtmlElementCounter() for q in queries: # Format sql only if it hasn't been done already if not q.get_formatted_sql(): q.save() q.formatted_query_safe = make_collapsible_sql_query( q.get_formatted_sql(), element_counter=element_counter, collapse_at_n_lines=profile.collapse_at_n_lines, ) if "edit" in request.POST: query_id = request.POST["query_id"] query = SitewideQuery.objects.get(id=query_id) selected_sql = query.sql selected_description = query.description else: selected_sql = "" selected_description = "" context = { "queries": queries, "selected_sql": selected_sql, "selected_description": selected_description, "sql_highlight_css": prettify_sql_css(), } return render(request, "query_add_sitewide.html", context)
[docs]def show_sitewide_queries(request: HttpRequest) -> HttpResponse: """ View to show all site-wide queries. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ queries = get_all_sitewide_queries() context = { "queries": queries, "sql_highlight_css": prettify_sql_css(), } return render(request, "show_sitewide_queries.html", context)
[docs]def query_activate(request: HttpRequest, query_id: str) -> HttpResponse: """ Activate the specified query for the current user. Args: request: the :class:`django.http.request.HttpRequest` query_id: string form of the integer PK of the :class:`crate_anon.crateweb.research.models.Query` Returns: a :class:`django.http.response.HttpResponse` """ validate_blank_form(request) query = get_object_or_404(Query, id=query_id) # type: Query query.activate() return redirect(UrlNames.QUERY)
[docs]def query_delete(request: HttpRequest, query_id: str) -> HttpResponse: """ Delete (or hide if required for audit purposes) the specified query for the current user. Args: request: the :class:`django.http.request.HttpRequest` query_id: string form of the integer PK of the :class:`crate_anon.crateweb.research.models.Query` Returns: a :class:`django.http.response.HttpResponse` """ validate_blank_form(request) query = get_object_or_404(Query, id=query_id) # type: Query query.delete_if_permitted() return redirect(UrlNames.QUERY)
[docs]@user_passes_test(is_superuser) def sitewide_query_delete(request: HttpRequest, query_id: str) -> HttpResponse: """ Delete a site-wide query. Args: request: the :class:`django.http.request.HttpRequest` query_id: string form of the integer PK of the :class:`crate_anon.crateweb.research.models.SitewideQuery` Returns: a :class:`django.http.response.HttpResponse` Note: - When sitewide queries are used, their SQL is added to the user's personal libraries. All auditing therefore relates to users' personal query libraries. Sitewide queries cannot be executed "standalone". - As a result, we use a raw :meth:`crate_anon.crateweb.research.models.SitewideQuery.delete``, rather than the system used by :meth:`crate_anon.crateweb.research.models.Query.delete_if_permitted()`. """ validate_blank_form(request) query = get_object_or_404( SitewideQuery, id=query_id ) # type: SitewideQuery query.delete() return redirect(UrlNames.SITEWIDE_QUERIES)
[docs]def sitewide_query_process( request: HttpRequest, query_id: str ) -> HttpResponse: """ Takes a sitewide query ID and receives (through ``POST``) replacements for the placeholders. Then adds the code to user's personal library or adds and runs it. Args: request: the :class:`django.http.request.HttpRequest` query_id: string form of the integer PK of the :class:`crate_anon.crateweb.research.models.SitewideQuery` Returns: a :class:`django.http.response.HttpResponse` """ validate_blank_form(request) cmd_add = "submit_add" in request.POST cmd_run = "submit_run" in request.POST if cmd_add or cmd_run: query = get_object_or_404(SitewideQuery, id=query_id) sql = "" for i, chunk in enumerate(query.sql_chunks): if i % 2 == 0: # add the original SQL - the even-numbered chunks sql += chunk else: # add SQL to replace the placeholders chunknum = f"chunk{i + 1}" if chunknum in request.POST: replacement = request.POST[chunknum] else: replacement = "" sql += replacement return query_submit(request, sql, run=cmd_run) else: return redirect(UrlNames.STANDARD_QUERIES)
[docs]def no_query_selected(request: HttpRequest) -> HttpResponse: """ View to say "no query selected" when one should have been. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ return render(request, "query_none_selected.html", query_context(request))
[docs]def query_count(request: HttpRequest, query_id: str) -> HttpResponse: """ View ``COUNT(*)`` from the specific query. Args: request: the :class:`django.http.request.HttpRequest` query_id: string form of the integer PK of the :class:`crate_anon.crateweb.research.models.Query` Returns: a :class:`django.http.response.HttpResponse` """ if query_id is None: return no_query_selected(request) try: query_id = int(query_id) # ... conceivably might raise TypeError (from e.g. None), ValueError # (from e.g. "xyz"), but both should be filtered out by the URL parser query = Query.objects.get(id=query_id, user=request.user) # ... will return None if not found, but may raise something derived # from ObjectDoesNotExist or (in principle, if this weren't a PK) # MultipleObjectsReturned; # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#django.db.models.query.QuerySet.get # noqa except ObjectDoesNotExist: return render_bad_query_id(request, query_id) return render_resultcount(request, query)
[docs]def query_count_current(request: HttpRequest) -> HttpResponse: """ View ``COUNT(*)`` from the user's current query. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ query = Query.get_active_query_or_none(request) if query is None: return no_query_selected(request) return render_resultcount(request, query)
[docs]class NlpSourceResult: """ Serves as the return value for :func:`get_source_results`. """
[docs] def __init__( self, fieldnames: List[str] = None, results: Sequence[Sequence[Any]] = None, sql: str = None, error: str = None, ) -> None: """ Args: fieldnames: fieldnames in source record results: source record result sql: SQL used in getting source text error: error message """ self.fieldnames = fieldnames or [] # type: List[str] self.results = results or [] # type: List[List[Any]] self.sql = sql self.error = error
@django_cache_function(timeout=None) def get_source_results( srcdb: str, srctable: str, srcfield: str, srcpkfield: str, srcpk: Union[str, int], ) -> NlpSourceResult: """ Get source text for CRATE NLP table record. Args: srcdb: source database as given by the nlp record srctable: source table srcfield: source field srcpkfield: the fieldname in the source table which contains the primary key srcpk: source primary key value Returns: a :class:`NlpSourceResult` """ try: dbname = research_database_info.nlp_sourcedb_map[srcdb] except KeyError: return NlpSourceResult( error=f"No source database in settings.NLP_SOURCEDB_MAP " f"named {srcdb}" ) try: dbinfo = research_database_info.get_dbinfo_by_name(dbname) except ValueError: return NlpSourceResult( error=f"No source database in settings.RESEARCH_DB_INFO " f"named {dbname}, for source database {srcdb}" ) full_tablename = dbinfo.schema_identifier + "." + srctable sql = f"SELECT {srcfield} FROM {full_tablename} WHERE {srcpkfield}={srcpk}" try: # noinspection PyTypeChecker cursor = get_executed_researchdb_cursor( sql ) # type: Pep249DatabaseCursorType except ProgrammingError: return NlpSourceResult( error=f"Table or fieldname incorrect in: {srctable}.{srcfield}" ) fieldnames = get_fieldnames_from_cursor(cursor) results = cursor.fetchall() return NlpSourceResult(fieldnames=fieldnames, results=results, sql=sql)
[docs]def source_info( request: HttpRequest, srcdb: str, srctable: str, srcfield: str, srcpkfield: str, srcpkval: Optional[str], srcpkstr: Optional[str], ) -> HttpResponse: """ Show source information for a record in a CRATE NLP table. Args: request: the :class:`django.http.request.HttpRequest` srcdb: source database as given by the nlp record srctable: source table srcfield: source field srcpkfield: the fieldname in the source table which contains the primary key srcpkval: primary key value in source table - may be 'None' srcpkstr: primary key string in source table - may be 'None' Returns: a :class:`django.http.response.HttpResponse` Only one of srcpkval and srcpkstr should be 'None', and this will be given as a string because it's through a URL link. """ if srcpkstr == "None": srcpkstr = None srcpk = srcpkstr if srcpkstr else srcpkval nlpsourceresult = get_source_results( srcdb, srctable, srcfield, srcpkfield, srcpk ) if nlpsourceresult.error: return generic_error( request, f"Source info lookup failed: {nlpsourceresult.error}" ) results = nlpsourceresult.results if not results: log.warning(f"No source data found. SQL: {nlpsourceresult.sql}") else: if len(results) > 1: log.warning( f"More than one source record found. " f"SQL: {nlpsourceresult.sql}" ) table_html = resultset_html_table( fieldnames=nlpsourceresult.fieldnames, rows=results, element_counter=HtmlElementCounter(), ) context = { "table_html": table_html, "sql": prettify_sql_html(nlpsourceresult.sql), "sql_highlight_css": prettify_sql_css(), } # context.update(query_context(request)) return render(request, "source_information.html", context)
[docs]def query_results(request: HttpRequest, query_id: str) -> HttpResponse: """ View the results of chosen query, in conventional tabular format. Args: request: the :class:`django.http.request.HttpRequest` query_id: string form of the integer PK of the :class:`crate_anon.crateweb.research.models.Query` Returns: a :class:`django.http.response.HttpResponse` """ if query_id is None: return no_query_selected(request) try: query_id = int(query_id) query = Query.objects.get(id=query_id, user=request.user) except ObjectDoesNotExist: return render_bad_query_id(request, query_id) # noinspection PyUnresolvedReferences profile = request.user.profile # type: UserProfile highlights = Highlight.get_active_highlights(request) return render_resultset( request, query, highlights, collapse_at_len=profile.collapse_at_len, collapse_at_n_lines=profile.collapse_at_n_lines, line_length=profile.line_length, )
[docs]def query_results_recordwise( request: HttpRequest, query_id: str ) -> HttpResponse: """ View results of chosen query, in recordwise tabular format. Args: request: the :class:`django.http.request.HttpRequest` query_id: string form of the integer PK of the :class:`crate_anon.crateweb.research.models.Query` Returns: a :class:`django.http.response.HttpResponse` """ if query_id is None: return no_query_selected(request) try: query_id = int(query_id) query = Query.objects.get(id=query_id, user=request.user) except ObjectDoesNotExist: return render_bad_query_id(request, query_id) # noinspection PyUnresolvedReferences profile = request.user.profile # type: UserProfile highlights = Highlight.get_active_highlights(request) return render_resultset_recordwise( request, query, highlights, collapse_at_len=profile.collapse_at_len, collapse_at_n_lines=profile.collapse_at_n_lines, line_length=profile.line_length, )
[docs]def query_tsv(request: HttpRequest, query_id: str) -> HttpResponse: """ Download TSV of the specified query. Args: request: the :class:`django.http.request.HttpRequest` query_id: string form of the integer PK of the :class:`crate_anon.crateweb.research.models.Query` Returns: a :class:`django.http.response.HttpResponse` """ query = get_object_or_404(Query, id=query_id) # type: Query try: return file_response( query.make_tsv(), content_type=ContentType.TSV, filename="crate_results_{num}_{datetime}.tsv".format( num=query.id, datetime=datetime_iso_for_filename(), ), ) except DatabaseError as exception: return render_bad_query(request, query, exception)
[docs]def query_excel(request: HttpRequest, query_id: str) -> HttpResponse: """ Serves an XLSX (Excel) file with the results of the specified query. Args: request: the :class:`django.http.request.HttpRequest` query_id: string form of the integer PK of the :class:`crate_anon.crateweb.research.models.Query` Returns: a :class:`django.http.response.HttpResponse` """ query = get_object_or_404(Query, id=query_id) # type: Query try: return file_response( query.make_excel(), content_type=ContentType.XLSX, filename="crate_query_{}_{}.xlsx".format( query_id, datetime_iso_for_filename() ), ) except DatabaseError as exception: return render_bad_query(request, query, exception)
[docs]def edit_display(request: HttpRequest, query_id: str) -> HttpResponse: """ Edit the 'display' attribute of the selected query by choosing a list of columns to show when the results are displayed. """ query = get_object_or_404(Query, user=request.user, id=query_id) display_fields = query.get_display_list() try: fieldnames = query.get_column_names() except DatabaseError as exception: query.audit(failed=True, fail_msg=str(exception)) return render_bad_query(request, query, exception) context = { "query": query, "display_fields": display_fields, "fieldnames": fieldnames, } return render(request, "edit_display.html", context)
[docs]def save_display(request: HttpRequest, query_id: str) -> HttpResponse: """ Save changes to the 'display' attribute of the selected query, and to the value of 'no_null'. """ query = get_object_or_404(Query, user=request.user, id=query_id) if request.method == "POST": try: fieldnames = query.get_column_names() except DatabaseError as exception: query.audit(failed=True, fail_msg=str(exception)) return render_bad_query(request, query, exception) display = [] # type: List[str] # noinspection PyArgumentList,PyCallByClass display_fieldnames = request.POST.getlist("include_field") for display_fieldname in display_fieldnames: if display_fieldname in fieldnames: display.append(display_fieldname) query.set_display_list(display) # If the user has selected 'no_null' set this attribute to True # noinspection PyArgumentList,PyCallByClass query.no_null = request.POST.get("no_null") == "true" query.save() return query_edit_select(request)
# @user_passes_test(is_superuser) # def audit(request): # """ # View audit log # """ # all_audits = QueryAudit.objects.all()\ # .select_related('query', 'query__user')\ # .order_by('-id') # audits = paginate(request, all_audits) # context = {'audits': audits} # return render(request, 'audit.html', context) # ============================================================================= # Internal functions for views on queries # ============================================================================= # def make_demo_query_unless_exists(request): # DEMOQUERY = Query( # pk=1, # sql="SELECT * FROM notes\nWHERE note LIKE '%Adam%'\nLIMIT 20", # raw=True, # user=request.user, # ) # DEMOQUERY.save() # H1 = Highlight(pk=1, text="Aaron", colour=0, user=request.user) # H1.save() # H2 = Highlight(pk=2, text="Adam", colour=0, user=request.user) # H2.save() # H3 = Highlight(pk=3, text="October", colour=1, user=request.user) # H3.save() # EXCEPTIONS FOR HOMEBREW SQL. # You can see: # - django.db.ProgrammingError # - django.db.OperationalError # - InternalError (?django.db.utils.InternalError) # ... but I think all are subclasses of django.db.utils.DatabaseError
[docs]def render_resultcount(request: HttpRequest, query: Query) -> HttpResponse: """ Displays the number of rows that a given query will fetch. Args: request: the :class:`django.http.request.HttpRequest` query: a :class:`crate_anon.crateweb.research.models.Query` Returns: a :class:`django.http.response.HttpResponse` """ if query is None: return render_missing_query(request) try: rowcount = query.get_rowcount() query.audit(count_only=True, n_records=rowcount) context = { "rowcount": rowcount, "sql": query.get_original_sql(), "nav_on_count": True, } context.update(query_context(request)) return render(request, "query_count.html", context) # See above re exception classes except DatabaseError as exception: query.audit(count_only=True, failed=True, fail_msg=str(exception)) return render_bad_query(request, query, exception)
[docs]def resultset_html_table( fieldnames: List[str], rows: List[List[Any]], element_counter: HtmlElementCounter, start_index: int = 0, highlight_dict: Dict[int, List[Highlight]] = None, collapse_at_len: int = None, collapse_at_n_lines: int = None, line_length: int = None, ditto: bool = True, ditto_html: str = "″", no_ditto_cols: List[int] = None, null: str = "<i>NULL</i>", ) -> str: """ Returns an HTML table representing a set of results from a query. Its columns are the database columns; its rows are the database rows. Args: fieldnames: list of column names rows: list of rows (each row being a list of values in the same order as ``fieldnames``) element_counter: a :class:`crate_anon.crateweb.research.html_functions.HtmlElementCounter`, which will be modified start_index: the zero-based index of the first row in this table (used for pagination, when the second and subsequent tables don't start with the first row of the result set) highlight_dict: an optional dictionary mapping highlight colour to all the :class:`crate_anon.crateweb.research.models.Highlight` objects that use it (e.g.: ``2`` maps to highlight objects for all the separate pieces of text to be highlighted in colour 2) collapse_at_len: if specified, the string length beyond which the cell will be collapsed collapse_at_n_lines: if specified, the number of lines beyond which the cell will be collapsed line_length: if specified, the line length to word-wrap at ditto: whether to replace cells that are identical to the cell immediately above with ditto marks ditto_html: the HTML string to use as a ditto mark no_ditto_cols: column indexes (zero-based) for which ditto marks should never be used null: the HTML string to use for database ``NULL`` (Python ``None``) values Returns: str: HTML """ # noqa # Considered but not implemented: hiding table columns # ... see esp "tr > *:nth-child(n)" at # https://stackoverflow.com/questions/5440657/how-to-hide-columns-in-html-table # noqa nlptable = False if FN_NLPDEF in fieldnames: srcdb_ind = srctable_ind = srcfield_ind = None srcpkfield_ind = srcpkval_ind = srcpkstr_ind = None for i, field in enumerate(fieldnames): if field == FN_SRCDB: srcdb_ind = i elif field == FN_SRCTABLE: srctable_ind = i elif field == FN_SRCFIELD: srcfield_ind = i elif field == FN_SRCPKFIELD: srcpkfield_ind = i elif field == FN_SRCPKVAL: srcpkval_ind = i elif field == FN_SRCPKSTR: srcpkstr_ind = i if all( ( srcdb_ind, srctable_ind, srcfield_ind, srcpkfield_ind, srcpkval_ind, srcpkstr_ind, ) ): nlptable = True no_ditto_cols = no_ditto_cols or [] ditto_cell = f' <td class="queryresult ditto">{ditto_html}</td>\n' html = "<table>\n" html += " <tr>\n" html += " <th><i>#</i></th>\n" for field in fieldnames: html += f" <th>{escape(field)}</th>\n" if nlptable: html += " <th>Link to source</th>\n" html += " </tr>\n" for row_index, row in enumerate(rows): # row_index is zero-based within this table html += ' <tr class="{}">\n'.format( "stripy_even" if row_index % 2 == 0 else "stripy_odd" ) # Row number html += " <td><b><i>{}</i></b></td>\n".format( row_index + start_index + 1 ) # Values for col_index, value in enumerate(row): if ( row_index > 0 and ditto and col_index not in no_ditto_cols and value == rows[row_index - 1][col_index] ): html += ditto_cell else: html += ' <td class="queryresult">{}</td>\n'.format( make_result_element( value, element_counter=element_counter, highlight_dict=highlight_dict, collapse_at_len=collapse_at_len, collapse_at_n_lines=collapse_at_n_lines, line_length=line_length, null=null, ) ) # If it's an NLP table, add link to source info if nlptable: # noinspection PyUnboundLocalVariable source_url = reverse( UrlNames.SRCINFO, kwargs={ "srcdb": row[srcdb_ind], "srctable": row[srctable_ind], "srcfield": row[srcfield_ind], "srcpkfield": row[srcpkfield_ind], "srcpkval": row[srcpkval_ind], "srcpkstr": row[srcpkstr_ind], }, ) html += f' <td><a href="{source_url}">Source info</a></td>\n' html += " </tr>\n" html += "</table>\n" return html
[docs]def single_record_html_table( fieldnames: List[str], record: List[Any], element_counter: HtmlElementCounter, highlight_dict: Dict[int, List[Highlight]] = None, collapse_at_len: int = None, collapse_at_n_lines: int = None, line_length: int = None, ) -> str: """ Returns an HTML table representing a set of results from a query, in recordwise format. It has two columns, effectively "database column" and "value"; its rows are the database columns; it displays a single database result row. Args: fieldnames: list of column names record: a single result row, i.e. a list of values in the same order as ``fieldnames`` element_counter: a :class:`crate_anon.crateweb.research.html_functions.HtmlElementCounter`, which will be modified highlight_dict: an optional dictionary mapping highlight colour to all the :class:`crate_anon.crateweb.research.models.Highlight` objects that use it (e.g.: ``2`` maps to highlight objects for all the separate pieces of text to be highlighted in colour 2) collapse_at_len: if specified, the string length beyond which the cell will be collapsed collapse_at_n_lines: if specified, the number of lines beyond which the cell will be collapsed line_length: if specified, the line length to word-wrap at Returns: str: HTML """ # noqa table_html = "" if FN_NLPDEF in fieldnames: srcdb_ind = srctable_ind = srcfield_ind = None srcpkfield_ind = srcpkval_ind = srcpkstr_ind = None for i, field in enumerate(fieldnames): if field == FN_SRCDB: srcdb_ind = i elif field == FN_SRCTABLE: srctable_ind = i elif field == FN_SRCFIELD: srcfield_ind = i elif field == FN_SRCPKFIELD: srcpkfield_ind = i elif field == FN_SRCPKVAL: srcpkval_ind = i elif field == FN_SRCPKSTR: srcpkstr_ind = i if all( ( srcdb_ind, srctable_ind, srcfield_ind, srcpkfield_ind, srcpkval_ind, srcpkstr_ind, ) ): # If it's an NLP table, add link to source info above the results # noinspection PyUnboundLocalVariable source_url = reverse( UrlNames.SRCINFO, kwargs={ "srcdb": record[srcdb_ind], "srctable": record[srctable_ind], "srcfield": record[srcfield_ind], "srcpkfield": record[srcpkfield_ind], "srcpkval": record[srcpkval_ind], "srcpkstr": record[srcpkstr_ind], }, ) table_html += f'<b><a href="{source_url}">See NLP source info</a></b>\n' # noqa table_html += "<table>\n" for col_index, value in enumerate(record): fieldname = fieldnames[col_index] table_html += ' <tr class="{}">\n'.format( "stripy_even" if col_index % 2 == 0 else "stripy_odd" ) table_html += f" <th>{escape(fieldname)}</th>" table_html += ' <td class="queryresult">{}</td>\n'.format( make_result_element( value, element_counter=element_counter, highlight_dict=highlight_dict, collapse_at_len=collapse_at_len, collapse_at_n_lines=collapse_at_n_lines, line_length=line_length, collapsed=False, ) ) table_html += " </tr>\n" table_html += "</table>\n" return table_html
[docs]def render_resultset( request: HttpRequest, query: Query, highlights: Iterable[Highlight], collapse_at_len: int = None, collapse_at_n_lines: int = None, line_length: int = None, ditto: bool = True, ditto_html: str = "″", ) -> HttpResponse: """ Show the results of a user's query in paginated, tabular format. Args: request: the :class:`django.http.request.HttpRequest` query: a :class:`crate_anon.crateweb.research.models.Query` to execute highlights: an iterable of :class:`crate_anon.crateweb.research.models.Highlight` objects to apply colourful highlighting to the results collapse_at_len: if specified, the string length beyond which the cell will be collapsed collapse_at_n_lines: if specified, the number of lines beyond which the cell will be collapsed line_length: if specified, the line length to word-wrap at ditto: whether to replace cells that are identical to the cell immediately above with ditto marks ditto_html: the HTML string to use as a ditto mark Returns: a :class:`django.http.response.HttpResponse` """ # Query if query is None: return render_missing_query(request) try: rows = query.get_display_rows() fieldnames = query.get_display_column_names() rowcount = query.get_rowcount() query.audit(n_records=rowcount) except DatabaseError as exception: query.audit(failed=True, fail_msg=str(exception)) return render_bad_query(request, query, exception) row_indexes = list(range(len(rows))) # We don't need to process all rows before we paginate. page = paginate(request, row_indexes) start_index = page.start_index() - 1 end_index = page.end_index() - 1 display_rows = rows[start_index : end_index + 1] # Highlights highlight_dict = Highlight.as_ordered_dict(highlights) # Table element_counter = HtmlElementCounter() table_html = resultset_html_table( fieldnames=fieldnames, rows=display_rows, element_counter=element_counter, start_index=start_index, highlight_dict=highlight_dict, collapse_at_len=collapse_at_len, collapse_at_n_lines=collapse_at_n_lines, line_length=line_length, ditto=ditto, ditto_html=ditto_html, ) # Wich columns are displayed display_columns = query.get_display_column_names() all_columns = query.get_column_names() omit_columns = [x for x in all_columns if x not in display_columns] # Set last_run of the query to now query.update_last_run() # Render context = { "table_html": table_html, "page": page, "rowcount": rowcount, "sql": prettify_sql_html(query.get_original_sql()), "nav_on_results": True, "sql_highlight_css": prettify_sql_css(), "display_columns": display_columns, "omit_columns": omit_columns, "no_null": query.no_null, "query_id": query.id, } context.update(query_context(request)) return render(request, "query_result.html", context)
[docs]def render_resultset_recordwise( request: HttpRequest, query: Query, highlights: Iterable[Highlight], collapse_at_len: int = None, collapse_at_n_lines: int = None, line_length: int = None, ) -> HttpResponse: """ Show the results of a user's query in recordwise format. Args: request: the :class:`django.http.request.HttpRequest` query: a :class:`crate_anon.crateweb.research.models.Query` to execute highlights: an iterable of :class:`crate_anon.crateweb.research.models.Highlight` objects to apply colourful highlighting to the results collapse_at_len: if specified, the string length beyond which the cell will be collapsed collapse_at_n_lines: if specified, the number of lines beyond which the cell will be collapsed line_length: if specified, the line length to word-wrap at Returns: a :class:`django.http.response.HttpResponse` """ # Query if query is None: return render_missing_query(request) try: rows = query.get_display_rows() fieldnames = query.get_display_column_names() rowcount = query.get_rowcount() query.audit(n_records=rowcount) except DatabaseError as exception: query.audit(failed=True, fail_msg=str(exception)) return render_bad_query(request, query, exception) row_indexes = list(range(len(rows))) # We don't need to process all rows before we paginate. page = paginate(request, row_indexes, per_page=1) # Highlights highlight_dict = Highlight.as_ordered_dict(highlights) if rows: record_index = page.start_index() - 1 record = rows[record_index] # Table element_counter = HtmlElementCounter() table_html = f"<p><i>Record {page.start_index()}</i></p>\n" table_html += single_record_html_table( fieldnames=fieldnames, record=record, element_counter=element_counter, highlight_dict=highlight_dict, collapse_at_len=collapse_at_len, collapse_at_n_lines=collapse_at_n_lines, line_length=line_length, ) else: table_html = "<b>No rows returned.</b>" # Wich columns are displayed display_columns = query.get_display_column_names() all_columns = query.get_column_names() omit_columns = [x for x in all_columns if x not in display_columns] # Set last_run of the query to now query.update_last_run() # Render context = { "table_html": table_html, "page": page, "rowcount": rowcount, "sql": prettify_sql_html(query.get_original_sql()), "nav_on_results_recordwise": True, "sql_highlight_css": prettify_sql_css(), "display_columns": display_columns, "omit_columns": omit_columns, "no_null": query.no_null, "query_id": query.id, } context.update(query_context(request)) return render(request, "query_result.html", context)
[docs]def render_missing_query(request: HttpRequest) -> HttpResponse: """ A view saying "missing query". Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ return render(request, "query_missing.html", query_context(request))
[docs]def render_bad_query( request: HttpRequest, query: Query, exception: Exception ) -> HttpResponse: """ A view saying "your query failed". This is the normal thing to see if the user has entered bad SQL. Args: request: the :class:`django.http.request.HttpRequest` query: the :class:`crate_anon.crateweb.research.models.Query` that went wrong exception: the Python exception that resulted, which may have had extra information attached via :func:`cardinal_pythonlib.exceptions.add_info_to_exception` Returns: a :class:`django.http.response.HttpResponse` """ info = recover_info_from_exception(exception) final_sql = info.get("sql", "") args = info.get("args", []) context = { "original_sql": prettify_sql_html(query.get_original_sql()), "final_sql": prettify_sql_and_args(final_sql, args), "exception": repr(exception), "sql_highlight_css": prettify_sql_css(), } context.update(query_context(request)) return render(request, "query_bad.html", context)
[docs]def render_bad_query_id(request: HttpRequest, query_id: str) -> HttpResponse: """ A view saying "bad query ID". Args: request: the :class:`django.http.request.HttpRequest` query_id: the query ID that was bad Returns: a :class:`django.http.response.HttpResponse` """ context = {"query_id": query_id} context.update(query_context(request)) return render(request, "query_bad_id.html", context)
# ============================================================================= # Highlights # =============================================================================
[docs]def highlight_edit_select(request: HttpRequest) -> HttpResponse: """ Edit or activate highlighting (which will apply to any queries that the user runs). Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ all_highlights = Highlight.objects.filter(user=request.user).order_by( "text", "colour" ) if request.method == "POST": form = AddHighlightForm(request.POST) if form.is_valid(): colour = form.cleaned_data["colour"] text = form.cleaned_data["text"] identicals = all_highlights.filter(colour=colour, text=text) if identicals: identicals[0].activate() else: highlight = Highlight( colour=colour, text=text, user=request.user, active=True ) highlight.save() return redirect(UrlNames.HIGHLIGHT) values = {"colour": 0} form = AddHighlightForm(values) active_highlights = all_highlights.filter(active=True) highlight_dict = Highlight.as_ordered_dict(active_highlights) highlight_descriptions = get_highlight_descriptions(highlight_dict) highlights = paginate(request, all_highlights) context = { "form": form, "highlights": highlights, "nav_on_highlight": True, "N_CSS_HIGHLIGHT_CLASSES": N_CSS_HIGHLIGHT_CLASSES, "highlight_descriptions": highlight_descriptions, "colourlist": list(range(N_CSS_HIGHLIGHT_CLASSES)), } context.update(query_context(request)) return render(request, "highlight_edit_select.html", context)
[docs]def highlight_activate( request: HttpRequest, highlight_id: str ) -> HttpResponse: """ Activate a highlight. Args: request: the :class:`django.http.request.HttpRequest` highlight_id: string form of the integer PK for :class:`crate_anon.crateweb.research.models.Highlight` Returns: a :class:`django.http.response.HttpResponse` """ validate_blank_form(request) highlight = get_object_or_404( Highlight, id=highlight_id ) # type: Highlight highlight.activate() return redirect(UrlNames.HIGHLIGHT)
[docs]def highlight_deactivate( request: HttpRequest, highlight_id: str ) -> HttpResponse: """ Deactivate a highlight. Args: request: the :class:`django.http.request.HttpRequest` highlight_id: string form of the integer PK for :class:`crate_anon.crateweb.research.models.Highlight` Returns: a :class:`django.http.response.HttpResponse` """ validate_blank_form(request) highlight = get_object_or_404( Highlight, id=highlight_id ) # type: Highlight highlight.deactivate() return redirect(UrlNames.HIGHLIGHT)
[docs]def highlight_delete(request: HttpRequest, highlight_id: str) -> HttpResponse: """ Delete a highlight. Args: request: the :class:`django.http.request.HttpRequest` highlight_id: string form of the integer PK for :class:`crate_anon.crateweb.research.models.Highlight` Returns: a :class:`django.http.response.HttpResponse` """ validate_blank_form(request) highlight = get_object_or_404( Highlight, id=highlight_id ) # type: Highlight highlight.delete() return redirect(UrlNames.HIGHLIGHT)
# def render_bad_highlight_id(request, highlight_id): # context = {'highlight_id': highlight_id} # context.update(query_context(request)) # return render(request, 'highlight_bad_id.html', context)
[docs]def get_highlight_descriptions( highlight_dict: Dict[int, List[Highlight]] ) -> List[str]: """ Returns a list of length up to ``N_CSS_HIGHLIGHT_CLASSES`` of HTML elements illustrating the highlights. Args: highlight_dict: a dictionary mapping highlight colour to all the :class:`crate_anon.crateweb.research.models.Highlight` objects that use it (e.g.: ``2`` maps to highlight objects for all the separate pieces of text to be highlighted in colour 2) Returns: str: HTML describing the highlights """ desc = [] # type: List[str] for n in range(N_CSS_HIGHLIGHT_CLASSES): if n not in highlight_dict: continue desc.append( ", ".join([highlight_text(h.text, n) for h in highlight_dict[n]]) ) return desc
# ============================================================================= # PID lookup # ============================================================================= # In general with these database-choosing functions, don't redirect between # the "generic" and "database-specific" views using POST, because we can't then # add default values to a new form (since the request.POST object is # populated and immutable). Use a dbname query parameter as well. # (That doesn't make it HTTP GET; it makes it HTTP POST with query parameters.)
[docs]def pid_rid_lookup( request: HttpRequest, with_db_url_name: str, html_filename: str ) -> HttpResponse: """ Common functionality for :func`pidlookup`, :func:`ridlookup`. Provides a view/form allowing the user to choose a database, if more than one is possible, and then redirect to another view once we have that database choice. Args: request: the :class:`django.http.request.HttpRequest` with_db_url_name: URL name to redirect to, passed as a parameter to :func:`django.urls.reverse` html_filename: Django HTML template filename Returns: a :class:`django.http.response.HttpResponse` """ dbinfolist = research_database_info.dbs_with_secret_map n = len(dbinfolist) if n == 0: return generic_error(request, "No databases with lookup map!") elif n == 1: dbname = dbinfolist[0].name return HttpResponseRedirect(reverse(with_db_url_name, args=[dbname])) else: form = DatabasePickerForm(request.POST or None, dbinfolist=dbinfolist) if form.is_valid(): dbname = form.cleaned_data["database"] return HttpResponseRedirect( reverse(with_db_url_name, args=[dbname]) ) return render(request, html_filename, {"form": form})
[docs]def pid_rid_lookup_with_db( request: HttpRequest, dbname: str, form_html_filename: str, formclass: Any, result_html_filename: str, ) -> HttpResponse: """ Common functionality for :func:`pidlookup_with_db`, :func:`ridlookup_with_db`. Args: request: the :class:`django.http.request.HttpRequest` dbname: name of the research database to use form_html_filename: Django HTML template filename to ask for PID/RID/etc. details formclass: form class to use for requesting PID/RID/etc. result_html_filename: Django HTML template filename to display results Returns: a :class:`django.http.response.HttpResponse` """ # There's a bug in the Python 3.5 typing module; we can't use # Union[Type[PidLookupForm], Type[RidLookupForm]] yet; we get # TypeError: descriptor '__subclasses__' of 'type' object needs an argument # ... see https://github.com/python/typing/issues/266 try: dbinfo = research_database_info.get_dbinfo_by_name(dbname) except ValueError: return generic_error(request, f"No research database named {dbname!r}") form = formclass( request.POST or None, dbinfo=dbinfo ) # type: Union[PidLookupForm, RidLookupForm] if form.is_valid(): pids = form.cleaned_data.get("pids") or [] # type: List[int] mpids = form.cleaned_data.get("mpids") or [] # type: List[int] trids = form.cleaned_data.get("trids") or [] # type: List[int] rids = form.cleaned_data.get("rids") or [] # type: List[str] mrids = form.cleaned_data.get("mrids") or [] # type: List[str] return render_lookup( request=request, dbinfo=dbinfo, result_html_filename=result_html_filename, pids=pids, mpids=mpids, trids=trids, rids=rids, mrids=mrids, ) context = { "db_name": dbinfo.name, "db_description": dbinfo.description, "form": form, } return render(request, form_html_filename, context)
[docs]@user_passes_test(is_superuser) def pidlookup(request: HttpRequest) -> HttpResponse: """ Look up PID information from RID information. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ return pid_rid_lookup( request=request, with_db_url_name="pidlookup_with_db", html_filename="pid_lookup_choose_db.html", )
[docs]@user_passes_test(is_superuser) def pidlookup_with_db(request: HttpRequest, dbname: str) -> HttpResponse: """ Look up PID information from RID information, for a specific database. Args: request: the :class:`django.http.request.HttpRequest` dbname: name of the research database to use Returns: a :class:`django.http.response.HttpResponse` """ return pid_rid_lookup_with_db( request=request, dbname=dbname, form_html_filename="pid_lookup_form.html", formclass=PidLookupForm, result_html_filename="pid_lookup_result.html", )
[docs]@user_passes_test(is_clinician) def ridlookup(request: HttpRequest) -> HttpResponse: """ Look up RID information from PID information. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ return pid_rid_lookup( request=request, with_db_url_name="ridlookup_with_db", html_filename="rid_lookup_choose_db.html", )
[docs]@user_passes_test(is_clinician) def ridlookup_with_db(request: HttpRequest, dbname: str) -> HttpResponse: """ Look up RID information from PID information, for a specific database. Args: request: the :class:`django.http.request.HttpRequest` dbname: name of the research database to use Returns: a :class:`django.http.response.HttpResponse` """ return pid_rid_lookup_with_db( request=request, dbname=dbname, form_html_filename="rid_lookup_form.html", formclass=RidLookupForm, result_html_filename="rid_lookup_result.html", )
[docs]def render_lookup( request: HttpRequest, dbinfo: SingleResearchDatabase, result_html_filename: str, trids: List[int] = None, rids: List[str] = None, mrids: List[str] = None, pids: List[int] = None, mpids: List[int] = None, ) -> HttpResponse: """ Shows the output of a PID/RID lookup. Args: request: the :class:`django.http.request.HttpRequest` dbinfo: a :class:`crate_anon.crateweb.research.research_db_info.SingleResearchDatabase` detailing the research database to use result_html_filename: Django HTML template filename to display results trids: list of TRIDs to look up from rids: list of RIDs to look up from mrids: list of MRIDs to look up from pids: list of PIDs to look up from mpids: list of MPIDs to look up from Returns: a :class:`django.http.response.HttpResponse` """ # noqa # if not request.user.superuser: # return HttpResponse('Forbidden', status=403) # # https://stackoverflow.com/questions/3297048/403-forbidden-vs-401-unauthorized-http-responses # noqa trids = [] if trids is None else trids rids = [] if rids is None else rids mrids = [] if mrids is None else mrids pids = [] if pids is None else pids mpids = [] if mpids is None else mpids assert dbinfo.secret_lookup_db lookups = ( PidLookup.objects.using(dbinfo.secret_lookup_db) .filter( Q(trid__in=trids) | Q(rid__in=rids) | Q(mrid__in=mrids) | Q(pid__in=pids) | Q(mpid__in=mpids) ) .order_by("pid") ) context = { "lookups": lookups, "trid_field": dbinfo.trid_field, "trid_description": dbinfo.trid_description, "rid_field": dbinfo.rid_field, "rid_description": dbinfo.rid_description, "mrid_field": dbinfo.mrid_field, "mrid_description": dbinfo.mrid_description, "pid_description": dbinfo.pid_description, "mpid_description": dbinfo.mpid_description, } return render(request, result_html_filename, context)
# ============================================================================= # Research database structure # =============================================================================
[docs]def structure_table_long(request: HttpRequest) -> HttpResponse: """ Shows the table structure of the research database(s) in long format. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ colinfolist = research_database_info.get_colinfolist() rowcount = len(colinfolist) context = { "paginated": False, "colinfolist": colinfolist, "rowcount": rowcount, "default_database": research_database_info.get_default_database_name(), "default_schema": research_database_info.get_default_schema_name(), "with_database": research_database_info.uses_database_level(), } return render(request, "database_structure.html", context)
[docs]def structure_table_paginated(request: HttpRequest) -> HttpResponse: """ Shows the table structure of the research database(s) in paginated format. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ colinfolist = research_database_info.get_colinfolist() rowcount = len(colinfolist) colinfolist = paginate(request, colinfolist) context = { "paginated": True, "colinfolist": colinfolist, "rowcount": rowcount, "default_database": research_database_info.get_default_database_name(), "default_schema": research_database_info.get_default_schema_name(), "with_database": research_database_info.uses_database_level(), } return render(request, "database_structure.html", context)
@django_cache_function(timeout=None) # @lru_cache(maxsize=None) def get_structure_tree_html() -> str: """ Returns HTML for an expand-and-collapse tree showing the table structure of the research database(s). Returns: str: HTML """ table_to_colinfolist = research_database_info.get_colinfolist_by_tables() content = "" element_counter = HtmlElementCounter() grammar = research_database_info.grammar for table_id, colinfolist in table_to_colinfolist.items(): html_table = render_to_string( "database_structure_table.html", { "colinfolist": colinfolist, "default_database": research_database_info.get_default_database_name(), # noqa "default_schema": research_database_info.get_default_schema_name(), # noqa "with_database": research_database_info.uses_database_level(), }, ) cd_button = element_counter.visibility_div_spanbutton() cd_content = element_counter.visibility_div_contentdiv( contents=html_table ) content += ( '<div class="titlecolour">{db_schema}.<b>{table}</b>{button}</div>' "{cd}".format( db_schema=table_id.database_schema_part(grammar), table=table_id.table_part(grammar), button=cd_button, cd=cd_content, ) ) return content
[docs]def structure_tree(request: HttpRequest) -> HttpResponse: """ Shows an expand-and-collapse tree view of the table structure of the research database(s). Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ context = { "content": get_structure_tree_html(), "default_database": research_database_info.get_default_database_name(), "default_schema": research_database_info.get_default_schema_name(), } return render(request, "database_structure_tree.html", context)
# noinspection PyUnusedLocal
[docs]def structure_tsv(request: HttpRequest) -> HttpResponse: """ Serves the table structure of the research database(s) as TSV. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ return file_response( research_database_info.get_tsv(), content_type=ContentType.TSV, filename="structure.tsv", )
# noinspection PyUnusedLocal
[docs]def structure_excel(request: HttpRequest) -> HttpResponse: """ Serves the table structure of the research database(s) as an Excel XLSX file. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ return file_response( research_database_info.get_excel(), content_type=ContentType.TSV, filename="structure.xlsx", )
# ============================================================================= # Local help on structure # =============================================================================
[docs]def local_structure_help(request: HttpRequest) -> HttpResponse: """ Serves a locally specifed help page. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ if settings.DATABASE_HELP_HTML_FILENAME: with open(settings.DATABASE_HELP_HTML_FILENAME, "r") as infile: content = infile.read() return HttpResponse(content.encode("utf8")) else: content = "<p>No local help available.</p>" context = {"content": content} return render(request, "local_structure_help.html", context)
# ============================================================================= # SQL helpers # =============================================================================
[docs]def textmatch( column_name: str, fragment: str, as_fulltext: bool, dialect: str = "mysql" ) -> str: """ Returns SQL to check for the presence of text anywhere in a field. Args: column_name: name of the column fragment: piece of text to look for as_fulltext: use a FULLTEXT search if the database dialect supports it dialect: dialect name (``mysql``, ``mssql`` are known) Returns: str: SQL fragment like: - ``column LIKE '%fragment%'`` (ANSI SQL) - ``MATCH(column) AGAINST ('fragment')`` (MySQL full-text) - ``CONTAINS(column, 'fragment')`` (Microsoft SQL Server full-text) """ if as_fulltext and dialect == "mysql": return f"MATCH({column_name}) AGAINST ('{fragment}')" elif as_fulltext and dialect == "mssql": return f"CONTAINS({column_name}, '{fragment}')" else: return f"{column_name} LIKE '%{fragment}%'"
[docs]def drugmatch(drug_type: str, colname: str) -> str: """ Returns SQL to check for the presence of any drug of type 'drug_type' anywhere in a field. Args: drug_type: drug type to look for colname: name of the column """ criteria = {drug_type: True} drugs = all_drugs_where(**criteria) # type: List[Drug] drugs_sql_parts = [drug.sql_column_like_drug(colname) for drug in drugs] drugs_sql = " OR ".join(drugs_sql_parts) return drugs_sql
[docs]def textfinder_sql( patient_id_fieldname: str, min_length: int, use_fulltext_index: bool, include_content: bool, include_datetime: bool, fragment: str = "", drug_type: str = "", patient_id_value: Union[int, str] = None, extra_fieldname: str = None, extra_value: Union[int, str] = None, ) -> str: """ Returns SQL to find the text in ``fragment`` across all tables that contain the field indicated by ``patient_id_fieldname``, where the length of the text field is at least ``min_length``. Args: patient_id_fieldname: field (column) name across all tables that contains the patient ID; any tables that don't contain this column will be ignored fragment: fragment of text to find (e.g. "paracetamol") drug_type: type of drug to find any example of min_length: text fields must be at least this large to bother searching; use this option to exclude e.g. ``VARCHAR(1)`` columns from the search use_fulltext_index: use database full-text indexing? include_content: include the text fields in the output? include_datetime: include the date/time of each record, if known (see :meth:`crate_anon.crateweb.research.research_db_info.ResearchDatabaseInfo.get_default_date_column` patient_id_value: specify this to restrict to a single patient; the value of the patient ID column (see ``patient_id_fieldname``) to restrict to extra_fieldname: extra_value: Returns: str: SQL query Raises: :exc:`ValueError` if no tables match the request """ if not fragment and not drug_type: raise ValueError( "Must supply either 'fragment' or 'drug_type' to 'textfinder_sql'" ) grammar = research_database_info.grammar tables = research_database_info.tables_containing_field( patient_id_fieldname ) if not tables: raise ValueError( f"No tables containing fieldname: {patient_id_fieldname}" ) have_pid_value = patient_id_value is not None and patient_id_value != "" if have_pid_value: pidclause = "{patient_id_fieldname} = {value}".format( patient_id_fieldname=patient_id_fieldname, value=escape_sql_string_or_int_literal(patient_id_value), ) else: pidclause = "" using_extra = extra_fieldname and extra_value is not None table_heading = "_table_name" contents_colname_heading = "_column_name" datetime_heading = "_datetime" queries = [] # type: List[str] def add_query( table_ident: str, extra_cols: List[str], date_value_select: str, extra_conditions: List[str], ) -> None: selectcols = [] # type: List[str] # Patient ID(s); date if using_extra: selectcols.append( "{lit} AS {ef}".format( lit=escape_sql_string_or_int_literal(extra_value), ef=extra_fieldname, ) ) selectcols.append(patient_id_fieldname) if include_datetime: selectcols.append(f"{date_value_select} AS {datetime_heading}") # +/- table/column/content selectcols += extra_cols # Build query query = f"SELECT {', '.join(selectcols)}\n" f"FROM {table_ident}" conditions = [] # type: List[str] if have_pid_value: conditions.append(pidclause) conditions.extend(extra_conditions) query += "\nWHERE " + " AND ".join(conditions) queries.append(query) for table_id in tables: columns = research_database_info.text_columns( table_id=table_id, min_length=min_length ) if not columns: continue table_identifier = table_id.identifier(grammar) date_col = research_database_info.get_default_date_column( table=table_id ) if date_col: date_identifier = date_col.identifier(grammar) else: date_identifier = "NULL" if include_content: # Content required; therefore, one query per text column. table_select = "'{}' AS {}".format( escape_sql_string_literal(table_identifier), table_heading ) for columninfo in columns: column_identifier = columninfo.column_id.identifier(grammar) # 'extra_conditions' will be the sql fragment finding either # the fragment of text supplied or all drugs of the given type if fragment: extra = textmatch( column_name=column_identifier, fragment=fragment, as_fulltext=( columninfo.indexed_fulltext and use_fulltext_index ), dialect=settings.RESEARCH_DB_DIALECT, ) else: extra = drugmatch( colname=column_identifier, drug_type=drug_type ) contentcol_name_select = ( f"'{column_identifier}' AS {contents_colname_heading}" ) content_select = f"{column_identifier} AS _content" add_query( table_ident=table_identifier, extra_cols=[ table_select, contentcol_name_select, content_select, ], date_value_select=date_identifier, extra_conditions=[extra], ) else: # Content not required; therefore, one query per table. elements = [] # type: List[str] for columninfo in columns: if fragment: elmnt = textmatch( column_name=columninfo.column_id.identifier(grammar), fragment=fragment, as_fulltext=( columninfo.indexed_fulltext and use_fulltext_index ), dialect=settings.RESEARCH_DB_DIALECT, ) else: elmnt = drugmatch( colname=columninfo.column_id.identifier(grammar), drug_type=drug_type, ) elements.append(elmnt) add_query( table_ident=table_identifier, extra_cols=[], date_value_select=date_identifier, extra_conditions=[ "(\n {}\n)".format("\n OR ".join(elements)) ], ) sql = "\nUNION\n".join(queries) if sql: order_by_cols = [] # type: List[str] if using_extra: order_by_cols.append(extra_fieldname) order_by_cols.append(patient_id_fieldname) if include_datetime: order_by_cols.append(datetime_heading + " DESC") if include_content: order_by_cols.extend([table_heading, contents_colname_heading]) sql += "\nORDER BY " + ", ".join(order_by_cols) return sql
[docs]def common_find_text( request: HttpRequest, dbinfo: SingleResearchDatabase, form_class: Type[SQLHelperFindAnywhereForm], default_values: Dict[str, Any], permit_pid_search: bool, html_filename: str, ) -> HttpResponse: """ Finds and displays text anywhere in the database(s), via a ``UNION`` query. Args: request: the :class:`django.http.request.HttpRequest` dbinfo: a :class:`crate_anon.crateweb.research.research_db_info.SingleResearchDatabase` detailing the research database to use form_class: form class to use to specify search options; :class:`crate_anon.crateweb.research.forms.SQLHelperTextAnywhereForm` or a subclass of it (like `crate_anon.crateweb.research.forms.ClinicianAllTextFromPidForm`) default_values: default values to be passed to the form (see ``form_class``) permit_pid_search: allow the user to search by PID/MPID (for clinicians)? html_filename: Django HTML template filename to capture search options Returns: a :class:`django.http.response.HttpResponse` """ # noqa # When you forget about Django forms, go back to: # http://www.slideshare.net/pydanny/advanced-django-forms-usage # ------------------------------------------------------------------------- # What may the user use to look up patients? # ------------------------------------------------------------------------- fk_options = [] # type: List[FieldPickerInfo] if permit_pid_search: fk_options.append( FieldPickerInfo( value=dbinfo.pid_pseudo_field, description=( f"{dbinfo.pid_pseudo_field}: {dbinfo.pid_description}" ), type_=PatientFieldPythonTypes.PID, permits_empty_id=False, ) ) fk_options.append( FieldPickerInfo( value=dbinfo.mpid_pseudo_field, description=( f"{dbinfo.mpid_pseudo_field}: {dbinfo.mpid_description}" ), type_=PatientFieldPythonTypes.MPID, permits_empty_id=False, ) ) assert dbinfo.secret_lookup_db default_values["fkname"] = dbinfo.pid_pseudo_field fk_options.append( FieldPickerInfo( value=dbinfo.rid_field, description=f"{dbinfo.rid_field}: {dbinfo.rid_description}", type_=PatientFieldPythonTypes.RID, permits_empty_id=True, ), ) if dbinfo.secret_lookup_db: fk_options.append( FieldPickerInfo( value=dbinfo.mrid_field, description=f"{dbinfo.mrid_field}: {dbinfo.mrid_description}", type_=PatientFieldPythonTypes.MRID, permits_empty_id=False, ) ) # We don't want to make too much of the TRID. Let's not offer it as # a lookup option. If performance becomes a major problem with these # queries, we could always say "if dbinfo.secret_lookup_db, then # look up the TRID from the RID (or whatever we're using)". # # FieldPickerInfo(value=dbinfo.trid_field, # description="{}: {}".format(dbinfo.trid_field, # dbinfo.trid_description), # type_=PatientFieldPythonTypes.TRID), form = form_class(request.POST or default_values, fk_options=fk_options) if form.is_valid(): patient_id_fieldname = form.cleaned_data["fkname"] pidvalue = form.cleaned_data["patient_id"] min_length = form.cleaned_data["min_length"] # --------------------------------------------------------------------- # Whare are we going to use internally for the lookup? # --------------------------------------------------------------------- # For patient lookups, a TRID is quick but not so helpful for # clinicians. Use the RID. if patient_id_fieldname == dbinfo.pid_pseudo_field: lookup = ( PidLookup.objects.using(dbinfo.secret_lookup_db) .filter(pid=pidvalue) .first() ) # type: PidLookup if lookup is None: return generic_error( request, f"No patient with PID {pidvalue!r}" ) # Replace: extra_fieldname = patient_id_fieldname extra_value = pidvalue patient_id_fieldname = dbinfo.rid_field pidvalue = lookup.rid # string elif patient_id_fieldname == dbinfo.mpid_pseudo_field: lookup = ( PidLookup.objects.using(dbinfo.secret_lookup_db) .filter(mpid=pidvalue) .first() ) # type: PidLookup if lookup is None: return generic_error( request, f"No patient with MPID {pidvalue!r}" ) # Replace: extra_fieldname = patient_id_fieldname extra_value = pidvalue patient_id_fieldname = dbinfo.rid_field pidvalue = lookup.rid # string elif patient_id_fieldname == dbinfo.mrid_field: # Using MRID. This is not stored in each table. Rather than have # an absolutely enormous query (SELECT stuff FROM texttable INNER # JOIN mridtable ON patient_id_stuff WHERE textttable.contents # LIKE something AND mridtable.mrid = ? UNION SELECT morestuff...) # let's look up the RID from the MRID. Consequently, we only offer # MRID lookup if we have a secret lookup table. lookup = ( PidLookup.objects.using(dbinfo.secret_lookup_db) .filter(mrid=pidvalue) .first() ) if lookup is None: return generic_error( request, f"No patient with RID {pidvalue!r}" ) # Replace: extra_fieldname = patient_id_fieldname extra_value = pidvalue patient_id_fieldname = dbinfo.rid_field pidvalue = lookup.rid # string else: # Using RID directly (or, if we wanted to support it, TRID). extra_fieldname = None extra_value = None # --------------------------------------------------------------------- # Generate the query # --------------------------------------------------------------------- if form_class == SQLHelperDrugTypeForm: fragment = "" drug_type = escape_sql_string_literal( form.cleaned_data["drug_type"] ) else: fragment = escape_sql_string_literal(form.cleaned_data["fragment"]) drug_type = "" try: sql = textfinder_sql( patient_id_fieldname=patient_id_fieldname, fragment=fragment, drug_type=drug_type, min_length=min_length, use_fulltext_index=form.cleaned_data["use_fulltext_index"], include_content=form.cleaned_data["include_content"], include_datetime=form.cleaned_data["include_datetime"], patient_id_value=pidvalue, extra_fieldname=extra_fieldname, extra_value=extra_value, ) # This SQL will link across all available research databases # where the fieldname conditions are met. if not sql: raise ValueError( f"No fields matched your criteria (text columns of " f"minimum length {min_length} in tables containing " f"field {patient_id_fieldname!r})" ) except ValueError as e: return generic_error(request, str(e)) # --------------------------------------------------------------------- # Run, save, or display the query # --------------------------------------------------------------------- if "submit_save" in request.POST: return query_submit(request, sql, run=False) elif "submit_run" in request.POST: return query_submit(request, sql, run=True) else: return render(request, "sql_fragment.html", {"sql": sql}) # ------------------------------------------------------------------------- # Offer the starting choices # ------------------------------------------------------------------------- return render( request, html_filename, { "db_name": dbinfo.name, "db_description": dbinfo.description, "form": form, }, )
[docs]def sqlhelper_text_anywhere(request: HttpRequest) -> HttpResponse: """ Picks a database, then redirects to :func:`sqlhelper_text_anywhere_with_db`. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ if research_database_info.single_research_db: dbname = research_database_info.first_dbinfo.name return HttpResponseRedirect( reverse(UrlNames.SQLHELPER_TEXT_ANYWHERE_WITH_DB, args=[dbname]) ) else: form = DatabasePickerForm( request.POST or None, dbinfolist=research_database_info.dbinfolist ) if form.is_valid(): dbname = form.cleaned_data["database"] return HttpResponseRedirect( reverse( UrlNames.SQLHELPER_TEXT_ANYWHERE_WITH_DB, args=[dbname] ) ) return render( request, "sqlhelper_form_text_anywhere_choose_db.html", {"form": form}, )
[docs]def sqlhelper_text_anywhere_with_db( request: HttpRequest, dbname: str ) -> HttpResponse: """ Finds text anywhere in the database(s) via a ``UNION`` query. Args: request: the :class:`django.http.request.HttpRequest` dbname: name of the research database to use Returns: a :class:`django.http.response.HttpResponse` """ try: dbinfo = research_database_info.get_dbinfo_by_name(dbname) except ValueError: return generic_error(request, f"No research database named {dbname!r}") default_values = { "fkname": dbinfo.rid_field, "min_length": DEFAULT_MIN_TEXT_FIELD_LENGTH, "use_fulltext_index": True, "include_content": False, "include_datetime": False, } return common_find_text( request=request, dbinfo=dbinfo, form_class=SQLHelperTextAnywhereForm, default_values=default_values, permit_pid_search=False, html_filename="sqlhelper_form_text_anywhere.html", )
[docs]def sqlhelper_drug_type(request: HttpRequest) -> HttpResponse: """ Picks a database, then redirects to :func:`sqlhelper_drug_type_with_db`. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ if research_database_info.single_research_db: dbname = research_database_info.first_dbinfo.name return HttpResponseRedirect( reverse(UrlNames.SQLHELPER_DRUG_TYPE_WITH_DB, args=[dbname]) ) else: form = DatabasePickerForm( request.POST or None, dbinfolist=research_database_info.dbinfolist ) if form.is_valid(): dbname = form.cleaned_data["database"] return HttpResponseRedirect( reverse(UrlNames.SQLHELPER_DRUG_TYPE_WITH_DB, args=[dbname]) ) return render( request, "sqlhelper_form_drug_type_choose_db.html", {"form": form} )
[docs]def sqlhelper_drug_type_with_db( request: HttpRequest, dbname: str ) -> HttpResponse: """ Finds drugs of a given type anywhere in the database(s) via a ``UNION`` query. Args: request: the :class:`django.http.request.HttpRequest` dbname: name of the research database to use Returns: a :class:`django.http.response.HttpResponse` """ try: dbinfo = research_database_info.get_dbinfo_by_name(dbname) except ValueError: return generic_error(request, f"No research database named {dbname!r}") default_values = { "fkname": dbinfo.rid_field, "min_length": DEFAULT_MIN_TEXT_FIELD_LENGTH, "use_fulltext_index": True, "include_content": False, "include_datetime": False, } return common_find_text( request=request, dbinfo=dbinfo, form_class=SQLHelperDrugTypeForm, default_values=default_values, permit_pid_search=False, html_filename="sqlhelper_form_drugtype.html", )
[docs]@user_passes_test(is_clinician) def all_text_from_pid(request: HttpRequest) -> HttpResponse: """ Picks a database, then redirects to :func:`all_text_from_pid_with_db`. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ dbinfolist = research_database_info.dbs_with_secret_map n = len(dbinfolist) if n == 0: return generic_error(request, "No databases with lookup map!") elif n == 1: dbname = dbinfolist[0].name return HttpResponseRedirect( reverse(UrlNames.ALL_TEXT_FROM_PID_WITH_DB, args=[dbname]) ) else: form = DatabasePickerForm(request.POST or None, dbinfolist=dbinfolist) if form.is_valid(): dbname = form.cleaned_data["database"] return HttpResponseRedirect( reverse(UrlNames.ALL_TEXT_FROM_PID_WITH_DB, args=[dbname]) ) return render( request, "clinician_form_all_text_from_pid_choose_db.html", {"form": form}, )
[docs]@user_passes_test(is_clinician) def all_text_from_pid_with_db( request: HttpRequest, dbname: str ) -> HttpResponse: """ Clinician view to look up a patient's RID from their PID and display text from any field. Args: request: the :class:`django.http.request.HttpRequest` dbname: name of the research database to use Returns: a :class:`django.http.response.HttpResponse` """ try: dbinfo = research_database_info.get_dbinfo_by_name(dbname) except ValueError: return generic_error(request, f"No research database named {dbname!r}") default_values = { "min_length": DEFAULT_MIN_TEXT_FIELD_LENGTH, "use_fulltext_index": True, "include_content": True, "include_datetime": True, } return common_find_text( request=request, dbinfo=dbinfo, form_class=ClinicianAllTextFromPidForm, default_values=default_values, permit_pid_search=True, html_filename="clinician_form_all_text_from_pid.html", )
# ============================================================================= # Per-patient views: Patient Explorer # =============================================================================
[docs]def pe_build(request: HttpRequest) -> HttpResponse: """ View to build/edit a Patient Explorer (see :class:`crate_anon.crateweb.research.models.PatientExplorer`). Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ # noinspection PyUnresolvedReferences profile = request.user.profile # type: UserProfile default_database = research_database_info.get_default_database_name() default_schema = research_database_info.get_default_schema_name() with_database = research_database_info.uses_database_level() manual_form = None form = None if not profile.patient_multiquery_scratchpad: profile.patient_multiquery_scratchpad = PatientMultiQuery() pmq = profile.patient_multiquery_scratchpad if request.method == "POST": if "global_clear_select" in request.POST: pmq.clear_output_columns() profile.save() elif "global_clear_where" in request.POST: pmq.clear_patient_conditions() profile.save() elif "global_clear_everything" in request.POST: pmq.clear_output_columns() pmq.clear_patient_conditions() pmq.set_override_query("") profile.save() elif "global_save" in request.POST: if pmq.ok_to_run: return pe_submit(request, pmq, run=False) elif "global_run" in request.POST: if pmq.ok_to_run: return pe_submit(request, pmq, run=True) elif "global_manual_set" in request.POST: manual_form = ManualPeQueryForm(request.POST) if manual_form.is_valid(): sql = manual_form.cleaned_data["sql"] pmq.set_override_query(sql) profile.save() elif "global_manual_clear" in request.POST: pmq.set_override_query("") profile.save() else: form = QueryBuilderForm(request.POST, request.FILES) if form.is_valid(): database = ( form.cleaned_data["database"] if with_database else "" ) schema = form.cleaned_data["schema"] table = form.cleaned_data["table"] column = form.cleaned_data["column"] column_id = ColumnId( db=database, schema=schema, table=table, column=column ) if "submit_select" in request.POST: pmq.add_output_column(column_id) elif "submit_select_star" in request.POST: table_id = column_id.table_id all_column_ids = [ c.column_id for c in research_database_info.all_columns(table_id) ] for c in all_column_ids: pmq.add_output_column(c) elif "submit_where" in request.POST: datatype = form.cleaned_data["datatype"] op = form.cleaned_data["where_op"] # Value if op in SQL_OPS_MULTIPLE_VALUES: value = form.file_values_list elif op in SQL_OPS_VALUE_UNNECESSARY: value = None else: value = form.get_cleaned_where_value() # WHERE fragment wherecond = WhereCondition( column_id=column_id, op=op, datatype=datatype, value_or_values=value, ) pmq.add_patient_condition(wherecond) else: raise ValueError("Bad form command!") profile.save() else: # log.critical("not is_valid") pass manual_query = pmq.manual_patient_id_query if form is None: form = QueryBuilderForm() if manual_form is None: manual_form = ManualPeQueryForm({"sql": manual_query}) starting_values_dict = { "database": form.data.get("database", "") if with_database else "", "schema": form.data.get("schema", ""), "table": form.data.get("table", ""), "column": form.data.get("column", ""), "op": form.data.get("where_op", ""), "date_value": form.data.get("date_value", ""), # Impossible to set file_value programmatically. (See querybuilder.js.) "float_value": form.data.get("float_value", ""), "int_value": form.data.get("int_value", ""), "string_value": form.data.get("string_value", ""), "offer_where": bool(True), "form_errors": "<br>".join( f"{k}: {v}" for k, v in form.errors.items() ), "default_database": default_database, "default_schema": default_schema, "with_database": with_database, } if manual_query: pmq_patient_conditions = ( "<div><i>Overridden by manual query.</i></div>" ) pmq_manual_patient_query = prettify_sql_html( pmq.manual_patient_id_query ) else: pmq_patient_conditions = pmq.pt_conditions_html pmq_manual_patient_query = "<div><i>None</i></div>" pmq_final_patient_query = prettify_sql_html( pmq.patient_id_query(with_order_by=True) ) warnings = "" if not pmq.has_patient_id_query: warnings += '<div class="warning">No patient criteria yet</div>' if not pmq.has_output_columns: warnings += '<div class="warning">No output columns yet</div>' context = { "nav_on_pe_build": True, "pmq_output_columns": pmq.output_cols_html, "pmq_patient_conditions": pmq_patient_conditions, "pmq_manual_patient_query": pmq_manual_patient_query, "pmq_final_patient_query": pmq_final_patient_query, "warnings": warnings, "database_structure": get_db_structure_json(), "starting_values": json.dumps( starting_values_dict, separators=JSON_SEPARATORS_COMPACT ), "sql_dialect": settings.RESEARCH_DB_DIALECT, "dialect_mysql": settings.RESEARCH_DB_DIALECT == SqlaDialectName.MYSQL, "dialect_mssql": settings.RESEARCH_DB_DIALECT == SqlaDialectName.MSSQL, "sql_highlight_css": prettify_sql_css(), "manual_form": manual_form, } context.update(query_context(request)) return render(request, "pe_build.html", context)
[docs]def pe_choose(request: HttpRequest) -> HttpResponse: """ Choose one of the user's Patient Explorers (see :class:`crate_anon.crateweb.research.models.PatientExplorer`). Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ all_pes = get_all_pes(request) patient_explorers = paginate(request, all_pes) context = { "nav_on_pe_choose": True, "patient_explorers": patient_explorers, "sql_highlight_css": prettify_sql_css(), } context.update(query_context(request)) return render(request, "pe_choose.html", context)
[docs]def pe_activate(request: HttpRequest, pe_id: str) -> HttpResponse: """ Activate one of the user's Patient Explorers. Args: request: the :class:`django.http.request.HttpRequest` pe_id: string form of the integer PK of a :class:`crate_anon.crateweb.research.models.PatientExplorer` Returns: a :class:`django.http.response.HttpResponse` """ validate_blank_form(request) pe = get_object_or_404(PatientExplorer, id=pe_id) # type: PatientExplorer pe.activate() return redirect(UrlNames.PE_CHOOSE)
[docs]def pe_delete(request: HttpRequest, pe_id: str) -> HttpResponse: """ Delete one of the user's Patient Explorers. Args: request: the :class:`django.http.request.HttpRequest` pe_id: string form of the integer PK of a :class:`crate_anon.crateweb.research.models.PatientExplorer` Returns: a :class:`django.http.response.HttpResponse` """ validate_blank_form(request) pe = get_object_or_404(PatientExplorer, id=pe_id) # type: PatientExplorer pe.delete_if_permitted() return redirect(UrlNames.PE_CHOOSE)
[docs]def pe_edit(request: HttpRequest, pe_id: str) -> HttpResponse: """ Edit one of the user's Patient Explorers. Args: request: the :class:`django.http.request.HttpRequest` pe_id: string form of the integer PK of a :class:`crate_anon.crateweb.research.models.PatientExplorer` Returns: a :class:`django.http.response.HttpResponse` """ validate_blank_form(request) pe = get_object_or_404(PatientExplorer, id=pe_id) # type: PatientExplorer # noinspection PyUnresolvedReferences profile = request.user.profile # type: UserProfile profile.patient_multiquery_scratchpad = pe.patient_multiquery profile.save() return redirect(UrlNames.PE_BUILD)
[docs]def pe_results(request: HttpRequest, pe_id: str) -> HttpResponse: """ Show the results of a Patient Explorer, in paginated tabular form. Args: request: the :class:`django.http.request.HttpRequest` pe_id: string form of the integer PK of a :class:`crate_anon.crateweb.research.models.PatientExplorer` Returns: a :class:`django.http.response.HttpResponse` """ pe = get_object_or_404(PatientExplorer, id=pe_id) # type: PatientExplorer grammar = research_database_info.grammar # noinspection PyUnresolvedReferences profile = request.user.profile # type: UserProfile highlights = Highlight.get_active_highlights(request) highlight_dict = Highlight.as_ordered_dict(highlights) element_counter = HtmlElementCounter() patient_id_query_html = prettify_sql_html(pe.get_patient_id_query()) patients_per_page = get_patients_per_page(request) try: mrids = pe.get_patient_mrids() page = paginate(request, mrids, per_page=patients_per_page) active_mrids = list(page) # type: List[str] results = [] # type: List[Dict[str, Any]] if active_mrids: for tsa in pe.all_queries(mrids=active_mrids): table_id = tsa.table_id sql = tsa.sql args = tsa.args with pe.get_executed_cursor(sql, args) as cursor: fieldnames = get_fieldnames_from_cursor(cursor) rows = cursor.fetchall() table_html = resultset_html_table( fieldnames=fieldnames, rows=rows, element_counter=element_counter, highlight_dict=highlight_dict, collapse_at_len=profile.collapse_at_len, collapse_at_n_lines=profile.collapse_at_n_lines, line_length=profile.line_length, ) query_html = element_counter.visibility_div_with_divbutton( contents=prettify_sql_and_args(sql, args), title_html="SQL", ) results.append( { "tablename": table_id.identifier(grammar), "table_html": table_html, "query_html": query_html, } ) n_records = len(mrids) context = { "nav_on_pe_results": True, "results": results, "page": page, "rowcount": n_records, "patient_id_query_html": patient_id_query_html, "patients_per_page": patients_per_page, "sql_highlight_css": prettify_sql_css(), } context.update(query_context(request)) pe.audit(n_records=n_records) return render(request, "pe_result.html", context) except DatabaseError as exception: pe.audit(failed=True, fail_msg=str(exception)) return render_bad_pe(request, pe, exception)
[docs]def render_missing_pe(request: HttpRequest) -> HttpResponse: """ Tell the user that there's no Patient Explorer selected, when there should have been. Args: request: the :class:`django.http.request.HttpRequest` Returns: a :class:`django.http.response.HttpResponse` """ return render(request, "pe_missing.html", query_context(request))
# noinspection PyUnusedLocal
[docs]def render_bad_pe( request: HttpRequest, pe: PatientExplorer, exception: Exception ) -> HttpResponse: """ A view saying "your Patient Explorer failed". Args: request: the :class:`django.http.request.HttpRequest` pe: the :class:`crate_anon.crateweb.research.models.PatientExplorer` that went wrong exception: the Python exception that resulted, which may have had extra information attached via :func:`cardinal_pythonlib.exceptions.add_info_to_exception` Returns: a :class:`django.http.response.HttpResponse` """ info = recover_info_from_exception(exception) final_sql = info.get("sql", "") args = info.get("args", []) context = { "exception": repr(exception), "query": prettify_sql_and_args(final_sql, args), "sql_highlight_css": prettify_sql_css(), } context.update(query_context(request)) return render(request, "pe_bad.html", context)
# def render_bad_pe_id(request: HttpRequest, pe_id: int) -> HttpResponse: # context = {'pe_id': pe_id} # context.update(query_context(request)) # return render(request, 'pe_bad_id.html', context)
[docs]def get_all_pes(request: HttpRequest) -> QuerySet: """ Return all Patient Explorers for the current user. Args: request: the :class:`django.http.request.HttpRequest` Returns: request: a :class:`django.db.models.QuerySet` for :class:`crate_anon.crateweb.research.models.PatientExplorer` objects """ return PatientExplorer.objects.filter( user=request.user, deleted=False ).order_by("-active", "-created")
[docs]def get_identical_pes( request: HttpRequest, pmq: PatientMultiQuery ) -> List[PatientExplorer]: """ Return all Patient Explorers for the current user whose query is identical to the query specified. Args: request: the :class:`django.http.request.HttpRequest` pmq: a :class:`crate_anon.crateweb.research.models.PatientMultiQuery` Returns: a list of :class:`crate_anon.crateweb.research.models.PatientExplorer` objects """ all_pes = get_all_pes(request) # identical_pes = all_pes.filter(patient_multiquery=pmq) # # ... this works, but does so by converting the parameter (pmq) to its # JSON representation, presumably via JsonClassField.get_prep_value(). # Accordingly, we can predict problems under SQL Server with very long # strings; see the problem in query_submit(). # So, we should similarly hash: identical_pes = all_pes.filter(pmq_hash=pmq.hash64) # Beware: Python's hash() function will downconvert to 32 bits on 32-bit # machines; use pmq.hash64() directly, not hash(pmq). # Double-check in Python in case of hash collision: return [pe for pe in identical_pes if pe.patient_multiquery == pmq]
[docs]def pe_submit( request: HttpRequest, pmq: PatientMultiQuery, run: bool = False ) -> HttpResponse: """ Save a :class:`crate_anon.crateweb.research.models.PatientMultiQuery` as a :class:`crate_anon.crateweb.research.models.PatientExplorer` for the current user. Args: request: the :class:`django.http.request.HttpRequest` pmq: a :class:`crate_anon.crateweb.research.models.PatientMultiQuery` run: run and show results? Otherwise, save and return to the choice view Returns: a :class:`django.http.response.HttpResponse` """ identical_pes = get_identical_pes(request, pmq) if identical_pes: identical_pes[0].activate() pe_id = identical_pes[0].id else: pe = PatientExplorer( patient_multiquery=pmq, user=request.user, active=True ) pe.save() pe_id = pe.id # log.critical(pprint.pformat(connection.queries)) # show all queries # redirect to a new URL: if run: return redirect(UrlNames.PE_RESULTS, pe_id) else: return redirect(UrlNames.PE_CHOOSE)
[docs]def pe_tsv_zip(request: HttpRequest, pe_id: str) -> HttpResponse: """ Return the results of a :class:`crate_anon.crateweb.research.models.PatientExplorer` as a ZIP file of TSV files. Args: request: the :class:`django.http.request.HttpRequest` pe_id: string form of the integer PK of a :class:`crate_anon.crateweb.research.models.PatientExplorer` Returns: a :class:`django.http.response.HttpResponse` """ # https://stackoverflow.com/questions/12881294/django-create-a-zip-of-multiple-files-and-make-it-downloadable # noqa pe = get_object_or_404(PatientExplorer, id=pe_id) # type: PatientExplorer try: response = file_response( pe.get_zipped_tsv_binary(), content_type=ContentType.ZIP, filename="crate_pe_{num}_{datetime}.zip".format( num=pe.id, datetime=datetime_iso_for_filename(), ), ) pe.audit() return response except DatabaseError as exception: pe.audit(failed=True, fail_msg=str(exception)) return render_bad_pe(request, pe, exception)
[docs]def pe_excel(request: HttpRequest, pe_id: str) -> HttpResponse: """ Return the results of a :class:`crate_anon.crateweb.research.models.PatientExplorer` as an Excel XLSX file. Args: request: the :class:`django.http.request.HttpRequest` pe_id: string form of the integer PK of a :class:`crate_anon.crateweb.research.models.PatientExplorer` Returns: a :class:`django.http.response.HttpResponse` """ pe = get_object_or_404(PatientExplorer, id=pe_id) # type: PatientExplorer try: response = file_response( pe.get_xlsx_binary(), content_type=ContentType.XLSX, filename="crate_pe_{num}_{datetime}.xlsx".format( num=pe.id, datetime=datetime_iso_for_filename(), ), ) pe.audit() return response except DatabaseError as exception: pe.audit(failed=True, fail_msg=str(exception)) return render_bad_pe(request, pe, exception)
[docs]def pe_data_finder_results(request: HttpRequest, pe_id: str) -> HttpResponse: """ Shows the **data finder** view of a :class:`crate_anon.crateweb.research.models.PatientExplorer`. This counts records for each table (by patient), without showing all the data. Args: request: the :class:`django.http.request.HttpRequest` pe_id: string form of the integer PK of a :class:`crate_anon.crateweb.research.models.PatientExplorer` Returns: a :class:`django.http.response.HttpResponse` """ pe = get_object_or_404(PatientExplorer, id=pe_id) # type: PatientExplorer # noinspection PyUnresolvedReferences profile = request.user.profile # type: UserProfile patients_per_page = get_patients_per_page(request) element_counter = HtmlElementCounter() patient_id_query_html = prettify_sql_html(pe.get_patient_id_query()) # If this query is done as a UNION, it's massive, e.g. ~410 characters # * number of tables (e.g. 1448 in one RiO database), for 0.5 Mb of query. # So do it more sensibly: try: mrids = pe.get_patient_mrids() page = paginate(request, mrids, per_page=patients_per_page) active_mrids = list(page) # type: List[str] results_table_html = "" query_html = "" if active_mrids: fieldnames = [] # type: List[str] rows = [] # type: List[List[Any]] for tsa in pe.patient_multiquery.gen_data_finder_queries( mrids=active_mrids ): table_identifier = tsa.table_id sql = tsa.sql args = tsa.args with pe.get_executed_cursor(sql, args) as cursor: if not fieldnames: fieldnames = get_fieldnames_from_cursor(cursor) rows = cursor.fetchall() query_html += ( element_counter.visibility_div_with_divbutton( contents=prettify_sql_and_args(sql, args), title_html=f"SQL for {table_identifier}", ) ) results_table_html = resultset_html_table( fieldnames=fieldnames, rows=rows, element_counter=element_counter, collapse_at_len=profile.collapse_at_len, collapse_at_n_lines=profile.collapse_at_n_lines, line_length=profile.line_length, no_ditto_cols=[2, 3, 4], null="", ) n_records = len(mrids) context = { "nav_on_pe_df_results": True, "some_patients": len(active_mrids) > 0, "results_table_html": results_table_html, "query_html": query_html, "page": page, "rowcount": n_records, "patient_id_query_html": patient_id_query_html, "patients_per_page": patients_per_page, "sql_highlight_css": prettify_sql_css(), } context.update(query_context(request)) pe.audit(count_only=True, n_records=n_records) return render(request, "pe_df_result.html", context) except DatabaseError as exception: pe.audit(failed=True, fail_msg=str(exception)) return render_bad_pe(request, pe, exception)
[docs]def pe_data_finder_excel(request: HttpRequest, pe_id: str) -> HttpResponse: """ Serves the data finder view of a :class:`crate_anon.crateweb.research.models.PatientExplorer` (see :func:`pe_data_finder_results`) as an Excel XLSX file. Args: request: the :class:`django.http.request.HttpRequest` pe_id: string form of the integer PK of a :class:`crate_anon.crateweb.research.models.PatientExplorer` Returns: a :class:`django.http.response.HttpResponse` """ pe = get_object_or_404(PatientExplorer, id=pe_id) # type: PatientExplorer try: return file_response( pe.data_finder_excel, content_type=ContentType.XLSX, filename="crate_pe_df_{num}_{datetime}.xlsx".format( num=pe.id, datetime=datetime_iso_for_filename(), ), ) except DatabaseError as exception: return render_bad_pe(request, pe, exception)
[docs]def pe_monster_results(request: HttpRequest, pe_id: str) -> HttpResponse: """ Shows the **monster data** view of a :class:`crate_anon.crateweb.research.models.PatientExplorer`. This performs a ``SELECT(*)`` for all rows retrieved by the PatientExplorer. Args: request: the :class:`django.http.request.HttpRequest` pe_id: string form of the integer PK of a :class:`crate_anon.crateweb.research.models.PatientExplorer` Returns: a :class:`django.http.response.HttpResponse` """ pe = get_object_or_404(PatientExplorer, id=pe_id) # type: PatientExplorer grammar = research_database_info.grammar # noinspection PyUnresolvedReferences profile = request.user.profile # type: UserProfile highlights = Highlight.get_active_highlights(request) highlight_dict = Highlight.as_ordered_dict(highlights) element_counter = HtmlElementCounter() patient_id_query_html = prettify_sql_html(pe.get_patient_id_query()) patients_per_page = get_patients_per_page(request) try: rids = pe.get_patient_mrids() page = paginate(request, rids, per_page=patients_per_page) active_rids = list(page) results = [] # type: List[Dict[str, Any]] pmq = pe.patient_multiquery if active_rids: for tsa in pmq.gen_monster_queries(mrids=active_rids): table_id = tsa.table_id sql = tsa.sql args = tsa.args with pe.get_executed_cursor(sql, args) as cursor: fieldnames = get_fieldnames_from_cursor(cursor) rows = cursor.fetchall() if rows: table_html = resultset_html_table( fieldnames=fieldnames, rows=rows, element_counter=element_counter, highlight_dict=highlight_dict, collapse_at_len=profile.collapse_at_len, collapse_at_n_lines=profile.collapse_at_n_lines, line_length=profile.line_length, ) else: table_html = "<div><i>No data</i></div>" query_html = element_counter.visibility_div_with_divbutton( contents=prettify_sql_and_args(sql, args), title_html="SQL", ) results.append( { "tablename": table_id.identifier(grammar), "table_html": table_html, "query_html": query_html, } ) context = { "nav_on_pe_monster_results": True, "results": results, "page": page, "rowcount": len(rids), "patient_id_query_html": patient_id_query_html, "patients_per_page": patients_per_page, "sql_highlight_css": prettify_sql_css(), } context.update(query_context(request)) return render(request, "pe_monster_result.html", context) except DatabaseError as exception: return render_bad_pe(request, pe, exception)
[docs]def pe_table_browser(request: HttpRequest, pe_id: str) -> HttpResponse: """ Shows the **table browser** view of a :class:`crate_anon.crateweb.research.models.PatientExplorer`. This shows a list of all tables in the database, with hyperlinks to a single-table Patient Explorer view for each. Args: request: the :class:`django.http.request.HttpRequest` pe_id: string form of the integer PK of a :class:`crate_anon.crateweb.research.models.PatientExplorer` Returns: a :class:`django.http.response.HttpResponse` """ pe = get_object_or_404(PatientExplorer, id=pe_id) # type: PatientExplorer tables = research_database_info.get_tables() with_database = research_database_info.uses_database_level() try: context = { "nav_on_pe_table_browser": True, "pe_id": pe_id, "tables": tables, "with_database": with_database, } context.update(query_context(request)) return render(request, "pe_table_browser.html", context) except DatabaseError as exception: return render_bad_pe(request, pe, exception)
[docs]def pe_one_table( request: HttpRequest, pe_id: str, schema: str, table: str, db: str = "" ) -> HttpResponse: """ Shows the **single table** view of a :class:`crate_anon.crateweb.research.models.PatientExplorer`. This shows results from a single table. Args: request: the :class:`django.http.request.HttpRequest` pe_id: string form of the integer PK of a :class:`crate_anon.crateweb.research.models.PatientExplorer` schema: name of the table's schema table: name of the table db: name of the table's database (above the schema level), if appliable Returns: a :class:`django.http.response.HttpResponse` .. todo:: pe_one_table Might it be better to feed the resulting query back into the main Query system, allowing users to turn columns on/off, etc.? At present it forces ``query_id`` to ``None`` and this is detected by ``query_result.html``. """ pe = get_object_or_404(PatientExplorer, id=pe_id) # type: PatientExplorer table_id = TableId(db=db, schema=schema, table=table) grammar = research_database_info.grammar highlights = Highlight.get_active_highlights(request) highlight_dict = Highlight.as_ordered_dict(highlights) element_counter = HtmlElementCounter() # noinspection PyUnresolvedReferences profile = request.user.profile # type: UserProfile patients_per_page = get_patients_per_page(request) try: mrids = pe.get_patient_mrids() page = paginate(request, mrids, per_page=patients_per_page) active_mrids = list(page) table_html = "<div><i>No data</i></div>" sql = "" args = [] # type: List[Any] rowcount = 0 if active_mrids: mrid_column = research_database_info.get_mrid_column_from_table( table_id ) where_clause = "{mrid} IN ({in_clause})".format( mrid=mrid_column.identifier(grammar), in_clause=",".join(["?"] * len(active_mrids)), ) # ... see notes for translate_sql_qmark_to_percent() args = active_mrids sql = add_to_select( sql="", select_elements=[ SelectElement( raw_select="*", from_table_for_raw_select=table_id ) ], grammar=grammar, where_conditions=[ WhereCondition( raw_sql=where_clause, from_table_for_raw_sql=mrid_column.table_id, ) ], magic_join=True, formatted=True, ) with pe.get_executed_cursor(sql, args) as cursor: fieldnames = get_fieldnames_from_cursor(cursor) rows = cursor.fetchall() rowcount = cursor.rowcount if rows: table_html = resultset_html_table( fieldnames=fieldnames, rows=rows, element_counter=element_counter, highlight_dict=highlight_dict, collapse_at_len=profile.collapse_at_len, collapse_at_n_lines=profile.collapse_at_n_lines, line_length=profile.line_length, ) # Render context = { "table_html": table_html, "page": page, "query_id": None, "rowcount": rowcount, "sql": prettify_sql_and_args(sql=sql, args=args), "sql_highlight_css": prettify_sql_css(), } context.update(query_context(request)) return render(request, "query_result.html", context) except DatabaseError as exception: return render_bad_pe(request, pe, exception)
# ============================================================================= # Archive and visualization zone system # ============================================================================= # ----------------------------------------------------------------------------- # Archive views # -----------------------------------------------------------------------------
[docs]def launch_archive(request: HttpRequest) -> HttpResponse: """ Takes the submitted ``patient_id`` (from ``request.POST``) and launches the archive's root page for that patient. Args: request: the Django :class:`HttpRequest` object """ if not ARCHIVE_IS_CONFIGURED: return archive_misconfigured_response() return redirect(archive_root_url())
[docs]@cache_control(private=True, max_age=CACHE_CONTROL_MAX_AGE_ARCHIVE_TEMPLATES) def archive_template(request: HttpRequest) -> HttpResponse: """ Provides the views for the configurable "archive" system. Args: request: the Django :class:`HttpRequest` object Returns: a Django :class:`HttpResponse`. Note: - The archive template name is in ``request.GET``; this allows the use of special (e.g. filename) characters. - The patient ID is also in ``request.GET``; this allows the optional use of more complex strings, e.g. JSON, to represent multiple ID numbers for use with several databases. - Additional arguments are also in ``request.GET``. - To create a URL within a Django template, one would use e.g. .. code-block:: none <a href="{% url 'archive' patient_id template_name %}">link text</a> (the ``'archive'`` bit being configured in :mod:`crate_anon.crateweb.config.urls`). - But to create a URL within a Mako template, there are several ways... for example, in CamCOPS via Pyramid, we use ``request.route_url(...)``. We can make this simple by passing patient-specific function to the context; see :ref:`the Python Mako context <archive_mako_context>`. - If we use DMP, it will add a ``request.dmp`` object; see https://doconix.github.io/django-mako-plus/topics_variables.html. (We won't use DMP.) """ if not ARCHIVE_IS_CONFIGURED: return archive_misconfigured_response() # noinspection PyCallByClass,PyArgumentList template_name = request.GET.get(UrlKeys.TEMPLATE) if not template_name: return HttpResponseBadRequest( f"URL arguments must include the key {UrlKeys.TEMPLATE!r}" ) # log.debug(f"Archive template request: {template_name!r}") # noinspection PyCallByClass,PyArgumentList patient_id = request.GET.get(UrlKeys.PATIENT_ID, "") # ------------------------------------------------------------------------- # URL builders # ------------------------------------------------------------------------- def same_patient_template_url(_template: str, **kw) -> str: """ Returns a URL to a template for the same patient. """ return archive_template_url(_template, patient_id=patient_id, **kw) def same_patient_attachment_url(_filename: str, **kw) -> str: """ Returns a URL to an attachment, marked as being for the same patient. """ return archive_attachment_url(_filename, patient_id=patient_id, **kw) # ------------------------------------------------------------------------- # Build context # ------------------------------------------------------------------------- context = copy.copy(ARCHIVE_CONTEXT) context.update( { ArchiveContextKeys.get_patient_template_url: same_patient_template_url, # noqa: E501 ArchiveContextKeys.get_template_url: archive_template_url, ArchiveContextKeys.get_attachment_url: same_patient_attachment_url, ArchiveContextKeys.CRATE_HOME_URL: reverse("home"), ArchiveContextKeys.execute: get_executed_researchdb_cursor_qmark_placeholders, # noqa: E501 ArchiveContextKeys.patient_id: patient_id, ArchiveContextKeys.query_params: request.GET, ArchiveContextKeys.request: request, ArchiveContextKeys.get_static_url: archive_static_url, } ) # log.debug("Archive template {!r} with context {!r}", # template_name, context) # ------------------------------------------------------------------------- # Render # ------------------------------------------------------------------------- try: template = archive_mako_lookup.get_template(template_name) except TemplateLookupException: return HttpResponseBadRequest( f"No such archive template: {template_name!r}" ) html = template.render(**context) # noinspection PyArgumentList query_string = request.GET.urlencode() audit_archive_template(request, patient_id, query_string) return HttpResponse(html)
[docs]@cache_control(private=True, max_age=CACHE_CONTROL_MAX_AGE_ARCHIVE_ATTACHMENTS) def archive_attachment(request: HttpRequest) -> HttpResponseBase: """ Serve a binary file from the archive. The patient_id is not required to find the file -- but is used for the audit trail. (It would be possible for the end user to look up a file by name via a faked patient_id. However, this could be established from the audit log.) Args: request: the Django :class:`HttpRequest` object """ if not ARCHIVE_IS_CONFIGURED: return archive_misconfigured_response() # noinspection PyCallByClass,PyArgumentList patient_id = request.GET.get(UrlKeys.PATIENT_ID) if not patient_id: return HttpResponseBadRequest( f"URL arguments must include the key {UrlKeys.PATIENT_ID!r}" ) # noinspection PyCallByClass,PyArgumentList,PyTypeChecker content_type = request.GET.get(UrlKeys.CONTENT_TYPE, None) # noinspection PyCallByClass,PyArgumentList filename = request.GET.get(UrlKeys.FILENAME) if not filename: return HttpResponseBadRequest( f"URL arguments must include the key {UrlKeys.FILENAME!r}" ) # log.debug(f"Archive attachment request: {filename!r}") try: # noinspection PyArgumentList,PyCallByClass guess_content_type = bool( int(request.GET.get(UrlKeys.GUESS_CONTENT_TYPE)) ) except (TypeError, ValueError): guess_content_type = DEFAULT_GUESS_CONTENT_TYPE # noinspection PyCallByClass,PyTypeChecker offered_filename = request.GET.get(UrlKeys.OFFERED_FILENAME, None) full_filename = get_archive_attachment_filepath(filename) if not full_filename: return HttpResponseBadRequest( f"Invalid archive attachment filename: {filename!r}" ) if content_type: final_content_type = content_type elif guess_content_type: final_content_type = guess_mimetype(filename) else: final_content_type = None # ensure it's not "" # If content_type is None, we wil end up with "application/force-download", # which is OK. prefer_inline = bool(final_content_type) offered_filename = offered_filename or basename(filename) # log.critical( # f"archive_attachment(): " # f"content_type: {content_type!r}, " # f"guess_content_type: {guess_content_type!r}, " # f"final_content_type: {final_content_type!r}, " # f"filename: {filename!r}, " # f"full_filename: {full_filename!r}, " # f"offered_filename: {offered_filename!r}, " # f"prefer_inline: {prefer_inline!r}") audit_archive_attachment(request, patient_id, filename) return serve_file( path_to_file=full_filename, offered_filename=offered_filename, content_type=final_content_type, as_attachment=not prefer_inline, as_inline=prefer_inline, )
[docs]@cache_control(private=True, max_age=CACHE_CONTROL_MAX_AGE_ARCHIVE_STATIC) def archive_static(request: HttpRequest) -> HttpResponseBase: """ Serve a static file from the archive. Args: request: the Django :class:`HttpRequest` object """ if not ARCHIVE_IS_CONFIGURED: return archive_misconfigured_response() # noinspection PyCallByClass,PyArgumentList filename = request.GET.get(UrlKeys.FILENAME) # log.debug(f"Archive static request: {filename!r}") full_filename = get_archive_static_filepath(filename) if not full_filename: return HttpResponseBadRequest( f"Invalid archive static filename: {filename!r}" ) return serve_file( path_to_file=full_filename, as_attachment=False, as_inline=False, default_content_type=None, )