Compatibility with Kafka consumers and usage notes

Python parameters

Parameter: Description
bootstrap_servers: The cluster address for the initial connection, in the format Project.Endpoint:Port. Configure this parameter based on the endpoint of the project. For more information, see View endpoints.
- Alibaba Cloud internal network: The port number is 10011. Example: project.cn-hangzhou-intranet.log.aliyuncs.com:10011
- Internet: The port number is 10012. Example: project.cn-hangzhou.log.aliyuncs.com:10012
group_id: The ID of the consumer group. You can add consumers to a consumer group and then specify the ID of the consumer group to implement load balancing, data processing, and data delivery. Example: "kafka-test"
security_protocol: Set the value to SASL_SSL to ensure secure data transmission.
sasl_plain_username: The name of the Simple Log Service project.
sasl_plain_password: The AccessKey pair of your Alibaba Cloud account, in the {access-key-id}#{access-key-secret} format. Replace {access-key-id} with your AccessKey ID and {access-key-secret} with your AccessKey secret. We recommend that you use the AccessKey pair of a Resource Access Management (RAM) user. For more information, see Create a RAM user and authorize the RAM user to access Simple Log Service.
sasl_mechanism: Set the value to "PLAIN".
max_poll_interval_ms: The time range during which a consumer group waits for all consumers to join the group after a consumer initiates a request to join the group. Consumers that join the consumer group within this time range become members of the consumer group and are allocated partitions. Each consumer consumes only data in its allocated partitions. If consumers do not join the consumer group within this time range, rebalancing is triggered within the consumer group. During rebalancing, no data is consumed, which causes consumption latency. Recommended value: 130000. Unit: ms. This value allows sufficient time for consumers to join a consumer group. If you use confluent-kafka-go to consume data, you must set the max.poll.interval.ms parameter to a value that is greater than the value of the session.timeout.ms parameter.
session_timeout_ms: The heartbeat timeout period. If a consumer does not send a heartbeat request within the timeout period, the consumer is considered abnormal, and rebalancing is triggered within the consumer group. Recommended value: 120000. Unit: ms.
auto_offset_reset: The start position of consumption. Common values: latest and earliest. Default value: latest.
- earliest: If an offset is committed, a consumer starts to read data from the committed offset. If no offset is committed, a consumer starts to read data from the beginning.
- latest: If an offset is committed, a consumer starts to read data from the committed offset. If no offset is committed, a consumer starts to read data from the latest message.
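The formats described in the table can be sketched as a small helper that assembles the keyword arguments for the consumer. The function name build_consumer_config and its argument names are hypothetical, chosen only for illustration. The timeout check applies the confluent-kafka-go rule from the table to Python as a precaution, which is an assumption rather than a documented requirement for kafka-python.

```python
def build_consumer_config(project, endpoint, port, access_key_id, access_key_secret,
                          group_id, max_poll_interval_ms=130000,
                          session_timeout_ms=120000, auto_offset_reset="latest"):
    """Hypothetical helper: build kafka-python keyword arguments from the table above."""
    # Assumption: enforce the confluent-kafka-go rule here as a precaution.
    if max_poll_interval_ms <= session_timeout_ms:
        raise ValueError("max_poll_interval_ms must be greater than session_timeout_ms")
    if auto_offset_reset not in ("earliest", "latest"):
        raise ValueError("auto_offset_reset must be 'earliest' or 'latest'")
    return {
        # Project.Endpoint:Port, for example project.cn-hangzhou.log.aliyuncs.com:10012
        "bootstrap_servers": "{}.{}:{}".format(project, endpoint, port),
        "group_id": group_id,
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        # The user name is the Simple Log Service project name.
        "sasl_plain_username": project,
        # {access-key-id}#{access-key-secret}
        "sasl_plain_password": "{}#{}".format(access_key_id, access_key_secret),
        "max_poll_interval_ms": max_poll_interval_ms,
        "session_timeout_ms": session_timeout_ms,
        "auto_offset_reset": auto_offset_reset,
    }
```

The resulting dictionary could then be unpacked into the constructor, for example KafkaConsumer(logstore, **config), where logstore is the name of the Logstore to consume.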

Python sample code

python
import os

from kafka import KafkaConsumer

accessKeyId = os.environ.get('SLS_ACCESS_KEY_ID')
accessKeySecret = os.environ.get('SLS_ACCESS_KEY_SECRET')
project = "project"
logstore = "logstore"
endpoint = "cn-hangzhou.log.aliyuncs.com"
port = "10012"

groupId = "kafka-test"
kafkaEndpoint = "{}.{}:{}".format(project, endpoint, port)


def getKafkaConsumer():
    consumer = KafkaConsumer(logstore,
                             bootstrap_servers=kafkaEndpoint,
                             sasl_plain_username=project,
                             group_id=groupId,
                             auto_offset_reset='earliest',
                             sasl_plain_password="{}#{}".format(accessKeyId, accessKeySecret),
                             sasl_mechanism="PLAIN",
                             max_poll_interval_ms=130000,
                             session_timeout_ms=120000,
                             api_version=(2, 1, 0),
                             security_protocol="SASL_SSL")
    return consumer


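def main():
    consumer = getKafkaConsumer()
    # subscribe() expects a list of topic names; the Logstore name is the topic.
    consumer.subscribe([logstore])
    for message in consumer:
        # Print the topic, partition, offset, key, and value of each message.
        print(message.topic, message.partition, message.offset, message.key, message.value)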


if __name__ == '__main__':
    main()
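
The sample reads the AccessKey pair with os.environ.get, which returns None when a variable is not set, so the formatted password would silently become "None#None". A minimal sketch of a fail-fast check follows. The helper name require_env is hypothetical and not part of the sample or of kafka-python.

```python
import os


def require_env(name):
    """Hypothetical helper: return the value of an environment variable or fail early."""
    value = os.environ.get(name)
    if not value:
        # Stop before connecting instead of sending "None#None" as the password.
        raise RuntimeError("Environment variable {} is not set".format(name))
    return value
```

With this helper, the sample could assign accessKeyId = require_env('SLS_ACCESS_KEY_ID') and accessKeySecret = require_env('SLS_ACCESS_KEY_SECRET') in place of the os.environ.get calls.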