收发定时/延时消息 分布式消息服务RocketMQ版支持任意时间的定时消息,最大推迟时间可达到40天。 定时消息即生产者生产消息到分布式消息服务RocketMQ版后,消息不会立即被消费,而是延迟到设定的时间点后才会发送给消费者进行消费。 收发消息前,请参考收集连接信息收集RocketMQ所需的连接信息。 准备环境 1. 在命令行输入python,检查是否已安装Python。得到如下回显,说明Python已安装。 PS C:> python Python 3.9.9 (tags/v3.9.9:ccb0e6a, Nov 15 2021, 18:08:50) [MSC v.1929 64 bit (AMD64)] on win32 Type "help", "copyright", "credits" or "license" for more information. 如果未安装Python,请使用以下命令安装: pip install rocketmqclientpython 2. 安装librocketmq库和rocketmqclientpython。 说明 建议下载rocketmqclientcpp2.2.0,获取librocketmq库。 3. 将librocketmq.so添加到系统动态库搜索路径。 1. 查找librocketmq.so的路径。 find / name librocketmq.so 2. 将librocketmq.so添加到系统动态库搜索路径。 ln s /查找到的librocketmq.so路径/librocketmq.so /usr/lib sudo ldconfig 以下示例代码中的参数说明如下,请参考收集连接信息获取参数值。 GROUP:表示消费组名称。 ENDPOINT:表示实例连接地址和端口。 TOPIC:表示Topic名称。 发送消息 参考如下示例代码。 import datetime from rocketmq.client import Producer, Message from rocketmq.exceptions import RocketMQException endpoint "${ENDPOINT}" 填写分布式消息服务RocketMQ控制台Namesrv接入点 accesskey "${ACCESSKEY}" 填写AccessKey 在分布式消息服务RocketMQ控制台用户管理菜单中创建的用户ID accesssecret "${SECRETKEY}" 填写SecretKey 在分布式消息服务RocketMQ控制台用户管理菜单中创建的用户密钥 topic "${TOPIC}" 填写Topic,在管理控制台创建 producergroup "${GROUP}" 生产者组group 初始化生产者 producer Producer(producergroup) producer.setnamesrvaddr(endpoint) producer.setsessioncredentials(accesskey, accesssecret, "") 启动生产者 try: producer.start() except RocketMQException as e: print('start producer error:', e) exit(1) msg Message(topic) msg.setbody("Hello RocketMQ") delaytime 10 发送任意延迟消息,时间单位为毫秒,如下所示:消息将在10s后投递 delaytimestamp int((datetime.datetime.now() + datetime.timedelta(secondsdelaytime)).timestamp() 1000) msg.setproperty('STARTDELIVERTIME', str(delaytimestamp)) 发送消息 try: result producer.sendsync(msg) print('send result:', result) except RocketMQException as e: print('send message error:', e) producer.shutdown() exit(1) 关闭生产者实例,释放资源 producer.shutdown()