Source code for factorytx.receivers.file.receiver

from typing import (
    Callable, Dict, Generator, Iterable, Iterator, List, NamedTuple,
    Optional, Pattern, Set, Tuple, Type, Union
)

import contextlib
from collections import defaultdict
from datetime import datetime
from functools import partial
import fnmatch
import inspect
import logging
import os
import re
import shutil
import tempfile
import textwrap
import threading
import time
import sqlite3
import warnings

import pytz
from pandas.errors import ParserError

import factorytx.const
from factorytx.base import Receiver
from factorytx.sqlite import MigrationDict
from factorytx.config import ConfigInfo, clean_section, get_component_class
from factorytx import const
from factorytx.dataflow import DataflowGraph, InputStreamId, stream_matches
from factorytx.exceptions import Failure
from factorytx.logs import push_context, FileContextLogger
from factorytx import markers
from factorytx.receivers.file.base import Connection, ConnectionError, FileEntry, Parser, parsers
from factorytx.receivers.file.state import ParsingCandidate, StateStore, StateStoreDBMigrator
from factorytx.utils import (
    add_columns_if_absent, ensure_dir_exists, get_component_path, grouped,
    lookahead, safe_path_join
)
from factorytx.validation import (
    ConfigPath, ValidationError, ValidationMessage, ValidationWarning,
    clean_with_json_schema, has_errors
)

# Set up regular app logger
log = logging.getLogger(__name__)

PARSER_INFO: ConfigInfo[Parser] = ConfigInfo(
    registry=parsers,
    section_key='parsers',
    type_key='parser_type',
    version_key='parser_version',
    name_key='parser_name',
)


class _FileStream(NamedTuple):
    """Represents a single stream configuration."""
    input_stream_id: InputStreamId
    asset: str
    stream_type: str
    applies_to: Callable
    parser: Parser


def conn_list_compat(connection: Connection, last_completed_file: Optional[str]) -> Generator[FileEntry, None, None]:
    # TODO: Remove the `inspect` detection when we want to drop
    #       backwards compatibility
    list_args = inspect.getfullargspec(connection.list).args
    if 'start_after_hint' in list_args:
        for f_entry in connection.list(start_after_hint=last_completed_file):
            yield f_entry
    else:
        warnings.warn("The base Connection.list() method now "
                      "supports using a 'start_after_hint' "
                      "parameter. Please update your custom "
                      "connection to support this signature.",
                      DeprecationWarning)
        for f_entry in connection.list():  # type: ignore
            yield f_entry


class FileReceiverDBMigrator(StateStoreDBMigrator):
    def __init__(self, rcvr: Union['FileReceiver']) -> None:
        self.rcvr = rcvr

    def _MIGRATION_SQL(self) -> MigrationDict:
        """
        Extend state DB migration table to contain additional migration scripts.
        """
        migration_sql: MigrationDict = super()._MIGRATION_SQL()

        migration_name = '0005-name-connections'
        assert migration_name not in migration_sql
        migration_sql[migration_name] = self.db_migration_name_connection

        return migration_sql

    def db_migration_name_connection(self, conn: sqlite3.Connection) -> None:
        """
        The state DB use to have nameless connections per (asset, stream_type, path) combinations. Now
        named connections are allowed which support the previous compound key to be duplicated between
        connections.  Leaving nameless connections in the state DB is a problem, so this migration script
        ensures all entries have a name.

        There is an assumption that at this point in time, there are no entries which violate the assumption
        that the state.db only contains one row per (asset, stream_type, path) combinations.

        :param conn: sqlite connection to apply changes to.
        :return:
        """
        rows = conn.execute("SELECT COUNT(1) FROM files WHERE connection IS NULL").fetchall()
        if rows[0][0] < 1:
            # Nothing to do, skip
            return

        params: List = []
        for connection_config in self.rcvr.connection_configs:
            with contextlib.closing(self.rcvr.connection_class(connection_config, self.rcvr.root_config)) as connection:
                for file_entry in conn_list_compat(connection, None):
                    matching_streams = [stream for stream in self.rcvr.streams if stream.applies_to(file_entry)]

                    # SQLite execute likes lists vs tuples
                    params.extend(
                        [connection.name, stream.asset, stream.stream_type, file_entry.path]
                        for stream in matching_streams
                    )

            if params:
                # Assume only one entry per (asset, stream_type, path) compound and no duplicately named connection.
                conn.executemany(
                    # language=SQLite
                    textwrap.dedent("""\
                    UPDATE files
                      SET connection = ?
                      WHERE connection is NULL
                        AND asset = ?
                        AND stream_type = ?
                        AND path = ?
                    """),
                    params
                ).fetchall()

        # Did we cover all the connections?
        # language=SQLite
        rows = conn.execute("SELECT * FROM files WHERE connection IS NULL").fetchall()
        if rows:
            # If this log message is too large, it will be truncated.
            columns = [
                r[1]
                for r in conn.execute("pragma table_info(files)").fetchall()
            ]
            log.warning(
                (
                    'factorytx.receivers.file.receiver.FileReceiver.db_migration_name_connection: '
                    'The following %s rows which can not be found will be deleted:\n'
                    '%s\n%s'
                ),
                len(rows),
                '|'.join(columns),
                '\n'.join(
                    '|'.join(repr(i) for i in row)
                    for row in rows
                ),
            )
            # language=SQLite
            conn.execute("DELETE FROM files WHERE connection IS NULL").fetchall()


