"""This module contains various helper functions that can be used when creating
custom FactoryTX components.
"""
import contextlib
import enum
import io
import itertools
import logging
import os
import re
import shutil
import tempfile
import time
import uuid
from base64 import urlsafe_b64encode
from collections import defaultdict
from datetime import datetime, date, timezone
from typing import (
Any, Callable, Dict, Iterable, Iterator, List, Set, Tuple, TypeVar, Union,
Type, Generator
)
import pandas as pd
import pytz
import simplejson
from bson import json_util
from bson.json_util import JSONOptions, DEFAULT_JSON_OPTIONS, DatetimeRepresentation
from pandas import DataFrame
from factorytx import const
from factorytx import markers
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# NOTE(review): presumably the name prefix for credentials kept in an Azure
# Key Vault; it is not referenced in the visible part of this module --
# confirm against callers.
AZ_KEY_VAULT_PREFIX = "AZ_FTX_CREDS_"
# Disabled until mypy adds support for recursive types.
#Json = Union[Dict[str, 'Json'], List['Json'], str, int, float, bool]
# Loosely-typed stand-in for a JSON object: string keys mapping to any value.
JsonDict = Dict[str, Any]
class classproperty(property):
    """A read-only property whose getter is called with the owning class.

    The attribute can be read from the class itself or from an instance; in
    both cases the wrapped function receives the class, never the instance.
    """

    def __get__(self, owner_self, owner_cls):  # type: ignore
        getter = self.fget
        return getter(owner_cls)  # type: ignore
def _json_default(o: Any) -> Any:
"""Converts custom types to JSON."""
if isinstance(o, datetime):
return o.strftime('%Y-%m-%d %H:%M:%S.%f')
elif isinstance(o, date):
return o.strftime('%Y-%m-%d')
elif isinstance(o, enum.Enum):
return o.value
else:
raise TypeError(repr(o) + " is not JSON serializable.")
# Shared encoder/decoder instances used by `jsonencode` and `jsondecode`.
# Values the encoder does not understand are passed to `_json_default`.
ENCODER = simplejson.JSONEncoder(
    default=_json_default,
    use_decimal=True,
    ignore_nan=True,
)
DECODER = simplejson.JSONDecoder()
[docs]
def jsonencode(obj: Any) -> str:
    """Serializes a tree of objects to a JSON string with the module `ENCODER`.

    Dict key order is preserved.

    >>> jsonencode({"a": 1, "c": 2, "b": date(2018, 1, 1)})
    '{"a": 1, "c": 2, "b": "2018-01-01"}'
    """
    encoded: str = ENCODER.encode(obj)
    return encoded
[docs]
def jsondecode(data: Union[str, bytes]) -> Any:
    """Parses JSON text into a tree of objects with the module `DECODER`.

    Object key order is preserved.

    >>> jsondecode('{"a": 1, "c": 2, "b": "2018-01-01"}')
    {'a': 1, 'c': 2, 'b': '2018-01-01'}
    """
    decoded = DECODER.decode(data)  # type: ignore
    return decoded
[docs]
def make_guid() -> bytes:
    """Returns a short globally-unique identifier as ASCII bytes.

    The identifier is the 16 bytes of a random UUID4, encoded as URL-safe
    base64 with the '=' padding stripped.
    """
    encoded = urlsafe_b64encode(uuid.uuid4().bytes)
    return encoded.strip(b'=')
