Source code for factorytx.receivers.file.base

from abc import ABCMeta, abstractmethod
from typing import Generator, List, Optional, Tuple

import pandas as pd

from factorytx.registry import Registry
from factorytx.validation import ValidationMessage
from factorytx.utils import class_name


[docs]class FileEntry: """Represents a file or resource on a remote system. :ivar path: relative path of the file on the remote system. If the FileEntry represents a virtual file, the path should remain the same across subsequent calls to list_files (ie. if the remote system updates a/b.c, the path should still remain the same instead of becoming a/b.v2.c.) :ivar filename: base name of the file on the remote system. This is stored separately from the path since the directory separator my vary. :ivar mtime: file modification time as seconds since the Unix epoch. :ivar size: size of the file in bytes. """ __slots__ = ['path', 'filename', 'mtime', 'size'] def __init__(self, path: str, filename: str, mtime: float, size: int) -> None: self.path = path self.filename = filename self.mtime = mtime self.size = size def __eq__(self, other: object) -> bool: return (isinstance(other, FileEntry) and self.path == other.path and self.filename == other.filename and self.mtime == other.mtime and self.size == other.size) def __hash__(self) -> int: # The path should uniquely identify a FileEntry, so hashing the other # properties won't help avoid collisions anyway. return hash(self.path) def __repr__(self) -> str: s = (f'FileEntry(path={self.path!r}, filename={self.filename!r}, ' f'mtime={self.mtime}, size={self.size})') return s
[docs]class ConnectionError(Exception): """Represents an error that occurred while using a Connection.""" def __init__(self, message: str) -> None: """Instantiates a ConnectionError with the user-facing message `message`.""" super().__init__() self.message = message def __repr__(self) -> str: return f'ConnectionError({self.message!r})' def __str__(self) -> str: return self.message
[docs]class Connection(metaclass=ABCMeta): """Represents a connection to a remote server. Each supported type of file server (eg. FTP, SMB, WebDAV, etc.) should have a corresponding connection type. You can define a custom file receiver type using a connection as follows: >>> from factorytx.base import receivers >>> from factorytx.receivers.file import Connection, FileReceiver >>> class MyFileConnection(Connection): ... pass # Implementation goes here. >>> @receivers.register('my_file_connection') ... class MyFileReceiver(FileReceiver): ... connection_class = MyFileConnection """
[docs] @classmethod @abstractmethod def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]: """Normalizes and validates a configuration dictionary. This method is responsible for validating that the configuration it receives is well-formed and will be accepted by the constructor. Any issues that prevent the configuration from being loaded must be signalled by returning one or more error-level ValidationMessages. Less severe issues should be reported as warning-level messages instead. This method may also modify the `config` dictionary, eg. to insert default values or fix up legacy configuration structures. Any changes made to the config will persist in the configuration dictionary passed to the constructor. :param config: the configuration dictionary to validate. :param root_config: the root configuration dictionary, which includes global settings and all component configurations. This dictionary **must not** be modified. """ pass # pragma: no cover
@abstractmethod def __init__(self, config: dict, root_config: dict) -> None: """Initializes a Connection instance. :param config: a configuration dictionary which has already been validated and normalized using the `clean` method. __init__ will only be called if the prior call to clean did not return any validation errors. :param root_config: the root configuration dictionary. This can be used to access global settings, eg. whether debugging is enabled or what the Sight Machine platform credentials are. """ self.name: str = config['connection_name'] @property @abstractmethod def connection_info(self) -> str: """Returns a short string that with detailed connection information. This is usually formatted as a URI, eg. 's3://my-bucket/bucket-prefix'. """ pass
[docs] @abstractmethod def close(self) -> None: """Closes the connection and frees any associated resources, eg. sockets.""" pass # pragma: no cover
[docs] @abstractmethod def list(self, start_after_hint: Optional[str]) -> Generator[FileEntry, None, None]: """Lists files on the remote server. :param start_after_hint: Latest file path (in lexicographic order) received by this connection, or ``None`` if no files have been received so far. This can be used to optimize fetching new files if (1) the data source generates file paths that are lexicographically-ordered, and (2) the server can fetch a subset of files with paths greater than a specified parameter (e.g. the ``StartAfter`` parameter for an S3 connection). """ pass # pragma: no cover
[docs] @abstractmethod def copy(self, file_entry: FileEntry, local_path: str) -> None: """Copies a file's contents from a remote server to a specified temporary file on the local disk. :param file_entry: metadata about the file, such as the path to the file on the remote server. :param local_path: path to write the file to. `local_path` will always point to an existing zero-byte file. """ pass # pragma: no cover
[docs] @abstractmethod def delete(self, file_entry: FileEntry) -> None: """Deletes a file from the remote server. Connection classes that do not support deletion may raise a NotImplementedError instead. :param file_entry: metadata about the file, such as the path to the file on the remote server. """ pass # pragma: no cover
def __repr__(self) -> str: cls_nm = class_name(type(self)) return f'{cls_nm}(connection_info={self.connection_info!r})'
[docs]class Parser(metaclass=ABCMeta): """Converts file contents into structured data. Each supported class of file (eg. CSV files, Excel files, unparseable binary files, etc.) should have a corresponding parser type which is responsible for converting files into pandas DataFrames. Though a parser should be stateless, it can associate a state dictionary with each file it parses. See the `process` method for details. """
[docs] @classmethod @abstractmethod def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]: """Normalizes and validates a configuration dictionary. This method is responsible for validating that the configuration it receives is well-formed and will be accepted by the constructor. Any issues that prevent the configuration from being loaded must be signalled by returning one or more error-level ValidationMessages. Less severe issues should be reported as warning-level messages instead. This method may also modify the `config` dictionary, eg. to insert default values or fix up legacy configuration structures. Any changes made to the config will persist in the configuration dictionary passed to the constructor. :param config: the configuration dictionary to validate. :param root_config: the root configuration dictionary, which includes global settings and all component configurations. This dictionary **must not** be modified. """ pass # pragma: no cover
@abstractmethod def __init__(self, config: dict, root_config: dict) -> None: """Initializes a Parser instance. :param config: a configuration dictionary which has already been validated and normalized using the `clean` method. __init__ will only be called if the prior call to clean did not return any validation errors. :param root_config: the root configuration dictionary. This can be used to access global settings, eg. whether debugging is enabled or what the Sight Machine platform credentials are. """ self.name: str = config['parser_name']
[docs] @abstractmethod def process(self, file_entry: FileEntry, state: Optional[dict], local_path: str) \ -> Generator[Tuple[pd.DataFrame, dict], None, None]: """Processes a file and converts it into a pandas DataFrame. :param file_entry: metadata about the file, eg. its name, path on the remote server, last modification time, etc. :param state: None if the file has not been previously processed, or the state dictionary returned the last time this file was processed. :param local_path: path to a temporary file containing the file contents on the local disk. This file will be removed once file has been parsed. :returns: an iterator that yields tuples. Each tuple holds a DataFrame containing data extracted from the file, and a JSON-serializable state dictionary. The state dictionary will be persisted and passed back to the `process` method when the file is next parsed, eg. after the file is modified. """ pass # pragma: no cover
parsers = Registry(Parser)