"""The markers module provides a way to flag errors and warnings at runtime
and have them surfaced to the user. Each marker includes:
1. A log context, which specifies which component and stream a marker
   applies to. If context is not provided explicitly then the current
   thread-local log context will be used instead. See the `factorytx.logs`
   module for more information about log contexts.
2. A category string, which is used to organize and deduplicate markers. If
   you try to add a marker when one already exists with the same context and
   category then the new marker will be ignored.
3. A level, which represents the severity of the marker. Each marker may be
   either:
   - a warning, which is an issue that did not impact data collection but
     may harm performance or correctness; or
   - an error, ie. an issue which prevents data collection.
4. A message to present to the user.
Markers can be added using the `error` and `warning` functions, or removed
when they no longer apply using `clear`. Both `error` and `warning` also log
their message so that the caller doesn't need to. Finally, markers can be
searched and retrieved using the `query` function.
Note that the `clear` function clears all markers whose categories match
a prefix, so it can be helpful to have categories follow a naming convention.
For example, consider polling data from a SQL database. You want to capture
errors that occur while running database queries, and clear the errors whenever
a query succeeds. If you follow the convention that SQL marker categories begin
with the prefix 'sql:', eg. 'sql:connect', 'sql:auth', 'sql:query', etc. then
you can call `markers.clear('sql:')` to clear any active SQL error markers after
a successful query.
"""
from types import TracebackType
from typing import List, Optional, Type
import contextlib
import enum
import logging
import os
import sqlite3
import textwrap
import threading
import time
from datetime import datetime
from factorytx.logs import LogContext, get_context, push_context
# Names exported by a star-import of this module.
__all__ = [
    'clear',
    'error',
    'initialize',
    'setup_db',
    'setup_test_db',
    'warning',
    'clear_components',
]

# Module-level logger; marker messages and database failures are logged here.
log = logging.getLogger(__name__)
@enum.unique
class Level(enum.Enum):
    """Severity of a marker.

    Each member's value is the string written to the `level` column of the
    markers table when a marker is stored.
    """

    WARNING = 'warning'
    ERROR = 'error'
# Logging level used when a marker of each severity is logged. Every Level
# member shares its name with a `logging` level constant (WARNING, ERROR), so
# the table is derived from the enum rather than spelled out entry by entry.
_LOG_LEVELS = {
    severity: getattr(logging, severity.name)
    for severity in Level
}
# Pragmas executed on every connection this module opens (`setup_db`,
# `setup_test_db` and `initialize` all run this script). As the embedded SQL
# comments explain, they trade durability on OS crash / power loss for faster
# commits.
_SQL_CONNECTION_SETUP = textwrap.dedent("""\
    -- Prevent connections from rolling journal back
    PRAGMA journal_mode = PERSIST;
    -- If app crashes, data is safe, but db may be corrupted if OS crashes
    -- or computer loses power before data is written. Commits can be faster.
    PRAGMA synchronous = OFF;
""")
# Schema for the marker store. Both statements use IF NOT EXISTS, so running
# the script against an already-initialized database is harmless.
#
# The markers primary key is (component, asset, stream_type, category) with
# ON CONFLICT IGNORE: inserting a second marker with the same key is silently
# dropped instead of raising, which is what gives markers their
# "first one wins" deduplication. Note that component_type is stored but is
# not part of the key.
#
# The migrations table is only created here; nothing else in the visible code
# reads or writes it.
_SQL_CREATE_SCHEMA = textwrap.dedent("""\
    -- Create markers table
    CREATE TABLE IF NOT EXISTS markers (
      component TEXT NOT NULL,
      component_type TEXT NOT NULL,
      asset TEXT NOT NULL,
      stream_type TEXT NOT NULL,
      category TEXT NOT NULL,
      timestamp INTEGER NOT NULL,
      level TEXT NOT NULL,
      message TEXT NOT NULL,
      PRIMARY KEY (component, asset, stream_type, category) ON CONFLICT IGNORE
    );
    -- Create migrations table
    CREATE TABLE IF NOT EXISTS migrations (
      migration_id TEXT NOT NULL,
      PRIMARY KEY (migration_id)
    );
""")
def _is_db_uri(path: str) -> bool:
    """Tells whether `path` is already written as a sqlite3 `file:` URI.

    :param path: filesystem path or sqlite3 URI to inspect.
    :returns: True when `path` begins with the `file:` scheme, False otherwise.
    """
    uri_scheme = 'file:'
    return path[:len(uri_scheme)] == uri_scheme
