go_rabbitmq/app/databases/rabbitmq.go

136 lines
2.7 KiB
Go

package databases
import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"time"
"github.com/streadway/amqp"
)
// RabbitMQ bundles one AMQP connection together with the channels that are
// opened on it. All fields are nil on a freshly constructed value; Connect
// populates Connection and every channel except MyselfQueue.
type RabbitMQ struct {
// Connection is the broker connection obtained by Connect via amqp.DialConfig.
Connection *amqp.Connection
// MyselfQueue is not assigned anywhere in this file.
// NOTE(review): confirm where (or whether) this channel is initialised.
MyselfQueue *amqp.Channel
// RegQueue is the channel on which Connect declares "reg_queue".
RegQueue *amqp.Channel
// DataQueue is the channel on which Connect declares "data_queue".
DataQueue *amqp.Channel
// RPCQueue is the channel on which Connect declares "rpc_queue".
RPCQueue *amqp.Channel
// ModemRPCChannel is opened by Connect; no queue is declared on it there.
ModemRPCChannel *amqp.Channel
}
// NewRabbitMQ allocates a zero-valued RabbitMQ and returns a pointer to it.
// Nothing is dialed here: the connection and all channel fields stay nil
// until they are populated by a later call such as Connect.
func NewRabbitMQ() *RabbitMQ {
	return new(RabbitMQ)
}
// Connect dials the RabbitMQ broker configured through the environment, opens
// the channels ModemRPCChannel, RegQueue, DataQueue and RPCQueue (in that
// order) and declares the durable queues "reg_queue", "data_queue" and
// "rpc_queue", each with a per-consumer prefetch count of 300.
//
// TLS is used only when RABBITMQSSL_KEYFILE is non-empty and RABBITMQSSL_EXT
// equals "true". In that case the CA bundle is read from
// RABBITMQSSL_CACERTFILE, the client key pair from RABBITMQSSL_CERTFILE and
// RABBITMQSSL_KEYFILE, and the broker is reached with the "amqps" scheme on
// RABBITMQ_EXTERNAL_PORT. Otherwise plain "amqp" on RABBITMQ_TCP_PORT is used.
// Host and credentials come from RABBITMQ_HOST, RABBITMQ_USER_PROD and
// RABBITMQ_PASSWORD_PROD.
//
// It returns the first error encountered, wrapped with the step that failed.
// If a step fails after the connection was established, the connection is
// closed before returning so that a retrying caller does not leak it; the
// struct fields may then reference closed handles and must not be used.
func (rmq *RabbitMQ) Connect() error {
	const prefetchCount = 300

	sslEnable := os.Getenv("RABBITMQSSL_KEYFILE") != "" && os.Getenv("RABBITMQSSL_EXT") == "true"

	scheme := "amqp"
	port := os.Getenv("RABBITMQ_TCP_PORT")
	var tlsConfig *tls.Config
	if sslEnable {
		cfg, err := loadRabbitMQTLSConfig()
		if err != nil {
			return err
		}
		tlsConfig = cfg
		scheme = "amqps"
		port = os.Getenv("RABBITMQ_EXTERNAL_PORT")
	}

	// NOTE(review): user and password are inserted verbatim, so credentials
	// containing URI-reserved characters (e.g. '@', '/', ':') must already be
	// percent-encoded in the environment.
	connString := fmt.Sprintf("%s://%s:%s@%s:%s",
		scheme,
		os.Getenv("RABBITMQ_USER_PROD"),
		os.Getenv("RABBITMQ_PASSWORD_PROD"),
		os.Getenv("RABBITMQ_HOST"),
		port,
	)

	conn, err := amqp.DialConfig(connString, amqp.Config{
		TLSClientConfig: tlsConfig,
	})
	if err != nil {
		return fmt.Errorf("dialing RabbitMQ: %w", err)
	}
	rmq.Connection = conn

	if err := rmq.setupChannels(conn, prefetchCount); err != nil {
		// Best-effort cleanup: the setup error is the one worth reporting,
		// and closing the connection also tears down its channels.
		_ = conn.Close()
		return err
	}
	return nil
}