[docs]class FileReceiver(Receiver): """Receiver that fetches files from instances of a Connection subclass and parses them using the configured Parser types. Here's how to create a custom FileReceiver class to handle a new protocol: 1. Define a Connection subclass that uses the protocol to list available files, fetch files, and optionally delete files from a remote server. .. code-block:: Python class MyConnection(Connection): ... 2. Register a custom FileReceiver subclass for your Connection type: .. code-block:: Python @receivers.register('my_receiver') class MyReceiver(FileReceiver): connection_class = MyConnection """ connection_class: Type[Connection] db_migrator_class: Type[FileReceiverDBMigrator] = FileReceiverDBMigrator schema = { "type": "object", "properties": { "data_receiver_name": {"type": "string"}, "protocol": {"type": "string"}, "connections": {"type": "array"}, "process_files_alphabetically": { "type": "boolean", "default": False }, "process_ordered_connections": { "type": "boolean", "default": False }, "streams": { "type": "array", "items": { "type": "object", "properties": { "asset": {"type": "string"}, "stream_type": {"type": "string"}, "file_filter": { "type": "array", "items": {"type": "string", "minLength": 1}, }, "parser": {"type": "string", "minLength": 1}, "path_filter": { "type": "array", "items": {"type": "string", "minLength": 1}, }, }, "additionalProperties": True, "required": ["asset", "stream_type"], }, }, "parsers": { "type": "array", "items": { "type": "object", "properties": { "parser_name": {"type": "string", "minLength": 1}, "parser_type": {"type": "string", "minLength": 1}, "parser_version": {"type": "integer", "minimum": 1}, "filter_stream": { "type": "array", "items": {"type": "string", "minLength": 1}, }, }, "additionalProperties": True, "required": ["parser_name", "parser_type"], }, }, "skip_parser_errors": { "type": "boolean", "default": False, "description": ( "Whether the receiver will allow parser errors when processing files. If True, parsing errors will " "be skipped and logged in a quarantine log file while the receiver continues processing. " "Otherwise if we encounter a parsing error, the receiver will halt until the file gets repaired." ), }, "archive_completed": { "type": "boolean", "default": False, "description": ( "Whether to keep a local copy of each file receiver from the remote " "server or not. If enabled, files will be archived until their total " "size increases above the amount specified in the 'max_archive_size_mb' " "parameter." ), }, "max_archive_size_mb": { "type": "number", "default": -1, "description": ( "If 'archive_completed' is True, delete the archive completed files " "once the total size (in mb) is greater than this value. A negative " "value means never delete any files." ) }, "delete_completed": { "type": "boolean", "default": False, "description": ( "Whether to delete each file from the remote server once it " "has been successfully processed." ), }, "read_in_progress_files": { "type": "boolean", "default": False, "description": ( "Whether to read files as soon as they are created, or wait " "for the upstream service to stop writing to the file before " "reading it." ), }, "emit_file_metadata": { "type": "boolean", "default": False, "description": ( "Whether or not to inject additional columns into each record " "containing metadata about the file it came from. If this setting " "is enabled then every record will contain fields named `file_name`, " "`file_path`, and `file_timestamp`." ), }, "poll_interval": { "type": "number", "minimum": 0.1, "default": 60, "description": "Number of seconds to wait before fetching new data from the server.", }, "temporary_file_directory": { "type": "string", "default": "/tmp", "description": "Specify the directory to temporarily store " "files that have been downloaded from the " "connection(s). By default, the directory is " "used is `/tmp`.", }, }, "required": ["data_receiver_name", "protocol", "connections", "streams", "parsers"], "additionalProperties": False, } """All file receivers support these **required** and *optional* properties: - **data_receiver_name**: Unique name of the data receiver. This name will be used to track the progress state of the data stream. - **protocol**: Protocol to be used. - **streams**: List of input data streams to read from. - **asset**: Asset identifier - **stream_type**: Type of data stream - **parser**: Name of the parser used to convert the file. This has to match one of the *parser_name* values in the *parsers* section. - *file_filter*: List of files to filter on. Items can be regular expressions. You must select either *file_filter* or *path_filter* but not both. - *path_filter*: List of paths to filter on. Items can be regular expressions. You must select either *file_filter* or *path_filter* but not both. - **connections**: A list of connection settings used to connect to the data source. Please refer to the connection settings below. - *process_files_alphabetically*: ``true`` to process all of the files retrieved by the connection(s) in alphabetical order. By default, this option is ``false``, so files are processed by last modified (or created) time with oldest files first. - *process_ordered_connections*: If there are multiple connections and you want to process (and transmit) the files received from one connection before another, set option to ``true`` to process files based on the order of the connection(s). By default, this option is set to ``false``. This option is usually related to the Kafka transmit because events within a Kafka topic need to be in order. For example, if one connection receives historical files and another connection receives "realtime" files, you'll want to enable this setting and order the connections with the historical connection first, so that events in the Kafka topic are in chronological order. Please note that the order of the files per connection is maintained: if ``process_files_alphabetically`` is enabled, files will be parsed in alphabetical order; if disabled, files will be parsed in chronological order based on the last modified time. - **parsers**: A list of parsers that can be used for the input streams. Each parser has the following default properties: - **parser_name**: A customizable name used by streams to identify the parser. - **parser_type**: Type of parser to apply (e.g. ``csv``) - *parser_version*: Version of the parser type to apply. FactoryTX has a few built-in parsers available for you to use. Please refer to the :ref:`Parsers Configurations` section of the manual for more details about them. - *skip_parser_errors*: Whether the receiver will allow parser errors when processing files. If True, parsing errors will be skipped and logged in a quarantine log file while the receiver continues processing. Otherwise if we encounter a parsing error, the receiver will halt until the file gets repaired. (Only works for csv files) - *archive_completed*: ``true`` to keep a local copy of each file received from the remote server, or ``false`` if local copies should not be kept. If enabled, files will be archived until their total size increases above the amount specified in the *max_archive_size_mb* parameter. - *max_archive_size_mb*: If *archive_completed* is ``true``, delete the archive completed files once the total size (in mb) is greater than this value. A negative value means never delete any files. Defaults to -1. - *delete_completed*: ``true`` if files should be deleted from the data directory after they have been received by FactoryTX, or ``false`` if files should never be deleted. Defaults to ``false`` to avoid accidentally losing data. - *archive_completed* and *delete_completed* are independent of each other. Archiving will create a copy of the file in a new directory, while deleting will remove the original file. - *read_in_progress_files*: Whether to read files as soon as they are created, or wait for the upstream service to stop writing to the file before reading it. - *emit_file_metadata*: Whether or not to inject additional columns into each record containing metadata about the file it came from. If this setting is enabled then every record will contain fields named ``file_name``, ``file_path``, and ``file_timestamp``. - *poll_interval*: Number of seconds to wait before fetching new data from the server. - *temporary_file_directory*: Specify the directory to temporarily store files that have been downloaded from the connection(s). By default, the directory used is ``/tmp``. """ @classmethod def _clean_connections(cls: Type['FileReceiver'], config: dict, root_config: dict) -> List[ValidationMessage]: path: ConfigPath connection_names: Set[str] = set() validation_results: List[ValidationMessage] = [] for conn_no, conn_config in enumerate(config['connections']): if isinstance(conn_config, dict): connection_name = conn_config.get('connection_name') if not connection_name: path = ('connections', conn_no) msg = f"A connection name is required for connection {conn_no + 1}." validation_results.append(ValidationError(path, msg)) continue if connection_name in connection_names: path = ('connections', conn_no, 'connection_name') msg = f'A connection named "{connection_name}" already exists.' validation_results.append(ValidationError(path, msg)) connection_names.add(connection_name) for validation_result in cls.connection_class.clean(conn_config, root_config): path = ('connections', conn_no) + validation_result.path # type: ignore validation_result = validation_result._replace(path=path) validation_results.append(validation_result) return validation_results @classmethod def _clean_parsers(cls: Type['FileReceiver'], config: dict, root_config: dict, parser_names: Set[str]) -> List[ValidationMessage]: path: ConfigPath validation_results: List[ValidationMessage] = [] validation_results.extend(clean_section(PARSER_INFO, config, root_config, parser_names)) if has_errors(validation_results): return validation_results # Flag a warning if any of the parsers use filter_stream. We only # warning for the first instance to keep the verbosity low. for parser_no, parser_config in enumerate(config['parsers']): if 'filter_stream' in parser_config: path = ('parsers', parser_no, 'filter_stream') msg = ('The "filter_stream" option is deprecated. Please set the ' 'stream\'s "parser" option instead.') validation_results.append(ValidationWarning(path, msg)) break return validation_results @classmethod def _clean_streams(cls: Type['FileReceiver'], config: dict, parser_names: Set[str]) -> List[ValidationMessage]: path: ConfigPath validation_results: List[ValidationMessage] = [] # There are two ways to associate parsers with streams: # 1. By specifying the name of the parser for the stream. # 2. By specifying a filter_stream on the parser. # # Specifying a parser is preferred, since it makes errors (eg. # associating multiple parsers with a stream) less likely. To simplify # things the constructor, we convert all instances of filter_stream into # parser names attached to the stream. for stream_no, stream_config in enumerate(config['streams']): parser = stream_config.get('parser') if parser is not None and parser not in parser_names: path = ('streams', stream_no, 'parser') msg = f'No such parser: "{parser}"' validation_results.append(ValidationError(path, msg)) stream_id = str(InputStreamId(stream_config['asset'], stream_config['stream_type'])) for parser_no, parser_config in enumerate(config['parsers']): filter_stream = parser_config.get('filter_stream') if filter_stream is None or not stream_matches(stream_id, filter_stream): continue if parser is None: parser = parser_config['parser_name'] else: path = ('parsers', parser_no, 'filter_stream') msg = f'{stream_id} is already associated with the parser "{parser}"' validation_results.append(ValidationError(path, msg)) if parser is None: path = ('streams', stream_no) msg = f"'parser' is a required property" validation_results.append(ValidationError(path, msg)) stream_config['parser'] = parser # Streams should have either 'file_filter' or 'path_filter' but not both if stream_config.get('file_filter') and stream_config.get('path_filter'): path = ('streams', stream_no, 'file_filter') msg = f"one stream can't have both 'file_filter' and 'path_filter' properties" validation_results.append(ValidationError(path, msg)) elif not(stream_config.get('file_filter') or stream_config.get('path_filter')): path = ('streams', stream_no) msg = f"either 'file_filter' or 'path_filter' is required" validation_results.append(ValidationError(path, msg)) return validation_results
[docs] @classmethod def clean(cls: Type['FileReceiver'], config: dict, root_config: dict) -> List[ValidationMessage]: path: ConfigPath parser_names: Set[str] = set() validation_results: List[ValidationMessage] = [] validation_results.extend(clean_with_json_schema(FileReceiver.schema, config)) if has_errors(validation_results): return validation_results validation_results.extend(cls._clean_connections(config, root_config)) # Nothing else depends on connections being validated, so we can continue # if there are any errors. validation_results.extend(cls._clean_parsers(config, root_config, parser_names)) if has_errors(validation_results): return validation_results # Streams must be cleaned AFTER parsers, and only if the parsers are valid. # Otherwise the stream validation won't have the list of parser names, and # may crash if it encounters an invalid stream. validation_results.extend(cls._clean_streams(config, parser_names)) if has_errors(validation_results): return validation_results ensure_dir_exists(config['temporary_file_directory']) return validation_results
@staticmethod def applies_to_filename(file_entry: FileEntry, file_filter: List[Pattern]) -> bool: return any(f.match(file_entry.filename) for f in file_filter) @staticmethod def applies_to_path(file_entry: FileEntry, file_filter: List[Pattern]) -> bool: return any(f.match(file_entry.path) for f in file_filter) def __init__(self, dataflow: DataflowGraph, config: dict, root_config: dict) -> None: super().__init__(dataflow, config, root_config) self.dataflow = dataflow self.connection_configs: List[dict] = config['connections'] self.root_config = root_config self.data_receiver_name = config['data_receiver_name'] # Set up file-logger for quarantine results quarantine_dir_path: str = os.path.join(factorytx.const.COMPONENT_DATA_PATH, self.data_receiver_name, 'quarantine') self.quarantine_log = FileContextLogger(os.path.join(quarantine_dir_path, 'quarantine.log')) parsers: Dict[str, Parser] = {} for parser_config in config['parsers']: parser_cls = get_component_class(PARSER_INFO, parser_config) parsers[parser_config['parser_name']] = parser_cls(parser_config, root_config) self.streams: List[_FileStream] = [] for stream_config in config['streams']: if 'file_filter' in stream_config: file_filter = [re.compile(fnmatch.translate(f)) for f in stream_config['file_filter']] applies_to = partial(FileReceiver.applies_to_filename, file_filter=file_filter) elif 'path_filter' in stream_config: file_filter = [re.compile(fnmatch.translate(f)) for f in stream_config['path_filter']] applies_to = partial(FileReceiver.applies_to_path, file_filter=file_filter) else: assert False self.streams.append(_FileStream( input_stream_id=InputStreamId(stream_config['asset'], stream_config['stream_type']), asset=stream_config['asset'], stream_type=stream_config['stream_type'], applies_to=applies_to, parser=parsers[stream_config['parser']], )) self.skip_parser_errors = config['skip_parser_errors'] self.archive_completed = config['archive_completed'] self.max_archive_size_mb = config['max_archive_size_mb'] self.delete_completed = config['delete_completed'] self.poll_interval = config['poll_interval'] self.read_in_progress_files = config['read_in_progress_files'] self.emit_file_metadata = config['emit_file_metadata'] self.local_temp_dir = config['temporary_file_directory'] self.last_file_entries: Dict[Tuple[int, str], FileEntry] = {} self.process_files_alphabetically = config['process_files_alphabetically'] self.process_ordered_connections = config['process_ordered_connections'] component_path = get_component_path(self.name) self._completed_path = safe_path_join(component_path, 'completed') self._state_store_path = safe_path_join(component_path, 'state.db') self._store = StateStore(self._state_store_path) self._store.initialize(db_migrator=self.db_migrator_class(self)) def __getstate__(self) -> dict: """Deconstructs an object for pickling.""" # Don't try to pickle the state store -- we'll reconnect after forking. return {k: v for k, v in self.__dict__.items() if k != '_store'} def __setstate__(self, state: dict) -> None: """Reconstructs an object when unpickling.""" self.__dict__.update(state) self._store = StateStore(self._state_store_path)
[docs] def purge(self, streams: List[InputStreamId]) -> None: self._store.purge(streams)
def _list_candidates(self, connections: List[Connection]) -> List[ParsingCandidate]: """Returns a list of ParsingCandidate objects representing files to be parsed and the parsers to parse them with. One file may be associated with multiple parsers and streams. """ new_file_entries: Dict[Tuple[int, str], FileEntry] = {} # (connection no, path) -> file entry candidates: List[ParsingCandidate] = [] for connection_no, connection in enumerate(connections): def _list_files(connection: Connection) -> Generator[FileEntry, None, None]: marker_category = f'connections.{connection.name}.list' try: last_completed_file: Optional[str] = None if self.process_files_alphabetically: last_completed_file = self._store.fetch_last_completed_file(connection.name) yield from conn_list_compat(connection, last_completed_file) markers.clear(marker_category) except ConnectionError as e: markers.error(marker_category, f'{connection.name} - {connection.connection_info} failed to list files. {e.message}') raise Failure for file_no, file_entry in enumerate(_list_files(connection)): if file_no % 5000 == 0: log.debug('List Candidate:%d:%s:%d:%s', connection_no, connection, file_no, file_entry) if file_entry.filename == '.DS_Store': continue # Make OS X users happy. matching_streams = [stream for stream in self.streams if stream.applies_to(file_entry)] if not matching_streams: # Don't bother saving this entry in new_file_entries if we're # not interested in it. This saves memory when the upstream # server has a large number of files and we're interested in # only a small number of them. continue if not self.read_in_progress_files: # Avoid reading files that are still being written. file_key = (connection_no, file_entry.path) new_file_entries[file_key] = file_entry prev_file_entry = self.last_file_entries.get(file_key) if (prev_file_entry is None or prev_file_entry.size != file_entry.size or prev_file_entry.mtime != file_entry.mtime): continue for stream in matching_streams: candidate = ParsingCandidate( connection=connection, parser=stream.parser, file=file_entry, input_stream_id=stream.input_stream_id, asset=stream.asset, stream_type=stream.stream_type, state=None, ) candidates.append(candidate) self.last_file_entries = new_file_entries return candidates def _delete_temp_file(self, filename: str) -> None: """Deletes a temporary file that's a copy of the source file""" try: os.remove(filename) except OSError as e: log.warning("Unable to delete temp file %s - %s. If the file still " "exists, you will need to manually clean it up.", filename, e.strerror)
[docs] @contextlib.contextmanager def handle_processing_failure(self, filename: str) -> Iterator[None]: """Handles cleaning up of a temporary file if we run into any errors while processing the file's data, so that we don't bloat the temporary storage. """ # TODO: FTX-575 - Keep track of invalid files, so we can keep a # single copy of it and having a list or table for reference will # help debugging try: yield except Failure: self._delete_temp_file(filename) raise
# TODO: Need to disambiguate state by connection? def _process_grouped_candidates( self, grouped_candidates: Iterable[Tuple[Tuple[FileEntry, Connection], Iterable[ParsingCandidate]]] ) -> None: for (file_entry, connection), file_candidates in grouped_candidates: completed_path = safe_path_join(self._completed_path, file_entry.path) ensure_dir_exists(os.path.dirname(completed_path)) with tempfile.NamedTemporaryFile(delete=False, dir=self.local_temp_dir) as fp: with self.handle_processing_failure(fp.name): marker_category = f'connections.{connection.name}.copy' try: connection.copy(file_entry, fp.name) markers.clear(marker_category) except ConnectionError as e: msg = f'{connection.name} - {connection.connection_info} failed to copy {file_entry.path}. {e.message}' markers.error(marker_category, msg) raise Failure for c in file_candidates: # TODO: Pass stream_id instead? with push_context(component=self.name, component_type=self.component_type, asset=c.asset, stream_type=c.stream_type): log.info("Processing file %s from %s", file_entry.filename, file_entry.path) try: results = c.parser.process(file_entry, c.state, fp.name) for (frame, c.state), next_result in lookahead(results): frame = frame.copy(deep=True) # avoid SettingWithCopyError if we reassign dataframe cell values modification_timestamp = datetime.fromtimestamp(file_entry.mtime, pytz.utc) additional_columns: dict = { const.ASSET_COLNAME: c.asset, const.STREAM_TYPE_COLNAME: c.stream_type, } if self.emit_file_metadata: additional_columns[const.FILE_NAME_COLNAME] = file_entry.filename additional_columns[const.FILE_PATH_COLNAME] = file_entry.path additional_columns[const.FILE_TIMESTAMP_COLNAME] = modification_timestamp add_columns_if_absent(frame, additional_columns) dataflow_process_marker = f'connections.{connection.name}.dataflow:process' receiver_log_context = markers.get_context() try: self.dataflow.process(c.input_stream_id, frame) markers.clear(dataflow_process_marker, receiver_log_context) except Failure: # Invalid file - Some ways to fail: # 1. Frame didn't pass validation # 2. A transform on the frame failed # 3. Frame couldn't be processed by transmit # TODO: Should we differentiate between failures? markers.error( category=dataflow_process_marker, message=f"Dataflow failed to process file: {file_entry.path}", context=receiver_log_context ) raise if next_result is not None: # Avoid writing the last state twice. self._store.record(c, completed=False) except ParserError as e: parser_error_msg = (f"File {file_entry.filename} from {file_entry.path} " f"could not be parsed: {str(e)}").replace('\n', '') log.warning(parser_error_msg) # If config is not set to skip errors, raise the exception if not self.skip_parser_errors: raise Failure from ParserError # Log the ParserError as a warning to system log and also out to # a separate 'quarantine.log' file # Then continue on and record ParsingCandidate result to DB try: self.quarantine_log.write_msg(f"{parser_error_msg}") except OSError as exc: markers.error( category=f"quarantine", message=f"Failed to write to quarantine log: {exc}", ) self._store.record(c, completed=True) if self.archive_completed: shutil.move(fp.name, completed_path) if self.delete_completed: marker_category = f'connections.{connection.name}.delete' try: connection.delete(file_entry) markers.clear(marker_category) except ConnectionError as e: msg = f'{connection.name} - {connection.connection_info} failed to delete {file_entry.path}. {e.message}' markers.error(marker_category, msg) # Keep going if we can't delete the file, backing off # will only cause us to collect data slowly. if not self.archive_completed: self._delete_temp_file(fp.name) def _process_candidates(self, candidates: Iterable[ParsingCandidate]) -> None: """Parses a list of candidates, pushes them through the data pipeline, and stores any state emitted by the parsers. """ if self.process_ordered_connections: connection_files: Dict[str, List[ParsingCandidate]] = defaultdict(list) for candidate in candidates: connection_files[candidate.connection.name].append(candidate) for connection in self.connection_configs: connection_name: str = connection['connection_name'] file_list: List[ParsingCandidate] = connection_files.get(connection_name, []) grouped_candidates = grouped(file_list, key=lambda c: (c.file, c.connection)) self._process_grouped_candidates(grouped_candidates) else: grouped_candidates = grouped(candidates, key=lambda c: (c.file, c.connection)) self._process_grouped_candidates(grouped_candidates)
[docs] def poll(self, connections: List[Connection]) -> None: """Processes all new or modified files from one or more connections.""" candidates: List[ParsingCandidate] = self._list_candidates(connections) log.info('Found %d parseable files', len(candidates)) candidates = self._store.fetch_new_or_modified( candidates, alphabetical=self.process_files_alphabetically ) log.info('Processing %d new or modified files', len(candidates)) self._process_candidates(candidates)
[docs] def run_once(self) -> None: """Runs a single polling pass. This method should only be used for unit tests since it sets up and tears down new connections whenever it runs. """ with contextlib.ExitStack() as stack: connections: List[Connection] = [] for connection_config in self.connection_configs: connection = self.connection_class(connection_config, self.root_config) stack.push(contextlib.closing(connection)) connections.append(connection) self.poll(connections)
[docs] def run(self) -> None: # clean up completed files if self.archive_completed and self.max_archive_size_mb >= 0: threading.Thread(target=self.clean_completed_dir).start() # XXX: Switch to only using a single connection. with contextlib.ExitStack() as stack: connections: List[Connection] = [] for connection_config in self.connection_configs: connection_name = connection_config['connection_name'] marker_category = f'connections.{connection_name}.connect' try: connection = self.connection_class(connection_config, self.root_config) markers.clear(marker_category) except ConnectionError as e: markers.error(marker_category, f'{connection_name} failed to connect. {e.message}') raise Failure stack.push(contextlib.closing(connection)) connections.append(connection) while True: self.poll(connections) time.sleep(self.poll_interval)
def clean_completed_dir(self, run_once: bool=False, sleep: float=300) -> None: marker_category = 'os.clean' max_size = self.max_archive_size_mb * 1024 * 1024 while True: try: total_archive_size = 0 ensure_dir_exists(self._completed_path) for dir_path, _, filenames in os.walk(self._completed_path): for filename in sorted(filenames, key=lambda f: -os.path.getmtime(os.path.join(dir_path, f))): path = os.path.join(dir_path, filename) filesize = os.path.getsize(path) total_archive_size += filesize if total_archive_size > max_size: os.remove(path) if run_once: break time.sleep(sleep) except OSError as e: msg = f'Error cleaning directory: {e.strerror}' markers.error(marker_category, msg)