[docs]
def overwrite_atomic(path: str, data: bytes, mode: int = 0o600) -> None:
    """Atomically replaces the file at `path` with a new one containing `data`.

    The data is written to a temporary file in the same directory as `path`,
    flushed and fsync'd, given the permissions `mode`, and then moved over
    `path`. If power is lost during the write or the system crashes then the
    file will contain either the old contents, or the new contents. It will
    never contain corrupt or truncated data.

    If any step fails, the temporary file is removed and the original
    exception is re-raised; `path` is left untouched in that case.

    :param path: destination file; an existing file is replaced.
    :param data: bytes to store in the file.
    :param mode: permission bits applied to the new file.
    """
    dirname = os.path.dirname(path)
    # The temp file must live next to `path` so the final move stays on one
    # filesystem.
    tmp = tempfile.NamedTemporaryFile(delete=False, dir=dirname, suffix='.tmp')
    filename: str = tmp.name
    try:
        with contextlib.closing(tmp):
            fp: io.BytesIO = tmp.file  # type: ignore # tempfile stubs are incomplete.
            fp.write(data)
            fp.flush()
            os.fsync(fp.fileno())
        os.chmod(filename, mode)
        # os.replace overwrites an existing destination on every platform,
        # whereas os.rename fails on Windows when `path` already exists.
        os.replace(filename, path)
    except BaseException:
        # delete=False means nobody else will clean up a half-written file.
        with contextlib.suppress(OSError):
            os.unlink(filename)
        raise
[docs]
def ensure_dir_exists(dirname: str, exist_ok: bool = True) -> None:
    """Creates the directory `dirname` (and any missing parents).

    `exist_ok` is forwarded to `os.makedirs`, but a `FileExistsError` is
    suppressed either way, so an already-existing path never raises.
    """
    with contextlib.suppress(FileExistsError):
        os.makedirs(dirname, exist_ok=exist_ok)
_PATH_SEPARATORS = os.sep
if os.altsep:
_PATH_SEPARATORS += os.altsep
[docs]
def safe_path_join(base, *parts):
    # type: (str, *str) -> str
    """Joins `base` and `parts` into one path, always keeping `base`.

    Leading path separators are stripped from every part and parts that end
    up empty are skipped, so -- unlike `os.path.join` -- a leading slash in a
    later component never discards the components before it.

    >>> os.path.join('/a', 'b', '/c')
    '/c'
    >>> safe_path_join('/a', 'b', '/c')
    '/a/b/c'
    """
    trimmed = (part.lstrip(_PATH_SEPARATORS) for part in parts)
    segments = [base]
    segments.extend(piece for piece in trimmed if piece)
    return os.sep.join(segments)
# TODO: Move this into dataflow.py?
[docs]
def add_columns_if_absent(frame: DataFrame, values: Dict[str, Any]) -> None:
    """Adds each column in `values` to `frame` unless it already exists.

    The frame is modified in place. Existing columns are never overwritten;
    new columns are appended in the order they appear in `values`.

    >>> df = DataFrame({"a": [1, 2, 3], "d": [3, 5, 8]})
    >>> add_columns_if_absent(df, {"a": 0, "b": 1, "c": 2})
    >>> df
       a  d  b  c
    0  1  3  1  2
    1  2  5  1  2
    2  3  8  1  2
    """
    existing = set(frame.columns)
    missing = {name: value for name, value in values.items() if name not in existing}
    for name, value in missing.items():
        frame[name] = value
def get_state_path(component_name: str, asset: str, stream_type: str) -> str:
    """Returns the path of the JSON state file for one stream of a component.

    The file is named ``<asset>_<stream_type>_state.json`` and lives in the
    directory returned by `get_component_path(component_name)`.
    """
    directory = get_component_path(component_name)
    return safe_path_join(directory, f'{asset}_{stream_type}_state.json')
[docs]
def get_component_path(component_name: str) -> str:
    """Returns the filesystem location that holds data for the named component.

    The location is `component_name` joined onto `const.COMPONENT_DATA_PATH`.
    """
    return safe_path_join(const.COMPONENT_DATA_PATH, component_name)
