import contextlib
import json
import logging
import os
import sqlite3
import textwrap
from typing import List, Optional
from bson import json_util
from factorytx.dataflow import InputStreamId
from factorytx.receivers.file.base import Connection, FileEntry, Parser
from factorytx.sqlite import set_journal_size_limit, MigrationDict, DBMigrator
from factorytx.utils import ensure_dir_exists
log = logging.getLogger(__name__)
# TODO: Merge all file receiver-related files into a single source file.
class ParsingCandidate:
"""Holds the data necessary to process a file for a given stream. To
simplify the act of fetching, parsing, transmitting, and deleting data, this
object has attributes that would normally be scattered over several data
structures:
:ivar connection: Connection to use to retrieve the file and (optionally)
delete it.
:ivar parser: Parser to apply to the file after retrieving it.
:ivar file: metadata about the file, e.g. its path on the remote server.
:ivar input_stream_id: identifier for the input stream associated with the file.
:ivar asset: asset to associate with the data extracted from the file.
:ivar stream_type: stream type to associate with the data extracted from the file.
:ivar state: state dictionary from the last time the file was parsed, or
None if the file has not been processed before.
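Example (illustrative sketch; ``conn``, ``file_parser``, ``entry`` and
``stream_id`` stand in for an already-configured Connection, Parser,
FileEntry and InputStreamId, and the asset / stream type values are
placeholders)::

    candidate = ParsingCandidate(
        connection=conn, parser=file_parser, file=entry,
        input_stream_id=stream_id, asset='Lathe-01', stream_type='metrics',
        state=None)  # state stays None until the file has been parsed once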
"""
__slots__ = ['connection', 'parser', 'file', 'input_stream_id', 'asset', 'stream_type', 'state']
def __init__(self, connection: Connection, parser: Parser, file: FileEntry,
input_stream_id: InputStreamId, asset: str, stream_type: str,
state: Optional[dict]) -> None:
self.connection = connection
self.parser = parser
self.file = file
self.input_stream_id = input_stream_id
self.asset = asset
self.stream_type = stream_type
self.state = state
def __eq__(self, other: object) -> bool:
return (isinstance(other, ParsingCandidate) and
self.connection == other.connection and
self.parser == other.parser and
self.file == other.file and
self.input_stream_id == other.input_stream_id and
self.asset == other.asset and
self.stream_type == other.stream_type and
self.state == other.state)
def __repr__(self) -> str:
s = (f'ParsingCandidate(connection={self.connection!r}, parser={self.parser!r}, '
f'file={self.file!r}, input_stream_id={self.input_stream_id!r}, '
f'asset={self.asset!r}, stream_type={self.stream_type!r}, state={self.state!r})')
return s
class StateStoreDBMigrator(DBMigrator):
def _SCHEMA_SQL(self) -> str:
# language=SQLite
return textwrap.dedent("""\
CREATE TABLE IF NOT EXISTS files (
-- Input stream that the file was parsed by. This is stored as asset
-- and stream_type instead of a stream ID for backwards-compatibility.
asset TEXT NOT NULL,
stream_type TEXT NOT NULL,
-- Path to the file on the remote system.
path TEXT NOT NULL,
-- Modification time of the file as seconds since the epoch.
mtime REAL NOT NULL,
-- Size of the file in bytes.
size INTEGER NOT NULL,
-- Arbitrary JSON data returned from the parser. This is used to extract
-- new records from files that are continuously updated.
state JSON NOT NULL,
-- Has the file been completely parsed yet? If zero, the file has been
-- only partially parsed (e.g. FactoryTX was stopped before it finished
-- parsing and emitting data frames).
completed TINYINT NOT NULL DEFAULT 1,
-- Name of the connection that retrieved the file from the server
connection TEXT DEFAULT NULL,
PRIMARY KEY (connection, asset, stream_type, path) ON CONFLICT REPLACE
);
CREATE INDEX IF NOT EXISTS ix_files_connection_completed_path
ON files(connection, completed, path);
""")
def _MIGRATION_SQL(self) -> MigrationDict:
return {
"0000-initial-schema": textwrap.dedent("""
CREATE TABLE files (
-- Input stream that the file was parsed by. This is stored as asset
-- and stream_type instead of a stream ID for backwards-compatibility.
asset TEXT NOT NULL,
stream_type TEXT NOT NULL,
-- Path to the file on the remote system.
path TEXT NOT NULL,
-- Modification time of the file as seconds since the epoch.
mtime REAL NOT NULL,
-- Size of the file in bytes.
size INTEGER NOT NULL,
-- Arbitrary JSON data returned from the parser. This is used to extract
-- new records from files that are continuously updated.
state JSON NOT NULL,
PRIMARY KEY (asset, stream_type, path) ON CONFLICT REPLACE
);
"""),
"0001-add-completed": textwrap.dedent("""
ALTER TABLE files ADD COLUMN completed TINYINT NOT NULL DEFAULT 1;
"""),
"0002-add-connection-info": textwrap.dedent("""
ALTER TABLE files ADD COLUMN connection TEXT DEFAULT NULL;
"""),
"0003-create-idx-connection-completed-path": textwrap.dedent("""
CREATE INDEX IF NOT EXISTS ix_files_connection_completed_path
ON files(connection, completed, path);
"""),
"0004-add-connection-info-to-pk": textwrap.dedent("""
-- adapted from http://www.sqlitetutorial.net/sqlite-alter-table/
ALTER TABLE files RENAME TO temp_files;
-- https://stackoverflow.com/questions/42530689/how-to-rename-an-index-in-sqlite
DROP INDEX ix_files_connection_completed_path;
CREATE TABLE files (
asset TEXT NOT NULL,
stream_type TEXT NOT NULL,
path TEXT NOT NULL,
mtime REAL NOT NULL,
size INTEGER NOT NULL,
state JSON NOT NULL,
completed TINYINT NOT NULL DEFAULT 1,
connection TEXT DEFAULT NULL,
PRIMARY KEY (connection, asset, stream_type, path) ON CONFLICT REPLACE
);
CREATE INDEX ix_files_connection_completed_path
ON files(connection, completed, path);
INSERT INTO files (asset, stream_type, path, mtime, size, state, completed, connection)
SELECT asset, stream_type, path, mtime, size, state, completed, connection
FROM temp_files;
DROP TABLE temp_files;
"""),
}
class StateStore:
"""Tracks parser state and file metadata (eg. file size and modification
time) for files that have been parsed and pushed to a transmit.
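Typical usage (sketch; the database path and ``candidates`` list are
placeholders)::

    store = StateStore('/var/lib/factorytx/file_receiver_state.sqlite')
    store.initialize()
    # Drop candidates that were already fully parsed and are unchanged.
    pending = store.fetch_new_or_modified(candidates)
    for candidate in pending:
        ...  # parse the file and emit its data, updating candidate.state
        store.record(candidate, completed=True)
    store.close()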
"""
_CONNECT_SQL = textwrap.dedent("""\
-- Keep the journal after committing. This ensures that if we run out of disk
-- space we can still delete records from the database.
-- We need to run this BEFORE any other statements. Otherwise sqlite may
-- start an implicit transaction and delete the journal before we change modes.
PRAGMA journal_mode = PERSIST;
-- Enable online incremental vacuuming.
PRAGMA auto_vacuum = INCREMENTAL;
""")
_JOURNAL_SIZE_BYTES = 1 * 1024 * 1024 # 1 MiB
def __init__(self, path: str) -> None:
"""Instantiates a StateStore.
:param path: path to a file on disk to use to store the state. It and
its parent directory will be created if they do not already exist.
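Example (sketch; the path is a placeholder, and any missing parent
directories will be created)::

    store = StateStore('/var/lib/factorytx/state/files.sqlite')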
"""
ensure_dir_exists(os.path.dirname(path))
self.path = path
self.sql_conn = sqlite3.connect(path)
self.sql_conn.isolation_level = None
self.sql_conn.executescript(self._CONNECT_SQL)
self.initialized = False
def initialize(
self, force_migrate: bool = False, db_migrator: Optional[StateStoreDBMigrator] = None
) -> None:
"""
:param force_migrate: if True, then apply migration scripts instead of the
schema script even if the database does not exist. Defaults to False.
:param db_migrator: migrator used to create or update the database schema.
Defaults to a StateStoreDBMigrator if not specified.
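Example (sketch)::

    store.initialize()
    # Or replay the migration scripts even for a brand-new database:
    store.initialize(force_migrate=True)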
"""
set_journal_size_limit(self.sql_conn, self.path, self._JOURNAL_SIZE_BYTES)
if db_migrator is None:
db_migrator = StateStoreDBMigrator()
db_migrator.migrate(self.sql_conn, force_migrate=force_migrate)
# Reset the journal size limit now that migrations are complete.
self.sql_conn.execute('PRAGMA journal_size_limit = -1')
self.initialized = True
def close(self) -> None:
self.sql_conn.close()
# language=SQLite
_INSERT_SQL = textwrap.dedent("""\
INSERT INTO files (asset, stream_type, path, mtime, size, completed, state, connection)
VALUES (:asset, :stream_type, :path, :mtime, :size, :completed, :state, :connection);
""")
def record(self, parsed: ParsingCandidate, completed: bool, clear_state_on_completed: bool = False) -> None:
"""Stores or updates the state of a parsed file on disk. The `parsed`
parameter must be a ParsingCandidate with an associated state dict.
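Example (sketch; ``candidate`` is a ParsingCandidate returned by
:meth:`fetch_new_or_modified` whose ``state`` was filled in by the parser)::

    store.record(candidate, completed=True)
    # A partially parsed file, e.g. when shutting down mid-parse:
    store.record(candidate, completed=False)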
"""
if clear_state_on_completed:
parsed.state = {}
with self.sql_conn:
params = {
'asset': parsed.asset,
'stream_type': parsed.stream_type,
'path': parsed.file.path,
'mtime': parsed.file.mtime,
'size': parsed.file.size,
'completed': completed,
'state': json.dumps(parsed.state, default=json_util.default, separators=(',', ':')),
'connection': parsed.connection.name
}
self.sql_conn.execute(self._INSERT_SQL, params)
# language=SQLite
_FILTER_SCHEMA_SQL = textwrap.dedent("""\
DROP TABLE IF EXISTS candidates;
CREATE TEMP TABLE candidates (
-- obj_index is the position of the original ParsingCandidate in the
-- candidate list.
obj_index INTEGER,
asset TEXT,
stream_type TEXT,
path TEXT,
connection TEXT DEFAULT NULL,
mtime REAL,
size INTEGER
);
""")
# language=SQLite
_FILTER_INSERT_SQL = "INSERT INTO candidates VALUES (?, ?, ?, ?, ?, ?, ?);"
# language=SQLite
_FILTER_SELECT_SQL = textwrap.dedent("""\
SELECT c.obj_index, f.state
FROM candidates c
LEFT JOIN files f
ON c.connection = f.connection
AND c.asset = f.asset
AND c.stream_type = f.stream_type
AND c.path = f.path
WHERE NOT f.completed
OR c.mtime IS NOT f.mtime
OR c.size IS NOT f.size
ORDER BY c.mtime ASC;
""")
# language=SQLite
_FILTER_SELECT_ALPHA_ORDER_SQL = textwrap.dedent("""\
SELECT c.obj_index, f.state
FROM candidates c
LEFT JOIN files f
ON c.connection = f.connection
AND c.asset = f.asset
AND c.stream_type = f.stream_type
AND c.path = f.path
WHERE NOT f.completed
OR c.mtime IS NOT f.mtime
OR c.size IS NOT f.size
ORDER BY c.path ASC;
""")
def fetch_new_or_modified(self, candidates: List[ParsingCandidate],
alphabetical: bool = False) -> List[ParsingCandidate]:
"""Returns all candidates that have not been previously parsed or have
changed since they were last parsed. As a side effect, this method
fetches the state dictionary of previously-parsed files from persistent
storage.
:param candidates: candidates discovered on the remote system.
:param alphabetical: if True, return candidates ordered by file path;
otherwise they are ordered by modification time (oldest first).
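Example (sketch; ``candidates`` is a list of ParsingCandidate objects built
from a directory listing)::

    pending = store.fetch_new_or_modified(candidates)
    # Or process files in lexicographic path order instead:
    pending = store.fetch_new_or_modified(candidates, alphabetical=True)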
"""
results: List[ParsingCandidate] = []
with self.sql_conn, contextlib.closing(self.sql_conn.cursor()) as cursor:
cursor.execute("BEGIN")
cursor.executescript(self._FILTER_SCHEMA_SQL)
rows = [(ix, c.input_stream_id.asset, c.input_stream_id.stream_type,
c.file.path, c.connection.name,
c.file.mtime, c.file.size)
for ix, c in enumerate(candidates)]
cursor.executemany(self._FILTER_INSERT_SQL, rows)
if alphabetical:
# Order by path (filename)
cursor.execute(self._FILTER_SELECT_ALPHA_ORDER_SQL)
else:
# Order by last modified (or created) time
cursor.execute(self._FILTER_SELECT_SQL)
for row in cursor:
candidate = candidates[row[0]]
if row[1] is not None:
candidate.state = json.loads(row[1], object_hook=json_util.object_hook)
else:
candidate.state = None
results.append(candidate)
return results
# language=SQLite
_FETCH_LAST_COMPLETED_SQL = textwrap.dedent("""\
SELECT MAX(path)
FROM files
WHERE connection = :connection
AND completed = 1;
""")
def fetch_last_completed_file(self, connection_name: str) -> Optional[str]:
"""Fetches the path of the last completed file for a specific
connection.
:param connection_name: name of the connection to the file server.
:returns: the path that sorts last among the files this connection has
completely parsed, or None if no file has been completed yet.
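Example (sketch; the connection name is a placeholder)::

    last_path = store.fetch_last_completed_file('sftp-line-1')
    if last_path is None:
        ...  # nothing has been completely parsed for this connection yet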
"""
last_completed_file: Optional[str] = None
params = {'connection': connection_name}
with self.sql_conn:
cur = self.sql_conn.execute(self._FETCH_LAST_COMPLETED_SQL, params)
row = cur.fetchone()
if row:
last_completed_file = row[0]
return last_completed_file
# TODO: Test performance against a large database. Use dynamic query instead?
# language=SQLite
_PURGE_SCHEMA_SQL = textwrap.dedent("""\
DROP TABLE IF EXISTS purged;
CREATE TEMP TABLE purged (asset TEXT, stream_type TEXT);
""")
# language=SQLite
_PURGE_INSERT_SQL = "INSERT INTO purged VALUES (?, ?);"
# language=SQLite
_PURGE_DELETE_SQL = textwrap.dedent("""\
DELETE FROM files
WHERE EXISTS (
SELECT 1
FROM purged
WHERE files.asset = purged.asset
AND files.stream_type = purged.stream_type
);
""")
def purge(self, streams: List[InputStreamId]) -> None:
"""Removes all buffered data for the specified streams.
:param streams: list of (asset, stream_type) pairs.
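Example (sketch; ``streams`` is the list of InputStreamId objects for the
streams being removed)::

    store.purge(streams)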
"""
with self.sql_conn, contextlib.closing(self.sql_conn.cursor()) as cursor:
params = [(s.asset, s.stream_type) for s in streams]
cursor.execute("BEGIN")
cursor.executescript(self._PURGE_SCHEMA_SQL)
cursor.executemany(self._PURGE_INSERT_SQL, params)
cursor.execute(self._PURGE_DELETE_SQL)