searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

GO语言访问RocketMQ

2023-05-19 07:20:21
16
0

0. 背景

一般来说,ROCKETMQ 主流使用的是JAVA和C++客户端,实际上ROCEKTMQ也有GO语言的客户端。

 

1.准备工作

Go:1.13 或以上

go.mod

require (
github.com/apache/rocketmq-client-go/v2 v2.1.0-rc3
)

2.收发普通消息

2.1 同步发送消息

同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。

发送消息的示例代码

package main

import (
"context"
"fmt"
"os"
"strconv"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)

// Package main implements a simple producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
producer.WithGroupName("test_producer_group"),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "test1234",
SecretKey: "Sv12F21341@",
}),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
fmt.Printf("start producer ")
topic := "test11"

for i := 0; i < 10; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
}
res, err := p.SendSync(context.Background(), msg)

if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
} else {
fmt.Printf("shutdown producer")
}
}

2.2 异步发送消息

消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

示例代码:

package main

import (
"context"
"fmt"
"os"
"sync"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)

// Package main implements a simple producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
producer.WithGroupName("test_producer_group"),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "test1234",
SecretKey: "Sv12F21341@",
}),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
fmt.Printf("start producer ")
topic := "test11"

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
err := p.SendAsync(context.Background(),
func(ctx context.Context, result *primitive.SendResult, e error) {
if e != nil {
fmt.Printf("receive message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", result.String())
}
wg.Done()
}, primitive.NewMessage(topic, []byte("Hello RocketMQ Go Client!")))

if err != nil {
fmt.Printf("send message error: %s\n", err)
}
}
wg.Wait()
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
} else {
fmt.Printf("shutdown producer")
}
}

2.3 单向发送消息

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

示例代码:

package main

import (
"context"
"fmt"
"os"
"strconv"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)

// Package main implements a simple producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
producer.WithGroupName("test_producer_group"),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "test1234",
SecretKey: "Sv12F21341@",
}),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
fmt.Printf("start producer ")
topic := "test11"

for i := 0; i < 10; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
}
err := p.SendOneWay(context.Background(), msg)

if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success\n")
}
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
} else {
fmt.Printf("shutdown producer")
}
}

2.4 订阅消息

只推荐使用PUSH的方式:

package main

import (
"context"
"fmt"
"os"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
sig := make(chan os.Signal)
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("sub1"),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithCredentials(primitive.Credentials{
AccessKey: "test1234",
SecretKey: "Sv12F21341@",
}),
)
err := c.Subscribe("test11", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("subscribe callback: %v \n", msgs[i])
}

return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
fmt.Println("start Consumer")
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
<-sig
err = c.Shutdown()
if err != nil {
fmt.Printf("shutdown Consumer error: %s", err.Error())
} else {
fmt.Println("shutdown Consumer")
}
}

3.收发顺序消息

顺序消息(FIFO消息)是RocketMQ提供的一种严格按照顺序来发布和消费的消息类型。

顺序消息分为两类:

  • 全局顺序:对于指定的一个Topic,所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和消费。为了确保这个效果,建议topic只建在一个broker上面,并且只有一个queue。
  • 分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念

3.1 发送顺序消息

示例代码:

package main

import (
"context"
"fmt"
"os"
"strconv"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)

// manualQueueSelector use the queue manually set in the provided Message's QueueID field as the queue to send.
type manualQueueSelector struct{}

func NewManualQueueSelector() producer.QueueSelector {
return new(manualQueueSelector)
}

func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
return queues[0]
}

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
producer.WithGroupName("test_producer_group"),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "test1234",
SecretKey: "Sv12F21341@",
}),
producer.WithQueueSelector(NewManualQueueSelector()),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
fmt.Printf("start producer ")
topic := "test11"

for i := 0; i < 10; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
}
res, err := p.SendSync(context.Background(), msg)

if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
} else {
fmt.Printf("shutdown producer")
}
}

3.2 订阅消息

示例代码;

package main