[docs]
def group_frame_by_stream(frame: DataFrame) -> List[Tuple[Tuple[str, str], DataFrame]]:
    """Splits a DataFrame into a list of per-stream frames.

    Returns data in the format `[((asset, stream type), frame), ...]`.
    """
    asset_col = const.ASSET_COLNAME
    stream_col = const.STREAM_TYPE_COLNAME

    # A groupby on a 100 column DataFrame is 10x slower than finding the
    # set of unique assets and stream types. Few frames will have more than
    # one stream, so the cheap cases are handled first and the groupby is
    # only the fallback.
    if frame.shape[0] == 1:
        only_row = frame.index[0]
        stream_key = (frame.at[only_row, asset_col], frame.at[only_row, stream_col])
        return [(stream_key, frame)]

    unique_assets = frame[asset_col].unique()
    unique_stream_types = frame[stream_col].unique()
    if unique_assets.size == 1 and unique_stream_types.size == 1:
        return [((unique_assets[0], unique_stream_types[0]), frame)]

    return list(frame.groupby(by=[asset_col, stream_col]))
# Generic type variables used in the signatures of the helpers below
# (e.g. element type and key/default type).
A = TypeVar('A')
B = TypeVar('B')
[docs]
def grouped(values: Iterable[A], key: Callable[[A], B]) -> Iterable[Tuple[B, Iterable[A]]]:
    """Returns a list of `(key, values)` pairs, one per distinct key.

    Groups appear in the order their key is first seen, and within each group
    the ordering of the original argument is preserved. This function is more
    analogous to the GROUP BY operator in SQL than `itertools.groupby`: the
    input does not need to be sorted by key.

    >>> grouped([1.0, 1.3, 1.9, 0.9, 2.1, 1.1], key=round)
    [(1, [1.0, 1.3, 0.9, 1.1]), (2, [1.9, 2.1])]
    """
    buckets: Dict[B, List[A]] = {}
    for item in values:
        buckets.setdefault(key(item), []).append(item)
    return list(buckets.items())
[docs]
def sanitize_column_names(frame: DataFrame) -> DataFrame:
    """Sanitizes column names into valid mongo field names.

    A leading '$' is replaced with '_' and every '.' is replaced with '_'.
    Column names that can't be sanitized without conflicting with an existing
    (or already sanitized) name are dropped, and a warning marker is raised
    for each dropped column.

    :return: a frame with the renamed columns and without the dropped ones.
    """
    taken: Set[str] = set(frame.columns)
    to_drop: List[str] = []          # [column name, ...]
    to_rename: Dict[str, str] = {}   # {from -> to, ...} mapping

    for original in list(frame.columns):
        cleaned = original
        if cleaned.startswith('$'):
            cleaned = '_' + cleaned[1:]
        # str.replace is a no-op when the name contains no '.'.
        cleaned = cleaned.replace('.', '_')
        if cleaned == original:
            continue

        if cleaned in taken:
            msg = (f'Dropped the column named "{original}" after sanitizing '
                   f'it since it would conflict with "{cleaned}".')
            markers.warning(f'columns.{original}.sanitize', msg)
            taken.remove(original)
            to_drop.append(original)
        else:
            to_rename[original] = cleaned
            taken.remove(original)
            taken.add(cleaned)

    frame = frame.rename(columns=to_rename, copy=False)
    frame.drop(to_drop, axis=1, inplace=True)
    return frame
# Timezone-aware reference point (1970-01-01 00:00:00 UTC) for epoch math.
UNIX_EPOCH_BASE = datetime(1970, 1, 1, tzinfo=pytz.utc)
[docs]
def to_unix_epoch(dt: datetime) -> float:
    """Converts a timezone-aware datetime to seconds since the unix epoch.

    The result is the difference between `dt` and `UNIX_EPOCH_BASE`, so `dt`
    must be timezone-aware (subtracting a naive datetime from an aware one
    raises `TypeError`).

    >>> tz = pytz.timezone('America/Los_Angeles')
    >>> dt = tz.localize(datetime(2018, 10, 19, 10, 38, 2, 208000))
    >>> to_unix_epoch(dt)
    1539970682.208
    """
    elapsed = dt - UNIX_EPOCH_BASE
    return elapsed.total_seconds()
