SQL Data Receiver¶
The sql
data receiver connects to SQL database and reads data stored
in its tables.
- Example:
If we want to retrieve
Students
data from a PostgreSQL database namedp1
atlocalhost
, our configuration file will look something like this:{ "data_receiver": [ { "data_receiver_name": "my_psql_receiver", "protocol": "sql", "poll_interval": 10, "connections": [ { "host": "127.0.0.1", "port": 5432, "username": "postgres", "password": "postgres", "database_type": "postgresql", "database_name": "p1", "ssl": false } ], "streams": [ { "asset": "Students", "stream_type": "cycle", "query": "SELECT * FROM students WHERE id > :id ORDER BY id ASC LIMIT 10", "state_fields": [ { "column_name": "id", "data_type": "integer" } ] } ] } ] }
Example BigQuery configuration:
{ "connections": [ { "database_type": "bigquery", "bigquery_project_id": "sightmachine-sandbox", "bigquery_dataset": "test_dataset", "gcloud_b64_credentials": "${FTX_CREDS_GCLOUD}" } ], "data_receiver_name": "my_bigquery_receiver", "poll_interval": 10, "protocol": "sql", "streams": [ { "asset": "SOME_ASSET", "query": "SELECT * FROM ftx_integration_test_bigquery " "WHERE DummyID > :DummyID ORDER BY DummyID ASC", "state_fields": [ { "column_name": "DummyID", "data_type": "string" } ], "stream_type": "some_stream_type" } ] }
Configuration:
Required and optional properties that can be configured for an OPC UA receiver:
data_receiver_name: Unique name of the data receiver. This name will be used to track the progress state of the data stream.
protocol: Protocol to be used. Should be set to
sql
.connections: How to connect to the SQL database. A SQL connection has the following configurable settings:
database_url: A SQLAlchemy-compatible connection URL string of the form
database_type+driver://{user}:{password}@{host}[:{port}]/{database_name}[?key1=value1&key2=value2...]
, where fields in square brackets are optional.This setting can be used instead of host, port, username, password, and database_name. It permits passing advanced parameters to the database driver (e.g.
mssql+pymssql://sm:sm@196.168.19.22:5555/weld?appname=Sight%20Machine
).FactoryTX supports the following database_type+driver combinations:
mssql+pymssql
,mysql+mysqldb
,postgresql+psycopg2
, andsqlite+pysqlite
,kustosql+https
.For connecting to BigQuery, use the following format:
bigquery://{project_id}/{dataset}
(dataset is optional and just used as the default for stream queries).For connecting to Snowflake, use the following format:
snowflake://{user}:{password}@{account}
host: Hostname or IP address of the database server
port: Port of the database server. Defaults to 5432.
username: Username to use to connect to the database
password: Password to use to connect to the database
database_type: Type of database (i.e.
postgresql
,mssql
,mysql
,sqlite
,oracle
,kustosql
,bigquery
).database_name: Name of the database to query
ssl: If set to true, only a secure SSL TCP/IP connection will be established with the database server. Defaults to False.
max_concurrent_connections: Maximum number of connections to establish with the SQL server.
max_connection_wait: Maximum number of seconds to wait for an available connection to the SQL server. Defaults to 300, i.e. five minutes. You may need to increase the default if you encounter frequent errors like “Query failed: QueuePool limit of size … overflow 0 reached, connection timed out”.
records_per_file (DEPRECATED): This option is ignored.
- Oracle-specific connection options:
service_name: Oracle service name to connect to. May be used instead of database_name or sid.
sid: Oracle SID to connect to. Alias for database_name.
- Azure Data Explorer-specific connection options:
azure_ad_client_id: Client id for the Azure authentication method using Active Directory.
azure_ad_client_secret: Client secret for the Azure authentication method using Active Directory.
azure_ad_tenant_id: Tenant id for the Azure authentication method using Active Directory.
azure_user_msi: Azure authentication method using the managed service identity.
- BigQuery-specific connection options:
gcloud_b64_credentials: Base64-encoded content of the Google Cloud Service Account credentials json file.
bigquery_project_id: BigQuery Project ID/Name.
bigquery_dataset: Default dataset to use for stream queries, if not provided then all queries must include the dataset name.
- Snowflake-specific connection options:
snowflake_account: Snowflake account name (account identifier). Has the format {organization_name}-{account_name} e.g. PGBRUSM-LZ94417.
user: Snowflake user name (user login).
password: Snowflake password (user password).
Also able to be specified with a full database_url instead.
streams: List of SQL data streams. Each input stream has the following configurable settings:
asset: Asset identifier. If a
source
field is not specified in the SQL query, this field is used assource
in records.stream_type: Type of data stream
query: Sets the SQL SELECT statement to fetch data from the specified data table. SQL aliases are supported. Query example: SELECT column1 AS c1 FROM table_name.
state_fields: Fields saved from the last row of one query for use in the subsequent query. This enables live streaming data from a SQL table, instead of pulling all rows a few times per day. State fields can be referenced in SQL queries as :column_name, eg. ‘SELECT * FROM tbl WHERE timestamp > :timestamp’. Each state_field configuration can have the following properties:
column_name: Name of the column to store between queries. Case-sensitive, so it must exactly match the column name returned by your database.
data_type: Type of the column. Must be one of
integer
,float
,string
, ortimestamp
.initial_value: The value of the field to use in the first query against a data source. If the initial value is not set, then it will be determined from the data type:
integers and floats default to
-1
strings default to
""
timestamps default to
1
CE
poll_interval: The number of seconds to wait between attempts to fetch new data from the SQL server.