
Troubleshooting notes: Logstash timing out while consuming from Kafka

2023-04-28 06:52:43

Logstash logs

[2023-04-11T01:22:02,194][WARN ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
[2023-04-11T01:22:02,195][WARN ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
[2023-04-11T01:22:03,196][WARN ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-2, groupId=logstash] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
[2023-04-11T01:27:25,361][WARN ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-3, groupId=logstash] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
[2023-04-11T01:27:25,361][WARN ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-5, groupId=logstash] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
[2023-04-11T01:27:25,361][WARN ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-4, groupId=logstash] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
[2023-04-11T01:32:28,517][WARN ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-7, groupId=logstash] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
[2023-04-11T01:32:28,518][WARN ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-6, groupId=logstash] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

Based on the log message, I looked up the relevant Kafka configuration documentation.

Kafka configuration

Reference: max.poll.interval.ms

max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. For consumers using a non-null group.instance.id which reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be reassigned after expiration of session.timeout.ms. This mirrors the behavior of a static consumer which has shutdown.

max.poll.records
The maximum number of records returned in a single call to poll(). Note, that max.poll.records does not impact the underlying fetching behavior. The consumer will cache the records from each fetch request and returns them incrementally from each poll.
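The relationship between the two settings can be sketched as simple arithmetic: a consumer stays in the group only if it finishes a full batch and calls poll() again before max.poll.interval.ms expires. The function below is a minimal illustration, not part of Kafka or Logstash; the name batch_fits_poll_interval and the per-record processing time are assumptions made for the example. The 300000 ms fallback is Kafka's default for max.poll.interval.ms.

```shell
# Succeed (exit 0) if a full batch can be processed before the poll timeout expires.
#   records       : value of max.poll.records
#   per_record_ms : assumed average handling time per record, in milliseconds
#   interval_ms   : value of max.poll.interval.ms (defaults to Kafka's 300000)
batch_fits_poll_interval() {
  local records="$1" per_record_ms="$2" interval_ms="${3:-300000}"
  [ $(( records * per_record_ms )) -lt "$interval_ms" ]
}

# Example: 500 records at 100 ms each need 50 s, well inside 5 minutes.
# batch_fits_poll_interval 500 100 && echo "batch fits"
```

The warnings in the log above arrive roughly five minutes apart, which would be consistent with consumers running on the default interval and being unable to finish a batch at all.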

Kafka's default settings are normally sufficient, and the WARN-level Logstash logs alone were not enough to pinpoint the cause.

Adjusting the Logstash log level

/logstash.yml

Change log.level: warn to log.level: info

After restarting the Logstash service, the following log output appeared:

[2023-04-12T11:14:21,024][INFO ][logstash.outputs.elasticsearch] retrying failed action with response code: 403 ({"type"=>"cluster_block_exception", "reason"=>"blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];"})
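Once the log level is raised, a quick way to confirm how often the Elasticsearch output is being rejected is to count the matching lines. This is a small sketch; the function name count_cluster_blocks is made up for the example, and the log file path depends on your installation.

```shell
# Print how many lines in a Logstash log file mention cluster_block_exception.
# Note: grep -c prints 0 (and returns a non-zero status) when nothing matches.
count_cluster_blocks() {
  grep -c 'cluster_block_exception' "$1"
}

# Example (the path is an assumption; use your own Logstash log location):
# count_cluster_blocks /var/log/logstash/logstash-plain.log
```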

Elasticsearch configuration

Searching the Elasticsearch configuration documentation for the keywords in that message:

modules-cluster

cluster.routing.allocation.disk.watermark.flood_stage
(Dynamic) Controls the flood stage watermark, which defaults to 95%. Elasticsearch enforces a read-only index block (index.blocks.read_only_allow_delete) on every index that has one or more shards allocated on the node, and that has at least one disk exceeding the flood stage. This setting is a last resort to prevent nodes from running out of disk space. The index block is automatically released when the disk utilization falls below the high watermark. Similarly to the low and high watermark values, it can alternatively be set to a ratio value, e.g., 0.95, or an absolute byte value.

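Conclusion

Searching by keyword revealed the cause: once disk usage on a node reaches 95% (the flood-stage watermark), every index with a shard on that node is put into a read-only, delete-allowed state. Writes from Logstash to Elasticsearch are then rejected and retried, which presumably stalls the pipeline long enough for the Kafka consumers to exceed max.poll.interval.ms. The block does not clear while the disk stays full. According to the documentation quoted above, it is released automatically once disk usage falls back below the high watermark; Elasticsearch versions before 7.4 require removing it manually. The disk usage monitoring confirmed that usage had indeed exceeded 95%.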
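The check behind this conclusion can be sketched in a few lines of shell. The function names are hypothetical, the 95 fallback mirrors the documented flood-stage default, and the comparison assumes whole-number percentages as printed by df.

```shell
# Succeed (exit 0) when a disk-usage percentage is at or above the flood-stage watermark.
#   used_percent : current usage as a whole number, e.g. 96
#   flood_stage  : threshold as a whole number (defaults to 95)
at_flood_stage() {
  local used_percent="$1" flood_stage="${2:-95}"
  [ "$used_percent" -ge "$flood_stage" ]
}

# Print the usage percentage (without the % sign) of the filesystem holding a path.
disk_used_percent() {
  df -P "$1" | awk 'NR == 2 { gsub("%", "", $5); print $5 }'
}

# Example (the data path is an assumption; use your Elasticsearch data directory):
# at_flood_stage "$(disk_used_percent /var/lib/elasticsearch)" && echo "flood stage reached"
```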

 

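Solution

1. Respond promptly when a disk usage alert fires.

2. Delete indices to free up space (localhost:9200 is a placeholder for your Elasticsearch address)

curl -X DELETE "http://localhost:9200/{{index_name}}"


3. Restore all indices to a writable state

curl -X PUT "http://localhost:9200/*/_settings" -H 'Content-Type: application/json' -d '
{
  "index.blocks.read_only_allow_delete": null
}'

4. Set up a cron job to clean up old logs periodically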
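A periodic cleanup could look roughly like the sketch below. It assumes daily indices named with a prefix plus a YYYY.MM.dd date (for example logstash-2023.04.21), GNU date for the -d option, and an Elasticsearch address supplied through a hypothetical ES_URL variable. The function names and the script path in the cron comment are likewise made up for the example.

```shell
# Print the name of the daily index dated N days before a reference date.
#   prefix : index name prefix, e.g. "logstash-"
#   days   : how many days back to go
#   today  : reference date as YYYY-MM-DD (defaults to the current date)
index_name_days_ago() {
  local prefix="$1" days="$2" today="${3:-$(date +%F)}"
  date -d "$today $days days ago" +"${prefix}%Y.%m.%d"
}

# Delete one index by name; this mirrors the DELETE request in step 2.
delete_index() {
  curl -sS -X DELETE "${ES_URL:-http://localhost:9200}/$1"
}

# Example crontab entry (hypothetical script path): every day at 01:00,
# delete the index that is 30 days old.
# 0 1 * * * . /opt/scripts/es_cleanup.sh && delete_index "$(index_name_days_ago logstash- 30)"
```

Deleting only the index that is exactly N days old leaves older indices behind if a run is missed, so a real job may want to loop over a range of days.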
