package data_libs import ( "fmt" "node_rabbit_go/app/lib" "sync" ) type Multilinks struct { serverList sync.Map } func NewMultilinks() *Multilinks { return &Multilinks{} } func (m *Multilinks) getPacket(requestController *lib.RequestController, data map[string]interface{}, devID string, timestamp int64) { if data == nil { return } // Каналы для асинхронной обработки serverListChan := make(chan map[string]int, 1) deleteDoneChan := make(chan error, 1) insertDoneChan := make(chan error, 1) sendMessageChan := make(chan error, 1) // Горутины для выполнения асинхронных операций go m.getServerList(requestController, serverListChan) go m.deleteDeviceServer(requestController, devID, deleteDoneChan) serverList := <-serverListChan for _, linkData := range data { linkDataMap, ok := linkData.(map[string]interface{}) if !ok { continue } for _, keyData := range linkDataMap { keyDataMap, ok := keyData.(map[string]interface{}) if !ok { continue } serverIP, ok := keyDataMap["server_ip"].(string) if !ok { continue } serverID, exists := serverList[serverIP] if !exists { go m.insertServer(requestController, serverIP, serverListChan) serverID = <-serverListChan } localIPs, ok := keyDataMap["local_ip"].([]interface{}) if !ok { continue } port, ok := keyDataMap["port"].(int) if !ok { continue } for _, ip := range localIPs { ipStr, ok := ip.(string) if !ok { continue } go m.insertDeviceServer(requestController, devID, serverID, ipStr, port, insertDoneChan) <-insertDoneChan } message := map[string]interface{}{ "action": "UPDATE", "subtopic": "device_update_main", } go requestController.STOMP.SendMessage("device_update_main", devID, message, sendMessageChan) <-sendMessageChan } } if err := <-deleteDoneChan; err != nil { //logger_and_exception_handler.Error(fmt.Sprintf("Error deleting device server: %v", err)) } } func (m *Multilinks) getServerList(requestController *lib.RequestController, resultChan chan map[string]int) { query := "SELECT id, ip FROM servers WHERE 1" rows, err := requestController.MySQL.Query(query) if err != nil { //logger_and_exception_handler.Error(fmt.Sprintf("Error fetching server list: %v", err)) resultChan <- nil return } //defer rows.Close() serverList := make(map[string]int) for rows.Next() { var id int var ip string if err := rows.Scan(&id, &ip); err != nil { //logger_and_exception_handler.Error(fmt.Sprintf("Error scanning server row: %v", err)) resultChan <- nil return } serverList[ip] = id } resultChan <- serverList } func (m *Multilinks) deleteDeviceServer(requestController *lib.RequestController, devID string, resultChan chan error) { query := fmt.Sprintf("DELETE FROM device_server WHERE device_id='%s'", devID) _, err := requestController.MySQL.Query(query) resultChan <- err } func (m *Multilinks) insertServer(requestController *lib.RequestController, serverIP string, resultChan chan int) { query := fmt.Sprintf("INSERT INTO servers (name, ip, name_short) VALUES ('NOT REGISTERED', '%s', 'NOT_REG') RETURNING id", serverIP) row, _ := requestController.MySQL.Query(query) var id int if err := row.Scan(&id); err != nil { //logger_and_exception_handler.Error(fmt.Sprintf("Error inserting server: %v", err)) resultChan <- 0 return } m.serverList.Store(serverIP, id) resultChan <- id } func (m *Multilinks) insertDeviceServer(requestController *lib.RequestController, devID string, serverID int, ip string, port int, resultChan chan error) { query := fmt.Sprintf( "INSERT INTO device_server (device_id, server_id, ip, port) VALUES ('%s', %d, '%s', %d) ON DUPLICATE KEY UPDATE device_id='%s', server_id=%d, ip='%s', port=%d", devID, serverID, ip, port, devID, serverID, ip, port) _, err := requestController.MySQL.Query(query) resultChan <- err }