"""The markers module provides a way to flag errors and warnings at runtime
and have them surfaced to the user. Each marker includes:
1. A log context, which specifies which component and stream a marker
applies to. If context is not provided explicitly then the current
thread-local log context will be used instead. See the `factorytx.logs`
module for more information about log contexts.
2. A category string, which is used to organize and deduplicate markers. If
you try to add a marker when one already exists with the same context and
category then the new marker will be ignored.
3. A level, which represents the severity of the marker. Each marker may be
either:
- a warning, which is an issue that did not impact data collection but
may harm performance or correctness; or
- an error, ie. an issue which prevents data collection.
4. A message to present to the user.
Markers can be added using the `error` and `warning` functions, or removed
when they no longer apply using `clear`. Both `error` and `warning` also log
their message so that the caller doesn't need to. Finally, markers can be
searched and retrieved using the `query` function.
Note that the `clear` function clears all markers whose categories match
a prefix, so it can be helpful to have categories follow a naming convention.
For example, consider polling data from a SQL database. You want to capture
errors that occur while running database queries, and clear the errors whenever
a query succeeds. If you follow the convention that SQL marker categories begin
with the prefix 'sql:', eg. 'sql:connect', 'sql:auth', 'sql:query', etc. then
you can call `markers.clear('sql:')` to clear any active SQL error markers after
a successful query.
"""
from types import TracebackType
from typing import List, Optional, Type
import contextlib
import enum
import logging
import os
import sqlite3
import textwrap
import threading
import time
from datetime import datetime
from factorytx.logs import LogContext, get_context, push_context
# Names exported by a star-import of this module.
__all__ = ['clear', 'error', 'initialize', 'setup_db', 'setup_test_db', 'warning', 'clear_components',]
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
@enum.unique
class Level(enum.Enum):
    """Severity of a marker.

    Each member's value is the lowercase string form of the severity.
    `enum.unique` guarantees that no two members share a value.
    """

    WARNING = "warning"
    ERROR = "error"
# Maps each marker severity to the `logging` level used when the marker's
# message is written to the log.
_LOG_LEVELS = {
    Level.WARNING: logging.WARNING,
    Level.ERROR: logging.ERROR,
}
# Pragmas executed on every connection this module opens (see `setup_db`,
# `setup_test_db` and `initialize`). They are per-connection settings, which
# is why the script is re-run each time instead of once at schema creation.
_SQL_CONNECTION_SETUP = textwrap.dedent("""\
-- Prevent connections from rolling journal back
PRAGMA journal_mode = PERSIST;
-- If app crashes, data is safe, but db may be corrupted if OS crashes
-- or computer loses power before data is written. Commits can be faster.
PRAGMA synchronous = OFF;
""")
# Schema creation script. Both statements use IF NOT EXISTS, so running the
# script against an already-initialized database leaves it unchanged.
#
# The `markers` primary key is (component, asset, stream_type, category) and
# is declared ON CONFLICT IGNORE: inserting a second marker with the same key
# is silently dropped rather than raising. Note that `component_type` is not
# part of the key.
_SQL_CREATE_SCHEMA = textwrap.dedent("""\
-- Create markers table
CREATE TABLE IF NOT EXISTS markers (
component TEXT NOT NULL,
component_type TEXT NOT NULL,
asset TEXT NOT NULL,
stream_type TEXT NOT NULL,
category TEXT NOT NULL,
timestamp INTEGER NOT NULL,
level TEXT NOT NULL,
message TEXT NOT NULL,
PRIMARY KEY (component, asset, stream_type, category) ON CONFLICT IGNORE
);
-- Create migrations table
CREATE TABLE IF NOT EXISTS migrations (
migration_id TEXT NOT NULL,
PRIMARY KEY (migration_id)
);
""")
def _is_db_uri(path: str) -> bool:
    """Tell whether `path` is already a sqlite3 database URI, i.e. whether it
    begins with the `file:` scheme, as opposed to a plain filesystem path.
    """
    uri_scheme = 'file:'
    return path[:len(uri_scheme)] == uri_scheme
