使用Python消费组基于SPL消费日志
本文介绍通过Python消费组的方式,设置SPL语句来消费Logstore中的数据,通过消费组(ConsumerGroup)消费数据无需关注日志服务的实现细节及消费者之间的负载均衡、故障转移(Failover)等,只需要专注于业务逻辑。
前提条件
- 已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权。
- 已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见配置环境变量。 重要
- 阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。
- 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
- 已安装SDK开发环境。具体操作,请参见SDK参考。
使用SPL消费数据
以Python SDK为例实现消费组消费数据。
- 创建项目目录spl_consumer_demo,在目录下安装日志服务SDK。shell
pip install -U aliyun-log-python-sdk
- 在spl_consumer_demo目录下创建main.py文件。创建一个消费组并启动一个消费者线程,该消费者会从指定的Logstore中消费数据。其中AK、SK配置在环境变量中,query字段填写SPL语句。python
import os import time from aliyun.log.consumer import * from aliyun.log import * class SPLConsumer(ConsumerProcessorBase): shard_id = -1 last_check_time = 0 def initialize(self, shard): self.shard_id = shard def process(self, log_groups, check_point_tracker): for log_group in log_groups.LogGroups: items = [] for log in log_group.Logs: item = dict() item['time'] = log.Time for content in log.Contents: item[content.Key] = content.Value items.append(item) log_items = dict() log_items['topic'] = log_group.Topic log_items['source'] = log_group.Source log_items['logs'] = items print(log_items) current_time = time.time() if current_time - self.last_check_time > 3: try: self.last_check_time = current_time check_point_tracker.save_check_point(True) except Exception: import traceback traceback.print_exc() else: try: check_point_tracker.save_check_point(False) except Exception: import traceback traceback.print_exc() # None means succesful process # if need to roll-back to previous checkpoint,return check_point_tracker.get_check_point() return None def shutdown(self, check_point_tracker): try: check_point_tracker.save_check_point(True) except Exception: import traceback traceback.print_exc() def sleep_until(seconds, exit_condition=None, expect_error=False): if not exit_condition: time.sleep(seconds) return s = time.time() while time.time() - s < seconds: try: if exit_condition(): break except Exception: if expect_error: continue time.sleep(1) def spl_consumer_group(): # 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。 endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com') # 本示例从环境变量中获取AccessKey ID和AccessKey Secret。 access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '') access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '') project = 'your_project' logstore = 'your_logstore' # 消费组名称。您无需提前创建,SDK会自动创建该消费组。 consumer_group = 'consumer-group' consumer_name = "consumer-group-name" query = "* | where cast(cdn_in as bigint) > 70" # 在消费组中创建2个消费者消费数据。 option = LogHubConfig(endpoint, access_key_id, access_key, project, logstore, consumer_group, consumer_name, query=query, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6, data_fetch_interval=1) print("*** start to consume data...") client_worker = ConsumerWorker(SPLConsumer, consumer_option=option) client_worker.start() time.sleep(10000) if __name__ == '__main__': spl_consumer_group()
- 在spl_consumer_demo目录下运行main.py,查看结果python
python main.py