Source code for factorytx.receivers.localfile

import os
import shutil
from typing import Generator, List, Optional

from factorytx.receivers.file import (
    Connection, FileEntry, ConnectionError, FileReceiver
)
from factorytx.base import receivers
from factorytx.validation import ValidationMessage, clean_with_json_schema


class _LocalFileConnection(Connection):

    schema = {
        "type": "object",
        "properties": {
            "connection_name": {
                "type": "string",
                "minLength": 1,
            },
            "data_directory": {
                "type": "string",
                "minLength": 1,
            },
            "recursive": {
                "type": "boolean",
                "default": True
            },
        },
        "required": ["connection_name", "data_directory"],
    }
    """
    **Connection Settings:**
        **Required** and *optional* properties that can be configured for a
        local file connection:

        - **connection_name**: Unique name for the connection.
        - **data_directory**: Path to a directory on the local filesystem
          containing input files.
        - **recursive**: When enabled, the localfile connection will recursively
          search for files in subdirectories of the ``data_directory``. By 
          default, this setting is set to ``true``.

    """

    @classmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        return clean_with_json_schema(_LocalFileConnection.schema, config)

    def __init__(self, config: dict, root_config: dict) -> None:
        super().__init__(config, root_config)

        self.data_directory = config['data_directory']
        self.recursive_search = config['recursive']

    @property
    def connection_info(self) -> str:
        return f'localfile://{self.data_directory}'

    def close(self) -> None:
        pass

    def _format_error(self, e: Exception) -> str:
        if isinstance(e, OSError):
            return e.strerror
        else:
            return 'Unexpected error: {e!r}'

    def _list_files(self) -> Generator[FileEntry, None, None]:
        for dir_entry in os.scandir(self.data_directory):
            if dir_entry.is_file():
                path = dir_entry.path
                try:
                    file_entry = FileEntry(
                        path=os.path.relpath(path, start=self.data_directory),
                        filename=os.path.basename(path),
                        mtime=os.path.getmtime(path),
                        size=os.path.getsize(path),
                    )
                    yield file_entry
                except FileNotFoundError:
                    # Skip files that were deleted while we were looking at them.
                    pass
            # Else, skip directories and symbolic links

    def _recursively_list_files(self) -> Generator[FileEntry, None, None]:
        for dirpath, dirnames, filenames in os.walk(self.data_directory):
            for filename in filenames:
                path = os.path.join(dirpath, filename)
                try:
                    file_entry = FileEntry(
                        path=os.path.relpath(path, start=self.data_directory),
                        filename=os.path.basename(path),
                        mtime=os.path.getmtime(path),
                        size=os.path.getsize(path),
                    )
                    yield file_entry
                except FileNotFoundError:
                    # Skip files that were deleted while we were looking at them.
                    pass

    def list(self, start_after_hint: Optional[str]) -> Generator[FileEntry, None, None]:
        try:
            if self.recursive_search:
                file_entries = self._recursively_list_files()
            else:
                file_entries = self._list_files()
            return file_entries
        except Exception as e:
            raise ConnectionError(self._format_error(e))

    def copy(self, file_entry: FileEntry, local_path: str) -> None:
        try:
            path = os.path.join(self.data_directory, file_entry.path)
            shutil.copy2(path, local_path)
        except Exception as e:
            raise ConnectionError(self._format_error(e))

    def delete(self, file_entry: FileEntry) -> None:
        try:
            path = os.path.join(self.data_directory, file_entry.path)
            os.remove(path)
        except Exception as e:
            raise ConnectionError(self._format_error(e))


[docs]@receivers.register('localfile') class LocalFileReceiver(FileReceiver): """The ``localfile`` data receiver fetches and reads files on the local system. Example: If we want to retrieve data from a local CSV file in */data/test-data/multiple-files*, our configuration will look something like this: .. code-block:: json { "data_receiver": [ { "data_receiver_name": "my_localfile_receiver", "protocol": "localfile", "delete_completed": true, "poll_interval": 4, "connections": [ { "data_directory": "/data/test-data/multiple-files", "connection_name": "my_connection" } ], "streams": [ { "asset": "Station_CSV_3", "stream_type": "cycle", "file_filter": ["station3.csv"], "parser": "My_CSV_Parser" } ], "parsers": [ { "parser_name": "My_CSV_Parser", "parser_type": "csv" } ] } ] } **Configuration:** The ``localfile`` data receiver uses a file receiver that connects to the local system. .. autoheadlessattribute:: factorytx.receivers.file.receiver.FileReceiver.schema :noindex: .. autoheadlessattribute:: factorytx.receivers.localfile._LocalFileConnection.schema :annotation: """ connection_class = _LocalFileConnection