# Unique sentinel meaning "no value"; used by `lookahead` to detect an
# exhausted iterator without confusing it with a legitimate `None` element.
NOTHING = object()
[docs]
def lookahead(iterable: Iterable[A], default: Union[A, B] = None) -> Iterator[Tuple[A, Union[A, B]]]:
    """Yields a pair `(elt, next_elt)` for every `elt` in `iterable`.

    For the last element `next_elt` is `default`; for every other element it
    is the element that follows. An empty iterable yields nothing.

    #>>> list(lookahead([1, 2, 3]))
    [(1, 2), (2, 3), (3, None)]
    #>>> list(lookahead([1, 2], default=0))
    [(1, 2), (2, 0)]
    #>>> list(lookahead([]))
    []
    """
    source = iter(iterable)
    current = next(source, NOTHING)
    while current is not NOTHING:
        # Read one element ahead; NOTHING marks the end of the input.
        upcoming = next(source, NOTHING)
        if upcoming is NOTHING:
            yield current, default  # type: ignore
        else:
            yield current, upcoming  # type: ignore
        current = upcoming
# Matches any run of characters other than ASCII letters, digits, '.', '_'
# and '-'; `kafka_topic_with_suffix` replaces each such run in a suffix.
_INVALID_KAFKA_TOPIC_CHARS = re.compile('[^a-zA-Z0-9._-]+')
# Separator placed between the tenant and the suffix of a topic name, and
# also used as the replacement for invalid character runs.
_TOPIC_DELIMITER = "."
[docs]
def kafka_topic_with_suffix(tenant: str, suffix: str) -> str:
    """Builds a Kafka topic name of the form ``<tenant>.<sanitized suffix>``.

    Every run of characters in `suffix` that matches
    `_INVALID_KAFKA_TOPIC_CHARS` is collapsed into a single topic delimiter.
    `tenant` is used as-is.

    :param tenant: initial namespace for topic,
    :param suffix: a string that will serve as the topic suffix
    :return: topic name
    """
    cleaned_suffix = _INVALID_KAFKA_TOPIC_CHARS.sub(_TOPIC_DELIMITER, suffix)
    return _TOPIC_DELIMITER.join((tenant, cleaned_suffix))
def create_kafka_topic(row: Dict, topic_prefix: str, topic_suffix_field: Union[str, None], default_topic: str) -> str:
    """Picks the Kafka topic for a single row.

    Returns `default_topic` when no suffix field is configured, or when the
    row's value for that field is missing, falsy or NA (a warning is logged
    in that case). Otherwise the topic is built from `topic_prefix` and the
    string form of the row's value via `kafka_topic_with_suffix`.

    :param row: mapping of field name to value for one record.
    :param topic_prefix: first component of the generated topic name.
    :param topic_suffix_field: name of the row field that supplies the suffix.
    :param default_topic: topic to fall back on.
    :return: topic name
    """
    if not topic_suffix_field:
        return default_topic

    suffix_value = row.get(topic_suffix_field)
    suffix_missing = not suffix_value or pd.isna(suffix_value)
    if suffix_missing:
        log.warning(f"Row value for topic suffix field {topic_suffix_field} is missing/empty, using default topic {default_topic}")
        return default_topic

    return kafka_topic_with_suffix(topic_prefix, str(suffix_value))
def get_device_uuid() -> str:
    """Returns the value of the `DEVICE_UUID` environment variable.

    :raises KeyError: if the variable is not set.
    """
    environment = os.environ
    return environment['DEVICE_UUID']
def get_device_name() -> str:
    """Returns the value of the `DEVICE_NAME` environment variable.

    :raises KeyError: if the variable is not set.
    """
    environment = os.environ
    return environment['DEVICE_NAME']
[docs]
def get_device_environment() -> str:
    """Returns the value of the `DEVICE_ENVIRONMENT` environment variable.

    This is the environment FTX thinks it's running in and should be one of:
    unknown, mase, balena, iotedge. The value is returned without validation.

    :raises KeyError: if the variable is not set.
    """
    environment = os.environ
    return environment['DEVICE_ENVIRONMENT']
