Skip to content

Kafka 消费兼容概述和注意事项

Franz-go消费配置参数

参数描述
kgo.SeedBrokers初始连接的集群地址,格式为Project.Endpoint:Port,请根据Project所在的Endpoint进行配置。更多信息,请参见服务入口
- 阿里云内网:端口号为10011,例如 project.cn-hangzhou-intranet.log.aliyuncs.com:10011
- 公网:端口号为10012,例如 project.cn-hangzhou.log.aliyuncs.com:10012
kgo.ConsumerGroup配置消费组id, 是用于指定消费者组的标识符,用于将消费组内的消费者分组,通过配置消费组id ,可以实现消费者组内的负载均衡,实现数据的处理和分发.例如 "kafka-test"
kgo.ConsumeTopics填写日志服务Logstore名称
kgo.SASL为了保证数据传输的安全性,必须使用SASL_SSL
User填写日志服务Project名称
Pass配置为阿里云AK,格式为 {access-key-id}#{access-key-secret}。请根据实际情况,将 {access-key-id} 替换为您的AccessKey ID,将 {access-key-secret} 替换为您的AccessKey Secret。建议使用RAM用户的AK。更多信息,请参见授权
kgo.Dialer使用sasl必须设置这个参数,否则无法正常消费.详见franz-go github 案例

Franz-go消费代码示例

go
package main

import (
	"context"
	"crypto/tls"
	"fmt"
        "os"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/plain"
	"net"
	"time"
)

func main() {
	project         := "project"
	logstore        := "logstore"
	endpoint        := "cn-hangzhou.log.aliyuncs.com"
	port            := "10012"
	//内网endpoint和对应port,可以通过阿里云内部网络访问日志服务,相比公网有更好的链路质量和安全性,详见文档 https://help.aliyun.com/document_detail/29008.htm#reference-wgx-pwq-zdb
	//endpoint = "cn-hangzhou-intranet.log.aliyuncs.com"
	//port     = "10011"
	groupId         := "kafka-test"
	accessKeyID     := os.Getenv("SLS_ACCESS_KEY_ID")	
        accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")

 	//using sasl we must add tisDialer
	tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
	seeds := []string{fmt.Sprintf("%s.%s:%s", project, endpoint, port)}
 
	//get Kgo client
	client, err := kgo.NewClient(
		kgo.SeedBrokers(seeds...),
		kgo.ConsumerGroup(groupId),
		kgo.ConsumeTopics(logstore),
		kgo.SASL(plain.Auth{
			User: project,
			Pass: fmt.Sprintf("%s#%s", accessKeyID, accessKeySecret),
		}.AsMechanism()),
		kgo.Dialer(tlsDialer.DialContext),
	)

	if err != nil {
		panic(err)
	}
	defer client.Close()
	ctx := context.Background()

	for {
		fetches := client.PollFetches(ctx)
		if errs := fetches.Errors(); len(errs) > 0 {
			panic(fmt.Sprint(errs))
		}

		iter := fetches.RecordIter()
		for !iter.Done() {
			record := iter.Next()
			fmt.Println(string(record.Value), "from an iterator!")
		}
	}
}

Franz-go与Confluent-kafka-go简单对比

Franz-kafka-go 目前github star数为857 ,Confluent-kafka-go目前github star数为3.7k , 实际使用过程中franz-kafka-go 如果参数配置有误,能返回的报错信息相比后者少很多,甚至完全不报错,此时排查起来有较大的难度,建议使用confluent-kafka-go