from abc import ABCMeta, abstractmethod
from typing import Generator, List, Optional, Tuple
import pandas as pd
from factorytx.registry import Registry
from factorytx.validation import ValidationMessage
from factorytx.utils import class_name
class FileEntry:
    """Represents a file or resource on a remote system.

    :ivar path: relative path of the file on the remote system. If the FileEntry
        represents a virtual file, the path should remain the same across
        subsequent calls to ``Connection.list`` (i.e. if the remote system
        updates a/b.c, the path should still be a/b.c instead of becoming
        a/b.v2.c). This *includes* the filename.
    :ivar filename: base name of the file on the remote system. This is stored
        separately from the path since the directory separator may vary.
    :ivar mtime: file modification time as seconds since the Unix epoch.
    :ivar size: size of the file in bytes.
    """

    __slots__ = ['path', 'filename', 'mtime', 'size']

    def __init__(self, path: str, filename: str, mtime: float, size: int) -> None:
        self.path = path
        self.filename = filename
        self.mtime = mtime
        self.size = size

    def __eq__(self, other: object) -> bool:
        """Two entries are equal when path, filename, mtime and size all match."""
        if not isinstance(other, FileEntry):
            # Defer to the other operand rather than answering False outright.
            # If it also declines, Python falls back to identity, so comparing
            # with an unrelated object still evaluates to False.
            return NotImplemented
        return (self.path == other.path and
                self.filename == other.filename and
                self.mtime == other.mtime and
                self.size == other.size)

    def __hash__(self) -> int:
        # The path should uniquely identify a FileEntry, so hashing the other
        # properties won't help avoid collisions anyway. Hashing a subset of
        # the fields compared in __eq__ keeps equal entries hashing equally.
        return hash(self.path)

    def __repr__(self) -> str:
        return (f'FileEntry(path={self.path!r}, filename={self.filename!r}, '
                f'mtime={self.mtime}, size={self.size})')
class ConnectionError(Exception):
    """Represents an error that occurred while using a Connection.

    NOTE: this class shadows the builtin ``ConnectionError`` in any module that
    imports it by name, and it derives directly from ``Exception`` (not from
    ``OSError``), so it is not caught by handlers for the builtin.
    """

    def __init__(self, message: str) -> None:
        """Instantiates a ConnectionError with the user-facing message `message`."""
        # Forward the message so ``self.args`` is populated. Exceptions are
        # unpickled by calling ``cls(*self.args)``; with empty args the
        # required ``message`` parameter would be missing and unpickling (e.g.
        # when crossing a multiprocessing boundary) would raise TypeError.
        super().__init__(message)
        self.message = message

    def __repr__(self) -> str:
        return f'ConnectionError({self.message!r})'

    def __str__(self) -> str:
        return self.message
class Connection(metaclass=ABCMeta):
    """Represents a connection to a remote server. Each supported type of file
    server (eg. FTP, SMB, WebDAV, etc.) should have a corresponding connection
    type.

    You can define a custom file receiver type using a connection as follows:

    >>> from factorytx.base import receivers
    >>> from factorytx.receivers.file import Connection, FileReceiver
    >>> class MyFileConnection(Connection):
    ...     pass  # Implementation goes here.
    >>> @receivers.register('my_file_connection')
    ... class MyFileReceiver(FileReceiver):
    ...     connection_class = MyFileConnection
    """

    @classmethod
    @abstractmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        """Normalizes and validates a configuration dictionary.

        This method is responsible for validating that the configuration it
        receives is well-formed and will be accepted by the constructor. Any
        issues that prevent the configuration from being loaded must be
        signalled by returning one or more error-level ValidationMessages. Less
        severe issues should be reported as warning-level messages instead.

        This method may also modify the `config` dictionary, eg. to insert
        default values or fix up legacy configuration structures. Any changes
        made to the config will persist in the configuration dictionary passed
        to the constructor.

        :param config: the configuration dictionary to validate.
        :param root_config: the root configuration dictionary, which includes
            global settings and all component configurations. This dictionary
            **must not** be modified.
        :returns: a list of ValidationMessages describing any problems found
            in `config`.
        """
        pass  # pragma: no cover

    @abstractmethod
    def __init__(self, config: dict, root_config: dict) -> None:
        """Initializes a Connection instance.

        Because this method is abstract, its body only runs when a concrete
        subclass calls ``super().__init__(config, root_config)``; subclasses
        should do so in order to populate ``self.name``.

        :param config: a configuration dictionary which has already been
            validated and normalized using the `clean` method. __init__
            will only be called if the prior call to clean did not return any
            validation errors.
        :param root_config: the root configuration dictionary. This can be used
            to access global settings, eg. whether debugging is enabled or what
            the Sight Machine platform credentials are.
        """
        self.name: str = config['connection_name']

    @property
    @abstractmethod
    def connection_info(self) -> str:
        """Returns a short string with detailed connection information.

        This is usually formatted as a URI, eg. 's3://my-bucket/bucket-prefix'.
        It is also what ``__repr__`` displays for the connection.
        """
        pass  # pragma: no cover

    @abstractmethod
    def close(self) -> None:
        """Closes the connection and frees any associated resources, eg. sockets."""
        pass  # pragma: no cover

    @abstractmethod
    def list(self, start_after_hint: Optional[str]) -> Generator[FileEntry, None, None]:
        """Lists files on the remote server.

        :param start_after_hint: Latest file path (in lexicographic order)
            received by this connection, or ``None`` if no files have been
            received so far. This can be used to optimize fetching new files if
            (1) the data source generates file paths that are
            lexicographically-ordered, and (2) the server can fetch a subset of
            files with paths greater than a specified parameter (e.g. the
            ``StartAfter`` parameter for an S3 connection). It is only a hint
            for optimization purposes.
        :returns: a generator yielding a FileEntry for each listed file.
        """
        pass  # pragma: no cover

    @abstractmethod
    def copy(self, file_entry: FileEntry, local_path: str) -> None:
        """Copies a file's contents from a remote server to a specified
        temporary file on the local disk.

        :param file_entry: metadata about the file, such as the path to the file
            on the remote server.
        :param local_path: path to write the file to. `local_path` will always
            point to an existing zero-byte file.
        """
        pass  # pragma: no cover

    @abstractmethod
    def delete(self, file_entry: FileEntry) -> None:
        """Deletes a file from the remote server.

        :param file_entry: metadata about the file, such as the path to the file
            on the remote server.
        :raises NotImplementedError: connection classes that do not support
            deletion may raise this instead of deleting the file.
        """
        pass  # pragma: no cover

    def __repr__(self) -> str:
        cls_nm = class_name(type(self))
        return f'{cls_nm}(connection_info={self.connection_info!r})'
