Kafka Data Receiver

The Kafka data receiver connects to an Apache Kafka broker cluster and reads data from it.

Example:

If we want to retrieve data for the topic asianpaints.SM_AA_Example_1.cycle from a Kafka broker cluster at kafka.sm.io:9093,kafka2.sm.io:9093, our configuration will look like this:

{
    "data_receiver": [
        {
            "data_receiver_name": "MyKafkaReceiver",
            "protocol": "kafka",
            "connections": [
                {
                    "broker_hosts": "kafka.sm.io:9093,kafka2.sm.io:9093",
                    "username": "username",
                    "password": "password"
                }
            ],
            "streams": [
                {
                    "asset": "SM_AA_Example_1",
                    "stream_type": "cycle",
                    "topic": "asianpaints.SM_AA_Example_1.cycle"
                }
            ]
        }
    ]
}
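The streams entries link each Kafka topic to an asset and a stream type. The Python sketch below illustrates that relationship: it loads the configuration above and builds a lookup from topic to (asset, stream_type). The helper build_topic_index is hypothetical and is not part of the receiver.

```python
import json

# The example configuration from above, as a JSON string.
CONFIG_TEXT = """
{
    "data_receiver": [
        {
            "data_receiver_name": "MyKafkaReceiver",
            "protocol": "kafka",
            "connections": [
                {
                    "broker_hosts": "kafka.sm.io:9093,kafka2.sm.io:9093",
                    "username": "username",
                    "password": "password"
                }
            ],
            "streams": [
                {
                    "asset": "SM_AA_Example_1",
                    "stream_type": "cycle",
                    "topic": "asianpaints.SM_AA_Example_1.cycle"
                }
            ]
        }
    ]
}
"""


def build_topic_index(config: dict) -> dict[str, tuple[str, str]]:
    """Hypothetical helper: map each Kafka topic to its (asset, stream_type)."""
    index = {}
    for receiver in config["data_receiver"]:
        # Only receivers using the kafka protocol declare topics.
        if receiver.get("protocol") != "kafka":
            continue
        for stream in receiver.get("streams", []):
            index[stream["topic"]] = (stream["asset"], stream["stream_type"])
    return index


config = json.loads(CONFIG_TEXT)
topic_index = build_topic_index(config)
print(topic_index)
# {'asianpaints.SM_AA_Example_1.cycle': ('SM_AA_Example_1', 'cycle')}
```

A message read from asianpaints.SM_AA_Example_1.cycle can therefore be attributed to the cycle stream of asset SM_AA_Example_1.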

Configuration:

Required and optional properties that can be configured for a Kafka receiver:

  • connections: How to connect to a Kafka broker cluster

    A Kafka connection has the following configurable settings:

    • broker_hosts: Comma-separated list of host:port pairs, such as kafka.sm.io:9093,kafka2.sm.io:9093

    • username: Username used, together with password, to authenticate against the Kafka broker cluster

    • password: Password used, together with username, to authenticate against the Kafka broker cluster

    • use_ssl: True to connect over SSL with SASL (username/password) authentication; False to connect without SSL and without username/password authentication

  • streams: How to associate topics with assets and streams. Each input stream has the following configurable settings:

    • asset: Asset identifier

    • stream_type: Type of data stream

    • topic: The name of the topic where the data resides for the asset/stream_type combination

    • initial_offset: Starting point from which to re-stream data from this topic

  • poll_interval: The number of seconds to wait between attempts to fetch new data from the Kafka broker cluster.

  • batch_size: The number of messages to consume from a Kafka topic in each fetch.

  • timeout: The amount of time to wait for a call to Kafka to return

  • add_partition_info: If True, adds partition information to each message payload as the fields "_partition_id" and "_partition_offset"
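This page does not document how the receiver turns a connection entry into Kafka client settings. The sketch below shows one plausible mapping onto librdkafka-style client properties, the naming used by clients such as confluent-kafka. It rests on the following assumptions:

- broker_hosts is checked and passed through as bootstrap.servers.
- use_ssl chooses between SASL_SSL and PLAINTEXT.
- Because the default for an omitted use_ssl is not documented, the sketch requires the setting explicitly.
- A real client would also need sasl.mechanism, which this page does not specify.

The helper names are hypothetical.

```python
def parse_broker_hosts(broker_hosts: str) -> list[tuple[str, int]]:
    """Split a comma-separated "host:port" list into (host, port) pairs."""
    brokers = []
    for entry in broker_hosts.split(","):
        entry = entry.strip()
        # rpartition splits on the last colon, so the port is always the final field.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        brokers.append((host, int(port)))
    return brokers


def to_client_properties(connection: dict) -> dict[str, str]:
    """Hypothetical mapping of a connection entry onto librdkafka-style properties."""
    brokers = parse_broker_hosts(connection["broker_hosts"])
    props = {"bootstrap.servers": ",".join(f"{host}:{port}" for host, port in brokers)}
    # Assumption: the default for an omitted use_ssl is undocumented, so it is required here.
    if connection["use_ssl"]:
        # SSL transport with SASL username/password authentication.
        # A real client would also need sasl.mechanism, which is not documented here.
        props["security.protocol"] = "SASL_SSL"
        props["sasl.username"] = connection["username"]
        props["sasl.password"] = connection["password"]
    else:
        # No SSL and no username/password authentication.
        props["security.protocol"] = "PLAINTEXT"
    return props


connection = {
    "broker_hosts": "kafka.sm.io:9093,kafka2.sm.io:9093",
    "username": "username",
    "password": "password",
    "use_ssl": True,
}
print(parse_broker_hosts(connection["broker_hosts"]))
# [('kafka.sm.io', 9093), ('kafka2.sm.io', 9093)]
print(to_client_properties(connection)["security.protocol"])
# SASL_SSL
```

Validating broker_hosts up front reports a malformed entry such as kafka.sm.io (no port) immediately, before any connection attempt is made.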
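The sketch below shows how poll_interval, batch_size and add_partition_info might interact, using an in-memory stand-in for a topic instead of a real Kafka client. It is illustrative only. FakeTopic, consume and the max_polls bound are hypothetical. timeout is omitted because it governs calls to a real broker.

```python
import time
from dataclasses import dataclass


@dataclass
class Message:
    partition: int
    offset: int
    payload: dict


@dataclass
class FakeTopic:
    """Hypothetical in-memory stand-in for a Kafka topic; not a real client."""
    messages: list[Message]
    position: int = 0

    def poll(self, max_records: int) -> list[Message]:
        # Return at most max_records messages that have not been consumed yet.
        batch = self.messages[self.position:self.position + max_records]
        self.position += len(batch)
        return batch


def add_partition_fields(message: Message) -> dict:
    # Copy the payload and add the two fields named in the add_partition_info description.
    enriched = dict(message.payload)
    enriched["_partition_id"] = message.partition
    enriched["_partition_offset"] = message.offset
    return enriched


def consume(topic: FakeTopic, batch_size: int, poll_interval: float,
            add_partition_info: bool, max_polls: int) -> list[dict]:
    received = []
    for attempt in range(max_polls):
        if attempt > 0:
            time.sleep(poll_interval)  # wait between fetch attempts
        for message in topic.poll(batch_size):  # at most batch_size messages per fetch
            if add_partition_info:
                received.append(add_partition_fields(message))
            else:
                received.append(dict(message.payload))
    return received


topic = FakeTopic([Message(partition=0, offset=i, payload={"value": i}) for i in range(5)])
received = consume(topic, batch_size=2, poll_interval=0.01, add_partition_info=True, max_polls=3)
print(len(received))
# 5
print(received[0])
# {'value': 0, '_partition_id': 0, '_partition_offset': 0}
```

With batch_size=2, the three fetches return 2, 2 and 1 messages. A real receiver would keep polling indefinitely rather than stop after max_polls.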