分布式消息服务RocketMQ事件源
更新时间 2025-06-24 17:44:38
最近更新时间: 2025-06-24 17:44:38
本文介绍如何在事件总线EventBridge管理控制台添加分布式消息服务RocketMQ作为事件流中的事件提供方。
前提条件
事件总线EventBridge
分布式消息服务RocketMQ
开通分布式消息服务RocketMQ并开通实例。
操作步骤
登录事件总线管理控制台。
在左侧导航栏,单击事件流。
在事件流页面,单击创建事件流。
在事件源(Source)配置面板,事件提供方选择分布式消息服务RocketMQ,选择或填入主题等配置然后单击确认,如图1所示。
图1 创建事件流时选择分布式消息服务RocketMQ作为事件提供方
参数说明
参数 | 说明 | 示例 |
---|---|---|
实例名称 | 前提条件中已创建的分布式消息服务RocketMQ版实例。 | xxx |
Topic | 当前实例中的Topic。 | topic1 |
Group | 消费组名。
| group1 |
消费位点 | 开始消费的位置。
| 最新位点 |
Tag | 用于过滤消息的Tag值,非必填。 | tag1 |
事件示例
{
"id": "413305a4-2076-47fc-9443-2639472752be",
"source": "ctyun:rocketmq",
"specversion": "1.0",
"subject": "ctyun:rocketmq:0b3a633836ef43b890df8834233ab4da:topic:source0205",
"time": "2025-02-05T08:01:47.633357732Z",
"datacontenttype": "application/json",
"type": "rocketmq:Topic:SendMessage",
"ctyunregion": "b342b77efxxxxxxxxxxxxxac110002",
"ctyunaccountid": "0b3a63383xxxxxxxxxxxxxxxxx4233ab4da",
"data": {
"systemProperties": {
"MIN_OFFSET": "0",
"MAX_OFFSET": "1",
"KEYS": "webtest",
"CONSUME_START_TIME": "1738742507633",
"CLUSTER": "c967202954xxxxxxxxxxx268bf6222285",
"UNIQ_KEY": "21001CDC0BExxxxxxxxx806D2460082",
"TAGS": "1738742300229_0"
},
"userProperties": {},
"topic": "source0205",
"msgId": "21001CDC0BE805EF04B51806D2460082",
"body": "WebTestTools_173874230022xxxxxxxxxxxxxxxxxx5gC7OiRAiPNG1NwCig6Fwd9eS"
}
}
data字段包含的参数解释如下表所示:
参数 | 类型 | 示例值 | 描述 |
---|---|---|---|
topic | String | source1 | Topic名称。 |
msgId | String | 210018FC0EE205EF04B51CF068350066 | 消息ID。 |
properties | Object | 详见properties属性参数解释 | 消息properties。 |
body | Object | WebsFjDDxPd503bG60bNdaT0qMrsjt5evXd9df | 消息体,默认以JSON格式编码。 |
data字段中systemProperties属性的参数解释如下图所示:
参数 | 类型 | 示例值 | 描述 |
---|---|---|---|
CLUSTER | String | 16e0eef12f83485dbba23a0750ec5078 | RocketMQ实例集群ID。 |
CONSUME_START_TIME | String | 1733467917369 | 开始消费时间戳。 |
KEYS | String | key | 消息Key值。 |
TAGS | String | 1733467917351_0 | 消息Tag值。 |
MIN_OFFSET | String | 0 | 最小消费位点。 |
MAX_OFFSET | String | 1 | 最大消费位点。 |