searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

logstash 条件输出

2024-05-29 09:16:13
8
0

在 Filebeat-Logstash-Elasticsearch 日志采集链路中,需要区分日志事件来源,输出到不同的目标系统,或者判断时间区间有条件地输出到目标系统,可在 logstash.conf 配置中加判断。

配置样例

input {
  # 示例输入插件,可以根据实际情况进行修改
  file {
    path => "/path/to/your/logfile.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"  # 仅用于测试,实际使用时请设置合适的路径
  }
}

filter {

  # 使用 grok 从路径中提取应用名
  grok {
    match => { "path" => "/path/to/logs/%{WORD:appName}/%{GREEDYDATA:fileName}.log" }
  }
  
  # 解析和处理日志事件
  # 示例中假设使用 grok 解析 Apache 日志,可以根据实际情况调整
  grok {
    match => { "message" => "%{COMMONAPACHELOG}" }
  }

  # 添加时间过滤条件
  ruby {
    code => "
      require 'time'
      
      event_time = event.get('@timestamp').time
      # 北京时间不需要转 utc
      start_time = Time.parse('17:00:00').utc
      end_time = Time.parse('17:05:00').utc
      
      # 提取当前事件的日期
      event_date = event_time.to_date

      # 生成完整的开始和结束时间
      full_start_time = Time.new(event_date.year, event_date.month, event_date.day, start_time.hour, start_time.min, start_time.sec, start_time.utc_offset)
      full_end_time = Time.new(event_date.year, event_date.month, event_date.day, end_time.hour, end_time.min, end_time.sec, end_time.utc_offset)

      if event_time >= full_start_time && event_time < full_end_time
        event.set('within_time_range', true)
      else
#        event.set('within_time_range', false)
      end
    "
  }
}

output {
    if [appName] == "app1" {
      elasticsearch {
          hosts => ["localhost:9200"]
          index => "your_index_name1"
          template => "template1.json"
          template_name => "template1"
          template_overwrite => true
          document_type => "_doc"
      }
    } else {
      elasticsearch {
          hosts => ["localhost:9200"]
          index => "your_index_name2"
          template => "template2.json"
          template_name => "template2"
          template_overwrite => true
          document_type => "_doc"
      }
    }

  if [within_time_range] {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "your_index_name3"
    }
  } else {
    # 可以选择将不在时间范围内的事件输出到其他地方或丢弃
    stdout { codec => rubydebug }
  }
}

配置解释

  1. 输入插件(input):
    使用 file 插件读取日志文件。根据实际输入源(如 stdinbeatskafka 等)进行修改。
  2. 过滤器插件(filter):
    使用 grok 从路径中提取应用名,用于后续判断。
    使用 grok 解析日志事件。这里假设使用 COMMONAPACHELOG 格式
    解析 Apache 日志,根据你的日志格式进行调整。
    使用 ruby 插件进行时间过滤:
    • 获取事件的时间戳 event.get('@timestamp').time
    • 定义时间区间的起始时间和结束时间。
    • 提取事件日期,并生成完整的开始和结束时间。
    • 比较事件时间与完整的开始和结束时间,设置字段 within_time_rangetruefalse
  3. 输出插件(output):
    使用条件语句 if [appName] == "app1" 判断事件来源不同的应用,不同的应用输出到不同的 index,可根据需要设置对应 index 的 settings, mapping。
    使用条件语句 if [within_time_range] == true 判断事件是否在时间范围内。如果在范围内,则将事件输出到 Elasticsearch。

注意事项

  • 确保 Logstash 和 Elasticsearch 的时间设置一致。
  • 配置中的路径、日志解析方式和索引名称应根据实际情况进行调整。
  • sincedb_path 设置为 /dev/null 仅用于测试,实际使用时请设置一个合适的路径以便 Logstash 记录读取进度。
