"""This module contains the base classes for user-configurable components of
FactoryTX. This includes
* receivers, which extract data from a data source (eg. via OPC UA, SQL, etc.);
* transforms, which manipulate data extracted by receivers; and
* transmits, which push data from FactoryTX to another service or location.
"""
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import List, Any
from pandas import DataFrame
from factorytx.dataflow import DataflowGraph, InputStreamId
from factorytx.registry import Registry
from factorytx.utils import get_component_path, safe_path_join
from factorytx.validation import ValidationMessage
@dataclass
class ConnectionTestResult:
connection_successful: bool
message: str
class Receiver(metaclass=ABCMeta):
"""Receivers extract data from data sources. Each supported type of data
source (eg. SQL, OPC UA, etc.) should have a corresponding receiver type,
which knows how to fetch data and convert it to pandas DataFrames.
At runtime Receiver instances will be created in the main thread and then
run in a separate process using the `multiprocessing` module. This has
two implications:
1. Receiver instances can set up persistent data structures in __init__
without worrying about races with other instances. This is useful eg.
if you're creating a sqlite database to hold receiver state.
2. Receivers must be picklable. If __init__ sets any unpicklable values
(eg. file descriptors) then the Receiver class must implement __getstate__
and __setstate__. See https://docs.python.org/3/library/pickle.html#pickle-state
for more information.
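For example, a receiver that holds an unpicklable connection handle can drop it
when pickled and reopen it later. A minimal sketch (the `MyReceiver` name and its
lazily-opened `_conn` attribute are illustrative, not part of FactoryTX; the
abstract `clean`, `purge`, and `run` methods are elided)::

    class MyReceiver(Receiver):
        def __init__(self, dataflow, config, root_config):
            super().__init__(dataflow, config, root_config)
            self._conn = None  # opened lazily in run(); socket/file handles don't pickle

        def __getstate__(self):
            state = self.__dict__.copy()
            state['_conn'] = None  # drop the unpicklable handle before pickling
            return state

        def __setstate__(self, state):
            self.__dict__.update(state)  # run() reopens the connection as needed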
"""
component_type = "Receiver"
CONNECTION_INFO: Any = None
CONNECTION_FORM_INPUTS: Any = None
@classmethod
@abstractmethod
def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
"""Normalizes and validates a configuration dictionary.
This method is responsible for validating that the configuration it
receives is well-formed and will be accepted by the constructor. Any
issues that prevent the configuration from being loaded must be
signalled by returning one or more error-level ValidationMessages. Less
severe issues should be reported as warning-level messages instead.
This method may also modify the `config` dictionary, eg. to insert
default values or fix up legacy configuration structures. Any changes
made to the config will persist in the configuration dictionary passed
to the constructor.
:param config: the configuration dictionary to validate.
:param root_config: the root configuration dictionary, which includes
global settings and all component configurations. This dictionary
**must not** be modified.
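A sketch of a typical implementation (the exact ValidationMessage constructor is
an assumption and should be checked against factorytx.validation; `poll_seconds`
is an illustrative setting)::

    @classmethod
    def clean(cls, config, root_config):
        messages = []
        # setdefault persists into the config dict later passed to __init__.
        config.setdefault('poll_seconds', 60)
        if config['poll_seconds'] <= 0:
            # Error-level messages prevent the configuration from being loaded.
            # Constructor arguments shown here are assumed, not confirmed.
            messages.append(ValidationMessage(
                level='error', message='poll_seconds must be a positive number'))
        return messages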
"""
pass # pragma: no cover
# TODO: Preview options?
@abstractmethod
def __init__(self, dataflow: DataflowGraph, config: dict, root_config: dict) -> None:
"""Initializes a Receiver instance.
:param dataflow: object used to store retrieved data. Calling
`dataflow.process(df)` will route the DataFrame `df` to all
applicable transforms and transmits.
:param config: a configuration dictionary which has already been
validated and normalized using the `clean` method. __init__
will only be called if the prior call to clean did not return any
validation errors.
:param root_config: the root configuration dictionary. This can be used
to access global settings, eg. whether debugging is enabled or what
the Sight Machine platform credentials are.
WARNING: This method will not be run in the same thread as `run`.
It **must not** call `factorytx.logs.set_context` or set up any
thread-local data.
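A typical override calls `super().__init__` first and then caches whatever
settings `run` will need (the `poll_seconds` key below is illustrative)::

    def __init__(self, dataflow, config, root_config):
        super().__init__(dataflow, config, root_config)
        self.poll_seconds = config['poll_seconds']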
"""
self.name: str = config['data_receiver_name']
self.declared_streams = [
InputStreamId(stream['asset'], stream['stream_type'])
for stream in config['streams']
]
self.dataflow: DataflowGraph = dataflow
self.dataflow.save_failed_frames = self.save_failed_frames
@abstractmethod
def purge(self, streams: List[InputStreamId]) -> None:
"""Removes all receiver state associated with a specified streams.
:param streams: list of input streams to purge.
"""
pass # pragma: no cover
@abstractmethod
def run(self) -> None:
"""Performs the work of extracting data from the data source.
This method is responsible for fetching data from the data source,
converting it to frames, and handing it to the dataflow graph that was
passed to __init__. This method should run forever, eg. by running in an
infinite loop and waking up periodically to poll data.
For debugging purposes, this method should call `factorytx.logs.set_context`
before doing any work. This ensures that any error markers or log messages
contain the receiver name.
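A sketch of a polling loop (the argument to set_context and the `fetch_dataframe`
helper are assumptions for illustration; assumes `import time` and
`from factorytx import logs` at module scope)::

    def run(self):
        # Argument shown is an assumption; check factorytx.logs.set_context.
        logs.set_context(self.name)
        while True:
            df = self.fetch_dataframe()  # hypothetical helper returning a DataFrame or None
            if df is not None and not df.empty:
                self.dataflow.process(df)  # route the frame to transforms and transmits
            time.sleep(self.poll_seconds)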
"""
pass # pragma: no cover
@property
def save_failed_frames(self) -> bool:
"""
Whether frames that fail to transform or transmit should be saved to avoid data loss.
This is only needed for real-time data sources; historical data sources can recover the data by restreaming.
"""
return False
def test_connection(self) -> ConnectionTestResult:
"""Given a connection configuration dict, attempts to connect to a data source,
and returns a helpful message describing the outcome of the
connection attempt, as well as a boolean `connection_successful` indicating whether the connection
was successful.
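Subclasses that support connection testing should override this method,
for example (the `connect` helper is illustrative)::

    def test_connection(self):
        try:
            self.connect()  # hypothetical helper that opens a connection
        except Exception as exc:
            return ConnectionTestResult(connection_successful=False, message=str(exc))
        return ConnectionTestResult(connection_successful=True, message='Connected successfully.')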
"""
return ConnectionTestResult(
connection_successful=False,
message="Connection testing has not been implemented for this type of connection.",
)
receivers = Registry(Receiver)
transforms = Registry(Transform)
class Transmit(metaclass=ABCMeta):
"""Transmits push data from FactoryTX to another service or location. For
example, a transmit might write CSV data to disk for debugging or push
data to the Sight Machine platform.
At runtime Transmit instances will be created in the main thread and then
passed to other processes via the `multiprocessing` module. This has two
implications:
1. Transmit instances can set up persistent data structures in __init__
without worrying about races with other instances. This is useful eg.
if you're creating a sqlite database to hold transmit state.
2. Transmits must be picklable. If __init__ sets any unpicklable values
(eg. file descriptors) then the Transmit class must implement __getstate__
and __setstate__. See https://docs.python.org/3/library/pickle.html#pickle-state
for more information.
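For example, a minimal transmit that appends each frame to a per-stream CSV file
might look roughly like this (a sketch only; the `output_dir` key is illustrative,
`clean` and `purge` are elided, and `import os` is assumed at module scope)::

    class CSVTransmit(Transmit):
        def __init__(self, config, root_config):
            super().__init__(config, root_config)
            self.output_dir = config['output_dir']

        def process(self, input_stream_id, input_frame):
            # to_csv only reads from input_frame, so the frame is not mutated.
            path = os.path.join(self.output_dir, str(input_stream_id) + '.csv')
            input_frame.to_csv(path, mode='a', header=not os.path.exists(path), index=False)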
"""
component_type = "Transmit"
@classmethod
@abstractmethod
def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
"""Normalizes and validates a configuration dictionary.
This method is responsible for validating that the configuration it
receives is well-formed and will be accepted by the constructor. Any
issues that prevent the configuration from being loaded must be
signalled by returning one or more error-level ValidationMessages. Less
severe issues should be reported as warning-level messages instead.
This method may also modify the `config` dictionary, eg. to insert
default values or fix up legacy configuration structures. Any changes
made to the config will persist in the configuration dictionary passed
to the constructor.
:param config: the configuration dictionary to validate.
:param root_config: the root configuration dictionary, which includes
global settings and all component configurations. This dictionary
**must not** be modified.
"""
pass # pragma: no cover
@abstractmethod
def __init__(self, config: dict, root_config: dict) -> None:
"""Initializes a Transmit instance.
:param config: a configuration dictionary which has already been
validated and normalized using the `clean` method. __init__
will only be called if the prior call to clean did not return any
validation errors.
:param root_config: the root configuration dictionary. This can be used
to access global settings, eg. whether debugging is enabled or what
the Sight Machine platform credentials are.
WARNING: This method will not be run in the same thread as `process`.
It **must not** call `factorytx.logs.set_context` or set up any
thread-local data.
"""
self.filter_stream: List[str] = config['filter_stream']
self.name: str = config['transmit_name']
@abstractmethod
def purge(self, streams: List[InputStreamId]) -> None:
"""Removes all transmit state associated with a specified stream.
:param streams: list of input streams to purge.
"""
pass # pragma: no cover
@abstractmethod
def process(self, input_stream_id: InputStreamId, input_frame: DataFrame) -> None:
"""Sends data represented as a DataFrame to another service or location.
This method receives DataFrames that have been successfully transformed
and is responsible for pushing them to another location. It must do so
without mutating the input DataFrame.
By default transmits run synchronously, ie. within the receiver thread.
If you would like to push data asynchronously, then you must:
1. Inherit from both `Transmit` and `Asynchronous`.
2. Implement your `process` method so that it stores data to a durable
queue (eg. in a sqlite3 database.)
3. Implement a `run` method that pulls from the queue, as sketched below.
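A sketch of that pattern, using a sqlite3 file as the durable queue (the
`buffer_db` config key, the `queue` table layout, and the `push_rows` helper are
illustrative; assumes `import sqlite3` and `import time` at module scope)::

    class BufferedTransmit(Transmit, Asynchronous):
        def __init__(self, config, root_config):
            super().__init__(config, root_config)
            self.db_path = config['buffer_db']  # durable on-disk queue
            with sqlite3.connect(self.db_path) as db:
                db.execute('CREATE TABLE IF NOT EXISTS queue '
                           '(id INTEGER PRIMARY KEY, stream TEXT, payload TEXT)')

        def process(self, input_stream_id, input_frame):
            # Called synchronously from the receiver thread: just persist the frame.
            with sqlite3.connect(self.db_path) as db:
                db.execute('INSERT INTO queue (stream, payload) VALUES (?, ?)',
                           (str(input_stream_id), input_frame.to_json()))

        def run(self):
            # Runs in its own thread: drain the queue and push batches upstream.
            while True:
                with sqlite3.connect(self.db_path) as db:
                    rows = db.execute('SELECT id, payload FROM queue LIMIT 100').fetchall()
                    if rows:
                        self.push_rows(rows)  # hypothetical upload helper
                        db.executemany('DELETE FROM queue WHERE id = ?',
                                       [(row_id,) for row_id, _ in rows])
                if not rows:
                    time.sleep(5)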
"""
pass # pragma: no cover
@property
def failed_state_save_path(self) -> str:
"""Returns the path to the file where the failed state is saved."""
return safe_path_join(get_component_path(self.name), 'failed')
transmits = Registry(Transmit)
class Asynchronous(metaclass=ABCMeta):
"""Marker class for asynchronous transmits.
While simple transmits can be implemented synchronously, more complex
transmits may need to buffer data locally and send it in batches. These
transmits may inherit from Asynchronous to request that FactoryTX assign
them their own thread.
"""
@abstractmethod
def run(self) -> None:
"""Performs the work required to push data to remote services or
locations. This will likely require fetching data from a location where
it was stashed by the `process` method.
"""
pass # pragma: no cover