go_rabbitmq/app/lib/data_libs/multilinks.go

138 lines
3.9 KiB
Go

package data_libs
import (
"fmt"
"node_rabbit_go/app/lib"
"sync"
)
// Multilinks handles device "multilink" packets: it keeps the device_server
// table in step with the server links a device reports.
type Multilinks struct {
	// serverList records server IP (string) -> server id (int) for servers
	// registered by insertServer. The zero value is ready to use.
	serverList sync.Map
}
// NewMultilinks returns a pointer to an empty, ready-to-use Multilinks.
func NewMultilinks() *Multilinks {
	return new(Multilinks)
}
// getPacket replaces the device_server links of device devID with the links
// described in data, and publishes a "device_update_main" STOMP message for
// every link entry it processes.
//
// data is walked two levels deep: every value must be a map whose values are
// maps carrying "server_ip" (string), "local_ip" (list of strings) and "port"
// (number). Values that do not have that shape are skipped. A server IP that
// is not in the servers table yet is registered through insertServer.
//
// timestamp is accepted for signature compatibility and is not used.
//
// The function has no error return, so failures are handled best-effort: if
// the server list cannot be loaded nothing is changed; a failed delete or
// insert does not stop the remaining work.
func (m *Multilinks) getPacket(requestController *lib.RequestController, data map[string]interface{}, devID string, timestamp int64) {
	if data == nil {
		return
	}

	// The helpers report their result through a channel. A buffer of one lets
	// each helper complete its single send without a waiting receiver, so the
	// helpers can be called synchronously and the result read right after.
	serverListChan := make(chan map[string]int, 1)
	serverIDChan := make(chan int, 1) // insertServer needs its own chan int
	deleteDoneChan := make(chan error, 1)
	insertDoneChan := make(chan error, 1)
	sendMessageChan := make(chan error, 1)

	m.getServerList(requestController, serverListChan)
	serverList := <-serverListChan
	if serverList == nil {
		// getServerList sends nil only when the lookup failed. Going on would
		// treat every server as unknown and register it again, so stop before
		// touching any table.
		return
	}

	// The old links must be gone before the new ones are written: a delete
	// running concurrently with the inserts could remove freshly inserted rows.
	m.deleteDeviceServer(requestController, devID, deleteDoneChan)
	// Best-effort: the inserts below are upserts, so they are still applied
	// when the delete failed.
	<-deleteDoneChan

	for _, linkData := range data {
		linkDataMap, ok := linkData.(map[string]interface{})
		if !ok {
			continue
		}
		for _, keyData := range linkDataMap {
			keyDataMap, ok := keyData.(map[string]interface{})
			if !ok {
				continue
			}
			serverIP, ok := keyDataMap["server_ip"].(string)
			if !ok {
				continue
			}

			serverID, exists := serverList[serverIP]
			if !exists {
				m.insertServer(requestController, serverIP, serverIDChan)
				serverID = <-serverIDChan
				if serverID == 0 {
					// insertServer reports failure as 0; there is no server
					// row to link this entry to.
					continue
				}
				// Remember the new id so the same IP is not registered twice
				// while processing this packet.
				serverList[serverIP] = serverID
			}

			localIPs, ok := keyDataMap["local_ip"].([]interface{})
			if !ok {
				continue
			}

			var port int
			switch p := keyDataMap["port"].(type) {
			case int:
				port = p
			case float64:
				// encoding/json stores JSON numbers in interface values as
				// float64; accept those when they hold a whole number.
				if p != float64(int(p)) {
					continue
				}
				port = int(p)
			default:
				continue
			}

			for _, ip := range localIPs {
				ipStr, ok := ip.(string)
				if !ok {
					continue
				}
				m.insertDeviceServer(requestController, devID, serverID, ipStr, port, insertDoneChan)
				// Best-effort: one failed link must not block the others.
				<-insertDoneChan
			}

			message := map[string]interface{}{
				"action":   "UPDATE",
				"subtopic": "device_update_main",
			}
			// SendMessage is started and awaited exactly as before; its result
			// is read only to wait for completion.
			go requestController.STOMP.SendMessage("device_update_main", devID, message, sendMessageChan)
			<-sendMessageChan
		}
	}
}
// getServerList loads every row of the servers table and sends the result on
// resultChan as a map of server IP -> server id.
//
// Exactly one value is sent: a non-nil (possibly empty) map on success, or nil
// when the query, a row scan, or the row iteration failed.
//
// NOTE(review): Close and Err assume MySQL.Query returns a database/sql-style
// rows handle, as the Next/Scan usage suggests — confirm against lib.
func (m *Multilinks) getServerList(requestController *lib.RequestController, resultChan chan map[string]int) {
	rows, err := requestController.MySQL.Query("SELECT id, ip FROM servers WHERE 1")
	if err != nil {
		// TODO: log err once logger_and_exception_handler is available here.
		resultChan <- nil
		return
	}
	// Release the result set (and its connection) on every exit path,
	// including the early return on a scan error.
	defer rows.Close()

	serverList := make(map[string]int)
	for rows.Next() {
		var (
			id int
			ip string
		)
		if err := rows.Scan(&id, &ip); err != nil {
			resultChan <- nil
			return
		}
		serverList[ip] = id
	}
	// Next returning false can also mean the iteration broke off; a truncated
	// list must not be reported as complete.
	if err := rows.Err(); err != nil {
		resultChan <- nil
		return
	}
	resultChan <- serverList
}
// deleteDeviceServer removes every device_server row of device devID and sends
// the outcome on resultChan: nil on success, otherwise the query error wrapped
// with context. Exactly one value is sent.
//
// devID is passed as a bound parameter instead of being spliced into the SQL
// text, so its content cannot alter the statement.
//
// NOTE(review): assumes MySQL.Query accepts placeholder arguments and returns
// a closable rows handle like database/sql — confirm against lib.
func (m *Multilinks) deleteDeviceServer(requestController *lib.RequestController, devID string, resultChan chan error) {
	rows, err := requestController.MySQL.Query("DELETE FROM device_server WHERE device_id=?", devID)
	if err != nil {
		resultChan <- fmt.Errorf("deleting device_server rows for device %q: %w", devID, err)
		return
	}
	// A DELETE yields no rows, but the handle still has to be closed to give
	// the connection back.
	rows.Close()
	resultChan <- nil
}
// insertServer registers serverIP in the servers table as a 'NOT REGISTERED'
// placeholder and sends the new row id on resultChan. On any failure it sends
// 0. Exactly one value is sent. On success the id is also stored in
// m.serverList under serverIP.
//
// serverIP is passed as a bound parameter instead of being spliced into the
// SQL text.
//
// NOTE(review): the statement relies on INSERT ... RETURNING, which the target
// database must support. Placeholder arguments and Close assume a
// database/sql-style API behind MySQL.Query — confirm against lib.
func (m *Multilinks) insertServer(requestController *lib.RequestController, serverIP string, resultChan chan int) {
	const query = "INSERT INTO servers (name, ip, name_short) VALUES ('NOT REGISTERED', ?, 'NOT_REG') RETURNING id"

	rows, err := requestController.MySQL.Query(query, serverIP)
	if err != nil {
		// The result handle is not usable when the query failed.
		// TODO: log err once logger_and_exception_handler is available here.
		resultChan <- 0
		return
	}

	// The cursor has to be advanced to the returned row before it can be
	// scanned, the same way getServerList reads its rows.
	var id int
	scanned := rows.Next() && rows.Scan(&id) == nil
	// Release the result set before reporting, so it is not held while the
	// receiver is busy.
	rows.Close()

	if !scanned {
		resultChan <- 0
		return
	}
	m.serverList.Store(serverIP, id)
	resultChan <- id
}
// insertDeviceServer upserts one device_server row (device_id, server_id, ip,
// port) and sends the outcome on resultChan: nil on success, otherwise the
// query error wrapped with context. Exactly one value is sent.
//
// All values are passed as bound parameters instead of being spliced into the
// SQL text. The statement names each column twice (INSERT and UPDATE part), so
// the arguments are supplied twice in the same order.
//
// NOTE(review): assumes MySQL.Query accepts placeholder arguments and returns
// a closable rows handle like database/sql — confirm against lib.
func (m *Multilinks) insertDeviceServer(requestController *lib.RequestController, devID string, serverID int, ip string, port int, resultChan chan error) {
	const query = "INSERT INTO device_server (device_id, server_id, ip, port) VALUES (?, ?, ?, ?) " +
		"ON DUPLICATE KEY UPDATE device_id=?, server_id=?, ip=?, port=?"

	rows, err := requestController.MySQL.Query(query,
		devID, serverID, ip, port,
		devID, serverID, ip, port)
	if err != nil {
		resultChan <- fmt.Errorf("upserting device_server link %s:%d for device %q: %w", ip, port, devID, err)
		return
	}
	// An INSERT yields no rows, but the handle still has to be closed to give
	// the connection back.
	rows.Close()
	resultChan <- nil
}