def _get_db_uri(path: str) -> str:
    """Return `path` in sqlite3 URI form.

    A value that is already a `file:` URI is returned unchanged; anything else
    is treated as a filesystem path and gets the `file:` scheme prepended.
    """
    if _is_db_uri(path):
        return path
    return f'file:{path}'
def setup_db(path: str) -> None:
    """Ensures that a usable marker database exists on the filesystem at `path`.

    The database is opened and checked with `PRAGMA integrity_check`. If it
    cannot be opened or the check does not report "ok", the file is treated as
    corrupt: it is deleted and a fresh database is created in its place. In
    either case the connection pragmas and the (idempotent) schema script are
    then applied, and the connection is closed before returning.

    :param path: filesystem path, or sqlite3 `file:` URI, of the database.
    """
    db_uri = _get_db_uri(path)
    conn: Optional[sqlite3.Connection] = None
    try:
        try:
            conn = sqlite3.connect(db_uri, uri=True)
            first_row = conn.execute("PRAGMA integrity_check(1);").fetchall()[0]
            status, = first_row
        except sqlite3.DatabaseError:
            # Failing to open or read the file counts as corruption too.
            status = "error"

        database_ok = status == "ok"
        if not database_ok:
            log.error("Markers database is corrupt, recreating it.")
            # Release our handle before removing the file underneath it.
            if conn is not None:
                conn.close()
                conn = None
            # NOTE(review): when `path` is a `file:` URI this tests the URI
            # string itself rather than the underlying file -- confirm whether
            # URIs pointing at on-disk files are expected here.
            if os.path.exists(path):
                os.remove(path)
            conn = sqlite3.connect(db_uri, uri=True)

        assert conn is not None
        for script in (_SQL_CONNECTION_SETUP, _SQL_CREATE_SCHEMA):
            conn.executescript(script)
    finally:
        if conn is not None:
            conn.close()
def setup_test_db(path: str) -> None:
    """Creates a marker database without performing any correctness checks.

    Used by the unit tests to create in-memory databases. The connection
    opened here is closed before returning, mirroring `setup_db`, instead of
    being left for the garbage collector.

    NOTE(review): for a shared in-memory URI, SQLite discards the database once
    its last connection closes, so the caller presumably needs another
    connection to the same URI open for the schema to survive -- confirm
    against the tests.

    :param path: filesystem path, or sqlite3 `file:` URI, of the database.
    """
    conn = sqlite3.connect(_get_db_uri(path), uri=True)
    try:
        conn.executescript(_SQL_CONNECTION_SETUP)
        conn.executescript(_SQL_CREATE_SCHEMA)
    finally:
        conn.close()
# Module-wide state, populated by `initialize`. Both stay None until then, and
# the functions that touch the database raise RuntimeError while either is unset.
_conn: Optional[sqlite3.Connection] = None  # connection shared by all callers
_lock: Optional[threading.Lock] = None  # held around every use of `_conn`
def initialize(path: str) -> None:
    """Points the markers module at the database at `path` and sets up the
    module's shared connection and lock. Client code must call this before
    adding, clearing, or querying markers.

    :param path: filesystem path, or sqlite3 `file:` URI, of a database that
        was previously created with `setup_db` (or `setup_test_db`).
    :raises RuntimeError: if `path` is a plain path that does not exist.
    """
    global _conn, _lock

    # URIs are exempt from the on-disk check: the unit tests use memory-mode
    # databases, which never appear on the filesystem.
    found_on_disk = os.path.exists(path)
    if not (found_on_disk or _is_db_uri(path)):
        raise RuntimeError("Please call markers.setup_db before initialization")

    # check_same_thread=False lets the single connection be used from several
    # threads; `_lock` is what serializes that use.
    connection = sqlite3.connect(_get_db_uri(path), check_same_thread=False, uri=True)
    _conn = connection
    connection.row_factory = sqlite3.Row
    connection.executescript(_SQL_CONNECTION_SETUP)
    _lock = threading.Lock()
