Source code for factorytx.dataflow

"""This module routes data from receivers to transforms and transmits."""

import collections
import fnmatch
import logging
import re
import time
from typing import Any, List, Set, Tuple, Counter

import pandas as pd

from factorytx import const
from factorytx.exceptions import Failure
from factorytx.logs import push_context
from factorytx import markers

log = logging.getLogger(__name__)


class InputStreamId:
    """Identifier for a statically-configured stream.

    A statically-configured stream is one that corresponds to an entry in a
    "streams" block in a receiver.

    Treat instances as opaque wherever possible. The stream definition syntax
    may change in the future, so code that converts an id with ``str()`` will
    be easier to maintain than code that reads the `asset` and `stream_type`
    attributes directly.

    Instances compare equal when both attributes are equal, and hash
    accordingly, so they can be used as dict keys and set members.
    """

    def __init__(self, asset: str, stream_type: str) -> None:
        self.asset = asset
        self.stream_type = stream_type

    def __eq__(self, other: Any) -> bool:
        # Anything that is not an InputStreamId is simply "not equal".
        if not isinstance(other, InputStreamId):
            return False
        same_asset = self.asset == other.asset
        return same_asset and self.stream_type == other.stream_type

    def __hash__(self) -> int:
        # Hash the same pair of fields that __eq__ compares.
        key = (self.asset, self.stream_type)
        return hash(key)

    def __str__(self) -> str:
        return make_stream_id(asset=self.asset, stream_type=self.stream_type)
def make_stream_id(asset: str, stream_type: str) -> str:
    """Return the canonical stream identifier, ``'<asset>:<stream_type>'``.

    Both values are interpolated with ordinary string formatting, so
    non-string values (including ``None``) are rendered via ``str()``.

    :param asset: the asset component of the stream id.
    :param stream_type: the stream type component of the stream id.
    :returns: the two components joined by a colon.
    """
    return f'{asset}:{stream_type}'


def stream_matches(stream_id: str, filter_stream: List[str]) -> bool:
    """Return True if `stream_id` matches any glob pattern in `filter_stream`.

    Patterns use shell-style wildcards (``*``, ``?``, ``[seq]``). An empty
    `filter_stream` matches nothing.

    :param stream_id: a stream identifier, eg. as built by `make_stream_id`.
    :param filter_stream: glob patterns to test the identifier against.
    :returns: whether at least one pattern matches.
    """
    # fnmatchcase is used instead of fnmatch.fnmatch because the latter runs
    # both arguments through os.path.normcase, which on Windows lower-cases
    # them and rewrites '/' to '\\'. Stream ids are not filesystem paths, so
    # matching must be case-sensitive and identical on every platform.
    return any(fnmatch.fnmatchcase(stream_id, pattern) for pattern in filter_stream)