0条评论
0 / 1000
朱****斌
10文章数
0粉丝数
朱****斌
10 文章 | 0 粉丝
原创

logstash 条件输出

2024-05-29 09:16:13
8
0

在 Filebeat-Logstash-Elasticsearch 日志采集链路中,需要区分日志事件来源,输出到不同的目标系统,或者判断时间区间有条件地输出到目标系统,可在 logstash.conf 配置中加判断。

配置样例

input {
  # 示例输入插件,可以根据实际情况进行修改
  file {
    path => "/path/to/your/logfile.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"  # 仅用于测试,实际使用时请设置合适的路径
  }
}

filter {

  # 使用 grok 从路径中提取应用名
  grok {
    match => { "path" => "/path/to/logs/%{WORD:appName}/%{GREEDYDATA:fileName}.log" }
  }
  
  # 解析和处理日志事件
  # 示例中假设使用 grok 解析 Apache 日志,可以根据实际情况调整
  grok {
    match => { "message" => "%{COMMONAPACHELOG}" }
  }

  # 添加时间过滤条件
  ruby {
    code => "
      require 'time'
      
      event_time = event.get('@timestamp').time
      # 北京时间不需要转 utc
      start_time = Time.parse('17:00:00').utc
      end_time = Time.parse('17:05:00').utc
      
      # 提取当前事件的日期
      event_date = event_time.to_date

      # 生成完整的开始和结束时间
      full_start_time = Time.new(event_date.year, event_date.month, event_date.day, start_time.hour, start_time.min, start_time.sec, start_time.utc_offset)
      full_end_time = Time.new(event_date.year, event_date.month, event_date.day, end_time.hour, end_time.min, end_time.sec, end_time.utc_offset)

      if event_time >= full_start_time && event_time < full_end_time
        event.set('within_time_range', true)
      else
#        event.set('within_time_range', false)
      end
    "
  }
}

output {
    if [appName] == "app1" {
      elasticsearch {
          hosts => ["localhost:9200"]
          index => "your_index_name1"
          template => "template1.json"
          template_name => "template1"
          template_overwrite => true
          document_type => "_doc"
      }
    } else {
      elasticsearch {
          hosts => ["localhost:9200"]
          index => "your_index_name2"
          template => "template2.json"
          template_name => "template2"
          template_overwrite => true
          document_type => "_doc"
      }
    }

  if [within_time_range] {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "your_index_name3"
    }
  } else {
    # 可以选择将不在时间范围内的事件输出到其他地方或丢弃
    stdout { codec => rubydebug }
  }
}

配置解释

  1. 输入插件(input):
    使用 file 插件读取日志文件。根据实际输入源(如 stdinbeatskafka 等)进行修改。
  2. 过滤器插件(filter):
    使用 grok 从路径中提取应用名,用于后续判断。
    使用 grok 解析日志事件。这里假设使用 COMMONAPACHELOG 格式
    解析 Apache 日志,根据你的日志格式进行调整。
    使用 ruby 插件进行时间过滤:
    • 获取事件的时间戳 event.get('@timestamp').time
    • 定义时间区间的起始时间和结束时间。
    • 提取事件日期,并生成完整的开始和结束时间。
    • 比较事件时间与完整的开始和结束时间,设置字段 within_time_rangetruefalse
  3. 输出插件(output):
    使用条件语句 if [appName] == "app1" 判断事件来源不同的应用,不同的应用输出到不同的 index,可根据需要设置对应 index 的 settings, mapping。
    使用条件语句 if [within_time_range] == true 判断事件是否在时间范围内。如果在范围内,则将事件输出到 Elasticsearch。

注意事项

  • 确保 Logstash 和 Elasticsearch 的时间设置一致。
  • 配置中的路径、日志解析方式和索引名称应根据实际情况进行调整。
  • sincedb_path 设置为 /dev/null 仅用于测试,实际使用时请设置一个合适的路径以便 Logstash 记录读取进度。
文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0