import (
"context"
"fmt"
"os"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
sig := make(chan os.Signal)
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("sub1"),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithCredentials(primitive.Credentials{
AccessKey: "test1234",
SecretKey: "Sv12F21341@",
}),
consumer.WithConsumerOrder(true),
)
err := c.Subscribe("test11", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
orderlyCtx, _ := primitive.GetOrderlyCtx(ctx)
fmt.Printf("orderly context: %v\n", orderlyCtx)
fmt.Printf("subscribe orderly callback: %v \n", msgs)
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
fmt.Println("start Consumer")
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
<-sig
err = c.Shutdown()
if err != nil {
fmt.Printf("shutdown Consumer error: %s", err.Error())
} else {
fmt.Println("shutdown Consumer")
}
}

4. 收发事务消息

示例代码:

package main

import (
"context"
"fmt"
"os"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)

type DemoListener struct {
localTrans *sync.Map
transactionIndex int32
}

func NewDemoListener() *DemoListener {
return &DemoListener{
localTrans: new(sync.Map),
}
}

func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
nextIndex := atomic.AddInt32(&dl.transactionIndex, 1)
fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId)
status := nextIndex % 3
dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1))

fmt.Printf("dl")
return primitive.UnknowState
}

func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId)
v, existed := dl.localTrans.Load(msg.TransactionId)
if !existed {
fmt.Printf("unknow msg: %v, return Commit", msg)
return primitive.CommitMessageState
}
state := v.(primitive.LocalTransactionState)
switch state {
case 1:
fmt.Printf("checkLocalTransaction COMMIT_MESSAGE: %v\n", msg)
return primitive.CommitMessageState
case 2:
fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg)
return primitive.RollbackMessageState
case 3:
fmt.Printf("checkLocalTransaction unknow: %v\n", msg)
return primitive.UnknowState
default:
fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
return primitive.CommitMessageState
}
}

func main() {
p, _ := rocketmq.NewTransactionProducer(
NewDemoListener(),
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
producer.WithGroupName("test_producer_group"),
producer.WithRetry(1),
producer.WithCredentials(primitive.Credentials{
AccessKey: "test1234",
SecretKey: "Sv12F21341@",
}),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
fmt.Printf("start producer ")
topic := "test11"

for i := 0; i < 10; i++ {
res, err := p.SendMessageInTransaction(context.Background(),
primitive.NewMessage(topic, []byte("Hello RocketMQ again "+strconv.Itoa(i))))

if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
time.Sleep(5 * time.Minute)
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
} else {
fmt.Printf("shutdown producer")
}
}

5. 延时消息

延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。

在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发。使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

Apache RocketMQ 一共支持18个等级的延迟投递,具体时间如下:

投递等级(delay level)

延迟时间

1

1s

2

5s

3

10s

4

30s

5

1min

6

2min

7

3min

8

4min

9

5min

10

6min

11

7min

12

8min

13

9min

14

10min

15

20min

16

30min

17

1h

18

2h

实例代码:

package main

import (
"context"
"fmt"
"os"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.249.184.132:9876", "10.249.184.134:9876", "10.249.184.133:9876"})),
producer.WithGroupName("test_producer_group"),
producer.WithRetry(2),
producer.WithCredentials(primitive.Credentials{
AccessKey: "test1234",
SecretKey: "Sv12F21341@",
}),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
fmt.Printf("start producer ")
topic := "test11"

for i := 0; i < 10; i++ {
msg := primitive.NewMessage(topic, []byte("Hello RocketMQ Go Client!"))
msg.WithDelayTimeLevel(3)
res, err := p.SendSync(context.Background(), msg)

if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
} else {
fmt.Printf("shutdown producer")
}
}

6. 总结

通过GO语言可以基本使用ROCEKTMQ的SDK客户端功能。但管理类的接口并不完善, 实际使用的时候,可以结合其他管理的方法去解决。

0条评论
0 / 1000
叶****伟
6文章数
0粉丝数
叶****伟
6 文章 | 0 粉丝