RabbitMQ的发布订阅演示
publish subscribe 发布订阅,群发
这个演示比之前的内容多出了一个X。RabbitMQ 消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,生产者经常甚至根本不知道消息是否会被传送到任何队列。
相反,生产者只能将消息发送到exchange。exchange是一件非常简单的事情。一方面它接收来自生产者的消息,另一方面将它们推送到队列中。exchange所必须确切地知道如何处理它收到的消息。其规则由交换类型exchange定义 。
本演示的目标是实现生产者的消息发送到多个队列中去,可以被多个消费者使用,使用到了exchange中的fanout 类型。
fanout就是将他收到的所有消息广播到所绑定的所有队列中
关于exchange的类型可以看这个文档。
生产者
与之前略有不同的是,在连接RabbitMQ中,生产者只是定义了exchange,而消费者这是使用随机名称的queue,然后绑定到了已经定义的exchange上。
producer.py
import sys
import pika
message = ' '.join(sys.argv[1:]) or "pub sub"
credentials = pika.PlainCredentials(username="admin", password="admin") # 连接凭证 如果不指定 则默认使用 guest guest
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.191',
virtual_host='/',
credentials=credentials))
# 创建exchange
channel = connection.channel()
channel.exchange_declare(exchange='pub sub', exchange_type="fanout")
channel.basic_publish(exchange='pub sub',
routing_key='',
body=message) # 往队列里发的消息内容
msg = "Message '{}' has been sent to mq".format(message)
print(msg)
消费者
consumer.py
import pika
def callback(ch, method, propertities,body):
print(" [x] Received %r" % body)
# ########################### 消费者 ###########################
credentials = pika.PlainCredentials(username="admin", password="admin") # 连接凭证 默认为 guest
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.191',
virtual_host='/',
credentials=credentials))
channel = connection.channel()
# 声明exchange
channel.exchange_declare(exchange='pub sub', exchange_type='fanout')
# 匿名queue
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# exchange与queue绑定
channel.queue_bind(exchange='pub sub', queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for message. To exit press CTRL+C')
channel.start_consuming()
验证方法
- 启动多个消费者脚本进行
- 多次运行生产者脚本
- 观察消费者脚本,可以发现所有消费者脚本都会受到生产者的消息