def _get_db_uri(path: str) -> str:
    """Normalizes a database location into a sqlite3 URI.

    :param path: either a plain filesystem path or a `file:` URI.
    :returns: `path` unchanged when it is already a URI, otherwise `path`
        prefixed with `file:`. No quoting or escaping is applied.
    """
    if _is_db_uri(path):
        return path
    return f'file:{path}'
def setup_db(path: str) -> None:
    """Initializes a marker database on the filesystem at `path` if `path` does
    not already point to a valid marker database. If a marker database already
    exists but is corrupt then it will be deleted and reinitialized.

    :param path: filesystem location of the marker database.
    :raises sqlite3.Error: if the database still cannot be opened or
        initialized after any corrupt file has been discarded (for example
        because the parent directory does not exist).
    """
    conn: Optional[sqlite3.Connection] = None
    db_uri = _get_db_uri(path)
    try:
        try:
            conn = sqlite3.connect(db_uri, uri=True)
            # integrity_check(1) stops at the first problem; a healthy
            # database yields the single row ('ok',).
            res, = list(conn.execute("PRAGMA integrity_check(1);"))[0]
        except sqlite3.DatabaseError:
            # Covers both "file is not a database" and failures to open it.
            res = "error"
        if res != "ok":
            log.error("Markers database is corrupt, recreating it.")
            if conn is not None:
                conn.close()
                conn = None
            try:
                os.remove(path)
            except FileNotFoundError:
                # Nothing on disk to discard (the open itself failed). Fall
                # through so the reconnect below reports the real sqlite3
                # error instead of it being masked by a FileNotFoundError.
                pass
            conn = sqlite3.connect(db_uri, uri=True)
        assert conn is not None
        conn.executescript(_SQL_CONNECTION_SETUP)
        conn.executescript(_SQL_CREATE_SCHEMA)
    finally:
        if conn is not None:
            conn.close()
def setup_test_db(path: str) -> None:
    """Creates a marker database without performing any correctness checks.
    Used by the unit tests to create in-memory databases.

    :param path: sqlite3 database path or `file:` URI.
    """
    conn = sqlite3.connect(_get_db_uri(path), uri=True)
    try:
        conn.executescript(_SQL_CONNECTION_SETUP)
        conn.executescript(_SQL_CREATE_SCHEMA)
    finally:
        # Close deterministically instead of leaving the handle to the garbage
        # collector (which also triggers a ResourceWarning on newer Pythons).
        # NOTE(review): for a shared in-memory URI the schema presumably only
        # survives while another connection (e.g. the one opened by
        # `initialize`) is still open -- confirm against the test fixtures.
        conn.close()
# Connection to the marker database shared by every function in this module;
# stays None until `initialize` assigns it.
_conn: Optional[sqlite3.Connection] = None
# Lock taken around every use of `_conn`; stays None until `initialize`
# assigns it.
_lock: Optional[threading.Lock] = None
def initialize(path: str) -> None:
    """Points the markers module at its database and prepares the module
    state (shared connection and lock). Client code must call this before
    adding, clearing, or querying markers.

    :param path: filesystem path or sqlite3 `file:` URI of a database that
        was previously created with `setup_db` / `setup_test_db`.
    :raises RuntimeError: if `path` is a plain filesystem path that does not
        exist yet.
    """
    global _conn, _lock
    # URIs are not checked on disk: the unit tests use memory-mode databases,
    # which never show up in the filesystem.
    if not (os.path.exists(path) or _is_db_uri(path)):
        raise RuntimeError("Please call markers.setup_db before initialization")
    # check_same_thread=False lets every thread use this one connection; the
    # callers serialize access with `_lock`.
    _conn = sqlite3.connect(
        _get_db_uri(path),
        uri=True,
        check_same_thread=False,
    )
    # Rows are returned as sqlite3.Row so that `query` can turn them into dicts.
    _conn.row_factory = sqlite3.Row
    _conn.executescript(_SQL_CONNECTION_SETUP)
    _lock = threading.Lock()