[docs]
def class_name(typ: Type) -> str:
    """Returns the dotted reference ``<module>.<name>`` for a class.

    The module is read from the class's own ``__dict__``; when no
    ``__module__`` entry is stored there, only the class name is returned.
    """
    module = typ.__dict__.get('__module__')
    if module is None:
        return typ.__name__
    return '.'.join((module, typ.__name__))
def chunk_iter(iterable: Union[list, str], chunk_len: int) -> Generator[Any, None, None]:
    """Yields consecutive slices of `iterable`, each at most `chunk_len` long.

    The final slice holds whatever is left over and may be shorter. The
    argument must support `len()` and slicing.
    """
    yield from (
        iterable[start:start + chunk_len]
        for start in range(0, len(iterable), chunk_len)
    )
class Timer:
    """Context manager that measures how long its block takes.

    After the block exits, `start` and `end` hold the `time.monotonic()`
    readings taken on entry and exit, and `duration` holds their difference
    in seconds. Exceptions raised inside the block are not suppressed.
    """

    def __enter__(self) -> 'Timer':
        self.start = time.monotonic()
        return self

    def __exit__(self, *args: Any) -> Any:
        finished = time.monotonic()
        self.end = finished
        self.duration = finished - self.start
def parse_iso_timestamp(timestamp: str) -> datetime:
    """Parses an ISO-format timestamp with an arbitrary-precision fraction.

    The timestamp may be missing nano/micro/milli seconds or even the seconds
    themselves. Trailing 'Z' characters are stripped, so such timestamps (and
    timestamps without any offset) are returned as naive datetimes that are
    assumed to be UTC. Fractional seconds are truncated or zero-padded to
    exactly six digits (microseconds).

    A UTC offset that follows the fractional seconds (e.g. ``.123+02:00``) is
    kept and yields a timezone-aware datetime, matching what happens for
    timestamps without a fraction.

    :param timestamp: ISO-8601 style timestamp string.
    :return: the parsed datetime.
    :raises ValueError: if the string is not a valid ISO timestamp.
    """
    timestamp = timestamp.rstrip('Z')
    prefix, dot, remainder = timestamp.partition(".")
    if not dot:
        return datetime.fromisoformat(timestamp)

    # Separate the run of fraction digits from anything after it (a UTC
    # offset), so that truncating to microseconds cannot cut into the offset.
    digit_count = len(remainder) - len(remainder.lstrip('0123456789'))
    fractional_seconds = remainder[:digit_count]
    suffix = remainder[digit_count:]
    microseconds = fractional_seconds[:6].ljust(6, '0')
    return datetime.fromisoformat(prefix + "." + microseconds + suffix)
def pd_timestamp_to_iso(timestamp: pd.Timestamp) -> str:
    """Formats a pandas Timestamp as an ISO string with nanosecond precision.

    The output is ``YYYY-MM-DDTHH:MM:SS.`` followed by six microsecond digits,
    three digits for the timestamp's nanosecond component, and a literal 'Z'.
    The 'Z' is appended unconditionally; no timezone conversion is performed.
    """
    up_to_microseconds = timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")
    nanoseconds = f"{timestamp.nanosecond:03}"
    return up_to_microseconds + nanoseconds + "Z"
[docs]
def timedelta_str_to_sec(timedelta_str: str) -> int:
    """Converts a time delta string such as "1m" to a number of seconds.

    The string is an integer followed by a one-character unit: "s" (seconds),
    "m" (minutes), "h" (hours) or "d" (days). For e.g. "1m" returns 60.

    :param timedelta_str: Time delta string
    :return: Number of seconds (int)
    :raises ValueError: if the string is empty, the number is not an integer,
        or the unit is unknown.
    """
    seconds_per_unit = {
        "s": 1,
        "m": 60,
        "h": 60*60,
        "d": 24*60*60
    }
    try:
        suffix = timedelta_str[-1]
        return int(timedelta_str[:-1]) * seconds_per_unit[suffix]
    except (ValueError, KeyError, IndexError):
        raise ValueError(f"Invalid timedelta string {timedelta_str}")
