Python API

FactoryTX includes a Python API that enables you to create custom FactoryTX components. The easiest way to get started with the API is to work through the walkthroughs in the Tutorials section, which assume that you are familiar with Python 3 and common third-party libraries such as pandas.

This section provides reference details for each module and class.

factorytx.base module

This module contains the base classes for user-configurable components of FactoryTX. This includes

  • receivers, which extract data from a data source (eg. via OPC UA, SQL, etc.);

  • transforms, which manipulate data extracted by receivers; and

  • transmits, which push data from FactoryTX to another service or location.

Receiver

class factorytx.base.Receiver(dataflow: DataflowGraph, config: dict, root_config: dict)[source]

Receivers extract data from data sources. Each supported type of data source (eg. SQL, OPC UA, etc.) should have a corresponding receiver type, which knows how to fetch data and convert it to pandas DataFrames.

At runtime Receiver instances will be created in the main thread and then run in a separate process using the multiprocessing module. This has two implications:

  1. Receiver instances can set up persistent data structures in __init__ without worrying about races with other instances. This is useful eg. if you’re creating a sqlite database to hold receiver state.

  2. Receivers must be picklable. If __init__ sets any unpicklable values (eg. file descriptors) then the Receiver class must implement __getstate__ and __setstate__. See https://docs.python.org/3/library/pickle.html#pickle-state for more information.
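
For example, a receiver that keeps an unpicklable resource such as an open sqlite3 connection can drop it in __getstate__ and reopen it in __setstate__. The sketch below is illustrative only; the state_path configuration key is made up and the remaining abstract methods are omitted:

import sqlite3

from factorytx.base import Receiver

class SqliteStateReceiver(Receiver):
    # Sketch only: clean, purge, and run are omitted for brevity.

    def __init__(self, dataflow, config, root_config):
        self.dataflow = dataflow
        self.config = config
        # An open sqlite3 connection is not picklable, so it is dropped in
        # __getstate__ and reopened in __setstate__.
        self._db = sqlite3.connect(config["state_path"])  # hypothetical config key

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["_db"]          # remove the unpicklable member
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._db = sqlite3.connect(self.config["state_path"])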

abstract __init__(dataflow: DataflowGraph, config: dict, root_config: dict) None[source]

Initializes a Receiver instance.

Parameters:
  • dataflow – object used to store retrieved data. Calling dataflow.process(df) will route the DataFrame df to all applicable transforms and transmits.

  • config – a configuration dictionary which has already been validated and normalized using the clean method. __init__ will only be called if the prior call to clean did not return any validation errors.

  • root_config – the root configuration dictionary. This can be used to access global settings, eg. whether debugging is enabled or what the Sight Machine platform credentials are.

WARNING: This method will not be run in the same thread as run. It must not call factorytx.logs.set_context or set up any thread-local data.

abstract classmethod clean(config: dict, root_config: dict) List[ValidationMessage][source]

Normalizes and validates a configuration dictionary.

This method is responsible for validating that the configuration it receives is well-formed and will be accepted by the constructor. Any issues that prevent the configuration from being loaded must be signalled by returning one or more error-level ValidationMessages. Less severe issues should be reported as warning-level messages instead.

This method may also modify the config dictionary, eg. to insert default values or fix up legacy configuration structures. Any changes made to the config will persist in the configuration dictionary passed to the constructor.

Parameters:
  • config – the configuration dictionary to validate.

  • root_config – the root configuration dictionary, which includes global settings and all component configurations. This dictionary must not be modified.
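
For instance, a clean implementation can delegate most of the validation to factorytx.validation.clean_with_json_schema (documented below), which injects defaults into the config and returns error-level messages for schema violations. A minimal sketch, with an illustrative schema that is not part of any real receiver:

from typing import List

from factorytx.base import Receiver
from factorytx.validation import ValidationMessage, clean_with_json_schema

MY_SCHEMA = {
    'type': 'object',
    'properties': {
        'poll_interval': {'type': 'integer', 'default': 60},
        'host': {'type': 'string', 'minLength': 1}},
    'required': ['host']}

class MyReceiver(Receiver):
    @classmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        # Injects the default poll_interval into `config` in place and returns
        # error-level messages for any schema violations.
        return clean_with_json_schema(MY_SCHEMA, config)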

abstract purge(streams: List[InputStreamId]) None[source]

Removes all receiver state associated with the specified streams.

Parameters:

streams – list of input streams to purge.

abstract run() None[source]

Performs the work of extracting data from the data source.

This method is responsible for fetching data from the data source, converting it to frames, and handing it to the dataflow graph that was passed to __init__. This method should run forever, eg. by running in an infinite loop and waking up periodically to poll data.

For debugging purposes, this method should call factorytx.logs.set_context before doing any work. This ensures that any error markers or log messages contain the receiver name.
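
A typical run implementation polls in an infinite loop, hands each frame to the dataflow graph, and records problems using the markers module. The rough sketch below assumes __init__ stored the dataflow graph and config on self; fetch_frame is a hypothetical helper and the poll_interval key is illustrative:

import time

from factorytx import markers
from factorytx.dataflow import InputStreamId

def run(self) -> None:
    # Call factorytx.logs.set_context here first so that error markers and log
    # messages carry this receiver's name (see the factorytx.logs module).
    stream_id = InputStreamId('my_asset', 'my_stream_type')  # illustrative stream
    while True:
        try:
            frame = self.fetch_frame()  # hypothetical helper returning a DataFrame
            self.dataflow.process(stream_id, frame)
            markers.clear('poll:')      # a successful poll clears old poll markers
        except Exception as exc:
            markers.error('poll:fetch', 'Failed to poll the data source: %s' % exc)
        time.sleep(self.config.get('poll_interval', 60))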

property save_failed_frames: bool

Whether to save frames that fail to transform or transmit, in order to avoid data loss. This is only needed for real-time data sources; historical data sources can be recovered by restreaming.

test_connection() ConnectionTestResult[source]

Given a connection configuration dict, attempts to connect to a data source, and returns a helpful message describing the outcome of the connection attempt, as well as a boolean connection_successful indicating whether the connection was successful.

Transform

class factorytx.base.Transform(config: dict, root_config: dict)[source]

Transforms manipulate data extracted by receivers. For example, a Transform might remove all rows where a given column is null, or convert a column to a different type.

At runtime Transform instances will be created in the main thread and then passed to the receiver process via the multiprocessing module. This has two implications:

  1. Although ideally transforms should be stateless, Transform instances can set up persistent data structures in __init__ without worrying about races with other instances.

  2. Transforms must be picklable. If __init__ sets any unpicklable values (eg. file descriptors) then the Transform class must implement __getstate__ and __setstate__. See https://docs.python.org/3/library/pickle.html#pickle-state for more information.

abstract __init__(config: dict, root_config: dict) None[source]

Initializes a Transform instance.

Parameters:
  • config – a configuration dictionary which has already been validated and normalized using the clean method. __init__ will only be called if the prior call to clean did not return any validation errors.

  • root_config – the root configuration dictionary. This can be used to access global settings, eg. whether debugging is enabled or what the Sight Machine platform credentials are.

WARNING: This method will not be run in the same thread as process. It must not call factorytx.logs.set_context or set up any thread-local data.

abstract classmethod clean(config: dict, root_config: dict) List[ValidationMessage][source]

Normalizes and validates a configuration dictionary.

This method is responsible for validating that the configuration it receives is well-formed and will be accepted by the constructor. Any issues that prevent the configuration from being loaded must be signalled by returning one or more error-level ValidationMessages. Less severe issues should be reported as warning-level messages instead.

This method may also modify the config dictionary, eg. to insert default values or fix up legacy configuration structures. Any changes made to the config will persist in the configuration dictionary passed to the constructor.

Parameters:
  • config – the configuration dictionary to validate.

  • root_config – the root configuration dictionary, which includes global settings and all component configurations. This dictionary must not be modified.

property failed_state_save_path: str

Returns the path to the file where the failed state is saved.

abstract process(input_frame: DataFrame) DataFrame[source]

Manipulates a DataFrame to produce a new DataFrame.

This method is responsible for implementing the transformation logic that converts an input frame into a _new_ output frame. The original DataFrame object must not be modified.
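
For example, a transform that drops rows where a configured column is null could be sketched as follows; it assumes __init__ stored the validated config on self.config, and the column key is illustrative:

import pandas as pd

def process(self, input_frame: pd.DataFrame) -> pd.DataFrame:
    column = self.config['column']  # hypothetical config key
    # Boolean indexing returns a new DataFrame, so input_frame is not mutated.
    return input_frame[input_frame[column].notnull()]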

Transmit

class factorytx.base.Transmit(config: dict, root_config: dict)[source]

Transmits push data from FactoryTX to another service or location. For example, a transmit might write CSV data to disk for debugging or push data to the Sight Machine platform.

At runtime Transmit instances will be created in the main thread and then passed to other processes via the multiprocessing module. This has two implications:

  1. Transmit instances can set up persistent data structures in __init__ without worrying about races with other instances. This is useful eg. if you’re creating a sqlite database to hold transmit state.

  2. Transmits must be picklable. If __init__ sets any unpicklable values (eg. file descriptors) then the Transmit class must implement __getstate__ and __setstate__. See https://docs.python.org/3/library/pickle.html#pickle-state for more information.

abstract __init__(config: dict, root_config: dict) None[source]

Initializes a Transmit instance.

Parameters:
  • config – a configuration dictionary which has already been validated and normalized using the clean method. __init__ will only be called if the prior call to clean did not return any validation errors.

  • root_config – the root configuration dictionary. This can be used to access global settings, eg. whether debugging is enabled or what the Sight Machine platform credentials are.

WARNING: This method will not be run in the same thread as process. It must not call factorytx.logs.set_context or set up any thread-local data.

abstract classmethod clean(config: dict, root_config: dict) List[ValidationMessage][source]

Normalizes and validates a configuration dictionary.

This method is responsible for validating that the configuration it receives is well-formed and will be accepted by the constructor. Any issues that prevent the configuration from being loaded must be signalled by returning one or more error-level ValidationMessages. Less severe issues should be reported as warning-level messages instead.

This method may also modify the config dictionary, eg. to insert default values or fix up legacy configuration structures. Any changes made to the config will persist in the configuration dictionary passed to the constructor.

Parameters:
  • config – the configuration dictionary to validate.

  • root_config – the root configuration dictionary, which includes global settings and all component configurations. This dictionary must not be modified.

property failed_state_save_path: str

Returns the path to the file where the failed state is saved.

abstract process(input_stream_id: InputStreamId, input_frame: DataFrame) None[source]

Sends data represented as a DataFrame to another service or location.

This method receives DataFrames that have been successfully transformed and is responsible for pushing them to another location. It must do so without mutating the input DataFrame.

By default transmits run synchronously, ie. within the receiver thread. If you would like to push data asynchronously, then you must:

  1. Inherit from both Transmit and Asynchronous.

  2. Implement your process method so that it stores data to a durable queue (eg. in a sqlite3 database.)

  3. Implement a run method that pulls from the queue.
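
As a concrete illustration, a minimal synchronous transmit could append each frame to a CSV file on disk. This is a sketch only; the output_path key is made up, and a production transmit would also need clean, purge, and failure handling:

import os

import pandas as pd

from factorytx.dataflow import InputStreamId

def process(self, input_stream_id: InputStreamId, input_frame: pd.DataFrame) -> None:
    # Assumes __init__ stored the validated config on self.config.
    path = self.config['output_path']  # hypothetical config key
    # Append without mutating input_frame; only write the header for a new file.
    input_frame.to_csv(path, mode='a', header=not os.path.exists(path), index=False)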

abstract purge(streams: List[InputStreamId]) None[source]

Removes all transmit state associated with the specified streams.

Parameters:

streams – list of input streams to purge.

factorytx.receivers.file module

FileReceiver

class factorytx.receivers.file.receiver.FileReceiver(dataflow: DataflowGraph, config: dict, root_config: dict)[source]

Receiver that fetches files from instances of a Connection subclass and parses them using the configured Parser types.

Here’s how to create a custom FileReceiver class to handle a new protocol:

  1. Define a Connection subclass that uses the protocol to list available files, fetch files, and optionally delete files from a remote server.

class MyConnection(Connection):
    ...
  2. Register a custom FileReceiver subclass for your Connection type:

@receivers.register('my_receiver')
class MyReceiver(FileReceiver):
    connection_class = MyConnection

classmethod clean(config: dict, root_config: dict) List[ValidationMessage][source]

Normalizes and validates a configuration dictionary.

This method is responsible for validating that the configuration it receives is well-formed and will be accepted by the constructor. Any issues that prevent the configuration from being loaded must be signalled by returning one or more error-level ValidationMessages. Less severe issues should be reported as warning-level messages instead.

This method may also modify the config dictionary, eg. to insert default values or fix up legacy configuration structures. Any changes made to the config will persist in the configuration dictionary passed to the constructor.

Parameters:
  • config – the configuration dictionary to validate.

  • root_config – the root configuration dictionary, which includes global settings and all component configurations. This dictionary must not be modified.

handle_processing_failure(filename: str) Iterator[None][source]

Cleans up the temporary file if any errors occur while processing the file's data, so that temporary storage does not fill up.

poll(connections: List[Connection]) None[source]

Processes all new or modified files from one or more connections.

purge(streams: List[InputStreamId]) None[source]

Removes all receiver state associated with the specified streams.

Parameters:

streams – list of input streams to purge.

run() None[source]

Performs the work of extracting data from the data source.

This method is responsible for fetching data from the data source, converting it to frames, and handing it to the dataflow graph that was passed to __init__. This method should run forever, eg. by running in an infinite loop and waking up periodically to poll data.

For debugging purposes, this method should call factorytx.logs.set_context before doing any work. This ensures that any error markers or log messages contain the receiver name.

run_once() None[source]

Runs a single polling pass. This method should only be used for unit tests since it sets up and tears down new connections whenever it runs.

All file receivers support these required and optional properties:

  • data_receiver_name: Unique name of the data receiver. This name will be used to track the progress state of the data stream.

  • protocol: Protocol to be used.

  • streams: List of input data streams to read from.

    • asset: Asset identifier

    • stream_type: Type of data stream

    • parser: Name of the parser used to convert the file. This has to match one of the parser_name values in the parsers section.

    • file_filter: List of files to filter on. Items can be regular expressions. You must select either file_filter or path_filter but not both.

    • path_filter: List of paths to filter on. Items can be regular expressions. You must select either file_filter or path_filter but not both.

  • connections: A list of connection settings used to connect to the data source. Please refer to the connection settings below.

  • process_files_alphabetically: true to process all of the files retrieved by the connection(s) in alphabetical order. By default, this option is false, so files are processed by last modified (or created) time with oldest files first.

  • process_ordered_connections: If there are multiple connections and you want to process (and transmit) the files received from one connection before another, set this option to true to process files based on the order of the connection(s). By default, this option is set to false. This option is usually relevant to the Kafka transmit, because events within a Kafka topic need to be in order. For example, if one connection receives historical files and another connection receives “realtime” files, you’ll want to enable this setting and order the connections with the historical connection first, so that events in the Kafka topic are in chronological order. Please note that the order of the files per connection is maintained: if process_files_alphabetically is enabled, files will be parsed in alphabetical order; if disabled, files will be parsed in chronological order based on the last modified time.

  • parsers: A list of parsers that can be used for the input streams. Each parser has the following default properties:

    • parser_name: A customizable name used by streams to identify the parser.

    • parser_type: Type of parser to apply (e.g. csv)

    • parser_version: Version of the parser type to apply.

    FactoryTX has a few built-in parsers available for you to use. Please refer to the Parsers Configurations section of the manual for more details about them.

  • skip_parser_errors: Whether the receiver will allow parser errors when processing files. If true, parsing errors will be skipped and logged in a quarantine log file while the receiver continues processing. Otherwise, the receiver will halt on a parsing error until the file is repaired. (Only applies to CSV files.)

  • archive_completed: true to keep a local copy of each file received from the remote server, or false if local copies should not be kept. If enabled, files will be archived until their total size increases above the amount specified in the max_archive_size_mb parameter.

  • max_archive_size_mb: If archive_completed is true, archived files are deleted once their total size (in MB) exceeds this value. A negative value means never delete any files. Defaults to -1.

  • delete_completed: true if files should be deleted from the data directory after they have been received by FactoryTX, or false if files should never be deleted. Defaults to false to avoid accidentally losing data.

    • archive_completed and delete_completed are independent of each other. Archiving will create a copy of the file in a new directory, while deleting will remove the original file.

  • read_in_progress_files: Whether to read files as soon as they are created, or wait for the upstream service to stop writing to the file before reading it.

  • emit_file_metadata: Whether or not to inject additional columns into each record containing metadata about the file it came from. If this setting is enabled then every record will contain fields named file_name, file_path, and file_timestamp.

  • poll_interval: Number of seconds to wait before fetching new data from the server.

  • temporary_file_directory: Specify the directory to temporarily store files that have been downloaded from the connection(s). By default, the directory used is /tmp.

  • deduplicate_rows: Whether or not to deduplicate rows when processing files. This is done by hashing each row and saving the hash to compare against later if the file is modified. If the file is modified, only the new rows will be processed. This should be used if the same file is expected to be re-processed multiple times and will be regularly modified rather than appended to. All data in each row must be a hashable type (no lists, dicts, etc.). File hash stores are kept in the “row_hash_stores” directory inside the receiver’s component directory.

  • deduplicate_row_retention_time_days: The number of days to keep row hashes for deduplication (default 90).
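
Putting these properties together, a file receiver configuration might look roughly like the following sketch (shown here as a Python dict; the protocol name, connection settings, and parser details are placeholders that depend on which receivers and parsers are available in your installation):

file_receiver_config = {
    'data_receiver_name': 'press_line_files',
    'protocol': 'sftp',                    # placeholder: use a registered protocol
    'connections': [
        {'name': 'primary'},               # connection settings vary by protocol
    ],
    'streams': [
        {
            'asset': 'press_01',
            'stream_type': 'cycle_data',
            'parser': 'cycle_csv',
            'file_filter': [r'press_01_.*\.csv'],
        },
    ],
    'parsers': [
        {
            'parser_name': 'cycle_csv',
            'parser_type': 'csv',          # see the Parsers Configurations section
            'parser_version': 1,
        },
    ],
    'poll_interval': 60,
    'archive_completed': True,
    'max_archive_size_mb': 500,
    'delete_completed': False,
    'emit_file_metadata': True,
}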

test_connection() ConnectionTestResult[source]

Given a connection configuration dict, attempts to connect to a data source, and returns a helpful message describing the outcome of the connection attempt, as well as a boolean connection_successful indicating whether the connection was successful.

Connection

class factorytx.receivers.file.base.Connection(config: dict, root_config: dict)[source]

Represents a connection to a remote server. Each supported type of file server (eg. FTP, SMB, WebDAV, etc.) should have a corresponding connection type.

You can define a custom file receiver type using a connection as follows:

>>> from factorytx.base import receivers
>>> from factorytx.receivers.file import Connection, FileReceiver
>>> class MyFileConnection(Connection):
...     pass  # Implementation goes here.
>>> @receivers.register('my_file_connection')
... class MyFileReceiver(FileReceiver):
...     connection_class = MyFileConnection

abstract classmethod clean(config: dict, root_config: dict) List[ValidationMessage][source]

Normalizes and validates a configuration dictionary.

This method is responsible for validating that the configuration it receives is well-formed and will be accepted by the constructor. Any issues that prevent the configuration from being loaded must be signalled by returning one or more error-level ValidationMessages. Less severe issues should be reported as warning-level messages instead.

This method may also modify the config dictionary, eg. to insert default values or fix up legacy configuration structures. Any changes made to the config will persist in the configuration dictionary passed to the constructor.

Parameters:
  • config – the configuration dictionary to validate.

  • root_config – the root configuration dictionary, which includes global settings and all component configurations. This dictionary must not be modified.

abstract close() None[source]

Closes the connection and frees any associated resources, eg. sockets.

abstract property connection_info: str

Returns a short string with detailed connection information. This is usually formatted as a URI, eg. ‘s3://my-bucket/bucket-prefix’.

abstract copy(file_entry: FileEntry, local_path: str) None[source]

Copies a file’s contents from a remote server to a specified temporary file on the local disk.

Parameters:
  • file_entry – metadata about the file, such as the path to the file on the remote server.

  • local_path – path to write the file to. local_path will always point to an existing zero-byte file.

abstract delete(file_entry: FileEntry) None[source]

Deletes a file from the remote server. Connection classes that do not support deletion may raise a NotImplementedError instead.

Parameters:

file_entry – metadata about the file, such as the path to the file on the remote server.

abstract list(start_after_hint: Optional[str]) Generator[FileEntry, None, None][source]

Lists files on the remote server.

Parameters:

start_after_hint – Latest file path (in lexicographic order) received by this connection, or None if no files have been received so far. This can be used to optimize fetching new files if (1) the data source generates file paths that are lexicographically-ordered, and (2) the server can fetch a subset of files with paths greater than a specified parameter (e.g. the StartAfter parameter for an S3 connection).
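
The sketch below shows the shape of a Connection subclass for a hypothetical local-directory protocol; the directory configuration key and the schema are illustrative only:

import os
import shutil
from typing import Generator, List, Optional

from factorytx.receivers.file.base import Connection, FileEntry
from factorytx.validation import ValidationMessage, clean_with_json_schema

class LocalDirConnection(Connection):
    SCHEMA = {
        'type': 'object',
        'properties': {'directory': {'type': 'string', 'minLength': 1}},
        'required': ['directory']}

    def __init__(self, config: dict, root_config: dict) -> None:
        self.directory = config['directory']

    @classmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        return clean_with_json_schema(cls.SCHEMA, config)

    @property
    def connection_info(self) -> str:
        return 'file://' + self.directory

    def list(self, start_after_hint: Optional[str]) -> Generator[FileEntry, None, None]:
        for name in sorted(os.listdir(self.directory)):
            info = os.stat(os.path.join(self.directory, name))
            yield FileEntry(path=name, filename=name,
                            mtime=info.st_mtime, size=info.st_size)

    def copy(self, file_entry: FileEntry, local_path: str) -> None:
        shutil.copyfile(os.path.join(self.directory, file_entry.path), local_path)

    def delete(self, file_entry: FileEntry) -> None:
        raise NotImplementedError('this connection does not support deletion')

    def close(self) -> None:
        pass  # no sockets or handles to release for a local directory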

ConnectionError

class factorytx.receivers.file.base.ConnectionError(message: str)[source]

Represents an error that occurred while using a Connection.

FileEntry

class factorytx.receivers.file.base.FileEntry(path: str, filename: str, mtime: float, size: int)[source]

Represents a file or resource on a remote system.

Variables:
  • path – relative path of the file on the remote system. If the FileEntry represents a virtual file, the path should remain the same across subsequent calls to list_files (ie. if the remote system updates a/b.c, the path should still remain the same instead of becoming a/b.v2.c.) This includes the filename.

  • filename – base name of the file on the remote system. This is stored separately from the path since the directory separator may vary.

  • mtime – file modification time as seconds since the Unix epoch.

  • size – size of the file in bytes.

Parser

class factorytx.receivers.file.base.Parser(config: dict, root_config: dict)[source]

Converts file contents into structured data. Each supported class of file (eg. CSV files, Excel files, unparseable binary files, etc.) should have a corresponding parser type which is responsible for converting files into pandas DataFrames.

Though a parser should be stateless, it can associate a state dictionary with each file it parses. See the process method for details.

abstract classmethod clean(config: dict, root_config: dict) List[ValidationMessage][source]

Normalizes and validates a configuration dictionary.

This method is responsible for validating that the configuration it receives is well-formed and will be accepted by the constructor. Any issues that prevent the configuration from being loaded must be signalled by returning one or more error-level ValidationMessages. Less severe issues should be reported as warning-level messages instead.

This method may also modify the config dictionary, eg. to insert default values or fix up legacy configuration structures. Any changes made to the config will persist in the configuration dictionary passed to the constructor.

Parameters:
  • config – the configuration dictionary to validate.

  • root_config – the root configuration dictionary, which includes global settings and all component configurations. This dictionary must not be modified.

abstract process(file_entry: FileEntry, state: Optional[dict], local_path: str) Generator[Tuple[DataFrame, dict], None, None][source]

Processes a file and converts it into a pandas DataFrame.

Parameters:
  • file_entry – metadata about the file, eg. its name, path on the remote server, last modification time, etc.

  • state – None if the file has not been previously processed, or the state dictionary returned the last time this file was processed.

  • local_path – path to a temporary file containing the file contents on the local disk. This file will be removed once the file has been parsed.

Returns:

an iterator that yields tuples. Each tuple holds a DataFrame containing data extracted from the file, and a JSON-serializable state dictionary. The state dictionary will be persisted and passed back to the process method when the file is next parsed, eg. after the file is modified.
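
For instance, a parser for simple single-table CSV files could be sketched as follows; using the file size as resume state is illustrative and not required by the API:

from typing import Generator, List, Optional, Tuple

import pandas as pd

from factorytx.receivers.file.base import FileEntry, Parser
from factorytx.validation import ValidationMessage

class SimpleCsvParser(Parser):
    @classmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        return []  # nothing to validate in this sketch

    def process(self, file_entry: FileEntry, state: Optional[dict],
                local_path: str) -> Generator[Tuple[pd.DataFrame, dict], None, None]:
        # Skip files that have not changed size since they were last parsed.
        if state is not None and state.get('size') == file_entry.size:
            return
        frame = pd.read_csv(local_path)
        # The state dict must be JSON-serializable; it is handed back to process
        # the next time this file is seen.
        yield frame, {'size': file_entry.size}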

ParsingCandidate

class factorytx.receivers.file.state.ParsingCandidate(connection: Connection, parser: Parser, file: FileEntry, input_stream_id: InputStreamId, asset: str, stream_type: str, state: Optional[dict])[source]

Holds the data necessary to process a file for a given stream. To simplify the act of fetching, parsing, transmitting, and deleting data this object has attributes that would normally be scattered over several data structures:

Variables:
  • connection – Connection to use to retrieve the file and (optionally) delete it.

  • parser – Parser to apply to the file after retrieving it.

  • file – metadata about the file, eg. its path on the remote server.

  • input_stream_id – identifier for the input stream associated with the file.

  • asset – asset to associate with the data extracted from the file.

  • stream_type – stream type to associate with the data extracted from the file.

  • state – state dictionary from the last time the file was parsed, or None if the file has not been processed before.

StateStore

class factorytx.receivers.file.state.StateStore(path: str)[source]

Tracks parser state and file metadata (eg. file size and modification time) for files that have been parsed and pushed to a transmit.

fetch_last_completed_file(connection_name: str) Optional[str][source]

Fetches the path of the last completed file for a specific connection.

Parameters:

connection_name – Name of the connection to the file server

fetch_new_or_modified(candidates: List[ParsingCandidate], alphabetical: bool = False) List[ParsingCandidate][source]

Returns all candidates that have not been previously parsed or have changed since they were last parsed. As a side effect, this method fetches the state dictionary of previously-parsed files from persistent storage.

initialize(force_migrate: bool = False, db_migrator: Optional[StateStoreDBMigrator] = None) None[source]
Parameters:
  • force_migrate – if True, then apply migration scripts instead of the schema script even if the database does not exist. Defaults to False.

  • additional_init_fn – An initialization function called within the context of state initialization, but after the primary state initialization has occurred.

purge(streams: List[InputStreamId]) None[source]

Removes all buffered data for the specified streams.

Parameters:

streams – list of (asset, stream_type) pairs.

record(parsed: ParsingCandidate, completed: bool, clear_state_on_completed: bool = False) None[source]

Stores or updates the state of a parsed file on disk. The parsed parameter must be a ParsingCandidate with an associated state dict.

factorytx.dataflow module

This module routes data from receivers to transforms and transmits.

InputStreamId

class factorytx.dataflow.InputStreamId(asset: str, stream_type: str)[source]

Identifies a statically-configured stream, ie. one that corresponds to an entry in a “streams” block in a receiver. As much as possible, statically-configured streams should be treated as opaque objects. Since the stream definition syntax may change in the future, code that uses str() to convert StreamIds to identifiers will be easier to maintain than code that directly accesses the asset and stream_type attributes.

DataflowGraph

class factorytx.dataflow.DataflowGraph(transforms: list, transmits: list)[source]

A DataflowGraph wraps a collection of transforms and transmits. It abstracts away the details of data processing: receivers only need to call graph.process(frame) to transform and transmit a dataframe.

process(input_stream_id: InputStreamId, input_frame: DataFrame) None[source]

Routes a dataframe through matching transforms and transmits.

factorytx.registry module

This module provides a way to extend FactoryTX with custom components (eg. custom receivers, transforms, and transmits.) Each component has an associated name and type by which it may be referenced in a FactoryTX configuration.

Registry

class factorytx.registry.Registry(base_class: T)[source]

Creates a registry for named subclasses of a given type.

Each class in the registry is identified by a (name, version) pair. The version may be omitted; omitted versions are interpreted as version 1.

Example:

>>> class Actuator: pass
>>> actuators = Registry(Actuator)
>>> @actuators.register('upgoer')  # Same as .register('upgoer', version=1).
... class UpGoer1(Actuator): pass
>>> @actuators.register('upgoer', version=2)
... class UpGoer2(Actuator): pass
>>> actuators.list()
[ComponentInfo(name='upgoer', version=1),
 ComponentInfo(name='upgoer', version=2)]
>>> actuators.get('upgoer')
<class 'factorytx.registry.UpGoer1'>
>>> actuators.get('upgoer', version=1)
<class 'factorytx.registry.UpGoer1'>
>>> actuators.get('upgoer', version=2)
<class 'factorytx.registry.UpGoer2'>
>>> try: actuators.get('parser')
... except Exception as e: e
MissingComponentError('There is no component named "parser"')
>>> try: actuators.get('upgoer', version=3)
... except Exception as e: e
MissingVersionError('There is no component named "upgoer" version 3')

get(name: str, version: int = 1) T[source]

Returns the class registered for the component named name with version version.

Raises:
  • MissingComponentError – if there is no component registered with the name name.

  • MissingVersionError – if there is a component registered with the name name, but no component matches both name and version.

list() List[ComponentInfo][source]

Returns a list of all registered component versions.

register(name: str, version: int = 1) Callable[[T], T][source]

Registers a component class with a given name and version.

factorytx.registry.initialize() None[source]

Imports all subpackages of factorytx plugins (packages named ‘factorytx*.*’) in order to trigger component registration.

exception factorytx.registry.MissingComponentError[source]

Thrown when Registry.get fails because there was no component with the requested name.

exception factorytx.registry.MissingVersionError[source]

Thrown when Registry.get fails because there was no component with the requested name and version, though a component with the requested name was found.

factorytx.validation module

This module contains the basic functions and types used for cleaning and validating configurations. “Cleaning” a configuration may include modifying it, eg. to adapt it to a new configuration structure or to inject default values.

Concrete validations are implemented by other modules. For example, the factorytx.config module cleans and validates entire configuration files while individual components (eg. transforms) implement the logic to clean and validate their sub-configurations.

ValidationMessage

class factorytx.validation.ValidationMessage(level: Level, path: Tuple[Union[int, str], ...], message: str)[source]

Represents a message returned when validating a configuration.

Variables:
  • level – either Level.WARNING or Level.ERROR.

  • path – ConfigPath indicating the object that this message applies to. For example, if the “asset” property of the first stream in a document had an issue then the path would be [“streams”, 0, “asset”].

  • message – message to present to the user.

ValidationWarning

factorytx.validation.ValidationWarning(path: Tuple[Union[int, str], ...], message: str) ValidationMessage[source]

ValidationError

factorytx.validation.ValidationError(path: Tuple[Union[int, str], ...], message: str) ValidationMessage[source]
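
These helpers are convenient inside clean implementations for checks that a JSON schema cannot express. A small sketch; the helper name and configuration keys are made up for illustration:

from typing import List

from factorytx.validation import ValidationError, ValidationMessage, ValidationWarning

def check_extra_rules(config: dict) -> List[ValidationMessage]:
    # Hypothetical helper called from a component's clean method.
    messages: List[ValidationMessage] = []
    if config.get('poll_interval', 0) <= 0:   # illustrative config key
        messages.append(ValidationError(
            path=('poll_interval',),
            message='poll_interval must be a positive integer'))
    if 'legacy_option' in config:             # illustrative config key
        messages.append(ValidationWarning(
            path=('legacy_option',),
            message='legacy_option is deprecated and will be ignored'))
    return messages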

clean_with_json_schema

factorytx.validation.clean_with_json_schema(schema: Dict[str, Any], instance: Dict[str, Any]) List[ValidationMessage][source]

Validates a document against a JSON schema and modifies it by injecting default values.

The document must correspond to a JSON document, i.e. it must be a tree of dicts, lists, ints, bools, and numbers.

>>> schema = {
...     'type': 'object',
...     'properties': {
...         'a': {'type': 'integer', 'default': 2},
...         'b': {'type': 'string', 'minLength': 1}},
...     'required': ['b']}
>>> doc = {'b': 'hello'}
>>> clean_with_json_schema(schema, doc)
[]
>>> doc
{'b': 'hello', 'a': 2}
>>> doc = {'a': 'nope'}
>>> clean_with_json_schema(schema, doc)
[ValidationMessage(level=<Level.ERROR: 'error'>, path=('a',), message="'nope' is not of type 'integer'"),
 ValidationMessage(level=<Level.ERROR: 'error'>, path=(), message="'b' is a required property")]

has_errors

factorytx.validation.has_errors(validation_results: List[ValidationMessage]) bool[source]

Returns True if a collection of ValidationMessages contains at least one error, or False otherwise.

factorytx.markers module

The markers module provides a way to flag errors and warnings at runtime and have them surfaced to the user. Each marker includes:

  1. A log context, which specifies which component and stream a marker applies to. If context is not provided explicitly then the current thread-local log context will be used instead. See the factorytx.logs module for more information about log contexts.

  2. A category string, which is used to organize and deduplicate markers. If you try to add a marker when one already exists with the same context and category then the new marker will be ignored.

  3. A level, which represents the severity of the marker. Each marker may be either:

    • a warning, which is an issue that did not impact data collection but may harm performance or correctness; or

    • an error, ie. an issue which prevents data collection.

  4. A message to present to the user.

Markers can be added using the error and warning functions, or removed when they no longer apply using clear. Both error and warning also log their message so that the caller doesn’t need to. Finally, markers can be searched and retrieved using the query function.

Note that the clear function clears all markers whose categories match a prefix, so it can be helpful to have categories follow a naming convention. For example, consider polling data from a SQL database. You want to capture errors that occur while running database queries, and clear the errors whenever a query succeeds. If you follow the convention that SQL marker categories begin with the prefix ‘sql:’, eg. ‘sql:connect’, ‘sql:auth’, ‘sql:query’, etc. then you can call markers.clear(‘sql:’) to clear any active SQL error markers after a successful query.
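
Following that convention, a polling helper for a SQL-backed receiver might use the markers module roughly as sketched below; run_query and handle_rows are hypothetical helpers:

from factorytx import markers

def poll_once(self) -> None:
    try:
        rows = self.run_query('SELECT * FROM samples')  # hypothetical helper
    except Exception as exc:
        # Markers are deduplicated by (context, category), so repeated failures
        # will not pile up duplicate markers.
        markers.error('sql:query', 'Query failed: %s' % exc)
        return
    markers.clear('sql:')   # the query succeeded, so clear outstanding SQL markers
    self.handle_rows(rows)  # hypothetical helper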

factorytx.markers.initialize(path: str) None[source]

Configures where the markers module stores information on the filesystem and sets up the module’s internal state. This must be called before client code can add, clear, or query markers.

factorytx.markers.setup_db(path: str) None[source]

Initializes a marker database on the filesystem at path if path does not already point to a valid marker database. If a marker database already exists but is corrupt then it will be deleted and reinitialized.

factorytx.markers.setup_test_db(path: str) None[source]

Creates a marker database without performing any correctness checks.

Used by the unit tests to create in-memory databases.

markers.warning

factorytx.markers.warning(category: str, message: str, context: Optional[LogContext] = None) None[source]

Creates a warning marker and logs a warning message.

Parameters:
  • category – free-form string used to categorize the marker. This should follow a naming convention to make it easier to clear related markers; see the clear method for more information.

  • message – warning message to include in the marker.

  • context – context (component and stream) to include in the marker.

markers.error

factorytx.markers.error(category: str, message: str, context: Optional[LogContext] = None, log_exception: bool = False) None[source]

Creates an error marker and logs an error message.

Parameters:
  • category – free-form string used to categorize the marker. This should follow a naming convention to make it easier to clear related markers; see the clear method for more information.

  • message – error message to include in the marker.

  • context – context (component and stream) to include in the marker.

  • log_exception – True to include a stack trace in the log message, or False otherwise. The stack trace is never included in the marker itself.

markers.clear

factorytx.markers.clear(category_prefix: str = '', context: Optional[LogContext] = None) None[source]

Clears markers in the selected context which have a category that starts with category_prefix.

Parameters:
  • category_prefix – category prefix of the markers to remove. For example, if you pass ‘sql’ then markers categorized as ‘sqlalchemy:error’ and ‘sql:query’ will be deleted, but ‘postgresql:auth’ will not. If no prefix is specified then all markers in the selected context are deleted.

  • context – context (component and stream) of the markers to delete. The context must match exactly. If no context is specified then the current thread’s log context is used.

markers.clear_components

factorytx.markers.clear_components(components: List[str]) None[source]

Clears markers based on a list of component names. Please note that any component names that aren’t configured will be ignored.

factorytx.supervisor module

This module provides services to automatically restart threads and processes that die.

Supervisor

class factorytx.supervisor.Supervisor(run_method: RunMethod, targets: List[Target])[source]

A Supervisor supervises a collection of callables, running each callable in its own thread or process. Each callable will be restarted if it terminates; callables which terminate too frequently will wait longer to restart to avoid overloading the system.

__init__(run_method: RunMethod, targets: List[Target]) None[source]

Instantiates a supervisor object.

Parameters:
  • run_method – how to run targets. Please refer to options below.

  • targets – list of callables, each of which will be run in its own thread / process. Each callable has an associated log context which is used to log messages if the callable terminates unexpectedly.

Options for running a method:

  • RunMethod.DUMMY_PROCESSES: run threads in the current process but simulate running in another process;

  • RunMethod.THREADS: run threads in the current process; or

  • RunMethod.PROCESSES: run each target in a separate process.

run() None[source]

Runs forever, restarting any targets that terminate.

factorytx.utils module

This module contains various helper functions that can be used when creating custom FactoryTX components.

factorytx.utils.add_columns_if_absent(frame: DataFrame, values: Dict[str, Any]) None[source]

Adds columns to a dataframe if they are not already present.

>>> df = DataFrame({"a": [1, 2, 3], "d": [3, 5, 8]})
>>> add_columns_if_absent(df, {"a": 0, "b": 1, "c": 2})
>>> df
   a  d  b  c
0  1  3  1  2
1  2  5  1  2
2  3  8  1  2

factorytx.utils.aware_utcnow() datetime[source]

Python 3.12 replacement for datetime.utcnow(); returns a timezone-aware UTC datetime.

factorytx.utils.class_name(typ: Type) str[source]

Given a class or type, returns the module reference for that type.

factorytx.utils.data_vol_statvfs() statvfs_result[source]

Returns the statvfs information for the data volume. Unix platforms only.

  • f_bsize – preferred file system block size.

  • f_frsize – fundamental file system block size.

  • f_blocks – total number of blocks in the filesystem.

  • f_bfree – total number of free blocks.

  • f_bavail – free blocks available to non-super user.

  • f_files – total number of file nodes.

  • f_ffree – total number of free file nodes.

  • f_favail – free nodes available to non-super user.

  • f_flag – system dependent.

  • f_namemax – maximum file name length.

factorytx.utils.dataframe_cdc(current_state_df: DataFrame, new_data_df: DataFrame, ignored_columns_for_diff: Optional[List[str]] = None) DataFrame[source]

Returns the difference between two DataFrames (Change Data Capture). Meant to compare an existing df state to a new df and return only the new data. If ‘ignored_columns_for_diff’ is set, those columns will be ignored when diffing the data, but will be included in the final returned df (using values from new_data_df).

If the column sets are not matching, new_data_df will be returned.
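
A brief usage sketch, assuming the new rows are identified by comparing row contents against the previous state:

import pandas as pd

from factorytx.utils import dataframe_cdc

previous = pd.DataFrame({'counter': [1, 2], 'value': [10.0, 12.5]})
latest = pd.DataFrame({'counter': [1, 2, 3], 'value': [10.0, 12.5, 13.1]})

# Only rows that are new relative to `previous` should come back, ie. the
# counter == 3 row in this sketch.
new_rows = dataframe_cdc(previous, latest)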

factorytx.utils.ensure_dir_exists(dirname: str, exist_ok: bool = True) None[source]

Creates the directory dirname if it doesn’t already exist.

factorytx.utils.get_component_path(component_name: str) str[source]

Returns the filesystem location that holds data for the named component.

factorytx.utils.get_device_environment() str[source]

Returns the environment FTX thinks it is running in. Should be one of: unknown, mase, balena, iotedge.

factorytx.utils.group_frame_by_stream(frame: DataFrame) List[Tuple[Tuple[str, str], DataFrame]][source]

Splits a DataFrame into a list of per-stream frames. Returns data in the format [((asset, stream type), frame), …].

factorytx.utils.grouped(values: Iterable[A], key: Callable[[A], B]) Iterable[Tuple[B, Iterable[A]]][source]

Returns an iterable of values grouped by the specified key. Within each group the ordering of the original argument is preserved. This function is more analogous to the GROUP BY operator in SQL than itertools.groupby.

>>> grouped([1.0, 1.3, 1.9, 0.9, 2.1, 1.1], key=round)
[(1, [1.0, 1.3, 0.9, 1.1]), (2, [1.9, 2.1])]

factorytx.utils.is_nested_iotedge_child_device() bool[source]

Returns True if the device is a child device of an IoT Edge device.

factorytx.utils.json_normalize_to_dataframe(data: Dict, record_path: Optional[List[str]] = None, meta: Optional[Iterable[Union[str, List[str]]]] = None, meta_prefix: Optional[str] = None, record_prefix: Optional[str] = None, errors: str = 'raise', sep: str = '.') DataFrame[source]

Copied from pandas to fix a bug in the original implementation in 0.23.4. Specifically, the two lines:

if isinstance(data, dict):
    data = [data]

TODO: This can be removed if we ever update pandas.

factorytx.utils.jsondecode(data: Union[str, bytes]) Any[source]

Converts JSON to a tree of objects. Object key order is preserved.

>>> jsondecode('{"a": 1, "c": 2, "b": "2018-01-01"}')
{'a': 1, 'c': 2, 'b': '2018-01-01'}

factorytx.utils.jsonencode(obj: Any) str[source]

Converts a tree of objects to JSON. Dict key order is preserved.

>>> jsonencode({"a": 1, "c": 2, "b": date(2018, 1, 1)})
'{"a": 1, "c": 2, "b": "2018-01-01"}'

factorytx.utils.kafka_topic_with_suffix(tenant: str, suffix: str) str[source]

Produces a generated topic string for Kafka.

Parameters:
  • tenant – initial namespace for the topic.

  • suffix – a string that will serve as the topic suffix

Returns:

topic name

factorytx.utils.lookahead(iterable: Iterable[A], default: Optional[Union[A, B]] = None) Iterator[Tuple[A, Union[A, B]]][source]

Yields a pair (elt, next_elt) for every elt in iterable.

If elt is the last element then next_elt is default; otherwise it is the next element in the iterable.

>>> list(lookahead([1, 2, 3]))
[(1, 2), (2, 3), (3, None)]
>>> list(lookahead([1, 2], default=0))
[(1, 2), (2, 0)]
>>> list(lookahead([]))
[]

factorytx.utils.make_guid() bytes[source]

Returns a short globally-unique ASCII string.

factorytx.utils.overwrite_atomic(path: str, data: bytes, mode: int = 384) None[source]

Atomically replaces the file at path with a new one containing data. If power is lost during the write or the system crashes then the file will contain either the old contents, or the new contents. It will never contain corrupt or truncated data.

factorytx.utils.safe_path_join(base, *parts)[source]

Constructs a path by joining one or more path components. Unlike os.path.join, a leading slash in a trailing component does not cause previous components to be discarded.

>>> os.path.join('/a', 'b', '/c')
'/c'
>>> safe_path_join('/a', 'b', '/c')
'/a/b/c'

factorytx.utils.sanitize_column_names(frame: DataFrame) DataFrame[source]

Sanitizes column names into valid mongo field names. Column names that can’t be sanitized without conflicting with existing names will be dropped.

factorytx.utils.save_dataframe_to_disk(df: DataFrame, save_path: str) None[source]

Serialize a DataFrame to disk in the parquet format.

factorytx.utils.timedelta_str_to_sec(timedelta_str: str) int[source]

Converts a time delta string to seconds, eg. "1m" returns 60.

Parameters:

timedelta_str – time delta string.

Returns:

number of seconds (int).

factorytx.utils.to_unix_epoch(dt: datetime) float[source]

Converts a timezone-aware datetime to a unix epoch value.

>>> tz = pytz.timezone('America/Los_Angeles')
>>> dt = tz.localize(datetime(2018, 10, 19, 10, 38, 2, 208000))
>>> to_unix_epoch(dt)
1539970682.208

factorytx.test_utils module

This module includes helper functions for writing unit tests for FactoryTX components.

normalize_frame

factorytx.test_utils.normalize_frame(df: DataFrame) List[List[tuple]][source]

Converts a DataFrame to a normalized form for comparisons.

dataframe_to_csv

factorytx.test_utils.dataframe_to_csv(df: DataFrame, remove_newlines: bool = False) str[source]

Converts a DataFrame to a CSV for assertions. Columns are written in lexicographic order.

csv_string_to_dataframe

factorytx.test_utils.csv_string_to_dataframe(multiline_string: str) StringIO[source]

Converts the CSV string buffer into a Pandas DataFrame.

load_transform

factorytx.test_utils.load_transform(config: Dict) Transform[source]

From the configuration, retrieve the transform object from the Registry, validate the configuration, and return the configured transform.

factorytx.snapshot_test_utils module

This module contains helper functions for snapshot (or gold master) testing FactoryTX transforms. A snapshot test takes CSV data and processes it with the specified transform, comparing the output with that of a previous run.

Example:

from factorytx.snapshot_test_utils import compare_with_snapshot
from factorytx.test_utils import (
    csv_string_to_dataframe,
    load_transform,
)


CONFIG = {
    'transform_type': 'convert_timestamps',
    'transform_name': 'Time Changing Transformer',
    'filter_stream': ['*'],
    'field_names': ["TS2", "TS4"],
    'timezone': "Asia/Taipei"
}
INPUT_DATA = """    TS1,TS2,col_3,TS4,description
2018-02-16 18:02:01,2018-02-16 18:02:01,1.1,2018-02-16 18:02:01,date 1
2018-05-27 01:12:01,2018-05-27 01:12:01,2.2,2018-05-27 01:12:01,date 2
2018-05-29 12:02:12,2018-05-29 12:02:12,3.3,2018-05-29 12:02:12,date 3
2018-10-29 00:00:29,2018-10-29 00:00:29,4.4,2018-10-29 00:00:29,date 4
2018-12-31 19:19:19,2018-12-31 19:19:19,5.5,2018-12-31 19:19:19,date 5
2019-07-04 15:07:04,2019-07-04 15:07:04,6.6,2019-07-04 15:07:04,date 6
"""


def test_snapshot_rename_transform(capsys, snapshot):
    input_df = csv_string_to_dataframe(INPUT_DATA)
    transform = load_transform(CONFIG)
    transformed_df = transform.process(input_df)

    with capsys.disabled():
        compare_with_snapshot(transformed_df, snapshot, [])

Anatomy of a snapshot test:

  • CONFIG: a dictionary-type variable that has key-value pairs necessary for configuring the transform. The configuration is based on the transform’s schema.

  • INPUT_DATA: CSV data that will be processed by the transform. The data can come in the form of a multiline Python string or a CSV file saved in a directory. It is suggested to only save data as a CSV file if the transform needs to process a large amount of data. For an example of a snapshot test using a CSV file, please refer to the snapshot test for the Rename transform.

  • The test function should pass in capsys and snapshot as parameters.

    • snapshot for reading and writing snapshot data

    • capsys needs to be disabled so differences between the current output and snapshot are formatted correctly in the Terminal.

  • csv_string_to_dataframe(): Converts the CSV string buffer into a Pandas DataFrame for the transform to process.

  • load_transform(): Loads the FactoryTX transform based on the provided configuration.

  • compare_with_snapshot(): Compares the output from the transform processing the data with the saved snapshot.

To run snapshot tests, please use Pytest:

pytest tests/factorytx/transforms/test_snapshot_rename.py

# To update a snapshot, add `--snapshot-update` to the command
pytest tests/factorytx/transforms/test_snapshot_rename.py --snapshot-update

NOTE: When a test is run for the first time, the snapshot will be automatically created. Snapshots will be saved in the tests/…/transforms/snapshots directory.

compare_with_snapshot

factorytx.snapshot_test_utils.compare_with_snapshot(input_df: DataFrame, snapshot: SnapshotTest, index: List = []) None[source]

Transforms a provided DataFrame and compares it with the test’s saved snapshot.

Parameters:
  • input_df – DataFrame converted from a CSV file

  • transform – FTX Transform to apply on the DataFrame

  • snapshot – A SnapshotTest that can read and write a saved Snapshot

  • index – table columns used to control how the table is displayed

format_table

factorytx.snapshot_test_utils.format_table(table: List[List[Any]]) str[source]

Convert tabular data to an aligned quasi-CSV format.

load_table

factorytx.snapshot_test_utils.load_table(s: str) List[List[Any]][source]

Convert an aligned table from format_table back to an encoded table.