# Read-only probe run by `_add` before inserting: returns a row when a marker
# with the same (component, asset, stream_type, category) key already exists.
_SQL_CHECK_INSERT_MARKER = textwrap.dedent("""\
    SELECT 1
      FROM markers
     WHERE component = :component
       AND asset = :asset
       AND stream_type = :stream_type
       AND category = :category;
""")
# Inserts one marker row. All values are bound as named parameters taken from
# the params dict built in `_add`.
_SQL_INSERT_MARKER = textwrap.dedent("""\
    INSERT INTO
    markers (timestamp, component, component_type, asset, stream_type, category, level, message)
    VALUES (:timestamp, :component, :component_type, :asset, :stream_type, :category, :level, :message);
""")
def _add(level: Level, context: Optional[LogContext], category: str, message: str, log_exception: bool) -> None:
    """Logs `message` and stores it as a marker, unless a marker with the same
    context and category is already present in the backing store.

    :param level: severity of the marker; also selects the logging level.
    :param context: log context to attach to the marker. When None, the
        current thread's log context is used.
    :param category: category string used to deduplicate and clear markers.
    :param message: text to log and to store in the marker.
    :param log_exception: True to attach the active exception's traceback to
        the log record. The traceback is not stored in the marker.
    :raises RuntimeError: if `initialize` has not been called yet.
    """
    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before adding markers.")

    marker_context = get_context() if context is None else context

    # Log under the marker's own context so that the log line is attributed to
    # the same component / stream as the marker.
    with push_context(context=marker_context):
        log.log(_LOG_LEVELS[level], message, exc_info=log_exception)

    row = {
        'timestamp': int(time.time()),
        'component': marker_context.component,
        'component_type': marker_context.component_type,
        'asset': marker_context.asset,
        'stream_type': marker_context.stream_type,
        'category': category,
        'level': level.value,
        'message': message,
    }
    try:
        with _lock:
            # The connection context manager commits on success and rolls
            # back if the body raises.
            with _conn:
                with contextlib.closing(_conn.cursor()) as cursor:
                    # Probe with a read before writing so that the common
                    # "marker already exists" case never has to compete for
                    # the exclusive write lock. There is no check-then-insert
                    # race as long as only one thread runs per
                    # component + asset + stream_type.
                    already_present = cursor.execute(_SQL_CHECK_INSERT_MARKER, row).fetchall()
                    if not already_present:
                        cursor.execute(_SQL_INSERT_MARKER, row)
    except sqlite3.Error as exc:
        # Markers are best-effort: a database failure must not kill the
        # calling thread.
        log.exception('Failed to write to marker database: %r', exc)
# Read-only probe run by `clear` before deleting: returns a row for every
# marker in the given context whose category matches the LIKE pattern. The
# pattern uses backslash as its escape character (the Python literal '\\' is a
# single backslash in the SQL text).
# NOTE(review): unlike _SQL_DELETE below, this probe does not filter on
# component_type, so it can report a match that the DELETE then skips --
# confirm whether the asymmetry is intended.
_SQL_CHECK_DELETE_MARKER = textwrap.dedent("""\
    SELECT 1
      FROM markers
     WHERE component = :component
       AND asset = :asset
       AND stream_type = :stream_type
       AND category LIKE :category_pattern ESCAPE '\\';
""")
# Deletes every marker in the given context (including component_type) whose
# category matches the LIKE pattern, again with backslash as escape character.
_SQL_DELETE = textwrap.dedent("""\
    DELETE FROM markers
    WHERE component = :component
      AND component_type = :component_type
      AND asset = :asset
      AND stream_type = :stream_type
      AND category LIKE :category_pattern ESCAPE '\\';
""")
def clear(category_prefix: str = '', context: Optional[LogContext] = None) -> None:
    """Clears markers in the selected context which have a category that
    starts with `category_prefix`.

    :param category_prefix: prefix categories of markers to remove. For example,
        if you pass 'sql' then markers categorized as 'sqlalchemy:error' and
        'sql:query' will be deleted, but 'postgresql:auth' will not. If no
        prefix is specified then all markers in the selected context are deleted.
        The prefix is matched literally: '%', '_' and backslash have no
        wildcard meaning.
    :param context: context (component and stream) of the markers to delete.
        The context must match exactly. If no context is specified then the
        current thread's log context is used.
    :raises RuntimeError: if `initialize` has not been called yet.
    """
    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before clearing markers")
    if context is None:
        context = get_context()
    # Escape every LIKE metacharacter so the prefix only matches itself. The
    # backslash must be handled first, otherwise the escapes added for '%' and
    # '_' would themselves be escaped. '_' matters as much as '%': it matches
    # any single character, so an unescaped 'sql_' would also clear 'sqlX...'.
    escaped_category_prefix = (
        category_prefix
        .replace('\\', '\\\\')
        .replace('%', '\\%')
        .replace('_', '\\_')
    )
    params = {
        'component': context.component,
        'component_type': context.component_type,
        'asset': context.asset,
        'stream_type': context.stream_type,
        'category_pattern': f'{escaped_category_prefix}%',
    }
    try:
        with _lock, _conn, contextlib.closing(_conn.cursor()) as cur:
            # Check that there is something to delete before writing.
            # Otherwise we have to contend with other threads to grab an
            # exclusive lock. Luckily there are no TOCTOU issues since there
            # should be only one thread running for a given
            # component + asset + stream_type.
            if cur.execute(_SQL_CHECK_DELETE_MARKER, params).fetchall():
                cur.execute(_SQL_DELETE, params)
    except sqlite3.Error as e:
        # Don't kill the current thread if we can't write to the database.
        log.exception('Failed to write to marker database: %s', repr(e))
