from typing import (
Callable, Dict, Generator, Iterable, Iterator, List, NamedTuple,
Optional, Pattern, Set, Tuple, Type
)
import contextlib
from collections import defaultdict
from datetime import datetime, timedelta
from functools import partial
import fnmatch
import inspect
import logging
import os
import re
import shutil
import tempfile
import textwrap
import threading
import time
import sqlite3
import warnings
import hashlib
import pandas as pd
from func_timeout import func_timeout, FunctionTimedOut
import pytz
from pandas.errors import ParserError
import factorytx.const
from factorytx.base import Receiver, ConnectionTestResult
from factorytx.sqlite import MigrationDict
from factorytx.config import ConfigInfo, clean_section, get_component_class
from factorytx import const
from factorytx.dataflow import DataflowGraph, InputStreamId, stream_matches
from factorytx.exceptions import Failure
from factorytx.logs import push_context, FileContextLogger
from factorytx import markers
from factorytx.monitor.monitors import Monitors
from factorytx.receivers.file.base import Connection, ConnectionError, FileEntry, Parser, parsers
from factorytx.receivers.file.state import ParsingCandidate, StateStore, StateStoreDBMigrator
from factorytx.state.hash_store import IntHashStore, ROW_HASH_COL_NAME, HASH_MAX_INT
from factorytx.utils import (
Timer, add_columns_if_absent, ensure_dir_exists, get_component_path, grouped,
lookahead, safe_path_join
)
from factorytx.validation import (
ConfigPath, ValidationError, ValidationMessage, ValidationWarning,
clean_with_json_schema, has_errors
)
# Set up regular app logger
log = logging.getLogger(__name__)
PARSER_INFO: ConfigInfo[Parser] = ConfigInfo(
registry=parsers,
section_key='parsers',
type_key='parser_type',
version_key='parser_version',
name_key='parser_name',
)
class _FileStream(NamedTuple):
"""Represents a single stream configuration."""
input_stream_id: InputStreamId
asset: str
stream_type: str
applies_to: Callable
parser: Parser
def conn_list_compat(connection: Connection, last_completed_file: Optional[str]) -> Generator[FileEntry, None, None]:
# TODO: Remove the `inspect` detection when we want to drop
# backwards compatibility
list_args = inspect.getfullargspec(connection.list).args
if 'start_after_hint' in list_args:
for f_entry in connection.list(start_after_hint=last_completed_file):
yield f_entry
else:
warnings.warn("The base Connection.list() method now "
"supports using a 'start_after_hint' "
"parameter. Please update your custom "
"connection to support this signature.",
DeprecationWarning)
for f_entry in connection.list(): # type: ignore
yield f_entry
class FileReceiverDBMigrator(StateStoreDBMigrator):
def __init__(self, rcvr: 'FileReceiver') -> None:
self.rcvr = rcvr
def _MIGRATION_SQL(self) -> MigrationDict:
"""
Extend state DB migration table to contain additional migration scripts.
"""
migration_sql: MigrationDict = super()._MIGRATION_SQL()
migration_name = '0005-name-connections'
assert migration_name not in migration_sql
migration_sql[migration_name] = self.db_migration_name_connection
return migration_sql
def db_migration_name_connection(self, conn: sqlite3.Connection) -> None:
"""
The state DB used to have nameless connections, with one entry per (asset, stream_type, path)
combination. Named connections are now allowed, which lets the previous compound key be duplicated
across connections. Leaving nameless connections in the state DB is a problem, so this migration
script ensures that all entries have a name.
This assumes that, at this point in time, the state DB contains only one row per
(asset, stream_type, path) combination.
:param conn: sqlite connection to apply changes to.
:return:
"""
rows = conn.execute("SELECT COUNT(1) FROM files WHERE connection IS NULL").fetchall()
if rows[0][0] < 1:
# Nothing to do, skip
return
params: List = []
for connection_config in self.rcvr.connection_configs:
with contextlib.closing(self.rcvr.connection_class(connection_config, self.rcvr.root_config)) as connection:
for file_entry in conn_list_compat(connection, None):
matching_streams = [stream for stream in self.rcvr.streams if stream.applies_to(file_entry)]
# Parameter rows for the executemany() call below.
params.extend(
[connection.name, stream.asset, stream.stream_type, file_entry.path]
for stream in matching_streams
)
if params:
# Assume only one entry per (asset, stream_type, path) combination and no duplicate connection names.
conn.executemany(
# language=SQLite
textwrap.dedent("""\
UPDATE files
SET connection = ?
WHERE connection is NULL
AND asset = ?
AND stream_type = ?
AND path = ?
"""),
params
).fetchall()
# Check whether any rows were left without a connection.
# language=SQLite
rows = conn.execute("SELECT * FROM files WHERE connection IS NULL").fetchall()
if rows:
# If this log message is too large, it will be truncated.
columns = [
r[1]
for r in conn.execute("pragma table_info(files)").fetchall()
]
log.warning(
(
'factorytx.receivers.file.receiver.FileReceiver.db_migration_name_connection: '
'The following %s rows could not be matched to a connection and will be deleted:\n'
'%s\n%s'
),
len(rows),
'|'.join(columns),
'\n'.join(
'|'.join(repr(i) for i in row)
for row in rows
),
)
# language=SQLite
conn.execute("DELETE FROM files WHERE connection IS NULL").fetchall()
class FileReceiver(Receiver):
"""Receiver that fetches files from instances of a Connection subclass and
parses them using the configured Parser types.
Here's how to create a custom FileReceiver class to handle a new protocol:
1. Define a Connection subclass that uses the protocol to list
available files, fetch files, and optionally delete files from a
remote server.
.. code-block:: Python
class MyConnection(Connection):
...
2. Register a custom FileReceiver subclass for your Connection type:
.. code-block:: Python
@receivers.register('my_receiver')
class MyReceiver(FileReceiver):
connection_class = MyConnection
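The exact interface your connection must implement is defined by
:class:`factorytx.receivers.file.base.Connection`. As a rough,
non-authoritative sketch based on how this receiver calls its
connections (``MyFileServer`` is a hypothetical protocol client, not
part of FactoryTX), a subclass typically provides ``list``, ``copy``,
``delete``, and ``close``:
.. code-block:: Python
    class MyConnection(Connection):
        def __init__(self, config, root_config):
            # Open a session using the validated connection settings.
            self.client = MyFileServer(config)
        def list(self, start_after_hint=None):
            # Yield a FileEntry for each file visible on the server;
            # `start_after_hint` may be used to skip completed files.
            yield from self.client.list_files(start_after_hint)
        def copy(self, file_entry, local_path):
            # Download the remote file to the given local path.
            self.client.download(file_entry.path, local_path)
        def delete(self, file_entry):
            # Remove the remote file (used when `delete_completed` is set).
            self.client.remove(file_entry.path)
        def close(self):
            # Release any resources held by the session.
            self.client.close()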
"""
connection_class: Type[Connection]
db_migrator_class: Type[FileReceiverDBMigrator] = FileReceiverDBMigrator
monitors: Monitors
schema = {
"type": "object",
"properties": {
"data_receiver_name": {"type": "string"},
"protocol": {"type": "string"},
"connections": {"type": "array"},
"process_files_alphabetically": {
"type": "boolean",
"default": False
},
"process_ordered_connections": {
"type": "boolean",
"default": False
},
"streams": {
"type": "array",
"items": {
"type": "object",
"properties": {
"asset": {"type": "string"},
"stream_type": {"type": "string"},
"file_filter": {
"type": "array",
"items": {"type": "string", "minLength": 1},
},
"parser": {"type": "string", "minLength": 1},
"path_filter": {
"type": "array",
"items": {"type": "string", "minLength": 1},
},
},
"additionalProperties": True,
"required": ["asset", "stream_type"],
},
},
"parsers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"parser_name": {"type": "string", "minLength": 1},
"parser_type": {"type": "string", "minLength": 1},
"parser_version": {"type": "integer", "minimum": 1},
"filter_stream": {
"type": "array",
"items": {"type": "string", "minLength": 1},
},
},
"additionalProperties": True,
"required": ["parser_name", "parser_type"],
},
},
"skip_parser_errors": {
"type": "boolean",
"default": False,
"description": (
"Whether the receiver will allow parser errors when processing files. If True, parsing errors will "
"be skipped and logged in a quarantine log file while the receiver continues processing. "
"Otherwise if we encounter a parsing error, the receiver will halt until the file gets repaired."
),
},
"archive_completed": {
"type": "boolean",
"default": False,
"description": (
"Whether to keep a local copy of each file receiver from the remote "
"server or not. If enabled, files will be archived until their total "
"size increases above the amount specified in the 'max_archive_size_mb' "
"parameter."
),
},
"max_archive_size_mb": {
"type": "number",
"default": -1,
"description": (
"If 'archive_completed' is True, delete the archive completed files "
"once the total size (in mb) is greater than this value. A negative "
"value means never delete any files."
)
},
"delete_completed": {
"type": "boolean",
"default": False,
"description": (
"Whether to delete each file from the remote server once it "
"has been successfully processed."
),
},
"read_in_progress_files": {
"type": "boolean",
"default": False,
"description": (
"Whether to read files as soon as they are created, or wait "
"for the upstream service to stop writing to the file before "
"reading it."
),
},
"emit_file_metadata": {
"type": "boolean",
"default": False,
"description": (
"Whether or not to inject additional columns into each record "
"containing metadata about the file it came from. If this setting "
"is enabled then every record will contain fields named `file_name`, "
"`file_path`, and `file_timestamp`."
),
},
"poll_interval": {
"type": "number",
"minimum": 0.1,
"default": 60,
"description": "Number of seconds to wait before fetching new data from the server.",
},
"temporary_file_directory": {
"type": "string",
"default": "/tmp",
"description": "Specify the directory to temporarily store "
"files that have been downloaded from the "
"connection(s). By default, the directory is "
"used is `/tmp`.",
},
"deduplicate_rows": {
"type": "boolean",
"default": False,
},
"deduplicate_row_retention_time_days": {
"type": "number",
"minimum": 1,
"default": 90,
},
},
"required": ["data_receiver_name", "protocol", "connections", "streams", "parsers"],
"additionalProperties": False,
}
"""All file receivers support these **required** and *optional* properties:
- **data_receiver_name**: Unique name of the data receiver. This name
will be used to track the progress state of the data stream.
- **protocol**: Protocol to be used.
- **streams**: List of input data streams to read from.
- **asset**: Asset identifier
- **stream_type**: Type of data stream
- **parser**: Name of the parser used to convert the file. This has to
match one of the *parser_name* values in the *parsers* section.
- *file_filter*: List of file names to process. Items may be glob-style
wildcard patterns (e.g. ``*.csv``) and are matched against the file
name. You must specify either *file_filter* or *path_filter*, but
not both.
- *path_filter*: List of file paths to process. Items may be glob-style
wildcard patterns and are matched against the full path. You must
specify either *file_filter* or *path_filter*, but not both.
- **connections**: A list of connection settings used to connect to the
data source. Please refer to the connection settings below.
- *process_files_alphabetically*: ``true`` to process all of the files
retrieved by the connection(s) in alphabetical order. By default, this
option is ``false``, so files are processed by last modified
(or created) time with oldest files first.
- *process_ordered_connections*: If there are multiple connections and
you want to process (and transmit) the files received from one
connection before another, set this option to ``true`` to process files
based on the order of the connection(s). By default, this option is
set to ``false``. This option is usually related to the Kafka transmit
because events within a Kafka topic need to be in order. For example,
if one connection receives historical files and another connection
receives "realtime" files, you'll want to enable this setting and
order the connections with the historical connection first, so that
events in the Kafka topic are in chronological order. Please note that
the order of the files per connection is maintained: if
``process_files_alphabetically`` is enabled, files will be parsed in
alphabetical order; if disabled, files will be parsed in chronological
order based on the last modified time.
- **parsers**: A list of parsers that can be used for the input streams.
Each parser supports the following properties:
- **parser_name**: A customizable name used by streams to identify
the parser.
- **parser_type**: Type of parser to apply (e.g. ``csv``)
- *parser_version*: Version of the parser type to apply.
FactoryTX has a few built-in parsers available for you to use. Please
refer to the :ref:`Parsers Configurations` section of the manual for
more details about them.
- *skip_parser_errors*: Whether the receiver will tolerate parser errors when
processing files. If ``true``, parsing errors are skipped and logged in
a quarantine log file while the receiver continues processing. Otherwise
the receiver will halt when it encounters a parsing error until the file
is repaired. (Only applies to CSV files.)
- *archive_completed*: ``true`` to keep a local copy of each file received
from the remote server, or ``false`` if local copies should not be kept.
If enabled, files will be archived until their total size increases
above the amount specified in the *max_archive_size_mb* parameter.
- *max_archive_size_mb*: If *archive_completed* is ``true``, delete
archived files once their total size (in MB) exceeds this value. A
negative value means never delete any files. Defaults to -1.
- *delete_completed*: ``true`` if files should be deleted from the data
directory after they have been received by FactoryTX, or ``false`` if
files should never be deleted. Defaults to ``false`` to avoid
accidentally losing data.
- *archive_completed* and *delete_completed* are independent of each
other. Archiving will create a copy of the file in a new directory,
while deleting will remove the original file.
- *read_in_progress_files*: Whether to read files as soon as they are
created, or wait for the upstream service to stop writing to the file
before reading it.
- *emit_file_metadata*: Whether or not to inject additional columns into
each record containing metadata about the file it came from. If this
setting is enabled then every record will contain fields named
``file_name``, ``file_path``, and ``file_timestamp``.
- *poll_interval*: Number of seconds to wait before fetching new data
from the server.
- *temporary_file_directory*: Specify the directory to temporarily store
files that have been downloaded from the connection(s). By default,
the directory used is ``/tmp``.
- *deduplicate_rows*: Whether or not to deduplicate rows when processing files.
This is done by hashing each row and saving the hash to compare against
later if the file is modified; when that happens, only the new rows are
processed. Use this option if the same file is expected to be re-processed
multiple times and will be regularly modified, rather than only appended to.
All data in each row must be a hashable type (no lists, dicts, etc.).
Row hashes are stored in a "row_hash_stores" database in the receiver's
component directory.
- *deduplicate_row_retention_time_days*: The number of days to keep row hashes for deduplication (default 90).
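As an illustration only, the hypothetical configuration sketch below
(protocol, connection, asset, and filter values are placeholders) shows
how the connections, parsers, and streams sections fit together, written
as the Python dictionary a loaded YAML/JSON config would produce:
.. code-block:: Python
    {
        "data_receiver_name": "line1_file_receiver",
        "protocol": "my_receiver",
        "connections": [
            # Protocol-specific settings omitted; every connection needs
            # a unique connection_name.
            {"connection_name": "historical"},
        ],
        "parsers": [
            {"parser_name": "main_csv", "parser_type": "csv"},
        ],
        "streams": [
            {
                "asset": "Press01",
                "stream_type": "metrics",
                "parser": "main_csv",
                # Use either file_filter or path_filter, never both.
                "file_filter": ["*.csv"],
            },
        ],
    }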
"""
@classmethod
def _clean_connections(cls: Type['FileReceiver'], config: dict,
root_config: dict) -> List[ValidationMessage]:
path: ConfigPath
connection_names: Set[str] = set()
validation_results: List[ValidationMessage] = []
for conn_no, conn_config in enumerate(config['connections']):
if isinstance(conn_config, dict):
connection_name = conn_config.get('connection_name')
if connection_name is None:
path = ('connections', conn_no)
msg = f"A connection name is required for connection {conn_no + 1}."
validation_results.append(ValidationError(path, msg))
continue
if connection_name == '':
path = ('connections', conn_no, 'connection_name')
msg = "'' is too short."
validation_results.append(ValidationError(path, msg))
continue
if connection_name in connection_names:
path = ('connections', conn_no, 'connection_name')
msg = f'A connection named "{connection_name}" already exists.'
validation_results.append(ValidationError(path, msg))
connection_names.add(connection_name)
for validation_result in cls.connection_class.clean(conn_config, root_config):
path = ('connections', conn_no) + validation_result.path # type: ignore
validation_result = validation_result._replace(path=path)
validation_results.append(validation_result)
return validation_results
@classmethod
def _clean_parsers(cls: Type['FileReceiver'], config: dict,
root_config: dict, parser_names: Set[str]) -> List[ValidationMessage]:
path: ConfigPath
validation_results: List[ValidationMessage] = []
validation_results.extend(clean_section(PARSER_INFO, config, root_config, parser_names))
if has_errors(validation_results):
return validation_results
# Flag a warning if any of the parsers use filter_stream. We only
# warn about the first instance to keep the verbosity low.
for parser_no, parser_config in enumerate(config['parsers']):
if 'filter_stream' in parser_config:
path = ('parsers', parser_no, 'filter_stream')
msg = ('The "filter_stream" option is deprecated. Please set the '
'stream\'s "parser" option instead.')
validation_results.append(ValidationWarning(path, msg))
break
return validation_results
@classmethod
def _clean_streams(cls: Type['FileReceiver'], config: dict,
parser_names: Set[str]) -> List[ValidationMessage]:
path: ConfigPath
validation_results: List[ValidationMessage] = []
# There are two ways to associate parsers with streams:
# 1. By specifying the name of the parser for the stream.
# 2. By specifying a filter_stream on the parser.
#
# Specifying a parser is preferred, since it makes errors (eg.
# associating multiple parsers with a stream) less likely. To simplify
# the constructor, we convert all instances of filter_stream into
# parser names attached to the stream.
for stream_no, stream_config in enumerate(config['streams']):
parser = stream_config.get('parser')
if parser is not None and parser not in parser_names:
path = ('streams', stream_no, 'parser')
msg = f'No such parser: "{parser}"'
validation_results.append(ValidationError(path, msg))
stream_id = str(InputStreamId(stream_config['asset'], stream_config['stream_type']))
for parser_no, parser_config in enumerate(config['parsers']):
filter_stream = parser_config.get('filter_stream')
if filter_stream is None or not stream_matches(stream_id, filter_stream):
continue
if parser is None:
parser = parser_config['parser_name']
else:
path = ('parsers', parser_no, 'filter_stream')
msg = f'{stream_id} is already associated with the parser "{parser}"'
validation_results.append(ValidationError(path, msg))
if parser is None:
path = ('streams', stream_no)
msg = f"'parser' is a required property"
validation_results.append(ValidationError(path, msg))
stream_config['parser'] = parser
# Streams should have either 'file_filter' or 'path_filter' but not both
if stream_config.get('file_filter') and stream_config.get('path_filter'):
path = ('streams', stream_no, 'file_filter')
msg = f"one stream can't have both 'file_filter' and 'path_filter' properties"
validation_results.append(ValidationError(path, msg))
elif not(stream_config.get('file_filter') or stream_config.get('path_filter')):
path = ('streams', stream_no)
msg = f"either 'file_filter' or 'path_filter' is required"
validation_results.append(ValidationError(path, msg))
return validation_results
@classmethod
def clean(cls: Type['FileReceiver'], config: dict,
root_config: dict) -> List[ValidationMessage]:
path: ConfigPath
parser_names: Set[str] = set()
validation_results: List[ValidationMessage] = []
validation_results.extend(clean_with_json_schema(FileReceiver.schema, config))
if has_errors(validation_results):
return validation_results
validation_results.extend(cls._clean_connections(config, root_config))
# Nothing else depends on connections being validated, so we can continue
# if there are any errors.
validation_results.extend(cls._clean_parsers(config, root_config, parser_names))
if has_errors(validation_results):
return validation_results
# Streams must be cleaned AFTER parsers, and only if the parsers are valid.
# Otherwise the stream validation won't have the list of parser names, and
# may crash if it encounters an invalid stream.
validation_results.extend(cls._clean_streams(config, parser_names))
if has_errors(validation_results):
return validation_results
ensure_dir_exists(config['temporary_file_directory'])
return validation_results
@staticmethod
def applies_to_filename(file_entry: FileEntry, file_filter: List[Pattern]) -> bool:
return any(f.match(file_entry.filename) for f in file_filter)
@staticmethod
def applies_to_path(file_entry: FileEntry, file_filter: List[Pattern]) -> bool:
return any(f.match(file_entry.path) for f in file_filter)
def __init__(self, dataflow: DataflowGraph, config: dict, root_config: dict) -> None:
super().__init__(dataflow, config, root_config)
self.dataflow = dataflow
self.connection_configs: List[dict] = config['connections']
self.root_config = root_config
self.data_receiver_name = config['data_receiver_name']
self.monitors = Monitors('receiver', 'FileReceiver', self.data_receiver_name)
# Set up file-logger for quarantine results
quarantine_dir_path: str = os.path.join(factorytx.const.COMPONENT_DATA_PATH,
self.data_receiver_name,
'quarantine')
self.quarantine_log = FileContextLogger(os.path.join(quarantine_dir_path, 'quarantine.log'))
parsers: Dict[str, Parser] = {}
for parser_config in config['parsers']:
parser_cls = get_component_class(PARSER_INFO, parser_config)
parsers[parser_config['parser_name']] = parser_cls(parser_config, root_config)
self.streams: List[_FileStream] = []
for stream_config in config['streams']:
if 'file_filter' in stream_config:
file_filter = [re.compile(fnmatch.translate(f)) for f in stream_config['file_filter']]
applies_to = partial(FileReceiver.applies_to_filename, file_filter=file_filter)
elif 'path_filter' in stream_config:
file_filter = [re.compile(fnmatch.translate(f)) for f in stream_config['path_filter']]
applies_to = partial(FileReceiver.applies_to_path, file_filter=file_filter)
else:
assert False
self.streams.append(_FileStream(
input_stream_id=InputStreamId(stream_config['asset'], stream_config['stream_type']),
asset=stream_config['asset'],
stream_type=stream_config['stream_type'],
applies_to=applies_to,
parser=parsers[stream_config['parser']],
))
self.skip_parser_errors = config['skip_parser_errors']
self.archive_completed = config['archive_completed']
self.max_archive_size_mb = config['max_archive_size_mb']
self.delete_completed = config['delete_completed']
self.poll_interval = config['poll_interval']
self.read_in_progress_files = config['read_in_progress_files']
self.emit_file_metadata = config['emit_file_metadata']
self.local_temp_dir = config['temporary_file_directory']
self.last_file_entries: Dict[Tuple[int, str], FileEntry] = {}
self.process_files_alphabetically = config['process_files_alphabetically']
self.process_ordered_connections = config['process_ordered_connections']
self.deduplicate_rows = config['deduplicate_rows']
self.deduplicate_rows_retention_td: timedelta = timedelta(
days=int(config['deduplicate_row_retention_time_days'])
)
self._row_hash_stores_path = safe_path_join(get_component_path(self.name), "row_hash_stores.db")
if self.deduplicate_rows:
self.row_hash_store: Optional[IntHashStore] = IntHashStore(state_path=self._row_hash_stores_path)
else:
self.row_hash_store = None
component_path = get_component_path(self.name)
self._completed_path = safe_path_join(component_path, 'completed')
self._state_store_path = safe_path_join(component_path, 'state.db')
self._store = StateStore(self._state_store_path)
self._store.initialize(db_migrator=self.db_migrator_class(self))
def __getstate__(self) -> dict:
"""Deconstructs an object for pickling."""
# Don't try to pickle the state store -- we'll reconnect after forking.
return {k: v for k, v in self.__dict__.items() if k != '_store'}
def __setstate__(self, state: dict) -> None:
"""Reconstructs an object when unpickling."""
self.__dict__.update(state)
self._store = StateStore(self._state_store_path)
def purge(self, streams: List[InputStreamId]) -> None:
self._store.purge(streams)
if self.row_hash_store:
self.row_hash_store.purge(streams)
def _list_candidates(self, connections: List[Connection]) -> List[ParsingCandidate]:
"""Returns a list of ParsingCandidate objects representing files to be
parsed and the parsers to parse them with. One file may be associated
with multiple parsers and streams.
"""
new_file_entries: Dict[Tuple[int, str], FileEntry] = {} # (connection no, path) -> file entry
candidates: List[ParsingCandidate] = []
for connection_no, connection in enumerate(connections):
def _list_files(connection: Connection) -> Generator[FileEntry, None, None]:
marker_category = f'connections.{connection.name}.list'
try:
last_completed_file: Optional[str] = None
if self.process_files_alphabetically:
last_completed_file = self._store.fetch_last_completed_file(connection.name)
yield from conn_list_compat(connection, last_completed_file)
markers.clear(marker_category)
except ConnectionError as e:
markers.error(marker_category, f'{connection.name} - {connection.connection_info} failed to list files. {e.message}')
raise Failure
for file_no, file_entry in enumerate(_list_files(connection)):
if file_no % 5000 == 0:
log.debug('List Candidate:%d:%s:%d:%s',
connection_no, connection, file_no, file_entry)
if file_entry.filename == '.DS_Store':
continue # Make OS X users happy.
matching_streams = [stream for stream in self.streams if stream.applies_to(file_entry)]
if not matching_streams:
# Don't bother saving this entry in new_file_entries if we're
# not interested in it. This saves memory when the upstream
# server has a large number of files and we're interested in
# only a small number of them.
continue
if not self.read_in_progress_files:
# Avoid reading files that are still being written.
file_key = (connection_no, file_entry.path)
new_file_entries[file_key] = file_entry
prev_file_entry = self.last_file_entries.get(file_key)
if (prev_file_entry is None or
prev_file_entry.size != file_entry.size or
prev_file_entry.mtime != file_entry.mtime):
continue
for stream in matching_streams:
candidate = ParsingCandidate(
connection=connection,
parser=stream.parser,
file=file_entry,
input_stream_id=stream.input_stream_id,
asset=stream.asset,
stream_type=stream.stream_type,
state=None,
)
candidates.append(candidate)
self.last_file_entries = new_file_entries
return candidates
def _delete_temp_file(self, filename: str) -> None:
"""Deletes a temporary file that's a copy of the source file"""
try:
os.remove(filename)
except OSError as e:
log.warning("Unable to delete temp file %s - %s. If the file still "
"exists, you will need to manually clean it up.",
filename, e.strerror)
@contextlib.contextmanager
def handle_processing_failure(self, filename: str) -> Iterator[None]:
"""Handles cleaning up of a temporary file if we run into any errors
while processing the file's data, so that we don't bloat the temporary
storage.
"""
# TODO: FTX-575 - Keep track of invalid files, so we can keep a
# single copy of it and having a list or table for reference will
# help debugging
try:
yield
except Failure:
self._delete_temp_file(filename)
raise
# TODO: Need to disambiguate state by connection?
def _deduplicate_rows(self, frame: pd.DataFrame, file_entry: FileEntry, stream_id: InputStreamId) -> pd.DataFrame:
if frame.empty or not self.row_hash_store:
return frame
# Check dataframe rows against file row-hash store
data_id = file_entry.path.replace(os.path.sep, '_')
# These hashes are tracked per file; even after reducing the number of unique values we shouldn't see collisions.
# Reduce each hash modulo HASH_MAX_INT so it fits within the hash store's integer range.
frame[ROW_HASH_COL_NAME] = frame.apply(
lambda row: int(hashlib.md5(str(row).encode()).hexdigest(), 16) % HASH_MAX_INT,
axis=1
)
hashes = set(frame[ROW_HASH_COL_NAME])
with self.row_hash_store as hash_store:
new_hashes = hash_store.filter_new_hashes(hashes, data_id)
# remove rows that already exist in the hash table
frame = frame[frame[ROW_HASH_COL_NAME].isin(new_hashes)]
# add new rows to the hash table
hash_store.add_hashes(data_id=data_id, stream_id=stream_id, hash_data=new_hashes)
# remove extra columns
return frame.drop(columns=[ROW_HASH_COL_NAME])
def _process_grouped_candidates(
self,
grouped_candidates: Iterable[Tuple[Tuple[FileEntry, Connection], Iterable[ParsingCandidate]]]
) -> None:
for (file_entry, connection), file_candidates in grouped_candidates:
completed_path = safe_path_join(self._completed_path, file_entry.path)
ensure_dir_exists(os.path.dirname(completed_path))
with tempfile.NamedTemporaryFile(delete=False, dir=self.local_temp_dir) as fp:
with self.handle_processing_failure(fp.name):
marker_category = f'connections.{connection.name}.copy'
try:
connection.copy(file_entry, fp.name)
markers.clear(marker_category)
except ConnectionError as e:
msg = f'{connection.name} - {connection.connection_info} failed to copy {file_entry.path}. {e.message}'
markers.error(marker_category, msg)
raise Failure
for c in file_candidates:
# TODO: Pass stream_id instead?
with push_context(component=self.name,
component_type=self.component_type,
asset=c.asset,
stream_type=c.stream_type):
log.info("Processing file %s from %s",
file_entry.filename, file_entry.path)
try:
results = c.parser.process(file_entry, c.state, fp.name)
for (frame, c.state), next_result in lookahead(results):
frame = frame.copy(deep=True) # avoid SettingWithCopyError if we reassign dataframe cell values
if self.deduplicate_rows:
frame = self._deduplicate_rows(frame, file_entry, c.input_stream_id)
modification_timestamp = datetime.fromtimestamp(file_entry.mtime, pytz.utc)
additional_columns: dict = {
const.ASSET_COLNAME: c.asset,
const.STREAM_TYPE_COLNAME: c.stream_type,
}
if self.emit_file_metadata:
additional_columns[const.FILE_NAME_COLNAME] = file_entry.filename
additional_columns[const.FILE_PATH_COLNAME] = file_entry.path
additional_columns[const.FILE_TIMESTAMP_COLNAME] = modification_timestamp
add_columns_if_absent(frame, additional_columns)
dataflow_process_marker = f'connections.{connection.name}.dataflow:process'
receiver_log_context = markers.get_context()
try:
self.dataflow.process(c.input_stream_id,
frame)
markers.clear(dataflow_process_marker,
receiver_log_context)
except Failure:
# Invalid file - Some ways to fail:
# 1. Frame didn't pass validation
# 2. A transform on the frame failed
# 3. Frame couldn't be processed by transmit
# TODO: Should we differentiate between failures?
markers.error(
category=dataflow_process_marker,
message=f"Dataflow failed to process file: {file_entry.path}",
context=receiver_log_context
)
raise
if next_result is not None: # Avoid writing the last state twice.
self._store.record(c, completed=False)
except ParserError as e:
parser_error_msg = (f"File {file_entry.filename} from {file_entry.path} "
f"could not be parsed: {str(e)}").replace('\n', '')
log.warning(parser_error_msg)
# If config is not set to skip errors, raise the exception
if not self.skip_parser_errors:
raise Failure from e
# Log the ParserError as a warning to system log and also out to
# a separate 'quarantine.log' file
# Then continue on and record ParsingCandidate result to DB
try:
self.quarantine_log.write_msg(parser_error_msg)
except OSError as exc:
markers.error(
category=f"quarantine",
message=f"Failed to write to quarantine log: {exc}",
)
# If the file has been completed and we are tracking row hashes for deduplication, clear its state:
# future changes might arrive out of order, so we can't rely on row count alone.
self._store.record(c, completed=True, clear_state_on_completed=self.deduplicate_rows)
# optimize hash_store to save disk space
if self.deduplicate_rows and self.row_hash_store:
with self.row_hash_store as hash_store:
hash_store.optimize(hash_retention_timedelta=self.deduplicate_rows_retention_td)
if self.archive_completed:
shutil.move(fp.name, completed_path)
if self.delete_completed:
marker_category = f'connections.{connection.name}.delete'
try:
connection.delete(file_entry)
markers.clear(marker_category)
except ConnectionError as e:
msg = f'{connection.name} - {connection.connection_info} failed to delete {file_entry.path}. {e.message}'
markers.error(marker_category, msg)
# Keep going if we can't delete the file, backing off
# will only cause us to collect data slowly.
if not self.archive_completed:
self._delete_temp_file(fp.name)
def _process_candidates(self, candidates: Iterable[ParsingCandidate]) -> None:
"""Parses a list of candidates, pushes them through the data pipeline,
and stores any state emitted by the parsers.
"""
if self.process_ordered_connections:
connection_files: Dict[str, List[ParsingCandidate]] = defaultdict(list)
for candidate in candidates:
connection_files[candidate.connection.name].append(candidate)
for connection in self.connection_configs:
connection_name: str = connection['connection_name']
file_list: List[ParsingCandidate] = connection_files.get(connection_name, [])
grouped_candidates = grouped(file_list, key=lambda c: (c.file, c.connection))
self._process_grouped_candidates(grouped_candidates)
else:
grouped_candidates = grouped(candidates, key=lambda c: (c.file, c.connection))
self._process_grouped_candidates(grouped_candidates)
def test_connection(self) -> ConnectionTestResult:
def inner() -> ConnectionTestResult:
try:
connections: List[Connection] = []
for connection_config in self.connection_configs:
connection = self.connection_class(connection_config, self.root_config)
connections.append(connection)
self._list_candidates(connections)
return ConnectionTestResult(
message="Connection Successful!",
connection_successful=True,
)
except Exception as e:
return ConnectionTestResult(
message=f"Connection Failed! Error: {str(e)}",
connection_successful=False,
)
TIMEOUT_SECONDS = 5
try:
return func_timeout(TIMEOUT_SECONDS, inner)
except FunctionTimedOut:
return ConnectionTestResult(
message=f"Connection attempt timed out after {TIMEOUT_SECONDS} seconds.",
connection_successful=False,
)
def poll(self, connections: List[Connection]) -> None:
"""Processes all new or modified files from one or more connections."""
candidates: List[ParsingCandidate] = []
with Timer() as list_candidates_timer:
candidates = self._list_candidates(connections)
self.monitors.update_timings('candidate_listing', float(list_candidates_timer.duration))
log.info('Found %d parseable files', len(candidates))
self.monitors.update_qtys('parseable_files', float(len(candidates)))
candidates = self._store.fetch_new_or_modified(
candidates, alphabetical=self.process_files_alphabetically
)
log.info('Processing %d new or modified files', len(candidates))
self.monitors.update_qtys('new_or_modified_parseable_files', float(len(candidates)))
with Timer() as processing_timer:
self._process_candidates(candidates)
self.monitors.update_timings('candidate_processing', float(processing_timer.duration))
def run_once(self) -> None:
"""Runs a single polling pass. This method should only be used for unit
tests since it sets up and tears down new connections whenever it runs.
"""
with contextlib.ExitStack() as stack:
connections: List[Connection] = []
for connection_config in self.connection_configs:
connection = self.connection_class(connection_config, self.root_config)
stack.push(contextlib.closing(connection))
connections.append(connection)
self.poll(connections)
def run(self) -> None:
# clean up completed files
if self.archive_completed and self.max_archive_size_mb >= 0:
threading.Thread(target=self.clean_completed_dir).start()
# XXX: Switch to only using a single connection.
with contextlib.ExitStack() as stack:
connections: List[Connection] = []
for connection_config in self.connection_configs:
connection_name = connection_config['connection_name']
marker_category = f'connections.{connection_name}.connect'
try:
connection = self.connection_class(connection_config, self.root_config)
markers.clear(marker_category)
except ConnectionError as e:
markers.error(marker_category, f'{connection_name} failed to connect. {e.message}')
raise Failure
stack.push(contextlib.closing(connection))
connections.append(connection)
while True:
self.poll(connections)
time.sleep(self.poll_interval)
def clean_completed_dir(self, run_once: bool=False, sleep: float=300) -> None:
marker_category = 'os.clean'
max_size = self.max_archive_size_mb * 1024 * 1024
while True:
try:
total_archive_size = 0
ensure_dir_exists(self._completed_path)
for dir_path, _, filenames in os.walk(self._completed_path):
for filename in sorted(filenames, key=lambda f: -os.path.getmtime(os.path.join(dir_path, f))):
path = os.path.join(dir_path, filename)
filesize = os.path.getsize(path)
total_archive_size += filesize
if total_archive_size > max_size:
os.remove(path)
if run_once:
break
time.sleep(sleep)
except OSError as e:
msg = f'Error cleaning directory: {e.strerror}'
markers.error(marker_category, msg)