Source code for factorytx.base

"""This module contains the base classes for user-configurable components of
FactoryTX. This includes

* receivers, which extract data from a data source (eg. via OPC UA, SQL, etc.);
* transforms, which manipulate data extracted by receivers; and
* transmits, which push data from FactoryTX to another service or location.

"""
from abc import ABCMeta, abstractmethod
from typing import List

from pandas import DataFrame

from factorytx.dataflow import DataflowGraph, InputStreamId
from factorytx.registry import Registry
from factorytx.validation import ValidationMessage


class Receiver(metaclass=ABCMeta):
    """Receivers extract data from data sources.

    Each supported type of data source (eg. SQL, OPC UA, etc.) should have a
    corresponding receiver type, which knows how to fetch data and convert it
    to pandas DataFrames.

    At runtime Receiver instances will be created in the main thread and then
    run in a separate process using the `multiprocessing` module. This has two
    implications:

    1. Receiver instances can set up persistent data structures in __init__
       without worrying about races with other instances. This is useful eg.
       if you're creating a sqlite database to hold receiver state.
    2. Receivers must be picklable. If __init__ sets any unpicklable values
       (eg. file descriptors) then the Receiver class must implement
       __getstate__ and __setstate__. See
       https://docs.python.org/3/library/pickle.html#pickle-state for more
       information.

    """

    component_type = "Receiver"

    @classmethod
    @abstractmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        """Normalizes and validates a configuration dictionary.

        This method is responsible for validating that the configuration it
        receives is well-formed and will be accepted by the constructor. Any
        issues that prevent the configuration from being loaded must be
        signalled by returning one or more error-level ValidationMessages.
        Less severe issues should be reported as warning-level messages
        instead.

        This method may also modify the `config` dictionary, eg. to insert
        default values or fix up legacy configuration structures. Any changes
        made to the config will persist in the configuration dictionary passed
        to the constructor.

        :param config: the configuration dictionary to validate.
        :param root_config: the root configuration dictionary, which includes
            global settings and all component configurations. This dictionary
            **must not** be modified.

        """
        pass  # pragma: no cover

    # TODO: Preview options?
    @abstractmethod
    def __init__(self, dataflow: DataflowGraph, config: dict,
                 root_config: dict) -> None:
        """Initializes a Receiver instance.

        :param dataflow: object used to store retrieved data. Calling
            `dataflow.process(df)` will route the DataFrame `df` to all
            applicable transforms and transmits.
        :param config: a configuration dictionary which has already been
            validated and normalized using the `clean` method. __init__ will
            only be called if the prior call to clean did not return any
            validation errors.
        :param root_config: the root configuration dictionary. This can be
            used to access global settings, eg. whether debugging is enabled
            or what the Sight Machine platform credentials are.

        WARNING: This method will not be run in the same thread as `run`. It
        **must not** call `factorytx.logs.set_context` or set up any
        thread-local data.

        """
        self.name: str = config['data_receiver_name']
        self.declared_streams = [
            InputStreamId(stream['asset'], stream['stream_type'])
            for stream in config['streams']
        ]

    @abstractmethod
    def purge(self, streams: List[InputStreamId]) -> None:
        """Removes all receiver state associated with the specified streams.

        :param streams: list of input streams to purge.

        """
        pass  # pragma: no cover

    @abstractmethod
    def run(self) -> None:
        """Performs the work of extracting data from the data source.

        This method is responsible for fetching data from the data source,
        converting it to frames, and handing it to the dataflow graph that was
        passed to __init__. This method should run forever, eg. by running in
        an infinite loop and waking up periodically to poll data.

        For debugging purposes, this method should call
        `factorytx.logs.set_context` before doing any work. This ensures that
        any error markers or log messages contain the receiver name.

        """
        pass  # pragma: no cover


receivers = Registry(Receiver)
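

# --- Illustrative sketch (not part of factorytx.base) ------------------------
# A minimal polling receiver showing how the hooks above fit together: `clean`
# normalizes the config, `__init__` stores the dataflow graph, and `run` loops
# forever handing frames to it. The 'poll_interval' config key and the dummy
# frame contents are hypothetical, and a real `clean` would return error-level
# ValidationMessages for malformed configs (constructor details omitted; see
# factorytx.validation).
import random
import time


class ExamplePollingReceiver(Receiver):

    @classmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        config.setdefault('poll_interval', 60.0)  # insert a default value
        return []  # no errors or warnings

    def __init__(self, dataflow: DataflowGraph, config: dict,
                 root_config: dict) -> None:
        # The base __init__ only stores the name and declared streams, so the
        # dataflow graph has to be kept by the subclass itself.
        super().__init__(dataflow, config, root_config)
        self.dataflow = dataflow
        self.poll_interval: float = config['poll_interval']

    def purge(self, streams: List[InputStreamId]) -> None:
        pass  # this sketch keeps no per-stream state to discard

    def run(self) -> None:
        # A real receiver would call factorytx.logs.set_context here (per the
        # run() docstring); its exact arguments are not shown above, so the
        # call is omitted.
        while True:
            # Stand-in for real data-source access (OPC UA, SQL, etc.).
            frame = DataFrame({'timestamp': [time.time()],
                               'value': [random.random()]})
            self.dataflow.process(frame)
            time.sleep(self.poll_interval)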


class Transform(metaclass=ABCMeta):
    """Transforms manipulate data extracted by receivers.

    For example, a Transform might remove all rows where a given column is
    null, or convert a column to a different type.

    At runtime Transform instances will be created in the main thread and then
    passed to the receiver process via the `multiprocessing` module. This has
    two implications:

    1. Although ideally transforms should be stateless, Transform instances
       can set up persistent data structures in __init__ without worrying
       about races with other instances.
    2. Transforms must be picklable. If __init__ sets any unpicklable values
       (eg. file descriptors) then the Transform class must implement
       __getstate__ and __setstate__. See
       https://docs.python.org/3/library/pickle.html#pickle-state for more
       information.

    """

    component_type = "Transform"

    @classmethod
    @abstractmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        """Normalizes and validates a configuration dictionary.

        This method is responsible for validating that the configuration it
        receives is well-formed and will be accepted by the constructor. Any
        issues that prevent the configuration from being loaded must be
        signalled by returning one or more error-level ValidationMessages.
        Less severe issues should be reported as warning-level messages
        instead.

        This method may also modify the `config` dictionary, eg. to insert
        default values or fix up legacy configuration structures. Any changes
        made to the config will persist in the configuration dictionary passed
        to the constructor.

        :param config: the configuration dictionary to validate.
        :param root_config: the root configuration dictionary, which includes
            global settings and all component configurations. This dictionary
            **must not** be modified.

        """
        pass  # pragma: no cover

    @abstractmethod
    def __init__(self, config: dict, root_config: dict) -> None:
        """Initializes a Transform instance.

        :param config: a configuration dictionary which has already been
            validated and normalized using the `clean` method. __init__ will
            only be called if the prior call to clean did not return any
            validation errors.
        :param root_config: the root configuration dictionary. This can be
            used to access global settings, eg. whether debugging is enabled
            or what the Sight Machine platform credentials are.

        WARNING: This method will not be run in the same thread as `process`.
        It **must not** call `factorytx.logs.set_context` or set up any
        thread-local data.

        """
        self.filter_stream: List[str] = config['filter_stream']
        self.name: str = config['transform_name']

    @abstractmethod
    def process(self, input_frame: DataFrame) -> DataFrame:
        """Manipulates a DataFrame to produce a new DataFrame.

        This method is responsible for implementing the transformation logic
        that converts an input frame into a _new_ output frame. The original
        DataFrame object must not be modified.

        """
        pass  # pragma: no cover


transforms = Registry(Transform)
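

# --- Illustrative sketch (not part of factorytx.base) ------------------------
# A transform that drops rows containing nulls in the configured columns,
# returning a new frame and leaving the input untouched. The 'columns' config
# key is hypothetical; 'filter_stream' and 'transform_name' are the keys
# consumed by Transform.__init__ above.
class ExampleDropNullsTransform(Transform):

    @classmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        config.setdefault('columns', [])  # empty list means "all columns"
        return []  # a real clean would report error-level messages here

    def __init__(self, config: dict, root_config: dict) -> None:
        super().__init__(config, root_config)
        self.columns: List[str] = config['columns']

    def process(self, input_frame: DataFrame) -> DataFrame:
        # dropna returns a new DataFrame, so the input frame is not modified.
        return input_frame.dropna(subset=self.columns or None)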


class Transmit(metaclass=ABCMeta):
    """Transmits push data from FactoryTX to another service or location.

    For example, a transmit might write CSV data to disk for debugging or push
    data to the Sight Machine platform.

    At runtime Transmit instances will be created in the main thread and then
    passed to other processes via the `multiprocessing` module. This has two
    implications:

    1. Transmit instances can set up persistent data structures in __init__
       without worrying about races with other instances. This is useful eg.
       if you're creating a sqlite database to hold transmit state.
    2. Transmits must be picklable. If __init__ sets any unpicklable values
       (eg. file descriptors) then the Transmit class must implement
       __getstate__ and __setstate__. See
       https://docs.python.org/3/library/pickle.html#pickle-state for more
       information.

    """

    component_type = "Transmit"

    @classmethod
    @abstractmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        """Normalizes and validates a configuration dictionary.

        This method is responsible for validating that the configuration it
        receives is well-formed and will be accepted by the constructor. Any
        issues that prevent the configuration from being loaded must be
        signalled by returning one or more error-level ValidationMessages.
        Less severe issues should be reported as warning-level messages
        instead.

        This method may also modify the `config` dictionary, eg. to insert
        default values or fix up legacy configuration structures. Any changes
        made to the config will persist in the configuration dictionary passed
        to the constructor.

        :param config: the configuration dictionary to validate.
        :param root_config: the root configuration dictionary, which includes
            global settings and all component configurations. This dictionary
            **must not** be modified.

        """
        pass  # pragma: no cover

    @abstractmethod
    def __init__(self, config: dict, root_config: dict) -> None:
        """Initializes a Transmit instance.

        :param config: a configuration dictionary which has already been
            validated and normalized using the `clean` method. __init__ will
            only be called if the prior call to clean did not return any
            validation errors.
        :param root_config: the root configuration dictionary. This can be
            used to access global settings, eg. whether debugging is enabled
            or what the Sight Machine platform credentials are.

        WARNING: This method will not be run in the same thread as `process`.
        It **must not** call `factorytx.logs.set_context` or set up any
        thread-local data.

        """
        self.filter_stream: List[str] = config['filter_stream']
        self.name: str = config['transmit_name']

    @abstractmethod
    def purge(self, streams: List[InputStreamId]) -> None:
        """Removes all transmit state associated with the specified streams.

        :param streams: list of input streams to purge.

        """
        pass  # pragma: no cover

    @abstractmethod
    def process(self, input_stream_id: InputStreamId,
                input_frame: DataFrame) -> None:
        """Sends data represented as a DataFrame to another service or
        location.

        This method receives DataFrames that have been successfully
        transformed and is responsible for pushing them to another location.
        It must do so without mutating the input DataFrame.

        By default transmits run synchronously, ie. within the receiver
        thread. If you would like to push data asynchronously, then you must:

        1. Inherit from both `Transmit` and `Asynchronous`.
        2. Implement your `process` method so that it stores data to a durable
           queue (eg. in a sqlite3 database).
        3. Implement a `run` method that pulls from the queue.

        """
        pass  # pragma: no cover


transmits = Registry(Transmit)


class Asynchronous(metaclass=ABCMeta):
    """Marker class for asynchronous transmits.

    While simple transmits can be implemented synchronously, more complex
    transmits may need to buffer data locally and send it in batches. These
    transmits may inherit from Asynchronous to request that FactoryTX assign
    them their own thread.

    """

    @abstractmethod
    def run(self) -> None:
        """Performs the work required to push data to remote services or
        locations.

        This will likely require fetching data from a location where it was
        stashed by the `process` method.

        """
        pass  # pragma: no cover
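

# --- Illustrative sketch (not part of factorytx.base) ------------------------
# An asynchronous transmit following the pattern described in Transmit.process:
# `process` stashes frames in a durable sqlite3 queue and `run` drains it in
# its own thread. The 'queue_path' config key, the use of str(input_stream_id)
# as a queue key, and the push step itself are assumptions for illustration.
import pickle
import sqlite3
import time


class ExampleAsyncTransmit(Transmit, Asynchronous):

    @classmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        config.setdefault('queue_path', 'transmit-queue.db')
        return []  # a real clean would report error-level messages here

    def __init__(self, config: dict, root_config: dict) -> None:
        super().__init__(config, root_config)
        # Only the path is stored on self, so the instance stays picklable.
        self.queue_path: str = config['queue_path']
        with sqlite3.connect(self.queue_path) as db:
            db.execute('CREATE TABLE IF NOT EXISTS queue '
                       '(id INTEGER PRIMARY KEY, stream TEXT, frame BLOB)')

    def purge(self, streams: List[InputStreamId]) -> None:
        with sqlite3.connect(self.queue_path) as db:
            for stream in streams:
                db.execute('DELETE FROM queue WHERE stream = ?', (str(stream),))

    def process(self, input_stream_id: InputStreamId,
                input_frame: DataFrame) -> None:
        # Store the frame durably without mutating it; `run` will pick it up.
        with sqlite3.connect(self.queue_path) as db:
            db.execute('INSERT INTO queue (stream, frame) VALUES (?, ?)',
                       (str(input_stream_id), pickle.dumps(input_frame)))

    def run(self) -> None:
        while True:
            with sqlite3.connect(self.queue_path) as db:
                rows = db.execute('SELECT id, frame FROM queue '
                                  'ORDER BY id LIMIT 100').fetchall()
                for row_id, blob in rows:
                    frame = pickle.loads(blob)
                    # ... push `frame` to the destination service here ...
                    db.execute('DELETE FROM queue WHERE id = ?', (row_id,))
            if not rows:
                time.sleep(5)  # nothing queued; back off before polling again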