ssl生产消息 plaintext package main import ( "crypto/tls" "crypto/x509" "flag" "fmt" amqp "github.com/rabbitmq/amqp091go" "io/ioutil" "log" ) var ( uri flag.String("uri", "amqps://USERNAME:PASSWORD@10.10.33.196:5671", "AMQP URI") exchangeName flag.String("exchange", "goexchange", "Durable AMQP exchange name") exchangeType flag.String("exchangetype", "direct", "Exchange type directfanouttopicxcustom") routingKey flag.String("key", "testkey", "AMQP routing key") body flag.String("body", "foobar", "Body of message") reliable flag.Bool("reliable", true, "Wait for the publisher confirmation before exiting") ) func init() { flag.Parse() } func main() { if err : publish(uri, exchangeName, exchangeType, routingKey, body, reliable); err ! nil { log.Fatalf("%s", err) } log.Printf("published %dB OK", len(body)) } func publish(amqpsURI, exchange, exchangeType, routingKey, body string, reliable bool) error { caCert, err : ioutil.ReadFile("D:tmphzmqtest0520rabbitmq
sslclientcacertificate.pem") if err ! nil { return err } cert, err : tls.LoadX509KeyPair("D:tmphzmqtest0520rabbitmq
sslclientclientrabbitmqcertificate.pem", "D:tmphzmqtest0520rabbitmq
sslclientclientrabbitmqkey.pem") if err ! nil { return err } rootCAs : x509.NewCertPool() rootCAs.AppendCertsFromPEM(caCert) tlsConf : &tls.Config{ RootCAs: rootCAs, Certificates: []tls.Certificate{cert}, //ServerName: "localhost", // Optional InsecureSkipVerify: true, } connection, err : amqp.DialTLS(amqpsURI, tlsConf) if err ! nil { return fmt.Errorf("Dial: %s", err) } defer connection.Close() log.Printf("got Connection, getting Channel") channel, err : connection.Channel() if err ! nil { return fmt.Errorf("Channel: %s", err) } log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange) if err : channel.ExchangeDeclare( exchange, // name exchangeType, // type true, // durable false, // autodeleted false, // internal false, // noWait nil, // arguments ); err ! nil { return fmt.Errorf("Exchange Declare: %s", err) } // Reliable publisher confirms require confirm.select support from the // connection. if reliable { log.Printf("enabling publishing confirms.") if err : channel.Confirm(false); err ! nil { return fmt.Errorf("Channel could not be put into confirm mode: %s", err) } confirms : channel.NotifyPublish(make(chan amqp.Confirmation, 1)) defer confirmOne(confirms) } log.Printf("declared Exchange, publishing %dB body (%q)", len(body), body) if err channel.Publish( exchange, // publish to an exchange routingKey, // routing to 0 or more queues false, // mandatory false, // immediate amqp.Publishing{ Headers: amqp.Table{}, ContentType: "text/plain", ContentEncoding: "", Body: []byte(body), DeliveryMode: amqp.Transient, // 1nonpersistent, 2persistent Priority: 0, // 09 }, ); err ! nil { return fmt.Errorf("Exchange Publish: %s", err) } return nil } func confirmOne(confirms