Kafka 消费兼容概述和注意事项
Python消费配置参数
参数 | 描述 |
---|---|
bootstrap_servers | 初始连接的集群地址,格式为Project.Endpoint:Port,请根据Project所在的Endpoint进行配置。更多信息,请参见服务入口 - 阿里云内网:端口号为10011,例如 project.cn-hangzhou-intranet.log.aliyuncs.com:10011 - 公网:端口号为10012,例如 project.cn-hangzhou.log.aliyuncs.com:10012 |
group_id | 消费组id, 是用于指定消费者组的标识符,用于将消费组内的消费者分组,通过配置消费组id,可以实现消费者组内的负载均衡,实现数据的处理和分发.例如 "kafka-test" |
security_protocol | 为了保证数据传输的安全性,必须使用SASL_SSL |
sasl_plain_username | 配置为日志服务Project名称 |
sasl_plain_password | 配置为阿里云AK,格式为 {access-key-id}#{access-key-secret}。请根据实际情况,将 {access-key-id} 替换为您的AccessKey ID,将 {access-key-secret} 替换为您的AccessKey Secret。建议使用RAM用户的AK。更多信息,请参见授权 |
sasl_mechanism | 必须使用"PLAIN" |
max_poll_interval_ms | 消费组在消费者发起加入组请求后,等待所有消费者加入的时间间隔,在这个时间间隔内加入组的消费者为消费组的成员,进行分区分配,各个消费者按分配的分区开发消费数据,如果在这个时间内还有消费者没有加入消费组,则会触发消费组再平衡操作,再平衡期间不会消费数据,会导致消费延迟,建议max.poll.interval.ms为130000MS,保证所有消费者都能加入消费组 |
session_timeout_ms | 心跳最大超时时间,在该时间如果消费者没有发送心跳请求,则视为该消费者发生异常,触发消费组再平衡操作,session.timeout.ms为120000MS |
auto_offset_reset | auto.offset.reset 消费起始点位 常用的二个值是latest和earliest,默认是latest |
earliest 当有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 | |
latest 当有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的数据 |
Python消费代码示例
python
import os
import time
from kafka import KafkaConsumer
accessKeyId = os.environ.get('SLS_ACCESS_KEY_ID')
accessKeySecret = os.environ.get('SLS_ACCESS_KEY_SECRET')
project = "project"
logstore = "logstore"
endpoint = "cn-hangzhou.log.aliyuncs.com"
port = "10012"
#内网endpoint和对应port,可以通过阿里云内部网络访问日志服务,相比公网有更好的链路质量和安全性,详见文档 https://help.aliyun.com/document_detail/29008.htm#reference-wgx-pwq-zdb
#endpoint = "cn-hangzhou-intranet.log.aliyuncs.com"
#port = "10011"
groupId = "kafka-test"
kafkaEndpoint = "{}.{}:{}".format(project, endpoint, port)
def getKafkaConsumer():
consumer = KafkaConsumer(logstore,
bootstrap_servers=kafkaEndpoint,
sasl_plain_username=project,
group_id=groupId,
auto_offset_reset='earliest',
sasl_plain_password="{}#{}".format(accessKeyId, accessKeySecret),
sasl_mechanism="PLAIN",
max_poll_interval_ms=130000,
session_timeout_ms=120000,
api_version=(2, 1, 0),
security_protocol="SASL_SSL")
return consumer
def main():
consumer = getKafkaConsumer()
consumer.subscribe(logstore)
for message in consumer:
print(message.topic, message.offset, message.key, message.value, message.value, message.partition)
if __name__ == '__main__':
main()