"""This module provides services to automatically restart threads and processes
that die.
"""
import enum
import logging
import multiprocessing
import pickle
import random
import threading
import time
from typing import Any, Callable, Dict, Generator, List, NamedTuple, Optional, Tuple
import pandas
from setproctitle import setproctitle
from factorytx import const
from factorytx.exceptions import Failure
from factorytx import logs
from factorytx import markers
log = logging.getLogger(__name__)
class DummyProcess(threading.Thread):
pass
@enum.unique
class RunMethod(enum.Enum):
PROCESSES: Any = multiprocessing.Process
DUMMY_PROCESSES: Any = DummyProcess
THREADS: Any = threading.Thread
class Target(NamedTuple):
context: logs.LogContext
callable: Callable
args: Tuple[Any, ...]
kwargs: Dict[str, Any]
CHECK_INTERVAL_SECONDS = 1.0 # How long to wait between process checks.
MIN_BACKOFF_SECONDS = 1.0 # Min duration to back off before restarting a process.
MAX_BACKOFF_SECONDS = 60.0 # Max duration to back off before restarting a process.
HEALTHY_UPTIME_SECONDS = 600.0 # Duration a process must be up before it is considered healthy.
def _run_process(run_method: RunMethod, target: Target) -> None:
"""Wraps the execution of `target`. This is used to set up the thread /
process context and catch any unexpected errors.
"""
if run_method == RunMethod.PROCESSES:
# TODO: Factor this out?
logs.initialize()
markers.initialize(const.MARKER_PATH)
# Don't just warn, or else bugs go unnoticed in multi-stage transform pipelines.
pandas.set_option('mode.chained_assignment', 'raise')
setproctitle(f'factorytx [{logs.format_context(target.context)}]')
elif run_method == RunMethod.DUMMY_PROCESSES:
# Check that targets can be pickled and unpickled successfully to
# simulate spawning them under multiprocessing. We MUST do this in
# the target thread since some targets count on deserialization to
# reinitialize data structures that are tied to the current thread, eg.
# sqlite connections.
target = pickle.loads(pickle.dumps(target))
logs.set_context(context=target.context)
try:
target.callable(*target.args, **target.kwargs)
except Failure:
pass
except Exception as e:
# Pass the context to markers.error so that it matches the context that
# the supervisor will pass to markers.clear(). Otherwise the supervisor
# might not clean up the marker when the process stabilizes.
markers.error('ftx.supervisor.error', f'Unexpected error: {e!s}',
context=target.context, log_exception=True)
# The Supervisor is implemented as a class so that in the future we can add
# methods to request that supervised process be stopped or restarted.
[docs]class Supervisor:
"""A Supervisor supervises a collection of callables, running each callable
in its own thread or process. Each callable will be restarted if it terminates;
callables which terminate too frequently will wait longer to restart to
avoid overloading the system.
"""
[docs] def __init__(self, run_method: RunMethod, targets: List[Target]) -> None:
"""Instantiates a supervisor object.
:param run_method: how to run targets. Please refer to options below.
:param targets: list of callables, each of which will be run in its own
thread / process. Each callable has an associated log context which
is used to log messages if the callable terminates unexpectedly.
Options for running a method:
- `RunMethod.DUMMY_PROCESSES`: run threads in the current process
but simulate running in another process;
- `RunMethod.THREADS`: run threads in the current process; or
- `RunMethod.PROCESSES`: run each target in a separate process.
"""
self.run_method = run_method
self.targets = targets
if self.run_method in (RunMethod.DUMMY_PROCESSES, RunMethod.THREADS):
self.running_type = 'thread'
elif self.run_method == RunMethod.PROCESSES:
self.running_type = 'process'
else:
assert False
def _supervise(self, target: Target) -> Generator[None, None, None]:
"""Coroutine which supervises a specific callable."""
last_start_time = 0.0
backoff_seconds = 1.0
process: Optional[Any] = None
while True:
try:
now = time.time()
if process is None:
last_start_time = now
process = self.run_method.value(
target=_run_process,
args=(self.run_method, target),
daemon=True
)
process.start()
if process.is_alive():
if now > last_start_time + HEALTHY_UPTIME_SECONDS:
backoff_seconds = MIN_BACKOFF_SECONDS
markers.clear('ftx.supervisor.', context=target.context)
yield # Wait for the next polling period.
else:
process.join()
# Only mark a warning if we're running a process and it
# exited with a non-zero exitcode. Otherwise, we've already
# logged the error once so there's no reason to log it again.
if getattr(process, 'exitcode', 0) != 0:
msg = f'{self.running_type.title()} stopped unexpectedly.'
markers.error('ftx.supervisor.restart', msg, context=target.context)
next_start_time = now + backoff_seconds
backoff_seconds = min(2 * backoff_seconds, MAX_BACKOFF_SECONDS)
process = None
log.info('Backing off %s seconds before restarting ...', backoff_seconds)
while time.time() < next_start_time:
yield # Wait for the next polling period.
except Exception as e:
markers.error('ftx.supervisor.unknown', f'An unknown error occurred: {e!s}',
context=target.context, log_exception=True)
yield # Wait for the next polling period.
[docs] def run(self) -> None:
"""Runs forever, restarting any targets that terminate."""
coroutines = [self._supervise(target) for target in self.targets]
while True:
time.sleep(CHECK_INTERVAL_SECONDS)
for coroutine in coroutines:
coroutine.send(None)
# `python -m factorytx.supervisor` will start a manual test of the supervisor.
# Task A should die and back off repeatedly until eventually its backoff resets;
# Task B should die less frequently, and should always be restarted with the
# minimum backoff.
def _test_task_a() -> None:
print("A")
time.sleep(random.randint(0, 15))
def _test_task_b() -> None:
print("B")
time.sleep(20)
if __name__ == '__main__':
from factorytx import logs
HEALTHY_UPTIME_SECONDS = 10
MAX_BACKOFF_SECONDS = 14
logs.initialize()
context_a = logs.LogContext(component='A', component_type='Receiver', asset='', stream_type='')
context_b = logs.LogContext(component='B', component_type='Receiver', asset='foo', stream_type='bar')
targets = [
Target(context=context_a, callable=_test_task_a, args=(), kwargs={}),
Target(context=context_b, callable=_test_task_b, args=(), kwargs={}),
]
supervisor = Supervisor(RunMethod.THREADS, targets)
supervisor.run()