searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

RocketMQ消费负载均衡

2023-03-28 01:37:23
22
0

RocketMQ 消费自动负载均衡机制

1. 背景介绍

RocketMQ是一款具备低延时、高性能的消息队列产品。 通过namesrv这个功能简单的组件,将松散的不同broker连起来,组成一个分布式的消息队列集群。本文是基于4.9.4版本的RocketMQ进行分析。

1.1. 消费模式


RocketMQ有两种消费模式:集群消费和广播消费

  • 集群消费模式: 一个消息只被订阅组内的一个消费者实例消费。不同消费者实例互相分摊消息,实现消费负载均衡

  •  广播消费模式: 一个消费会被订阅组内的所有消费者实例消费。



要实现消费的自动负载均衡,只能使用集群消费模式,也是强烈推荐使用的消费模式。

1.2 队列(queue)


消息都存放在主题(topic)上。而一个主题可以有多个队列(queue)。同一个主题,可以分布在不同的broker上。
所以主题的队列数,是主题在不同broker上的队列数总和。
broker往namesrv上注册所有主题的信息。这样客户端通过namesrv可以看到主题的全貌。
而队列的主要作用是解决客户端的并发消费。

2. 消费负载均衡的功能介绍

单个消费进程是有性能的上限的。不同的业务场景,消耗的时间不同。短则毫秒级,长则可能几秒甚至几分钟。 为了提高消费的性能,需要可以并行去消费消息,而且并行消费期间,消费者实例的消费进度和运行情况应该不能互相干扰。

为了解决这个问题,需要实现消费端的自动负载均衡。 上一节有提到,队列正是解决这个问题的关键。

下面的JSON是订阅组sub_1保存topic_1主题的消费进度的文件保存信息。可见,消费进度,其实正是具体每个队列的消息位置(offset)

  • rocketmq内部使用JSON格式保存所有的消费进度,一般是放在数据存储目录下config/consumerOffset.json 文件里面
"topic_1@sub_1":{0:181,1:178,2:176,3:183,4:184,5:182,6:177,7:179}

为了不互相干扰也可以并发去消费,客户端需要分配queue的订阅,让每个客户端都去订阅不同的queue,没有重叠,也没有漏了。 达到如下的效果:

  • 客户端的数量比队列数多的时候。超出的客户端会无法订阅、消费任何消息。直到正在订阅的客户端停止。


3. 消费负载均衡的原理分析

 

结合客户端的源代码进行。

其中核心代码如下:

1. RebalanceService.java: 客户端自带的服务,默认每20秒进行一次doRebalance();
2. RebalanceImpl.java: 针对每个订阅组的每个主题,进行重新负载均衡
3. MQClientInstance.java: 客户端对象,默认每30秒从namesrv更新主题的路由信息。
4. AllocateMessageQueueAveragely.java: 负载均衡策略

核心的算法如下:
1. 客户端启动的时候,会从namesrv获取订阅主题的路由信息。后续默认每30秒更新一次
2. 客户端默认每20秒进行重新负载均衡计算。
  2.1. 消费的负载均衡计算,本质是将每个订阅组里面的每个主题都进行负载计算
  2.2. 单个订阅组的主题的负载计算方法如下: 基于步骤1得到主题的所有队列信息, 以及从服务端获取所有客户端ID列表 。 将两者排序,确保每个客户端都得到相同的结果。 再基于负载均衡策略进行计算。
  2.3 负载均衡的策略:默认使用 AllocateMessageQueueAveragely策略,也即是上面例子的介绍按顺序平均分配队列的方式。
  * 可以根据需要设置不同的策略,比如一致性HASH等。但要确保每个客户端都要有相同的配置。

下面的相关的代码。
RebalanceService.java:

public class RebalanceService extends ServiceThread {
    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mqClientFactory;

    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return RebalanceService.class.getSimpleName();
    }
}

 

RebalanceImpl.doRebalance(): 对所有订阅组订阅的所有TOPIC进行重新负载计算
RebalanceImpl.rebalanceByTopic(): 对具体每个topic进行重新负载计算

public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    this.truncateMessageQueueNotMyTopic();
}

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}",
                        consumerGroup,
                        topic,
                        mqSet,
                        mqSet);
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        case CLUSTERING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
            }

            if (null == cidAll) {
                log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
            }

            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);

                Collections.sort(mqAll);
                Collections.sort(cidAll);

                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                        e);
                    return;
                }

                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }

                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    log.info(
                        "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                        strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                        allocateResultSet.size(), allocateResultSet);
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}

 

MQClientInstance.startScheduledTask():启动一系列的定时任务。其中包括默认每30秒从namesrv获取最新的主题路由信息 

private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}

 

AllocateMessageQueueAveragely.allocate(): 分配消费负载

@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {

    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
        return result;
    }

    int index = cidAll.indexOf(currentCID);
    int mod = mqAll.size() % cidAll.size();
    int averageSize =
        mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
            + 1 : mqAll.size() / cidAll.size());
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}

 

4. 默认消费的负载均衡的机制分析和问题

分布式消息RocketMQ采取了一种比较松散的架构。 通过namesrv把多个独立运行broker信息汇总在一起。而客户端进行负载均衡的时候,也是独立计算和运行的。 服务端实际没有参与其中。
 优点是充分利用了分布式的特征,可以非常线性的增加性能,可以支撑一个非常大规模的集群和客户端数量也没问题。
 缺点也有如下几个:
 a. 负载均衡的计算并不是同时进行,也没有统一进行分配,都是客户端自行独立计算。因而很容易出现的队列分配重叠情况。
 b. 不能马上感知到集群的消费者实例或主题的变化,存在一定的延时和滞后。实际负载是经过一定时间最终达到负载均衡分配而已。
 c. 消费者的消费进度是客户端定时上传更新的。对新加入的消费者实例而言,自行计算到分配的队列后,从broker上获取的消费进度,可能也有一定的延时。
 d. 消费者实例比队列数多的情况,肯定存在消费者实例无法订阅消费任何消息的情况。为了缓解这个问题,可以一开始创建更多的队列数。 但队列数多了也会占用更多的资源,对性能会有一定的影响。
 f. 存在单个队列消息量过多,消费无法负载分摊的问题。
 
 以上的问题a,c都会导致消费重复的问题。 因而要正确的使用RocketMQ,需要确保业务侧实现消息的幂等性处理。


 5. 展望未来


 既然发现了这么多问题,那是否有一个从根本上解决上述问题的方法或思路呢。 答案在RocketMQ 5.0 引入的新消费接口上,把消费负载分配的逻辑都放到了broker上。有兴趣的可以自行查看RIP 19的描述。

0条评论
0 / 1000
叶****伟
6文章数
0粉丝数
叶****伟
6 文章 | 0 粉丝