# Source code for factorytx.markers

"""The markers module provides a way to flag errors and warnings at runtime
and have them surfaced to the user. Each marker includes:

1. A log context, which specifies which component and stream a marker
   applies to. If context is not provided explicitly then the current
   thread-local log context will be used instead. See the `factorytx.logs`
   module for more information about log contexts.

2. A category string, which is used to organize and deduplicate markers. If
   you try to add a marker when one already exists with the same context and
   category then the new marker will be ignored.

3. A level, which represents the severity of the marker. Each marker may be
   either:

   - a warning, which is an issue that did not impact data collection but
     may harm performance or correctness; or
   - an error, i.e. an issue which prevents data collection.

4. A message to present to the user.

Markers can be added using the `error` and `warning` functions, or removed
when they no longer apply using `clear`. Both `error` and `warning` also log
their message so that the caller doesn't need to. Finally, markers can be
searched and retrieved using the `query` function.

Note that the `clear` function clears all markers whose categories match
a prefix, so it can be helpful to have categories follow a naming convention.
For example, consider polling data from a SQL database. You want to capture
errors that occur while running database queries, and clear the errors whenever
a query succeeds. If you follow the convention that SQL marker categories begin
with the prefix 'sql:', e.g. 'sql:connect', 'sql:auth', 'sql:query', etc. then
you can call `markers.clear('sql:')` to clear any active SQL error markers after
a successful query.

"""
from types import TracebackType
from typing import List, Optional, Type

import contextlib
import enum
import logging
import os
import sqlite3
import textwrap
import threading
import time
from datetime import datetime

from factorytx.logs import LogContext, get_context, push_context


# Names exported by a star-import of this module.
__all__ = ['clear', 'error', 'initialize', 'setup_db', 'setup_test_db', 'warning', 'clear_components',]

# Module-level logger; marker messages and database failures are logged through it.
log = logging.getLogger(__name__)


@enum.unique
class Level(enum.Enum):
    """Severity of a marker.

    Each member's value is the string written to the `level` column of the
    markers table.
    """

    # An issue that did not impact data collection but may harm performance
    # or correctness.
    WARNING = "warning"

    # An issue which prevents data collection.
    ERROR = "error"


# Maps each marker severity to the `logging` level used when the marker's
# message is written to the log.
_LOG_LEVELS = {
    Level.WARNING: logging.WARNING,
    Level.ERROR: logging.ERROR,
}

# PRAGMA script executed on every connection this module opens (see
# `setup_db`, `setup_test_db` and `initialize`).
_SQL_CONNECTION_SETUP = textwrap.dedent("""\
    -- Prevent connections from rolling journal back
    PRAGMA journal_mode = PERSIST;

    -- If app crashes, data is safe, but db may be corrupted if OS crashes
    -- or computer loses power before data is written. Commits can be faster.
    PRAGMA synchronous = OFF;
""")

# Schema script; every statement uses IF NOT EXISTS, so it can be re-run against
# an existing database. The markers primary key is declared ON CONFLICT IGNORE,
# so inserting a second marker with the same component, asset, stream_type and
# category is silently dropped.
_SQL_CREATE_SCHEMA = textwrap.dedent("""\
    -- Create markers table
    CREATE TABLE IF NOT EXISTS markers (
      component TEXT NOT NULL,
      component_type TEXT NOT NULL,
      asset TEXT NOT NULL,
      stream_type TEXT NOT NULL,
      category TEXT NOT NULL,
      timestamp INTEGER NOT NULL,
      level TEXT NOT NULL,
      message TEXT NOT NULL,
      PRIMARY KEY (component, asset, stream_type, category) ON CONFLICT IGNORE
    );

    -- Create migrations table
    CREATE TABLE IF NOT EXISTS migrations (
      migration_id TEXT NOT NULL,
      PRIMARY KEY (migration_id)
    );
""")


def _is_db_uri(path: str) -> bool:
    """Returns True if a path is a sqlite3 database URI, or False otherwise."""
    return path.startswith('file:')


def _get_db_uri(path: str) -> str:
    """Returns a sqlite3 database URI for `path`.

    A value that is already a URI is returned unchanged; a plain filesystem
    path gets the ``file:`` scheme prepended.
    """
    if _is_db_uri(path):
        return path
    return 'file:' + path


