Compatibility with Kafka consumers and usage notes

Franz-go parameters

| Parameter | Description |
| --- | --- |
| kgo.SeedBrokers | The cluster address for the initial connection, in the Project.Endpoint:Port format. Configure this parameter based on the endpoint of the project. For more information, see View endpoints. Alibaba Cloud internal network: the port number is 10011, for example project.cn-hangzhou-intranet.log.aliyuncs.com:10011. Internet: the port number is 10012, for example project.cn-hangzhou.log.aliyuncs.com:10012. |
| kgo.ConsumerGroup | The ID of the consumer group. You can add consumers to a consumer group and then specify the ID of the group to implement load balancing, data processing, and data delivery. Example: "kafka-test" |
| kgo.ConsumeTopics | The name of the Simple Log Service Logstore. |
| kgo.SASL | The SASL mechanism. To secure data transmission, use SASL_SSL, that is, SASL authentication over a TLS connection. |
| User | The name of the Simple Log Service project. |
| Pass | The AccessKey pair of your Alibaba Cloud account, in the {access-key-id}#{access-key-secret} format. Replace {access-key-id} with your AccessKey ID and {access-key-secret} with your AccessKey secret. We recommend that you use the AccessKey pair of a Resource Access Management (RAM) user. For more information, see Create a RAM user and authorize the RAM user to access Simple Log Service. |
| kgo.Dialer | The dialer used to open connections. If you use the Simple Authentication and Security Layer (SASL) protocol, you must configure this parameter. Otherwise, consumption fails. For more information, see franz-go on GitHub. |
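
The address and password formats described above can be assembled with two small helpers. The following is a minimal sketch that uses only the Go standard library. The helper names `brokerAddress` and `saslPassword` are hypothetical and are not part of franz-go, and the project, endpoint, and AccessKey values are placeholders.

```go
package main

import "fmt"

// brokerAddress builds a seed broker address in the Project.Endpoint:Port
// format. It uses port 10011 for the Alibaba Cloud internal network and
// port 10012 for the Internet, as listed in the parameter descriptions.
func brokerAddress(project, endpoint string, internal bool) string {
	port := "10012"
	if internal {
		port = "10011"
	}
	return fmt.Sprintf("%s.%s:%s", project, endpoint, port)
}

// saslPassword joins an AccessKey pair in the
// {access-key-id}#{access-key-secret} format expected for Pass.
func saslPassword(accessKeyID, accessKeySecret string) string {
	return accessKeyID + "#" + accessKeySecret
}

func main() {
	// Placeholder values; replace them with your own project and endpoint.
	fmt.Println(brokerAddress("project", "cn-hangzhou.log.aliyuncs.com", false))
	fmt.Println(brokerAddress("project", "cn-hangzhou-intranet.log.aliyuncs.com", true))

	// Placeholder credentials, shown only to illustrate the format.
	fmt.Println(saslPassword("<access-key-id>", "<access-key-secret>"))
}
```

The returned address can be passed to kgo.SeedBrokers, and the joined AccessKey pair can be used as the Pass value. In real code, read the AccessKey pair from a secure source such as environment variables rather than hard-coding it.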

Franz-go sample code

```go
package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"net"
	"os"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/plain"
)

func main() {
	project := "project"
	logstore := "logstore"
	endpoint := "cn-hangzhou.log.aliyuncs.com"
	port := "10012"

	groupID := "kafka-test"
	// Read the AccessKey pair from environment variables instead of hard-coding it.
	accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID")
	accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")

	// SASL requires a TLS dialer. Without it, consumption fails.
	tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
	// The seed broker address uses the Project.Endpoint:Port format.
	seeds := []string{fmt.Sprintf("%s.%s:%s", project, endpoint, port)}

	// Create the kgo client.
	client, err := kgo.NewClient(
		kgo.SeedBrokers(seeds...),
		kgo.ConsumerGroup(groupID),
		kgo.ConsumeTopics(logstore),
		kgo.SASL(plain.Auth{
			User: project,
			Pass: fmt.Sprintf("%s#%s", accessKeyID, accessKeySecret),
		}.AsMechanism()),
		kgo.Dialer(tlsDialer.DialContext),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := context.Background()

	// Poll for records until a fetch error occurs.
	for {
		fetches := client.PollFetches(ctx)
		if errs := fetches.Errors(); len(errs) > 0 {
			panic(fmt.Sprint(errs))
		}

		// Print the value of every record in the fetch.
		iter := fetches.RecordIter()
		for !iter.Done() {
			record := iter.Next()
			fmt.Println(string(record.Value), "from an iterator!")
		}
	}
}
```

Comparison between franz-go and confluent-kafka-go

At the time of writing, franz-go has 857 stars on GitHub, whereas confluent-kafka-go has 3,700. In practice, when a parameter is misconfigured, franz-go reports fewer errors than confluent-kafka-go and sometimes reports none at all, which makes troubleshooting harder. We recommend that you use confluent-kafka-go.
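
If you still use franz-go, one way to offset the sparse error reporting is to check the configuration values yourself before you create the client. The following is a minimal sketch that uses only the Go standard library (Go 1.20 or later for `errors.Join`). The `consumerConfig` type and its `validate` method are hypothetical and are not part of franz-go. The checks only mirror the formats described in the parameter list on this page and are assumptions about what is worth verifying, not an exhaustive list.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"strings"
)

// consumerConfig is a hypothetical container for the values that the
// franz-go sample passes to kgo.NewClient.
type consumerConfig struct {
	Broker   string // Project.Endpoint:Port
	Group    string // consumer group ID
	Project  string // Simple Log Service project, used as the SASL user
	Logstore string // Logstore name, used as the topic
	Pass     string // {access-key-id}#{access-key-secret}
}

// validate collects every problem it finds instead of stopping at the first,
// so that all configuration mistakes are visible at once.
func (c consumerConfig) validate() error {
	var errs []error

	host, port, err := net.SplitHostPort(c.Broker)
	switch {
	case err != nil:
		errs = append(errs, fmt.Errorf("broker %q: %w", c.Broker, err))
	case port != "10011" && port != "10012":
		errs = append(errs, fmt.Errorf("broker port %q: expected 10011 (internal network) or 10012 (Internet)", port))
	case c.Project != "" && !strings.HasPrefix(host, c.Project+"."):
		errs = append(errs, fmt.Errorf("broker host %q does not start with project %q", host, c.Project))
	}

	if c.Project == "" {
		errs = append(errs, errors.New("project (SASL user) is empty"))
	}
	if c.Logstore == "" {
		errs = append(errs, errors.New("logstore (topic) is empty"))
	}
	if c.Group == "" {
		errs = append(errs, errors.New("consumer group ID is empty"))
	}

	// The password is never included in an error message.
	id, secret, ok := strings.Cut(c.Pass, "#")
	if !ok || id == "" || secret == "" {
		errs = append(errs, errors.New("password must be {access-key-id}#{access-key-secret} with both parts set"))
	}

	// errors.Join returns nil when no error was collected.
	return errors.Join(errs...)
}

func main() {
	cfg := consumerConfig{
		Broker:   "project.cn-hangzhou.log.aliyuncs.com:10012",
		Group:    "kafka-test",
		Project:  "project",
		Logstore: "logstore",
		Pass:     os.Getenv("SLS_ACCESS_KEY_ID") + "#" + os.Getenv("SLS_ACCESS_KEY_SECRET"),
	}
	if err := cfg.validate(); err != nil {
		fmt.Fprintln(os.Stderr, "invalid configuration:", err)
		os.Exit(1)
	}
	fmt.Println("configuration looks valid")
}
```

A check like this catches only local mistakes, such as an unset environment variable or a wrong port. It does not confirm that the project, Logstore, or AccessKey pair actually exists on the server.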