Source code for factorytx.dataflow

"""This module routes data from receivers to transforms and transmits."""

import collections
import fnmatch
import logging
import os
import re
import time
from datetime import datetime as dt, timezone
from typing import Any, List, Set, Tuple, Counter

import pandas as pd

from factorytx import const
from factorytx import markers
from factorytx.exceptions import Failure
from factorytx.logs import push_context
from factorytx.utils import save_dataframe_to_disk, safe_path_join, ensure_dir_exists

log = logging.getLogger(__name__)


[docs]class InputStreamId: """Identifies a statically-configured stream, ie. one that corresponds to an entry in a "streams" block in a receiver. As much as possible, statically-configured streams should be treated as opaque objects. Since the stream definition syntax may change in the future, code that uses str() to convert StreamIds to identifiers will be easier to maintain than code that directly accesses the `asset` and `stream_type` attributes. """ def __init__(self, asset: str, stream_type: str) -> None: self.asset = asset self.stream_type = stream_type def __eq__(self, other: Any) -> bool: return (isinstance(other, InputStreamId) and self.asset == other.asset and self.stream_type == other.stream_type) def __hash__(self) -> int: return hash((self.asset, self.stream_type)) def __str__(self) -> str: return make_stream_id(self.asset, self.stream_type)
def make_stream_id(asset: str, stream_type: str) -> str: stream_id = f'{asset}:{stream_type}' return stream_id def stream_matches(stream_id: str, filter_stream: List[str]) -> bool: matched = any(fnmatch.fnmatch(stream_id, f) for f in filter_stream) return matched
[docs]class DataflowGraph: """A DataflowGraph wraps a collection of transforms and transmits. It abstracts away the details of data processing: receivers only need to call `graph.process(frame)` to transform and transmit a dataframe. """ # Avoid circular dependencies, these two modules cannot be easily split out without a lot of refactoring right now. def __init__(self, transforms: list, transmits: list) -> None: from factorytx.base import Transform, Transmit self.transforms: List[Transform] = transforms self.transmits: List[Transmit] = transmits self.save_failed_frames: bool = False # This only needs to be enabled for non-historical data sources def _partition_frames(self, filter_stream: List[str], frames: List[pd.DataFrame]) -> Tuple[List[pd.DataFrame], List[pd.DataFrame]]: """Partitions a collection of dataframes based on the `filter_stream`. Returns two lists of dataframes: those containing rows that match the stream pattern, and those that don't. Frames that contain a mix of matching and non-matching data frames will be split. """ matching_frames: List[pd.DataFrame] = [] other_frames: List[pd.DataFrame] = [] regexes: List[str] = [fnmatch.translate(f) for f in filter_stream] regex = re.compile('|'.join(regexes)) for frame in frames: column_names: Set[str] = set(frame.columns) row_count: int = frame.shape[0] if row_count == 0: # Skip empty frames early to avoid spurious warnings about # missing asset or stream_type columns. continue # Since we route by asset and stream type they must not be missing. # Either may be null -- rows with null assets or stream types may be # dropped by the transmit if necessary. if const.ASSET_COLNAME not in column_names: msg = f'Dropped {row_count} records that had no asset column.' markers.warning('dataflow:asset', msg) elif const.STREAM_TYPE_COLNAME not in column_names: msg = f'Dropped {row_count} records that had no stream_type column.' markers.warning('dataflow:stream_type', msg) elif row_count == 1: # Don't bother partitioning the frame if there's only one row. # This reduces the overhead of processing very small frames. ix = frame.index[0] asset = frame.at[ix, const.ASSET_COLNAME] stream_type = frame.at[ix, const.STREAM_TYPE_COLNAME] stream_id = make_stream_id(asset, stream_type) if stream_matches(stream_id, filter_stream): matching_frames.append(frame) else: other_frames.append(frame) else: # Inline stream_id formatting and matching and apply it to the # whole dataframe at once. assets = frame[const.ASSET_COLNAME] stream_types = frame[const.STREAM_TYPE_COLNAME] stream_ids = assets.str.cat(stream_types, sep=':') matched_series = stream_ids.str.match(regex) matched_values = list(matched_series.unique()) if len(matched_values) == 1: # Avoid an expensive groupby call if possible. grouped = [(matched_values[0], frame)] else: grouped = frame.groupby(by=matched_series) for matched, subframe in grouped: if matched: matching_frames.append(subframe) else: other_frames.append(subframe) return matching_frames, other_frames def _validate_frame(self, frame: pd.DataFrame, producer: str) -> None: """Performs any checks required to ensure that DataFrames will not cause errors later stages of the pipeline. :raises: Failure if the dataframe is not valid """ # Make sure that no two columns in the dataframe have the same name. # Otherwise when we convert the columns to a dict in remotedatapost # all but one of the columns will be silently dropped. counts: Counter = collections.Counter(frame.columns) duplicates = [(k, v) for k, v in counts.items() if v > 1] if duplicates: # Report an example column name. Other duplicated column names were # probably caused by the same bug. column_name, count = duplicates[0] msg = (f'There were {count} columns named "{column_name}" in the ' f'result. This means there is an error in {producer}!') markers.error('dataflow:duplicate-columns', msg) raise Failure def _transform_frame(self, input_frame: pd.DataFrame) -> List[pd.DataFrame]: frames: List[pd.DataFrame] = [input_frame] for transform in self.transforms: matching_frames: List[pd.DataFrame] new_frames: List[pd.DataFrame] matching_frames, new_frames = self._partition_frames(transform.filter_stream, frames) for frame in matching_frames: with push_context(component=transform.name, component_type=transform.component_type): try: transform_data_path = safe_path_join( transform.failed_state_save_path, f"{dt.now().replace(tzinfo=timezone.utc).strftime('%Y-%m-%d_%H-%M-%S_%f')}.csv" ) saved_frame_msg = f'Failed to transform data. Data saved to {transform_data_path}.' log.info("Transforming %s records.", frame.shape[0]) new_frame = transform.process(frame) except Failure: # If the exception is 'Failure' it is expected to have logged a marker error already log.error(f"Transform chain: {[t.name for t in self.transforms]}, failed at: {transform.name}") self._handle_failed_frame(input_frame, transform_data_path, saved_frame_msg) raise except Exception as e: log.error(f"Transform chain: {[t.name for t in self.transforms]}, failed at: {transform.name}") self._handle_failed_frame(input_frame, transform_data_path, saved_frame_msg) markers.error("dataflow:error", f'Unexpected exception: {e!s}', log_exception=True) raise Failure log.info("Got back %d records.", new_frame.shape[0]) self._validate_frame(new_frame, f'the "{transform.name}" transform') new_frames.append(new_frame) frames = new_frames return frames def _transmit_frames(self, input_stream_id: InputStreamId, frames: List[pd.DataFrame]) -> None: # Group together the frames that will be sent through each transmit. # This provides a significant speedup in some scenarios, eg. processing # one record per asset with a significant number of assets. frame_groups: List[List[pd.DataFrame]] = [] for transmit in self.transmits: matching_frames, frames = self._partition_frames(transmit.filter_stream, frames) frame_groups.append(matching_frames) if frames: # Warn about any dropped frames. for frame in frames: grouped = frame.groupby(by=[const.ASSET_COLNAME, const.STREAM_TYPE_COLNAME]) for (asset, stream_type), subframe in grouped: stream_id = make_stream_id(asset, stream_type) msg = (f'{subframe.shape[0]} rows from the stream {stream_id} ' f'did not match a transmit and were dropped') markers.warning('dataflow:dropped', msg) for transmit, frame_group in zip(self.transmits, frame_groups): if not frame_group: # Skip transmits that have no data. continue if len(frame_group) > 1: frame = pd.concat(frame_group, sort=False) else: frame = frame_group[0] with push_context(component=transmit.name, component_type=transmit.component_type): try: transmit_data_path = safe_path_join( transmit.failed_state_save_path, f"{dt.now().replace(tzinfo=timezone.utc).strftime('%Y-%m-%d_%H-%M-%S_%f')}.csv" ) saved_frame_msg = f'Failed to load data into transmit buffer. Data saved to {transmit_data_path}.' log.info("Transmitting %d records.", frame.shape[0]) transmit.process(input_stream_id, frame) except Failure: # If the exception is 'Failure' it is expected to have logged a marker error already self._handle_failed_frame(frame, transmit_data_path, saved_frame_msg) raise except Exception as e: self._handle_failed_frame(frame, transmit_data_path, saved_frame_msg) markers.error("dataflow:error", f'Unexpected exception: {e!s}', log_exception=True) raise Failure log.info("Consumed %d records.", frame.shape[0]) def _handle_failed_frame(self, frame: pd.DataFrame, save_data_path: str, error_msg: str) -> None: if not self.save_failed_frames: return log.error(error_msg) ensure_dir_exists(os.path.dirname(save_data_path)) save_dataframe_to_disk(frame, save_data_path)
[docs] def process(self, input_stream_id: InputStreamId, input_frame: pd.DataFrame) -> None: """Routes a dataframe through matching transforms and transmits.""" log.info("Received %d records.", input_frame.shape[0]) start_time = time.monotonic() self._validate_frame(input_frame, 'the receiver') frames = self._transform_frame(input_frame) self._transmit_frames(input_stream_id, frames) duration = time.monotonic() - start_time log.info('Processed %d rows in %.3f seconds.', input_frame.shape[0], duration)