代码示例 本节介绍了RabbitMQ接入的代码示例。 安全接入点(PLAIN、AMQPLAIN授权机制) java import com.rabbitmq.client.; import java.io.IOException; public class RabbitmqAmqpDemo { public static void main(String[] args) throws Exception { String host "192.168.0.0"; //安全接入点ip Integer port 5672; //安全接入点port String username "xxx"; //集群管理用户列表的用户名 String password "xxx"; //集群管理用户列表的密码 String vhost "/"; String exchangeName "extest"; String queueName "qutest"; ConnectionFactory connectionFactory new ConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); Connection connection connectionFactory.newConnection(); Channel channel connection.createChannel(); channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, "test"); String message "Hello Aop"; for (int i 0; i < 10; i++) { channel.basicPublish(exchangeName, "test", null, message.getBytes()); System.out.println("消息发送成功"); } Channel consumeChannel connection.createChannel(); Consumer consumer new DefaultConsumer(consumeChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String messageGet new String(body, "UTF8"); if (messageGet.equals(message)) { System.out.println("消息消费成功"); } } }; consumeChannel.setDefaultConsumer(consumer); consumeChannel.basicConsume(queueName, false, consumer); Thread.sleep(10000); } } SSL接入点(EXTERNAL授权机制) java import com.rabbitmq.client.; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; import java.io.FileInputStream; import java.io.IOException; import java.security.KeyStore; public class RabbitmqExternalDemo { public static void main(String[] args) throws Exception { String host "192.168.0.0"; //SSL接入点ip int port 5671; //SSL接入点port //以下2个ssl文件可通过控制台获取安装包, 具体的获取方式可以查看2.2.1接入步骤的第二小节 String ksFile "D:tmpsslclientrabbitmqkey.p12"; String tksFile "D:tmpssltruststore"; String vhost "/"; String exchangeName "extest"; String queueName "qutest"; char[] keyPassphrase "W3zT98Zz9Io".toCharArray(); KeyStore ks KeyStore.getInstance("PKCS12"); ks.load(new FileInputStream(ksFile), keyPassphrase); KeyManagerFactory kmf KeyManagerFactory.getInstance("SunX509"); kmf.init(ks, keyPassphrase); char[] trustPassphrase null; trustPassphrase "W3zT98Zz9Io".toCharArray(); KeyStore tks KeyStore.getInstance("JKS"); tks.load(new FileInputStream(tksFile), trustPassphrase); TrustManagerFactory tmf TrustManagerFactory.getInstance("SunX509"); tmf.init(tks); SSLContext c SSLContext.getInstance("tlsv1.2"); c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); ConnectionFactory connectionFactory new ConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setVirtualHost(vhost); connectionFactory.setSaslConfig(DefaultSaslConfig.EXTERNAL); connectionFactory.useSslProtocol(c); Connection connection connectionFactory.newConnection(); Channel channel connection.createChannel(); channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, "test"); String message "Hello Aop"; for (int i 0; i < 10; i++) { channel.basicPublish(exchangeName, "test", null, message.getBytes()); System.out.println("消息发送成功"); } Channel consumeChannel connection.createChannel(); Consumer consumer new DefaultConsumer(consumeChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String messageGet new String(body, "UTF8"); if (messageGet.equals(message)) { System.out.println("消息消费成功"); } } }; consumeChannel.setDefaultConsumer(consumer); consumeChannel.basicConsume(queueName, false, consumer); Thread.sleep(10000); } }