Kafka Data Receiver
The Kafka data receiver connects to an Apache Kafka broker cluster and reads data from it.
Example:
If we want to retrieve data for the topic asianpaints.SM_AA_Example_1.cycle from a Kafka broker cluster at kafka.sm.io:9093,kafka2.sm.io:9093, our configuration will look something like this:

{
  "data_receiver": [
    {
      "data_receiver_name": "MyKafkaReceiver",
      "protocol": "kafka",
      "connections": [
        {
          "broker_hosts": "kafka.sm.io:9093,kafka2.sm.io:9093",
          "username": "username",
          "password": "password"
        }
      ],
      "streams": [
        {
          "asset": "SM_AA_Example_1",
          "stream_type": "cycle",
          "topic": "asianpaints.SM_AA_Example_1.cycle"
        }
      ]
    }
  ]
}
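To make the relationship between the fields in this example concrete, here is a minimal Python sketch that parses the same configuration, splits broker_hosts into host/port pairs, and maps each asset/stream_type combination to its topic. The helper names split_broker_hosts and topic_map are hypothetical and are not part of the receiver; this only illustrates how the configuration can be read, not how the receiver itself processes it.

```python
import json

# The example configuration from above, embedded as a string for illustration.
CONFIG_JSON = """
{
  "data_receiver": [
    {
      "data_receiver_name": "MyKafkaReceiver",
      "protocol": "kafka",
      "connections": [
        {
          "broker_hosts": "kafka.sm.io:9093,kafka2.sm.io:9093",
          "username": "username",
          "password": "password"
        }
      ],
      "streams": [
        {
          "asset": "SM_AA_Example_1",
          "stream_type": "cycle",
          "topic": "asianpaints.SM_AA_Example_1.cycle"
        }
      ]
    }
  ]
}
"""


def split_broker_hosts(broker_hosts):
    """Split a comma-separated host:port string into (host, port) tuples.

    Hypothetical helper, not part of the receiver.
    """
    brokers = []
    for entry in broker_hosts.split(","):
        # rpartition splits on the last colon, so the port is always the tail.
        host, _, port = entry.strip().rpartition(":")
        brokers.append((host, int(port)))
    return brokers


def topic_map(receiver):
    """Map each (asset, stream_type) pair to its topic (hypothetical helper)."""
    return {
        (stream["asset"], stream["stream_type"]): stream["topic"]
        for stream in receiver["streams"]
    }


receiver = json.loads(CONFIG_JSON)["data_receiver"][0]
brokers = split_broker_hosts(receiver["connections"][0]["broker_hosts"])
topics = topic_map(receiver)

print(brokers)
print(topics)
```

Running a check like this before deploying a configuration can catch a malformed broker_hosts entry (for example, a missing port) early.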
Configuration:
Required and optional properties that can be configured for a Kafka receiver:
connections: How to connect to a Kafka broker cluster
A Kafka connection has the following configurable settings:
broker_hosts: Comma-separated list of host:port pairs, such as kafka.sm.io:9093,kafka2.sm.io:9093
username: Username used along with a password to authenticate against a Kafka broker cluster
password: Password used along with a username to authenticate against a Kafka broker cluster
use_ssl: True to authenticate with SSL+SASL using the username and password; False to connect without SSL and without a username/password
streams: How to associate topics with assets and streams
Each input stream has the following configurable settings:
asset: Asset identifier
stream_type: Type of data stream
topic: The name of the topic where the data resides for the asset/stream_type combination
initial_offset: Starting point for re-streaming data from this topic
poll_interval: The number of seconds to wait between attempts to fetch new data from the Kafka topic.
batch_size: The number of messages to consume from a Kafka topic at a given time.
timeout: Amount of time to wait for a response from Kafka before the call returns
add_partition_info: If True, adds partition information to the message payload as "_partition_id" and "_partition_offset"
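The following Python sketch shows how a stream entry with some of the optional settings might be written, and what a message could look like once add_partition_info is enabled. All values (poll_interval of 5, batch_size of 100, the sample payload, partition 0, offset 42) are illustrative assumptions rather than documented defaults. The annotate function is a hypothetical stand-in for what the receiver does internally, and it assumes that the payload is a flat dictionary with the two fields added at the top level. initial_offset and timeout are left out because their accepted values and units are not stated here.

```python
import json

# Hypothetical stream entry using some of the optional settings.
# Every value below is an illustrative assumption, not a documented default.
stream = {
    "asset": "SM_AA_Example_1",
    "stream_type": "cycle",
    "topic": "asianpaints.SM_AA_Example_1.cycle",
    "poll_interval": 5,          # seconds between fetch attempts
    "batch_size": 100,           # messages consumed per fetch
    "add_partition_info": True,  # add partition fields to each payload
}


def annotate(payload, partition_id, partition_offset):
    """Return a copy of payload with the two partition fields added.

    Hypothetical helper; assumes the payload is a flat dictionary.
    """
    annotated = dict(payload)
    annotated["_partition_id"] = partition_id
    annotated["_partition_offset"] = partition_offset
    return annotated


message = {"temperature": 21.5}  # made-up payload for illustration
if stream["add_partition_info"]:
    message = annotate(message, partition_id=0, partition_offset=42)

print(json.dumps(stream, indent=2))
print(message)
```

Keeping the partition and offset alongside each message can help when tracing a record back to its position in the topic, for instance when choosing a starting point for re-streaming.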