活动

天翼云最新优惠活动,涵盖免费试用,产品折扣等,助您降本增效!
热门活动
  • 618智算钜惠季 爆款云主机2核4G限时秒杀,88元/年起!
  • 免费体验DeepSeek,上天翼云息壤 NEW 新老用户均可免费体验2500万Tokens,限时两周
  • 云上钜惠 HOT 爆款云主机全场特惠,更有万元锦鲤券等你来领!
  • 算力套餐 HOT 让算力触手可及
  • 天翼云脑AOne NEW 连接、保护、办公,All-in-One!
  • 中小企业应用上云专场 产品组合下单即享折上9折起,助力企业快速上云
  • 息壤高校钜惠活动 NEW 天翼云息壤杯高校AI大赛,数款产品享受线上订购超值特惠
  • 天翼云电脑专场 HOT 移动办公新选择,爆款4核8G畅享1年3.5折起,快来抢购!
  • 天翼云奖励推广计划 加入成为云推官,推荐新用户注册下单得现金奖励
免费活动
  • 免费试用中心 HOT 多款云产品免费试用,快来开启云上之旅
  • 天翼云用户体验官 NEW 您的洞察,重塑科技边界

智算服务

打造统一的产品能力,实现算网调度、训练推理、技术架构、资源管理一体化智算服务
智算云(DeepSeek专区)
科研助手
  • 算力商城
  • 应用商城
  • 开发机
  • 并行计算
算力互联调度平台
  • 应用市场
  • 算力市场
  • 算力调度推荐
一站式智算服务平台
  • 模型广场
  • 体验中心
  • 服务接入
智算一体机
  • 智算一体机
大模型
  • DeepSeek-R1-昇腾版(671B)
  • DeepSeek-R1-英伟达版(671B)
  • DeepSeek-V3-昇腾版(671B)
  • DeepSeek-R1-Distill-Llama-70B
  • DeepSeek-R1-Distill-Qwen-32B
  • Qwen2-72B-Instruct
  • StableDiffusion-V2.1
  • TeleChat-12B

应用商城

天翼云精选行业优秀合作伙伴及千余款商品,提供一站式云上应用服务
进入甄选商城进入云市场创新解决方案
办公协同
  • WPS云文档
  • 安全邮箱
  • EMM手机管家
  • 智能商业平台
财务管理
  • 工资条
  • 税务风控云
企业应用
  • 翼信息化运维服务
  • 翼视频云归档解决方案
工业能源
  • 智慧工厂_生产流程管理解决方案
  • 智慧工地
建站工具
  • SSL证书
  • 新域名服务
网络工具
  • 翼云加速
灾备迁移
  • 云管家2.0
  • 翼备份
资源管理
  • 全栈混合云敏捷版(软件)
  • 全栈混合云敏捷版(一体机)
行业应用
  • 翼电子教室
  • 翼智慧显示一体化解决方案

合作伙伴

天翼云携手合作伙伴,共创云上生态,合作共赢
天翼云生态合作中心
  • 天翼云生态合作中心
天翼云渠道合作伙伴
  • 天翼云代理渠道合作伙伴
天翼云服务合作伙伴
  • 天翼云集成商交付能力认证
天翼云应用合作伙伴
  • 天翼云云市场合作伙伴
  • 天翼云甄选商城合作伙伴
天翼云技术合作伙伴
  • 天翼云OpenAPI中心
  • 天翼云EasyCoding平台
天翼云培训认证
  • 天翼云学堂
  • 天翼云市场商学院
天翼云合作计划
  • 云汇计划
天翼云东升计划
  • 适配中心
  • 东升计划
  • 适配互认证

开发者

开发者相关功能入口汇聚
技术社区
  • 专栏文章
  • 互动问答
  • 技术视频
资源与工具
  • OpenAPI中心
开放能力
  • EasyCoding敏捷开发平台
培训与认证
  • 天翼云学堂
  • 天翼云认证
魔乐社区
  • 魔乐社区

支持与服务

为您提供全方位支持与服务,全流程技术保障,助您轻松上云,安全无忧
文档与工具
  • 文档中心
  • 新手上云
  • 自助服务
  • OpenAPI中心
