RocketMQ .NET SDK 说明 分布式消息服务RocketMQ兼容了社区版 HTTP SDK,您可以使用社区版 HTTP SDK接入分布式消息服务RocketMQ。 前提条件 1. 下载社区 C SDK到本地并解压。 2. 使用Visual Studio打开sln文件导入工程。 发送普通消息 using System; using Aliyun.MQ; using Aliyun.MQ.Model; namespace MQ.Sample { public class Producer { // 填写分布式消息服务RocketMQ控制台HTTP接入点 private const string endpoint "${HTTPENDPOINT}"; // 填写AccessKey,在管理控制台创建 private const string accessKeyId "${ACCESSKEY}"; // 填写SecretKey 在管理控制台创建 private const string secretAccessKey "${SECRETKEY}"; // 消息所属的Topic,在消息队列RocketMQ版控制台创建。 private const string topicName "${TOPIC}"; // Topic所属实例ID,默认实例为空 private const string instanceId "${INSTANCEID}"; private static MQClient client new Aliyun.MQ.MQClient(accessKeyId, secretAccessKey, endpoint); static MQProducer producer client.GetProducer(instanceId, topicName); static void Main(string[] args) { try { // 循环发送4条消息。 for (int i 0; i messages null; try { messages consumer.ConsumeMessage( 3, // 一次最多消费3条(最多可设置为16条) 3 // 长轮询时间3秒(最多可设置为30秒) ); } catch (Exception exp1) { if (exp1 is MessageNotExistException) { Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId); continue; } Console.WriteLine(exp1); Thread.Sleep(2000); } if (messages null) { continue; } List handlers new List<>(); Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:"); // 处理业务逻辑 foreach (Message message in messages) { Console.WriteLine(message); handlers.Add(message.ReceiptHandle); } // Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费 // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样 try { consumer.AckMessage(handlers); Console.WriteLine("Ack message success:"); foreach (string handle in handlers) { Console.Write("t" + handle); } Console.WriteLine(); } catch (Exception exp2) { // 某些消息的句柄可能超时了会导致确认不成功 if (exp2 is AckMessageException) { AckMessageException ackExp (AckMessageException)exp2; Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId); foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems) { Console.WriteLine("tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage); } } } } catch (Exception ex) { Console.WriteLine(ex); Thread.Sleep(2000); } } } }}