
Compatibility with Kafka consumers and usage notes

Confluent-kafka-go parameters

| Parameter | Description |
| --- | --- |
| bootstrap.servers | The cluster address for the initial connection, in the Project.Endpoint:Port format. Configure this parameter based on the endpoint of the project. For more information, see View endpoints.<br>- Alibaba Cloud internal network: the port number is 10011. Example: project.cn-hangzhou-intranet.log.aliyuncs.com:10011<br>- Internet: the port number is 10012. Example: project.cn-hangzhou.log.aliyuncs.com:10012 |
| sasl.mechanism | Set the value to PLAIN. |
| security.protocol | To ensure data transmission security, set the value to SASL_SSL. |
| sasl.username | The name of the Simple Log Service project. |
| sasl.password | The AccessKey pair of your Alibaba Cloud account, in the {access-key-id}#{access-key-secret} format. Replace {access-key-id} with your AccessKey ID and {access-key-secret} with your AccessKey secret. We recommend that you use the AccessKey pair of a Resource Access Management (RAM) user. For more information, see Create a RAM user and authorize the RAM user to access Simple Log Service. |
| group.id | The ID of the consumer group. You can add consumers to a consumer group and then specify the ID of the consumer group to implement load balancing, data processing, and data delivery. Example: "kafka-test". |
| enable.auto.commit | Specifies whether to automatically commit offsets. Recommended value: true. |
| auto.commit.interval.ms | The interval at which offsets are automatically committed. Recommended value: 30000. Unit: ms. |
| max.poll.interval.ms | The time range during which a consumer group waits for all consumers to join the group after a consumer initiates a request to join the group. Consumers that join the consumer group within this time range become members of the consumer group and are allocated partitions. Each consumer consumes only data in the allocated partitions. If consumers do not join the consumer group within this time range, rebalancing is triggered within the consumer group. During rebalancing, no data is consumed, which causes consumption latency. Recommended value: 130000. Unit: ms. This value allows sufficient time for consumers to join a consumer group. If you use confluent-kafka-go to consume data, you must set the max.poll.interval.ms parameter to a value that is greater than the value of the session.timeout.ms parameter. |
| session.timeout.ms | The heartbeat timeout period. If a consumer does not send a heartbeat request within the timeout period, the consumer is considered abnormal, and rebalancing is triggered within the consumer group. Recommended value: 120000. Unit: ms. |
| heartbeat.interval.ms | The interval at which a consumer sends heartbeat requests. This value must be less than the value of the session.timeout.ms parameter. Recommended value: 5000. Unit: ms. |
| auto.offset.reset | The start position of consumption. Valid values: latest and earliest. Default value: latest.<br>- earliest: If an offset is committed, the consumer starts to read data from the committed offset. If no offset is committed, the consumer starts to read data from the beginning.<br>- latest: If an offset is committed, the consumer starts to read data from the committed offset. If no offset is committed, the consumer starts to read data from the latest message. |

Confluent-kafka-go sample code

```go
package main

import (
	"fmt"
	"os"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	endpoint := "cn-hangzhou.log.aliyuncs.com"
	accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID")
	accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")
	project := "project"
	logstore := "logstore"
	port := "10012"

	consumer := getKafkaConsumer(project, endpoint, port, accessKeyID, accessKeySecret)
	// Close commits final offsets and leaves the consumer group cleanly.
	defer consumer.Close()

	if err := consumer.SubscribeTopics([]string{logstore}, nil); err != nil {
		panic(err)
	}
	for {
		// A timeout of -1 blocks until a message arrives.
		msg, err := consumer.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}
}

func getKafkaConsumer(project string, endpoint string, port string, accessKeyID string, accessKeySecret string) *kafka.Consumer {
	var kafkaConf = &kafka.ConfigMap{
		"bootstrap.servers":       fmt.Sprintf("%s.%s:%s", project, endpoint, port),
		"sasl.mechanism":          "PLAIN",
		"security.protocol":       "SASL_SSL",
		"sasl.username":           project,
		"sasl.password":           fmt.Sprintf("%s#%s", accessKeyID, accessKeySecret),
		"group.id":                "kafka-test",
		"enable.auto.commit":      "true",
		"auto.commit.interval.ms": 30000,
		"session.timeout.ms":      120000,
		"auto.offset.reset":       "earliest",
		"max.poll.interval.ms":    130000,
		"heartbeat.interval.ms":   5000,
	}
	consumer, err := kafka.NewConsumer(kafkaConf)
	if err != nil {
		panic(err)
	}
	fmt.Print("init kafka consumer success\n")
	return consumer
}
```