生产消费 消费消息 消费者需要创建一个连接到RabbitMQ服务器,然后创建一个通道(Channel)来进行消息的订阅。在订阅消息之前,消费者通常需要先声明一个队列,以确保能够正确地接收和处理消息。 一旦连接和通道建立完成,消费者可以使用basicConsume()方法来订阅指定的队列,并注册一个回调函数来处理接收到的消息。当有消息到达队列时,RabbitMQ会将消息推送给消费者,消费者的回调函数将被调用,从而可以对消息进行处理。 消费者可以根据自己的需求设置消息的确认机制。在默认情况下,消费者在接收到消息后,会自动向RabbitMQ发送一个确认(ack)消息,表示已成功接收并处理该消息。如果消费者在处理消息时发生错误,可以选择不发送确认消息,从而使消息重新进入队列,以便其他消费者重新处理。 通过使用RabbitMQ,消费者可以实现解耦,即它们可以独立地进行开发和部署。消费者可以根据自己的处理能力和负载来接收和处理消息,从而实现负载均衡和水平扩展。 代码示例: import com.rabbitmq.client.; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class RabbitmqConsumer { //队列名称 private final static String QUEUENAME "Hello,RabbitMQ"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory new ConnectionFactory(); //设置主机ip factory.setHost("YOUR HOST IP"); //设置amqp的端口号 factory.setPort(YOUR PORT); //设置用户名密码 factory.setUsername("YOUR USER NAME"); factory.setPassword("YOUR USER PASSWORD"); //设置Vhost,需要在控制台先创建 factory.setVirtualHost("YOUR VHOST"); //基于网络环境合理设置超时时间 factory.setConnectionTimeout(30 1000); factory.setHandshakeTimeout(30 1000); factory.setShutdownTimeout(0); Connection connection factory.newConnection(); Channel channel connection.createChannel(); //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。 channel.queueDeclare(QUEUENAME, false, false, false, null); System.out.println(" [] Waiting for messages. To exit press CTRL+C"); Consumer consumer new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message new String(body, StandardCharsets.UTF8); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUENAME, true, consumer); } } 完成上述步骤后,可以在控制台查看消费者是否启动成功。 完成以上所有步骤后,就成功接入了RabbitMQ服务,可以用消息队列进行消息发送和订阅了。