定价
  • 价格计算器
  • 定价策略
基础服务
  • 售前咨询
  • 在线支持
  • 在线支持
  • 工单服务
  • 建议与反馈
  • 用户体验官
  • 服务保障
  • 客户公告
  • 会员中心
增值服务
  • 红心服务
  • 首保服务
  • 客户支持计划
  • 专家技术服务
  • 备案管家

了解天翼云

天翼云秉承央企使命,致力于成为数字经济主力军,投身科技强国伟大事业,为用户提供安全、普惠云服务
品牌介绍
  • 关于天翼云
  • 智算云
  • 天翼云4.0
  • 新闻资讯
  • 天翼云APP
基础设施
  • 全球基础设施
  • 信任中心
最佳实践
  • 精选案例
  • 超级探访
  • 云杂志
  • 分析师和白皮书
  • 天翼云·创新直播间
市场活动
  • 2025智能云生态大会
  • 2024智算云生态大会
  • 2023云生态大会
  • 2022云生态大会
  • 天翼云中国行
天翼云
  • 活动
  • 智算服务
  • 产品
  • 解决方案
  • 应用商城
  • 合作伙伴
  • 开发者
  • 支持与服务
  • 了解天翼云
      • 文档
      • 控制中心
      • 备案
      • 管理中心
      文档中心

      分布式消息服务RocketMQ

      分布式消息服务RocketMQ

        • 产品动态
        • 服务公告
        • 2024
        • 【优惠】正式开放2年7折,3年5折包年折扣
        • 【优惠】分布式消息服务RocketMQ增加包年优惠折扣和产品资费下调
        • 【通知】CTGMQ引擎调整为白名单特性
        • 【通知】通用型主机规格调整为白名单特性
        • 产品简介
        • 产品定义
        • 产品优势
        • 功能特性
        • 应用场景
        • 产品规格
        • 开源对比
        • 分布式消息产品选型
        • 使用限制
        • 安全方案
        • 名词解释
        • 产品架构
        • 与其他服务关系
        • 计费说明
        • 产品资费
        • 新资费
        • 旧资费
        • 计费项
        • 计费模式
        • 续费、到期与欠费
        • 退订
        • 快速入门
        • 入门指引
        • 环境准备
        • 创建RocketMQ实例
        • 创建主题和订阅组
        • 创建应用用户和密码
        • 生产消费验证
        • 用户指南
        • 创建实例
        • 实例管理
        • 查看实例
        • 修改实例
        • 连接实例
        • 扩容
        • 按需转包周期
        • 续订和退订
        • 概览
        • 集群信息
        • 标签管理
        • Topic管理
        • 查看Topic
        • 创建Topic
        • 修改Topic
        • 删除Topic
        • 拨测
        • 重置消费位置
        • Topic导入/导出
        • 消费组管理
        • 查看消费组
        • 创建消费组
        • 修改消费组
        • 删除消费组
        • 消费组导入/导出
        • 查看订阅关系
        • 用户权限管理
        • 消息查询
        • 查询消息
        • 消息轨迹
        • 生产者实例查询
        • 消费者实例查询
        • 监控与告警
        • 监控指标说明
        • 集群监控
        • 仪表盘
        • 配置告警
        • 最佳实践
        • 生产者
        • 消费者
        • topic、queue的规划
        • Java客户端Pull和Push的选择:Java客户端必须使用Push Consumer
        • 有序消费和无序消费的选择
        • 消费幂等
        • 业务消息设计:Topic与Tag
        • 同组Consumer订阅关系一致
        • 开发指南
        • 概述
        • 收集连接信息
        • Java
        • 收发普通消息
        • 收发顺序消息
        • 收发事务消息
        • 收发定时/延时消息
        • 消费限流
        • Go
        • 收发普通消息
        • 收发顺序消息
        • 收发事务消息
        • 收发定时/延时消息
        • Python
        • 收发普通消息
        • 收发顺序消息
        • 收发事务消息
        • 收发定时/延时消息
        • 性能白皮书
        • RocketMQ性能白皮书
        • API参考
        • API使用说明
        • SDK参考
        • SDK概述
        • RocketMQ C++ SDK
        • RocketMQ .NET SDK
        • RocketMQ PHP SDK
        • 常见问题
        • 计费类
        • 购买类
        • 操作类
        • 管理类
        • 产品类
        • 相关协议
        • 服务等级协议
        • 服务条款
          无相关产品

          本页目录

          帮助中心分布式消息服务RocketMQ快速入门生产消费验证
          生产消费验证
          更新时间 2024-12-04 18:26:30
          • 新浪微博
          • 微信
            扫码分享
          • 复制链接
          最近更新时间: 2024-12-04 18:26:30
          分享文章
          • 新浪微博
          • 微信
            扫码分享
          • 复制链接
          本文介绍分布式消息服务RocketMQ入门指引的生产消费验证内容。

          背景信息

          RocketMQ的生产消费验证是指在使用RocketMQ进行消息生产和消费时的验证过程。具体而言,验证包括以下几个方面:

          • 生产者验证:RocketMQ提供了丰富的生产者API,开发人员可以使用这些API将消息发送到RocketMQ的消息队列中。在验证阶段,可以通过发送消息并检查返回结果来确保消息成功发送到Broker节点。此外,生产者还应该验证消息的顺序性、事务性以及可靠性等方面。
          • 消费者验证:RocketMQ的消费者可以订阅特定的消息主题,从而消费这些主题下的消息。在验证阶段,消费者应该能够正确地从Broker节点拉取消息并进行消费处理。消费者还可以验证消息的顺序性、重试机制以及消息过滤等功能。

          操作步骤

          1、 天翼云官网点击控制中心,选择产品分布式消息服务RocketMQ。

          2、 登录分布式消息服务RocketMQ控制台,点击右上角地域选择对应资源池。

          进入实例列表,点击【管理】按钮进入管理菜单。

          3、 进入实例列表,点击【管理】按钮进入管理菜单。

          4、 进入主题管理菜单,点击【拨测】按钮,进行生产消费的拨测验证,验证开通的消息实例和主题。

          61.png

          1)生产测试拨测:

          62.png

          • 选择消息类型,默认普通消息。
          • 填写需要产生的测试消息数量,以及每条消息的大小,默认每条消息1KB,建议不超过4MB(4096KB)。
          • 选择已建的消息主题,若无选项,请新增主题,详见上文创建主题和订阅组。
          • 点击【测试】按钮,按照已填写规格及数量产生测试消息数据,展示消息数据的信息,包括消息ID(messageID)、发送状态、主题名(topic名)、Broker名、队列ID。

          拨测功能涉及消息发送状态码,以下是RocketMQ消息发送状态码及其说明:

          ✧ SEND_OK(发送成功):表示消息成功发送到了消息服务器。

          ✧ FLUSH_DISK_TIMEOUT(刷新磁盘超时):表示消息已经成功发送到消息服务器,但是刷新到磁盘上超时。这可能会导致消息服务器在宕机后,尚未持久化到磁盘上的数据丢失。

          ✧ FLUSH_SLAVE_TIMEOUT(刷新从服务器超时):表示消息已经成功发送到消息服务器,但是刷新到从服务器上超时。这可能会导致主从同步不一致。

          ✧ SLAVE_NOT_AVAILABLE(从服务器不可用):表示消息已经成功发送到消息服务器,但是从服务器不可用。这可能是由于网络故障或从服务器宕机引起的。

          ✧ UNKNOWN_ERROR(未知错误):表示发送消息时遇到了未知的错误。一般情况下建议重试发送消息。

          ✧ MESSAGE_SIZE_EXCEEDED(消息大小超过限制):表示消息的大小超过了消息服务器的限制。需要检查消息的大小是否合适。

          ✧ PRODUCE_THROTTLE(消息生产被限流):表示消息生产者的频率超出了消息服务器的限制。这可能是由于消息发送频率过高引起的。

          ✧ SERVICE_NOT_AVAILABLE(服务不可用):表示消息服务器不可用。这可能是由于网络故障或者消息服务器宕机引起的。

          请注意,以上状态码仅适用于RocketMQ消息发送阶段,并且并不代表消息是否成功被消费者接收。同时,这些状态码也可能因版本变化而有所不同,建议查阅官方文档获取最新信息。

          2)消费测试拨测:

          63.png

          • 选择消息顺序,下拉选择无序/有序,默认选项为无序。

          RocketMQ是一种开源的分布式消息中间件,它支持有序消息和无序消息。

          ✧ 有序消息是指消息的消费顺序与发送顺序完全一致。在某些业务场景下,消息的处理需要保证顺序性,例如订单的处理或者任务的执行。RocketMQ提供了有序消息的支持,通过指定消息的顺序属性或使用消息队列的分区机制,可以确保消息按照指定的顺序进行消费。

          ✧ 无序消息则是指消息的消费顺序与发送顺序无关。无序消息的特点是高吞吐量和低延迟,适用于一些不要求严格顺序的业务场景,如日志收集等。

          在RocketMQ中,有序消息和无序消息的实现方式略有不同。有序消息需要借助MessageQueue的分区机制和消费者端的顺序消息消费来实现。而无序消息则是通过消息的发送和接收的并发处理来实现的。

          总的来说,RocketMQ既支持有序消息也支持无序消息,根据业务需求选择合适的消息类型来满足业务的要求。

          • 选择消费方式,目前仅提供pull方式。值得注意的是,RocketMQ还提供了推送(push)方式的消费模式,其中消息队列服务器会主动将消息推送给消费者。但在当前仅限于pull方式的消费模式。
          • 填写消费数量。
          • 下拉选择选择已建的消息主题和订阅组,若无选项,请新增主题和订阅组,详见上文创建主题和订阅组。
          • 点击【测试】按钮,按照已填写规格及数量产生消费数据,展示消息数据的信息,包括消息ID(messageID)、主题名称(topicName)、生成时间、存储时间、队列ID、消费状态。

          拨测功能涉及消息消费状态码,RocketMQ消费状态码是指在消息消费过程中,对消费结果进行标识的状态码。以下是常见的RocketMQ消费状态码:

          ✧ CONSUME_SUCCESS(消费成功):表示消息成功被消费。

          ✧ RECONSUME_LATER(稍后重试):表示消费失败,需要稍后再次进行消费。

          ✧ CONSUME_FAILURE(消费失败):表示消息消费出现异常或失败。

          ✧ SLAVE_NOT_AVAILABLE(从节点不可用):表示消费者无法访问从节点来消费消息。

          ✧ NO_MATCHED_MESSAGE(无匹配的消息):表示当前没有匹配的消息需要消费。

          ✧ OFFSET_ILLEGAL(偏移量非法):表示消费的偏移量参数不合法。

          ✧ BROKER_TIMEOUT(Broker超时):表示由于Broker超时导致消费失败。

          5、 用户应用按照规范接入RocketMQ,发送、消费消息。

          1)生产者示例API

          以下适用于南京3、上海7、重庆2、乌鲁木齐27、保定、石家庄20、内蒙6、晋中、北京5节点。

          --ctgmq引擎版本,SDK下载方式详见环境准备-其他工具章节。

          package com.ctg.guide;
          import com.ctg.mq.api.CTGMQFactory;
          import com.ctg.mq.api.IMQProducer;
          import com.ctg.mq.api.PropertyKeyConst;
          import com.ctg.mq.api.bean.MQMessage;
          import com.ctg.mq.api.bean.MQSendResult;
          import com.ctg.mq.api.exception.MQException;
          import com.ctg.mq.api.exception.MQProducerException;
          import java.util.Properties;
          /**
           * Producer,发送消息
           */
          public class Producer {
              public static void main(String[] args) throws InterruptedException, MQException {
          Properties properties = new Properties();
          properties.setProperty(PropertyKeyConst.ProducerGroupName, "producer_group");
          properties.setProperty(PropertyKeyConst.NamesrvAddr, "10.50.208.1:9876;10.50.208.2:9876;10.50.208.3:9876");
          properties.setProperty(PropertyKeyConst.NamesrvAuthID, "app4test");
          properties.setProperty(PropertyKeyConst.NamesrvAuthPwd, "******"); properties.setProperty(PropertyKeyConst.ClusterName, "defaultMQBrokerCluster");
          properties.setProperty(PropertyKeyConst.TenantID, "defaultMQTenantID");
                  IMQProducer producer = CTGMQFactory.createProducer(properties);//建议应用启动时创建
                  int connectResult = producer.connect();
                  if(connectResult != 0){
                      return;
                  }
                  for (int i = 0; i < 10; i++) {
                      try {
                          MQMessage message = new MQMessage(
                                  "test_topic_1",// topic
                                  "ORDER_KEY_"+i,// key
                                  "ORDER_TAG",//tag
                                  ("HELLO ORDER BODY" + i).getBytes()// body
                          );
                          MQSendResult sendResult = producer.send(message);
                          //System.out.println(sendResult);
                          //TODO
                      } catch (MQProducerException e) {
                          //TODO
                      }
                  }
                  producer.close();//建议应用关闭时关闭
              }
          
          

          以下适用于华东1、华北2、西南1、华南2、上海36、青岛20、长沙42、南昌5、武汉41、杭州7、西南2-贵州、太原4、郑州5、西安7、呼和浩特3节点。

          --rocketmq引擎版本,SDK下载方式详见环境准备-其他工具章节。

          importorg.apache.rocketmq.client.exception.MQClientException;
          importorg.apache.rocketmq.client.producer.DefaultMQProducer;
          importorg.apache.rocketmq.client.producer.SendResult;
          importorg.apache.rocketmq.common.message.Message;
          importorg.apache.rocketmq.remoting.common.RemotingHelper;
          importorg.apache.rocketmq.acl.common.AclClientRPCHook;
          
          
          publicclassProducer{
          publicstaticvoidmain(String[] args)throwsMQClientException,InterruptedException{
          
          AclClientRPCHook rpcHook =newAclClientRPCHook(
          newSessionCredentials(ACCESS_KEY, SECRET_KEY));
          DefaultMQProducer producer =newDefaultMQProducer("ProducerGroupName", rpcHook);
          // 填入元数据地址
                  producer.setNamesrvAddr("192.168.0.1:9876");
          //producer.setUseTLS(true);    //如果需要开启SSL,请增加此行代码
                  producer.start();
          
          for(int i =0; i <128; i++)
          try{
          {
          Message msg =newMessage("TopicTest",
          "TagA",
          "OrderID188",
          "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
          SendResult sendResult = producer.send(msg);
          System.out.printf("%s%n", sendResult);
          }
          
          }catch(Exception e){
                          e.printStackTrace();
          }
          
                  producer.shutdown();
          }
          }
          
          

          示例参数说明:

          Namesrv地址

          611.png

          Namesrv地址可从控制台查看,多个地址按分号分隔:

          612.png

          应用用户和密码

          613.png

          这个应用用户和密码就是控制台创建的应用用户和密码。

          租户id和集群名

          614.png

          集群名和租户id可以从应用用户管理查询:

          615.png

          生产组

          616.png

          生产组名不需要提前创建,只需创建生产者时候配置,服务端会自动创建。建议按业务规划好生产组名,严禁按随机方式生成生产组名。

          6.消费者示例API

          以下适用于南京3、上海7、重庆2、乌鲁木齐27、保定、石家庄20、内蒙6、晋中、北京5节点。

          --ctgmq引擎版本,SDK下载方式详见环境准备-其他工具章节。

          package com.ctg.guide;
          
          import com.ctg.mq.api.enums.MQConsumeFromWhere;
          import com.ctg.mq.api.CTGMQFactory;
          import com.ctg.mq.api.IMQPushConsumer;
          import com.ctg.mq.api.PropertyKeyConst;
          import com.ctg.mq.api.bean.MQResult;
          import com.ctg.mq.api.listener.ConsumerTopicListener;
          import com.ctg.mq.api.listener.ConsumerTopicStatus;
          
          import java.util.List;
          import java.util.Properties;
          
          public class PushConsumer {
          
              public static void main(String[] args) throws Exception {
                  final Properties properties = new Properties();
                  properties.setProperty(PropertyKeyConst.ConsumerGroupName, "test_consumer_1");
                  properties.setProperty(PropertyKeyConst.NamesrvAddr, "10.50.208.1:9876;10.50.208.2:9876;10.50.208.3:9876");
                  properties.setProperty(PropertyKeyConst.NamesrvAuthID, "app4test");
                  properties.setProperty(PropertyKeyConst.NamesrvAuthPwd, "******"); properties.setProperty(PropertyKeyConst.ClusterName, "defaultMQBrokerCluster");
          properties.setProperty(PropertyKeyConst.TenantID, "defaultMQTenantID");
          
           
                  IMQPushConsumer consumer = CTGMQFactory.createPushConsumer(properties);
                  int connectResult = consumer.connect();
                  if (connectResult != 0) {
                      return;
                  }
                  consumer.listenTopic("test_topic_1", null, new ConsumerTopicListener() {
                      @Override
                      public ConsumerTopicStatus onMessage(List<MQResult> mqResultList) {
                          //mqResultList 默认为1,可通过批量消费数量设置
                          for(MQResult result : mqResultList) {
                              //TODO
                              System.out.println(result);
                          }
                          return ConsumerTopicStatus.CONSUME_SUCCESS;//对消息批量确认(成功)
                          //return ConsumerTopicStatus.RECONSUME_LATER;//对消息批量确认(失败)
                      }
                  });
              }
          }
          
          
          

          以下适用于华东1、华北2、西南1、华南2、上海36、青岛20、长沙42、南昌5、武汉41、杭州7、西南2-贵州、太原4、郑州5、西安7、呼和浩特3节点。

          --rocketmq引擎版本,SDK下载方式详见环境准备-其他工具章节。

          importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
          importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
          importorg.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
          importorg.apache.rocketmq.client.exception.MQClientException;
          
          publicclassPushConsumer{
          
          publicstaticvoidmain(String[] args)throwsException{
          AclClientRPCHook rpcHook =newAclClientRPCHook(
          newSessionCredentials(ACCESS_KEY, SECRET_KEY));
          DefaultMQPushConsumer consumer =newDefaultMQPushConsumer(rpcHook);
              consumer.setConsumerGroup("ConsumerGroupName");
          // 填入元数据地址
                  consumer.setNamesrvAddr("192.168.0.1:9876");
          //consumer.setUseTLS(true);    //如果需要开启SSL,请增加此行代码
                  consumer.subscribe("TopicTest","*");
                  consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{
          System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(), msgs);
          returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          });
                  consumer.start();
          System.out.printf("Consumer Started.%n");
          }
          }
          

          示例参数说明:

          Namesrv地址

          1.png

          Namesrv地址可从控制台查看,多个地址按分号分隔。

          2.png

          应用用户和密码

          3.png

          这个应用用户和密码就是控制台创建的应用用户和密码。

          租户id和集群名

          4.png

          集群名和租户id可以从应用用户管理查询。

          5.png

          订阅组

          6.png

          订阅组名需要在控制台提前创建好。

          7.png

          文档反馈

          建议您登录后反馈,可在建议与反馈里查看问题处理进度

          鼠标选中文档,精准反馈问题

          选中存在疑惑的内容,即可快速反馈问题,我们会跟进处理

          知道了

          上一篇 :  创建应用用户和密码
          下一篇 :  用户指南
          搜索 关闭
          ©2025 天翼云科技有限公司版权所有 增值电信业务经营许可证A2.B1.B2-20090001
          公司地址:北京市东城区青龙胡同甲1号、3号2幢2层205-32室
          备案 京公网安备11010802043424号 京ICP备 2021034386号
          ©2025天翼云科技有限公司版权所有
          京ICP备 2021034386号
          备案 京公网安备11010802043424号
          增值电信业务经营许可证A2.B1.B2-20090001
          用户协议 隐私政策 法律声明