Kafka Data Receiver

The Kafka data receiver connects to an Apache Kafka broker cluster and reads data from it.

Example:

If we want to retrieve data for the topic asianpaints.SM_AA_Example_1.cycle from a Kafka broker cluster at kafka.sm.io:9093,kafka2.sm.io:9093, our configuration will look something like this:

{
    "data_receiver": [
        {
            "data_receiver_name": "MyKafkaReceiver",
            "protocol": "kafka",
            "connections": [
                {
                    "broker_hosts": "kafka.sm.io:9093,kafka2.sm.io:9093",
                    "username": "username",
                    "password": "password"
                }
            ],
            "streams": [
                {
                    "asset": "SM_AA_Example_1",
                    "stream_type": "cycle",
                    "topic": "asianpaints.SM_AA_Example_1.cycle"
                }
            ]
        }
    ]
}
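A minimal sketch of loading this configuration with Python's standard json module and pulling out the broker hosts and the topic-to-stream mapping. The parsing code is illustrative only and not part of the receiver itself; it just shows how the fields in the example relate to one another:

```python
import json

# The example receiver configuration from above, as a JSON string.
CONFIG = """
{
    "data_receiver": [
        {
            "data_receiver_name": "MyKafkaReceiver",
            "protocol": "kafka",
            "connections": [
                {
                    "broker_hosts": "kafka.sm.io:9093,kafka2.sm.io:9093",
                    "username": "username",
                    "password": "password"
                }
            ],
            "streams": [
                {
                    "asset": "SM_AA_Example_1",
                    "stream_type": "cycle",
                    "topic": "asianpaints.SM_AA_Example_1.cycle"
                }
            ]
        }
    ]
}
"""

config = json.loads(CONFIG)
receiver = config["data_receiver"][0]

# broker_hosts is a single comma-separated string per connection.
brokers = [
    host
    for conn in receiver["connections"]
    for host in conn["broker_hosts"].split(",")
]

# Map each topic to its (asset, stream_type) pair.
topics = {
    s["topic"]: (s["asset"], s["stream_type"])
    for s in receiver["streams"]
}

print(brokers)
print(topics)
```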

Configuration:

Required and optional properties that can be configured for a Kafka receiver:

  • connections: How to connect to a Kafka broker cluster

    A Kafka connection has the following configurable settings:

    • broker_hosts: Comma-separated list of host:port pairs, such as kafka.sm.io:9093,kafka2.sm.io:9093

    • username: Username used along with a password to authenticate against a Kafka broker cluster

    • password: Password used along with a username to authenticate against a Kafka broker cluster

    • use_ssl: True to authenticate with SASL over SSL; False to connect without SSL or username/password credentials

    • ca_cert_path: Path to CA certificate for SSL connection

    • sasl_mechanism: SASL mechanism to use when connecting to Kafka broker, default is SCRAM-SHA-256

    • security_protocol: Security protocol to use when connecting to Kafka broker, default is SASL_SSL

  • streams: How to associate topics with assets and streams. Each input stream has the following configurable settings:

    • asset: Asset identifier

    • stream_type: Type of data stream

    • topic: The name of the topic where the data resides for the asset/stream_type combination

    • initial_offset: Starting point for re-streaming data from this topic

    • consumer_group_id: Kafka consumer group id, default is empty string

    • normalize_payload: If True, attempt to normalize the message payload into a dictionary format if there is a JSON object in one of the message fields. Note that this won't work if there is a list of objects in that field. An additional NormalizeTransform may be needed downstream to fully normalize the data.

  • poll_interval: The number of seconds to wait between attempts to fetch new data from the Kafka broker.

  • batch_size: The number of messages to consume from a Kafka topic at a time.

  • timeout: Maximum amount of time to wait for a call to the Kafka broker to return.

  • add_partition_info: If True, add partition information to each message payload as "_partition_id" and "_partition_offset" fields.
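To illustrate how the connection settings above fit together, here is a hedged sketch that applies the documented defaults (SCRAM-SHA-256 and SASL_SSL) when the optional fields are omitted. The function name and the exact option keys are illustrative assumptions, not the receiver's actual internals:

```python
def build_consumer_options(conn: dict) -> dict:
    """Translate a 'connections' entry into consumer options,
    applying the defaults documented above. Illustrative only."""
    options = {
        "bootstrap_servers": conn["broker_hosts"].split(","),
        # Documented defaults for the optional security fields.
        "sasl_mechanism": conn.get("sasl_mechanism", "SCRAM-SHA-256"),
        "security_protocol": conn.get("security_protocol", "SASL_SSL"),
    }
    if conn.get("use_ssl", True):
        # SASL over SSL: credentials plus an optional CA certificate.
        if "ca_cert_path" in conn:
            options["ssl_cafile"] = conn["ca_cert_path"]
        options["sasl_plain_username"] = conn["username"]
        options["sasl_plain_password"] = conn["password"]
    else:
        # No SSL and no username/password authentication.
        options["security_protocol"] = "PLAINTEXT"
    return options

opts = build_consumer_options({
    "broker_hosts": "kafka.sm.io:9093,kafka2.sm.io:9093",
    "username": "username",
    "password": "password",
})
print(opts["sasl_mechanism"])   # falls back to the SCRAM-SHA-256 default
```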
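The normalize_payload and add_partition_info behaviors described above can be sketched in a few lines. These helper functions are assumptions for illustration only (the receiver's real implementation is not shown here), but they follow the documented rules: a field holding a JSON object is parsed into a dictionary, a list of objects is left untouched, and partition info is attached as "_partition_id" and "_partition_offset":

```python
import json

def normalize_payload(payload: dict) -> dict:
    """If a field holds a JSON object encoded as a string, replace it
    with the parsed dict. A list of objects is kept as-is, matching the
    documented limitation. Illustrative sketch only."""
    out = {}
    for key, value in payload.items():
        if isinstance(value, str):
            try:
                parsed = json.loads(value)
            except ValueError:
                out[key] = value
                continue
            # Only a JSON *object* is normalized; lists stay unparsed.
            out[key] = parsed if isinstance(parsed, dict) else value
        else:
            out[key] = value
    return out

def add_partition_info(payload: dict, partition_id: int, offset: int) -> dict:
    """Attach partition info to a message payload, as done when
    add_partition_info is True. Illustrative sketch only."""
    return {**payload, "_partition_id": partition_id, "_partition_offset": offset}

msg = {
    "asset": "SM_AA_Example_1",
    "data": '{"temp": 21.5}',    # JSON object in a string: normalized
    "tags": '[{"a": 1}]',        # list of objects: left as a string
}
print(normalize_payload(msg))
print(add_partition_info({"a": 1}, partition_id=0, offset=42))
```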