# Read-only probe used by `_add`: returns a row when a marker with the same
# component, asset, stream_type and category is already stored.
_SQL_CHECK_INSERT_MARKER = textwrap.dedent("""\
SELECT 1
FROM markers
WHERE component = :component
AND asset = :asset
AND stream_type = :stream_type
AND category = :category;
""")
# Inserts one marker row; every column is bound by name from the parameter
# dict built in `_add`.
_SQL_INSERT_MARKER = textwrap.dedent("""\
INSERT INTO
markers (timestamp, component, component_type, asset, stream_type, category, level, message)
VALUES (:timestamp, :component, :component_type, :asset, :stream_type, :category, :level, :message);
""")
def _add(level: Level, context: Optional[LogContext], category: str, message: str, log_exception: bool) -> None:
    """Records a marker unless one with the same context and category is
    already stored, and logs `message` at the log level matching `level`.

    Database failures are logged and swallowed so that a broken marker store
    never takes down the calling thread.

    :param level: severity of the marker.
    :param context: log context the marker belongs to; when None, the current
        thread-local log context is used.
    :param category: category string used to deduplicate and clear markers.
    :param message: text stored in the marker and written to the log.
    :param log_exception: True to attach the active exception's traceback to
        the log record (it is never stored in the marker).
    :raises RuntimeError: if `initialize` has not been called yet.
    """
    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before adding markers.")

    marker_context = get_context() if context is None else context

    # Log under the marker's own context so the record is attributed to the
    # right component and stream.
    with push_context(context=marker_context):
        log.log(_LOG_LEVELS[level], message, exc_info=log_exception)

    row = dict(
        timestamp=int(time.time()),
        component=marker_context.component,
        component_type=marker_context.component_type,
        asset=marker_context.asset,
        stream_type=marker_context.stream_type,
        category=category,
        level=level.value,
        message=message,
    )

    try:
        with _lock, _conn, contextlib.closing(_conn.cursor()) as cursor:
            # Probe with a read before writing so that the common "already
            # present" case never has to contend for sqlite's exclusive write
            # lock. There is no TOCTOU concern because only one thread should
            # be running for a given component + asset + stream_type.
            already_present = cursor.execute(_SQL_CHECK_INSERT_MARKER, row).fetchall()
            if not already_present:
                cursor.execute(_SQL_INSERT_MARKER, row)
    except sqlite3.Error as exc:
        # Best effort only: don't kill the current thread over a failed write.
        log.exception('Failed to write to marker database: %r', exc)
# Read-only probe used by `clear`: returns a row when at least one marker in
# the given context has a category matching the LIKE pattern. The `'\\'` in
# this (non-raw) Python literal reaches SQLite as a single backslash, which is
# the escape character `clear` uses when building the pattern.
# Note that, unlike `_SQL_DELETE`, this probe does not filter on
# `component_type`.
_SQL_CHECK_DELETE_MARKER = textwrap.dedent("""\
SELECT 1
FROM markers
WHERE component = :component
AND asset = :asset
AND stream_type = :stream_type
AND category LIKE :category_pattern ESCAPE '\\';
""")
# Deletes the markers of one exact context (including `component_type`) whose
# category matches the LIKE pattern.
_SQL_DELETE = textwrap.dedent("""\
DELETE FROM markers
WHERE component = :component
AND component_type = :component_type
AND asset = :asset
AND stream_type = :stream_type
AND category LIKE :category_pattern ESCAPE '\\';
""")
def clear(category_prefix: str = '', context: Optional[LogContext] = None) -> None:
    """Clears markers in the selected context which have a category that
    starts with `category_prefix`.

    The prefix is matched literally: the SQL LIKE wildcards `%` and `_`, as
    well as the escape character itself, are escaped before the pattern is
    built. Database failures are logged and swallowed so that a broken marker
    store never takes down the calling thread.

    :param category_prefix: prefix categories of markers to remove. For example,
        if you pass 'sql' then markers categorized as 'sqlalchemy:error' and
        'sql:query' will be deleted, but 'postgresql:auth' will not. If no
        prefix is specified then all markers in the selected context are deleted.
    :param context: context (component and stream) of the markers to delete.
        The context must match exactly. If no context is specified then the
        current thread's log context is used.
    :raises RuntimeError: if `initialize` has not been called yet.
    """
    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before clearing markers")

    if context is None:
        context = get_context()

    # The backslash must be escaped first so the escapes added for the
    # wildcards are not themselves doubled. `_` matches any single character
    # in LIKE, so leaving it unescaped would make a prefix such as 'sql_' also
    # clear categories like 'sqlX...'.
    escaped_category_prefix = (
        category_prefix
        .replace('\\', '\\\\')
        .replace('%', '\\%')
        .replace('_', '\\_')
    )
    params = {
        'component': context.component,
        'component_type': context.component_type,
        'asset': context.asset,
        'stream_type': context.stream_type,
        'category_pattern': f'{escaped_category_prefix}%',
    }

    try:
        with _lock, _conn, contextlib.closing(_conn.cursor()) as cur:
            # Check whether there is anything to delete before writing.
            # Otherwise we have to contend with other threads to grab an
            # exclusive lock. Luckily there are no TOCTOU issues since there
            # should be only one thread running for a given component + asset
            # + stream_type.
            if cur.execute(_SQL_CHECK_DELETE_MARKER, params).fetchall():
                cur.execute(_SQL_DELETE, params)
    except sqlite3.Error as e:
        # Don't kill the current thread if we can't write to the database.
        log.exception('Failed to write to marker database: %s', repr(e))
