节点重启后消费者如何重连
 
                  更新时间 2024-01-18 10:19:24
                 
 
                    最近更新时间: 2024-01-18 10:19:24
                  
 本文主要介绍节点重启后消费者如何重连分布式消息服务RabbitMQ。
 本章节以Java中使用的RabbitMQ客户端amqp-client为例介绍节点重启后消费者如何重连。
amqp-client自带重连机制,但是自带的重连机制只会重试一次,一次连不上后就不会再执行了,这时如果消费者没有做额外的重试机制,那么这个消费者就彻底丧失的消费能力。
amqp-client在节点断连后,根据与通道建立的节点不同,产生不同的错误。
- 如果通道连接的是队列所在的节点,消费者就会收到一个shutdown信号,这时amqp-client的重连机制就会生效,尝试重新连接服务端。如果连上了,这个通道就会继续连接消费。如果连不上,就会执行channel.close方法,关闭这个通道。
- 如果通道连接的不是队列所在的节点,消费者不会触发关闭动作,而是由服务端发送的一个取消动作,这个动作对amqp-client来说并不是异常行为,所以日志上不会有明显的报错,但是连接最终还是会关闭。
amqp-client出现上面两种错误时,会分别回调handleShutdownSignal以及handleCancel方法,您可以通过重写这两种方法,在回调时执行重写的重连逻辑,就能在通道关闭后重新创建消费者的新通道继续消费。
以下提供一个简单的代码示例,此示例能够解决上面的两种错误,实现消费者的持续消费。
package rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class RabbitConsumer {
    public static void main(String... args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("100.00.000.000");
        factory.setPort(5672);
        factory.setUsername("name");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        createNewConnection(connection);
    }
    public static void createNewConnection(Connection connection) {
        try {
            Thread.sleep(1000);
            Channel channel = connection.createChannel();
            channel.basicQos(64);
            channel.basicConsume("queue-01", false, new CustomConsumer(channel, connection));
        } catch (Exception e) {
//            e.printStackTrace();
            createNewConnection(connection);
        }
    }
    static class CustomConsumer implements Consumer {
        private final Channel _channel;
        private final Connection _connection;
        public CustomConsumer(Channel channel, Connection connection) {
            _channel = channel;
            _connection = connection;
        }
        @Override
        public void handleConsumeOk(String consumerTag) {
        }
        @Override
        public void handleCancelOk(String consumerTag) {
        }
        @Override
        public void handleCancel(String consumerTag) throws IOException {
            System.out.println("handleCancel");
            System.out.println(consumerTag);
            createNewConnection(_connection);
        }
        @Override
        public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
            System.out.println("handleShutdownSignal");
            System.out.println(consumerTag);
            System.out.println(sig.getReason());
            createNewConnection(_connection);
        }
        @Override
        public void handleRecoverOk(String consumerTag) {
        }
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
            _channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
}
