package databases

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
	"time"

	"github.com/streadway/amqp"
)

const (
	// Names of the queues declared by Connect and probed by ReadyCheck.
	regQueueName  = "reg_queue"
	dataQueueName = "data_queue"
	rpcQueueName  = "rpc_queue"

	// queuePrefetchCount is the prefetch count passed to Qos on every queue channel.
	queuePrefetchCount = 300

	// readyCheckAttempts is how many times ReadyCheck probes before giving up.
	readyCheckAttempts = 30
)

// RabbitMQ bundles one AMQP connection with the channels opened on it.
type RabbitMQ struct {
	// Connection is the connection dialed by Connect.
	Connection *amqp.Connection
	// MyselfQueue is not initialized by Connect; whoever uses it must set it.
	MyselfQueue *amqp.Channel
	// RegQueue is the channel on which "reg_queue" is declared.
	RegQueue *amqp.Channel
	// DataQueue is the channel on which "data_queue" is declared.
	DataQueue *amqp.Channel
	// RPCQueue is the channel on which "rpc_queue" is declared.
	RPCQueue *amqp.Channel
	// ModemRPCChannel is opened by Connect; no queue is declared on it here.
	ModemRPCChannel *amqp.Channel
}

// NewRabbitMQ returns an empty RabbitMQ; call Connect before using it.
func NewRabbitMQ() *RabbitMQ {
	return &RabbitMQ{}
}

// Connect dials the broker described by the environment, opens the channels
// and declares the durable queues "reg_queue", "data_queue" and "rpc_queue".
//
// Environment variables read:
//
//	RABBITMQ_USER_PROD, RABBITMQ_PASSWORD_PROD, RABBITMQ_HOST
//	RABBITMQ_TCP_PORT       port used for plain AMQP
//	RABBITMQ_EXTERNAL_PORT  port used for AMQP over TLS
//	RABBITMQSSL_EXT         must be "true" to enable TLS
//	RABBITMQSSL_KEYFILE     must be non-empty to enable TLS
//	RABBITMQSSL_CERTFILE, RABBITMQSSL_CACERTFILE
//
// If any step after a successful dial fails, the connection is closed before
// the error is returned so that a failed Connect does not leak it.
func (rmq *RabbitMQ) Connect() error {
	sslEnable := os.Getenv("RABBITMQSSL_KEYFILE") != "" && os.Getenv("RABBITMQSSL_EXT") == "true"

	scheme, port := "amqp", os.Getenv("RABBITMQ_TCP_PORT")
	var tlsConfig *tls.Config
	if sslEnable {
		cfg, err := rabbitMQTLSConfig()
		if err != nil {
			return err
		}
		tlsConfig = cfg
		scheme, port = "amqps", os.Getenv("RABBITMQ_EXTERNAL_PORT")
	}

	// NOTE(review): user and password are interpolated verbatim, so values
	// containing URI-reserved characters must already be percent-encoded.
	connString := fmt.Sprintf("%s://%s:%s@%s:%s",
		scheme,
		os.Getenv("RABBITMQ_USER_PROD"),
		os.Getenv("RABBITMQ_PASSWORD_PROD"),
		os.Getenv("RABBITMQ_HOST"),
		port,
	)

	conn, err := amqp.DialConfig(connString, amqp.Config{TLSClientConfig: tlsConfig})
	if err != nil {
		return fmt.Errorf("dialing rabbitmq: %w", err)
	}
	rmq.Connection = conn

	if err := rmq.setupChannels(conn); err != nil {
		// Best-effort cleanup; the setup error is the one worth reporting.
		_ = conn.Close()
		return err
	}
	return nil
}