def clear_components(components: List[str]) -> None:
    """Deletes every marker that belongs to one of the named components.

    Names that match no stored marker simply have no effect.

    :param components: component names whose markers should be removed.
    :raises RuntimeError: if `initialize` has not been called yet.
    """
    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before clearing markers")

    # One positional placeholder per component name; the names themselves are
    # always bound as parameters, never spliced into the SQL text.
    placeholders = ','.join(['?'] * len(components))
    statement = f"DELETE FROM markers WHERE component in ({placeholders});"
    with _lock, _conn:
        _conn.execute(statement, components).close()
def clear_all() -> None:
    """Clears all error and warning markers, for every component and stream.

    :raises RuntimeError: if `initialize` has not been called yet.
    """
    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before clearing markers")
    with _lock, _conn:
        cur = _conn.execute("DELETE FROM markers")
        cur.close()
def warning(category: str, message: str, context: LogContext = None) -> None:
    """Records a warning-level marker and logs `message` as a warning.

    :param category: free-form string used to categorize the marker. Following
        a naming convention makes it easier to clear related markers later; see
        the `clear` function for more information.
    :param message: warning message to include in the marker.
    :param context: context (component and stream) to include in the marker.
        When omitted, the current thread's log context is used.
    """
    _add(
        level=Level.WARNING,
        context=context,
        category=category,
        message=message,
        log_exception=False,
    )
def error(category: str, message: str, context: LogContext = None, log_exception: bool = False) -> None:
    """Records an error-level marker and logs `message` as an error.

    :param category: free-form string used to categorize the marker. Following
        a naming convention makes it easier to clear related markers later; see
        the `clear` function for more information.
    :param message: error message to include in the marker.
    :param context: context (component and stream) to include in the marker.
        When omitted, the current thread's log context is used.
    :param log_exception: True to include a stack trace in the log message, or
        False otherwise. The stack trace is never included in the marker itself.
    """
    _add(
        level=Level.ERROR,
        context=context,
        category=category,
        message=message,
        log_exception=log_exception,
    )
