Source code for factorytx.utils

"""This module contains various helper functions that can be used when creating
custom FactoryTX components.

"""
import shutil
from typing import (
    Any, Callable, Dict, Iterable, Iterator, List, Set, Tuple, TypeVar, Union,
    Type, Generator, Optional, DefaultDict
)

from base64 import urlsafe_b64encode
from collections import defaultdict, abc
import contextlib
from datetime import datetime, date
import enum
import io
import os
import re
import tempfile
import time
import uuid

import numpy as np
from pandas import DataFrame
import pandas as pd
import pytz
import simplejson
from pandas.io.json.normalize import nested_to_record
from pyarrow import Scalar

from factorytx import const
from factorytx import markers

# Disabled until mypy adds support for recursive types.
#Json = Union[Dict[str, 'Json'], List['Json'], str, int, float, bool]
JsonDict = Dict[str, Any]


def _json_default(o: Any) -> Any:
    """Converts custom types to JSON."""
    if isinstance(o, datetime):
        return o.strftime('%Y-%m-%d %H:%M:%S.%f')
    elif isinstance(o, date):
        return o.strftime('%Y-%m-%d')
    elif isinstance(o, enum.Enum):
        return o.value
    else:
        raise TypeError(repr(o) + " is not JSON serializable.")

# Shared module-level encoder/decoder instances, built once at import time.
# Values the encoder cannot serialize by itself are routed to `_json_default`.
# NOTE(review): per the simplejson documentation `use_decimal=True` serializes
# `Decimal` values natively and `ignore_nan=True` emits NaN/Infinity as `null`
# -- confirm against the simplejson version that is actually installed.
ENCODER = simplejson.JSONEncoder(default=_json_default, use_decimal=True, ignore_nan=True)
DECODER = simplejson.JSONDecoder()


def jsonencode(obj: Any) -> str:
    """Serializes a tree of objects to a JSON string using the module `ENCODER`.

    The order of dict keys is kept as-is in the output.

    >>> jsonencode({"a": 1, "c": 2, "b": date(2018, 1, 1)})
    '{"a": 1, "c": 2, "b": "2018-01-01"}'

    """
    encoded = ENCODER.encode(obj)
    return encoded
def jsondecode(data: Union[str, bytes]) -> Any:
    """Parses JSON text into a tree of objects using the module `DECODER`.

    The order of object keys is kept as-is in the result.

    >>> jsondecode('{"a": 1, "c": 2, "b": "2018-01-01"}')
    {'a': 1, 'c': 2, 'b': '2018-01-01'}

    """
    decoded = DECODER.decode(data)  # type: ignore
    return decoded
def make_guid() -> bytes:
    """Returns a short globally-unique identifier as ASCII bytes.

    The identifier is the URL-safe base64 encoding of a random (version 4)
    UUID with the `=` padding stripped off.
    """
    encoded = urlsafe_b64encode(uuid.uuid4().bytes)
    # The padding carries no information, so drop it to keep the id short.
    return encoded.strip(b'=')
def overwrite_atomic(path: str, data: bytes, mode: int = 0o600) -> None:
    """Atomically replaces the file at `path` with a new one containing `data`.

    If power is lost during the write or the system crashes then the file will
    contain either the old contents, or the new contents. It will never contain
    corrupt or truncated data.

    The data is first written to a temporary `*.tmp` file in the same directory
    as `path`, flushed and fsync'ed, and only then moved over `path`. If any
    step fails the temporary file is removed and the error is re-raised, so
    failed writes do not leave stray files behind.

    :param path: location of the file to create or replace.
    :param data: the complete new contents of the file.
    :param mode: permission bits applied to the new file.
    """
    dirname = os.path.dirname(path)
    tmp = tempfile.NamedTemporaryFile(delete=False, dir=dirname, suffix='.tmp')
    filename: str = tmp.name
    try:
        with contextlib.closing(tmp):
            fp: io.BytesIO = tmp.file  # type: ignore # tempfile stubs are incomplete.
            fp.write(data)
            fp.flush()
            # Make sure the bytes are on disk before the rename publishes them.
            os.fsync(fp.fileno())
        os.chmod(filename, mode)
        # Unlike os.rename, os.replace overwrites an existing destination on
        # every platform.
        os.replace(filename, path)
    except BaseException:
        # The temp file was created with delete=False, so clean it up ourselves.
        with contextlib.suppress(OSError):
            os.unlink(filename)
        raise
