如何实现RabbitMQ的高性能 使用集群的负载均衡 队列的性能受单个CPU内核控制,当一个RabbitMQ节点处理消息的能力达到瓶颈时,可以通过集群进行扩展,从而达到提升吞吐量的目的。 使用多个节点,集群会自动将队列均衡的创建在各个节点上。除了使用集群模式,您还可以使用以下两个插件优化负载均衡: Consistent hash exchange 该插件使用交换器来平衡队列之间的消息。根据消息的路由键,发送到交换器的消息一致且均匀地分布在多个队列中。该插件创建路由键的散列,并将消息传播到与该交换器具有绑定关系的队列中。使用此插件时,需要确保消费者从所有队列中消费。 使用示例如下: 使用不同的路由键来路由消息。 public class ConsistentHashExchangeExample1 { private static String CONSISTENTHASHEXCHANGETYPE "xconsistenthash"; public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException { ConnectionFactory cf new ConnectionFactory(); Connection conn cf.newConnection(); Channel ch conn.createChannel(); for (String q : Arrays.asList("q1", "q2", "q3", "q4")) { ch.queueDeclare(q, true, false, false, null); ch.queuePurge(q); } ch.exchangeDeclare("e1", CONSISTENTHASHEXCHANGETYPE, true, false, null); for (String q : Arrays.asList("q1", "q2")) { ch.queueBind(q, "e1", "1"); } for (String q : Arrays.asList("q3", "q4")) { ch.queueBind(q, "e1", "2"); } ch.confirmSelect(); AMQP.BasicProperties.Builder bldr new AMQP.BasicProperties.Builder(); for (int i 0; i args new HashMap<>(); args.put("hashheader", "hashon"); ch.exchangeDeclare(EXCHANGE, EXCHANGETYPE, true, false, args); for (String q : Arrays.asList("q1", "q2")) { ch.queueBind(q, EXCHANGE, "1"); } for (String q : Arrays.asList("q3", "q4")) { ch.queueBind(q, EXCHANGE, "2"); } ch.confirmSelect(); for (int i 0; i hdrs new HashMap<>(); hdrs.put("hashon", String.valueOf(i)); ch.basicPublish(EXCHANGE, "", bldr.headers(hdrs).build(), "".getBytes("UTF8")); } ch.waitForConfirmsOrDie(10000); System.out.println("Done publishing!"); System.out.println("Evaluating results..."); // wait for one stats emission interval so that queue counters // are uptodate in the management UI Thread.sleep(5); System.out.println("Done."); conn.close(); } } 使用消息属性来路由消息,例如messageid、correlationid或timestamp属性。该方式需要使用“hashproperty”参数来声明交换器,且消息必须带有所选择的消息属性,否则会被路由到相同的队列。 public class ConsistentHashExchangeExample2 { public static final String EXCHANGE "e2"; private static String EXCHANGETYPE "xconsistenthash"; public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException { ConnectionFactory cf new ConnectionFactory(); Connection conn cf.newConnection(); Channel ch conn.createChannel(); for (String q : Arrays.asList("q1", "q2", "q3", "q4")) { ch.queueDeclare(q, true, false, false, null); ch.queuePurge(q); } Map args new HashMap<>(); args.put("hashheader", "hashon"); ch.exchangeDeclare(EXCHANGE, EXCHANGETYPE, true, false, args); for (String q : Arrays.asList("q1", "q2")) { ch.queueBind(q, EXCHANGE, "1"); } for (String q : Arrays.asList("q3", "q4")) { ch.queueBind(q, EXCHANGE, "2"); } ch.confirmSelect(); for (int i 0; i hdrs new HashMap<>(); hdrs.put("hashon", String.valueOf(i)); ch.basicPublish(EXCHANGE, "", bldr.headers(hdrs).build(), "".getBytes("UTF8")); } ch.waitForConfirmsOrDie(10000); System.out.println("Done publishing!"); System.out.println("Evaluating results..."); // wait for one stats emission interval so that queue counters // are uptodate in the management UI Thread.sleep(5); System.out.println("Done."); conn.close(); } }