"""This module contains various helper functions that can be used when creating
custom FactoryTX components.
"""
import contextlib
import enum
import io
import logging
import os
import re
import shutil
import tempfile
import time
import uuid
from base64 import urlsafe_b64encode
from collections import defaultdict, abc
from datetime import datetime, date, timezone
from typing import (
Any, Callable, Dict, Iterable, Iterator, List, Set, Tuple, TypeVar, Union,
Type, Generator, Optional, DefaultDict
)
import numpy as np
import pandas as pd
import pytz
import simplejson
from pandas import DataFrame
from pandas.io.json.normalize import nested_to_record
from pyarrow import Scalar
from factorytx import const
from factorytx import markers
log = logging.getLogger(__name__)
AZ_KEY_VAULT_PREFIX = "AZ_FTX_CREDS_"
# Disabled until mypy adds support for recursive types.
#Json = Union[Dict[str, 'Json'], List['Json'], str, int, float, bool]
JsonDict = Dict[str, Any]
def _json_default(o: Any) -> Any:
"""Converts custom types to JSON."""
if isinstance(o, datetime):
return o.strftime('%Y-%m-%d %H:%M:%S.%f')
elif isinstance(o, date):
return o.strftime('%Y-%m-%d')
elif isinstance(o, enum.Enum):
return o.value
else:
raise TypeError(repr(o) + " is not JSON serializable.")
ENCODER = simplejson.JSONEncoder(default=_json_default, use_decimal=True, ignore_nan=True)
DECODER = simplejson.JSONDecoder()
def jsonencode(obj: Any) -> str:
"""Converts a tree of objects to JSON. Dict key order is preserved.
>>> jsonencode({"a": 1, "c": 2, "b": date(2018, 1, 1)})
'{"a": 1, "c": 2, "b": "2018-01-01"}'
"""
return ENCODER.encode(obj)
def jsondecode(data: Union[str, bytes]) -> Any:
"""Converts JSON to a tree of objects. Object key order is preserved.
>>> jsondecode('{"a": 1, "c": 2, "b": "2018-01-01"}')
{'a': 1, 'c': 2, 'b': '2018-01-01'}
"""
return DECODER.decode(data) # type: ignore
def make_guid() -> bytes:
"""Returns a short globally-unique ASCII string."""
random_bytes = uuid.uuid4().bytes
guid = urlsafe_b64encode(random_bytes).strip(b'=')
return guid
def overwrite_atomic(path: str, data: bytes, mode: int = 0o600) -> None:
"""Atomically replaces the file at `path` with a new one containing `data`.
If power is lost during the write or the system crashes then the file will
contain either the old contents, or the new contents. It will never contain
corrupt or truncated data.
"""
dirname = os.path.dirname(path)
tmp = tempfile.NamedTemporaryFile(delete=False, dir=dirname, suffix='.tmp')
with contextlib.closing(tmp):
fp: io.BytesIO = tmp.file # type: ignore # tempfile stubs are incomplete.
filename: str = tmp.name
fp.write(data)
fp.flush()
os.fsync(fp.fileno())
os.chmod(filename, mode)
os.rename(filename, path)
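# Illustrative usage sketch for overwrite_atomic (the state file path below is
# hypothetical and assumes a writable temp directory):
#
#     >>> state_path = os.path.join(tempfile.gettempdir(), 'example_state.json')
#     >>> overwrite_atomic(state_path, jsonencode({'offset': 42}).encode('utf-8'))
#     >>> jsondecode(open(state_path).read())
#     {'offset': 42}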
def ensure_dir_exists(dirname: str, exist_ok: bool = True) -> None:
"""Creates the directory `dirname` if it doesn't already exist."""
try:
os.makedirs(dirname, exist_ok=exist_ok)
except FileExistsError:
pass
_PATH_SEPARATORS = os.sep
if os.altsep:
_PATH_SEPARATORS += os.altsep
def safe_path_join(base, *parts):
# type: (str, *str) -> str
"""Constructs a path by joining one or more path components. Unlike
`os.path.join`, a leading slash in a trailing component does not cause
previous components to be discarded.
>>> os.path.join('/a', 'b', '/c')
'/c'
>>> safe_path_join('/a', 'b', '/c')
'/a/b/c'
"""
relative_parts = [part.lstrip(_PATH_SEPARATORS) for part in parts]
non_empty_parts = [part for part in relative_parts if part]
path = os.sep.join([base] + non_empty_parts)
return path
# TODO: Move this into dataflow.py?
def add_columns_if_absent(frame: DataFrame, values: Dict[str, Any]) -> None:
"""Adds columns to a dataframe if they are not already present.
>>> df = DataFrame({"a": [1, 2, 3], "d": [3, 5, 8]})
>>> add_columns_if_absent(df, {"a": 0, "b": 1, "c": 2})
>>> df
a d b c
0 1 3 1 2
1 2 5 1 2
2 3 8 1 2
"""
columns = set(frame.columns)
for key, value in values.items():
if key not in columns:
frame[key] = value
def get_state_path(component_name: str, asset: str, stream_type: str) -> str:
component_path = get_component_path(component_name)
state_path = safe_path_join(component_path, f'{asset}_{stream_type}_state.json')
return state_path
def get_component_path(component_name: str) -> str:
"""Returns the filesystem location that holds data for the named component."""
path = safe_path_join(const.COMPONENT_DATA_PATH, component_name)
return path
def group_frame_by_stream(frame: DataFrame) -> List[Tuple[Tuple[str, str], DataFrame]]:
"""Splits a DataFrame into list of per-stream frames.
Returns data in the format `[((asset, stream type), frame), ...]`.
"""
# A groupby on a 100 column DataFrame is 10x slower than finding the
# set of unique assets and stream types. Few frames will have more than one
# stream, so we can speed things up by avoiding redundant groupbys.
if frame.shape[0] == 1:
ix = frame.index[0]
asset = frame.at[ix, const.ASSET_COLNAME]
stream_type = frame.at[ix, const.STREAM_TYPE_COLNAME]
return [((asset, stream_type), frame)]
assets = frame[const.ASSET_COLNAME].unique()
stream_types = frame[const.STREAM_TYPE_COLNAME].unique()
if assets.size == 1 and stream_types.size == 1:
grouped = [((assets[0], stream_types[0]), frame)]
else:
grouped = list(frame.groupby(by=[const.ASSET_COLNAME, const.STREAM_TYPE_COLNAME]))
return grouped
A = TypeVar('A')
B = TypeVar('B')
def grouped(values: Iterable[A], key: Callable[[A], B]) -> Iterable[Tuple[B, Iterable[A]]]:
"""Returns an iterable of values grouped by the specific key. Within each
group the ordering if the original argument is preserved. This function is
more analogous to the GROUP BY operator in SQL than `itertools.groupby`.
>>> grouped([1.0, 1.3, 1.9, 0.9, 2.1, 1.1], key=round)
[(1, [1.0, 1.3, 0.9, 1.1]), (2, [1.9, 2.1])]
"""
groups: Dict[B, List[A]] = defaultdict(list)
for v in values:
groups[key(v)].append(v)
return list(groups.items())
def sanitize_column_names(frame: DataFrame) -> DataFrame:
"""Sanitizes column names into valid mongo field names. Columns names
that can't be sanitized without conflicting with existing names will be
dropped.
"""
columns: Set[str] = set(frame.columns)
drops: List[str] = [] # [column name, ...]
renames: Dict[str, str] = {} # {from -> to, ...} mapping
for colname in list(frame.columns):
sanitized = colname
if sanitized.startswith('$'):
sanitized = '_' + sanitized[1:]
if '.' in sanitized:
sanitized = sanitized.replace('.', '_')
if sanitized != colname:
if sanitized in columns:
msg = (f'Dropped the column named "{colname}" after sanitizing '
f'it since it would conflict with "{sanitized}".')
markers.warning(f'columns.{colname}.sanitize', msg)
columns.remove(colname)
drops.append(colname)
else:
renames[colname] = sanitized
columns.remove(colname)
columns.add(sanitized)
frame = frame.rename(columns=renames, copy=False)
frame.drop(drops, axis=1, inplace=True)
return frame
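# Illustrative sketch of column sanitization (the column names are made up;
# 'a.b' is dropped with a marker warning because its sanitized form collides
# with the existing 'a_b' column):
#
#     >>> frame = DataFrame({'$id': [1], 'a.b': [2], 'a_b': [3]})
#     >>> sorted(sanitize_column_names(frame).columns)
#     ['_id', 'a_b']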
UNIX_EPOCH_BASE = pytz.utc.localize(datetime(1970, 1, 1))
def to_unix_epoch(dt: datetime) -> float:
"""Converts a timezone-aware datetime to a unix epoch value.
>>> tz = pytz.timezone('America/Los_Angeles')
>>> dt = tz.localize(datetime(2018, 10, 19, 10, 38, 2, 208000))
>>> to_unix_epoch(dt)
1539970682.208
"""
offset = (dt - UNIX_EPOCH_BASE).total_seconds()
return offset
NOTHING = object()
def lookahead(iterable: Iterable[A], default: Union[A, B] = None) -> Iterator[Tuple[A, Union[A, B]]]:
"""Yields a pair `(elt, next_elt)` for every `elt` in `iterable`.
If `elt` is the last element then `next_elt` is `default`; otherwise it
is the next element in the iterable.
>>> list(lookahead([1, 2, 3]))
[(1, 2), (2, 3), (3, None)]
>>> list(lookahead([1, 2], default=0))
[(1, 2), (2, 0)]
>>> list(lookahead([]))
[]
"""
iterator = iter(iterable)
value = next_value = next(iterator, NOTHING)
while next_value is not NOTHING:
next_value = next(iterator, NOTHING)
yield value, next_value if next_value is not NOTHING else default # type: ignore
value = next_value
_INVALID_KAFKA_TOPIC_CHARS = re.compile('[^a-zA-Z0-9._-]+')
_TOPIC_DELIMITER = "."
def kafka_topic_with_suffix(tenant: str, suffix: str) -> str:
"""Produces generated topic string for Kafka
:param tenant: initial namespace for topic,
:param suffix: a string that will serve as the topic suffix
:return: topic name
"""
return _TOPIC_DELIMITER.join([tenant, _INVALID_KAFKA_TOPIC_CHARS.sub(_TOPIC_DELIMITER, suffix)])
def create_kafka_topic(row: Dict, topic_prefix: str, topic_suffix_field: Union[str, None], default_topic: str) -> str:
if not topic_suffix_field:
return default_topic
row_value = row.get(topic_suffix_field)
if not row_value or pd.isna(row_value):
log.warning(f"Row value for topic suffix field {topic_suffix_field} is missing/empty, using default topic {default_topic}")
return default_topic
return kafka_topic_with_suffix(
topic_prefix,
str(row_value)
)
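# Illustrative sketch of topic generation (tenant, field, and topic names are
# made up; characters that are invalid in Kafka topic names are collapsed into
# the "." delimiter):
#
#     >>> kafka_topic_with_suffix('acme', 'line 1/sensor#7')
#     'acme.line.1.sensor.7'
#     >>> create_kafka_topic({'machine': 'press_01'}, 'acme', 'machine', 'acme.default')
#     'acme.press_01'
#     >>> create_kafka_topic({'machine': None}, 'acme', 'machine', 'acme.default')
#     'acme.default'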
def get_device_uuid() -> str:
return os.environ['DEVICE_UUID']
def get_device_name() -> str:
return os.environ['DEVICE_NAME']
def get_device_environment() -> str:
"""
Returns which environment FTX thinks it's running in.
Should be one of: unknown, mase, balena, iotedge.
"""
return os.environ['DEVICE_ENVIRONMENT']
def class_name(typ: Type) -> str:
"""
Given a class/type, return its fully-qualified name (module path plus class name).
"""
cls_nm = '.'.join(
i
for i in (typ.__dict__.get('__module__'), typ.__name__)
if i is not None
)
return cls_nm
def chunk_iter(iterable: Union[list, str], chunk_len: int) -> Generator[Any, None, None]:
for i in range(0, len(iterable), chunk_len):
yield iterable[i:i + chunk_len]
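# Illustrative sketch of chunk_iter (works on any sliceable sequence such as
# lists or strings):
#
#     >>> list(chunk_iter([1, 2, 3, 4, 5], 2))
#     [[1, 2], [3, 4], [5]]
#     >>> list(chunk_iter('abcdef', 4))
#     ['abcd', 'ef']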
class Timer:
"""Records the duration of a block of code in the field `duration`."""
def __enter__(self) -> 'Timer':
self.start = time.monotonic()
return self
def __exit__(self, *args: Any) -> Any:
self.end = time.monotonic()
self.duration = self.end - self.start
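# Illustrative usage sketch for Timer (the timed block below is arbitrary):
#
#     >>> with Timer() as timer:
#     ...     time.sleep(0.05)
#     >>> timer.duration >= 0.05
#     True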
def parse_iso_timestamp(timestamp: str) -> datetime:
# Parse timestamps in ISO format that may be missing nano/micro/milliseconds or
# even the seconds themselves. (Assumed to be UTC.)
timestamp = timestamp.rstrip('Z')
if "." in timestamp:
prefix, fractional_seconds = timestamp.split(".")
return datetime.fromisoformat(prefix + "." + fractional_seconds[:6].ljust(6, '0'))
else:
return datetime.fromisoformat(timestamp)
def pd_timestamp_to_iso(timestamp: pd.Timestamp) -> str:
return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f") + f"{timestamp.nanosecond:03}Z"
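# Illustrative sketch of the two timestamp helpers above (values are arbitrary;
# parse_iso_timestamp truncates to microsecond precision and returns a naive
# datetime that is assumed to be UTC):
#
#     >>> parse_iso_timestamp('2021-03-01T12:30:45.123456789Z')
#     datetime.datetime(2021, 3, 1, 12, 30, 45, 123456)
#     >>> parse_iso_timestamp('2021-03-01T12:30')
#     datetime.datetime(2021, 3, 1, 12, 30)
#     >>> pd_timestamp_to_iso(pd.Timestamp('2021-03-01T12:30:45.123456789'))
#     '2021-03-01T12:30:45.123456789Z'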
def timedelta_str_to_sec(timedelta_str: str) -> int:
"""
Convert time delta strings to seconds, e.g. "1m" will return 60.
:param timedelta_str: Time delta string
:return: Number of seconds (int)
"""
unit_to_sec = {
"s": 1,
"m": 60,
"h": 60*60,
"d": 24*60*60
}
try:
unit = timedelta_str[-1]
value = int(timedelta_str[:-1])
seconds = value * unit_to_sec[unit]
except (ValueError, KeyError, IndexError):
raise ValueError(f"Invalid timedelta string {timedelta_str}")
return seconds
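# Illustrative sketch of the accepted unit suffixes (s, m, h, d):
#
#     >>> timedelta_str_to_sec('90s'), timedelta_str_to_sec('1m'), timedelta_str_to_sec('2h')
#     (90, 60, 7200)
#     >>> timedelta_str_to_sec('1d')
#     86400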
def json_normalize_to_dataframe(data: Dict,
record_path: Optional[List[str]] = None,
meta: Optional[Iterable[Union[str, List[str]]]] = None,
meta_prefix: Optional[str] = None,
record_prefix: Optional[str] = None,
errors: str = 'raise',
sep: str = '.') -> DataFrame:
"""
Copied from pandas to fix a bug in the original implementation in 0.23.4.
Specifically, the two lines:
```
if isinstance(data, dict):
data = [data]
```
# TODO: This can be removed if we ever update pandas
"""
def _pull_field(
js: Dict[str, Any], spec: Union[list, str], extract_record: bool = False
) -> Union[Scalar, Iterable]:
"""Internal function to pull field"""
result = js
try:
if isinstance(spec, list):
for field in spec:
if result is None:
raise KeyError(field)
result = result[field]
else:
result = result[spec]
except KeyError as e:
if extract_record:
raise KeyError(
f"Key {e} not found. If specifying a record_path, all elements of "
f"data should have the path."
) from e
elif errors == "ignore":
return np.nan
else:
raise KeyError(
f"Key {e} not found. To replace missing values of {e} with "
f"np.nan, pass in errors='ignore'"
) from e
return result
def _pull_records(js: Dict[str, Any], spec: Union[list, str]) -> list:
"""
Internal function to pull the field for records. Similar to _pull_field,
but the result is required to be a list; raises a TypeError if the value
is neither a list nor null.
"""
result = _pull_field(js, spec, extract_record=True)
# GH 31507 GH 30145, GH 26284 if result is not list, raise TypeError if not
# null, otherwise return an empty list
if not isinstance(result, list):
if pd.isnull(result):
result = []
else:
raise TypeError(
f"{js} has non list value {result} for path {spec}. "
"Must be list or null."
)
return result
if isinstance(data, list) and not data:
return DataFrame()
elif isinstance(data, dict):
# A bit of a hackjob
data = [data] # type: ignore
elif isinstance(data, abc.Iterable) and not isinstance(data, str):
# GH35923 Fix pd.json_normalize to not skip the first element of a
# generator input
data = list(data)
else:
raise NotImplementedError
if record_path is None:
if any([isinstance(x, dict) for x in y.values()] for y in data):
# naive normalization, this is idempotent for flat records
# and potentially will inflate the data considerably for
# deeply nested structures:
# {VeryLong: {b: 1, c: 2}} -> {VeryLong.b: 1, VeryLong.c: 2}
#
# TODO: handle record value which are lists, at least error
# reasonably
data = nested_to_record(data, sep=sep)
return DataFrame(data)
elif not isinstance(record_path, list):
record_path = [record_path]
if meta is None:
meta = []
elif not isinstance(meta, list):
meta = [meta] # type: ignore
_meta = [m if isinstance(m, list) else [m] for m in meta]
# Disastrously inefficient for now
records: list = []
lengths = []
meta_vals: DefaultDict = defaultdict(list)
meta_keys = [sep.join(val) for val in _meta]
def _recursive_extract(data, path, seen_meta, level=0): # type: ignore
if isinstance(data, dict):
data = [data]
if len(path) > 1:
for obj in data:
for val, key in zip(_meta, meta_keys):
if level + 1 == len(val):
seen_meta[key] = _pull_field(obj, val[-1])
_recursive_extract(obj[path[0]], path[1:], seen_meta, level=level + 1)
else:
for obj in data:
recs = _pull_records(obj, path[0])
recs = [
nested_to_record(r, sep=sep)
if isinstance(r, dict)
else r
for r in recs
]
# For repeating the metadata later
lengths.append(len(recs))
for val, key in zip(_meta, meta_keys):
if level + 1 > len(val):
meta_val = seen_meta[key]
else:
meta_val = _pull_field(obj, val[level:])
meta_vals[key].append(meta_val)
records.extend(recs)
_recursive_extract(data, record_path, {}, level=0)
result = DataFrame(records)
if record_prefix is not None:
result = result.rename(columns=lambda x: f"{record_prefix}{x}")
# Data types, a problem
for k, v in meta_vals.items():
if meta_prefix is not None:
k = meta_prefix + k
if k in result:
raise ValueError(
f"Conflicting metadata name {k}, need distinguishing prefix "
)
result[k] = np.array(v + [[]], dtype=object)[:-1].repeat(lengths)
return result
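# Illustrative sketch of flattening a nested payload with record_path and meta
# (the payload shape and field names are made up):
#
#     >>> payload = {'asset': 'press_01',
#     ...            'readings': [{'name': 'temp', 'value': 21.5},
#     ...                         {'name': 'rpm', 'value': 1200}]}
#     >>> frame = json_normalize_to_dataframe(payload, record_path=['readings'], meta=['asset'])
#     >>> list(frame.columns), len(frame)
#     (['name', 'value', 'asset'], 2)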
def configure_extra_hosts(extra_hosts: List[Dict[str, str]], hosts_file_path: str = "/etc/hosts") -> None:
original_hosts_backup = f"{hosts_file_path}.original"
if not extra_hosts:
# If there are no extra hosts, restore the original /etc/hosts if it exists
if os.path.exists(original_hosts_backup):
shutil.copyfile(original_hosts_backup, hosts_file_path)
return
# Something has really gone wrong if this file doesn't exist
assert os.path.exists(hosts_file_path)
# Create /etc/hosts original copy if we haven't already
# If it's already been backed up then restore the file in preparation for overrides
if not os.path.exists(original_hosts_backup):
shutil.copyfile(hosts_file_path, original_hosts_backup)
else:
shutil.copyfile(original_hosts_backup, hosts_file_path)
# Add extra hosts to /etc/hosts and overwrite
with open(hosts_file_path, "a") as hosts_file:
for host in extra_hosts:
hosts_file.write(f"{host['ip']} {host['hostname']}\n")
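# Illustrative sketch of configure_extra_hosts run against a scratch file
# rather than the real /etc/hosts (the host entries are made up; a backup is
# written next to the file with an ".original" suffix):
#
#     >>> hosts_path = os.path.join(tempfile.gettempdir(), 'example_hosts')
#     >>> with open(hosts_path, 'w') as f:
#     ...     _ = f.write('127.0.0.1 localhost\n')
#     >>> configure_extra_hosts([{'ip': '10.0.0.5', 'hostname': 'historian.local'}],
#     ...                       hosts_file_path=hosts_path)
#     >>> open(hosts_path).read()
#     '127.0.0.1 localhost\n10.0.0.5 historian.local\n'
#     >>> configure_extra_hosts([], hosts_file_path=hosts_path)  # restores the original contents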
def is_nested_iotedge_child_device() -> bool:
"""
Returns True if the device is a child device of an IoT Edge device.
"""
return os.environ.get('IOTEDGE_PARENTHOSTNAME') is not None
def data_vol_statvfs() -> os.statvfs_result:
"""
Returns the statvfs information for the data volume.
Unix platforms only.
f_bsize − preferred file system block size.
f_frsize − fundamental file system block size.
f_blocks − total number of blocks in the filesystem.
f_bfree − total number of free blocks.
f_bavail − free blocks available to non-super user.
f_files − total number of file nodes.
f_ffree − total number of free file nodes.
f_favail − free nodes available to non-super user.
f_flag − system dependent.
f_namemax − maximum file name length.
"""
return os.statvfs('/data')
def save_dataframe_to_disk(df: DataFrame, save_path: str) -> None:
"""
Serialize a DataFrame to disk in CSV format.
"""
dataframe_csv_str = df.to_csv(index=False)
disk_bytes_required = len(dataframe_csv_str) * 5 # Add a disk space buffer, so we don't cause everything to halt
# Check if csv space required is larger than available disk space
statvfs: os.statvfs_result = data_vol_statvfs()
disk_free_bytes = float(statvfs.f_frsize * statvfs.f_bavail)
if disk_bytes_required > disk_free_bytes:
log.error(f"Not enough space to save dataframe to disk. "
f"Required bytes: {disk_bytes_required}, Available bytes: {disk_free_bytes}")
return
with open(save_path, 'w') as f:
f.write(dataframe_csv_str)
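# Illustrative sketch of save_dataframe_to_disk (the path is hypothetical; the
# /data volume must exist because the helper checks its free space first):
#
#     >>> frame = DataFrame({'a': [1, 2], 'b': [3, 4]})
#     >>> save_dataframe_to_disk(frame, '/data/example_frame.csv')
#     >>> pd.read_csv('/data/example_frame.csv').shape
#     (2, 2)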
def aware_utcnow() -> datetime:
""" python 3.12 replacement for dt.utcnow() """
return datetime.now(timezone.utc)
def aware_utcfromtimestamp(timestamp: float) -> datetime:
return datetime.fromtimestamp(timestamp, timezone.utc)
def naive_utcnow() -> datetime:
return aware_utcnow().replace(tzinfo=None)
def naive_utcfromtimestamp(timestamp: float) -> datetime:
return aware_utcfromtimestamp(timestamp).replace(tzinfo=None)
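# Illustrative sketch of the UTC helpers above (the epoch value is arbitrary):
#
#     >>> aware_utcfromtimestamp(0)
#     datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
#     >>> naive_utcfromtimestamp(0)
#     datetime.datetime(1970, 1, 1, 0, 0)
#     >>> aware_utcnow().tzinfo is timezone.utc
#     True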
def dataframe_cdc(
current_state_df: pd.DataFrame,
new_data_df: pd.DataFrame,
ignored_columns_for_diff: Optional[List[str]] = None
) -> pd.DataFrame:
"""
Returns the difference between two DataFrames (Change Data Capture).
Meant to compare an existing df state to a new df and return only the new data.
If 'ignored_columns_for_diff' is set, those columns will be ignored when diffing the data,
but will be included in the final returned df (using values from new_data_df).
If the column sets do not match, new_data_df will be returned as-is.
"""
# If ignored_columns_for_diff is set, remove those columns from the dataframes for merging
if ignored_columns_for_diff:
log.info(f"Ignoring columns {ignored_columns_for_diff} for diff")
table_data_df_subset = new_data_df.drop(columns=ignored_columns_for_diff, errors='ignore')
state_data_df_subset = current_state_df.drop(columns=ignored_columns_for_diff, errors='ignore')
else:
ignored_columns_for_diff = []
table_data_df_subset = new_data_df
state_data_df_subset = current_state_df
# Merge the two DataFrames and add a column '_merge' that indicates where each row comes from
# Filter the merged DataFrame to only include new data differences
if set(table_data_df_subset.columns) == set(state_data_df_subset.columns):
merged_df: pd.DataFrame = pd.merge(
table_data_df_subset,
state_data_df_subset,
how='outer',
indicator=True
)
merged_data_df: pd.DataFrame = merged_df[merged_df['_merge'] == 'left_only']
merged_data_df = merged_data_df.drop(columns='_merge')
# Add the ignored columns back to new_data_df from table_data_df if there are any rows
# Use .loc so it only adds the columns back for the rows that are in the merged_data_df
if len(merged_data_df):
for col in ignored_columns_for_diff:
merged_data_df[col] = new_data_df.loc[merged_data_df.index, col]
else:
# If columns don't match, just assume new data since it would only try to merge on same cols
merged_data_df = new_data_df
return merged_data_df
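# Illustrative sketch of dataframe_cdc (the frames are made up; only rows that
# are new or changed relative to the current state come back, and an
# ignored_columns_for_diff list could additionally exclude columns such as a
# fetch timestamp from the comparison):
#
#     >>> state = pd.DataFrame({'id': [1, 2], 'value': [10, 20]})
#     >>> new = pd.DataFrame({'id': [1, 2, 3], 'value': [10, 25, 30]})
#     >>> dataframe_cdc(state, new).to_dict('records')
#     [{'id': 2, 'value': 25}, {'id': 3, 'value': 30}]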