def ensure_dir_exists(dirname: str, exist_ok: bool = True) -> None:
    """Creates the directory `dirname` (and any missing parents).

    A `FileExistsError` raised by `os.makedirs` is always suppressed, so the
    call returns quietly when the path already exists, whatever the value of
    `exist_ok`.
    """
    with contextlib.suppress(FileExistsError):
        os.makedirs(dirname, exist_ok=exist_ok)
# Every character that acts as a path separator on this platform.
_PATH_SEPARATORS = os.sep + (os.altsep or '')


def safe_path_join(base, *parts):
    # type: (str, *str) -> str
    """Builds a path from `base` followed by each of `parts`.

    In contrast to `os.path.join`, a component that starts with a path
    separator does not throw away the components before it: leading separators
    are stripped from every part, and parts that end up empty are skipped.

    >>> os.path.join('/a', 'b', '/c')
    '/c'
    >>> safe_path_join('/a', 'b', '/c')
    '/a/b/c'

    """
    segments = [base]
    for part in parts:
        stripped = part.lstrip(_PATH_SEPARATORS)
        if stripped:
            segments.append(stripped)
    return os.sep.join(segments)
# TODO: Move this into dataflow.py?
def add_columns_if_absent(frame: DataFrame, values: Dict[str, Any]) -> None:
    """Adds each column in `values` to `frame` unless the frame already has it.

    The frame is modified in place; existing columns are never overwritten.

    >>> df = DataFrame({"a": [1, 2, 3], "d": [3, 5, 8]})
    >>> add_columns_if_absent(df, {"a": 0, "b": 1, "c": 2})
    >>> df
       a  d  b  c
    0  1  3  1  2
    1  2  5  1  2
    2  3  8  1  2

    """
    existing = set(frame.columns)
    missing = {name: value for name, value in values.items() if name not in existing}
    for name, value in missing.items():
        frame[name] = value
def get_state_path(component_name: str, asset: str, stream_type: str) -> str:
    """Returns the path of the `<asset>_<stream_type>_state.json` file located
    inside the data directory of the named component."""
    return safe_path_join(
        get_component_path(component_name),
        f'{asset}_{stream_type}_state.json',
    )
def get_component_path(component_name: str) -> str:
    """Returns the filesystem location that holds data for the named component,
    i.e. `component_name` joined onto `const.COMPONENT_DATA_PATH`."""
    return safe_path_join(const.COMPONENT_DATA_PATH, component_name)
def group_frame_by_stream(frame: DataFrame) -> List[Tuple[Tuple[str, str], DataFrame]]:
    """Splits a DataFrame into a list of per-stream frames.

    Returns data in the format `[((asset, stream type), frame), ...]`.

    """
    asset_col = const.ASSET_COLNAME
    stream_type_col = const.STREAM_TYPE_COLNAME

    # A groupby on a 100 column DataFrame is 10x slower than finding the set of
    # unique assets and stream types. Few frames will have more than one
    # stream, so the common cases are answered without a groupby.
    if frame.shape[0] == 1:
        row_label = frame.index[0]
        stream = (frame.at[row_label, asset_col], frame.at[row_label, stream_type_col])
        return [(stream, frame)]

    unique_assets = frame[asset_col].unique()
    unique_stream_types = frame[stream_type_col].unique()
    if unique_assets.size != 1 or unique_stream_types.size != 1:
        return list(frame.groupby(by=[asset_col, stream_type_col]))
    return [((unique_assets[0], unique_stream_types[0]), frame)]
A = TypeVar('A')
B = TypeVar('B')


def grouped(values: Iterable[A], key: Callable[[A], B]) -> Iterable[Tuple[B, Iterable[A]]]:
    """Groups `values` by the result of calling `key` on each of them.

    Groups appear in the order in which their key is first seen, and within
    each group the ordering of the original argument is preserved. This makes
    the function closer to the GROUP BY operator in SQL than to
    `itertools.groupby`.

    >>> grouped([1.0, 1.3, 1.9, 0.9, 2.1, 1.1], key=round)
    [(1, [1.0, 1.3, 0.9, 1.1]), (2, [1.9, 2.1])]

    """
    buckets: Dict[B, List[A]] = {}
    for item in values:
        buckets.setdefault(key(item), []).append(item)
    return list(buckets.items())