class Parser(metaclass=ABCMeta):
    """Converts file contents into structured data. Each supported class of
    file (eg. CSV files, Excel files, unparseable binary files, etc.) should
    have a corresponding parser type which is responsible for converting files
    into pandas DataFrames.

    Though a parser should be stateless, it can associate a state dictionary
    with each file it parses. See the `process` method for details.
    """

    @classmethod
    @abstractmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        """Normalizes and validates a configuration dictionary.

        This method is responsible for validating that the configuration it
        receives is well-formed and will be accepted by the constructor. Any
        issues that prevent the configuration from being loaded must be
        signalled by returning one or more error-level ValidationMessages. Less
        severe issues should be reported as warning-level messages instead.

        This method may also modify the `config` dictionary, eg. to insert
        default values or fix up legacy configuration structures. Any changes
        made to the config will persist in the configuration dictionary passed
        to the constructor.

        :param config: the configuration dictionary to validate.
        :param root_config: the root configuration dictionary, which includes
            global settings and all component configurations. This dictionary
            **must not** be modified.
        :returns: a list of ValidationMessages describing any problems found
            in `config`.
        """
        pass  # pragma: no cover

    @abstractmethod
    def __init__(self, config: dict, root_config: dict) -> None:
        """Initializes a Parser instance.

        Because this method is abstract, its body only runs when a concrete
        subclass calls ``super().__init__(config, root_config)``; subclasses
        should do so in order to populate ``self.name``.

        :param config: a configuration dictionary which has already been
            validated and normalized using the `clean` method. __init__
            will only be called if the prior call to clean did not return any
            validation errors.
        :param root_config: the root configuration dictionary. This can be used
            to access global settings, eg. whether debugging is enabled or what
            the Sight Machine platform credentials are.
        """
        self.name: str = config['parser_name']

    @abstractmethod
    def process(self, file_entry: FileEntry, state: Optional[dict], local_path: str) \
            -> Generator[Tuple[pd.DataFrame, dict], None, None]:
        """Processes a file and converts it into one or more pandas DataFrames.

        :param file_entry: metadata about the file, eg. its name, path on the
            remote server, last modification time, etc.
        :param state: None if the file has not been previously processed, or
            the state dictionary returned the last time this file was processed.
        :param local_path: path to a temporary file containing the file contents
            on the local disk. This file will be removed once the file has been
            parsed.
        :returns: an iterator that yields tuples. Each tuple holds a DataFrame
            containing data extracted from the file, and a JSON-serializable
            state dictionary. The state dictionary will be persisted and passed
            back to the `process` method when the file is next parsed,
            eg. after the file is modified.
        """
        pass  # pragma: no cover
# Module-level registry for Parser implementations.
# NOTE(review): presumably concrete parsers register themselves here by name,
# analogous to the ``receivers.register(...)`` decorator shown in the
# Connection docstring — confirm against factorytx.registry.Registry.
parsers = Registry(Parser)