def configure_extra_hosts(extra_hosts: List[Dict[str, str]], hosts_file_path: str = "/etc/hosts") -> None:
    """Rewrites the hosts file so it holds the original entries plus `extra_hosts`.

    The first time extra hosts are configured the current hosts file is
    copied to ``<hosts_file_path>.original``. Every later call starts again
    from that backup, so entries from previous calls never accumulate. When
    `extra_hosts` is empty the backup (if there is one) is restored and
    nothing else is written.

    :param extra_hosts: entries to append; each dict supplies the keys
        ``'ip'`` and ``'hostname'``.
    :param hosts_file_path: hosts file to modify.
    """
    original_hosts_backup = f"{hosts_file_path}.original"

    if not extra_hosts:
        # If there are no extra hosts, restore the original /etc/hosts if it exists
        if os.path.exists(original_hosts_backup):
            shutil.copyfile(original_hosts_backup, hosts_file_path)
        return

    # Something has really gone wrong if this file doesn't exist
    assert os.path.exists(hosts_file_path)

    # Create /etc/hosts original copy if we haven't already
    # If it's already been backed up then restore the file in preparation for overrides
    if not os.path.exists(original_hosts_backup):
        shutil.copyfile(hosts_file_path, original_hosts_backup)
    else:
        shutil.copyfile(original_hosts_backup, hosts_file_path)

    # If the existing content does not end with a newline, the first appended
    # entry would be glued onto the last original line and corrupt both.
    needs_newline = False
    with open(hosts_file_path, "rb") as existing:
        existing.seek(0, os.SEEK_END)
        if existing.tell() > 0:
            existing.seek(-1, os.SEEK_END)
            needs_newline = existing.read(1) != b"\n"

    # Add extra hosts to /etc/hosts
    with open(hosts_file_path, "a") as hosts_file:
        if needs_newline:
            hosts_file.write("\n")
        hosts_file.writelines(
            f"{host['ip']} {host['hostname']}\n" for host in extra_hosts
        )
[docs]
def default_legacy_bson(obj: Any, json_options: JSONOptions = DEFAULT_JSON_OPTIONS) -> Any:
    """
    Converts `obj` with `bson.json_util.default`, forcing the legacy datetime
    representation.

    Moving from Python 3.7 to 3.12 would break the checksum_record transform because
    the default BSON representation of datetime objects changed. This function sets the
    default representation back to the legacy one.

    :param obj: value to convert.
    :param json_options: options handed to `json_util.default`; this object is
        modified in place.
    :return: whatever `json_util.default` returns for `obj`.
    """
    # NOTE(review): this assigns onto the options object that was passed in.
    # With the default argument that object is the DEFAULT_JSON_OPTIONS
    # imported from bson.json_util, so the change presumably stays visible to
    # every later user of that object in this process -- confirm that this
    # side effect is intended.
    json_options.datetime_representation = DatetimeRepresentation.LEGACY
    return json_util.default(obj, json_options=json_options)
[docs]
def manual_infer_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """
    Converts object-dtype columns to numeric, timedelta or datetime columns
    where possible.

    Pandas 2.0 removed the old '_convert' method that some transforms were using,
    this method is a replacement for that functionality since convert_dtypes() and
    infer_types() can't handle string -> dtype conversion...

    Each object column is tried against `pd.to_numeric`, `pd.to_timedelta` and
    `pd.to_datetime`, in that order; the first conversion that succeeds wins.
    Columns that none of them can convert -- including columns holding values
    a converter rejects with a `TypeError`, such as lists -- are left
    unchanged. Columns whose dtype is not `object` are skipped.

    This will do an in-place conversion, so pass in a deep copy if you don't
    want to modify the original df.

    :param df: frame to convert in place.
    :return: the same `df` object.
    """
    converters = (pd.to_numeric, pd.to_timedelta, pd.to_datetime)
    for col in df.columns:
        if not df[col].dtype == object:
            # Skip columns that are already a dtype
            continue
        for convert in converters:
            try:
                df[col] = convert(df[col])
            except (ValueError, TypeError):
                # Not convertible this way; try the next converter.
                continue
            break
    return df
