生产消费 生产消息 生产者需要创建一个连接到RabbitMQ服务器,然后创建一个通道(Channel)来进行消息的发布。在发布消息之前,生产者通常需要先声明一个队列,以确保消息能够被正确地路由和接收。 一旦连接和通道建立完成,生产者可以使用basicPublish()方法将消息发布到指定的队列。在发布消息时,需要指定目标队列的名称、消息内容以及其他的属性。 发布消息后,RabbitMQ将会将消息存储在队列中,等待消费者来接收。消费者可以使用相同的客户端库来创建连接和通道,并使用basicConsume()方法来订阅队列并接收消息。一旦有消息到达队列,消费者就会收到消息并进行相应的处理。 通过使用RabbitMQ,生产者和消费者可以实现解耦,即它们可以独立地进行开发和部署。生产者可以按照自己的节奏和需求发布消息,而消费者可以根据自己的处理能力和负载来接收和处理消息。 代码示例: import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class RabbitmqProducer { // private final static String EXCHANGENAME "YOUR EXCHANGE NAME"; private final static String QUEUENAME "YOUR QUEUE NAME"; // private final static String ROUTINGKEY "YOUR ROUTING KEY"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // 创建连接工厂 ConnectionFactory factory new ConnectionFactory(); // 设置主机ip factory.setHost("YOUR HOST IP"); // 设置amqp的端口号 factory.setPort(YOUR PORT); // 设置用户名密码 factory.setUsername("YOUR USER NAME"); factory.setPassword("YOUR USER PASSWORD"); // 设置Vhost,需要在控制台先创建 factory.setVirtualHost("YOUR VHOST"); //基于网络环境合理设置超时时间 factory.setConnectionTimeout(30 1000); factory.setHandshakeTimeout(30 1000); factory.setShutdownTimeout(0); // 创建一个连接 Connection connection factory.newConnection(); // 创建一个频道 Channel channel connection.createChannel(); // 发送方消息确认,channel.confirmSelect(); // 启用发送方事务机制,channel.txSelect(); // 指定一个队列 channel.queueDeclare(QUEUENAME, false, false, false, null); for (int i 0; i < 100; i++) { // 发送的消息 String message "Hello RabbitMQ!" + i; // 往队列中发送一条消息,使用默认的交换器 channel.basicPublish("", QUEUENAME, null, message.getBytes(StandardCharsets.UTF8)); // 使用自定义交换器,需要在管理台预先建好,并设置routing key // channel.basicPublish(EXCHANGENAME, ROUTINGKEY, null, message.getBytes(StandardCharsets.UTF8)); System.out.println(" [x] Sent '" + message + "'"); TimeUnit.MILLISECONDS.sleep(100); } //关闭频道和连接 channel.close(); connection.close(); } } 消息发送后,可以进入控制台,在实例列表的队列选项卡查看消息发送状态。