# Source code for factorytx.supervisor

"""This module provides services to automatically restart threads and processes
that die.

"""
import enum
import logging
import multiprocessing
import pickle
import random
import threading
import time
from typing import Any, Callable, Dict, Generator, List, NamedTuple, Optional, Tuple

import pandas
from setproctitle import setproctitle

from factorytx import const
from factorytx.exceptions import Failure
from factorytx import logs
from factorytx import markers

log = logging.getLogger(__name__)


class DummyProcess(threading.Thread):
    """Thread subclass used as the value of `RunMethod.DUMMY_PROCESSES`.

    It adds no behavior of its own; being a separate class gives
    `RunMethod.DUMMY_PROCESSES` a value distinct from `threading.Thread`,
    which is the value of `RunMethod.THREADS`.

    """


@enum.unique
class RunMethod(enum.Enum):
    """Ways in which a supervised target can be executed.

    The value of each member is the class that the supervisor instantiates
    (with `target`, `args` and `daemon` keyword arguments) and starts in order
    to run a target.

    """
    # Run each target in a separate process.
    PROCESSES: Any = multiprocessing.Process
    # Run each target in a thread, but round-trip the target through pickle
    # first to simulate spawning it under multiprocessing.
    DUMMY_PROCESSES: Any = DummyProcess
    # Run each target in a thread of the current process.
    THREADS: Any = threading.Thread


class Target(NamedTuple):
    """A callable to supervise, bundled with its arguments and log context."""
    # Log context that is set before the callable runs and attached to any
    # supervisor markers raised or cleared for this target.
    context: logs.LogContext
    # Function invoked as `callable(*args, **kwargs)` in the target's own
    # thread / process.
    callable: Callable
    # Positional arguments passed to `callable`.
    args: Tuple[Any, ...]
    # Keyword arguments passed to `callable`.
    kwargs: Dict[str, Any]


# How long `Supervisor.run` sleeps between checks of the supervised processes.
CHECK_INTERVAL_SECONDS = 1.0
# Min duration to back off before restarting a process; the backoff is reset
# to this value once a process is considered healthy.
MIN_BACKOFF_SECONDS = 1.0
# Max duration to back off before restarting a process; the backoff doubles
# after every restart until it reaches this cap.
MAX_BACKOFF_SECONDS = 60.0
# Duration a process must be up before it is considered healthy.
HEALTHY_UPTIME_SECONDS = 600.0


def _run_process(run_method: RunMethod, target: Target) -> None:
    """Entry point executed inside every supervised thread / process.

    Prepares the execution context that `run_method` calls for, installs the
    target's log context and then invokes the target's callable. A `Failure`
    raised by the callable is swallowed silently; any other `Exception` is
    reported through an `ftx.supervisor.error` marker instead of propagating.

    :param run_method: how the target is being run; selects the setup steps.
    :param target: the callable to run, with its arguments and log context.

    """
    if run_method == RunMethod.DUMMY_PROCESSES:
        # Round-trip the target through pickle to verify that it could be
        # spawned under multiprocessing. This MUST happen in the target thread:
        # some targets rely on deserialization to rebuild data structures that
        # are bound to the current thread, eg. sqlite connections.
        target = pickle.loads(pickle.dumps(target))
    elif run_method == RunMethod.PROCESSES:
        # TODO: Factor this out?
        logs.initialize()
        markers.initialize(const.MARKER_PATH)
        # Raise rather than warn, or else bugs go unnoticed in multi-stage
        # transform pipelines.
        pandas.set_option('mode.chained_assignment', 'raise')
        process_title = f'factorytx [{logs.format_context(target.context)}]'
        setproctitle(process_title)

    target_context = target.context
    logs.set_context(context=target_context)
    try:
        target.callable(*target.args, **target.kwargs)
    except Failure:
        # NOTE(review): presumably a Failure has already been reported by the
        # code that raised it -- confirm against factorytx.exceptions.
        return
    except Exception as exc:
        # Use the target's context for the marker so that it matches the
        # context the supervisor passes to markers.clear(); otherwise the
        # supervisor might not clean up the marker once the process stabilizes.
        markers.error('ftx.supervisor.error', f'Unexpected error: {exc!s}',
                      context=target_context, log_exception=True)