def query(component: Optional[str] = None, component_type: Optional[str] = None,
          asset: Optional[str] = None, stream_type: Optional[str] = None) -> List[dict]:
    """Returns all markers with the specified component, component_type, asset,
    and stream_type. Any omitted parameters are treated as wildcards; passing
    no parameters will cause this function to return all current markers for
    all components and streams.

    :param component: exact component name to match, or None for any.
    :param component_type: exact component type to match, or None for any.
    :param asset: exact asset to match, or None for any.
    :param stream_type: exact stream type to match, or None for any.
    :returns: one dict per marker, keyed by column name. The `timestamp` entry
        is converted from epoch seconds to a naive `datetime` expressed in UTC.
    :raises RuntimeError: if `initialize` has not been called yet.
    """
    if _conn is None or _lock is None:
        raise RuntimeError("Please call markers.initialize before querying markers")

    # Local import: `timezone` is not among this module's top-level imports.
    from datetime import timezone

    requested = (
        ('component', component),
        ('component_type', component_type),
        ('asset', asset),
        ('stream_type', stream_type),
    )
    # Only the filters the caller actually supplied take part in the WHERE clause.
    active = [(column, value) for column, value in requested if value is not None]

    # Column names come from the fixed tuple above; caller-supplied values are
    # always bound as parameters.
    querytext = 'SELECT * FROM markers'
    if active:
        querytext += ' WHERE ' + ' AND '.join(f'{column} = ?' for column, _ in active)
    params: List = [value for _, value in active]

    results = []
    # `closing` guarantees the cursor is released even if iteration fails.
    with _lock, _conn, contextlib.closing(_conn.execute(querytext, params)) as cur:
        for row in cur:
            result = dict(row)
            # Same value `datetime.utcfromtimestamp` produced (naive, UTC),
            # without relying on that deprecated constructor.
            result['timestamp'] = datetime.fromtimestamp(
                result['timestamp'], tz=timezone.utc,
            ).replace(tzinfo=None)
            results.append(result)
    return results
class MarkerContext:
    """
    A context manager that clears its markers once the context exits cleanly.
    When an exception propagates out of the context, or the context has been
    "lamed", the markers are left in place.

    Prefixes of nested contexts are kept on a per-thread stack and joined with
    ':' to form the category used for reporting and for clearing. Note that a
    prefix is pushed onto that stack when the object is constructed, not when
    the ``with`` block is entered, so instances should be created directly in
    a ``with`` statement.

    Simple example::

        with MarkerContext("request") as ctx:
            ctx.error("Error occurred")  # A marker with the "request" category will have been generated.

    Nested example::

        with MarkerContext("request") as ctx:
            ctx.error("ctx1 error")
            with MarkerContext("request.error") as ctx2:
                ctx2.error("ctx2 error")
    """

    # Thread-local holder; its `prefixes` attribute is the list of prefixes of
    # the contexts currently open on this thread.
    stack: threading.local = threading.local()
    lame: bool
    prefix: str

    def __init__(self, prefix: str, context: Optional[LogContext] = None, lame: bool = False):
        """Meant to be called from within a ``with`` statement. ``prefix`` will be prepended
        to markers created with instance methods, and used for clearing if no exception is
        detected (and the context hasn't been lamed).

        :param prefix: category prefix contributed by this context.
        :param context: log context passed through to the marker functions;
            None means the current thread's log context.
        :param lame: True to create the context already lamed, i.e. it will
            not clear its markers on exit.
        """
        self.lame = lame
        self.prefix = prefix
        self.context = context
        if not hasattr(MarkerContext.stack, 'prefixes'):
            MarkerContext.stack.prefixes = [prefix]
        else:
            MarkerContext.stack.prefixes.append(prefix)

    def __enter__(self) -> "MarkerContext":
        return self

    def __exit__(self,
                 exc_type: Optional[Type[BaseException]],
                 exc_value: Optional[BaseException],
                 traceback: Optional[TracebackType]) -> None:
        # Capture the full prefix before popping: it still includes this
        # context's own prefix at this point.
        old_prefix = self.get_prefix()
        assert MarkerContext.stack.prefixes[-1] == self.prefix
        MarkerContext.stack.prefixes.pop()

        if self.lame:
            return

        if exc_type is not None or exc_value is not None or traceback is not None:
            return

        # ok, we're not lame and we're not exiting through an exception
        clear(old_prefix, self.context)

    def get_prefix(self) -> str:
        """Returns the prefixes of all contexts open on this thread, joined with ':'."""
        return ':'.join(MarkerContext.stack.prefixes)

    def make_lame(self) -> None:
        """Lame MarkerContext's will not clear their markers on exit."""
        self.lame = True

    def error(self, message: str, log_exception: bool = False) -> None:
        """Report error."""
        error(self.get_prefix(), message, self.context, log_exception)

    def warning(self, message: str) -> None:
        """Report warning."""
        # Use the same joined prefix as `error` and as the clear performed on
        # exit; with only `self.prefix`, a warning raised inside nested
        # contexts would be filed under a category the exit-time clear can
        # never match.
        warning(self.get_prefix(), message, self.context)