def setup_db(path: str) -> None:
    """Initializes a marker database on the filesystem at `path` unless `path`
    already points to a valid marker database.

    An existing database that fails SQLite's integrity check (or cannot be
    opened or read at all) is deleted and recreated from scratch.

    :param path: filesystem path, or sqlite3 URI, of the marker database.
    """
    # NOTE(review): the existence check and removal below operate on `path`
    # itself, so they presumably only do something useful when `path` is a
    # plain filesystem path rather than a `file:` URI -- confirm with callers.
    uri = _get_db_uri(path)
    connection: Optional[sqlite3.Connection] = None

    try:
        try:
            connection = sqlite3.connect(uri, uri=True)
            first_row = connection.execute("PRAGMA integrity_check(1);").fetchall()[0]
            (status,) = first_row
        except sqlite3.DatabaseError:
            # Failing to open or read the file is handled like corruption.
            status = "error"

        if status != "ok":
            log.error("Markers database is corrupt, recreating it.")
            # Release the handle before deleting the file underneath it.
            if connection is not None:
                connection.close()
                connection = None
            if os.path.exists(path):
                os.remove(path)
            connection = sqlite3.connect(uri, uri=True)

        assert connection is not None
        for script in (_SQL_CONNECTION_SETUP, _SQL_CREATE_SCHEMA):
            connection.executescript(script)
    finally:
        if connection is not None:
            connection.close()
def setup_test_db(path: str) -> None:
    """Creates a marker database without performing any correctness checks.

    Used by the unit tests to create in-memory databases.

    :param path: filesystem path, or sqlite3 URI, of the database to set up.
    """
    # Close the connection explicitly, as `setup_db` does, instead of leaving
    # it to be released whenever the connection object is garbage-collected.
    with contextlib.closing(sqlite3.connect(_get_db_uri(path), uri=True)) as conn:
        conn.executescript(_SQL_CONNECTION_SETUP)
        conn.executescript(_SQL_CREATE_SCHEMA)
# Module-wide database connection and the lock that serializes access to it.
# Both stay None until `initialize` has been called.
_conn: Optional[sqlite3.Connection] = None
_lock: Optional[threading.Lock] = None
def initialize(path: str) -> None:
    """Configures where the markers module stores information on the
    filesystem and sets up the module's internal state.

    This must be called before client code can add, clear, or query markers.

    :param path: filesystem path, or sqlite3 URI, of a marker database that
        was previously created with `setup_db` (or `setup_test_db`).
    :raises RuntimeError: if `path` is a plain path and no file exists there.
    """
    global _conn, _lock

    # Unit tests use memory-mode databases addressed by URI, so only plain
    # paths are checked for on disk.
    is_plain_path = not _is_db_uri(path)
    if is_plain_path and not os.path.exists(path):
        raise RuntimeError("Please call markers.setup_db before initialization")

    # check_same_thread=False lets the one connection be used from any thread;
    # callers in this module take `_lock` around every use of it.
    _conn = sqlite3.connect(_get_db_uri(path), uri=True, check_same_thread=False)
    _conn.row_factory = sqlite3.Row
    _conn.executescript(_SQL_CONNECTION_SETUP)
    _lock = threading.Lock()
# Read-only probe: does a marker with this context and category already exist?
_SQL_CHECK_INSERT_MARKER = textwrap.dedent("""\
    SELECT 1 FROM markers
    WHERE component = :component
      AND asset = :asset
      AND stream_type = :stream_type
      AND category = :category;
""")

# Inserts one marker row; the named parameters match the dict built in `_add`.
_SQL_INSERT_MARKER = textwrap.dedent("""\
    INSERT INTO markers (timestamp, component, component_type, asset, stream_type, category, level, message)
    VALUES (:timestamp, :component, :component_type, :asset, :stream_type, :category, :level, :message);
""")


def _add(level: Level, context: Optional[LogContext], category: str, message: str,
         log_exception: bool) -> None:
    """Adds a marker to the backing store, if one doesn't already exist with
    the same context and category. This function also logs the marker message
    to cut down on boilerplate. If you would like to include exception
    information, you can pass `log_exception=True`.

    :param level: severity of the marker; also selects the logging level.
    :param context: log context of the marker, or None to use the current
        thread's log context.
    :param category: category string stored with the marker.
    :param message: text that is both logged and stored in the marker.
    :param log_exception: passed to the logger as `exc_info`.
    :raises RuntimeError: if `initialize` has not been called yet.
    """
    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before adding markers.")
    if context is None:
        context = get_context()

    # The message is logged on every call, even when the marker already exists.
    with push_context(context=context):
        log.log(_LOG_LEVELS[level], message, exc_info=log_exception)

    params = {
        'timestamp': int(time.time()),  # whole seconds since the epoch
        'component': context.component,
        'component_type': context.component_type,
        'asset': context.asset,
        'stream_type': context.stream_type,
        'category': category,
        'level': level.value,
        'message': message,
    }

    try:
        with _lock, _conn, contextlib.closing(_conn.cursor()) as cur:
            # Check if a marker already exists before writing. Otherwise we have
            # to contend with other threads to grab an exclusive lock. Luckily
            # there are no TOCTOU issues since there should be only one thread
            # running for a given component + asset + stream_type.
            if not cur.execute(_SQL_CHECK_INSERT_MARKER, params).fetchall():
                cur.execute(_SQL_INSERT_MARKER, params)
    except sqlite3.Error as e:
        # Don't kill the current thread if we can't write to the database.
        log.exception('Failed to write to marker database: %r', e)


