go-rabbitmq

amqp
go-amqp-reconnect

客户端样例:

/*
{
"instId": 123,
"url": "root:dahuacloud@172.100.81.13:5672",
"queueName": "test_crowdDensityStatistic",
}
*/
func amqpClient(instID int64, url string, queueName string) {
log.Info("amqpClient start with url=%s,queueName=%s", url, queueName)
//amqp://root:dahuacloud@172.100.81.13:5672/
conn, err := rabbitmq.Dial("amqp://" + url)
for err != nil {
conn, err = rabbitmq.Dial("amqp://" + url)
time.Sleep(2 * time.Second)
log.Warn("amqpClient retry dial url=%s,queueName=%s", url, queueName)
}
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)
go func() {
for d := range msgs {
log.Info("Received a message: %s", d.Body)
var mapMsg map[string]interface{}
json.Unmarshal(d.Body, &mapMsg)
mapMsg["instId"] = instID
empData, err := json.Marshal(mapMsg)
if err != nil {
fmt.Println(err.Error())
return
}
jsonStr := string(empData)

replay, err := rpcClientSend(jsonStr)
if err != nil {
log.Error("could not transmit: %v", err)
} else {
log.Info("server response: %s", replay)
}
}
}()

log.Info(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}