SQL Data Receiver

The SQL data receiver connects to a SQL database and reads data stored in its tables.

Example:

If we want to retrieve student data from a PostgreSQL database named p1 running on localhost, our configuration file will look something like this:

{
  "data_receiver": [
    {
      "data_receiver_name": "my_psql_receiver",
      "protocol": "sql",
      "poll_interval": 10,
      "connections": [
        {
          "host": "127.0.0.1",
          "port": 5432,
          "username": "postgres",
          "password": "postgres",
          "database_type": "postgresql",
          "database_name": "p1",
          "ssl": false
        }
      ],
      "streams": [
        {
          "asset": "Students",
          "stream_type": "cycle",
          "query": "SELECT * FROM students WHERE id > :id ORDER BY id ASC LIMIT 10",
          "state_fields": [
            {
              "column_name": "id",
              "data_type": "integer"
            }
          ]
        }
      ]
    }
  ]
}
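
The Students stream uses a state field to page through the table. Each poll binds the id saved from the previous batch to :id, and the id of the last returned row becomes the value for the next poll. The sketch below imitates that loop with Python's built-in sqlite3 module, which also accepts :name placeholders. An in-memory table stands in for the PostgreSQL students table, and the helper poll_once is hypothetical. It illustrates the pattern under those assumptions and is not FactoryTX's own implementation.

```python
import sqlite3

# Same query as the "Students" stream; sqlite3 also understands :name placeholders.
QUERY = "SELECT * FROM students WHERE id > :id ORDER BY id ASC LIMIT 10"


def poll_once(conn, state):
    """Run one poll and return (rows, new_state).

    Hypothetical helper: `state` maps each state field to the value saved
    from the previous poll. FactoryTX's internals may differ.
    """
    rows = conn.execute(QUERY, state).fetchall()
    if rows:
        # Save the id column of the last row for use in the next query.
        state = {"id": rows[-1]["id"]}
    return rows, state


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # access columns by name
conn.execute("CREATE TABLE students (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO students (id, name) VALUES (?, ?)",
    [(i, f"student {i}") for i in range(1, 26)],
)

# With no initial_value configured, an integer state field starts at -1.
state = {"id": -1}
batches = []
for _ in range(4):
    rows, state = poll_once(conn, state)
    batches.append([row["id"] for row in rows])

print([len(b) for b in batches])  # [10, 10, 5, 0]
print(state)  # {'id': 25}
```

Because the query orders by id and stops at LIMIT 10, each poll in this sketch picks up at most ten new rows. An empty result leaves the saved state unchanged, so the next poll asks again from the same point.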

Example BigQuery configuration:

{
  "connections": [
    {
      "database_type": "bigquery",
      "bigquery_project_id": "sightmachine-sandbox",
      "bigquery_dataset": "test_dataset",
      "gcloud_b64_credentials": "${FTX_CREDS_GCLOUD}"
    }
  ],
  "data_receiver_name": "my_bigquery_receiver",
  "poll_interval": 10,
  "protocol": "sql",
  "streams": [
    {
      "asset": "SOME_ASSET",
      "query": "SELECT * FROM ftx_integration_test_bigquery WHERE DummyID > :DummyID ORDER BY DummyID ASC",
      "state_fields": [
        {
          "column_name": "DummyID",
          "data_type": "string"
        }
      ],
      "stream_type": "some_stream_type"
    }
  ]
}
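
The gcloud_b64_credentials setting expects the base64-encoded contents of a Google Cloud service account JSON key file. In the example above it is written as ${FTX_CREDS_GCLOUD}, which appears to pull the value from an environment variable. The sketch below shows one way to produce that value with the Python standard library. The helper encode_credentials and the file name service-account.json are placeholders for illustration, and the key file here is a throwaway stand-in.

```python
import base64
import json
import tempfile
from pathlib import Path


def encode_credentials(path):
    """Return the base64 text of a service account key file (hypothetical helper)."""
    raw = Path(path).read_bytes()
    json.loads(raw)  # fail early if the file is not valid JSON
    return base64.b64encode(raw).decode("ascii")


# Demonstration with a temporary file standing in for a real key file.
with tempfile.TemporaryDirectory() as tmp:
    key_file = Path(tmp) / "service-account.json"
    key_file.write_text(json.dumps({"type": "service_account", "project_id": "sightmachine-sandbox"}))
    encoded = encode_credentials(key_file)

# Decoding gives back the original JSON document.
print(json.loads(base64.b64decode(encoded))["project_id"])  # sightmachine-sandbox
```

Base64 is an encoding, not encryption, so the resulting string is as sensitive as the key file itself and should be kept out of version control.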

Configuration:

Required and optional properties that can be configured for a SQL receiver:

  • data_receiver_name: Unique name of the data receiver. This name will be used to track the progress state of the data stream.

  • protocol: Protocol to be used. Should be set to sql.

  • connections: List of connections to the SQL database. Each SQL connection has the following configurable settings:

    • database_url: A SQLAlchemy-compatible connection URL string of the form database_type+driver://{user}:{password}@{host}[:{port}]/{database_name}[?key1=value1&key2=value2...], where fields in square brackets are optional.

      This setting can be used instead of host, port, username, password, and database_name. It permits passing advanced parameters to the database driver (e.g. mssql+pymssql://sm:sm@196.168.19.22:5555/weld?appname=Sight%20Machine).

      FactoryTX supports the following database_type+driver combinations: mssql+pymssql, mysql+mysqldb, postgresql+psycopg2, sqlite+pysqlite, and kustosql+https.

      For connecting to BigQuery, use the following format: bigquery://{project_id}/{dataset} (the dataset is optional and is only used as the default for stream queries).

      For connecting to Snowflake, use the following format: snowflake://{user}:{password}@{account}.

    • host: Hostname or IP address of the database server

    • port: Port of the database server. Defaults to 5432.

    • username: Username to use to connect to the database

    • password: Password to use to connect to the database

    • database_type: Type of database (e.g. postgresql, mssql, mysql, sqlite, oracle, kustosql, or bigquery).

    • database_name: Name of the database to query

    • ssl: If set to true, only a secure SSL TCP/IP connection will be established with the database server. Defaults to false.

    • max_concurrent_connections: Maximum number of connections to establish with the SQL server.

    • max_connection_wait: Maximum number of seconds to wait for an available connection to the SQL server. Defaults to 300, i.e. five minutes. You may need to increase the default if you encounter frequent errors like “Query failed: QueuePool limit of size … overflow 0 reached, connection timed out”.

    • default_query_timeout: Maximum number of seconds to wait for a query to complete. Defaults to 1800, i.e. thirty minutes. This can be overridden per stream with the stream's query_timeout setting. Setting this to 0 disables the timeout.

    • records_per_file (DEPRECATED): This option is ignored.

    • Oracle-specific connection options:
      • service_name: Oracle service name to connect to. May be used instead of database_name or sid.

      • sid: Oracle SID to connect to. Alias for database_name.

    • Azure Data Explorer-specific connection options:
      • azure_ad_client_id: Client ID for Azure Active Directory authentication.

      • azure_ad_client_secret: Client secret for Azure Active Directory authentication.

      • azure_ad_tenant_id: Tenant ID for Azure Active Directory authentication.

      • azure_user_msi: Authenticate to Azure using a managed service identity (MSI).

    • BigQuery-specific connection options:
      • gcloud_b64_credentials: Base64-encoded contents of the Google Cloud service account credentials JSON file.

      • bigquery_project_id: BigQuery project ID/name.

      • bigquery_dataset: Default dataset to use for stream queries. If not provided, all queries must include the dataset name.

    • Snowflake-specific connection options:
      • snowflake_account: Snowflake account name (account identifier). Has the format {organization_name}-{account_name} e.g. PGBRUSM-LZ94417.

      • user: Snowflake user name (user login).

      • password: Snowflake password (user password).

      • These settings can instead be specified with a full database_url.

  • streams: List of SQL data streams. Each input stream has the following configurable settings:

    • asset: Asset identifier. If a source field is not specified in the SQL query, this field is used as source in records.

    • stream_type: Type of data stream

    • query: The SQL SELECT statement used to fetch data from the data table. SQL aliases are supported, e.g. SELECT column1 AS c1 FROM table_name.

    • state_fields: Fields saved from the last row of one query for use in the subsequent query. This enables live streaming of data from a SQL table, instead of pulling all rows a few times per day. State fields can be referenced in SQL queries as :column_name, e.g. SELECT * FROM tbl WHERE timestamp > :timestamp. Each state_field configuration can have the following properties:

      • column_name: Name of the column to store between queries. Case-sensitive, so it must exactly match the column name returned by your database.

      • data_type: Type of the column. Must be one of integer, float, string, or timestamp.

      • initial_value: The value of the field to use in the first query against a data source. If the initial value is not set, then it will be determined from the data type:

        • integers and floats default to -1

        • strings default to ""

        • timestamps default to 1 CE

    • query_timeout: Maximum number of seconds to wait for a query to complete. Defaults to the connection's default_query_timeout. Setting this to 0 disables the timeout.

  • poll_interval: The number of seconds to wait between attempts to fetch new data from the SQL server.
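
The database_url setting folds host, port, username, password, and database_name into one string of the form database_type+driver://{user}:{password}@{host}[:{port}]/{database_name}[?key1=value1...]. The sketch below assembles such a URL by hand and percent-encodes the credentials, so characters like @ or / in a password do not break the URL. The helper build_database_url is hypothetical; if SQLAlchemy is installed, sqlalchemy.engine.URL.create is the more robust way to build the same thing.

```python
from urllib.parse import quote, urlencode


def build_database_url(database_type, driver, username, password, host,
                       database_name, port=None, params=None):
    """Assemble database_type+driver://user:password@host[:port]/database_name[?k=v...].

    Hypothetical helper for illustration only.
    """
    # Percent-encode credentials so reserved characters cannot be misparsed.
    user = quote(username, safe="")
    pwd = quote(password, safe="")
    netloc = f"{user}:{pwd}@{host}"
    if port is not None:
        netloc += f":{port}"
    url = f"{database_type}+{driver}://{netloc}/{database_name}"
    if params:
        # quote_via=quote encodes spaces as %20 rather than "+".
        url += "?" + urlencode(params, quote_via=quote)
    return url


# Reproduces the mssql example from the database_url description.
print(build_database_url("mssql", "pymssql", "sm", "sm", "196.168.19.22", "weld",
                         port=5555, params={"appname": "Sight Machine"}))
# mssql+pymssql://sm:sm@196.168.19.22:5555/weld?appname=Sight%20Machine
```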