class DataflowGraph:
    """A DataflowGraph wraps a collection of transforms and transmits.

    It abstracts away the details of data processing: receivers only need to
    call `graph.process(input_stream_id, frame)` to transform and transmit a
    dataframe.
    """

    # We use Any instead of the Transform or Transmit classes to avoid
    # circular dependencies.
    def __init__(self, transforms: List[Any], transmits: List[Any]) -> None:
        """Store the transforms and transmits that frames are routed through.

        :param transforms: objects exposing `filter_stream`, `name`,
            `component_type` and `process(frame)`; applied in list order.
        :param transmits: objects exposing `filter_stream`, `name`,
            `component_type` and `process(input_stream_id, frame)`.
        """
        self.transforms = transforms
        self.transmits = transmits

    def _partition_frames(self, filter_stream: List[str], frames: List[pd.DataFrame]) \
            -> Tuple[List[pd.DataFrame], List[pd.DataFrame]]:
        """Partitions a collection of dataframes based on the `filter_stream`.

        Returns two lists of dataframes: those containing rows that match the
        stream pattern, and those that don't. Frames that contain a mix of
        matching and non-matching rows will be split.

        Empty frames are skipped. Frames lacking an asset or stream_type
        column are dropped with a warning marker. An empty `filter_stream`
        matches nothing.

        :param filter_stream: glob patterns matched against each row's
            "<asset>:<stream_type>" identifier.
        :param frames: the dataframes to partition.
        :returns: a `(matching_frames, other_frames)` tuple.
        """
        matching_frames: List[pd.DataFrame] = []
        other_frames: List[pd.DataFrame] = []

        regexes: List[str] = [fnmatch.translate(f) for f in filter_stream]
        # Joining zero patterns would give the regex '', which matches every
        # string. The single-row path treats an empty filter as "no match", so
        # keep `None` as an explicit marker and route such frames the same way.
        regex = re.compile('|'.join(regexes)) if regexes else None

        for frame in frames:
            column_names: Set[str] = set(frame.columns)
            row_count: int = frame.shape[0]

            if row_count == 0:
                # Skip empty frames early to avoid spurious warnings about
                # missing asset or stream_type columns.
                continue

            # Since we route by asset and stream type they must not be missing.
            # Either may be null -- rows with null assets or stream types may be
            # dropped by the transmit if necessary.
            if const.ASSET_COLNAME not in column_names:
                msg = f'Dropped {row_count} records that had no asset column.'
                markers.warning('dataflow:asset', msg)
            elif const.STREAM_TYPE_COLNAME not in column_names:
                msg = f'Dropped {row_count} records that had no stream_type column.'
                markers.warning('dataflow:stream_type', msg)
            elif regex is None:
                # No patterns were given, so nothing in this frame can match.
                other_frames.append(frame)
            elif row_count == 1:
                # Don't bother partitioning the frame if there's only one row.
                # This reduces the overhead of processing very small frames.
                ix = frame.index[0]
                asset = frame.at[ix, const.ASSET_COLNAME]
                stream_type = frame.at[ix, const.STREAM_TYPE_COLNAME]
                stream_id = make_stream_id(asset, stream_type)
                if stream_matches(stream_id, filter_stream):
                    matching_frames.append(frame)
                else:
                    other_frames.append(frame)
            else:
                # Inline stream_id formatting and matching and apply it to the
                # whole dataframe at once.
                assets = frame[const.ASSET_COLNAME]
                stream_types = frame[const.STREAM_TYPE_COLNAME]
                stream_ids = assets.str.cat(stream_types, sep=':')
                if stream_ids.isna().any():
                    # A null asset or stream_type makes the vectorized concat
                    # yield a null id, whose match result is also null. Null
                    # is truthy (an all-null frame would "match" anything) and
                    # groupby discards null keys (rows would vanish silently).
                    # Fall back to formatting ids row by row, exactly as the
                    # single-row path does, so routing does not depend on the
                    # number of rows in the frame.
                    stream_ids = pd.Series(
                        [make_stream_id(a, s) for a, s in zip(assets, stream_types)],
                        index=frame.index,
                        dtype=object,
                    )
                matched_series = stream_ids.str.match(regex)
                matched_values = list(matched_series.unique())
                if len(matched_values) == 1:
                    # Avoid an expensive groupby call if possible.
                    grouped = [(matched_values[0], frame)]
                else:
                    grouped = frame.groupby(by=matched_series)
                for matched, subframe in grouped:
                    if matched:
                        matching_frames.append(subframe)
                    else:
                        other_frames.append(subframe)

        return matching_frames, other_frames

    def _validate_frame(self, frame: pd.DataFrame, producer: str) -> None:
        """Performs any checks required to ensure that DataFrames will not
        cause errors in later stages of the pipeline.

        :param frame: the dataframe to check.
        :param producer: human-readable description of the component that
            produced the frame; interpolated into the error message.
        :raises: Failure if the dataframe is not valid
        """
        # Make sure that no two columns in the dataframe have the same name.
        # Otherwise when we convert the columns to a dict in remotedatapost
        # all but one of the columns will be silently dropped.
        counts: Counter = collections.Counter(frame.columns)
        duplicates = [(k, v) for k, v in counts.items() if v > 1]
        if duplicates:
            # Report an example column name. Other duplicated column names were
            # probably caused by the same bug.
            column_name, count = duplicates[0]
            msg = (f'There were {count} columns named "{column_name}" in the '
                   f'result. This means there is an error in {producer}!')
            markers.error('dataflow:duplicate-columns', msg)
            raise Failure

    def _transform_frame(self, input_frame: pd.DataFrame) -> List[pd.DataFrame]:
        """Runs a dataframe through every transform, in order.

        For each transform the current frames are partitioned by the
        transform's `filter_stream`. Matching frames are replaced by the
        transform's output; non-matching frames pass through unchanged.

        :param input_frame: the dataframe to transform.
        :returns: the frames left after all transforms have run.
        :raises: Failure if a transform raises or returns an invalid frame.
        """
        frames: List[pd.DataFrame] = [input_frame]

        for transform in self.transforms:
            matching_frames: List[pd.DataFrame]
            new_frames: List[pd.DataFrame]
            matching_frames, new_frames = self._partition_frames(transform.filter_stream, frames)

            for frame in matching_frames:
                with push_context(component=transform.name,
                                  component_type=transform.component_type):
                    try:
                        log.info("Transforming %s records.", frame.shape[0])
                        new_frame = transform.process(frame)
                    except Failure:
                        # Already reported by the component; propagate as-is.
                        raise
                    except Exception as e:
                        markers.error("dataflow:error", f'Unexpected exception: {e!s}',
                                      log_exception=True)
                        # Chain explicitly so the traceback names the cause.
                        raise Failure from e
                    log.info("Got back %d records.", new_frame.shape[0])
                    self._validate_frame(new_frame, f'the "{transform.name}" transform')
                    new_frames.append(new_frame)

            frames = new_frames

        return frames

    def _transmit_frames(self, input_stream_id: InputStreamId,
                         frames: List[pd.DataFrame]) -> None:
        """Sends each frame to the first transmit whose filter matches it.

        Rows that match no transmit are dropped with a warning marker.

        :param input_stream_id: passed through unchanged to each transmit.
        :param frames: the dataframes to route.
        :raises: Failure if a transmit raises.
        """
        # Group together the frames that will be sent through each transmit.
        # This provides a significant speedup in some scenarios, eg. processing
        # one record per asset with a significant number of assets.
        frame_groups: List[List[pd.DataFrame]] = []
        for transmit in self.transmits:
            # Whatever this transmit does not claim is offered to the next one.
            matching_frames, frames = self._partition_frames(transmit.filter_stream, frames)
            frame_groups.append(matching_frames)

        if frames:
            # Warn about any dropped frames.
            for frame in frames:
                grouped = frame.groupby(by=[const.ASSET_COLNAME, const.STREAM_TYPE_COLNAME])
                for (asset, stream_type), subframe in grouped:
                    stream_id = make_stream_id(asset, stream_type)
                    msg = (f'{subframe.shape[0]} rows from the stream {stream_id} '
                           f'did not match a transmit and were dropped')
                    markers.warning('dataflow:dropped', msg)

        for transmit, frame_group in zip(self.transmits, frame_groups):
            if not frame_group:
                # Skip transmits that have no data.
                continue

            if len(frame_group) > 1:
                frame = pd.concat(frame_group, sort=False)
            else:
                frame = frame_group[0]

            with push_context(component=transmit.name,
                              component_type=transmit.component_type):
                try:
                    log.info("Transmitting %d records.", frame.shape[0])
                    transmit.process(input_stream_id, frame)
                except Failure:
                    # Already reported by the component; propagate as-is.
                    raise
                except Exception as e:
                    markers.error("dataflow:error", f'Unexpected exception: {e!s}',
                                  log_exception=True)
                    # Chain explicitly so the traceback names the cause.
                    raise Failure from e
                log.info("Consumed %d records.", frame.shape[0])

    def process(self, input_stream_id: InputStreamId, input_frame: pd.DataFrame) -> None:
        """Routes a dataframe through matching transforms and transmits.

        :param input_stream_id: identifies the stream the frame came from;
            handed to each transmit along with the data.
        :param input_frame: the dataframe produced by the receiver.
        :raises: Failure if validation, a transform, or a transmit fails.
        """
        log.info("Received %d records.", input_frame.shape[0])
        start_time = time.monotonic()
        self._validate_frame(input_frame, 'the receiver')
        frames = self._transform_frame(input_frame)
        self._transmit_frames(input_stream_id, frames)
        duration = time.monotonic() - start_time
        log.info('Processed %d rows in %.3f seconds.', input_frame.shape[0], duration)