# The Supervisor is implemented as a class so that in the future we can add
# methods to request that supervised processes be stopped or restarted.
class Supervisor:
    """A Supervisor supervises a collection of callables, running each callable
    in its own thread or process. Each callable will be restarted if it
    terminates; callables which terminate too frequently will wait longer to
    restart to avoid overloading the system.

    """

    def __init__(self, run_method: RunMethod, targets: List[Target]) -> None:
        """Instantiates a supervisor object.

        :param run_method: how to run targets. Please refer to options below.
        :param targets: list of callables, each of which will be run in its own
            thread / process. Each callable has an associated log context which
            is used to log messages if the callable terminates unexpectedly.
        :raises AssertionError: if `run_method` is not one of the options below.

        Options for running a method:

        - `RunMethod.DUMMY_PROCESSES`: run threads in the current process but
          simulate running in another process;
        - `RunMethod.THREADS`: run threads in the current process; or
        - `RunMethod.PROCESSES`: run each target in a separate process.

        """
        self.run_method = run_method
        self.targets = targets
        # `running_type` is only used to word the "stopped unexpectedly" marker.
        if self.run_method in (RunMethod.DUMMY_PROCESSES, RunMethod.THREADS):
            self.running_type = 'thread'
        elif self.run_method == RunMethod.PROCESSES:
            self.running_type = 'process'
        else:
            # Raised explicitly (rather than `assert False`) so that the check
            # still fires when Python runs with assertions disabled (-O).
            raise AssertionError(f'Unsupported run method: {run_method!r}')

    def _supervise(self, target: Target) -> Generator[None, None, None]:
        """Coroutine which supervises a specific callable.

        Every resumption performs one check of the target and then yields to
        wait for the next polling period:

        - if no worker is running, a new one is created and started;
        - if the worker is alive and has been up for longer than
          `HEALTHY_UPTIME_SECONDS`, the backoff is reset to
          `MIN_BACKOFF_SECONDS` and the target's `ftx.supervisor.` markers are
          cleared;
        - if the worker has died, the coroutine waits out the current backoff
          (which then doubles, capped at `MAX_BACKOFF_SECONDS`) before the
          worker is started again.

        Unexpected exceptions are reported through an `ftx.supervisor.unknown`
        marker and never escape the coroutine.

        :param target: the callable to keep running.

        """
        last_start_time = 0.0
        backoff_seconds = MIN_BACKOFF_SECONDS
        process: Optional[Any] = None

        while True:
            try:
                now = time.time()
                if process is None:
                    last_start_time = now
                    new_process = self.run_method.value(
                        target=_run_process,
                        args=(self.run_method, target),
                        daemon=True,
                    )
                    new_process.start()
                    # Track the worker only once start() has succeeded. An
                    # unstarted worker reports is_alive() == False and raises
                    # on join(), so tracking it would leave this coroutine
                    # failing on every poll without ever retrying the start.
                    process = new_process

                if process.is_alive():
                    if now > last_start_time + HEALTHY_UPTIME_SECONDS:
                        backoff_seconds = MIN_BACKOFF_SECONDS
                        markers.clear('ftx.supervisor.', context=target.context)
                    yield  # Wait for the next polling period.
                else:
                    process.join()
                    # Only mark a warning if we're running a process and it
                    # exited with a non-zero exitcode. Otherwise, we've already
                    # logged the error once so there's no reason to log it again.
                    # (Threads have no `exitcode` attribute, hence the default.)
                    if getattr(process, 'exitcode', 0) != 0:
                        msg = f'{self.running_type.title()} stopped unexpectedly.'
                        markers.error('ftx.supervisor.restart', msg, context=target.context)
                    # Remember the wait applied to *this* restart before the
                    # backoff is doubled for the next one, so that the log
                    # message reports the duration actually waited.
                    wait_seconds = backoff_seconds
                    next_start_time = now + wait_seconds
                    backoff_seconds = min(2 * backoff_seconds, MAX_BACKOFF_SECONDS)
                    process = None
                    log.info('Backing off %s seconds before restarting ...', wait_seconds)
                    while time.time() < next_start_time:
                        yield  # Wait for the next polling period.
            except Exception as e:
                markers.error('ftx.supervisor.unknown', f'An unknown error occurred: {e!s}',
                              context=target.context, log_exception=True)
                yield  # Wait for the next polling period.

    def run(self) -> None:
        """Runs forever, restarting any targets that terminate.

        Sleeps `CHECK_INTERVAL_SECONDS` between polling rounds; in each round
        every target's supervising coroutine is advanced by one step.

        """
        coroutines = [self._supervise(target) for target in self.targets]
        while True:
            time.sleep(CHECK_INTERVAL_SECONDS)
            for coroutine in coroutines:
                next(coroutine)
# Manual test of the supervisor, started with `python -m factorytx.supervisor`.
# Expected behavior: task A dies and backs off repeatedly until eventually its
# backoff resets, while task B dies less frequently and is always restarted
# with the minimum backoff.


def _test_task_a() -> None:
    """Manual-test task: print its tag, then stay alive for 0-15 seconds."""
    print("A")
    lifetime_seconds = random.randint(0, 15)
    time.sleep(lifetime_seconds)


def _test_task_b() -> None:
    """Manual-test task: print its tag, then stay alive for 20 seconds."""
    print("B")
    time.sleep(20)


if __name__ == '__main__':
    from factorytx import logs

    # Use much shorter thresholds than the module defaults for the manual test.
    HEALTHY_UPTIME_SECONDS = 10
    MAX_BACKOFF_SECONDS = 14

    logs.initialize()

    demo_targets = [
        Target(
            context=logs.LogContext(component='A', component_type='Receiver',
                                    asset='', stream_type=''),
            callable=_test_task_a,
            args=(),
            kwargs={},
        ),
        Target(
            context=logs.LogContext(component='B', component_type='Receiver',
                                    asset='foo', stream_type='bar'),
            callable=_test_task_b,
            args=(),
            kwargs={},
        ),
    ]
    Supervisor(RunMethod.THREADS, demo_targets).run()