消息收发 MQTT发送Kafka接收 终端设备使用MQTT SDK接入终端连接地址进行消息发布,云端应用服务使用Kafka sdk接入 服务端连接地址按父主题进行数据消费。 MQTT发送顺序消息kafka接收顺序消息 创建父主题,类型分区顺序,父主题以orderMsg2mq开头。 终端设备使用MQTT SDK接入终端连接地址进行消息发布,云端应用服务使用kafka sdk接入 服务端连接地址按父主题进行分区顺序数据消费 Kafka发送MQTT接收 云端应用服务使用kafka sdk接入服务端连接地址往系统主题mq2mqtt下发指令,终端设备使用MQTT SDK接入终端连接地址接收; 示例: import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; import java.util.Properties; public class PubMsgTest { public static void main(String[] args) { Properties props new Properties(); // 填入您在mqtt控制台查看到的kafka接入点信息。 props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer new KafkaProducer<>(props); // 这里需要指定系统内置的流转到mqtt服务端的特殊kafka主题,一般命名为:mq2mqtt。 String topic "mq2mqtt"; // kafka消息需要在header中指定需要发往mqtt订阅的主题以及一些会话属性、qos等级等信息。 RecordHeaders headers new RecordHeaders(); headers.add(new RecordHeader("qosLevel", "2".getBytes())); headers.add(new RecordHeader("cleanSession", "true".getBytes())); // 这里需要指定您的mqtt客户端订阅的主题,支持topic filter。 headers.add(new RecordHeader("mqttTopic", "topic1/a/b/c".getBytes())); byte[] payload new byte[1026 1024]; for (int i 0; i (topic, null, null, null, message, headers)); // System.out.println("Sent: " + message); System.out.println("sent successfully"); } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } }