import contextlib
import json
import logging
import os
import sqlite3
import textwrap
from typing import List, Optional
from bson import json_util
from factorytx.dataflow import InputStreamId
from factorytx.receivers.file.base import Connection, FileEntry, Parser
from factorytx.sqlite import set_journal_size_limit, MigrationDict, DBMigrator
from factorytx.utils import ensure_dir_exists
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
# TODO: Merge all file receiver-related files into a single source file.
class ParsingCandidate:
    """Holds the data necessary to process a file for a given stream. To
    simplify the act of fetching, parsing, transmitting, and deleting data this
    object has attributes that would normally be scattered over several data
    structures:

    :ivar connection: Connection to use to retrieve the file and (optionally)
        delete it.
    :ivar parser: Parser to apply to the file after retrieving it.
    :ivar file: metadata about the file, eg. its path on the remote server.
    :ivar input_stream_id: identifier for the input stream associated with the file.
    :ivar asset: asset to associate with the data extracted from the file.
    :ivar stream_type: stream type to associate with the data extracted from the file.
    :ivar state: state dictionary from the last time the file was parsed, or
        None if the file has not been processed before.

    """

    # Fixed attribute set: instances carry no per-instance __dict__.
    __slots__ = ['connection', 'parser', 'file', 'input_stream_id', 'asset', 'stream_type', 'state']

    def __init__(self, connection: Connection, parser: Parser, file: FileEntry,
                 input_stream_id: InputStreamId, asset: str, stream_type: str,
                 state: Optional[dict]) -> None:
        """Stores every argument on the attribute of the same name."""
        self.connection = connection
        self.parser = parser
        self.file = file
        self.input_stream_id = input_stream_id
        self.asset = asset
        self.stream_type = stream_type
        self.state = state

    def __eq__(self, other: object) -> bool:
        """Returns True only if `other` is a ParsingCandidate whose attributes
        all compare equal to this object's attributes.

        Because ``__eq__`` is defined without ``__hash__``, instances are
        unhashable.
        """
        return (isinstance(other, ParsingCandidate) and
                self.connection == other.connection and
                self.parser == other.parser and
                self.file == other.file and
                self.input_stream_id == other.input_stream_id and
                self.asset == other.asset and
                self.stream_type == other.stream_type and
                self.state == other.state)

    def __repr__(self) -> str:
        """Returns a constructor-style representation listing every attribute."""
        return (f'ParsingCandidate(connection={self.connection!r}, parser={self.parser!r}, '
                f'file={self.file!r}, input_stream_id={self.input_stream_id!r}, '
                f'asset={self.asset!r}, stream_type={self.stream_type!r}, '
                f'state={self.state!r})')
class StateStoreDBMigrator(DBMigrator):
    """Supplies the SQL used to create and upgrade the ``files`` table that
    backs :class:`StateStore`.
    NOTE(review): how ``DBMigrator.migrate`` consumes ``_SCHEMA_SQL`` and
    ``_MIGRATION_SQL`` is defined in ``factorytx.sqlite`` and is not visible
    here -- confirm against the base class before changing either method.
    """
    def _SCHEMA_SQL(self) -> str:
        """Returns a script that creates the current ``files`` table and its
        ``ix_files_connection_completed_path`` index if they do not already
        exist. The table and index definitions match the result of applying
        every script in :meth:`_MIGRATION_SQL` in order.
        """
        # language=SQLite
        return textwrap.dedent("""\
            CREATE TABLE IF NOT EXISTS files (
                -- Input stream that the file was parsed by. This is stored as asset
                -- and stream_type instead of a stream ID for backwards-compatibility.
                asset TEXT NOT NULL,
                stream_type TEXT NOT NULL,
                -- Path to the file on the remote system.
                path TEXT NOT NULL,
                -- Modification time of the file as seconds since the epoch.
                mtime REAL NOT NULL,
                -- Size of the file in bytes.
                size INTEGER NOT NULL,
                -- Arbitrary JSON data returned from the parser. This is used to extract
                -- new records from files that are continuously updated.
                state JSON NOT NULL,
                -- Has the file been completely parsed yet? If zero, the file has been
                -- only partially parsed (eg. FactoryTX was stopped before it finished
                -- parsing and emitting data frames.)
                completed TINYINT NOT NULL DEFAULT 1,
                -- Name of the connection that retrieved the file from the server
                connection TEXT DEFAULT NULL,
                PRIMARY KEY (connection, asset, stream_type, path) ON CONFLICT REPLACE
            );
            CREATE INDEX IF NOT EXISTS ix_files_connection_completed_path
                ON files(connection, completed, path);
            """)
    def _MIGRATION_SQL(self) -> MigrationDict:
        """Returns the migration scripts keyed by migration name. The keys are
        numbered ``0000`` through ``0004`` and are listed in ascending order.
        """
        return {
            # Original table, keyed on (asset, stream_type, path).
            "0000-initial-schema": textwrap.dedent("""
            CREATE TABLE files (
                -- Input stream that the file was parsed by. This is stored as asset
                -- and stream_type instead of a stream ID for backwards-compatibility.
                asset TEXT NOT NULL,
                stream_type TEXT NOT NULL,
                -- Path to the file on the remote system.
                path TEXT NOT NULL,
                -- Modification time of the file as seconds since the epoch.
                mtime REAL NOT NULL,
                -- Size of the file in bytes.
                size INTEGER NOT NULL,
                -- Arbitrary JSON data returned from the parser. This is used to extract
                -- new records from files that are continuously updated.
                state JSON NOT NULL,
                PRIMARY KEY (asset, stream_type, path) ON CONFLICT REPLACE
            );
            """),
            # Adds the `completed` flag; existing rows default to 1.
            "0001-add-completed": textwrap.dedent("""
            ALTER TABLE files ADD COLUMN completed TINYINT NOT NULL DEFAULT 1;
            """),
            # Adds the `connection` column; existing rows default to NULL.
            "0002-add-connection-info": textwrap.dedent("""
            ALTER TABLE files ADD COLUMN connection TEXT DEFAULT NULL;
            """),
            # Adds the index on (connection, completed, path).
            "0003-create-idx-connection-completed-path": textwrap.dedent("""
            CREATE INDEX IF NOT EXISTS ix_files_connection_completed_path
                ON files(connection, completed, path);
            """),
            # Rebuilds the table so that `connection` is part of the primary
            # key: rename the old table, recreate table and index, copy the
            # rows across, then drop the old table.
            "0004-add-connection-info-to-pk": textwrap.dedent("""
            -- adapted from http://www.sqlitetutorial.net/sqlite-alter-table/
            ALTER TABLE files RENAME TO temp_files;
            -- https://stackoverflow.com/questions/42530689/how-to-rename-an-index-in-sqlite
            DROP INDEX ix_files_connection_completed_path;
            CREATE TABLE files (
                asset TEXT NOT NULL,
                stream_type TEXT NOT NULL,
                path TEXT NOT NULL,
                mtime REAL NOT NULL,
                size INTEGER NOT NULL,
                state JSON NOT NULL,
                completed TINYINT NOT NULL DEFAULT 1,
                connection TEXT DEFAULT NULL,
                PRIMARY KEY (connection, asset, stream_type, path) ON CONFLICT REPLACE
            );
            CREATE INDEX ix_files_connection_completed_path
            ON files(connection, completed, path);
            INSERT INTO files (asset, stream_type, path, mtime, size, state, completed, connection)
            SELECT asset, stream_type, path, mtime, size, state, completed, connection
            FROM temp_files;
            DROP TABLE temp_files;
            """),
        }
class StateStore:
    """Tracks parser state and file metadata (eg. file size and modification
    time) for files that have been parsed and pushed to a transmit.

    The data is kept in the ``files`` table of a SQLite database. That table
    is created (or upgraded) by :meth:`initialize`, which should be called
    before any of the query methods.

    """

    # Script executed once, right after the connection is opened.
    _CONNECT_SQL = textwrap.dedent("""\
    -- Keep the journal after committing. This ensures that if we run out of disk
    -- space we can still delete records from the database.
    -- We need to run this BEFORE any other statements. Otherwise sqlite may
    -- start an implicit transaction and delete the journal before we change modes.
    PRAGMA journal_mode = PERSIST;
    -- Enable online incremental vacuuming.
    PRAGMA auto_vacuum = INCREMENTAL;
    """)

    # Journal size limit handed to set_journal_size_limit() in initialize().
    _JOURNAL_SIZE_BYTES = 1 * 1024 * 1024  # 1 MiB

    def __init__(self, path: str) -> None:
        """Instantiates a StateStore.

        :param path: path to a file on disk to use to store the state. It and
            its parent directory will be created if they do not already exist.

        """
        ensure_dir_exists(os.path.dirname(path))
        self.path = path
        self.sql_conn = sqlite3.connect(path)
        # Turn off the sqlite3 module's implicit transaction handling; the
        # methods below open transactions explicitly where they need one.
        self.sql_conn.isolation_level = None
        self.sql_conn.executescript(self._CONNECT_SQL)
        self.initialized = False

    def initialize(
            self, force_migrate: bool = False, db_migrator: Optional[StateStoreDBMigrator] = None
    ) -> None:
        """Creates or upgrades the database schema and marks the store as
        initialized.

        :param force_migrate: if True, then apply migration scripts instead of the
            schema script even if the database does not exist. Defaults to False.
        :param db_migrator: migrator used to create or upgrade the schema. A
            new StateStoreDBMigrator is used when this is None.

        """
        set_journal_size_limit(self.sql_conn, self.path, self._JOURNAL_SIZE_BYTES)
        if db_migrator is None:
            db_migrator = StateStoreDBMigrator()
        db_migrator.migrate(self.sql_conn, force_migrate=force_migrate)

        # Reset the journal size limit now that migrations are complete.
        self.sql_conn.execute('PRAGMA journal_size_limit = -1')
        self.initialized = True

    def close(self) -> None:
        """Closes the underlying SQLite connection."""
        self.sql_conn.close()

    # language=SQLite
    _INSERT_SQL = textwrap.dedent("""\
    INSERT INTO files (asset, stream_type, path, mtime, size, completed, state, connection)
         VALUES (:asset, :stream_type, :path, :mtime, :size, :completed, :state, :connection);
    """)

    def record(self, parsed: ParsingCandidate, completed: bool) -> None:
        """Stores or updates the state of a parsed file on disk. The `parsed`
        parameter must be a ParsingCandidate with an associated state dict.

        :param parsed: candidate whose asset, stream type, file metadata,
            connection name and state are written to the ``files`` table.
        :param completed: value stored in the row's ``completed`` column.

        """
        params = {
            'asset': parsed.asset,
            'stream_type': parsed.stream_type,
            'path': parsed.file.path,
            'mtime': parsed.file.mtime,
            'size': parsed.file.size,
            'completed': completed,
            # Compact JSON; json_util.default handles values that the json
            # module cannot serialize on its own.
            'state': json.dumps(parsed.state, default=json_util.default, separators=(',', ':')),
            'connection': parsed.connection.name,
        }
        with self.sql_conn:
            self.sql_conn.execute(self._INSERT_SQL, params)

    # Scratch table holding the candidates passed to fetch_new_or_modified().
    # language=SQLite
    _FILTER_SCHEMA_SQL = textwrap.dedent("""\
    DROP TABLE IF EXISTS candidates;
    CREATE TEMP TABLE candidates (
        -- obj_index is the position of the original ParsingCandidate in the
        -- candidate list.
        obj_index INTEGER,
        asset TEXT,
        stream_type TEXT,
        path TEXT,
        connection TEXT DEFAULT NULL,
        mtime REAL,
        size INTEGER
    );
    """)

    # language=SQLite
    _FILTER_INSERT_SQL = "INSERT INTO candidates VALUES (?, ?, ?, ?, ?, ?, ?);"

    # Selects candidates with no matching `files` row, an incomplete row, or a
    # row whose mtime or size differs. Ordered by modification time.
    # language=SQLite
    _FILTER_SELECT_SQL = textwrap.dedent("""\
    SELECT c.obj_index, f.state
      FROM candidates c
      LEFT JOIN files f
       ON c.connection = f.connection
       AND c.asset = f.asset
       AND c.stream_type = f.stream_type
       AND c.path = f.path
      WHERE NOT f.completed
        OR c.mtime IS NOT f.mtime
        OR c.size IS NOT f.size
      ORDER BY c.mtime ASC;
    """)

    # Same selection as _FILTER_SELECT_SQL, ordered by path instead.
    # language=SQLite
    _FILTER_SELECT_ALPHA_ORDER_SQL = textwrap.dedent("""\
    SELECT c.obj_index, f.state
      FROM candidates c
      LEFT JOIN files f
       ON c.connection = f.connection
       AND c.asset = f.asset
       AND c.stream_type = f.stream_type
       AND c.path = f.path
      WHERE NOT f.completed
        OR c.mtime IS NOT f.mtime
        OR c.size IS NOT f.size
      ORDER BY c.path ASC;
    """)

    def fetch_new_or_modified(self, candidates: List[ParsingCandidate],
                              alphabetical: bool = False) -> List[ParsingCandidate]:
        """Returns all candidates that have not been previously parsed or have
        changed since they were last parsed. As a side effect, this method
        fetches the state dictionary of previously-parsed files from persistent
        storage.

        A candidate is matched to a stored file by connection name, asset,
        stream type and path. It is returned if no stored file matches, if the
        stored file is not marked completed, or if its mtime or size differs
        from the stored values.

        :param candidates: candidates to check. The `state` attribute of each
            returned candidate is overwritten with the stored state, or with
            None if there is no stored state.
        :param alphabetical: if True, order the result by file path; otherwise
            order it by file modification time. Defaults to False.
        :return: the new or modified candidates (the same objects that were
            passed in), in the requested order.

        """
        if alphabetical:
            # Order by path (filename)
            select_sql = self._FILTER_SELECT_ALPHA_ORDER_SQL
        else:
            # Order by last modified (or created) time
            select_sql = self._FILTER_SELECT_SQL

        rows = [(ix, c.input_stream_id.asset, c.input_stream_id.stream_type,
                 c.file.path, c.connection.name,
                 c.file.mtime, c.file.size)
                for ix, c in enumerate(candidates)]

        results: List[ParsingCandidate] = []
        with self.sql_conn, contextlib.closing(self.sql_conn.cursor()) as cursor:
            # executescript() implicitly commits any pending transaction, so
            # the scratch table is rebuilt first and the transaction is opened
            # afterwards. This way the inserts and the select really do run
            # inside a single transaction.
            cursor.executescript(self._FILTER_SCHEMA_SQL)
            cursor.execute("BEGIN")
            cursor.executemany(self._FILTER_INSERT_SQL, rows)
            cursor.execute(select_sql)
            for obj_index, stored_state in cursor:
                candidate = candidates[obj_index]
                if stored_state is not None:
                    candidate.state = json.loads(stored_state, object_hook=json_util.object_hook)
                else:
                    candidate.state = None
                results.append(candidate)
        return results

    # language=SQLite
    _FETCH_LAST_COMPLETED_SQL = textwrap.dedent("""\
    SELECT MAX(path)
      FROM files
      WHERE connection = :connection
        AND completed = 1;
    """)

    def fetch_last_completed_file(self, connection_name: str) -> Optional[str]:
        """Fetches the path of the last completed file for a specific
        connection. "Last" means the greatest path, as computed by SQL
        ``MAX(path)``, among the connection's files that are marked completed.

        :param connection_name: Name of the connection to the file server
        :return: the path, or None if the connection has no completed files.

        """
        params = {'connection': connection_name}
        with self.sql_conn:
            row = self.sql_conn.execute(self._FETCH_LAST_COMPLETED_SQL, params).fetchone()
        return row[0] if row else None

    # TODO: Test performance against a large database. Use dynamic query instead?
    # language=SQLite
    _PURGE_SCHEMA_SQL = textwrap.dedent("""\
    DROP TABLE IF EXISTS purged;
    CREATE TEMP TABLE purged (asset TEXT, stream_type TEXT);
    """)

    # language=SQLite
    _PURGE_INSERT_SQL = "INSERT INTO purged VALUES (?, ?);"

    # language=SQLite
    _PURGE_DELETE_SQL = textwrap.dedent("""\
    DELETE FROM files
     WHERE EXISTS (
       SELECT 1
         FROM purged
        WHERE files.asset = purged.asset
          AND files.stream_type = purged.stream_type
     );
    """)

    def purge(self, streams: List[InputStreamId]) -> None:
        """Removes the stored state of every file recorded for the specified
        streams. Rows are matched on asset and stream type only, so files are
        removed regardless of which connection retrieved them.

        :param streams: input streams to purge; the `asset` and `stream_type`
            attributes of each one are used.

        """
        params = [(s.asset, s.stream_type) for s in streams]
        with self.sql_conn, contextlib.closing(self.sql_conn.cursor()) as cursor:
            # executescript() implicitly commits any pending transaction, so
            # the scratch table is rebuilt before the transaction is opened.
            cursor.executescript(self._PURGE_SCHEMA_SQL)
            cursor.execute("BEGIN")
            cursor.executemany(self._PURGE_INSERT_SQL, params)
            cursor.execute(self._PURGE_DELETE_SQL)