"""This module routes data from receivers to transforms and transmits."""
import collections
import fnmatch
import logging
import re
import time
from typing import Any, List, Set, Tuple
import pandas as pd
from factorytx import const
from factorytx.exceptions import Failure
from factorytx.logs import push_context
from factorytx import markers
log = logging.getLogger(__name__)
def make_stream_id(asset: str, stream_type: str) -> str:
stream_id = f'{asset}:{stream_type}'
return stream_id
def stream_matches(stream_id: str, filter_stream: List[str]) -> bool:
matched = any(fnmatch.fnmatch(stream_id, f) for f in filter_stream)
return matched
[docs]class DataflowGraph:
"""A DataflowGraph wraps a collection of transforms and transmits. It
abstracts away the details of data processing: receivers only need to
call `graph.process(frame)` to transform and transmit a dataframe.
"""
# We use Any instead Transform or Transmit classes to avoid circular dependencies.
def __init__(self, transforms: List[Any], transmits: List[Any]) -> None:
self.transforms = transforms
self.transmits = transmits
def _partition_frames(self, filter_stream: List[str], frames: List[pd.DataFrame]) -> Tuple[List[pd.DataFrame], List[pd.DataFrame]]:
"""Partitions a collection of dataframes based on the `filter_stream`.
Returns two lists of dataframes: those containing rows that match the
stream pattern, and those that don't. Frames that contain a mix of
matching and non-matching data frames will be split.
"""
matching_frames: List[pd.DataFrame] = []
other_frames: List[pd.DataFrame] = []
regexes: List[str] = [fnmatch.translate(f) for f in filter_stream]
regex = re.compile('|'.join(regexes))
for frame in frames:
column_names: Set[str] = set(frame.columns)
row_count: int = frame.shape[0]
if row_count == 0:
# Skip empty frames early to avoid spurious warnings about
# missing asset or stream_type columns.
continue
# Since we route by asset and stream type they must not be missing.
# Either may be null -- rows with null assets or stream types may be
# dropped by the transmit if necessary.
if const.ASSET_COLNAME not in column_names:
msg = f'Dropped {row_count} records that had no asset column.'
markers.warning('dataflow:asset', msg)
elif const.STREAM_TYPE_COLNAME not in column_names:
msg = f'Dropped {row_count} records that had no stream_type column.'
markers.warning('dataflow:stream_type', msg)
elif row_count == 1:
# Don't bother partitioning the frame if there's only one row.
# This reduces the overhead of processing very small frames.
ix = frame.index[0]
asset = frame.at[ix, const.ASSET_COLNAME]
stream_type = frame.at[ix, const.STREAM_TYPE_COLNAME]
stream_id = make_stream_id(asset, stream_type)
if stream_matches(stream_id, filter_stream):
matching_frames.append(frame)
else:
other_frames.append(frame)
else:
# Inline stream_id formatting and matching and apply it to the
# whole dataframe at once.
assets = frame[const.ASSET_COLNAME]
stream_types = frame[const.STREAM_TYPE_COLNAME]
stream_ids = assets.str.cat(stream_types, sep=':')
matched_series = stream_ids.str.match(regex)
matched_values = list(matched_series.unique())
if len(matched_values) == 1:
# Avoid an expensive groupby call if possible.
grouped = [(matched_values[0], frame)]
else:
grouped = frame.groupby(by=matched_series)
for matched, subframe in grouped:
if matched:
matching_frames.append(subframe)
else:
other_frames.append(subframe)
return matching_frames, other_frames
def _validate_frame(self, frame: pd.DataFrame, producer: str) -> None:
"""Performs any checks required to ensure that DataFrames will not
cause errors later stages of the pipeline.
:raises: Failure if the dataframe is not valid
"""
# Make sure that no two columns in the dataframe have the same name.
# Otherwise when we convert the columns to a dict in remotedatapost
# all but one of the columns will be silently dropped.
counts = collections.Counter(frame.columns)
duplicates = [(k, v) for k, v in counts.items() if v > 1]
if duplicates:
# Report an example column name. Other duplicated column names were
# probably caused by the same bug.
column_name, count = duplicates[0]
msg = (f'There were {count} columns named "{column_name}" in the '
f'result. This means there is an error in {producer}!')
markers.error('dataflow:duplicate-columns', msg)
raise Failure
def _transform_frame(self, input_frame: pd.DataFrame) -> List[pd.DataFrame]:
frames: List[pd.DataFrame] = [input_frame]
for transform in self.transforms:
matching_frames: List[pd.DataFrame]
new_frames: List[pd.DataFrame]
matching_frames, new_frames = self._partition_frames(transform.filter_stream, frames)
for frame in matching_frames:
with push_context(component=transform.name, component_type=transform.component_type):
try:
log.info("Transforming %s records.", frame.shape[0])
new_frame = transform.process(frame)
except Failure:
raise
except Exception as e:
markers.error("dataflow:error", f'Unexpected exception: {e!s}',
log_exception=True)
raise Failure
log.info("Got back %d records.", new_frame.shape[0])
self._validate_frame(new_frame, f'the "{transform.name}" transform')
new_frames.append(new_frame)
frames = new_frames
return frames
def _transmit_frames(self, input_stream_id: InputStreamId, frames: List[pd.DataFrame]) -> None:
# Group together the frames that will be sent through each transmit.
# This provides a significant speedup in some scenarios, eg. processing
# one record per asset with a significant number of assets.
frame_groups: List[List[pd.DataFrame]] = []
for transmit in self.transmits:
matching_frames, frames = self._partition_frames(transmit.filter_stream, frames)
frame_groups.append(matching_frames)
if frames: # Warn about any dropped frames.
for frame in frames:
grouped = frame.groupby(by=[const.ASSET_COLNAME, const.STREAM_TYPE_COLNAME])
for (asset, stream_type), subframe in grouped:
stream_id = make_stream_id(asset, stream_type)
msg = (f'{subframe.shape[0]} rows from the stream {stream_id} '
f'did not match a transmit and were dropped')
markers.warning('dataflow:dropped', msg)
for transmit, frame_group in zip(self.transmits, frame_groups):
if not frame_group: # Skip transmits that have no data.
continue
if len(frame_group) > 1:
frame = pd.concat(frame_group, sort=False)
else:
frame = frame_group[0]
with push_context(component=transmit.name, component_type=transmit.component_type):
try:
log.info("Transmitting %d records.", frame.shape[0])
transmit.process(input_stream_id, frame)
except Failure:
raise
except Exception as e:
markers.error("dataflow:error", f'Unexpected exception: {e!s}',
log_exception=True)
raise Failure
log.info("Consumed %d records.", frame.shape[0])
[docs] def process(self, input_stream_id: InputStreamId, input_frame: pd.DataFrame) -> None:
"""Routes a dataframe through matching transforms and transmits."""
log.info("Received %d records.", input_frame.shape[0])
start_time = time.monotonic()
self._validate_frame(input_frame, 'the receiver')
frames = self._transform_frame(input_frame)
self._transmit_frames(input_stream_id, frames)
duration = time.monotonic() - start_time
log.info('Processed %d rows in %.3f seconds.', input_frame.shape[0], duration)