def clear_components(components: List[str]) -> None:
    """Deletes every marker that belongs to one of the named components.

    Names that match no stored marker simply have no effect. Unlike `clear`,
    database errors are not caught here and propagate to the caller.

    :param components: component names whose markers should be removed.
    :raises RuntimeError: if `initialize` has not been called yet.
    """
    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before clearing markers")
    # One positional placeholder per component name; the names themselves are
    # always passed as bound parameters, never formatted into the SQL text.
    placeholders = ','.join(['?'] * len(components))
    statement = "DELETE FROM markers WHERE component in ({!s});".format(placeholders)
    with _lock:
        with _conn:
            _conn.execute(statement, components).close()
def clear_all() -> None:
    """Clears all error and warning markers, for every component and stream.

    Database errors are not caught here and propagate to the caller.

    :raises RuntimeError: if `initialize` has not been called yet.
    """
    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before clearing markers")
    with _lock, _conn:
        cur = _conn.execute("DELETE FROM markers")
        cur.close()
def warning(category: str, message: str, context: LogContext = None) -> None:
    """Records a warning-level marker and logs `message` as a warning.

    :param category: free-form string used to categorize the marker. Following
        a naming convention makes it easier to clear related markers later;
        see `clear` for details.
    :param message: warning message to store in the marker.
    :param context: context (component and stream) to attach to the marker.
        When omitted, the current thread's log context is used.
    """
    _add(
        level=Level.WARNING,
        context=context,
        category=category,
        message=message,
        log_exception=False,
    )
def error(category: str, message: str, context: LogContext = None, log_exception: bool = False) -> None:
    """Records an error-level marker and logs `message` as an error.

    :param category: free-form string used to categorize the marker. Following
        a naming convention makes it easier to clear related markers later;
        see `clear` for details.
    :param message: error message to store in the marker.
    :param context: context (component and stream) to attach to the marker.
        When omitted, the current thread's log context is used.
    :param log_exception: True to include a stack trace in the log message, or
        False otherwise. The stack trace is never stored in the marker itself.
    """
    _add(
        level=Level.ERROR,
        context=context,
        category=category,
        message=message,
        log_exception=log_exception,
    )