# Read-only probe used by `clear`: is there any marker in this context whose
# category matches the LIKE pattern? Backslash is the LIKE escape character.
# NOTE(review): unlike _SQL_DELETE below, this probe does not filter on
# component_type -- confirm whether that difference is intentional.
_SQL_CHECK_DELETE_MARKER = textwrap.dedent("""\
    SELECT 1 FROM markers
    WHERE component = :component
      AND asset = :asset
      AND stream_type = :stream_type
      AND category LIKE :category_pattern ESCAPE '\\';
""")

# Deletes the markers in one context whose category matches the LIKE pattern.
_SQL_DELETE = textwrap.dedent("""\
    DELETE FROM markers
    WHERE component = :component
      AND component_type = :component_type
      AND asset = :asset
      AND stream_type = :stream_type
      AND category LIKE :category_pattern ESCAPE '\\';
""")
def clear(category_prefix: str = '', context: Optional[LogContext] = None) -> None:
    """Clears markers in the selected context which have a category that
    starts with `category_prefix`.

    :param category_prefix: prefix categories of markers to remove. For
        example, if you pass 'sql' then markers categorized as
        'sqlalchemy:error' and 'sql:query' will be deleted, but
        'postgresql:auth' will not. If no prefix is specified then all markers
        in the selected context are deleted.
    :param context: context (component and stream) of the markers to delete.
        The context must match exactly. If no context is specified then the
        current thread's log context is used.
    :raises RuntimeError: if `initialize` has not been called yet.
    """
    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before clearing markers")
    if context is None:
        context = get_context()

    # The prefix is matched with SQL LIKE, so every character that LIKE treats
    # specially has to be escaped: the escape character itself first, then both
    # wildcards ('%' = any run of characters, '_' = any single character).
    # NOTE(review): SQLite's LIKE is case-insensitive for ASCII by default, so
    # the prefix match is too unless case_sensitive_like is enabled.
    escaped_category_prefix = (
        category_prefix
        .replace('\\', '\\\\')
        .replace('%', '\\%')
        .replace('_', '\\_')
    )
    params = {
        'component': context.component,
        'component_type': context.component_type,
        'asset': context.asset,
        'stream_type': context.stream_type,
        'category_pattern': f'{escaped_category_prefix}%',
    }

    try:
        with _lock, _conn, contextlib.closing(_conn.cursor()) as cur:
            # Check if a marker already exists before writing. Otherwise we have
            # to contend with other threads to grab an exclusive lock. Luckily
            # there are no TOCTOU issues since there should be only one thread
            # running for a given component + asset + stream_type.
            if cur.execute(_SQL_CHECK_DELETE_MARKER, params).fetchall():
                cur.execute(_SQL_DELETE, params)
    except sqlite3.Error as e:
        # Don't kill the current thread if we can't write to the database.
        log.exception('Failed to write to marker database: %s', repr(e))
def clear_components(components: List[str]) -> None:
    """Clears every marker that belongs to one of the named components.

    Names that have no markers in the database simply match nothing, so
    component names that aren't configured are ignored.

    :param components: names of the components whose markers are deleted.
    :raises RuntimeError: if `initialize` has not been called yet.
    """
    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before clearing markers")

    # One '?' placeholder per name keeps the statement fully parameterized.
    placeholders = ','.join(['?'] * len(components))
    statement = f"DELETE FROM markers WHERE component in ({placeholders});"
    with _lock, _conn:
        _conn.execute(statement, components).close()
def clear_all() -> None:
    """Clears all error and warning markers.

    :raises RuntimeError: if `initialize` has not been called yet.
    """
    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before clearing markers")
    with _lock, _conn:
        cur = _conn.execute("DELETE FROM markers")
        cur.close()
def warning(category: str, message: str, context: LogContext = None) -> None:
    """Records a warning marker and logs `message` at warning level.

    :param category: free-form string used to categorize the marker. This
        should follow a naming convention to make it easier to clear related
        markers; see the `clear` function for more information.
    :param message: warning message to include in the marker.
    :param context: context (component and stream) to include in the marker.
    """
    _add(
        level=Level.WARNING,
        context=context,
        category=category,
        message=message,
        log_exception=False,
    )
