# Go

## 环境准备

1. 下载Demo包 kafka-confluent-go-demo.zip。
2. 使用开发工具导入Demo。

## 配置修改

1. 如果是SSL连接,需要在控制台下载证书,并解压压缩包得到 ssl.client.truststore.jks,然后执行以下命令生成 caRoot.pem 文件。

```shell
keytool -importkeystore -srckeystore ssl.client.truststore.jks -destkeystore caRoot.p12 -deststoretype pkcs12
openssl pkcs12 -in caRoot.p12 -out caRoot.pem
```

2. 修改 kafka.json 文件。(security.protocol 仅在SSL连接时需要配置)

```json
{
  "topic": "XXX",
  "topic2": "XXX",
  "group.id": "XXX",
  "bootstrap.servers": "XXX:XX",
  "security.protocol": "SSL"
}
```

## 生产消息

执行以下命令发送消息。

```shell
go run -mod=vendor producer/producer.go
```

生产消息示例代码如下:

```go
package main

import (
	"encoding/json"
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"log"
	"os"
	"path/filepath"
	"strconv"
	"time"
)

const (
	INT32_MAX = 2147483647 - 1000
)

type KafkaConfig struct {
	Topic            string `json:"topic"`
	Topic2           string `json:"topic2"`
	GroupId          string `json:"group.id"`
	BootstrapServers string `json:"bootstrap.servers"`
	SecurityProtocol string `json:"security.protocol"`
	SslCaLocation    string `json:"ssl.ca.location"`
	SaslMechanism    string `json:"sasl.mechanism"`
	SaslUsername     string `json:"sasl.username"`
	SaslPassword     string `json:"sasl.password"`
}

// config should be a pointer to structure, if not, panic
func loadJsonConfig() *KafkaConfig {
	workPath, err := os.Getwd()
	if err != nil {
		panic(err)
	}
	configPath := filepath.Join(workPath, "conf")
	fullPath := filepath.Join(configPath, "kafka.json")
	file, err := os.Open(fullPath)
	if err != nil {
		msg := fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err)
		panic(msg)
	}
	defer file.Close()

	decoder := json.NewDecoder(file)
	var config = &KafkaConfig{}
	err = decoder.Decode(config)
	if err != nil {
		msg := fmt.Sprintf("Decode json fail for config file at %s. Error: %v", fullPath, err)
		panic(msg)
	}
	json.Marshal(config)
	return config
}

func doInitProducer(cfg *KafkaConfig) *kafka.Producer {
	fmt.Print("init kafka producer, it may take a few seconds to init the connection\n")
	//common arguments
	var kafkaconf = &kafka.ConfigMap{
		"api.version.request":           "true",
		"message.max.bytes":             1000000,
		"linger.ms":                     500,
		"sticky.partitioning.linger.ms": 1000,
		"retries":                       INT32_MAX,
		"retry.backoff.ms":              1000,
		"acks":                          "1"}
	kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers)

	switch cfg.SecurityProtocol {
	case "PLAINTEXT":
		kafkaconf.SetKey("security.protocol", "plaintext")
	case "SSL":
		kafkaconf.SetKey("security.protocol", "ssl")
		kafkaconf.SetKey("ssl.ca.location", "/XXX/caRoot.pem")
	case "SASL_SSL":
		kafkaconf.SetKey("security.protocol", "sasl_ssl")
		kafkaconf.SetKey("ssl.ca.location", "/XXX/caRoot.pem")
		kafkaconf.SetKey("sasl.username", cfg.SaslUsername)
		kafkaconf.SetKey("sasl.password", cfg.SaslPassword)
		kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)
		kafkaconf.SetKey("enable.ssl.certificate.verification", "false")
	case "SASL_PLAINTEXT":
		kafkaconf.SetKey("security.protocol", "sasl_plaintext")
		kafkaconf.SetKey("sasl.username", cfg.SaslUsername)
		kafkaconf.SetKey("sasl.password", cfg.SaslPassword)
		kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)
	default:
		panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true))
	}

	producer, err := kafka.NewProducer(kafkaconf)
	if err != nil {
		panic(err)
	}
	fmt.Print("init kafka producer success\n")
	return producer
}

func main() {
	// Choose the correct protocol
	cfg := loadJsonConfig()
	producer := doInitProducer(cfg)

	defer producer.Close()

	// Delivery report handler for produced messages
	go func() {
		for e := range producer.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					log.Printf("Failed to write access log entry:%v", ev.TopicPartition.Error)
				} else {
					log.Printf("Send OK topic:%v partition:%v offset:%v content:%s\n", *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset, ev.Value)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	i := 0
	for {
		i = i + 1
		value := "this is a kafka message from confluent go " + strconv.Itoa(i)
		var msg *kafka.Message = nil
		if i%2 == 0 {
			msg = &kafka.Message{
				TopicPartition: kafka.TopicPartition{Topic: &cfg.Topic2, Partition: kafka.PartitionAny},
				Value:          []byte(value),
			}
		} else {
			msg = &kafka.Message{
				TopicPartition: kafka.TopicPartition{Topic: &cfg.Topic, Partition: kafka.PartitionAny},
				Value:          []byte(value),
			}
		}
		producer.Produce(msg, nil)
		time.Sleep(time.Duration(1) * time.Millisecond)
	}

	// Wait for message deliveries before shutting down
	producer.Flush(15 * 1000)
}
```