"""This module routes data from receivers to transforms and transmits."""
import collections
import fnmatch
import logging
import os
import re
import time
from datetime import datetime as dt, timezone
from typing import Any, List, Set, Tuple, Counter
import pandas as pd
from factorytx import const
from factorytx import markers
from factorytx.exceptions import Failure
from factorytx.logs import push_context
from factorytx.utils import save_dataframe_to_disk, safe_path_join, ensure_dir_exists
log = logging.getLogger(__name__)
def make_stream_id(asset: str, stream_type: str) -> str:
stream_id = f'{asset}:{stream_type}'
return stream_id
def stream_matches(stream_id: str, filter_stream: List[str]) -> bool:
matched = any(fnmatch.fnmatch(stream_id, f) for f in filter_stream)
return matched
class DataflowGraph:
"""A DataflowGraph wraps a collection of transforms and transmits. It
abstracts away the details of data processing: receivers only need to
call `graph.process(frame)` to transform and transmit a dataframe.
# Avoid circular dependencies, these two modules cannot be easily split out without a lot of refactoring right now.
def __init__(self, transforms: list, transmits: list) -> None:
from factorytx.base import Transform, Transmit
self.transforms: List[Transform] = transforms
self.transmits: List[Transmit] = transmits
def _partition_frames(self, filter_stream: List[str], frames: List[pd.DataFrame]) -> Tuple[List[pd.DataFrame], List[pd.DataFrame]]:
"""Partitions a collection of dataframes based on the `filter_stream`.
Returns two lists of dataframes: those containing rows that match the
stream pattern, and those that don't. Frames that contain a mix of
matching and non-matching data frames will be split.
matching_frames: List[pd.DataFrame] = []
other_frames: List[pd.DataFrame] = []
regexes: List[str] = [fnmatch.translate(f) for f in filter_stream]
regex = re.compile('|'.join(regexes))
for frame in frames:
column_names: Set[str] = set(frame.columns)
row_count: int = frame.shape[0]
if row_count == 0:
# Skip empty frames early to avoid spurious warnings about
# missing asset or stream_type columns.
# Since we route by asset and stream type they must not be missing.
# Either may be null -- rows with null assets or stream types may be
# dropped by the transmit if necessary.
if const.ASSET_COLNAME not in column_names:
msg = f'Dropped {row_count} records that had no asset column.'
markers.warning('dataflow:asset', msg)
elif const.STREAM_TYPE_COLNAME not in column_names:
msg = f'Dropped {row_count} records that had no stream_type column.'
markers.warning('dataflow:stream_type', msg)
elif row_count == 1:
# Don't bother partitioning the frame if there's only one row.
# This reduces the overhead of processing very small frames.
ix = frame.index[0]
asset = frame.at[ix, const.ASSET_COLNAME]
stream_type = frame.at[ix, const.STREAM_TYPE_COLNAME]
stream_id = make_stream_id(asset, stream_type)
if stream_matches(stream_id, filter_stream):
# Inline stream_id formatting and matching and apply it to the
# whole dataframe at once.
assets = frame[const.ASSET_COLNAME]
stream_types = frame[const.STREAM_TYPE_COLNAME]
stream_ids = assets.str.cat(stream_types, sep=':')
matched_series = stream_ids.str.match(regex)
matched_values = list(matched_series.unique())
if len(matched_values) == 1:
# Avoid an expensive groupby call if possible.
grouped = [(matched_values[0], frame)]
grouped = frame.groupby(by=matched_series)
for matched, subframe in grouped:
if matched:
return matching_frames, other_frames
def _validate_frame(self, frame: pd.DataFrame, producer: str) -> None:
"""Performs any checks required to ensure that DataFrames will not
cause errors later stages of the pipeline.
:raises: Failure if the dataframe is not valid
# Make sure that no two columns in the dataframe have the same name.
# Otherwise when we convert the columns to a dict in remotedatapost
# all but one of the columns will be silently dropped.
counts: Counter = collections.Counter(frame.columns)
duplicates = [(k, v) for k, v in counts.items() if v > 1]
if duplicates:
# Report an example column name. Other duplicated column names were
# probably caused by the same bug.
column_name, count = duplicates[0]
msg = (f'There were {count} columns named "{column_name}" in the '
f'result. This means there is an error in {producer}!')
markers.error('dataflow:duplicate-columns', msg)
raise Failure
def _transform_frame(
input_frame: pd.DataFrame,
save_failed_frames: bool = False,
) -> List[pd.DataFrame]:
frames: List[pd.DataFrame] = [input_frame]
for transform in self.transforms:
matching_frames: List[pd.DataFrame]
new_frames: List[pd.DataFrame]
matching_frames, new_frames = self._partition_frames(transform.filter_stream, frames)
for frame in matching_frames:
with push_context(component=transform.name, component_type=transform.component_type):
transform_data_path = safe_path_join(
saved_frame_msg = f'Failed to transform data. Data saved to {transform_data_path}.'
log.info("Transforming %s records.", frame.shape[0])
new_frame = transform.process(frame)
except Failure:
# If the exception is 'Failure' it is expected to have logged a marker error already
log.error(f"Transform chain: {[t.name for t in self.transforms]}, failed at: {transform.name}")
self._handle_failed_frame(input_frame, transform_data_path, saved_frame_msg, save_failed_frames)
except Exception as e:
log.error(f"Transform chain: {[t.name for t in self.transforms]}, failed at: {transform.name}")
self._handle_failed_frame(input_frame, transform_data_path, saved_frame_msg, save_failed_frames)
f'Unexpected exception: {e!s}',
raise Failure
log.info("Got back %d records.", new_frame.shape[0])
self._validate_frame(new_frame, f'the "{transform.name}" transform')
frames = new_frames
return frames
def _transmit_frames(
input_stream_id: InputStreamId,
frames: List[pd.DataFrame],
save_failed_frames: bool = False,
) -> None:
# Group together the frames that will be sent through each transmit.
# This provides a significant speedup in some scenarios, eg. processing
# one record per asset with a significant number of assets.
frame_groups: List[List[pd.DataFrame]] = []
for transmit in self.transmits:
matching_frames, frames = self._partition_frames(transmit.filter_stream, frames)
if frames: # Warn about any dropped frames.
for frame in frames:
grouped = frame.groupby(by=[const.ASSET_COLNAME, const.STREAM_TYPE_COLNAME])
for (asset, stream_type), subframe in grouped:
stream_id = make_stream_id(asset, stream_type)
msg = (f'{subframe.shape[0]} rows from the stream {stream_id} '
f'did not match a transmit and were dropped')
markers.warning('dataflow:dropped', msg)
for transmit, frame_group in zip(self.transmits, frame_groups):
if not frame_group: # Skip transmits that have no data.
if len(frame_group) > 1:
frame = pd.concat(frame_group, sort=False)
frame = frame_group[0]
with push_context(component=transmit.name, component_type=transmit.component_type):
transmit_data_path = safe_path_join(
saved_frame_msg = f'Failed to load data into transmit buffer. Data saved to {transmit_data_path}.'
log.info("Transmitting %d records.", frame.shape[0])
transmit.process(input_stream_id, frame)
except Failure:
# If the exception is 'Failure' it is expected to have logged a marker error already
self._handle_failed_frame(frame, transmit_data_path, saved_frame_msg, save_failed_frames)
except Exception as e:
self._handle_failed_frame(frame, transmit_data_path, saved_frame_msg, save_failed_frames)
f'Unexpected exception: {e!s}',
raise Failure
log.info("Consumed %d records.", frame.shape[0])
def _handle_failed_frame(
frame: pd.DataFrame,
save_data_path: str,
error_msg: str,
save_failed_frames: bool = False,
) -> None:
if not save_failed_frames:
save_dataframe_to_disk(frame, save_data_path)
def process(
input_stream_id: InputStreamId,
input_frame: pd.DataFrame,
save_failed_frames: bool = False,
) -> None:
Routes a dataframe through matching transforms and transmits.
:param bool save_failed_frames: will save the failed transformed/transmitted frames to disk if set to True
log.info("Received %d records.", input_frame.shape[0])
start_time = time.monotonic()
self._validate_frame(input_frame, 'the receiver')
frames = self._transform_frame(input_frame, save_failed_frames)
self._transmit_frames(input_stream_id, frames, save_failed_frames)
duration = time.monotonic() - start_time
log.info('Processed %d rows in %.3f seconds.', input_frame.shape[0], duration)