def error(category: str, message: str, context: LogContext = None,
          log_exception: bool = False) -> None:
    """Records an error marker and logs `message` at error level.

    :param category: free-form string used to categorize the marker. This
        should follow a naming convention to make it easier to clear related
        markers; see the `clear` function for more information.
    :param message: error message to include in the marker.
    :param context: context (component and stream) to include in the marker.
    :param log_exception: True to include a stack trace in the log message, or
        False otherwise. The stack trace is never included in the marker
        itself.
    """
    _add(
        level=Level.ERROR,
        context=context,
        category=category,
        message=message,
        log_exception=log_exception,
    )
def query(component: Optional[str] = None, component_type: Optional[str] = None,
          asset: Optional[str] = None, stream_type: Optional[str] = None) -> List[dict]:
    """Returns all markers with the specified component, component_type,
    asset, and stream_type.

    Any omitted parameters are treated as wildcards; passing no parameters
    will cause this function to return all current markers for all components
    and streams.

    :returns: one dict per marker row, keyed by column name, with the
        `timestamp` value converted to a naive UTC `datetime`.
    :raises RuntimeError: if `initialize` has not been called yet.
    """
    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before querying markers")

    # Only the arguments that were actually supplied become WHERE clauses.
    # Column names are fixed here; caller values are always bound as parameters.
    requested = (
        ('component', component),
        ('component_type', component_type),
        ('asset', asset),
        ('stream_type', stream_type),
    )
    active = [(column, value) for column, value in requested if value is not None]

    querytext = 'SELECT * FROM markers'
    if active:
        querytext += ' WHERE ' + ' AND '.join(f'{column} = ?' for column, _ in active)
    params = [value for _, value in active]

    results = []
    # closing() releases the cursor even if converting a row raises.
    with _lock, _conn, contextlib.closing(_conn.execute(querytext, params)) as cur:
        for row in cur:
            result = dict(row)
            # NOTE(review): utcfromtimestamp is deprecated since Python 3.12; it
            # is kept because callers receive naive UTC datetimes from it.
            result['timestamp'] = datetime.utcfromtimestamp(result['timestamp'])
            results.append(result)
    return results


class MarkerContext:
    """A context manager that clears its markers once the context exits.

    When an exception propagates out of the context, or the context has been
    made "lame", the markers are left in place.

        # Simple example
        with MarkerContext("request") as ctx:
            ctx.error("Error occurred")
            # A marker with the "request" category has been generated.

        # Nested example
        with MarkerContext("request") as ctx:
            ctx.error("ctx1 error")
            with MarkerContext("request.error") as ctx2:
                ctx2.error("ctx2 error")

    Prefixes of nested contexts are joined with ':' to form the category, so
    the inner marker above is categorized as "request:request.error".
    """

    # Per-thread stack holding the prefixes of the currently open contexts.
    stack: threading.local = threading.local()
    lame: bool
    prefix: str

    def __init__(self, prefix: str, context: Optional["LogContext"] = None,
                 lame: bool = False):
        """Meant to be called from within a ``with`` statement.

        :param prefix: pushed onto the current thread's prefix stack; the
            joined stack is the category of markers created through this
            instance and the prefix cleared on a clean exit.
        :param context: log context passed along when creating and clearing
            markers; None is passed through unchanged.
        :param lame: True to never clear markers on exit.
        """
        self.lame = lame
        self.prefix = prefix
        self.context = context
        if not hasattr(MarkerContext.stack, 'prefixes'):
            MarkerContext.stack.prefixes = [prefix]
        else:
            MarkerContext.stack.prefixes.append(prefix)

    def __enter__(self) -> "MarkerContext":
        return self

    def __exit__(self,
                 exc_type: Optional[Type[BaseException]],
                 exc_value: Optional[BaseException],
                 traceback: Optional[TracebackType]) -> None:
        # Capture the full category prefix before popping our own entry.
        old_prefix = self.get_prefix()
        assert MarkerContext.stack.prefixes[-1] == self.prefix
        MarkerContext.stack.prefixes.pop()
        if self.lame:
            return
        if exc_type is not None or exc_value is not None or traceback is not None:
            return
        # ok, we're not lame and we're not exiting through an exception
        clear(old_prefix, self.context)

    def get_prefix(self) -> str:
        """Returns the current thread's stacked prefixes joined with ':'."""
        return ':'.join(MarkerContext.stack.prefixes)

    def make_lame(self) -> None:
        """Lame MarkerContext's will not clear their markers on exit."""
        self.lame = True

    def error(self, message: str, log_exception: bool = False) -> None:
        """Report error."""
        error(self.get_prefix(), message, self.context, log_exception)

    def warning(self, message: str) -> None:
        """Report warning."""
        # Use the same stacked category as `error`, so that warnings raised in
        # a nested context are covered by the prefix cleared in `__exit__`.
        warning(self.get_prefix(), message, self.context)