Source code for factorytx.snapshot_test_utils

'''This module contains helper functions for snapshot (or gold master) testing
FactoryTX transforms. A snapshot test takes CSV data and processes
it with the specified transform, comparing the output with that of a
previous run.

Example:

.. code-block:: Python

    from factorytx.snapshot_test_utils import compare_with_snapshot
    from factorytx.test_utils import (
        csv_string_to_dataframe,
        load_transform,
    )


    CONFIG = {
        'transform_type': 'convert_timestamps',
        'transform_name': 'Time Changing Transformer',
        'filter_stream': ['*'],
        'field_names': ["TS2", "TS4"],
        'timezone': "Asia/Taipei"
    }
    INPUT_DATA = """\
    TS1,TS2,col_3,TS4,description
    2018-02-16 18:02:01,2018-02-16 18:02:01,1.1,2018-02-16 18:02:01,date 1
    2018-05-27 01:12:01,2018-05-27 01:12:01,2.2,2018-05-27 01:12:01,date 2
    2018-05-29 12:02:12,2018-05-29 12:02:12,3.3,2018-05-29 12:02:12,date 3
    2018-10-29 00:00:29,2018-10-29 00:00:29,4.4,2018-10-29 00:00:29,date 4
    2018-12-31 19:19:19,2018-12-31 19:19:19,5.5,2018-12-31 19:19:19,date 5
    2019-07-04 15:07:04,2019-07-04 15:07:04,6.6,2019-07-04 15:07:04,date 6
    """


    def test_snapshot_convert_timestamps_transform(capsys, snapshot):
        input_df = csv_string_to_dataframe(INPUT_DATA)
        transform = load_transform(CONFIG)
        transformed_df = transform.process(input_df)

        with capsys.disabled():
            compare_with_snapshot(transformed_df, snapshot, [])

Anatomy of a snapshot test:

* `CONFIG`: a dictionary-type variable that has key-value pairs necessary for
  configuring the transform. The configuration is based on the transform's
  schema.
* `INPUT_DATA`: CSV data that will be processed by the transform. The data can
  come in the form of a multiline Python string or a CSV file saved in a
  directory. It is suggested to only save data as a CSV file if the transform
  needs to process a large amount of data. For an example of a snapshot test
  using a CSV file, please refer to the snapshot test for the `Rename transform <https://github.com/sightmachine/factorytx-core/tree/master/tests/factorytx/transforms/test_snapshot_rename.py>`_.
* The test function should pass in `capsys` and `snapshot` as parameters.

  * `snapshot` for reading and writing snapshot data
  * `capsys` needs to be disabled so differences between the current output
    and snapshot are formatted correctly in the Terminal.

* :func:`~factorytx.test_utils.csv_string_to_dataframe`: Converts the CSV
  string buffer into a Pandas DataFrame for the transform to process.
* :func:`~factorytx.test_utils.load_transform`: Loads the FactoryTX
  transform based on the provided configuration.
* :func:`~factorytx.snapshot_test_utils.compare_with_snapshot`: Compares the
  output from the transform processing the data with the saved snapshot.

To run snapshot tests, please use Pytest:

.. code-block:: bash

    pytest tests/factorytx/transforms/test_snapshot_rename.py

    # To update a snapshot, add `--snapshot-update` to the command
    pytest tests/factorytx/transforms/test_snapshot_rename.py --snapshot-update


**NOTE:** When a test is run for the first time, the snapshot will be
automatically created. Snapshots will be saved in the
`tests/.../transforms/snapshots` directory.
'''
import csv
from datetime import datetime
import numbers
import os
import re
import sys
from typing import Any, List

import daff
import numpy as np
import pandas as pd
import pytz
from snapshottest import module

# This file is shared with the FactoryTX integration tests, so it MUST NOT
# import any modules from factorytx. Please put factorytx-specific test code
# in test_utils.py instead.


# Tabular data as a list of rows, each row a list of cell values. Row 0 is
# treated as the header row (encode_table leaves it unencoded).
Table = List[List[Any]]