def query(component: Optional[str] = None, component_type: Optional[str] = None,
          asset: Optional[str] = None, stream_type: Optional[str] = None) -> List[dict]:
    """Returns all markers with the specified component, component_type, asset,
    and stream_type. Any omitted parameters are treated as wildcards; passing
    no parameters will cause this function to return all current markers for
    all components and streams.

    :returns: one dict per marker, keyed by column name. The `timestamp` entry
        is converted from the stored epoch seconds to a naive `datetime`
        expressed in UTC.
    :raises RuntimeError: if `initialize` has not been called yet.
    """
    # Imported locally because the module itself only imports `datetime`.
    from datetime import timezone

    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before querying markers")

    candidates = (
        ('component = ?', component),
        ('component_type = ?', component_type),
        ('asset = ?', asset),
        ('stream_type = ?', stream_type),
    )
    # Keep only the filters the caller actually supplied; the clauses are fixed
    # strings and the values are always bound as parameters.
    filters = [(clause, value) for clause, value in candidates if value is not None]

    querytext = 'SELECT * FROM markers'
    if filters:
        querytext += ' WHERE ' + ' AND '.join(clause for clause, _ in filters)
    params = [value for _, value in filters]

    results = []
    with _lock, _conn:
        cur = _conn.execute(querytext, params)
        try:
            for row in cur:
                result = dict(row)
                # Same value the deprecated datetime.utcfromtimestamp() would
                # return: build an aware UTC datetime, then drop the tzinfo so
                # callers keep receiving naive UTC datetimes.
                result['timestamp'] = (
                    datetime.fromtimestamp(result['timestamp'], tz=timezone.utc)
                    .replace(tzinfo=None)
                )
                results.append(result)
        finally:
            # Release the cursor even if converting a row fails.
            cur.close()
    return results
class MarkerContext:
    """
    A context manager that clears its markers once the ``with`` block exits.
    When an exception is thrown within the block, or the context has been
    "lamed", the markers are not cleared.

    Contexts nest: the category used for markers is the ':'-joined list of the
    prefixes of all contexts currently active on the calling thread.

    # Simple example
    with MarkerContext("request") as ctx:
        ctx.error("Error occurred")  # Creates an error marker with category "request".

    # Nested example
    with MarkerContext("request") as ctx:
        ctx.error("ctx1 error")          # category "request"
        with MarkerContext("request.error") as ctx2:
            ctx2.error("ctx2 error")     # category "request:request.error"

    Note that the category is always derived from the thread's current stack of
    prefixes, so calling an outer context's methods inside a nested ``with``
    block uses the nested category.
    """
    # Per-thread stack of the prefixes of every MarkerContext that has been
    # created, and not yet exited, on the current thread.
    stack: threading.local = threading.local()
    # When True the context does not clear its markers on exit.
    lame: bool
    # This context's own (un-joined) prefix.
    prefix: str
    # Log context passed to the marker functions; None means "use the current
    # thread's log context".
    context: Optional[LogContext]

    def __init__(self, prefix: str, context: Optional[LogContext] = None, lame: bool = False):
        """Meant to be called from within a ``with`` statement. ``prefix`` will be prepended
        to markers created with instance methods, and used for clearing if no exception is
        detected (and the context hasn't been lamed).

        The prefix is pushed onto the thread's prefix stack here, at
        construction time, and popped again in ``__exit__``.
        """
        self.lame = lame
        self.prefix = prefix
        self.context = context
        if not hasattr(MarkerContext.stack, 'prefixes'):
            MarkerContext.stack.prefixes = [prefix]
        else:
            MarkerContext.stack.prefixes.append(prefix)

    def __enter__(self) -> "MarkerContext":
        return self

    def __exit__(self,
                 exc_type: Optional[Type[BaseException]],
                 exc_value: Optional[BaseException],
                 traceback: Optional[TracebackType]) -> None:
        """Pops this context's prefix and, on a clean exit of a non-lame
        context, clears every marker whose category starts with the joined
        prefix. Exceptions are never suppressed.
        """
        # Capture the joined prefix before popping: it is the category prefix
        # under which this context's markers were created.
        old_prefix = self.get_prefix()
        assert MarkerContext.stack.prefixes[-1] == self.prefix
        MarkerContext.stack.prefixes.pop()
        if self.lame:
            return
        if exc_type is not None or exc_value is not None or traceback is not None:
            return
        # ok, we're not lame and we're not exiting through an exception
        clear(old_prefix, self.context)

    def get_prefix(self) -> str:
        """Returns the ':'-joined prefixes of all contexts active on this thread."""
        return ':'.join(MarkerContext.stack.prefixes)

    def make_lame(self) -> None:
        """Lame MarkerContext's will not clear their markers on exit."""
        self.lame = True

    def error(self, message: str, log_exception: bool = False) -> None:
        """Report an error under the current joined prefix."""
        error(self.get_prefix(), message, self.context, log_exception)

    def warning(self, message: str) -> None:
        """Report a warning under the current joined prefix."""
        # Use the joined prefix, exactly like `error`, so that warnings raised
        # in nested contexts fall under the prefix that `__exit__` clears.
        warning(self.get_prefix(), message, self.context)