def sanitize_column_names(frame: DataFrame) -> DataFrame:
    """Rewrites column names so that they are valid mongo field names.

    A leading `$` is replaced with `_` and every `.` is replaced with `_`.
    When the sanitized name collides with a name that is already taken, the
    offending column is dropped instead and a warning marker is emitted.

    """
    taken: Set[str] = set(frame.columns)
    to_drop: List[str] = []        # [column name, ...]
    to_rename: Dict[str, str] = {}  # {from -> to, ...} mapping

    for original in list(frame.columns):
        candidate = original
        if candidate.startswith('$'):
            candidate = '_' + candidate[1:]
        candidate = candidate.replace('.', '_')

        if candidate == original:
            continue

        if candidate in taken:
            msg = (f'Dropped the column named "{original}" after sanitizing '
                   f'it since it would conflict with "{candidate}".')
            markers.warning(f'columns.{original}.sanitize', msg)
            taken.remove(original)
            to_drop.append(original)
            continue

        to_rename[original] = candidate
        taken.remove(original)
        taken.add(candidate)

    frame = frame.rename(columns=to_rename, copy=False)
    frame.drop(to_drop, axis=1, inplace=True)
    return frame
# Timezone-aware datetime for 1970-01-01 00:00:00 localized to UTC; it is the
# reference point that `to_unix_epoch` subtracts from its argument.
UNIX_EPOCH_BASE = pytz.utc.localize(datetime(1970, 1, 1))
def to_unix_epoch(dt: datetime) -> float:
    """Returns the number of seconds between the unix epoch and `dt`.

    `dt` must be timezone-aware, since it is subtracted from the
    timezone-aware `UNIX_EPOCH_BASE`.

    >>> tz = pytz.timezone('America/Los_Angeles')
    >>> dt = tz.localize(datetime(2018, 10, 19, 10, 38, 2, 208000))
    >>> to_unix_epoch(dt)
    1539970682.208

    """
    elapsed = dt - UNIX_EPOCH_BASE
    return elapsed.total_seconds()
# Sentinel marking "the iterator is exhausted"; a dedicated object is used so
# that `None` elements in the input are not mistaken for the end of the data.
NOTHING = object()


def lookahead(iterable: Iterable[A], default: Union[A, B] = None) -> Iterator[Tuple[A, Union[A, B]]]:
    """Yields a pair `(elt, next_elt)` for every `elt` in `iterable`.

    `next_elt` is the element that follows `elt`, or `default` when `elt` is
    the last element of the iterable.

    #>>> list(lookahead([1, 2, 3]))
    [(1, 2), (2, 3), (3, None)]
    #>>> list(lookahead([1, 2], default=0))
    [(1, 2), (2, 0)]
    #>>> list(lookahead([]))
    []

    """
    remaining = iter(iterable)
    current = next(remaining, NOTHING)
    while current is not NOTHING:
        upcoming = next(remaining, NOTHING)
        if upcoming is NOTHING:
            yield current, default  # type: ignore
        else:
            yield current, upcoming  # type: ignore
        current = upcoming
# Matches runs of characters outside of letters, digits, '.', '_' and '-'.
_INVALID_KAFKA_TOPIC_CHARS = re.compile('[^a-zA-Z0-9._-]+')
_TOPIC_DELIMITER = "."


def kafka_topic_with_suffix(tenant: str, suffix: str) -> str:
    """Builds a Kafka topic name of the form `<tenant>.<sanitized suffix>`.

    Every run of characters in `suffix` matched by
    `_INVALID_KAFKA_TOPIC_CHARS` is collapsed into a single topic delimiter.

    :param tenant: initial namespace for topic,
    :param suffix: a string that will serve as the topic suffix
    :return: topic name
    """
    sanitized_suffix = _INVALID_KAFKA_TOPIC_CHARS.sub(_TOPIC_DELIMITER, suffix)
    return _TOPIC_DELIMITER.join((tenant, sanitized_suffix))
