func amqpClient(instID int64, url string, queueName string) { log.Info("amqpClient start with url=%s,queueName=%s", url, queueName) conn, err := rabbitmq.Dial("amqp://" + url) for err != nil { conn, err = rabbitmq.Dial("amqp://" + url) time.Sleep(2 * time.Second) log.Warn("amqpClient retry dial url=%s,queueName=%s", url, queueName) } failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close()
ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close()
q, err := ch.QueueDeclare( queueName, true, false, false, false, nil, ) failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume( q.Name, "", true, false, false, false, nil, ) failOnError(err, "Failed to register a consumer")
forever := make(chan bool) go func() { for d := range msgs { log.Info("Received a message: %s", d.Body) var mapMsg map[string]interface{} json.Unmarshal(d.Body, &mapMsg) mapMsg["instId"] = instID empData, err := json.Marshal(mapMsg) if err != nil { fmt.Println(err.Error()) return } jsonStr := string(empData)
replay, err := rpcClientSend(jsonStr) if err != nil { log.Error("could not transmit: %v", err) } else { log.Info("server response: %s", replay) } } }()
log.Info(" [*] Waiting for messages. To exit press CTRL+C") <-forever }
|