import os
import shutil
from typing import Generator, List, Optional
from factorytx.receivers.file import (
Connection, FileEntry, ConnectionError, FileReceiver
)
from factorytx.base import receivers
from factorytx.validation import ValidationMessage, clean_with_json_schema
class _LocalFileConnection(Connection):
    """Connection that lists, copies and deletes files under a local directory.

    The directory is taken from the ``data_directory`` setting; the
    ``recursive`` setting selects whether subdirectories are searched too.
    """

    schema = {
        "type": "object",
        "properties": {
            "connection_name": {
                "type": "string",
                "minLength": 1,
            },
            "data_directory": {
                "type": "string",
                "minLength": 1,
            },
            "recursive": {
                "type": "boolean",
                "default": True
            },
        },
        "required": ["connection_name", "data_directory"],
    }
    # NOTE(review): the bare string below is an attribute docstring for
    # ``schema``; presumably it is what the documentation build renders for
    # this attribute, so it is kept directly after the assignment.
    """
    **Connection Settings:**

    **Required** and *optional* properties that can be configured for a
    local file connection:

    - **connection_name**: Unique name for the connection.
    - **data_directory**: Path to a directory on the local filesystem
      containing input files.
    - **recursive**: When enabled, the localfile connection will recursively
      search for files in subdirectories of the ``data_directory``. By
      default, this setting is set to ``true``.
    """

    @classmethod
    def clean(cls, config: dict, root_config: dict) -> List[ValidationMessage]:
        """Validate ``config`` against this connection's JSON schema.

        :param config: connection configuration to validate.
        :param root_config: full configuration; not used by this connection.
        :returns: the validation messages produced by
            ``clean_with_json_schema``.
        """
        return clean_with_json_schema(_LocalFileConnection.schema, config)

    def __init__(self, config: dict, root_config: dict) -> None:
        """Store the directory to read from and the recursion flag.

        :param config: connection configuration; must contain
            ``data_directory``.
        :param root_config: full configuration, forwarded to the base class.
        """
        super().__init__(config, root_config)
        self.data_directory = config['data_directory']
        # Fall back to the schema default so that a config in which the
        # default was not filled in does not fail with a KeyError.
        self.recursive_search = config.get('recursive', True)

    @property
    def connection_info(self) -> str:
        """Return a ``localfile://`` style description of this connection."""
        return f'localfile://{self.data_directory}'

    def close(self) -> None:
        """Do nothing: this connection keeps no open resources."""
        pass

    def _format_error(self, e: Exception) -> str:
        """Return a human-readable message for ``e``.

        For ``OSError`` the operating system's message is used, falling back
        to ``str(e)`` because ``strerror`` is ``None`` when the error was
        created without an errno/message pair.
        """
        if isinstance(e, OSError):
            return e.strerror or str(e)
        return f'Unexpected error: {e!r}'

    def _make_file_entry(self, path: str) -> Optional[FileEntry]:
        """Build the ``FileEntry`` for ``path``.

        :param path: path of a file located under ``data_directory``.
        :returns: the entry, or ``None`` if the file no longer exists.
        """
        try:
            # A single stat call yields a consistent (mtime, size) pair.
            stat_result = os.stat(path)
        except FileNotFoundError:
            # Skip files that were deleted while we were looking at them.
            return None
        return FileEntry(
            path=os.path.relpath(path, start=self.data_directory),
            filename=os.path.basename(path),
            mtime=stat_result.st_mtime,
            size=stat_result.st_size,
        )

    def _list_files(self) -> Generator[FileEntry, None, None]:
        """Yield an entry for every file directly inside ``data_directory``."""
        # The context manager closes the scandir iterator even when the
        # consumer stops iterating early.
        with os.scandir(self.data_directory) as dir_entries:
            for dir_entry in dir_entries:
                # Directories are skipped. ``is_file()`` follows symbolic
                # links, so links that point at regular files are listed.
                if not dir_entry.is_file():
                    continue
                file_entry = self._make_file_entry(dir_entry.path)
                if file_entry is not None:
                    yield file_entry

    def _recursively_list_files(self) -> Generator[FileEntry, None, None]:
        """Yield an entry for every file in ``data_directory`` and below."""
        # NOTE(review): os.walk ignores directory-listing errors unless an
        # ``onerror`` callback is given, so an unreadable or missing
        # directory produces no entries here instead of an error.
        for dirpath, _dirnames, filenames in os.walk(self.data_directory):
            for filename in filenames:
                file_entry = self._make_file_entry(os.path.join(dirpath, filename))
                if file_entry is not None:
                    yield file_entry

    def list(self, start_after_hint: Optional[str]) -> Generator[FileEntry, None, None]:
        """Lazily yield the files available on this connection.

        :param start_after_hint: accepted for interface compatibility; this
            connection does not use it.
        :raises ConnectionError: if listing fails while iterating.
        """
        if self.recursive_search:
            file_entries = self._recursively_list_files()
        else:
            file_entries = self._list_files()
        try:
            # Delegate inside the ``try``: the listing generators are lazy,
            # so their errors only surface while they are being consumed.
            yield from file_entries
        except Exception as e:
            raise ConnectionError(self._format_error(e)) from e

    def copy(self, file_entry: FileEntry, local_path: str) -> None:
        """Copy the file described by ``file_entry`` to ``local_path``.

        :raises ConnectionError: if the copy fails.
        """
        try:
            path = os.path.join(self.data_directory, file_entry.path)
            shutil.copy2(path, local_path)
        except Exception as e:
            raise ConnectionError(self._format_error(e)) from e

    def delete(self, file_entry: FileEntry) -> None:
        """Remove the file described by ``file_entry`` from ``data_directory``.

        :raises ConnectionError: if the file cannot be removed.
        """
        try:
            path = os.path.join(self.data_directory, file_entry.path)
            os.remove(path)
        except Exception as e:
            raise ConnectionError(self._format_error(e)) from e
@receivers.register('localfile')
class LocalFileReceiver(FileReceiver):
    """The ``localfile`` data receiver fetches and reads files on the local
    system.

    Example:

    If we want to retrieve data from a local CSV file in
    */data/test-data/multiple-files*, our configuration will look
    something like this:

    .. code-block:: json

        {
            "data_receiver": [
                {
                    "data_receiver_name": "File Plugin 1",
                    "protocol": "localfile",
                    "delete_completed": true,
                    "poll_interval": 4,
                    "connections": [
                        {"data_directory": "/data/test-data/multiple-files"}
                    ],
                    "streams": [
                        {
                            "asset": "Station_CSV_3",
                            "stream_type": "cycle",
                            "file_filter": ["station3.csv"]
                        }
                    ]
                }
            ]
        }

    **Configuration:**

    The ``localfile`` data receiver uses a file receiver that connects to
    the local system.

    .. autoheadlessattribute:: factorytx.receivers.file.receiver.FileReceiver.schema
        :noindex:

    .. autoheadlessattribute:: factorytx.receivers.localfile._LocalFileConnection.schema
        :annotation:

    """

    # Connection implementation used by this receiver.
    connection_class = _LocalFileConnection