在 Filebeat-Logstash-Elasticsearch 日志采集链路中,需要区分日志事件来源,输出到不同的目标系统,或者判断时间区间有条件地输出到目标系统,可在 logstash.conf
配置中加判断。
配置样例
input {
# 示例输入插件,可以根据实际情况进行修改
file {
path => "/path/to/your/logfile.log"
start_position => "beginning"
sincedb_path => "/dev/null" # 仅用于测试,实际使用时请设置合适的路径
}
}
filter {
# 使用 grok 从路径中提取应用名
grok {
match => { "path" => "/path/to/logs/%{WORD:appName}/%{GREEDYDATA:fileName}.log" }
}
# 解析和处理日志事件
# 示例中假设使用 grok 解析 Apache 日志,可以根据实际情况调整
grok {
match => { "message" => "%{COMMONAPACHELOG}" }
}
# 添加时间过滤条件
ruby {
code => "
require 'time'
event_time = event.get('@timestamp').time
# 北京时间不需要转 utc
start_time = Time.parse('17:00:00').utc
end_time = Time.parse('17:05:00').utc
# 提取当前事件的日期
event_date = event_time.to_date
# 生成完整的开始和结束时间
full_start_time = Time.new(event_date.year, event_date.month, event_date.day, start_time.hour, start_time.min, start_time.sec, start_time.utc_offset)
full_end_time = Time.new(event_date.year, event_date.month, event_date.day, end_time.hour, end_time.min, end_time.sec, end_time.utc_offset)
if event_time >= full_start_time && event_time < full_end_time
event.set('within_time_range', true)
else
# event.set('within_time_range', false)
end
"
}
}
output {
if [appName] == "app1" {
elasticsearch {
hosts => ["localhost:9200"]
index => "your_index_name1"
template => "template1.json"
template_name => "template1"
template_overwrite => true
document_type => "_doc"
}
} else {
elasticsearch {
hosts => ["localhost:9200"]
index => "your_index_name2"
template => "template2.json"
template_name => "template2"
template_overwrite => true
document_type => "_doc"
}
}
if [within_time_range] {
elasticsearch {
hosts => ["localhost:9200"]
index => "your_index_name3"
}
} else {
# 可以选择将不在时间范围内的事件输出到其他地方或丢弃
stdout { codec => rubydebug }
}
}
配置解释
- 输入插件(input):
使用file
插件读取日志文件。根据实际输入源(如stdin
、beats
、kafka
等)进行修改。 - 过滤器插件(filter):
使用 grok 从路径中提取应用名,用于后续判断。
使用grok
解析日志事件。这里假设使用COMMONAPACHELOG
格式
解析 Apache 日志,根据你的日志格式进行调整。
使用ruby
插件进行时间过滤:- 获取事件的时间戳
event.get('@timestamp').time
。 - 定义时间区间的起始时间和结束时间。
- 提取事件日期,并生成完整的开始和结束时间。
- 比较事件时间与完整的开始和结束时间,设置字段
within_time_range
为true
或false
。
- 获取事件的时间戳
- 输出插件(output):
使用条件语句if [appName] == "app1"
判断事件来源不同的应用,不同的应用输出到不同的 index,可根据需要设置对应 index 的 settings, mapping。
使用条件语句if [within_time_range] == true
判断事件是否在时间范围内。如果在范围内,则将事件输出到 Elasticsearch。
注意事项
- 确保 Logstash 和 Elasticsearch 的时间设置一致。
- 配置中的路径、日志解析方式和索引名称应根据实际情况进行调整。
sincedb_path
设置为/dev/null
仅用于测试,实际使用时请设置一个合适的路径以便 Logstash 记录读取进度。