#!/usr/bin/env python
"""
crate_anon/anonymise/researcher_report.py
===============================================================================
Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CRATE.
CRATE is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CRATE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CRATE. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
**Produce a researcher-oriented report about a destination database.**
"""
import argparse
from dataclasses import dataclass
import datetime
import decimal
import enum
import logging
import os
from typing import Any, Dict, List, Optional, Tuple
from cardinal_pythonlib.datetimefunc import (
format_datetime,
get_now_localtz_pendulum,
strfdelta,
)
from cardinal_pythonlib.logs import main_only_quicksetup_rootlogger
from cardinal_pythonlib.pdf import make_pdf_on_disk_from_html
import django
from django.conf import settings
from django.template.loader import render_to_string
import pendulum
from sqlalchemy.engine.url import make_url, URL
from sqlalchemy.sql.expression import distinct, func, select, table
from sqlalchemy.schema import Column, Table
from crate_anon.anonymise.config import Config
from crate_anon.anonymise.constants import ANON_CONFIG_ENV_VAR
from crate_anon.anonymise.dbholder import DatabaseHolder
from crate_anon.anonymise.ddr import DataDictionaryRow, DDRLabels
from crate_anon.common.argparse_assist import (
RawDescriptionArgumentDefaultsRichHelpFormatter,
)
from crate_anon.common.sql import ReflectedColumnInfo
from crate_anon.version import CRATE_VERSION, CRATE_VERSION_PRETTY
log = logging.getLogger(__name__)
# =============================================================================
# Constants
# =============================================================================
THIS_DIR = os.path.abspath(os.path.dirname(__file__))
TEMPLATE_DIR = os.path.join(THIS_DIR, "templates", "researcher_report")
[docs]class Templates:
"""
Template filenames, within TEMPLATE_DIR.
"""
PDF_FOOTER = "pdf_footer.html"
PDF_HEADER = "pdf_header.html"
REPORT = "report.html"
STYLE = "style.css"
TABLE = "table.html"
class DateFormat:
# https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes # noqa: E501
PRETTY = "%a %d %B %Y, %H:%M %z"
# ... e.g. Wed 24 July 2013, 20:04 +0100
DATE = "%Y-%m-%d" # e.g. 2023-07-24
DATETIME = "%Y-%m-%d %H:%M" # e.g. 2023-07-24 20:04
TIME = "%H:%M" # e.g. 20:04
# And one for our custom strfdelta function:
TIMEDELTA = "{D:02}d {H:02}h {M:02}m {S:02}s"
[docs]class Default:
"""
Default values.
"""
BASE_FONT_SIZE = "11pt"
HEADER_FOOTER_SPACING_MM = 3
# ... always in mm; https://wkhtmltopdf.org/usage/wkhtmltopdf.txt
MAX_DISTINCT_VALUES = 20
MAX_VALUE_LENGTH = 50
ORIENTATION = "landscape"
PAGE_SIZE = "A4"
MARGIN_LEFT_RIGHT = "15mm"
MARGIN_TOP_BOTTOM = "18mm" # see HEADER_FOOTER_SPACING_MM
ELLIPSIS = "…"
EN_DASH = "–"
MINUS = "−"
HYPHEN = "-"
SINGLE_QUOTE_L = "‘"
SINGLE_QUOTE_R = "’"
# SINGLE_QUOTE = "'"
# TWO_SINGLE_QUOTES = "''"
TICK = "✓"
# RIGHT_ARROW = "►"
# =============================================================================
# Helper classes/functions
# =============================================================================
[docs]@dataclass
class ResearcherReportConfig:
output_filename: str
anonconfig: Config = None
base_font_size: str = Default.BASE_FONT_SIZE
db_name: str = None # overrides that in config
db_url: str = None # overrides that in config
debug_pdf: bool = False
max_distinct_values: int = Default.MAX_DISTINCT_VALUES
max_value_length: int = Default.MAX_VALUE_LENGTH
header_footer_spacing_mm: int = Default.HEADER_FOOTER_SPACING_MM
margin_left_right: str = Default.MARGIN_LEFT_RIGHT
margin_top_bottom: str = Default.MARGIN_TOP_BOTTOM
page_size: str = Default.PAGE_SIZE
orientation: str = Default.ORIENTATION
show_counts: bool = True # count records in each table?
show_url: bool = True # include a sanitised URL for the database
show_values: bool = True # include specimen values/ranges
skip_values_if_too_many: bool = False
use_dd: bool = True # include info from the data dictionary
echo: bool = False # echo SQL
def __post_init__(self) -> None:
# Set up lookups.
anonconfig = self.anonconfig
if anonconfig:
self.annotation_from_colname = {
anonconfig.trid_fieldname: DDRLabels.TRID,
anonconfig.master_research_id_fieldname: DDRLabels.MRID,
anonconfig.research_id_fieldname: DDRLabels.RID,
anonconfig.source_hash_fieldname: DDRLabels.SOURCE_HASH,
}
# Set up DD
if self.use_dd:
anonconfig.load_dd(check_against_source_db=False)
else:
self.use_dd = False
# Set up database
if self.db_url:
# Use a custom database
if not self.db_name:
raise ValueError(
"Must specify database name if passing a custom URL"
)
self.db = DatabaseHolder(
self.db_name,
self.db_url,
with_session=True,
reflect=True,
echo=self.echo,
)
else:
# Use destination database from the config
if not anonconfig:
raise ValueError(
"Must specify a CRATE anonymisation config file if you "
"do not specify a database by URL/name"
)
self.db = anonconfig.destdb
self.db.engine.echo = self.echo
self.db.enable_reflect()
self.db.create_session()
self.db_name = self.db_name or anonconfig.destdb.name
self.db_url = self.db.engine.url
self.db_session = self.db.session
[docs] def safe_db_url_if_selected(self) -> str:
"""
Sanitised version of the database URL, or a blank string if not
enabled.
"""
if not self.show_url or not self.db_url:
return ""
url_obj = make_url(self.db_url) # type: URL
return repr(url_obj)
# For SQLAlchemy URL objects, the default str() implementation calls
# self.__to_string__(hide_password=False), but the default repr() hides
# passwords.
[docs] def wkhtmltopdf_options(self) -> Dict[str, Optional[str]]:
"""
Returns wkhtmltopdf options for the current setup.
"""
return { # dict for pdfkit
"page-size": self.page_size,
"margin-left": self.margin_left_right,
"margin-right": self.margin_left_right,
"margin-top": self.margin_top_bottom,
"margin-bottom": self.margin_top_bottom,
"header-spacing": str(self.header_footer_spacing_mm),
"footer-spacing": str(self.header_footer_spacing_mm),
# "--print-media-type": None
# ... https://stackoverflow.com/q/42005819
"orientation": self.orientation,
}
[docs] def get_db_name(self) -> str:
"""
Returns a short database name used for titles.
"""
return self.db_name
[docs] def get_db_engine_type(self) -> str:
"""
Returns the engine type (e.g. mysql).
"""
return self.db.engine.name
[docs] def get_annotation_when_no_ddr_found(self, col_name: str) -> str:
"""
Returns best-guess CRATE annotation information when no data dictionary
row is available.
Args:
col_name:
Column name.
"""
return self.annotation_from_colname.get(col_name, DDRLabels.UNKNOWN)
[docs]def template(filename: str) -> str:
"""
Returns a filename from our specific template directory.
"""
return os.path.join(TEMPLATE_DIR, filename)
[docs]def literal(
value: Any,
max_length: int = Default.MAX_VALUE_LENGTH,
truncated_suffix: str = ELLIPSIS,
) -> str:
"""
Returns a rough-and-ready SQL literal, intended for human viewing only.
Truncates long strings at a given length.
- Some duplication from within
cardinal_pythonlib.sqlalchemy.dump.get_literal_query.
- Dates/times are NOT enclosed in quotes here.
"""
if value is None:
return "NULL"
elif isinstance(value, str):
length = len(value)
if length > max_length:
value = value[:max_length]
suffix = truncated_suffix + SINGLE_QUOTE_R + f" [length {length}]"
else:
suffix = SINGLE_QUOTE_R
# We won't escape quotes. This report is about visual ease, not
# electronic exactness.
return SINGLE_QUOTE_L + value + suffix
elif isinstance(value, (float, int)):
return repr(value).replace(HYPHEN, MINUS)
elif isinstance(value, decimal.Decimal):
return str(value).replace(HYPHEN, MINUS)
elif isinstance(value, datetime.datetime) or isinstance(
value, pendulum.DateTime
):
return value.strftime(DateFormat.DATETIME)
elif isinstance(value, datetime.date) or isinstance(value, pendulum.Date):
return value.strftime(DateFormat.DATE)
elif isinstance(value, datetime.time) or isinstance(value, pendulum.Time):
return value.strftime(DateFormat.TIME)
elif isinstance(value, bytes):
return f"<binary_length_{len(value)}>"
elif isinstance(value, datetime.timedelta):
return strfdelta(value, fmt=DateFormat.TIMEDELTA)
elif isinstance(value, enum.Enum):
return f"{value.name} ({value.value})"
else:
raise NotImplementedError(
f"Don't know how to represent value {value!r}"
)
[docs]def sorter(x: Any) -> Tuple[bool, Any]:
"""
Used for sorting values that may be None/NULL. Remember that False < True,
so this puts None values lowest (first in a default sort).
"""
return x is not None, x
# =============================================================================
# Researcher report about destination database
# =============================================================================
[docs]def get_values_summary(
column: Column,
reportcfg: ResearcherReportConfig,
ddr: DataDictionaryRow = None,
) -> str:
"""
Return a textual summary of values in a column (from a de-identified
database).
Args:
column:
SQLAlchemy Column object to summarize. (It knows its own Table.)
reportcfg:
ResearcherReportConfig object, governing the report.
ddr:
Corresponding CRATE DataDictionaryRow, if there is one.
"""
if not reportcfg.show_values:
# Don't show anything.
return EN_DASH
# Otherwise, we can always do the number of distinct values:
items = [] # type: List[str]
session = reportcfg.db_session
n_distinct_notnull = session.execute(
select(func.count(distinct(column)))
).fetchone()[0]
# This does NOT include NULL values, by the SQL standard.
suffix = "" if n_distinct_notnull == 1 else "s" # "value" or "values"?
items.append(f"{n_distinct_notnull} distinct non-null value{suffix}.")
show_min_max = False
show_distinct = False # show the actual distinct values?
empty = n_distinct_notnull == 0
sensitive = (
not empty
and ddr
and (
ddr.contains_patient_info
or ddr.contains_third_party_info
or ddr.contains_scrub_src
or ddr.being_scrubbed
)
)
# ... not *actually* sensitive; merely having the appearance of being
# sensitive for a general-purpose report.
dull = (
not empty
and not sensitive
and reportcfg.use_dd
and not ddr
and column.name in reportcfg.annotation_from_colname.keys()
)
if not (empty or sensitive or dull):
# Show some more detail.
if n_distinct_notnull > 1:
show_min_max = True
if (
n_distinct_notnull <= reportcfg.max_distinct_values
or not reportcfg.skip_values_if_too_many
):
show_distinct = True
def lit(value: Any) -> str:
return literal(value, reportcfg.max_value_length)
if show_min_max:
min_val, max_val = session.execute(
select(func.min(column), func.max(column))
).fetchone()
items.append(f"Min {lit(min_val)}; max {lit(max_val)}.")
if show_distinct:
dv_rows = session.execute(
select(column)
.distinct()
.order_by(column)
.limit(reportcfg.max_value_length + 1)
).fetchall()
# These WILL include any NULL values, so there may be one more than
# n_distinct_notnull (or the same, if there are no NULLs). The only
# way to be sure if we are truncating, therefore, and to show a
# truncation indicator, is to fetch up to one more and see if we are
# over the limit.
# Sort before literal (so we get numeric, not string, sort):
distinct_values = sorted((row[0] for row in dv_rows), key=sorter)
distinct_value_elements = [lit(v) for v in distinct_values]
if len(distinct_values) > reportcfg.max_distinct_values:
distinct_value_elements = distinct_value_elements[
0 : reportcfg.max_distinct_values
] + [ELLIPSIS]
distinct_value_str = ", ".join(distinct_value_elements)
items.append(f"Distinct values: {{{distinct_value_str}}}.")
# It's a set, so use set notation.
return " ".join(items)
[docs]def mk_table_html(table_name: str, reportcfg: ResearcherReportConfig) -> str:
"""
Returns HTML for the per-table aspects of the report.
Args:
table_name:
Table to process.
reportcfg:
ResearcherReportConfig object, governing the report.
Returns:
HTML as a string.
"""
log.info(f"Processing table: {table_name}")
dest_ddr_rows = (
reportcfg.anonconfig.dd.get_rows_for_dest_table(table_name)
if reportcfg.use_dd
else []
)
session = reportcfg.db_session
n_rows = (
session.execute(
select(func.count()).select_from(table(table_name))
).fetchone()[0]
if reportcfg.show_counts
else None
)
# Rows versus records: https://dba.stackexchange.com/questions/31805/
t = reportcfg.db.metadata.tables[table_name] # type: Table
table_comment = t.comment or "" # may be blank
columns = [] # type: List[ReflectedColumnInfo]
for c in sorted(t.c, key=lambda x: x.name): # type: Column
log.debug(repr(c))
colname = c.name
if reportcfg.use_dd:
try:
ddr = next(x for x in dest_ddr_rows if x.dest_field == colname)
crate_annotation = ddr.report_dest_annotation()
except StopIteration:
ddr = None
crate_annotation = reportcfg.get_annotation_when_no_ddr_found(
col_name=colname
)
else:
ddr = None
crate_annotation = ""
values_info = get_values_summary(
column=c,
reportcfg=reportcfg,
ddr=ddr,
)
columns.append(
ReflectedColumnInfo(
column=c,
override_comment=mk_comment(reportcfg, c, ddr),
crate_annotation=crate_annotation,
values_info=values_info,
)
)
return render_to_string(
template(Templates.TABLE),
dict(
columns=columns,
n_rows=n_rows,
show_counts=reportcfg.show_counts,
show_values=reportcfg.show_values,
table_comment=table_comment,
table_name=table_name,
use_dd=reportcfg.use_dd,
),
)
[docs]def mk_researcher_report_html(
reportcfg: ResearcherReportConfig,
) -> Tuple[str, str, str]:
"""
Produces a researcher-oriented report about a destination database, as
HTML.
Args:
reportcfg:
ResearcherReportConfig object, governing the report.
Returns:
tuple: header_html, html, footer_html
"""
# -------------------------------------------------------------------------
# 1. Set up Django for templates.
# -------------------------------------------------------------------------
# https://stackoverflow.com/questions/28123603
if not settings.configured:
# Settings will already be configured when testing with pytest
settings.configure(
TEMPLATES=[
{
"BACKEND": (
"django.template.backends.django.DjangoTemplates"
),
"DIRS": [TEMPLATE_DIR],
}
]
)
django.setup()
# -------------------------------------------------------------------------
# 2. Core variables
# -------------------------------------------------------------------------
db_name = reportcfg.get_db_name()
now = format_datetime(get_now_localtz_pendulum(), DateFormat.PRETTY)
title = f"{db_name}: CRATE researcher report, {now}"
css = render_to_string(
template(Templates.STYLE),
dict(base_font_size=reportcfg.base_font_size),
)
coredict = dict(title=title, css=css, now=now)
# -------------------------------------------------------------------------
# 3. Read header/footer (e.g. for PDF page numbers).
# -------------------------------------------------------------------------
header_html = render_to_string(template(Templates.PDF_HEADER), coredict)
footer_html = render_to_string(template(Templates.PDF_FOOTER), coredict)
# -------------------------------------------------------------------------
# 4. Scan the database.
# -------------------------------------------------------------------------
table_names = sorted(reportcfg.db.table_names) # reflects (introspects)
# -------------------------------------------------------------------------
# 5. Generate our main report.
# -------------------------------------------------------------------------
table_html_list = [
mk_table_html(table_name, reportcfg) for table_name in table_names
]
html = render_to_string(
template(Templates.REPORT),
dict(
CRATE_VERSION=CRATE_VERSION,
db_engine=reportcfg.get_db_engine_type(),
db_name=db_name,
n_tables=len(table_names),
table_names=table_names,
tables_html="".join(table_html_list),
url=reportcfg.safe_db_url_if_selected(),
**coredict,
),
)
# -------------------------------------------------------------------------
# 6. Return HTML components.
# -------------------------------------------------------------------------
return header_html, html, footer_html
[docs]def mk_researcher_report_pdf(
reportcfg: ResearcherReportConfig,
) -> bool:
"""
Produces a researcher-oriented report about a destination database, as a
PDF.
Args:
reportcfg:
ResearcherReportConfig object, governing the report.
Returns:
success
"""
header_html, html, footer_html = mk_researcher_report_html(reportcfg)
log.info(f"Writing to {reportcfg.output_filename}")
return make_pdf_on_disk_from_html(
html=html,
output_path=reportcfg.output_filename,
header_html=header_html,
footer_html=footer_html,
wkhtmltopdf_options=reportcfg.wkhtmltopdf_options(),
debug_options=reportcfg.debug_pdf,
debug_content=reportcfg.debug_pdf,
debug_wkhtmltopdf_args=reportcfg.debug_pdf,
)
# =============================================================================
# Main
# =============================================================================
[docs]def main() -> None:
"""
Command-line entry point.
"""
# noinspection PyTypeChecker
parser = argparse.ArgumentParser(
description=f"""
Produce a researcher-oriented PDF report about a destination database.
({CRATE_VERSION_PRETTY})
Note: if wkhtmtopdf reports 'Too many open files', see
- https://stackoverflow.com/q/25355697;
- https://github.com/wkhtmltopdf/wkhtmltopdf/issues/3081;
setting e.g. "ulimit -n 2048" is one solution.
""",
formatter_class=RawDescriptionArgumentDefaultsRichHelpFormatter,
)
parser.add_argument("output", help="PDF output filename")
grp_db = parser.add_argument_group("DATABASE")
grp_db.add_argument(
"--config",
help=f"CRATE anonymisation config file, overriding environment "
f"variable {ANON_CONFIG_ENV_VAR}",
)
grp_db.add_argument(
"--noconfig",
action="store_true",
help="Do not use a config file (unusual)",
)
grp_db.add_argument(
"--db_url",
type=str,
default=None,
help="Database URL, overriding that in the config file",
)
grp_db.add_argument(
"--db_name",
type=str,
default=None,
help="Database name, overriding that in the config file; must be "
"specified if you use --db_url",
)
grp_detail = parser.add_argument_group("DETAIL")
grp_detail.add_argument(
"--show_url",
dest="show_url",
action="store_true",
default=False,
help="Include sanitised, password-safe version of database URL",
)
grp_detail.add_argument(
"--no_show_url",
dest="show_url",
action="store_false",
default=True,
help="Do not include database URL",
)
grp_detail.add_argument(
"--show_counts",
dest="show_counts",
action="store_true",
default=True,
help="Include row counts for each table",
)
grp_detail.add_argument(
"--no_show_counts",
dest="show_counts",
action="store_false",
default=False,
help="Do not include row counts",
)
grp_detail.add_argument(
"--use_dd",
dest="use_dd",
action="store_true",
default=True,
help="Use information obtainable from the CRATE data dictionary (DD), "
"including comments, annotations, and value suppression for "
"potentially sensitive fields; only sensible for reporting on a "
"database completely unrelated to the DD",
)
grp_detail.add_argument(
"--no_use_dd",
dest="use_dd",
action="store_false",
default=False,
help="Do not use information from the CRATE data dictionary",
)
grp_detail.add_argument(
"--show_values",
dest="show_values",
action="store_true",
default=True,
help="Include specimen values/ranges",
)
grp_detail.add_argument(
"--no_show_values",
dest="show_values",
action="store_false",
default=False,
help="Do not include specimen values/ranges",
)
grp_detail.add_argument(
"--max_distinct_values",
type=int,
default=Default.MAX_DISTINCT_VALUES,
help="Maximum number of distinct values to show, if applicable",
)
grp_detail.add_argument(
"--skip_values_if_too_many",
action="store_true",
help="If showing values, and there are more distinct values than the "
"maximum, omit them (rather than showing the first few)?",
)
grp_detail.add_argument(
"--max_value_length",
type=int,
default=Default.MAX_VALUE_LENGTH,
help="Maximum string length to show for a literal value",
)
grp_visuals = parser.add_argument_group("VISUALS")
grp_visuals.add_argument(
"--page_size",
default=Default.PAGE_SIZE,
help="Page size, i.e. paper type",
)
grp_visuals.add_argument(
"--margin_left_right",
default=Default.MARGIN_LEFT_RIGHT,
help="Page left/right margins, with units",
)
grp_visuals.add_argument(
"--margin_top_bottom",
default=Default.MARGIN_TOP_BOTTOM,
help="Page top/bottom margins for content, ignoring header/footer "
"(see --header_footer_spacing_mm), with units",
)
grp_visuals.add_argument(
"--header_footer_spacing_mm",
type=int,
default=Default.HEADER_FOOTER_SPACING_MM,
help="Gap between content and header/footer, in mm",
)
grp_visuals.add_argument(
"--orientation",
choices=["portrait", "landscape"],
default=Default.ORIENTATION,
help="Page orientation",
)
grp_visuals.add_argument(
"--base_font_size",
default=Default.BASE_FONT_SIZE,
help="Base font size, with units",
)
grp_progress = parser.add_argument_group("PROGRESS")
grp_progress.add_argument(
"--verbose", "-v", action="store_true", help="Be verbose"
)
grp_progress.add_argument(
"--debug_pdf", action="store_true", help="Debug PDF creation"
)
args = parser.parse_args()
# -------------------------------------------------------------------------
# Verbosity, logging
# -------------------------------------------------------------------------
loglevel = logging.DEBUG if args.verbose else logging.INFO
main_only_quicksetup_rootlogger(level=loglevel)
# -------------------------------------------------------------------------
# Onwards
# -------------------------------------------------------------------------
if args.config:
os.environ[ANON_CONFIG_ENV_VAR] = args.config
if args.noconfig:
log.info("Not using a CRATE anonymisation config file")
config = None
else:
from crate_anon.anonymise.config_singleton import (
config,
)
reportcfg = ResearcherReportConfig(
anonconfig=config,
base_font_size=args.base_font_size,
db_name=args.db_name,
db_url=args.db_url,
debug_pdf=args.debug_pdf,
header_footer_spacing_mm=args.header_footer_spacing_mm,
margin_left_right=args.margin_left_right,
margin_top_bottom=args.margin_top_bottom,
max_distinct_values=args.max_distinct_values,
max_value_length=args.max_value_length,
orientation=args.orientation,
output_filename=args.output,
page_size=args.page_size,
show_counts=args.show_counts,
show_url=args.show_url,
show_values=args.show_values,
skip_values_if_too_many=args.skip_values_if_too_many,
use_dd=args.use_dd,
)
mk_researcher_report_pdf(reportcfg)
if __name__ == "__main__":
main()