生产消费验证 本文介绍分布式消息服务RocketMQ入门指引的生产消费验证内容。 背景信息 RocketMQ的生产消费验证是指在使用RocketMQ进行消息生产和消费时的验证过程。具体而言,验证包括以下几个方面: 生产者验证:RocketMQ提供了丰富的生产者API,开发人员可以使用这些API将消息发送到RocketMQ的消息队列中。在验证阶段,可以通过发送消息并检查返回结果来确保消息成功发送到Broker节点。此外,生产者还应该验证消息的顺序性、事务性以及可靠性等方面。 消费者验证:RocketMQ的消费者可以订阅特定的消息主题,从而消费这些主题下的消息。在验证阶段,消费者应该能够正确地从Broker节点拉取消息并进行消费处理。消费者还可以验证消息的顺序性、重试机制以及消息过滤等功能。 操作步骤 1、 天翼云官网点击控制中心,选择产品分布式消息服务RocketMQ。 2、 登录分布式消息服务RocketMQ控制台,点击右上角地域选择对应资源池。 3、 进入实例列表,点击【管理】按钮进入管理菜单。 4、 进入Topic管理菜单,点击【拨测】按钮,进行生产消费的拨测验证,验证开通的消息实例和主题。 1)生产测试拨测: 选择消息类型,默认普通消息。 填写需要产生的测试消息数量,以及每条消息的大小,默认每条消息1KB,建议不超过4MB(4096KB)。 选择已建的消息主题,若无选项,请新增主题,详见上文创建主题和订阅组。 点击【测试】按钮,按照已填写规格及数量产生测试消息数据,展示消息数据的信息,包括消息ID(messageID)、发送状态、主题名(topic名)、Broker名、队列ID。 拨测功能涉及消息发送状态码,以下是RocketMQ消息发送状态码及其说明: ✧ SENDOK(发送成功):表示消息成功发送到了消息服务器。 ✧ FLUSHDISKTIMEOUT(刷新磁盘超时):表示消息已经成功发送到消息服务器,但是刷新到磁盘上超时。这可能会导致消息服务器在宕机后,尚未持久化到磁盘上的数据丢失。 ✧ FLUSHSLAVETIMEOUT(刷新从服务器超时):表示消息已经成功发送到消息服务器,但是刷新到从服务器上超时。这可能会导致主从同步不一致。 ✧ SLAVENOTAVAILABLE(从服务器不可用):表示消息已经成功发送到消息服务器,但是从服务器不可用。这可能是由于网络故障或从服务器宕机引起的。 ✧ UNKNOWNERROR(未知错误):表示发送消息时遇到了未知的错误。一般情况下建议重试发送消息。 ✧ MESSAGESIZEEXCEEDED(消息大小超过限制):表示消息的大小超过了消息服务器的限制。需要检查消息的大小是否合适。 ✧ PRODUCETHROTTLE(消息生产被限流):表示消息生产者的频率超出了消息服务器的限制。这可能是由于消息发送频率过高引起的。 ✧ SERVICENOTAVAILABLE(服务不可用):表示消息服务器不可用。这可能是由于网络故障或者消息服务器宕机引起的。 请注意,以上状态码仅适用于RocketMQ消息发送阶段,并且不代表消息是否成功被消费者接收。同时,这些状态码也可能因版本变化而有所不同,建议查阅官方文档获取最新信息。 2)消费测试拨测: 选择消息顺序,下拉选择无序/有序,默认选项为无序。 RocketMQ是一种开源的分布式消息中间件,它支持有序消息和无序消息。 ✧ 有序消息是指消息的消费顺序与发送顺序完全一致。在某些业务场景下,消息的处理需要保证顺序性,例如订单的处理或者任务的执行。RocketMQ提供了有序消息的支持,通过指定消息的顺序属性或使用消息队列的分区机制,可以确保消息按照指定的顺序进行消费。 ✧ 无序消息则是指消息的消费顺序与发送顺序无关。无序消息的特点是高吞吐量和低延迟,适用于一些不要求严格顺序的业务场景,如日志收集等。 在RocketMQ中,有序消息和无序消息的实现方式略有不同。有序消息需要借助MessageQueue的分区机制和消费者端的顺序消息消费来实现。而无序消息则是通过消息的发送和接收的并发处理来实现的。 总的来说,RocketMQ既支持有序消息也支持无序消息,根据业务需求选择合适的消息类型来满足业务的要求。 选择消费方式,目前仅提供pull方式。值得注意的是,RocketMQ还提供了推送(push)方式的消费模式,其中消息队列服务器会主动将消息推送给消费者。但在当前仅限于pull方式的消费模式。 填写消费数量。 下拉选择选择已建的消息主题和订阅组,若无选项,请新增主题和订阅组,详见上文创建主题和订阅组。 点击【测试】按钮,按照已填写规格及数量产生消费数据,展示消息数据的信息,包括消息ID(messageID)、主题名称(topicName)、生成时间、存储时间、队列ID、消费状态。 拨测功能涉及消息消费状态码,RocketMQ消费状态码是指在消息消费过程中,对消费结果进行标识的状态码。以下是常见的RocketMQ消费状态码: ✧ CONSUMESUCCESS(消费成功):表示消息成功被消费。 ✧ RECONSUMELATER(稍后重试):表示消费失败,需要稍后再次进行消费。 ✧ CONSUMEFAILURE(消费失败):表示消息消费出现异常或失败。 ✧ SLAVENOTAVAILABLE(从节点不可用):表示消费者无法访问从节点来消费消息。 ✧ NOMATCHEDMESSAGE(无匹配的消息):表示当前没有匹配的消息需要消费。 ✧ OFFSETILLEGAL(偏移量非法):表示消费的偏移量参数不合法。 ✧ BROKERTIMEOUT(Broker超时):表示由于Broker超时导致消费失败。 5、 用户应用按照规范接入RocketMQ,发送、消费消息。 1)生产者示例API 以下适用于南京3、上海7、重庆2、乌鲁木齐27、保定、石家庄20、内蒙6、晋中、北京5节点。 ctgmq引擎版本,SDK下载方式详见环境准备其他工具章节。 package com.ctg.guide; import com.ctg.mq.api.CTGMQFactory; import com.ctg.mq.api.IMQProducer; import com.ctg.mq.api.PropertyKeyConst; import com.ctg.mq.api.bean.MQMessage; import com.ctg.mq.api.bean.MQSendResult; import com.ctg.mq.api.exception.MQException; import com.ctg.mq.api.exception.MQProducerException; import java.util.Properties; / Producer,发送消息 / public class Producer { public static void main(String[] args) throws InterruptedException, MQException { Properties properties new Properties(); properties.setProperty(PropertyKeyConst.ProducerGroupName, "producergroup"); properties.setProperty(PropertyKeyConst.NamesrvAddr, "10.50.208.1:9876;10.50.208.2:9876;10.50.208.3:9876"); properties.setProperty(PropertyKeyConst.NamesrvAuthID, "app4test"); properties.setProperty(PropertyKeyConst.NamesrvAuthPwd, ""); properties.setProperty(PropertyKeyConst.ClusterName, "defaultMQBrokerCluster"); properties.setProperty(PropertyKeyConst.TenantID, "defaultMQTenantID"); IMQProducer producer CTGMQFactory.createProducer(properties);//建议应用启动时创建 int connectResult producer.connect(); if(connectResult ! 0){ return; } for (int i 0; i mqResultList) { //mqResultList 默认为1,可通过批量消费数量设置 for(MQResult result : mqResultList) { //TODO System.out.println(result); } return ConsumerTopicStatus.CONSUMESUCCESS;//对消息批量确认(成功) //return ConsumerTopicStatus.RECONSUMELATER;//对消息批量确认(失败) } }); } } 以下适用于华东1、华北2、西南1、华南2、上海36、青岛20、长沙42、南昌5、武汉41、杭州7、西南2贵州、太原4、郑州5、西安7、呼和浩特3节点。 rocketmq引擎版本,SDK下载方式详见环境准备其他工具章节。 importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer; importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; importorg.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; importorg.apache.rocketmq.client.exception.MQClientException; publicclassPushConsumer{ public static void main(String[] args) throws Exception{ AclClientRPCHook rpcHook newAclClientRPCHook(newSessionCredentials(ACCESSKEY, SECRETKEY)); DefaultMQPushConsumer consumer new DefaultMQPushConsumer(rpcHook); consumer.setConsumerGroup("ConsumerGroupName"); // 填入元数据地址 consumer.setNamesrvAddr("XXX:xxx"); ; //如果需要开启SSL,请增加此行代码 consumer.subscribe("YOUR TOPIC",""); consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)>{ System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(), msgs); returnConsumeConcurrentlyStatus.CONSUMESUCCESS; }); consumer.start(); System.out.printf("Consumer Started.%n"); } } 示例参数说明: Namesrv地址 Namesrv地址可从控制台查看,多个地址按分号分隔。 应用用户和密码 这个应用用户和密码就是控制台创建的应用用户和密码。 租户id和集群名 集群名和租户id可以从应用用户管理查询。 订阅组 消费组名需要在控制台提前创建好。