def create_kafka_topic(row: Dict, topic_prefix: str, topic_suffix_field: Union[str, None], default_topic: str) -> str:
    """Picks the Kafka topic for `row`.

    Returns `default_topic` when no suffix field is configured, or when the
    row's value for that field is missing, falsy or NA. Otherwise the topic is
    `topic_prefix` combined with the stringified field value via
    `kafka_topic_with_suffix`.
    """
    if not topic_suffix_field:
        return default_topic

    row_value = row.get(topic_suffix_field)
    if row_value and not pd.isna(row_value):
        return kafka_topic_with_suffix(topic_prefix, str(row_value))
    return default_topic


def get_device_uuid() -> str:
    """Returns the `DEVICE_UUID` environment variable (KeyError if unset)."""
    device_uuid = os.environ['DEVICE_UUID']
    return device_uuid


def get_device_name() -> str:
    """Returns the `DEVICE_NAME` environment variable (KeyError if unset)."""
    device_name = os.environ['DEVICE_NAME']
    return device_name
def get_device_environment() -> str:
    """
    Which environment FTX thinks it's running in, as read from the
    `DEVICE_ENVIRONMENT` environment variable (KeyError if unset).

    Should be one of: unknown, mase, balena, iotedge
    """
    environment = os.environ['DEVICE_ENVIRONMENT']
    return environment
def class_name(typ: Type) -> str:
    """
    Given a Class/Type, build its dotted reference `<module>.<name>`.

    The module part is taken from the `__module__` entry of the type's own
    `__dict__` and is left out when that entry is missing or None.
    """
    module_name = typ.__dict__.get('__module__')
    parts = [module_name, typ.__name__]
    return '.'.join(part for part in parts if part is not None)
def chunk_iter(iterable: Union[list, str], chunk_len: int) -> Generator[Any, None, None]:
    """Yields consecutive slices of `iterable`, each at most `chunk_len` long."""
    total = len(iterable)
    for start in range(0, total, chunk_len):
        yield iterable[start:start + chunk_len]


class Timer:
    """Records the duration of a block of code in the field `duration`."""

    def __enter__(self) -> 'Timer':
        self.start = time.monotonic()
        return self

    def __exit__(self, *args: Any) -> Any:
        finished = time.monotonic()
        self.end = finished
        self.duration = finished - self.start


def parse_iso_timestamp(timestamp: str) -> datetime:
    """Parses an ISO-format timestamp into a naive `datetime`.

    The timestamp may be missing nano/micro/milli seconds or even the seconds
    themselves (it is assumed to be UTC). Trailing `Z` characters are removed,
    and the fractional part is cut or zero-padded to exactly six digits before
    it is handed to `datetime.fromisoformat`.
    """
    timestamp = timestamp.rstrip('Z')
    if "." not in timestamp:
        return datetime.fromisoformat(timestamp)

    prefix, fractional_seconds = timestamp.split(".")
    microseconds = fractional_seconds[:6].ljust(6, '0')
    return datetime.fromisoformat(f"{prefix}.{microseconds}")


def pd_timestamp_to_iso(timestamp: pd.Timestamp) -> str:
    """Formats a pandas Timestamp as an ISO string with nanosecond precision
    followed by a `Z` suffix."""
    up_to_microseconds = timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")
    return f"{up_to_microseconds}{timestamp.nanosecond:03}Z"
def timedelta_str_to_sec(timedelta_str: str) -> int:
    """
    Convert a time delta string made of an integer followed by a one-letter
    unit (`s`, `m`, `h` or `d`) to seconds. For e.g. for "1m", this will
    return 60.

    :param timedelta_str: Time delta string
    :return: Number of seconds (int)
    :raises ValueError: if the string is empty, has an unknown unit, or its
        leading part is not an integer.
    """
    seconds_per_unit = {"s": 1, "m": 60, "h": 60*60, "d": 24*60*60}
    try:
        suffix = timedelta_str[-1]
        magnitude = int(timedelta_str[:-1])
        return magnitude * seconds_per_unit[suffix]
    except (ValueError, KeyError, IndexError):
        raise ValueError(f"Invalid timedelta string {timedelta_str}")
