编译工程
生产消费 引入依赖 go.mod plaintext module testgo require ( github.com/rabbitmq/amqp091go v1.10.0 golang.org/x/net v0.26.0 ) go 1.20
生产消息 plaintext package main import ( "flag" "fmt" amqp "github.com/rabbitmq/amqp091go" "log" ) var ( uri flag.String("uri", "amqp://USERNAME:PASSWORD@33.0.1.35:5672", "AMQP URI") exchangeName flag.String("exchange", "testexchange", "Durable AMQP exchange name") exchangeType flag.String("exchangetype", "direct", "Exchange type directfanouttopicxcustom") routingKey flag.String("key", "testkey", "AMQP routing key") body flag.String("body", "foobar", "Body of message") reliable flag.Bool("reliable", true, "Wait for the publisher confirmation before exiting") ) func init() { flag.Parse() } func main() { if err : publish(uri, exchangeName, exchangeType, routingKey, body, reliable); err ! nil { log.Fatalf("%s", err) } log.Printf("published %dB OK", len(body)) } func publish(amqpURI, exchange, exchangeType, routingKey, body string, reliable bool) error { // This function dials, connects, declares, publishes, and tears down, // all in one go. In a real service, you probably want to maintain a // longlived connection as state, and publish against that. log.Printf("dialing %q", amqpURI) connection, err : amqp.Dial(amqpURI) if err ! nil { return fmt.Errorf("Dial: %s", err) } defer connection.Close() log.Printf("got Connection, getting Channel") channel, err : connection.Channel() if err ! nil { return fmt.Errorf("Channel: %s", err) } log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange) if err : channel.ExchangeDeclare( exchange, // name exchangeType, // type true, // durable false, // autodeleted false, // internal false, // noWait nil, // arguments ); err ! nil { return fmt.Errorf("Exchange Declare: %s", err) } // Reliable publisher confirms require confirm.select support from the // connection. if reliable { log.Printf("enabling publishing confirms.") if err : channel.Confirm(false); err ! nil { return fmt.Errorf("Channel could not be put into confirm mode: %s", err) } confirms : channel.NotifyPublish(make(chan amqp.Confirmation, 1)) defer confirmOne(confirms) } log.Printf("declared Exchange, publishing %dB body (%q)", len(body), body) if err channel.Publish( exchange, // publish to an exchange routingKey, // routing to 0 or more queues false, // mandatory false, // immediate amqp.Publishing{ Headers: amqp.Table{}, ContentType: "text/plain", ContentEncoding: "", Body: []byte(body), DeliveryMode: amqp.Transient, // 1nonpersistent, 2persistent Priority: 0, // 09 // a bunch of application/implementationspecific fields }, ); err ! nil { return fmt.Errorf("Exchange Publish: %s", err) } return nil } // One would typically keep a channel of publishings, a sequence number, and a // set of unacknowledged sequence numbers and loop until the publishing channel // is closed. func confirmOne(confirms 0 { log.Printf("running for %s", lifetime) time.Sleep(lifetime) } else { log.Printf("running forever") select {} } log.Printf("shutting down") if err : c.Shutdown(); err ! nil { log.Fatalf("error during shutdown: %s", err) } } type Consumer struct { conn amqp.Connection channel amqp.Channel tag string done chan error } func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) (Consumer, error) { c : &Consumer{ conn: nil, channel: nil, tag: ctag, done: make(chan error), } var err error log.Printf("dialing %q", amqpURI) c.conn, err amqp.Dial(amqpURI) if err ! nil { return nil, fmt.Errorf("Dial: %s", err) } go func() { fmt.Printf("closing: %s",