Logstash作为一个强大的数据收集和转发工具,可以从各种来源(如Kafka、数据库等)获取日志数据,并通过灵活的过滤器对数据进行清洗、格式转换和增强。经过处理后的数据可以被发送到多个目标系统,如Elasticsearch、OpenSearch或者其他数据库和分析平台,从而帮助企业在更短的时间内获取、分析和展示数据。这种处理方式对于日志分析、监控、事件追踪等场景尤为重要。
这里,我们以一个示范的例子读取Kafka中的数据转换后写入到Elasticsearch,向您简单介绍天翼云Logstash实例如何帮助企业实现高效的日志数据流处理。
前提条件
已在同VPC下开通Kafka服务和Elasticsearch实例。
获取相应内网地址和端口,例如192.168.XX.XX:9200。
操作步骤
创建Logstash实例
在“云搜索服务”产品页,单击“立即开通”。
在云搜索服务实例创建页面选择已经开通好的Elasticsearch实例,点击进入Elasticsearch实例,选择Logstash加装。
订购周期、当前区域(即资源池)、可用区、填写实例名称、节点规格等基础配置信息后,按页面提示并根据需要进行配置,然后点击下一步。
按页面提示,勾选相关协议,公测期间支付0元,即可完成订购,等待资源开通完成,对应实例处于“运行中”状态,即为开通成功。
配置处理任务
在云搜索服务实例创建页面选择Logstash实例管理,选择已经加装的Logstash实例。
选择“管道管理”,选择“新建管道”。
在“管道配置”窗口中,配置相关的管道配置。
input {
kafka {
auto_offset_reset => "latest"
bootstrap_servers => "ip:port,ip:port,ip:port"
consumer_threads => 88
group_id => "your_group_id"
topics => ["your_kafka_topic"]
codec => json
}
}
filter {
}
output {
elasticsearch {
hosts => ["ip:port"]
index => "your_index_name-%{+YYYY.MM.dd}"
user => "your_username"
password => "your_password"
}
}
input部分用于定义数据的来源,即Logstash从哪里获取数据。这里以Kafka为例:
auto_offset_reset:会决定从哪个位置开始消费消息
bootstrap_servers:Kafka集群的地址
consumer_threads:控制 Kafka 消费者线程的数量
group_id:消费者组ID
topics:Kafka的主题列表
codec:指定数据格式
filter部分可以进行消息处理、数据解析等操作。根据实际业务规则来处理。
output部分用于将处理后的数据推送到外部系统(如数据库、消息队列、搜索引擎等)。这里以Elasticsearch为例:
hosts:指定Elasticsearch的地址。
index:指定存储到Elasticsearch的索引名称,可以使用日期变量%{+YYYY.MM.dd} 来动态生成基于日期的索引。
user/password:提供Elasticsearch用户名和密码
配置完点击“预校验”,预校验会使用Logstash的语法功能来检测基础的语法,不检测逻辑和连通性。
预校验通过后可以执行后续操作。
在“参数配置”中,配置相关的参数。
参数 | 说明 |
---|---|
管道名称 | 只能包含字母、数字、中划线或下划线,且必须以字母开头,限制4-30个字符,实例内唯一 |
管道描述 | 限制长度50 |
管道工作线程数 | 控制Logstash同时处理事件的线程数 |
管道批处理大小 | 单个线程从input部分收集的最大事件数 |
管道批处理延迟 | 不满足批处理大小最大等待时间 |
队列类型 | memory模式、persisted模式 memory:内存队列 persisted:磁盘队列 |
队列最大字节数 | persisted模式生效 队列最大存储大小 |
队列检查点写入数 | persisted模式生效 队列检查点数量 |
启动处理任务
配置完参数后点击“保存并部署”。就启动了读取Kafka中的数据转换后写入到Elasticsearch的管道任务。
等到管道“运行中”可以尝试配置的Kafka的topics中导入一些数据,例如使用Filebeat或其他程序。
可以通过Curl命令查看logstash中配置的索引名,在Elasticsearch实例是否有数据进来。
curl "http://<host>:<port>/_cat/indices"
停止任务
当任务执行结束后,可以在“管道管理”页面,点击“停止”,会执行停止管道操作。
删除管道
当管道处于停止状态,如后续不再需要执行该任务,点击“删除”,可删除该管道的信息。
删除实例
当任务结束,无需使用加装的Logstash时,可以选择退订实例,删除前保证正在运行的管道都已经停止。
在此执行的退订功能仅退订Logstash实例,Elasticsearch实例退订在Elasticsearch实例管理退订。