Kafka触发器 Kafka触发器 Kafka触发器可以订阅天翼云提供的分布式消息队列Kafka实例,并根据消息触发关联的工作流,借此能力,使得工作流可以消费指定topic的消息,执行特定工作流处理逻辑。 注意事项 Kafka触发器订阅的Kafka实例必须和工作流在相同地域。 前提条件 已创建工作流。 已开通分布式消息Kafka实例(KAFKA引擎版),详情请参考创建分布式消息服务Kafka实例。 已创建Topic,创建GroupID(可选) 触发消息格式 Kafka触发器有两种消息格式:RawData和CloudEvent格式,可在触发器配置里选择。 CloudEvent格式: plaintext [ { "id": "eca534636baf4d568f86cbdb748208ed", "source": "ctyun.faas.trigger.kafka", "specversion": "1.0", "type": "kafka:topic:sendmessage", "datacontenttype": "application/json", "subject": "kafkatriggermqbjvsezbpdialtest:testforfaas", "time": "20250522T02:04:16Z", "data": { "headers": {}, "timestamp": 1747879456, "topic": "testforfaas", "partition": 0, "offset": 15280, "key": "", "value": "msg[9]: 154b2a0e2c3d4b03ae9ec225b5370c3b, ts20250522 02:04:16" } } ] 参数 类型 示例值 描述 id string eca534636baf4d568f86cbdb748208ed 事件ID。标识事件的唯一值。 source string ctyun.faas.trigger.kafka 事件源。Kafka触发器固定为ctyun.faas.trigger.kafka。 specversion string 1.0 CloudEvents协议版本。 type string kafka:topic:sendmessage 事件类型。 datacontenttype string application/json 参数data的内容形式。 subject string kafkatriggermqbjvsezbpdialtest:testforfaas 事件主体。格式为[SourceName]:[消息topic]。 time string 20250522T02:04:16Z 消息被触发的时间。 data object Kafka触发器独有消息格式,详细参见下文RawData描述。 RawData格式是CloudEvent格式的子集,只包含原始kafka消息的信息,消息结构相当于CloudEvent的data字段 plaintext [ { "offset" : 15280, "partition" : 0, "headers" : {}, "topic" : "testforfaas", "key" : "testkey", "timestamp" : 1747879456, "value" : "msg[9]: 154b2a0e2c3d4b03ae9ec225b5370c3b, ts20250522 02:04:16" } ] 参数 类型 示例值 描述 offset int 15280 消息偏移量。 partition int 0 分区信息。 headers map 消息携带的header。 topic string testforfaas topic的名称。 key string testkey 消息的key。 timestamp int 1747879456 Unix时间戳(秒)。 value string hello,kafka 消息的内容。