def encode_value(value: Any) -> str:
    """Encode a single cell value as a string that also reveals its type.

    * numbers are encoded with ``repr``;
    * strings are wrapped in single quotes;
    * lists and dicts are encoded recursively;
    * null values (as reported by ``pd.isnull``) become ``'None'``;
    * datetime-like values are converted to UTC when timezone-aware and
      formatted as ``YYYY-MM-DD HH:MM:SS.ffffff``.

    :param value: the cell value to encode
    :returns: the encoded string
    :raises TypeError: if the value's type is not one of the above
    """
    if isinstance(value, numbers.Number):
        return repr(value)
    elif isinstance(value, str):
        # Encode strings inside single quotes so that they are represented
        # differently from other values. Otherwise the diff will omit changes
        # from strings to other types or vice versa.
        return f"'{value}'"
    elif isinstance(value, list):
        # Every element (including the last) is followed by ', '. The
        # trailing separator is kept so existing snapshots stay valid.
        body = ''.join(f'{encode_value(element)}, ' for element in value)
        return f'[{body}]'
    elif isinstance(value, dict):
        # Same trailing-separator convention as for lists.
        body = ''.join(
            f'{key}: {encode_value(element)}, '
            for key, element in value.items()
        )
        return f'{{{body}}}'
    elif pd.isnull(value):
        # Must come after the list/dict branches (pd.isnull returns an array
        # for lists) and before the datetime branch so null timestamps are
        # reported as 'None'.
        return 'None'
    elif isinstance(value, (datetime, np.datetime64, pd.Timestamp)):
        # As far as FactoryTX is concerned all datetime types are equivalent.
        if isinstance(value, np.datetime64):
            converted = value.astype(datetime)
            if not isinstance(converted, datetime):
                # numpy returns an int for nanosecond precision and a
                # datetime.date for day precision; neither has `tzinfo` or
                # the time fields needed below, so go through pandas.
                # Sub-microsecond digits cannot be formatted anyway.
                converted = pd.Timestamp(value).to_pydatetime(warn=False)
            value = converted
        elif isinstance(value, pd.Timestamp):
            value = value.to_pydatetime()
        if value.tzinfo is not None:
            value = value.astimezone(pytz.utc)
        return value.strftime("%Y-%m-%d %H:%M:%S.%f")
    else:
        raise TypeError(f'Unsupported type "{type(value).__name__}": {value}')


def encode_table(table: Table) -> Table:
    """Encode every data cell of ``table`` with ``encode_value``.

    The first row is the header and is passed through unchanged; only the
    rows after it are encoded.

    :param table: a table whose first row is the header
    :returns: a new table with the same header and encoded data rows
    """
    header, data_rows = table[0], table[1:]
    result = [header]
    for row in data_rows:
        result.append([encode_value(cell) for cell in row])
    return result


# Characters that force a value to be double-quoted in the quasi-CSV output.
# Carriage returns are included because the csv module treats a bare '\r'
# outside quotes as a line break and then fails to parse the row.
REQUIRES_ESCAPE_RE = re.compile(r'[",\r\n]')


def escape_value(value: str) -> str:
    """Quote ``value`` CSV-style if it contains a special character.

    Values containing a double quote, comma, carriage return or newline are
    wrapped in double quotes, with embedded double quotes doubled. Any other
    value is returned unchanged.

    :param value: an already-encoded cell value
    :returns: the value, quoted if necessary
    """
    if REQUIRES_ESCAPE_RE.search(value):
        doubled = value.replace('"', '""')
        return f'"{doubled}"'
    return value


def escape_table(table: Table) -> Table:
    """Apply ``escape_value`` to every cell of ``table``, header included.

    :param table: a table of already-encoded string cells
    :returns: a new table of the same shape with each cell escaped
    """
    result = []
    for row in table:
        result.append([escape_value(cell) for cell in row])
    return result


def format_table(table: Table) -> str:
    """Convert tabular data to an aligned quasi-CSV format.

    Each cell is escaped with ``escape_table`` and right-padded with spaces
    so that the commas of every column line up. The output starts with a
    newline and every row ends with a newline.

    :param table: a table of encoded string cells; all rows must have the
        same number of columns
    :returns: the formatted table; an empty table yields just the leading
        newline
    :raises AssertionError: if the rows have differing lengths
    """
    if not table:
        # Nothing to align; emit only the leading newline instead of failing
        # on table[0] below.
        return '\n'

    assert all(len(row) == len(table[0]) for row in table), \
        "All rows in a table must have the same number of columns"

    escaped = escape_table(table)

    # Width of each column is the length of its longest escaped cell.
    widths = [max(len(cell) for cell in column) for column in zip(*escaped)]

    parts = ['\n']  # Leading newline to align snapshots in docstrings.
    for row in escaped:
        parts.append(
            ','.join(cell.ljust(width) for cell, width in zip(row, widths))
        )
        parts.append('\n')
    return ''.join(parts)
