Kafka触发器
更新时间 2025-07-28 09:07:35
最近更新时间: 2025-07-28 09:07:35
Kafka触发器
Kafka触发器可以订阅天翼云提供的分布式消息队列Kafka实例,并根据消息触发关联的工作流,借此能力,使得工作流可以消费指定topic的消息,执行特定工作流处理逻辑。
注意事项
Kafka触发器订阅的Kafka实例必须和工作流在相同地域。
前提条件
已创建工作流。
已开通分布式消息Kafka实例(KAFKA引擎版),详情请参考创建分布式消息服务Kafka实例。
已创建Topic,创建GroupID(可选)
触发消息格式
Kafka触发器有两种消息格式:RawData和CloudEvent格式,可在触发器配置里选择。 CloudEvent格式:
[
{
"id": "eca53463-6baf-4d56-8f86-cbdb748208ed",
"source": "ctyun.faas.trigger.kafka",
"specversion": "1.0",
"type": "kafka:topic:send-message",
"datacontenttype": "application/json",
"subject": "kafka-trigger-mqbjvsezbp-dial-test:test-for-faas",
"time": "2025-05-22T02:04:16Z",
"data": {
"headers": {},
"timestamp": 1747879456,
"topic": "test-for-faas",
"partition": 0,
"offset": 15280,
"key": "",
"value": "msg[9]: 154b2a0e-2c3d-4b03-ae9e-c225b5370c3b, ts=2025-05-22 02:04:16"
}
}
]
参数 | 类型 | 示例值 | 描述 |
---|---|---|---|
id | string | eca53463-6baf-4d56-8f86-cbdb748208ed | 事件ID。标识事件的唯一值。 |
source | string | ctyun.faas.trigger.kafka | 事件源。Kafka触发器固定为ctyun.faas.trigger.kafka。 |
specversion | string | 1.0 | CloudEvents协议版本。 |
type | string | kafka:topic:send-message | 事件类型。 |
datacontenttype | string | application/json | 参数data的内容形式。 |
subject | string | kafka-trigger-mqbjvsezbp-dial-test:test-for-faas | 事件主体。格式为[SourceName]:[消息topic]。 |
time | string | 2025-05-22T02:04:16Z | 消息被触发的时间。 |
data | object | - | Kafka触发器独有消息格式,详细参见下文RawData描述。 |
RawData格式是CloudEvent格式的子集,只包含原始kafka消息的信息,消息结构相当于CloudEvent的data字段
[
{
"offset" : 15280,
"partition" : 0,
"headers" : {},
"topic" : "test-for-faas",
"key" : "testkey",
"timestamp" : 1747879456,
"value" : "msg[9]: 154b2a0e-2c3d-4b03-ae9e-c225b5370c3b, ts=2025-05-22 02:04:16"
}
]
参数 | 类型 | 示例值 | 描述 |
---|---|---|---|
offset | int | 15280 | 消息偏移量。 |
partition | int | 0 | 分区信息。 |
headers | map | - | 消息携带的header。 |
topic | string | test-for-faas | topic的名称。 |
key | string | testkey | 消息的key。 |
timestamp | int | 1747879456 | Unix时间戳(秒)。 |
value | string | hello,kafka | 消息的内容。 |
操作步骤
登录工作流控制台,点击目标工作流,进入工作流详情详情。
在配置选项卡中,选择左边的 工作流调度 选项卡。
点击 创建工作流调度,在弹出的右抽屉中选择 Kafka触发器,配置参数解释如下表。
配置项 | 操作说明 | 示例 |
---|---|---|
触发器类型 | 选择Kafka触发器。 | Kafka触发器 |
名称 | 填写自定义的触发器名称。 | kafka-trigger |
Kafka实例 | 选择已创建的Kafka实例。 | - |
Topic | 选择已创建的Kafka实例的Topic。 | - |
Group ID | • 快速创建:推荐方案。自动创建以GROUP-FC-Trigger-{trigger_name}-{uuid}命名的Group ID。 • 使用已有:选择Kafka实例已有的GroupID,请您注意不要与已有的业务混用GroupID,否则会影响已有的消息收发。 | - |
消费任务并发数 | 消费者的并发数量,有效取值范围为[1,20],建议不超过Topic的分区数。该值同时影响投递到函数的并发数。 | - |
消费位点 | 选择消息的消费位点,即触发器从kafka消息队列开始拉取消息的位置。 • 最早位点:从最早位点开始消费。 • 最新位点:从最新位点开始消费。 | 最新位点 |
调用方式 | 选择函数调用方式。 • 同步调用:指触发器消费topic消息后投递到函数是同步调用,会等待函数响应后继续下一个消息投递。但消费任务并发数大于1时,多个消费者有可能会并发消费消息并投递,并发的情况视topic队列本身积存的消息而定。 • 异步调用:指触发器消费topic消息后投递到函数是异步调用,不会等待函数响应,可以快速消费事件。 | 同步调用 |
触发器启用状态 | 创建触发器后是否立即启用。默认选择开启,即创建触发器后立即启用触发器。 | 启用 |
推送配置 | • 批量推送条数:批量推送的最大值,积压值达到后立刻推送,取值范围为[1,10000]。 • 批量推送间隔:批量推送的最大时间间隔,达到后立刻推送,单位秒,取值[0,15]。默认0无需等待,数据直接推送。 • 推送格式:函数收到的事件格式,详情请查阅触发器事件消息格式。 | - |
重试策略 | 消息推送函数失败后重试的策略,共两种: • 指数退避:指数退避重试,重试5次,重试周期为2,4,8,16,32(秒)。 • 线性退避:线性退避重试,重试5次,重试周期为1,2,3,4,5(秒)。 | - |
容错策略 | 当重试次数耗尽后仍然失败时的处理方式: • 允许容错:当异常发生并超过重试策略配置时直接丢弃。 • 禁止容错:当异常发生并超过重试策略配置时继续阻塞执行。 | - |
死信队列 | 当容错策略为:允许容错时,可以额外开启死信队列。当开启死信队列时且异常发生并超过重试策略配置时,消息会被投递到指定的消息队列里,当前只支持投递到kafka和rocketmq | - |