// loadRabbitMQTLSConfig builds the client TLS configuration from the
// RABBITMQSSL_CACERTFILE, RABBITMQSSL_CERTFILE and RABBITMQSSL_KEYFILE files.
func loadRabbitMQTLSConfig() (*tls.Config, error) {
	caPath := os.Getenv("RABBITMQSSL_CACERTFILE")
	caCert, err := os.ReadFile(caPath)
	if err != nil {
		return nil, fmt.Errorf("reading RabbitMQ CA certificate: %w", err)
	}
	caCertPool := x509.NewCertPool()
	// An empty pool would make every server certificate fail verification
	// later with a much less helpful error, so reject it up front.
	if !caCertPool.AppendCertsFromPEM(caCert) {
		return nil, fmt.Errorf("no PEM certificates found in RabbitMQ CA file %q", caPath)
	}
	clientCert, err := tls.LoadX509KeyPair(os.Getenv("RABBITMQSSL_CERTFILE"), os.Getenv("RABBITMQSSL_KEYFILE"))
	if err != nil {
		return nil, fmt.Errorf("loading RabbitMQ client key pair: %w", err)
	}
	return &tls.Config{
		RootCAs:      caCertPool,
		Certificates: []tls.Certificate{clientCert},
	}, nil
}

// setupChannels opens the four channels on conn, stores them on rmq and
// declares the three durable queues with the given prefetch count.
func (rmq *RabbitMQ) setupChannels(conn *amqp.Connection, prefetch int) error {
	var err error
	if rmq.ModemRPCChannel, err = conn.Channel(); err != nil {
		return fmt.Errorf("opening modem RPC channel: %w", err)
	}
	if rmq.RegQueue, err = conn.Channel(); err != nil {
		return fmt.Errorf("opening reg_queue channel: %w", err)
	}
	if rmq.DataQueue, err = conn.Channel(); err != nil {
		return fmt.Errorf("opening data_queue channel: %w", err)
	}
	if rmq.RPCQueue, err = conn.Channel(); err != nil {
		return fmt.Errorf("opening rpc_queue channel: %w", err)
	}

	queues := []struct {
		ch   *amqp.Channel
		name string
	}{
		{rmq.RegQueue, "reg_queue"},
		{rmq.DataQueue, "data_queue"},
		{rmq.RPCQueue, "rpc_queue"},
	}
	for _, q := range queues {
		if err := declareQueueWithPrefetch(q.ch, q.name, prefetch); err != nil {
			return err
		}
	}
	return nil
}

// declareQueueWithPrefetch declares the durable, non-exclusive queue name on
// ch and then limits unacknowledged deliveries per consumer to prefetch.
func declareQueueWithPrefetch(ch *amqp.Channel, name string, prefetch int) error {
	if _, err := ch.QueueDeclare(name, true, false, false, false, nil); err != nil {
		return fmt.Errorf("declaring queue %q: %w", name, err)
	}
	if err := ch.Qos(prefetch, 0, false); err != nil {
		return fmt.Errorf("setting QoS for queue %q: %w", name, err)
	}
	return nil
}
// Delay blocks the calling goroutine for ms milliseconds.
//
// Although ms is typed as time.Duration, it is treated as a plain count of
// milliseconds: it is multiplied by time.Millisecond before sleeping. Call it
// as Delay(1000) for one second; passing a real duration such as time.Second
// would be scaled a second time.
func (rmq *RabbitMQ) Delay(ms time.Duration) {
	pause := ms * time.Millisecond
	time.Sleep(pause)
}
// ReadyCheck reports whether the queues "reg_queue", "data_queue" and
// "rpc_queue" all exist on the broker. It probes up to 30 times, waiting
// Delay(1000) after each failed attempt, and returns true as soon as one
// attempt finds all three queues. It returns false immediately when no
// connection has been established, and false after the last failed attempt.
//
// The probe is a passive inspect on a short-lived channel, so it neither
// registers consumers nor disturbs the long-lived channels held by rmq.
func (rmq *RabbitMQ) ReadyCheck() bool {
	const maxAttempts = 30

	if rmq.Connection == nil {
		return false
	}
	queues := []string{"reg_queue", "data_queue", "rpc_queue"}
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if rabbitQueuesExist(rmq.Connection, queues) {
			return true
		}
		rmq.Delay(1000)
	}
	return false
}

// rabbitQueuesExist reports whether every queue in names can be inspected on
// the broker behind conn. A dedicated channel is used because the amqp client
// documents that a failed inspect closes the channel it was issued on.
func rabbitQueuesExist(conn *amqp.Connection, names []string) bool {
	ch, err := conn.Channel()
	if err != nil {
		return false
	}
	// The close error is irrelevant for a probe; after a failed inspect the
	// channel is already closed and Close merely reports that.
	defer ch.Close()

	for _, name := range names {
		if _, err := ch.QueueInspect(name); err != nil {
			return false
		}
	}
	return true
}