[docs]
def is_nested_iotedge_child_device() -> bool:
    """
    Returns True if the device is a child device of an IoT Edge device.

    This is decided purely by whether the `IOTEDGE_PARENTHOSTNAME` environment
    variable is set; its value (even an empty one) is not inspected.
    """
    return 'IOTEDGE_PARENTHOSTNAME' in os.environ
[docs]
def data_vol_statvfs() -> os.statvfs_result:
    """
    Returns the `os.statvfs` information for the data volume mounted at '/data'.
    Unix platforms only.

    Fields of the result:
        f_bsize   - preferred file system block size.
        f_frsize  - fundamental file system block size.
        f_blocks  - total number of blocks in the filesystem.
        f_bfree   - total number of free blocks.
        f_bavail  - free blocks available to non-super user.
        f_files   - total number of file nodes.
        f_ffree   - total number of free file nodes.
        f_favail  - free nodes available to non-super user.
        f_flag    - system dependent.
        f_namemax - maximum file name length.
    """
    volume_stats = os.statvfs('/data')
    return volume_stats
[docs]
def save_dataframe_to_disk(df: DataFrame, save_path: str) -> None:
    """
    Serializes a DataFrame to `save_path` as CSV (without the index).

    Before writing, the length of the CSV text times five is compared with
    the free space reported by `data_vol_statvfs()`. If that estimate exceeds
    the free space an error is logged and nothing is written.
    """
    csv_text = df.to_csv(index=False)
    # Add a disk space buffer, so we don't cause everything to halt
    required_bytes = len(csv_text) * 5

    # Check if csv space required is larger than available disk space
    fs_stats: os.statvfs_result = data_vol_statvfs()
    free_bytes = float(fs_stats.f_frsize * fs_stats.f_bavail)
    if required_bytes > free_bytes:
        log.error(f"Not enough space to save dataframe to disk. "
                  f"Required bytes: {required_bytes}, Available bytes: {free_bytes}")
        return

    with open(save_path, 'w') as out_file:
        out_file.write(csv_text)
[docs]
def aware_utcnow() -> datetime:
    """Returns the current time as a timezone-aware UTC datetime.

    python 3.12 replacement for dt.utcnow()
    """
    return datetime.now(tz=timezone.utc)
