Source code for factorytx.receivers.file.receiver

from typing import (
    Callable, Dict, Generator, Iterable, Iterator, List, NamedTuple,
    Optional, Pattern, Set, Tuple, Type
)

import contextlib
from collections import defaultdict
from datetime import datetime, timedelta
from functools import partial
import fnmatch
import inspect
import logging
import os
import re
import shutil
import tempfile
import textwrap
import threading
import time
import sqlite3
import warnings

import hashlib
import pandas as pd
from func_timeout import func_timeout, FunctionTimedOut

import pytz
from pandas.errors import ParserError

import factorytx.const
from factorytx.base import Receiver, ConnectionTestResult
from factorytx.sqlite import MigrationDict
from factorytx.config import ConfigInfo, clean_section, get_component_class
from factorytx import const
from factorytx.dataflow import DataflowGraph, InputStreamId, stream_matches
from factorytx.exceptions import Failure
from factorytx.logs import push_context, FileContextLogger
from factorytx import markers
from factorytx.monitor.monitors import Monitors
from factorytx.receivers.file.base import Connection, ConnectionError, FileEntry, Parser, parsers
from factorytx.receivers.file.state import ParsingCandidate, StateStore, StateStoreDBMigrator
from factorytx.state.hash_store import IntHashStore, ROW_HASH_COL_NAME, HASH_MAX_INT
from factorytx.utils import (
    Timer, add_columns_if_absent, ensure_dir_exists, get_component_path, grouped,
    lookahead, safe_path_join
)
from factorytx.validation import (
    ConfigPath, ValidationError, ValidationMessage, ValidationWarning,
    clean_with_json_schema, has_errors
)

# Set up regular app logger
log = logging.getLogger(__name__)

PARSER_INFO: ConfigInfo[Parser] = ConfigInfo(
    registry=parsers,
    section_key='parsers',
    type_key='parser_type',
    version_key='parser_version',
    name_key='parser_name',
)


class _FileStream(NamedTuple):
    """Represents a single stream configuration."""
    input_stream_id: InputStreamId
    asset: str
    stream_type: str
    applies_to: Callable
    parser: Parser


def conn_list_compat(connection: Connection, last_completed_file: Optional[str]) -> Generator[FileEntry, None, None]:
    # TODO: Remove the `inspect` detection when we want to drop
    #       backwards compatibility
    list_args = inspect.getfullargspec(connection.list).args
    if 'start_after_hint' in list_args:
        for f_entry in connection.list(start_after_hint=last_completed_file):
            yield f_entry
    else:
        warnings.warn("The base Connection.list() method now "
                      "supports using a 'start_after_hint' "
                      "parameter. Please update your custom "
                      "connection to support this signature.",
                      DeprecationWarning)
        for f_entry in connection.list():  # type: ignore
            yield f_entry
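
# Illustrative sketch (not part of this module): a custom Connection whose
# ``list()`` accepts the newer ``start_after_hint`` parameter, so that
# conn_list_compat() can pass the last completed file name straight through
# instead of falling back to the deprecated no-argument call. The class and
# helper names below are hypothetical.
#
#     class ExampleConnection(Connection):
#         def list(self, start_after_hint: Optional[str] = None) -> Iterator[FileEntry]:
#             # Yield FileEntry objects; skip anything at or before the hint so
#             # already-completed files are not listed again.
#             for entry in self._fetch_remote_entries():
#                 if start_after_hint is None or entry.path > start_after_hint:
#                     yield entry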


class FileReceiverDBMigrator(StateStoreDBMigrator):
    def __init__(self, rcvr: 'FileReceiver') -> None:
        self.rcvr = rcvr

    def _MIGRATION_SQL(self) -> MigrationDict:
        """
        Extend state DB migration table to contain additional migration scripts.
        """
        migration_sql: MigrationDict = super()._MIGRATION_SQL()

        migration_name = '0005-name-connections'
        assert migration_name not in migration_sql
        migration_sql[migration_name] = self.db_migration_name_connection

        return migration_sql

    def db_migration_name_connection(self, conn: sqlite3.Connection) -> None:
        """
        The state DB used to store nameless connections keyed by (asset, stream_type, path).
        Named connections are now allowed, which lets that compound key be duplicated across
        connections. Leaving nameless connections in the state DB is a problem, so this
        migration script ensures all entries have a name.

        This migration assumes that, at this point, the state DB contains only one row per
        (asset, stream_type, path) combination.

        :param conn: sqlite connection to apply changes to.
        :return:
        """
        rows = conn.execute("SELECT COUNT(1) FROM files WHERE connection IS NULL").fetchall()
        if rows[0][0] < 1:
            # Nothing to do, skip
            return

        params: List = []
        for connection_config in self.rcvr.connection_configs:
            with contextlib.closing(self.rcvr.connection_class(connection_config, self.rcvr.root_config)) as connection:
                for file_entry in conn_list_compat(connection, None):
                    matching_streams = [stream for stream in self.rcvr.streams if stream.applies_to(file_entry)]

                    # SQLite execute likes lists vs tuples
                    params.extend(
                        [connection.name, stream.asset, stream.stream_type, file_entry.path]
                        for stream in matching_streams
                    )

            if params:
                # Assume only one entry per (asset, stream_type, path) combination and no duplicate connection names.
                conn.executemany(
                    # language=SQLite
                    textwrap.dedent("""\
                    UPDATE files
                      SET connection = ?
                      WHERE connection is NULL
                        AND asset = ?
                        AND stream_type = ?
                        AND path = ?
                    """),
                    params
                ).fetchall()

        # Did we cover all the connections?
        # language=SQLite
        rows = conn.execute("SELECT * FROM files WHERE connection IS NULL").fetchall()
        if rows:
            # If this log message is too large, it will be truncated.
            columns = [
                r[1]
                for r in conn.execute("pragma table_info(files)").fetchall()
            ]
            log.warning(
                (
                    'factorytx.receivers.file.receiver.FileReceiver.db_migration_name_connection: '
                    'The following %s rows could not be matched to a named connection and will be deleted:\n'
                    '%s\n%s'
                ),
                len(rows),
                '|'.join(columns),
                '\n'.join(
                    '|'.join(repr(i) for i in row)
                    for row in rows
                ),
            )
            # language=SQLite
            conn.execute("DELETE FROM files WHERE connection IS NULL").fetchall()



class FileReceiver(Receiver):
    """Receiver that fetches files from instances of a Connection subclass
    and parses them using the configured Parser types.

    Here's how to create a custom FileReceiver class to handle a new protocol:

    1. Define a Connection subclass that uses the protocol to list available
       files, fetch files, and optionally delete files from a remote server.

    .. code-block:: Python

        class MyConnection(Connection):
            ...

    2. Register a custom FileReceiver subclass for your Connection type:

    .. code-block:: Python

        @receivers.register('my_receiver')
        class MyReceiver(FileReceiver):
            connection_class = MyConnection

    """

    connection_class: Type[Connection]
    db_migrator_class: Type[FileReceiverDBMigrator] = FileReceiverDBMigrator
    monitors: Monitors

    schema = {
        "type": "object",
        "properties": {
            "data_receiver_name": {"type": "string"},
            "protocol": {"type": "string"},
            "connections": {"type": "array"},
            "process_files_alphabetically": {
                "type": "boolean",
                "default": False
            },
            "process_ordered_connections": {
                "type": "boolean",
                "default": False
            },
            "streams": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "asset": {"type": "string"},
                        "stream_type": {"type": "string"},
                        "file_filter": {
                            "type": "array",
                            "items": {"type": "string", "minLength": 1},
                        },
                        "parser": {"type": "string", "minLength": 1},
                        "path_filter": {
                            "type": "array",
                            "items": {"type": "string", "minLength": 1},
                        },
                    },
                    "additionalProperties": True,
                    "required": ["asset", "stream_type"],
                },
            },
            "parsers": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "parser_name": {"type": "string", "minLength": 1},
                        "parser_type": {"type": "string", "minLength": 1},
                        "parser_version": {"type": "integer", "minimum": 1},
                        "filter_stream": {
                            "type": "array",
                            "items": {"type": "string", "minLength": 1},
                        },
                    },
                    "additionalProperties": True,
                    "required": ["parser_name", "parser_type"],
                },
            },
            "skip_parser_errors": {
                "type": "boolean",
                "default": False,
                "description": (
                    "Whether the receiver will allow parser errors when processing files. If True, "
                    "parsing errors will be skipped and logged in a quarantine log file while the "
                    "receiver continues processing. Otherwise, if a parsing error is encountered, "
                    "the receiver will halt until the file is repaired."
                ),
            },
            "archive_completed": {
                "type": "boolean",
                "default": False,
                "description": (
                    "Whether to keep a local copy of each file received from the remote "
                    "server or not. If enabled, files will be archived until their total "
                    "size increases above the amount specified in the 'max_archive_size_mb' "
                    "parameter."
                ),
            },
            "max_archive_size_mb": {
                "type": "number",
                "default": -1,
                "description": (
                    "If 'archive_completed' is True, delete the archived completed files "
                    "once their total size (in MB) is greater than this value. A negative "
                    "value means never delete any files."
                )
            },
            "delete_completed": {
                "type": "boolean",
                "default": False,
                "description": (
                    "Whether to delete each file from the remote server once it "
                    "has been successfully processed."
                ),
            },
            "read_in_progress_files": {
                "type": "boolean",
                "default": False,
                "description": (
                    "Whether to read files as soon as they are created, or wait "
                    "for the upstream service to stop writing to the file before "
                    "reading it."
                ),
            },
            "emit_file_metadata": {
                "type": "boolean",
                "default": False,
                "description": (
                    "Whether or not to inject additional columns into each record "
                    "containing metadata about the file it came from. If this setting "
                    "is enabled then every record will contain fields named `file_name`, "
                    "`file_path`, and `file_timestamp`."
                ),
            },
            "poll_interval": {
                "type": "number",
                "minimum": 0.1,
                "default": 60,
                "description": "Number of seconds to wait before fetching new data from the server.",
            },
            "temporary_file_directory": {
                "type": "string",
                "default": "/tmp",
                "description": "Specify the directory to temporarily store "
                               "files that have been downloaded from the "
                               "connection(s). By default, the directory "
                               "used is `/tmp`.",
            },
            "deduplicate_rows": {
                "type": "boolean",
                "default": False,
            },
            "deduplicate_row_retention_time_days": {
                "type": "number",
                "minimum": 1,
                "default": 90,
            },
        },
        "required": ["data_receiver_name", "protocol", "connections", "streams", "parsers"],
        "additionalProperties": False,
    }
    """All file receivers support these **required** and *optional* properties:

    - **data_receiver_name**: Unique name of the data receiver. This name
      will be used to track the progress state of the data stream.
    - **protocol**: Protocol to be used.
    - **streams**: List of input data streams to read from.

      - **asset**: Asset identifier
      - **stream_type**: Type of data stream
      - **parser**: Name of the parser used to convert the file. This has to
        match one of the *parser_name* values in the *parsers* section.
      - *file_filter*: List of files to filter on. Items can be regular
        expressions. You must select either *file_filter* or *path_filter*
        but not both.
      - *path_filter*: List of paths to filter on. Items can be regular
        expressions. You must select either *file_filter* or *path_filter*
        but not both.

    - **connections**: A list of connection settings used to connect to the
      data source. Please refer to the connection settings below.
    - *process_files_alphabetically*: ``true`` to process all of the files
      retrieved by the connection(s) in alphabetical order. By default, this
      option is ``false``, so files are processed by last modified (or
      created) time with oldest files first.
    - *process_ordered_connections*: If there are multiple connections and
      you want to process (and transmit) the files received from one
      connection before another, set this option to ``true`` to process files
      based on the order of the connection(s). By default, this option is set
      to ``false``. This option is usually related to the Kafka transmit
      because events within a Kafka topic need to be in order. For example,
      if one connection receives historical files and another connection
      receives "realtime" files, you'll want to enable this setting and order
      the connections with the historical connection first, so that events in
      the Kafka topic are in chronological order. Please note that the order
      of the files per connection is maintained: if
      ``process_files_alphabetically`` is enabled, files will be parsed in
      alphabetical order; if disabled, files will be parsed in chronological
      order based on the last modified time.
    - **parsers**: A list of parsers that can be used for the input streams.
      Each parser has the following default properties:

      - **parser_name**: A customizable name used by streams to identify the
        parser.
      - **parser_type**: Type of parser to apply (e.g. ``csv``)
      - *parser_version*: Version of the parser type to apply.

      FactoryTX has a few built-in parsers available for you to use. Please
      refer to the :ref:`Parsers Configurations` section of the manual for
      more details about them.

    - *skip_parser_errors*: Whether the receiver will allow parser errors
      when processing files. If True, parsing errors will be skipped and
      logged in a quarantine log file while the receiver continues
      processing. Otherwise, if a parsing error is encountered, the receiver
      will halt until the file is repaired. (Only works for csv files.)
    - *archive_completed*: ``true`` to keep a local copy of each file
      received from the remote server, or ``false`` if local copies should
      not be kept. If enabled, files will be archived until their total size
      increases above the amount specified in the *max_archive_size_mb*
      parameter.
    - *max_archive_size_mb*: If *archive_completed* is ``true``, delete the
      archived completed files once their total size (in MB) is greater than
      this value. A negative value means never delete any files. Defaults
      to -1.
    - *delete_completed*: ``true`` if files should be deleted from the data
      directory after they have been received by FactoryTX, or ``false`` if
      files should never be deleted. Defaults to ``false`` to avoid
      accidentally losing data.
    - *archive_completed* and *delete_completed* are independent of each
      other. Archiving will create a copy of the file in a new directory,
      while deleting will remove the original file.
    - *read_in_progress_files*: Whether to read files as soon as they are
      created, or wait for the upstream service to stop writing to the file
      before reading it.
    - *emit_file_metadata*: Whether or not to inject additional columns into
      each record containing metadata about the file it came from. If this
      setting is enabled then every record will contain fields named
      ``file_name``, ``file_path``, and ``file_timestamp``.
    - *poll_interval*: Number of seconds to wait before fetching new data
      from the server.
    - *temporary_file_directory*: Specify the directory to temporarily store
      files that have been downloaded from the connection(s). By default,
      the directory used is ``/tmp``.
    - *deduplicate_rows*: Whether or not to deduplicate rows when processing
      files. This is done by hashing each row and saving the hash to compare
      against later if the file is modified. If the file is modified, only
      the new rows will be processed. This should be used if the same file is
      expected to be re-processed multiple times and it will be regularly
      modified, rather than appended to. All data in each row must be a
      hashable type (no lists, dicts, etc.). File hash stores will be kept in
      the "row_hash_stores" directory inside the receiver's component
      directory.
    - *deduplicate_row_retention_time_days*: The number of days to keep row
      hashes for deduplication (default 90).
    """

    @classmethod
    def _clean_connections(cls: Type['FileReceiver'], config: dict,
                           root_config: dict) -> List[ValidationMessage]:
        path: ConfigPath
        connection_names: Set[str] = set()
        validation_results: List[ValidationMessage] = []

        for conn_no, conn_config in enumerate(config['connections']):
            if isinstance(conn_config, dict):
                connection_name = conn_config.get('connection_name')
                if connection_name is None:
                    path = ('connections', conn_no)
                    msg = f"A connection name is required for connection {conn_no + 1}."
                    validation_results.append(ValidationError(path, msg))
                    continue
                if connection_name == '':
                    path = ('connections', conn_no, 'connection_name')
                    msg = "'' is too short."
                    validation_results.append(ValidationError(path, msg))
                    continue
                if connection_name in connection_names:
                    path = ('connections', conn_no, 'connection_name')
                    msg = f'A connection named "{connection_name}" already exists.'
                    validation_results.append(ValidationError(path, msg))
                connection_names.add(connection_name)

            for validation_result in cls.connection_class.clean(conn_config, root_config):
                path = ('connections', conn_no) + validation_result.path  # type: ignore
                validation_result = validation_result._replace(path=path)
                validation_results.append(validation_result)

        return validation_results

    @classmethod
    def _clean_parsers(cls: Type['FileReceiver'], config: dict, root_config: dict,
                       parser_names: Set[str]) -> List[ValidationMessage]:
        path: ConfigPath
        validation_results: List[ValidationMessage] = []

        validation_results.extend(clean_section(PARSER_INFO, config, root_config, parser_names))
        if has_errors(validation_results):
            return validation_results

        # Flag a warning if any of the parsers use filter_stream. We only
        # warn about the first instance to keep the verbosity low.
        for parser_no, parser_config in enumerate(config['parsers']):
            if 'filter_stream' in parser_config:
                path = ('parsers', parser_no, 'filter_stream')
                msg = ('The "filter_stream" option is deprecated. Please set the '
                       'stream\'s "parser" option instead.')
                validation_results.append(ValidationWarning(path, msg))
                break

        return validation_results

    @classmethod
    def _clean_streams(cls: Type['FileReceiver'], config: dict,
                       parser_names: Set[str]) -> List[ValidationMessage]:
        path: ConfigPath
        validation_results: List[ValidationMessage] = []

        # There are two ways to associate parsers with streams:
        # 1. By specifying the name of the parser for the stream.
        # 2. By specifying a filter_stream on the parser.
        #
        # Specifying a parser is preferred, since it makes errors (eg.
        # associating multiple parsers with a stream) less likely. To simplify
        # the constructor, we convert all instances of filter_stream into
        # parser names attached to the stream.
        for stream_no, stream_config in enumerate(config['streams']):
            parser = stream_config.get('parser')
            if parser is not None and parser not in parser_names:
                path = ('streams', stream_no, 'parser')
                msg = f'No such parser: "{parser}"'
                validation_results.append(ValidationError(path, msg))

            stream_id = str(InputStreamId(stream_config['asset'], stream_config['stream_type']))
            for parser_no, parser_config in enumerate(config['parsers']):
                filter_stream = parser_config.get('filter_stream')
                if filter_stream is None or not stream_matches(stream_id, filter_stream):
                    continue
                if parser is None:
                    parser = parser_config['parser_name']
                else:
                    path = ('parsers', parser_no, 'filter_stream')
                    msg = f'{stream_id} is already associated with the parser "{parser}"'
                    validation_results.append(ValidationError(path, msg))

            if parser is None:
                path = ('streams', stream_no)
                msg = "'parser' is a required property"
                validation_results.append(ValidationError(path, msg))
            stream_config['parser'] = parser

            # Streams should have either 'file_filter' or 'path_filter' but not both
            if stream_config.get('file_filter') and stream_config.get('path_filter'):
                path = ('streams', stream_no, 'file_filter')
                msg = "one stream can't have both 'file_filter' and 'path_filter' properties"
                validation_results.append(ValidationError(path, msg))
            elif not (stream_config.get('file_filter') or stream_config.get('path_filter')):
                path = ('streams', stream_no)
                msg = "either 'file_filter' or 'path_filter' is required"
                validation_results.append(ValidationError(path, msg))

        return validation_results

    @classmethod
    def clean(cls: Type['FileReceiver'], config: dict,
              root_config: dict) -> List[ValidationMessage]:
        path: ConfigPath
        parser_names: Set[str] = set()
        validation_results: List[ValidationMessage] = []

        validation_results.extend(clean_with_json_schema(FileReceiver.schema, config))
        if has_errors(validation_results):
            return validation_results

        validation_results.extend(cls._clean_connections(config, root_config))
        # Nothing else depends on connections being validated, so we can continue
        # if there are any errors.

        validation_results.extend(cls._clean_parsers(config, root_config, parser_names))
        if has_errors(validation_results):
            return validation_results

        # Streams must be cleaned AFTER parsers, and only if the parsers are valid.
        # Otherwise the stream validation won't have the list of parser names, and
        # may crash if it encounters an invalid stream.
        validation_results.extend(cls._clean_streams(config, parser_names))
        if has_errors(validation_results):
            return validation_results

        ensure_dir_exists(config['temporary_file_directory'])

        return validation_results
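
    # Illustrative example (not part of this module): a minimal configuration
    # shape that satisfies the schema and clean() above. The protocol,
    # connection settings, and parser type are hypothetical placeholders;
    # valid values depend on the Connection subclass and the parsers
    # registered in your deployment.
    #
    #     {
    #         "data_receiver_name": "receiver_1",
    #         "protocol": "example_protocol",
    #         "connections": [{"connection_name": "conn_1", ...}],
    #         "parsers": [{"parser_name": "parse_csv", "parser_type": "csv"}],
    #         "streams": [
    #             {
    #                 "asset": "asset_1",
    #                 "stream_type": "metrics",
    #                 "parser": "parse_csv",
    #                 "file_filter": ["*.csv"],
    #             }
    #         ],
    #     }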

    @staticmethod
    def applies_to_filename(file_entry: FileEntry, file_filter: List[Pattern]) -> bool:
        return any(f.match(file_entry.filename) for f in file_filter)

    @staticmethod
    def applies_to_path(file_entry: FileEntry, file_filter: List[Pattern]) -> bool:
        return any(f.match(file_entry.path) for f in file_filter)

    def __init__(self, dataflow: DataflowGraph, config: dict, root_config: dict) -> None:
        super().__init__(dataflow, config, root_config)
        self.dataflow = dataflow
        self.connection_configs: List[dict] = config['connections']
        self.root_config = root_config
        self.data_receiver_name = config['data_receiver_name']
        self.monitors = Monitors('receiver', 'FileReceiver', self.data_receiver_name)

        # Set up file-logger for quarantine results
        quarantine_dir_path: str = os.path.join(factorytx.const.COMPONENT_DATA_PATH,
                                                self.data_receiver_name, 'quarantine')
        self.quarantine_log = FileContextLogger(os.path.join(quarantine_dir_path, 'quarantine.log'))

        parsers: Dict[str, Parser] = {}
        for parser_config in config['parsers']:
            parser_cls = get_component_class(PARSER_INFO, parser_config)
            parsers[parser_config['parser_name']] = parser_cls(parser_config, root_config)

        self.streams: List[_FileStream] = []
        for stream_config in config['streams']:
            if 'file_filter' in stream_config:
                file_filter = [re.compile(fnmatch.translate(f)) for f in stream_config['file_filter']]
                applies_to = partial(FileReceiver.applies_to_filename, file_filter=file_filter)
            elif 'path_filter' in stream_config:
                file_filter = [re.compile(fnmatch.translate(f)) for f in stream_config['path_filter']]
                applies_to = partial(FileReceiver.applies_to_path, file_filter=file_filter)
            else:
                assert False
            self.streams.append(_FileStream(
                input_stream_id=InputStreamId(stream_config['asset'], stream_config['stream_type']),
                asset=stream_config['asset'],
                stream_type=stream_config['stream_type'],
                applies_to=applies_to,
                parser=parsers[stream_config['parser']],
            ))

        self.skip_parser_errors = config['skip_parser_errors']
        self.archive_completed = config['archive_completed']
        self.max_archive_size_mb = config['max_archive_size_mb']
        self.delete_completed = config['delete_completed']
        self.poll_interval = config['poll_interval']
        self.read_in_progress_files = config['read_in_progress_files']
        self.emit_file_metadata = config['emit_file_metadata']
        self.local_temp_dir = config['temporary_file_directory']
        self.last_file_entries: Dict[Tuple[int, str], FileEntry] = {}
        self.process_files_alphabetically = config['process_files_alphabetically']
        self.process_ordered_connections = config['process_ordered_connections']

        self.deduplicate_rows = config['deduplicate_rows']
        self.deduplicate_rows_retention_td: timedelta = timedelta(
            days=int(config['deduplicate_row_retention_time_days'])
        )
        self._row_hash_stores_path = safe_path_join(get_component_path(self.name), "row_hash_stores.db")
        self.row_hash_store = IntHashStore(state_path=self._row_hash_stores_path)

        component_path = get_component_path(self.name)
        self._completed_path = safe_path_join(component_path, 'completed')
        self._state_store_path = safe_path_join(component_path, 'state.db')
        self._store = StateStore(self._state_store_path)
        self._store.initialize(db_migrator=self.db_migrator_class(self))

    def __getstate__(self) -> dict:
        """Deconstructs an object for pickling."""
        # Don't try to pickle the state store -- we'll reconnect after forking.
        return {k: v for k, v in self.__dict__.items() if k != '_store'}

    def __setstate__(self, state: dict) -> None:
        """Reconstructs an object when unpickling."""
        self.__dict__.update(state)
        self._store = StateStore(self._state_store_path)
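
    # Illustrative sketch (not part of this module): how a stream's
    # ``file_filter`` globs are applied. __init__ compiles each glob with
    # fnmatch.translate() and applies_to_filename() matches it against
    # FileEntry.filename only; use ``path_filter`` (and applies_to_path())
    # to match against the full path instead. The FileEntry arguments elided
    # below depend on its actual constructor.
    #
    #     patterns = [re.compile(fnmatch.translate(g)) for g in ["*.csv"]]
    #     entry = FileEntry(path="line_1/report.csv", filename="report.csv", ...)
    #     FileReceiver.applies_to_filename(entry, patterns)  # -> True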

    def purge(self, streams: List[InputStreamId]) -> None:
        self._store.purge(streams)
        if self.row_hash_store:
            self.row_hash_store.purge(streams)

    def _list_candidates(self, connections: List[Connection]) -> List[ParsingCandidate]:
        """Returns a list of ParsingCandidate objects representing files to be
        parsed and the parsers to parse them with. One file may be associated
        with multiple parsers and streams.
        """
        new_file_entries: Dict[Tuple[int, str], FileEntry] = {}  # (connection no, path) -> file entry
        candidates: List[ParsingCandidate] = []
        for connection_no, connection in enumerate(connections):

            def _list_files(connection: Connection) -> Generator[FileEntry, None, None]:
                marker_category = f'connections.{connection.name}.list'
                try:
                    last_completed_file: Optional[str] = None
                    if self.process_files_alphabetically:
                        last_completed_file = self._store.fetch_last_completed_file(connection.name)
                    yield from conn_list_compat(connection, last_completed_file)
                    markers.clear(marker_category)
                except ConnectionError as e:
                    markers.error(marker_category,
                                  f'{connection.name} - {connection.connection_info} failed to list files. {e.message}')
                    raise Failure

            for file_no, file_entry in enumerate(_list_files(connection)):
                if file_no % 5000 == 0:
                    log.debug('List Candidate:%d:%s:%d:%s', connection_no, connection, file_no, file_entry)
                if file_entry.filename == '.DS_Store':
                    continue  # Make OS X users happy.

                matching_streams = [stream for stream in self.streams if stream.applies_to(file_entry)]
                if not matching_streams:
                    # Don't bother saving this entry in new_file_entries if we're
                    # not interested in it. This saves memory when the upstream
                    # server has a large number of files and we're interested in
                    # only a small number of them.
                    continue

                if not self.read_in_progress_files:
                    # Avoid reading files that are still being written.
                    file_key = (connection_no, file_entry.path)
                    new_file_entries[file_key] = file_entry
                    prev_file_entry = self.last_file_entries.get(file_key)
                    if (prev_file_entry is None or
                            prev_file_entry.size != file_entry.size or
                            prev_file_entry.mtime != file_entry.mtime):
                        continue

                for stream in matching_streams:
                    candidate = ParsingCandidate(
                        connection=connection,
                        parser=stream.parser,
                        file=file_entry,
                        input_stream_id=stream.input_stream_id,
                        asset=stream.asset,
                        stream_type=stream.stream_type,
                        state=None,
                    )
                    candidates.append(candidate)

        self.last_file_entries = new_file_entries
        return candidates

    def _delete_temp_file(self, filename: str) -> None:
        """Deletes a temporary file that's a copy of the source file"""
        try:
            os.remove(filename)
        except OSError as e:
            log.warning("Unable to delete temp file %s - %s. If the file still "
                        "exists, you will need to manually clean it up.",
                        filename, e.strerror)

    @contextlib.contextmanager
    def handle_processing_failure(self, filename: str) -> Iterator[None]:
        """Handles cleaning up of a temporary file if we run into any errors
        while processing the file's data, so that we don't bloat the temporary
        storage.
        """
        # TODO: FTX-575 - Keep track of invalid files so we can keep a single
        # copy of each one; having a list or table for reference will help
        # debugging.
        try:
            yield
        except Failure:
            self._delete_temp_file(filename)
            raise

    # TODO: Need to disambiguate state by connection?
    def _deduplicate_rows(self, frame: pd.DataFrame, file_entry: FileEntry,
                          stream_id: InputStreamId) -> pd.DataFrame:
        if frame.empty:
            return frame

        # Check dataframe rows against file row-hash store
        data_id = file_entry.path.replace(os.path.sep, '_')
        # These hashes are tracked per file, so even after reducing the number of
        # unique hashes we shouldn't get collisions.
        # SQLite max int is 32-bit, so we'll mod to the max value
        frame[ROW_HASH_COL_NAME] = frame.apply(
            lambda row: int(hashlib.md5(str(row).encode()).hexdigest(), 16) % HASH_MAX_INT,
            axis=1
        )
        hashes = set(frame[ROW_HASH_COL_NAME])
        with self.row_hash_store as hash_store:
            new_hashes = hash_store.filter_new_hashes(hashes, data_id)
            # remove rows that already exist in the hash table
            frame = frame[frame[ROW_HASH_COL_NAME].isin(new_hashes)]
            # add new rows to the hash table
            hash_store.add_hashes(data_id=data_id, stream_id=stream_id, hash_data=new_hashes)
        # remove extra columns
        return frame.drop(columns=[ROW_HASH_COL_NAME])

    def _process_grouped_candidates(
            self,
            grouped_candidates: Iterable[Tuple[Tuple[FileEntry, Connection], Iterable[ParsingCandidate]]]
    ) -> None:
        for (file_entry, connection), file_candidates in grouped_candidates:
            completed_path = safe_path_join(self._completed_path, file_entry.path)
            ensure_dir_exists(os.path.dirname(completed_path))
            with tempfile.NamedTemporaryFile(delete=False, dir=self.local_temp_dir) as fp:
                with self.handle_processing_failure(fp.name):
                    marker_category = f'connections.{connection.name}.copy'
                    try:
                        connection.copy(file_entry, fp.name)
                        markers.clear(marker_category)
                    except ConnectionError as e:
                        msg = f'{connection.name} - {connection.connection_info} failed to copy {file_entry.path}. {e.message}'
                        markers.error(marker_category, msg)
                        raise Failure

                    for c in file_candidates:
                        # TODO: Pass stream_id instead?
                        with push_context(component=self.name, component_type=self.component_type,
                                          asset=c.asset, stream_type=c.stream_type):
                            log.info("Processing file %s from %s", file_entry.filename, file_entry.path)
                            try:
                                results = c.parser.process(file_entry, c.state, fp.name)
                                for (frame, c.state), next_result in lookahead(results):
                                    # avoid SettingWithCopyError if we reassign dataframe cell values
                                    frame = frame.copy(deep=True)
                                    if self.deduplicate_rows:
                                        frame = self._deduplicate_rows(frame, file_entry, c.input_stream_id)
                                    modification_timestamp = datetime.fromtimestamp(file_entry.mtime, pytz.utc)
                                    additional_columns: dict = {
                                        const.ASSET_COLNAME: c.asset,
                                        const.STREAM_TYPE_COLNAME: c.stream_type,
                                    }
                                    if self.emit_file_metadata:
                                        additional_columns[const.FILE_NAME_COLNAME] = file_entry.filename
                                        additional_columns[const.FILE_PATH_COLNAME] = file_entry.path
                                        additional_columns[const.FILE_TIMESTAMP_COLNAME] = modification_timestamp
                                    add_columns_if_absent(frame, additional_columns)

                                    dataflow_process_marker = f'connections.{connection.name}.dataflow:process'
                                    receiver_log_context = markers.get_context()
                                    try:
                                        self.dataflow.process(c.input_stream_id, frame)
                                        markers.clear(dataflow_process_marker, receiver_log_context)
                                    except Failure:
                                        # Invalid file - Some ways to fail:
                                        # 1. Frame didn't pass validation
                                        # 2. A transform on the frame failed
                                        # 3. Frame couldn't be processed by transmit
                                        # TODO: Should we differentiate between failures?
                                        markers.error(
                                            category=dataflow_process_marker,
                                            message=f"Dataflow failed to process file: {file_entry.path}",
                                            context=receiver_log_context
                                        )
                                        raise

                                    if next_result is not None:
                                        # Avoid writing the last state twice.
                                        self._store.record(c, completed=False)
                            except ParserError as e:
                                parser_error_msg = (f"File {file_entry.filename} from {file_entry.path} "
                                                    f"could not be parsed: {str(e)}").replace('\n', '')
                                log.warning(parser_error_msg)
                                # If config is not set to skip errors, raise the exception
                                if not self.skip_parser_errors:
                                    raise Failure from e
                                # Log the ParserError as a warning to the system log and also to a
                                # separate 'quarantine.log' file, then continue on and record the
                                # ParsingCandidate result to the DB.
                                try:
                                    self.quarantine_log.write_msg(f"{parser_error_msg}")
                                except OSError as exc:
                                    markers.error(
                                        category="quarantine",
                                        message=f"Failed to write to quarantine log: {exc}",
                                    )

                            # Reset state if the file has been completed and we are keeping track of
                            # deduplicated row hashes: future changes might come out of order, so we
                            # can't just go by row count.
                            self._store.record(c, completed=True, clear_state_on_completed=self.deduplicate_rows)
                            # optimize hash_store to save disk space
                            with self.row_hash_store as hash_store:
                                hash_store.optimize(hash_retention_timedelta=self.deduplicate_rows_retention_td)

                    if self.archive_completed:
                        shutil.move(fp.name, completed_path)

                    if self.delete_completed:
                        marker_category = f'connections.{connection.name}.delete'
                        try:
                            connection.delete(file_entry)
                            markers.clear(marker_category)
                        except ConnectionError as e:
                            msg = f'{connection.name} - {connection.connection_info} failed to delete {file_entry.path}. {e.message}'
                            markers.error(marker_category, msg)
                            # Keep going if we can't delete the file; backing off
                            # will only cause us to collect data slowly.

            if not self.archive_completed:
                self._delete_temp_file(fp.name)

    def _process_candidates(self, candidates: Iterable[ParsingCandidate]) -> None:
        """Parses a list of candidates, pushes them through the data pipeline,
        and stores any state emitted by the parsers.
        """
        if self.process_ordered_connections:
            connection_files: Dict[str, List[ParsingCandidate]] = defaultdict(list)
            for candidate in candidates:
                connection_files[candidate.connection.name].append(candidate)
            for connection in self.connection_configs:
                connection_name: str = connection['connection_name']
                file_list: List[ParsingCandidate] = connection_files.get(connection_name, [])
                grouped_candidates = grouped(file_list, key=lambda c: (c.file, c.connection))
                self._process_grouped_candidates(grouped_candidates)
        else:
            grouped_candidates = grouped(candidates, key=lambda c: (c.file, c.connection))
            self._process_grouped_candidates(grouped_candidates)
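
    # Illustrative sketch (not part of this module): the per-row hash used by
    # _deduplicate_rows above -- the md5 digest of the row's string form,
    # reduced modulo HASH_MAX_INT so it fits the hash store's integer column.
    # Rows whose hash is already recorded for the same file are dropped, so a
    # modified file only contributes its new rows.
    #
    #     row_hash = int(hashlib.md5(str(row).encode()).hexdigest(), 16) % HASH_MAX_INT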

    def test_connection(self) -> ConnectionTestResult:
        def inner() -> ConnectionTestResult:
            try:
                connections: List[Connection] = []
                for connection_config in self.connection_configs:
                    connection = self.connection_class(connection_config, self.root_config)
                    connections.append(connection)
                self._list_candidates(connections)
                return ConnectionTestResult(
                    message="Connection Successful!",
                    connection_successful=True,
                )
            except Exception as e:
                return ConnectionTestResult(
                    message=f"Connection Failed! Error: {str(e)}",
                    connection_successful=False,
                )

        TIMEOUT_SECONDS = 5
        try:
            return func_timeout(TIMEOUT_SECONDS, inner)
        except FunctionTimedOut:
            return ConnectionTestResult(
                message=f"Connection attempt timed out after {TIMEOUT_SECONDS} seconds.",
                connection_successful=False,
            )

    def poll(self, connections: List[Connection]) -> None:
        """Processes all new or modified files from one or more connections."""
        candidates: List[ParsingCandidate] = []
        with Timer() as list_candidates_timer:
            candidates = self._list_candidates(connections)
        self.monitors.update_timings('candidate_listing', float(list_candidates_timer.duration))

        log.info('Found %d parseable files', len(candidates))
        self.monitors.update_qtys('parseable_files', float(len(candidates)))

        candidates = self._store.fetch_new_or_modified(
            candidates, alphabetical=self.process_files_alphabetically
        )
        log.info('Processing %d new or modified files', len(candidates))
        self.monitors.update_qtys('new_or_modified_parseable_files', float(len(candidates)))

        with Timer() as processing_timer:
            self._process_candidates(candidates)
        self.monitors.update_timings('candidate_processing', float(processing_timer.duration))

    def run_once(self) -> None:
        """Runs a single polling pass.

        This method should only be used for unit tests since it sets up and
        tears down new connections whenever it runs.
        """
        with contextlib.ExitStack() as stack:
            connections: List[Connection] = []
            for connection_config in self.connection_configs:
                connection = self.connection_class(connection_config, self.root_config)
                stack.push(contextlib.closing(connection))
                connections.append(connection)
            self.poll(connections)
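
    # Illustrative sketch (not part of this module): exercising a receiver in
    # a unit test via run_once(), which opens the configured connections, runs
    # a single poll() pass, and closes the connections again. ``receiver`` is
    # assumed to be an already-constructed FileReceiver wired to a test
    # DataflowGraph.
    #
    #     receiver.run_once()
    #     # ...assert on the frames pushed into the test dataflow...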

    def run(self) -> None:
        # clean up completed files
        if self.archive_completed and self.max_archive_size_mb >= 0:
            threading.Thread(target=self.clean_completed_dir).start()

        # XXX: Switch to only using a single connection.
        with contextlib.ExitStack() as stack:
            connections: List[Connection] = []
            for connection_config in self.connection_configs:
                connection_name = connection_config['connection_name']
                marker_category = f'connections.{connection_name}.connect'
                try:
                    connection = self.connection_class(connection_config, self.root_config)
                    markers.clear(marker_category)
                except ConnectionError as e:
                    markers.error(marker_category, f'{connection_name} failed to connect. {e.message}')
                    raise Failure
                stack.push(contextlib.closing(connection))
                connections.append(connection)

            while True:
                self.poll(connections)
                time.sleep(self.poll_interval)

    def clean_completed_dir(self, run_once: bool = False, sleep: float = 300) -> None:
        marker_category = 'os.clean'
        max_size = self.max_archive_size_mb * 1024 * 1024
        while True:
            try:
                total_archive_size = 0
                ensure_dir_exists(self._completed_path)
                for dir_path, _, filenames in os.walk(self._completed_path):
                    for filename in sorted(filenames,
                                           key=lambda f: -os.path.getmtime(os.path.join(dir_path, f))):
                        path = os.path.join(dir_path, filename)
                        filesize = os.path.getsize(path)
                        total_archive_size += filesize
                        if total_archive_size > max_size:
                            os.remove(path)
                if run_once:
                    break
                time.sleep(sleep)
            except OSError as e:
                msg = f'Error cleaning directory: {e.strerror}'
                markers.error(marker_category, msg)