def json_normalize_to_dataframe(data: Dict,
                                record_path: Optional[List[str]] = None,
                                meta: Optional[Iterable[Union[str, List[str]]]] = None,
                                meta_prefix: Optional[str] = None,
                                record_prefix: Optional[str] = None,
                                errors: str = 'raise',
                                sep: str = '.') -> DataFrame:
    """
    Normalizes semi-structured JSON data into a flat DataFrame.

    Copied from pandas to fix a bug in the original implementation in 0.23.4
    Specifically the 2 lines:
    ```
    if isinstance(data, dict):
        data = [data]
    ```
    # TODO: This can be removed if we ever update pandas

    :param data: a dict (treated as a single record), a list of records, or
        any other non-string iterable of records (materialized into a list).
        An empty list yields an empty DataFrame.
    :param record_path: path (key or list of keys) to the list of records
        inside each object. When None, the objects themselves are flattened
        with `nested_to_record` and returned as the rows.
    :param meta: fields to copy next to every extracted record; each entry is
        a key or a list of keys forming a path.
    :param meta_prefix: when not None, prepended to every meta column name.
    :param record_prefix: when not None, prepended to every record column name.
    :param errors: with 'ignore', a missing meta key yields `np.nan`; any
        other value re-raises the `KeyError`.
    :param sep: separator used to join nested keys and meta paths into names.
    :return: the normalized DataFrame.
    :raises NotImplementedError: if `data` is not a dict, list or other
        non-string iterable.
    :raises KeyError: if a record path or (unless errors='ignore') a meta key
        is missing.
    :raises TypeError: if the value found at the record path is neither a list
        nor null.
    :raises ValueError: if a meta column name clashes with a record column.
    """
    def _pull_field(
        js: Dict[str, Any], spec: Union[list, str], extract_record: bool = False
    ) -> Union[Scalar, Iterable]:
        """Internal function to pull field"""
        result = js
        try:
            if isinstance(spec, list):
                # Walk the path one key at a time; a None on the way is
                # reported as a missing key.
                for field in spec:
                    if result is None:
                        raise KeyError(field)
                    result = result[field]
            else:
                result = result[spec]
        except KeyError as e:
            if extract_record:
                raise KeyError(
                    f"Key {e} not found. If specifying a record_path, all elements of "
                    f"data should have the path."
                ) from e
            elif errors == "ignore":
                return np.nan
            else:
                raise KeyError(
                    f"Key {e} not found. To replace missing values of {e} with "
                    f"np.nan, pass in errors='ignore'"
                ) from e

        return result

    def _pull_records(js: Dict[str, Any], spec: Union[list, str]) -> list:
        """
        Internal function to pull field for records, and similar to
        _pull_field, but require to return list. And will raise error
        if has non iterable value.
        """
        result = _pull_field(js, spec, extract_record=True)

        # GH 31507 GH 30145, GH 26284 if result is not list, raise TypeError if not
        # null, otherwise return an empty list
        if not isinstance(result, list):
            if pd.isnull(result):
                result = []
            else:
                raise TypeError(
                    f"{js} has non list value {result} for path {spec}. "
                    "Must be list or null."
                )
        return result

    if isinstance(data, list) and not data:
        return DataFrame()
    elif isinstance(data, dict):
        # A bit of a hackjob
        data = [data]  # type: ignore
    elif isinstance(data, abc.Iterable) and not isinstance(data, str):
        # GH35923 Fix pd.json_normalize to not skip the first element of a
        # generator input
        data = list(data)
    else:
        raise NotImplementedError

    if record_path is None:
        # NOTE(review): the argument of `any` is a generator of *lists*, and a
        # non-empty list is truthy, so this condition holds as soon as one
        # record has at least one value -- whether or not that value is a
        # dict. Confirm that is intended before changing it.
        if any([isinstance(x, dict) for x in y.values()] for y in data):
            # naive normalization, this is idempotent for flat records
            # and potentially will inflate the data considerably for
            # deeply nested structures:
            #  {VeryLong: { b: 1,c:2}} -> {VeryLong.b:1 ,VeryLong.c:@}
            #
            # TODO: handle record value which are lists, at least error
            #       reasonably
            data = nested_to_record(data, sep=sep)
        return DataFrame(data)
    elif not isinstance(record_path, list):
        record_path = [record_path]

    if meta is None:
        meta = []
    elif not isinstance(meta, list):
        meta = [meta]  # type: ignore

    # Normalize every meta entry to a list-of-keys path.
    _meta = [m if isinstance(m, list) else [m] for m in meta]

    # Disastrously inefficient for now
    records: list = []
    lengths = []

    meta_vals: DefaultDict = defaultdict(list)
    meta_keys = [sep.join(val) for val in _meta]

    def _recursive_extract(data, path, seen_meta, level=0):  # type: ignore
        if isinstance(data, dict):
            data = [data]
        if len(path) > 1:
            # Not at the records yet: remember the meta values that live at
            # this level, then descend along the record path.
            for obj in data:
                for val, key in zip(_meta, meta_keys):
                    if level + 1 == len(val):
                        seen_meta[key] = _pull_field(obj, val[-1])

                _recursive_extract(obj[path[0]], path[1:], seen_meta, level=level + 1)
        else:
            for obj in data:
                recs = _pull_records(obj, path[0])
                recs = [
                    nested_to_record(r, sep=sep) if isinstance(r, dict) else r
                    for r in recs
                ]

                # For repeating the metadata later
                lengths.append(len(recs))
                for val, key in zip(_meta, meta_keys):
                    if level + 1 > len(val):
                        meta_val = seen_meta[key]
                    else:
                        meta_val = _pull_field(obj, val[level:])
                    meta_vals[key].append(meta_val)
                records.extend(recs)

    _recursive_extract(data, record_path, {}, level=0)

    result = DataFrame(records)

    if record_prefix is not None:
        result = result.rename(columns=lambda x: f"{record_prefix}{x}")

    # Data types, a problem
    for k, v in meta_vals.items():
        if meta_prefix is not None:
            k = meta_prefix + k

        if k in result:
            raise ValueError(
                f"Conflicting metadata name {k}, need distinguishing prefix "
            )
        # A throwaway `[]` is appended before the array is built and sliced
        # off again afterwards -- presumably to keep numpy from building a 2-D
        # array when the meta values are equal-length lists (TODO confirm).
        # Each meta value is then repeated once per record of its parent.
        result[k] = np.array(v + [[]], dtype=object)[:-1].repeat(lengths)
    return result
