searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Kafka发送消息耗时太长排查

2023-09-15 01:33:44
351
0

【问题现象】

  特殊网络环境下docker化部署程序 连接kafka生产消息时,跟踪每条消息耗时进行分析时,发现耗时基本都要超过20s。

【排查过程】

 ①排查网络情况,主机及docker容器内部 telnet kafka服务器端口及ping,发现耗时都在毫秒级,排除网络问题

②定位kafka生产者程序,发现每次都new kakfa producer factory 以至于每次发送消息要创建连接,导致每条消息耗时都很长。

   改为从自定义界面读取kafka配置时 创建对应的kafka template,多线程可以共用producer,减少创建连接耗时,

   提高生产消息性能,能使用批量发送。

③在上述调整完成后发现kakfa发送消息耗时大部分在毫秒级别,但首条发送还是会在20s左右,继续深入分析kafka发送源码,

最终定位发送java程序使用kafka client提供的send方法时会创建连接,本次在特殊环境下使用ip进行kakfa servers配置,

但底层未识别连接方式为ip,仍会进行dns相关解析。由于特殊网络情况下没有dns服务器,导致首次发送时要创建两次连接

但由于dns解析导致要等待约10s超时后才使用ip进行连接。 

// kafka client dosend方法创建连接代码段
KafkaProducer类 doSend方法
更新meta消息
   try {
                clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
            } catch (KafkaException var22) {
                if (this.metadata.isClosed()) {
                    throw new KafkaException("Producer closed while send in progress", var22);
                }

                throw var22;
            }

waitOnMetadata代码段
    this.metadata.add(topic, nowMs + elapsed);
                        int version = this.metadata.requestUpdateForTopic(topic);
                        this.sender.wakeup();

                        try {
                            this.metadata.awaitUpdate(version, remainingWaitMs);
                        } catch (TimeoutException var15) {
                            throw new TimeoutException(String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs));
                        }

                        cluster = this.metadata.fetch();
                        elapsed = this.time.milliseconds() - nowMs;
                        if (elapsed >= maxWaitMs) {
                            throw new TimeoutException(partitionsCount == null ? String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs) : String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.", partition, topic, partitionsCount, maxWaitMs));
                        }

                        this.metadata.maybeThrowExceptionForTopic(topic);

//最终调用的底层方法
当要更新metadata时会唤醒kakfa sender 进行连接
调用KafkaClient中的wakeup,最终调用jdk底层的 NetworkClient的wakeup

【解决方案】

定位根因为触发dns解析后 就针对性进行修复,减少dns解析过程耗时  

方法一: 查看主机上 /etc/resolve.conf,去除文件里面的nameserver以及 local search等,或者文件置为空,然后重启docker

docker 容器内会引用主机上的/etc/resolve.conf

方法二:在主机专门目录建相应文件 如  soc_resolve.conf,启动docker时 挂载该文件覆盖容器内部的/etc/resolve.conf文件

docker compose只要调整yaml文件 valumes挂载即可

方法三:kakfa连接servers 和broker都使用host,在主机上配置对应的host到相应的ip

0条评论
0 / 1000
long
5文章数
0粉丝数
long
5 文章 | 0 粉丝
原创

Kafka发送消息耗时太长排查

2023-09-15 01:33:44
351
0

【问题现象】

  特殊网络环境下docker化部署程序 连接kafka生产消息时,跟踪每条消息耗时进行分析时,发现耗时基本都要超过20s。

【排查过程】

 ①排查网络情况,主机及docker容器内部 telnet kafka服务器端口及ping,发现耗时都在毫秒级,排除网络问题

②定位kafka生产者程序,发现每次都new kakfa producer factory 以至于每次发送消息要创建连接,导致每条消息耗时都很长。

   改为从自定义界面读取kafka配置时 创建对应的kafka template,多线程可以共用producer,减少创建连接耗时,

   提高生产消息性能,能使用批量发送。

③在上述调整完成后发现kakfa发送消息耗时大部分在毫秒级别,但首条发送还是会在20s左右,继续深入分析kafka发送源码,

最终定位发送java程序使用kafka client提供的send方法时会创建连接,本次在特殊环境下使用ip进行kakfa servers配置,

但底层未识别连接方式为ip,仍会进行dns相关解析。由于特殊网络情况下没有dns服务器,导致首次发送时要创建两次连接

但由于dns解析导致要等待约10s超时后才使用ip进行连接。 

// kafka client dosend方法创建连接代码段
KafkaProducer类 doSend方法
更新meta消息
   try {
                clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
            } catch (KafkaException var22) {
                if (this.metadata.isClosed()) {
                    throw new KafkaException("Producer closed while send in progress", var22);
                }

                throw var22;
            }

waitOnMetadata代码段
    this.metadata.add(topic, nowMs + elapsed);
                        int version = this.metadata.requestUpdateForTopic(topic);
                        this.sender.wakeup();

                        try {
                            this.metadata.awaitUpdate(version, remainingWaitMs);
                        } catch (TimeoutException var15) {
                            throw new TimeoutException(String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs));
                        }

                        cluster = this.metadata.fetch();
                        elapsed = this.time.milliseconds() - nowMs;
                        if (elapsed >= maxWaitMs) {
                            throw new TimeoutException(partitionsCount == null ? String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs) : String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.", partition, topic, partitionsCount, maxWaitMs));
                        }

                        this.metadata.maybeThrowExceptionForTopic(topic);

//最终调用的底层方法
当要更新metadata时会唤醒kakfa sender 进行连接
调用KafkaClient中的wakeup,最终调用jdk底层的 NetworkClient的wakeup

【解决方案】

定位根因为触发dns解析后 就针对性进行修复,减少dns解析过程耗时  

方法一: 查看主机上 /etc/resolve.conf,去除文件里面的nameserver以及 local search等,或者文件置为空,然后重启docker

docker 容器内会引用主机上的/etc/resolve.conf

方法二:在主机专门目录建相应文件 如  soc_resolve.conf,启动docker时 挂载该文件覆盖容器内部的/etc/resolve.conf文件

docker compose只要调整yaml文件 valumes挂载即可

方法三:kakfa连接servers 和broker都使用host,在主机上配置对应的host到相应的ip

文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0