Source code for crate_anon.crateweb.research.sql_writer

"""
crate_anon/crateweb/research/sql_writer.py

===============================================================================

    Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CRATE.

    CRATE is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CRATE is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CRATE. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Automatically create/manipulate SQL statements based on our extra knowledge
of the fields that can be used to link across tables/databases.**

"""

import logging
from typing import List, Optional

from cardinal_pythonlib.sql.sql_grammar import (
    format_sql,
    SqlGrammar,
    text_from_parsed,
)
from pyparsing import ParseResults

from crate_anon.common.sql import (
    ColumnId,
    get_first_from_table,
    JoinInfo,
    parser_add_result_column,
    parser_add_from_tables,
    set_distinct_within_parsed,
    TableId,
    WhereCondition,
)
from crate_anon.crateweb.research.errors import DatabaseStructureNotUnderstood
from crate_anon.crateweb.research.research_db_info import (
    research_database_info,
)

log = logging.getLogger(__name__)


# =============================================================================
# Automatic SQL generation functions
# =============================================================================


[docs]def get_join_info( grammar: SqlGrammar, parsed: ParseResults, jointable: TableId, magic_join: bool = False, nonmagic_join_type: str = "INNER JOIN", nonmagic_join_condition: str = "", ) -> List[JoinInfo]: """ Works out how to join a new table into an existing SQL ``SELECT`` query. Args: grammar: :class:`cardinal_pythonlib.sql.sql_grammar.SqlGrammar` representing the SQL dialect/grammar in use parsed: existing :class:`pyparsing.ParseResults` representing the ``SELECT`` statement so far jointable: :class:`crate_anon.common.sql.TableId` representing the table to be joined in magic_join: perform a "magic join", i.e. join the new table in based on our knowledge of the research database structure? nonmagic_join_type: if ``not magic_join``, this is an SQL string specifying the join type, e.g. ``"INNER JOIN"`` nonmagic_join_condition: if ``not magic_join``, this is an SQL string specifying the join condition, e.g. ``"ON x = y"`` Returns: a list of :class:`crate_anon.common.sql.JoinInfo` objects, e.g. ``[JoinInfo("tablename", "INNER JOIN", "WHERE somecondition")]``. Raises: ``DatabaseStructureNotUnderstood`` if the relevant schema information cannot be looked up. Notes: - ``INNER JOIN`` etc. is part of ANSI SQL """ first_from_table = get_first_from_table(parsed) from_table_in_join_schema = get_first_from_table( parsed, match_db=jointable.db, match_schema=jointable.schema ) exact_match_table = get_first_from_table( parsed, match_db=jointable.db, match_schema=jointable.schema, match_table=jointable.table, ) if not first_from_table: # No tables in query yet. # This should not happen; this function is to help with adding # new FROM tables to existing FROM clauses. log.warning("get_join_info: no tables in query") return [] if exact_match_table: # This table is already in the query. No JOIN should be required. # log.debug("get_join_info: same table already in query") return [] if not magic_join: # log.debug("get_join_info: non-magic join") return [ JoinInfo( join_type=nonmagic_join_type, table=jointable.identifier(grammar), join_condition=nonmagic_join_condition, ) ] if from_table_in_join_schema: # Another table from the same database is present. Link on the # TRID field. # log.debug("get_join_info: joining to another table in same DB") return [ JoinInfo( join_type="INNER JOIN", table=jointable.identifier(grammar), join_condition="ON {new} = {existing}".format( new=research_database_info.get_trid_column( jointable ).identifier(grammar), existing=research_database_info.get_trid_column( from_table_in_join_schema ).identifier(grammar), ), ) ] # OK. So now we're building a cross-database join. try: existing_family = research_database_info.get_dbinfo_by_schema( first_from_table.schema_id ).rid_family new_family = research_database_info.get_dbinfo_by_schema( jointable.schema_id ).rid_family except ValueError: # Some schema information is absent. This probably means that the user # has created a custom query and passed it to the query builder; # alternatively, that the database structure has changed. Either way, # the query builder won't cope. raise DatabaseStructureNotUnderstood( "Some schema information is absent. Likely, either a custom query " "has passed to the query builder, or the database structure has " "changed since the query was written." ) # log.debug("existing_family={}, new_family={}".format( # existing_family, new_family)) if existing_family and existing_family == new_family: # log.debug("get_join_info: new DB, same RID family") return [ JoinInfo( join_type="INNER JOIN", table=jointable.identifier(grammar), join_condition="ON {new} = {existing}".format( new=research_database_info.get_rid_column( jointable ).identifier(grammar), existing=research_database_info.get_rid_column( first_from_table ).identifier(grammar), ), ) ] # If we get here, we have to do a complicated join via the MRID. # log.debug("get_join_info: new DB, different RID family, using MRID") existing_mrid_column = research_database_info.get_mrid_column_from_table( first_from_table ) existing_mrid_table = existing_mrid_column.table_id if not existing_mrid_table: raise ValueError( f"No MRID table available (in the same database as table " f"{first_from_table}; cannot link)" ) new_mrid_column = research_database_info.get_mrid_column_from_table( jointable ) new_mrid_table = new_mrid_column.table_id existing_mrid_table_in_query = bool( get_first_from_table( parsed, match_db=existing_mrid_table.db, match_schema=existing_mrid_table.schema, match_table=existing_mrid_table.table, ) ) joins = [] # type: List[JoinInfo] if not existing_mrid_table_in_query: joins.append( JoinInfo( join_type="INNER JOIN", table=existing_mrid_table.identifier(grammar), join_condition="ON {m1_trid1} = {t1_trid1}".format( m1_trid1=research_database_info.get_trid_column( existing_mrid_table ).identifier(grammar), t1_trid1=research_database_info.get_trid_column( first_from_table ).identifier(grammar), ), ) ) joins.append( JoinInfo( join_type="INNER JOIN", table=new_mrid_table.identifier(grammar), join_condition="ON {m2_mrid2} = {m1_mrid1}".format( m2_mrid2=new_mrid_column.identifier(grammar), m1_mrid1=existing_mrid_column.identifier(grammar), ), ) ) if jointable != new_mrid_table: joins.append( JoinInfo( join_type="INNER JOIN", table=jointable.identifier(grammar), join_condition="ON {t2_trid2} = {m2_trid2}".format( t2_trid2=research_database_info.get_trid_column( jointable ).identifier(grammar), m2_trid2=research_database_info.get_trid_column( new_mrid_table ).identifier(grammar), ), ) ) return joins
[docs]class SelectElement: """ Class to represent a result column in an SQL ``SELECT`` statement. """
[docs] def __init__( self, column_id: ColumnId = None, raw_select: str = "", from_table_for_raw_select: TableId = None, alias: str = "", ): """ Args: column_id: a :class:`crate_anon.common.sql.ColumnId` object; using this will automatically add the column's table to the ``FROM`` clause raw_select: as an alternative to ``column_id``, raw SQL for the ``SELECT`` clause from_table_for_raw_select: if ``raw_select`` is used, a :class:`crate_anon.common.sql.TableId` that should be added to the ``FROM`` clause alias: alias to be used, i.e. for ``SELECT something AS alias`` """ self.column_id = column_id self.raw_select = raw_select self.from_table_for_raw_select = from_table_for_raw_select self.alias = alias
def __repr__(self) -> str: return ( "<{qualname}(" "column_id={column_id}, " "raw_select={raw_select}, " "from_table_for_raw_select={from_table_for_raw_select}, " "alias={alias}) " "at {addr}>".format( qualname=self.__class__.__qualname__, column_id=repr(self.column_id), raw_select=repr(self.raw_select), from_table_for_raw_select=repr(self.from_table_for_raw_select), alias=repr(self.alias), addr=hex(id(self)), ) )
[docs] def sql_select_column(self, grammar: SqlGrammar) -> str: """ Return the raw SQL for this ``SELECT`` result column. Args: grammar: :class:`cardinal_pythonlib.sql.sql_grammar.SqlGrammar` representing the SQL dialect/grammar in use Returns: str: SQL like ``colname`` or ``expression`` or ``colname AS alias`` """ result = self.raw_select or self.column_id.identifier(grammar) if self.alias: result += " AS " + self.alias return result
[docs] def from_table(self) -> Optional[TableId]: """ Returns details of the table to be added to the ``FROM`` clause of the ``SELECT`` statement. Returns: a :class:`crate_anon.common.sql.TableId`, or ``None`` (if ``raw_select`` is used and ``from_table_for_raw_select`` was not specified) """ if self.raw_select: return self.from_table_for_raw_select return self.column_id.table_id
[docs] def from_table_str(self, grammar: SqlGrammar) -> str: """ Returns a string form of :meth:`from_table`, i.e. an SQL identifier for the ``FROM`` clause. Args: grammar: :class:`cardinal_pythonlib.sql.sql_grammar.SqlGrammar` representing the SQL dialect/grammar in use Returns: str: SQL like ``from_table`` """ table_id = self.from_table() if not table_id: return "" return table_id.identifier(grammar)
[docs] def sql_select_from(self, grammar: SqlGrammar) -> str: """ Returns a full ``SELECT... FROM...`` statement. Args: grammar: :class:`cardinal_pythonlib.sql.sql_grammar.SqlGrammar` representing the SQL dialect/grammar in use Returns: str: SQL like ``SELECT colname AS alias FROM from_table`` """ sql = "SELECT " + self.sql_select_column(grammar=grammar) from_table = self.from_table() if from_table: sql += " FROM " + from_table.identifier(grammar) return sql
[docs]def reparse_select(p: ParseResults, grammar: SqlGrammar) -> ParseResults: """ Internal function for when we get desperate trying to hack around the results of ``pyparsing``'s efforts. - takes a :class:`pyparsing.ParseResults` - converts it to an SQL string - parses the string as a ``SELECT`` statement - returns the resulting :class:`pyparsing.ParseResults` """ return grammar.get_select_statement().parseString( text_from_parsed(p, formatted=False), parseAll=True )
[docs]def add_to_select( sql: str, grammar: SqlGrammar, select_elements: List[SelectElement] = None, where_conditions: List[WhereCondition] = None, # For SELECT: distinct: bool = None, # For WHERE: where_type: str = "AND", bracket_where: bool = False, # For either, for JOIN: magic_join: bool = True, join_type: str = "NATURAL JOIN", join_condition: str = "", # General: formatted: bool = True, debug: bool = False, debug_verbose: bool = False, ) -> str: """ This function encapsulates our query builder's common operations. One premise is that SQL parsing is relatively slow, so we should do this only once. We parse; add bits to the parsed structure as required; then re-convert to text. If you specify table/column, elements will be added to ``SELECT`` and ``FROM`` unless they already exist. If you specify ``where_expression``, elements will be added to ``WHERE``. In this situation, you should also specify ``where_table``; if the ``where_table`` isn't yet in the ``FROM`` clause, this will be added as well. Parsing is SLOW, so we should do as much as possible in a single call to this function. Args: sql: existing SQL statement grammar: :class:`cardinal_pythonlib.sql.sql_grammar.SqlGrammar` representing the SQL dialect/grammar in use select_elements: optional list of :class:`SelectElement` objects representing things to add to the ``SELECT`` clause of the ``SELECT`` statement (i.e. results columns) where_conditions: optional list of :class:`crate_anon.common.sql.WhereCondition` representing conditions to add to the ``WHERE`` clause of the ``SELECT`` statement distinct: if ``True``, make the ``SELECT`` statement a ``SELECT DISTINCT``; if ``False``, remove any ``DISTINCT``; if ``None``, leave the ``DISTINCT`` status as it is. where_type: logical operator with which to join multiple parts of the ``WHERE`` expression, typically ``AND`` (but maybe ``OR``, etc.) bracket_where: put brackets ``()`` around each new part of the ``WHERE`` expression? magic_join: perform a "magic join", i.e. join the new table in based on our knowledge of the research database structure? join_type: if ``not magic_join``, this is an SQL string specifying the join type, e.g. ``"INNER JOIN"`` join_condition: if ``not magic_join``, this is an SQL string specifying the join condition, e.g. ``"ON x = y"`` formatted: reformat the SQL to look pretty? debug: show debugging information debug_verbose: show verbose debugging information Returns: str: SQL statement Raises: ``DatabaseStructureNotUnderstood`` if the relevant schema information cannot be looked up. """ select_elements = select_elements or [] # type: List[SelectElement] where_conditions = where_conditions or [] # type: List[WhereCondition] if debug: log.info(f"START: {sql}") log.debug(f"select_elements: {select_elements}") log.debug(f"where_conditions: {where_conditions}") log.debug(f"where_type: {where_type}") log.debug(f"join_type: {join_type}") log.debug(f"join_condition: {join_condition}") # ------------------------------------------------------------------------- # Get going. We have to handle a fresh SQL statement in a slightly # different way. # ------------------------------------------------------------------------- if not sql: if not select_elements: raise ValueError( "Fresh SQL statements must include a SELECT " "element" ) # --------------------------------------------------------------------- # Fresh SQL statement # --------------------------------------------------------------------- first_select = select_elements[0] select_elements = select_elements[1:] sql = first_select.sql_select_from(grammar) # log.debug("Starting SQL from scratch as: " + sql) # ------------------------------------------------------------------------- # Parse what we have (which is now, at a minimum, SELECT ... FROM ...). # ------------------------------------------------------------------------- p = grammar.get_select_statement().parseString(sql, parseAll=True) if debug and debug_verbose: log.debug("start dump:\n" + p.dump()) existing_tables = p.join_source.from_tables.asList() # type: List[str] new_tables = [] # type: List[TableId] def add_new_table(_table_id: TableId) -> None: if ( _table_id and _table_id not in new_tables and _table_id.identifier(grammar) not in existing_tables ): new_tables.append(_table_id) # ------------------------------------------------------------------------- # DISTINCT? # ------------------------------------------------------------------------- if distinct is True: set_distinct_within_parsed(p, action="set") elif distinct is False: set_distinct_within_parsed(p, action="clear") # ------------------------------------------------------------------------- # Process all the (other?) SELECT clauses # ------------------------------------------------------------------------- for se in select_elements: p = parser_add_result_column( p, se.sql_select_column(grammar), grammar=grammar ) add_new_table(se.from_table()) # ------------------------------------------------------------------------- # Process all the WHERE clauses # ------------------------------------------------------------------------- for wc in where_conditions: where_expression = wc.sql(grammar) if bracket_where: where_expression = "(" + where_expression + ")" # The tricky bit: inserting it. # We use the [0] to overcome the effects of defining these things # as a pyparsing Group(), which encapsulates the results in a list. if p.where_clause: cond = grammar.get_expr().parseString( where_expression, parseAll=True )[0] extra = [where_type, cond] p.where_clause.where_expr.extend(extra) else: # No WHERE as yet # Doing this properly is a nightmare. # It's hard to add a *named* ParseResults element to another. # So it's very hard to alter p.where_clause.where_expr such that # we can continue adding more WHERE clauses if we want. # This is the inefficient, cop-out method: # (1) Add as plain text p.where_clause.append("WHERE " + where_expression) # (2) Reparse... p = reparse_select(p, grammar=grammar) add_new_table(wc.table_id) # ------------------------------------------------------------------------- # Process all the FROM clauses, autojoining as necessary # ------------------------------------------------------------------------- for table_id in new_tables: p = parser_add_from_tables( p, get_join_info( grammar=grammar, parsed=p, jointable=table_id, magic_join=magic_join, nonmagic_join_type=join_type, nonmagic_join_condition=join_condition, ), # may raise DatabaseStructureNotUnderstood grammar=grammar, ) if debug and debug_verbose: log.debug("end dump:\n" + p.dump()) result = text_from_parsed(p, formatted=False) if formatted: result = format_sql(result) if debug: log.info(f"END: {result}") return result