Kafka 消费兼容概述和注意事项
Confluent-kafka-c# 消费配置参数
参数 | 描述 |
---|---|
BootstrapServers | 初始连接的集群地址,格式为Project.Endpoint:Port,请根据Project所在的Endpoint进行配置。更多信息,请参见服务入口 - 阿里云内网:端口号为10011,例如 project.cn-hangzhou-intranet.log.aliyuncs.com:10011 - 公网:端口号为10012,例如 project.cn-hangzhou.log.aliyuncs.com:10012 |
SaslMechanism | 必须使用 SaslMechanism.Plain |
SecurityProtocol | 为了保证数据传输的安全性,必须使用SecurityProtocol.SaslSsl |
SaslUsername | 配置为日志服务Project名称 |
SaslPassword | 配置为阿里云AK,格式为 {access-key-id}#{access-key-secret。请根据实际情况,将 {access-key-id 替换为您的AccessKey ID,将 {access-key-secret} 替换为您的AccessKey Secret。建议使用RAM用户的AK。更多信息,请参见授权 |
GroupId | 消费组id, 是用于指定消费者组的标识符,用于将消费组内的消费者分组,通过配置消费组id,可以实现消费者组内的负载均衡,实现数据的处理和分发.例如 "kafka-test" |
AutoOffsetReset | 消费起始点位 常用的二个值是 AutoOffsetReset.Latest 和 AutoOffsetReset.Earliest, Earliest:表示消费者从最早的可用记录开始消费。如果消费者组之前没有消费过该主题,则会从最早的可用记录开始消费。如果之前已经消费过,则会从该消费者组最后提交的偏移量开始消费。 Latest:表示消费者从最新的可用记录开始消费。如果消费者组之前没有消费过该主题,则只会消费该主题上生产者将来生产的消息。如果之前已经消费过,则会从该消费者组最后提交的偏移量开始消费。 |
环境准备
测试环境为centos 7环境为例子
- 安装dotnet core sdk
yum install dotnet-sdk-3.1
- 新建dotnet项目
dotnet new console -n myconsumer -o /home/user/projects/myconsumer
- 安装 confluent kafka 依赖
dotnet add package -v 1.9.4-RC1 Confluent.Kafka
Confluent-kafka-c# 消费代码示例
c#
using System;
using System.Threading;
using Confluent.Kafka;
class Consumer
{
public static void Main(string[] args)
{
string accessKeyId = Environment.GetEnvironmentVariable("SLS_ACCESS_KEY_ID");
string accessKeySecret = Environment.GetEnvironmentVariable("SLS_ACCESS_KEY_SECRET");
string project = "project";
//内网endpoint和对应port,可以通过阿里云内部网络访问日志服务,相比公网有更好的链路质量和安全性,详见文档 https://help.aliyun.com/document_detail/29008.htm#reference-wgx-pwq-zdb
//endpoint = "cn-hangzhou-intranet.log.aliyuncs.com"
//port = "10011"
string endpoint = "cn-shenzhen.log.aliyuncs.com";
string port = "10012";
string host = project + "." + endpoint + ":" + port;
string password = accessKeyId + "#" +accessKeySecret;
string groupId = "test002";
string topic = "your logstore";
var conf = new ConsumerConfig {
GroupId = groupId,
BootstrapServers = host,
AutoOffsetReset = AutoOffsetReset.Earliest,
SaslMechanism = SaslMechanism.Plain,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslUsername = project,
SaslPassword = password
};
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe(topic);
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
c.Close();
}
}
}
}