之前的示例里使用fanout 和 direct两种类型的exchange,解决的是群发和指定类型的消息。本次示例使用topic实现正则匹配的消息队列处理.
topic
发送到topic交换的消息不能是任意的 routing_key 。 它必须是单词列表,由点分隔。这些词可以是任何东西,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:“ stock.usd.nyse ”、“ nyse.vmw ”、“ quick.orange.rabbit ”。 routing_key中可以有任意多的单词,最多为 255 个字节。
- *(star) can substitute for exactly one word. 一个单词
- #(hash) can substitute for zero or more words. 0或多个单词
生产者代码
import sys
import pika
severity = sys.argv[1] if len(sys.argv) > 1 else 'topic' # 第一个参数 作为消息类型
message = ' '.join(sys.argv[2:]) or "Topic Queues!"
credentials = pika.PlainCredentials(username="admin", password="admin") # 连接凭证 如果不指定 则默认使用 guest guest
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.191',
virtual_host='/',
credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='topic', exchange_type='topic') # 如果队列没有创建,就创建这个队列
channel.basic_publish(exchange='topic', routing_key=severity, body=message)
connection.close()
msg = "Message '{}' has been sent to mq".format(message)
print(msg)
消费者代码
import pika
import sys
bind_keys = sys.argv[1:] # 使用参数作为队列关键字 可以输入多个关键字
if not bind_keys:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
def callback(ch, method, propertities,body):
print(" [x] Received %r" % body)
# ########################### 消费者 ###########################
credentials = pika.PlainCredentials(username="admin", password="admin") # 连接凭证 默认为 guest
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.191',
virtual_host='/',
credentials=credentials))
channel = connection.channel()
# 声明exchange
channel.exchange_declare(exchange='topic', exchange_type='topic')
# 匿名queue
result = channel.queue_declare(queue='', exclusive=True, )
queue_name = result.method.queue
# queue绑定
for bind_key in bind_keys:
channel.queue_bind(
exchange='topic', queue=queue_name, routing_key=bind_key)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for message. To exit press CTRL+C')
channel.start_consuming()
结果验证
启动两个消费者
# 1号消费者
python consumer.py "*.*.rabbit"
# 2号消费者
python consumer.py "*.orange.*" "layzy.#"
多次运行生产者
python producer.py "ab.ab.rabbit" ab.ab.rabbit
# 命中1号消费者
python producer.py "ab.rabbit" ab.ab.rabbit
# 没有命中任何消费者
python producer.py "ll.ll.ll.ll.layzy" ll.ll.ll.ll.layzy
# 没有命中任何消费者
python producer.py "layzy.ll.ll.ll" layzy
# 命中2号消费者
python producer.py "oo.orange.layzy" oring.layzy
# 命中2号消费者
python producer.py "layzy.l.rabbit" lr
# 同时命中1、2号消费者