def configure_extra_hosts(extra_hosts: List[Dict[str, str]], hosts_file_path: str = "/etc/hosts") -> None:
    """Rewrites the hosts file so it holds the original entries plus `extra_hosts`.

    The pristine hosts file is preserved as `<hosts_file_path>.original` the
    first time extra hosts are configured. Every later call starts again from
    that backup, so entries from earlier calls do not accumulate.

    :param extra_hosts: entries to append, each a dict with the keys `ip` and
        `hostname`. When empty, the original hosts file is restored from the
        backup (if a backup exists) and nothing else is done.
    :param hosts_file_path: location of the hosts file to modify.
    :raises AssertionError: if extra hosts are given but the hosts file is
        missing.
    :raises KeyError: if an entry lacks `ip` or `hostname`; this is raised
        before the hosts file is appended to.
    """
    original_hosts_backup = f"{hosts_file_path}.original"

    if not extra_hosts:
        # If there are no extra hosts, restore the original /etc/hosts if it exists
        if os.path.exists(original_hosts_backup):
            shutil.copyfile(original_hosts_backup, hosts_file_path)
        return

    # Something has really gone wrong if this file doesn't exist
    assert os.path.exists(hosts_file_path)

    # Create /etc/hosts original copy if we haven't already
    # If it's already been backed up then restore the file in preparation for overrides
    if not os.path.exists(original_hosts_backup):
        shutil.copyfile(hosts_file_path, original_hosts_backup)
    else:
        shutil.copyfile(original_hosts_backup, hosts_file_path)

    # Render every entry up front so a malformed one fails before the file is
    # touched by the append below.
    entries = [f"{host['ip']} {host['hostname']}\n" for host in extra_hosts]

    # A hosts file whose last line lacks a newline would otherwise have the
    # first extra entry glued onto that line.
    with open(hosts_file_path, "rb") as current_hosts:
        contents = current_hosts.read()
    needs_newline = bool(contents) and not contents.endswith(b"\n")

    # Add extra hosts to /etc/hosts and overwrite
    with open(hosts_file_path, "a") as hosts_file:
        if needs_newline:
            hosts_file.write("\n")
        hosts_file.writelines(entries)