Compatibility with Kafka consumers and usage notes
Python parameters
Parameter | Description |
---|---|
bootstrap_servers | The cluster address for initial connection, in the format of Project.Endpoint:Port. Configure this parameter based on the endpoint of the project. For more information, see View endpoints - Alibaba Cloud internal network: The port number is 10011. Example: project.cn-hangzhou-intranet.log.aliyuncs.com:10011 - Internet: The port number is 10012. Example: project.cn-hangzhou.log.aliyuncs.com:10012 |
group_id | The ID of the consumer group. You can add consumers to a consumer group. Then, you can specify the ID of the consumer group to implement load balancing, data processing, and data delivery. Example: "kafka-test" |
security_protocol | The security protocol. To ensure data transmission security, set the value to SASL_SSL. |
sasl_plain_username | The name of the Simple Log Service project. |
sasl_plain_password | The AccessKey pair of your Alibaba Cloud account, in the {access-key-id}#{access-key-secret} format. Replace {access-key-id} with your AccessKey ID and {access-key-secret} with your AccessKey secret. We recommend that you use the AccessKey pair of a Resource Access Management (RAM) user. For more information, see Create a RAM user and authorize the RAM user to access Simple Log Service. |
sasl_mechanism | Set the value to "PLAIN". |
max_poll_interval_ms | The time range during which a consumer group waits for all consumers to join the group after a consumer initiates a request to join the group. Consumers that join the consumer group within this time range become members of the consumer group and are allocated partitions. Each consumer consumes only data in the allocated partitions. If consumers do not join the consumer group within this time range, rebalancing is triggered within the consumer group. In the rebalancing process, no data is consumed, which causes consumption latency. Recommended value: 130000. Unit: ms. This value allows for sufficient time for consumers to join a consumer group. If you use confluent-kafka-go to consume data, you must set the max.poll.interval.ms parameter to a value that is greater than the value of the session.timeout.ms parameter. |
session_timeout_ms | The heartbeat timeout period. If a consumer does not send a heartbeat request within the timeout period, the consumer is considered abnormal, and rebalancing is triggered within the consumer group. Recommended value: 120000. Unit: ms. |
auto_offset_reset | The start position of consumption. Valid values: latest (default) and earliest. latest: if an offset is committed, the consumer starts to read data from the committed offset; if no offset is committed, the consumer starts to read data from the latest message. earliest: if an offset is committed, the consumer starts to read data from the committed offset; if no offset is committed, the consumer starts to read data from the beginning. |
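The endpoint, port, and credential formats in the table above can be sketched as a small helper. This is an illustrative sketch: the function name `sls_kafka_config` and the `region`/`intranet` arguments are hypothetical, but the string formats follow the table.

```python
def sls_kafka_config(project, region, access_key_id, access_key_secret,
                     intranet=False):
    """Build kafka-python connection settings for a Simple Log Service project."""
    # Alibaba Cloud internal network endpoints use port 10011 and the
    # "-intranet" endpoint suffix; Internet endpoints use port 10012.
    suffix = "-intranet" if intranet else ""
    port = 10011 if intranet else 10012
    bootstrap = "{}.{}{}.log.aliyuncs.com:{}".format(project, region, suffix, port)
    return {
        "bootstrap_servers": bootstrap,
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        # Username is the project name; password is the AccessKey pair
        # joined with '#', i.e. {access-key-id}#{access-key-secret}.
        "sasl_plain_username": project,
        "sasl_plain_password": "{}#{}".format(access_key_id, access_key_secret),
    }
```

The returned dictionary can be unpacked into the `KafkaConsumer` constructor with `KafkaConsumer(logstore, **cfg)`.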
Python sample code
```python
import os

from kafka import KafkaConsumer

# Read the AccessKey pair of a RAM user from environment variables.
accessKeyId = os.environ.get('SLS_ACCESS_KEY_ID')
accessKeySecret = os.environ.get('SLS_ACCESS_KEY_SECRET')
project = "project"
logstore = "logstore"
endpoint = "cn-hangzhou.log.aliyuncs.com"
port = "10012"
groupId = "kafka-test"
kafkaEndpoint = "{}.{}:{}".format(project, endpoint, port)

def getKafkaConsumer():
    # Passing the Logstore name as the topic subscribes the consumer;
    # no separate subscribe() call is needed.
    consumer = KafkaConsumer(logstore,
                             bootstrap_servers=kafkaEndpoint,
                             group_id=groupId,
                             auto_offset_reset='earliest',
                             security_protocol="SASL_SSL",
                             sasl_mechanism="PLAIN",
                             sasl_plain_username=project,
                             sasl_plain_password="{}#{}".format(accessKeyId, accessKeySecret),
                             max_poll_interval_ms=130000,
                             session_timeout_ms=120000,
                             api_version=(2, 1, 0))
    return consumer

def main():
    consumer = getKafkaConsumer()
    for message in consumer:
        print(message.topic, message.partition, message.offset,
              message.key, message.value)

if __name__ == '__main__':
    main()
```
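The `for message in consumer` loop above blocks indefinitely. If you need to stop consumption cleanly, kafka-python also supports `poll()` with a timeout plus `close()`. The sketch below is a hypothetical variant of `main()`; `consume_batch` and `max_batches` are illustrative names, and the consumer is assumed to be built by `getKafkaConsumer()`.

```python
def consume_batch(consumer, max_batches=10):
    """Poll a bounded number of times, then leave the consumer group cleanly."""
    try:
        for _ in range(max_batches):
            # poll() returns a dict mapping TopicPartition -> list of records;
            # an empty dict means no data arrived within timeout_ms.
            batch = consumer.poll(timeout_ms=1000)
            for tp, records in batch.items():
                for record in records:
                    print(tp.topic, record.offset, record.value)
    finally:
        # close() commits offsets (when auto-commit is enabled) and
        # leaves the consumer group so rebalancing happens promptly.
        consumer.close()
```

Bounding the number of polls and always calling `close()` avoids leaving a dead member in the group, which would otherwise delay rebalancing by up to `session_timeout_ms`.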