Source code for factorytx.receivers.file.state

import contextlib
import json
import logging
import os
import sqlite3
import textwrap
from typing import List, Optional

from bson import json_util

from factorytx.dataflow import InputStreamId
from factorytx.receivers.file.base import Connection, FileEntry, Parser
from factorytx.sqlite import set_journal_size_limit, MigrationDict, DBMigrator
from factorytx.utils import ensure_dir_exists


log = logging.getLogger(__name__)


# TODO: Merge all file receiver-related files into a single source file.
class ParsingCandidate:
    """Holds the data necessary to process a file for a given stream.

    To simplify the act of fetching, parsing, transmitting, and deleting data
    this object has attributes that would normally be scattered over several
    data structures:

    :ivar connection: Connection to use to retrieve the file and (optionally)
        delete it.
    :ivar parser: Parser to apply to the file after retrieving it.
    :ivar file: metadata about the file, eg. its path on the remote server.
    :ivar input_stream_id: identifier for the input stream associated with the file.
    :ivar asset: asset to associate with the data extracted from the file.
    :ivar stream_type: stream type to associate with the data extracted from the file.
    :ivar state: state dictionary from the last time the file was parsed, or
        None if the file has not been processed before.

    """

    __slots__ = ['connection', 'parser', 'file', 'input_stream_id', 'asset',
                 'stream_type', 'state']

    def __init__(self, connection: Connection, parser: Parser, file: FileEntry,
                 input_stream_id: InputStreamId, asset: str, stream_type: str,
                 state: Optional[dict]) -> None:
        self.connection = connection
        self.parser = parser
        self.file = file
        self.input_stream_id = input_stream_id
        self.asset = asset
        self.stream_type = stream_type
        self.state = state

    def __eq__(self, other: object) -> bool:
        return (isinstance(other, ParsingCandidate) and
                self.connection == other.connection and
                self.parser == other.parser and
                self.file == other.file and
                self.input_stream_id == other.input_stream_id and
                self.asset == other.asset and
                self.stream_type == other.stream_type and
                self.state == other.state)

    def __repr__(self) -> str:
        s = (f'ParsingCandidate(connection={self.connection!r}, parser={self.parser!r}, '
             f'file={self.file!r}, input_stream_id={self.input_stream_id!r}, '
             f'asset={self.asset!r}, stream_type={self.stream_type!r}, state={self.state!r})')
        return s
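# Illustrative sketch (not part of the original module): how a file receiver might bundle
# its per-file bookkeeping into a ParsingCandidate. The connection, parser, file_entry, and
# stream_id arguments are assumed to be built elsewhere by the receiver; the asset and
# stream_type strings below are hypothetical.
def _example_build_candidate(connection: Connection, parser: Parser,
                             file_entry: FileEntry,
                             stream_id: InputStreamId) -> ParsingCandidate:
    return ParsingCandidate(
        connection=connection,      # used to retrieve (and optionally delete) the file
        parser=parser,              # applied to the file after retrieval
        file=file_entry,            # path/mtime/size metadata for the remote file
        input_stream_id=stream_id,  # identifies the input stream the file belongs to
        asset='press_01',           # hypothetical asset name
        stream_type='production',   # hypothetical stream type
        state=None,                 # no saved parser state until the file is first parsed
    )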
class StateStoreDBMigrator(DBMigrator):
    def _SCHEMA_SQL(self) -> str:
        # language=SQLite
        return textwrap.dedent("""\
            CREATE TABLE IF NOT EXISTS files (
                -- Input stream that the file was parsed by. This is stored as asset
                -- and stream_type instead of a stream ID for backwards-compatibility.
                asset TEXT NOT NULL,
                stream_type TEXT NOT NULL,

                -- Path to the file on the remote system.
                path TEXT NOT NULL,

                -- Modification time of the file as seconds since the epoch.
                mtime REAL NOT NULL,

                -- Size of the file in bytes.
                size INTEGER NOT NULL,

                -- Arbitrary JSON data returned from the parser. This is used to extract
                -- new records from files that are continuously updated.
                state JSON NOT NULL,

                -- Has the file been completely parsed yet? If zero, the file has been
                -- only partially parsed (eg. FactoryTX was stopped before it finished
                -- parsing and emitting data frames.)
                completed TINYINT NOT NULL DEFAULT 1,

                -- Name of the connection that retrieved the file from the server.
                connection TEXT DEFAULT NULL,

                PRIMARY KEY (connection, asset, stream_type, path) ON CONFLICT REPLACE
            );

            CREATE INDEX IF NOT EXISTS ix_files_connection_completed_path
                ON files(connection, completed, path);
        """)

    def _MIGRATION_SQL(self) -> MigrationDict:
        return {
            "0000-initial-schema": textwrap.dedent("""
                CREATE TABLE files (
                    -- Input stream that the file was parsed by. This is stored as asset
                    -- and stream_type instead of a stream ID for backwards-compatibility.
                    asset TEXT NOT NULL,
                    stream_type TEXT NOT NULL,

                    -- Path to the file on the remote system.
                    path TEXT NOT NULL,

                    -- Modification time of the file as seconds since the epoch.
                    mtime REAL NOT NULL,

                    -- Size of the file in bytes.
                    size INTEGER NOT NULL,

                    -- Arbitrary JSON data returned from the parser. This is used to extract
                    -- new records from files that are continuously updated.
                    state JSON NOT NULL,

                    PRIMARY KEY (asset, stream_type, path) ON CONFLICT REPLACE
                );
            """),
            "0001-add-completed": textwrap.dedent("""
                ALTER TABLE files ADD COLUMN completed TINYINT NOT NULL DEFAULT 1;
            """),
            "0002-add-connection-info": textwrap.dedent("""
                ALTER TABLE files ADD COLUMN connection TEXT DEFAULT NULL;
            """),
            "0003-create-idx-connection-completed-path": textwrap.dedent("""
                CREATE INDEX IF NOT EXISTS ix_files_connection_completed_path
                    ON files(connection, completed, path);
            """),
            "0004-add-connection-info-to-pk": textwrap.dedent("""
                -- adapted from http://www.sqlitetutorial.net/sqlite-alter-table/
                ALTER TABLE files RENAME TO temp_files;

                -- https://stackoverflow.com/questions/42530689/how-to-rename-an-index-in-sqlite
                DROP INDEX ix_files_connection_completed_path;

                CREATE TABLE files (
                    asset TEXT NOT NULL,
                    stream_type TEXT NOT NULL,
                    path TEXT NOT NULL,
                    mtime REAL NOT NULL,
                    size INTEGER NOT NULL,
                    state JSON NOT NULL,
                    completed TINYINT NOT NULL DEFAULT 1,
                    connection TEXT DEFAULT NULL,
                    PRIMARY KEY (connection, asset, stream_type, path) ON CONFLICT REPLACE
                );

                CREATE INDEX ix_files_connection_completed_path
                    ON files(connection, completed, path);

                INSERT INTO files (asset, stream_type, path, mtime, size, state, completed, connection)
                    SELECT asset, stream_type, path, mtime, size, state, completed, connection
                    FROM temp_files;

                DROP TABLE temp_files;
            """),
        }
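# Illustrative sketch (not part of the original module): layering one extra migration on top
# of the built-in ones, assuming MigrationDict behaves like an ordered mapping of migration
# names to SQL scripts. The "0005-add-mtime-index" entry and the index it creates are
# hypothetical; a real schema change would also need to be reflected in _SCHEMA_SQL so that
# freshly-created databases match the migrated ones.
class _ExampleStateStoreDBMigrator(StateStoreDBMigrator):
    def _MIGRATION_SQL(self) -> MigrationDict:
        migrations = super()._MIGRATION_SQL()
        migrations["0005-add-mtime-index"] = textwrap.dedent("""
            CREATE INDEX IF NOT EXISTS ix_files_mtime ON files(mtime);
        """)
        return migrations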
class StateStore:
    """Tracks parser state and file metadata (eg. file size and modification
    time) for files that have been parsed and pushed to a transmit.

    """

    _CONNECT_SQL = textwrap.dedent("""\
        -- Keep the journal after committing. This ensures that if we run out of disk
        -- space we can still delete records from the database.
        -- We need to run this BEFORE any other statements. Otherwise sqlite may
        -- start an implicit transaction and delete the journal before we change modes.
        PRAGMA journal_mode = PERSIST;

        -- Enable online incremental vacuuming.
        PRAGMA auto_vacuum = INCREMENTAL;
    """)

    _JOURNAL_SIZE_BYTES = 1 * 1024 * 1024  # 1 MiB

    def __init__(self, path: str) -> None:
        """Instantiates a StateStore.

        :param path: path to a file on disk to use to store the state. It and
            its parent directory will be created if they do not already exist.

        """
        ensure_dir_exists(os.path.dirname(path))
        self.path = path
        self.sql_conn = sqlite3.connect(path)
        self.sql_conn.isolation_level = None
        self.sql_conn.executescript(self._CONNECT_SQL)
        self.initialized = False
    def initialize(
        self,
        force_migrate: bool = False,
        db_migrator: Optional[StateStoreDBMigrator] = None
    ) -> None:
        """
        :param force_migrate: if True, then apply migration scripts instead of
            the schema script even if the database does not exist. Defaults to
            False.
        :param db_migrator: migrator used to create or upgrade the state
            database schema. Defaults to a StateStoreDBMigrator if not
            specified.
        """
        set_journal_size_limit(self.sql_conn, self.path, self._JOURNAL_SIZE_BYTES)

        if db_migrator is None:
            db_migrator = StateStoreDBMigrator()
        db_migrator.migrate(self.sql_conn, force_migrate=force_migrate)

        # Reset the journal size limit now that migrations are complete.
        self.sql_conn.execute('PRAGMA journal_size_limit = -1')

        self.initialized = True
    def close(self) -> None:
        self.sql_conn.close()

    # language=SQLite
    _INSERT_SQL = textwrap.dedent("""\
        INSERT INTO files (asset, stream_type, path, mtime, size, completed, state, connection)
        VALUES (:asset, :stream_type, :path, :mtime, :size, :completed, :state, :connection);
    """)
    def record(self, parsed: ParsingCandidate, completed: bool,
               clear_state_on_completed: bool = False) -> None:
        """Stores or updates the state of a parsed file on disk. The `parsed`
        parameter must be a ParsingCandidate with an associated state dict.

        """
        if clear_state_on_completed:
            parsed.state = {}

        with self.sql_conn:
            params = {
                'asset': parsed.asset,
                'stream_type': parsed.stream_type,
                'path': parsed.file.path,
                'mtime': parsed.file.mtime,
                'size': parsed.file.size,
                'completed': completed,
                'state': json.dumps(parsed.state, default=json_util.default,
                                    separators=(',', ':')),
                'connection': parsed.connection.name,
            }
            self.sql_conn.execute(self._INSERT_SQL, params)
    # language=SQLite
    _FILTER_SCHEMA_SQL = textwrap.dedent("""\
        DROP TABLE IF EXISTS candidates;
        CREATE TEMP TABLE candidates (
            -- obj_index is the position of the original ParsingCandidate in the
            -- candidate list.
            obj_index INTEGER,
            asset TEXT,
            stream_type TEXT,
            path TEXT,
            connection TEXT DEFAULT NULL,
            mtime REAL,
            size INTEGER
        );
    """)

    # language=SQLite
    _FILTER_INSERT_SQL = "INSERT INTO candidates VALUES (?, ?, ?, ?, ?, ?, ?);"

    # language=SQLite
    _FILTER_SELECT_SQL = textwrap.dedent("""\
        SELECT c.obj_index, f.state
        FROM candidates c
        LEFT JOIN files f
            ON c.connection = f.connection
            AND c.asset = f.asset
            AND c.stream_type = f.stream_type
            AND c.path = f.path
        WHERE NOT f.completed
            OR c.mtime IS NOT f.mtime
            OR c.size IS NOT f.size
        ORDER BY c.mtime ASC;
    """)

    # language=SQLite
    _FILTER_SELECT_ALPHA_ORDER_SQL = textwrap.dedent("""\
        SELECT c.obj_index, f.state
        FROM candidates c
        LEFT JOIN files f
            ON c.connection = f.connection
            AND c.asset = f.asset
            AND c.stream_type = f.stream_type
            AND c.path = f.path
        WHERE NOT f.completed
            OR c.mtime IS NOT f.mtime
            OR c.size IS NOT f.size
        ORDER BY c.path ASC;
    """)
    def fetch_new_or_modified(self, candidates: List[ParsingCandidate],
                              alphabetical: bool = False) -> List[ParsingCandidate]:
        """Returns all candidates that have not been previously parsed or have
        changed since they were last parsed. As a side effect, this method
        fetches the state dictionary of previously-parsed files from persistent
        storage.

        """
        results: List[ParsingCandidate] = []
        with self.sql_conn, contextlib.closing(self.sql_conn.cursor()) as cursor:
            cursor.execute("BEGIN")
            cursor.executescript(self._FILTER_SCHEMA_SQL)
            rows = [(ix, c.input_stream_id.asset, c.input_stream_id.stream_type,
                     c.file.path, c.connection.name, c.file.mtime, c.file.size)
                    for ix, c in enumerate(candidates)]
            cursor.executemany(self._FILTER_INSERT_SQL, rows)
            if alphabetical:
                # Order by path (filename).
                cursor.execute(self._FILTER_SELECT_ALPHA_ORDER_SQL)
            else:
                # Order by last modified (or created) time.
                cursor.execute(self._FILTER_SELECT_SQL)
            for row in cursor:
                candidate = candidates[row[0]]
                if row[1] is not None:
                    candidate.state = json.loads(row[1], object_hook=json_util.object_hook)
                else:
                    candidate.state = None
                results.append(candidate)

        return results
    # language=SQLite
    _FETCH_LAST_COMPLETED_SQL = textwrap.dedent("""\
        SELECT MAX(path)
        FROM files
        WHERE connection = :connection
            AND completed = 1;
    """)
    def fetch_last_completed_file(self, connection_name: str) -> Optional[str]:
        """Fetches the path of the last completed file for a specific connection.

        :param connection_name: Name of the connection to the file server.

        """
        last_completed_file: Optional[str] = None
        params = {'connection': connection_name}
        with self.sql_conn:
            cur = self.sql_conn.execute(self._FETCH_LAST_COMPLETED_SQL, params)
            row = cur.fetchone()
            if row:
                last_completed_file = row[0]
        return last_completed_file
    # TODO: Test performance against a large database. Use dynamic query instead?
    # language=SQLite
    _PURGE_SCHEMA_SQL = textwrap.dedent("""\
        DROP TABLE IF EXISTS purged;
        CREATE TEMP TABLE purged (asset TEXT, stream_type TEXT);
    """)

    # language=SQLite
    _PURGE_INSERT_SQL = "INSERT INTO purged VALUES (?, ?);"

    # language=SQLite
    _PURGE_DELETE_SQL = textwrap.dedent("""\
        DELETE FROM files
        WHERE EXISTS (
            SELECT 1 FROM purged
            WHERE files.asset = purged.asset
                AND files.stream_type = purged.stream_type
        );
    """)
    def purge(self, streams: List[InputStreamId]) -> None:
        """Removes all buffered data for the specified streams.

        :param streams: list of (asset, stream_type) pairs.
        """
        with self.sql_conn, contextlib.closing(self.sql_conn.cursor()) as cursor:
            params = [(s.asset, s.stream_type) for s in streams]
            cursor.execute("BEGIN")
            cursor.executescript(self._PURGE_SCHEMA_SQL)
            cursor.executemany(self._PURGE_INSERT_SQL, params)
            cursor.execute(self._PURGE_DELETE_SQL)
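

# Illustrative end-to-end sketch (not part of the original module): the lifecycle a file
# receiver might drive against a StateStore. The state-file path is hypothetical, and
# `candidates` is assumed to be a list of ParsingCandidate objects assembled by the
# receiver (see _example_build_candidate above).
def _example_receiver_pass(candidates: List[ParsingCandidate]) -> None:
    store = StateStore('/var/spool/factorytx/file-receiver-state.sqlite')  # hypothetical path
    store.initialize()  # creates the schema or applies any pending migrations
    try:
        # Only files that are new, modified, or partially parsed come back; previously
        # saved parser state is loaded onto each candidate as a side effect.
        for candidate in store.fetch_new_or_modified(candidates):
            # ... fetch candidate.file via candidate.connection, run candidate.parser,
            # and emit the resulting data frames; the parser is expected to leave a
            # state dict on the candidate ...
            candidate.state = candidate.state or {}
            store.record(candidate, completed=True)
    finally:
        store.close()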