def aware_utcfromtimestamp(timestamp: float) -> datetime:
    """Converts a unix timestamp (seconds) to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)
def naive_utcnow() -> datetime:
    """Returns the current UTC time as a naive datetime (tzinfo removed)."""
    current = aware_utcnow()
    return current.replace(tzinfo=None)
def naive_utcfromtimestamp(timestamp: float) -> datetime:
    """Converts a unix timestamp (seconds) to a naive datetime holding UTC time."""
    aware = aware_utcfromtimestamp(timestamp)
    return aware.replace(tzinfo=None)
[docs]
def dataframe_cdc(
    current_state_df: pd.DataFrame,
    new_data_df: pd.DataFrame,
    ignored_columns_for_diff: List[str] = None
) -> pd.DataFrame:
    """
    Returns the difference between two DataFrames (Change Data Capture).

    Meant to compare an existing df state to a new df and return only the new data,
    i.e. the rows of `new_data_df` that have no identical row in `current_state_df`.
    The returned rows keep the order and the index labels they have in `new_data_df`.

    If 'ignored_columns_for_diff' is set, those columns will be ignored when diffing
    the data, but will be included in the final returned df (using values from the
    same rows of new_data_df). They are appended after the compared columns. Ignored
    columns that do not exist in new_data_df are skipped, and no ignored columns are
    added when the result has no rows.

    If the column sets are not matching, new_data_df will be returned.

    :param current_state_df: previously seen data.
    :param new_data_df: freshly read data.
    :param ignored_columns_for_diff: column names to leave out of the comparison.
    :return: the rows of `new_data_df` that are new relative to the state.
    """
    # If ignored_columns_for_diff is set, remove those columns from the dataframes for merging
    if ignored_columns_for_diff:
        log.info(f"Ignoring columns {ignored_columns_for_diff} for diff")
        table_data_df_subset = new_data_df.drop(columns=ignored_columns_for_diff, errors='ignore')
        state_data_df_subset = current_state_df.drop(columns=ignored_columns_for_diff, errors='ignore')
    else:
        ignored_columns_for_diff = []
        table_data_df_subset = new_data_df
        state_data_df_subset = current_state_df

    # If columns don't match, just assume new data since it would only try to merge on same cols
    if set(table_data_df_subset.columns) != set(state_data_df_subset.columns):
        return new_data_df

    # A left merge against the de-duplicated state yields exactly one output
    # row per row of new_data_df, in the same order. That lets the result be
    # re-labelled with new_data_df's index, so each output row can be traced
    # back to the row it came from. (An outer merge returns a fresh index
    # that has no relation to new_data_df's rows.)
    merged_df: pd.DataFrame = pd.merge(
        table_data_df_subset,
        state_data_df_subset.drop_duplicates(),
        how='left',
        indicator=True
    )
    merged_df.index = table_data_df_subset.index

    # '_merge' indicates where each row comes from; keep only rows that are
    # absent from the state.
    is_new = (merged_df['_merge'] == 'left_only').to_numpy()
    merged_data_df: pd.DataFrame = merged_df[is_new].drop(columns='_merge')

    # Add the ignored columns back from new_data_df if there are any rows.
    # The same positional mask is used so values come from the matching rows.
    if len(merged_data_df):
        for col in ignored_columns_for_diff:
            if col in new_data_df.columns:
                merged_data_df[col] = new_data_df[col][is_new]
    return merged_data_df
[docs]
def get_config_component_types_names(root_config: dict) -> Dict[str, Set[str]]:
    """
    Get the components' names and types that are in use by the given configuration.

    :param root_config: the full configuration dictionary.
    :return: a dict with the keys 'receivers', 'parsers', 'transforms' and
        'transmits', each mapping to the set of component types configured in
        the corresponding section. Sections missing from the configuration
        produce an empty set.
    """
    # Local import to avoid circular imports
    from factorytx.receivers.file.receiver import PARSER_INFO
    from factorytx.config import RECEIVER_INFO, TRANSFORM_INFO, TRANSMIT_INFO

    def _types_in(entries: Iterable[dict], type_key: str) -> Set[str]:
        # Collect the distinct component types named by a list of entries.
        return {entry[type_key] for entry in entries}

    receivers: List[dict] = root_config.get(RECEIVER_INFO.section_key, [])
    # Parsers are only embedded in receivers, but we want to report on those as well
    parsers: Iterable[dict] = itertools.chain.from_iterable(
        receiver[PARSER_INFO.section_key]
        for receiver in receivers
        if PARSER_INFO.section_key in receiver
    )

    return {
        'receivers': _types_in(receivers, RECEIVER_INFO.type_key),
        'parsers': _types_in(parsers, PARSER_INFO.type_key),
        'transforms': _types_in(
            root_config.get(TRANSFORM_INFO.section_key, []), TRANSFORM_INFO.type_key
        ),
        'transmits': _types_in(
            root_config.get(TRANSMIT_INFO.section_key, []), TRANSMIT_INFO.type_key
        ),
    }