RocketMQ触发器
更新时间 2025-07-28 09:07:35
最近更新时间: 2025-07-28 09:07:35
RocketMQ触发器
RocketMQ触发器可以订阅分布式消息服务RocketMQ并根据消息触发关联的工作流,借此能力,使得工作流可以消费指定topic的消息,执行自定义处理逻辑。
注意事项
RocketMQ触发器订阅的RocketMQ实例必须和工作流在相同地域。
前提条件
已创建工作流。
开通分布式消息服务RocketMQ实例(RocketMQ引擎类型),详情请参考开通RocketMQ实例。
创建Topic和GroupID。
创建用户,且默认Topic权限设置为:PUB|SUB,默认消费组权限为SUB。详情请参考创建用户。
触发消息格式
有两种消息格式:RawData和CloudEvent格式,可在触发器配置里选择。
CloudEvent格式:
[
{
"id": "21000777109E05EF04B574B8A1DF0001",
"source": "ctyun.faas.trigger.rocketmq",
"specversion": "1.0",
"type": "rocketmq:topic:send-message",
"datacontenttype": "application/json",
"subject": ":mq-func-hckzeddbxj-rocket-test:test-for-faas",
"time": "57361-07-03T16:18:39Z",
"data": {
"topic": "test-for-faas",
"properties": {
"CLUSTER": "1dafcb4049ba42df96d80b7dd2f99c5e",
"CONSUME_START_TIME": "1747987057130",
"KEYS": "webtest",
"MAX_OFFSET": "2",
"MIN_OFFSET": "0",
"TAGS": "1747987057097_0",
"UNIQ_KEY": "21000777109E05EF04B574B8A1DF0001"
},
"data": "WebTestTools_174798gjkS"
}
}
]
参数 | 类型 | 示例值 | 描述 |
---|---|---|---|
id | string | 21000777109E05EF04B574B8A1DF0001 | 事件ID。标识事件的唯一值。提取自RocketMQ消息。 |
source | string | ctyun.faas.trigger.rocketmq | 事件源。RocketMQ触发器固定为ctyun.faas.trigger.rocketmq。 |
specversion | string | 1.0 | CloudEvents协议版本。 |
type | string | rocketmq:topic:send-message | 事件类型。 |
datacontenttype | string | application/json | 参数data的内容形式。 |
subject | string | mq-func-hckzeddbxj-rocket-test:test-for-faas | 事件主体。 |
time | string | 2025-05-22T02:04:16Z | 消息被触发的时间。 |
data | object | - | RocketMQ触发器独有消息格式,详细参见下文RawData描述。 |
RawData格式 是CloudEvent格式的子集,只包含原始rabbitmq消息的信息,消息结构相当于CloudEvent的data字段,具体如下:
[
{
"topic": "test-for-faas",
"properties": {
"CLUSTER": "1dafcb4049ba42df96d80b7dd2f99c5e",
"CONSUME_START_TIME": "1747987204637",
"KEYS": "webtest",
"MAX_OFFSET": "3",
"MIN_OFFSET": "0",
"TAGS": "1747987204605_0",
"UNIQ_KEY": "2100077510A605EF04B574BAE2080001"
},
"data": "WebTestTools_17v6fg0J"
}
]
参数 | 类型 | 示例 | 描述 |
---|---|---|---|
topic | string | test-for-faas | Topic名称。 |
properties | map | - | 消息自定义属性。 |
properties.CLUSTER | string | 1dafcb4049ba42df96d80b7dd2f99c5e | RocketMQ实例ID。 |
properties.CONSUME_START_TIME | string | 1747987204637 | Unix时间戳,毫秒。 |
properties.KEYS | string | webtest | 消息的key。 |
properties.MAX_OFFSET | string | 3 | 消息队列中的最大偏移量。 |
properties.MIN_OFFSET | string | 0 | 消息队列中的最小偏移量。 |
properties.TAGS | string | 1747987204605_0 | 消息标签。 |
properties.UNIQ_KEY | string | 2100077510A605EF04B574BAE2080001 | 消息唯一键。 |
data | string | WebTestTools_17v6fg0J | 消息体内容。 |
操作步骤
登录工作流控制台,点击目标工作流,进入工作流详情详情。
在配置选项卡中,选择左边的 工作流调度 选项卡。
点击 创建工作流调度,在弹出的右抽屉中选择 RocketMQ触发器,配置参数解释如下表。
配置项 | 操作说明 | 示例 |
---|---|---|
触发器类型 | 选择RocketMQ触发器。 | RocketMQ触发器 |
名称 | 填写自定义的触发器名称。 | rocketmq-trigger |
版本或别名 | 默认值为LATEST,支持选择任意函数版本或函数别名。 | LATEST |
RocketMQ 实例 | 选择已创建的RocketMQ实例。 | - |
Topic | 选择已创建的RocketMQ实例的Topic。 | - |
Group ID | 选择已创建的RocketMQ实例的Group ID。 | - |
消费位点 | 选择消息的消费位点,即触发器从RocketMQ实例开始拉取消息的位置。取值说明如下。 最新位点:从最新位点开始消费。 最早位点:从最早位点开始消费。 指定时间戳:从指定时间戳开始消费。 | 最新位点 |
调用方式 | 选择函数调用方式。 同步调用:指触发器消费topic消息后投递到函数是同步调用,会等待函数响应后继续下一个消息投递。 异步调用:指触发器消费topic消息后投递到函数是异步调用,不会等待函数响应,可以快速消费事件。 | 同步调用 |
用户ID | RocketMQ实例用户ID,需要在RocketMQ控制台创建。 | - |
密钥 | RocketMQ实例用户密钥,需要在RocketMQ控制台创建。 | - |
触发器启用状态 | 创建触发器后是否立即启用。默认选择开启,即创建触发器后立即启用触发器。 | - |
推送配置 | 批量推送条数:批量推送的最大值,积压值达到后立刻推送,取值范围为 [1, 10000]。 批量推送间隔:批量推送的最大时间间隔,达到后立刻推送,单位秒,取值[0,15]。默认0无需等待,数据直接推送。 推送格式:函数收到的事件格式,详情请查阅触发器事件消息格式。 | - |
重试策略 | 消息推送函数失败后重试的策略,共两种: 指数退避:指数退避重试,重试5次,重试周期为2,4,8,16,32(秒)。 线性退避:线性退避重试,重试5次,重试周期为1,2,3,4,5(秒)。 | - |
容错策略 | 当重试次数耗尽后仍然失败时的处理方式: 允许容错:当异常发生并超过重试策略配置时直接丢弃。 禁止容错:当异常发生并超过重试策略配置时继续阻塞执行。 | - |
死信队列 | 当容错策略为:允许容错时,可以额外开启死信队列。当开启死信队列时且异常发生并超过重试策略配置时,消息会被投递到指定的消息队列里,当前只支持投递到kafka和rocketmq | - |