import os
import shutil
from typing import Generator, List, Optional
from factorytx.receivers.file import (
    Connection, FileEntry, ConnectionError, FileReceiver
)
from factorytx.base import receivers
from factorytx.validation import ValidationMessage, clean_with_json_schema
class _LocalFileConnection(Connection):
    schema = {
        "type": "object",
        "properties": {
            "connection_name": {
                "type": "string",
                "minLength": 1,
            },
            "data_directory": {
                "type": "string",
                "minLength": 1,
            },
            "recursive": {
                "type": "boolean",
                "default": True
            },
        },
        "required": ["connection_name", "data_directory"],
    }
    """
    **Connection Settings:**
        **Required** and *optional* properties that can be configured for a
        local file connection:
        - **connection_name**: Unique name for the connection.
        - **data_directory**: Path to a directory on the local filesystem
          containing input files.
        - **recursive**: When enabled, the localfile connection will recursively
          search for files in subdirectories of the ``data_directory``. By 
          default, this setting is set to ``true``.
    """
    @classmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        return clean_with_json_schema(_LocalFileConnection.schema, config)
    def __init__(self, config: dict, root_config: dict) -> None:
        super().__init__(config, root_config)
        self.data_directory = config['data_directory']
        self.recursive_search = config['recursive']
    @property
    def connection_info(self) -> str:
        return f'localfile://{self.data_directory}'
    def close(self) -> None:
        pass
    def _format_error(self, e: Exception) -> str:
        if isinstance(e, OSError):
            return e.strerror
        else:
            return 'Unexpected error: {e!r}'
    def _list_files(self) -> Generator[FileEntry, None, None]:
        for dir_entry in os.scandir(self.data_directory):
            if dir_entry.is_file():
                path = dir_entry.path
                try:
                    file_entry = FileEntry(
                        path=os.path.relpath(path, start=self.data_directory),
                        filename=os.path.basename(path),
                        mtime=os.path.getmtime(path),
                        size=os.path.getsize(path),
                    )
                    yield file_entry
                except FileNotFoundError:
                    # Skip files that were deleted while we were looking at them.
                    pass
            # Else, skip directories and symbolic links
    def _recursively_list_files(self) -> Generator[FileEntry, None, None]:
        for dirpath, dirnames, filenames in os.walk(self.data_directory):
            for filename in filenames:
                path = os.path.join(dirpath, filename)
                try:
                    file_entry = FileEntry(
                        path=os.path.relpath(path, start=self.data_directory),
                        filename=os.path.basename(path),
                        mtime=os.path.getmtime(path),
                        size=os.path.getsize(path),
                    )
                    yield file_entry
                except FileNotFoundError:
                    # Skip files that were deleted while we were looking at them.
                    pass
    def list(self, start_after_hint: Optional[str]) -> Generator[FileEntry, None, None]:
        try:
            if self.recursive_search:
                file_entries = self._recursively_list_files()
            else:
                file_entries = self._list_files()
            return file_entries
        except Exception as e:
            raise ConnectionError(self._format_error(e))
    def copy(self, file_entry: FileEntry, local_path: str) -> None:
        try:
            path = os.path.join(self.data_directory, file_entry.path)
            shutil.copy2(path, local_path)
        except Exception as e:
            raise ConnectionError(self._format_error(e))
    def delete(self, file_entry: FileEntry) -> None:
        try:
            path = os.path.join(self.data_directory, file_entry.path)
            os.remove(path)
        except Exception as e:
            raise ConnectionError(self._format_error(e))
[docs]@receivers.register('localfile')
class LocalFileReceiver(FileReceiver):
    """The ``localfile`` data receiver fetches and reads files on the local
    system.
    Example:
        If we want to retrieve data from a local CSV file in
        */data/test-data/multiple-files*, our configuration will look
        something like this:
        .. code-block:: json
            {
                "data_receiver": [
                    {
                        "data_receiver_name": "File Plugin 1",
                        "protocol": "localfile",
                        "delete_completed": true,
                        "poll_interval": 4,
                        "connections": [
                            {"data_directory": "/data/test-data/multiple-files"}
                        ],
                        "streams": [
                            {
                                "asset": "Station_CSV_3",
                                "stream_type": "cycle",
                                "file_filter": ["station3.csv"]
                            }
                        ]
                    }
                ]
            }
    **Configuration:**
        The ``localfile`` data receiver uses a file receiver that connects to
        the local system.
        .. autoheadlessattribute:: factorytx.receivers.file.receiver.FileReceiver.schema
            :noindex:
        .. autoheadlessattribute:: factorytx.receivers.localfile._LocalFileConnection.schema
            :annotation:
    """
    connection_class = _LocalFileConnection