// rabbitMQTLSConfig builds the client TLS configuration from the
// RABBITMQSSL_* environment variables.
func rabbitMQTLSConfig() (*tls.Config, error) {
	caFile := os.Getenv("RABBITMQSSL_CACERTFILE")
	caCert, err := os.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("reading CA certificate: %w", err)
	}

	caCertPool := x509.NewCertPool()
	// AppendCertsFromPEM reports false when nothing could be parsed; an empty
	// pool would only surface later as an opaque handshake failure.
	if !caCertPool.AppendCertsFromPEM(caCert) {
		return nil, fmt.Errorf("no valid PEM certificates found in %q", caFile)
	}

	clientCert, err := tls.LoadX509KeyPair(os.Getenv("RABBITMQSSL_CERTFILE"), os.Getenv("RABBITMQSSL_KEYFILE"))
	if err != nil {
		return nil, fmt.Errorf("loading client key pair: %w", err)
	}

	return &tls.Config{
		RootCAs:      caCertPool,
		Certificates: []tls.Certificate{clientCert},
	}, nil
}

// setupChannels opens the four channels on conn and declares the three queues.
func (rmq *RabbitMQ) setupChannels(conn *amqp.Connection) error {
	var err error
	if rmq.ModemRPCChannel, err = conn.Channel(); err != nil {
		return fmt.Errorf("opening modem rpc channel: %w", err)
	}
	if rmq.RegQueue, err = conn.Channel(); err != nil {
		return fmt.Errorf("opening reg channel: %w", err)
	}
	if rmq.DataQueue, err = conn.Channel(); err != nil {
		return fmt.Errorf("opening data channel: %w", err)
	}
	if rmq.RPCQueue, err = conn.Channel(); err != nil {
		return fmt.Errorf("opening rpc channel: %w", err)
	}

	queues := []struct {
		ch   *amqp.Channel
		name string
	}{
		{rmq.RegQueue, regQueueName},
		{rmq.DataQueue, dataQueueName},
		{rmq.RPCQueue, rpcQueueName},
	}
	for _, q := range queues {
		if err := declareDurableQueue(q.ch, q.name); err != nil {
			return err
		}
	}
	return nil
}

// declareDurableQueue declares a durable queue called name on ch and applies
// the prefetch limit to that channel.
func declareDurableQueue(ch *amqp.Channel, name string) error {
	if _, err := ch.QueueDeclare(name, true, false, false, false, nil); err != nil {
		return fmt.Errorf("declaring queue %q: %w", name, err)
	}
	if err := ch.Qos(queuePrefetchCount, 0, false); err != nil {
		return fmt.Errorf("setting qos for queue %q: %w", name, err)
	}
	return nil
}

// Delay sleeps for ms milliseconds.
//
// ms is a plain count of milliseconds despite its time.Duration type: the
// value is multiplied by time.Millisecond, so Delay(1000) sleeps one second
// and Delay(time.Second) would sleep far longer than that.
func (rmq *RabbitMQ) Delay(ms time.Duration) {
	time.Sleep(ms * time.Millisecond)
}

// ReadyCheck probes once per second, up to 30 times, for the three queues
// declared by Connect. It returns true as soon as all of them exist and false
// if they are still missing after the last attempt.
func (rmq *RabbitMQ) ReadyCheck() bool {
	for attempt := 0; attempt < readyCheckAttempts; attempt++ {
		if rmq.queuesExist(regQueueName, dataQueueName, rpcQueueName) {
			return true
		}
		rmq.Delay(1000)
	}
	return false
}

// queuesExist reports whether every named queue exists on the broker.
//
// The probe runs on a throwaway channel: a failed passive declare makes the
// broker close the channel it was issued on, which must not happen to the
// long-lived channels stored on rmq.
func (rmq *RabbitMQ) queuesExist(names ...string) bool {
	if rmq.Connection == nil {
		return false
	}
	ch, err := rmq.Connection.Channel()
	if err != nil {
		return false
	}
	// Close reports an error if a failed probe already closed the channel;
	// there is nothing useful to do with it.
	defer ch.Close()

	for _, name := range names {
		if _, err := ch.QueueDeclarePassive(name, true, false, false, false, nil); err != nil {
			return false
		}
	}
	return true
}