def load_table(s: str) -> Table:
    """Convert an aligned table from `format_table` back to an encoded table.

    :param s: text produced by ``format_table``
    :returns: the table as a list of rows of strings, with the alignment
        padding removed and blank lines skipped
    """
    # NUL characters are removed before parsing. The line terminator is put
    # back on every line: csv.reader only preserves a newline inside a quoted
    # field when it is present in the input line, so without it multi-line
    # values would silently lose their newlines.
    csv_content = [row.replace('\0', '') + '\n' for row in s.split('\n')]
    reader = csv.reader(
        csv_content,
        delimiter=',',
        doublequote=True,
        quotechar='"',
        strict=False,
    )
    # All strings are wrapped in single quotes so we can safely strip values.
    return [[cell.rstrip() for cell in row] for row in reader if row]
# Matches ANSI/VT100 control sequences introduced by CSI (0x9B) or ESC '['.
ANSI_ESCAPE_RE = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]')


def strip_ansi_escapes(s: str) -> str:
    """Return ``s`` with every match of ``ANSI_ESCAPE_RE`` removed."""
    return ANSI_ESCAPE_RE.sub('', s)


def format_diff(a: Table, b: Table, index: List[str] = []) -> str:
    """Render the differences between two tables with daff.

    :param a: first table to compare
    :param b: second table to compare
    :param index: column names registered with daff as primary keys; the
        default list is never mutated
    :returns: the rendered diff, or an empty string when daff's diff has at
        most one row (presumably only a header row, i.e. no differences --
        confirm against daff)
    """
    flags = daff.CompareFlags()
    for name in index:
        flags.addPrimaryKey(name)

    diff = daff.Coopy.diff(a, b, flags=flags)
    if len(diff.data) <= 1:
        return ""

    output = daff.TerminalDiffRender().render(diff)

    # The terminal format setting is ignored by the renderer, so we need to
    # strip escape sequences by hand. :-(
    # Ask the stream itself whether it is a terminal: captured or in-memory
    # replacements for sys.stdout may have no usable file descriptor, which
    # would make `sys.stdout.fileno()` raise.
    try:
        stdout_is_tty = sys.stdout.isatty()
    except (AttributeError, ValueError):
        # stdout is missing or already closed; treat it as not a terminal.
        stdout_is_tty = False
    if not stdout_is_tty:
        output = strip_ansi_escapes(output)
    return output


def dataframe_to_table(df: pd.DataFrame) -> Table:
    """Convert a DataFrame into a table whose first row is the column names.

    :param df: the DataFrame to convert
    :returns: a list of rows: the sorted column labels followed by one list
        per DataFrame row, as produced by ``DataFrame.iterrows``
    """
    # Sort columns to ensure that their order is consistent.
    df = df.sort_index(axis=1)
    rows = [df.columns.tolist()]
    rows.extend(row.tolist() for _, row in df.iterrows())
    return rows
def compare_with_snapshot(input_df: pd.DataFrame,
                          snapshot: module.SnapshotTest,
                          index: List = []) -> None:
    """Compare a DataFrame with the test's saved snapshot.

    The DataFrame is converted to an encoded table. Unless the snapshot is
    being updated, a previously saved snapshot is loaded back into a table
    and compared cell by cell, so that changes in data type are detected as
    well as changes in value.

    :param input_df: DataFrame to check, e.g. the output of a transform
    :param snapshot: A SnapshotTest that can read and write a saved Snapshot
    :param index: Table columns used for how the table is displayed; passed
        to ``format_diff`` as ``index``
    :raises AssertionError: if the table differs from the saved snapshot
    """
    current_table = encode_table(dataframe_to_table(input_df))

    # Snapshots are stored as strings, so to check data types the saved
    # snapshot has to be parsed back into a table before comparing.
    saved = snapshot.module[snapshot.test_name]
    if not snapshot.update:
        if saved:
            expected_table = load_table(saved)
            if current_table != expected_table:
                diff_text = format_diff(
                    current_table, expected_table, index=index
                )
                if diff_text != "":
                    raise AssertionError(
                        f'Transform produced differences from the '
                        f'snapshot!\n{diff_text}'
                    )

    # The formatted string is friendly to Git diffs and is what
    # `--snapshot-update` writes out.
    snapshot.assert_match(format_table(current_table))