From ad9924354fcb478884488e16f82fa10e9786c233 Mon Sep 17 00:00:00 2001 From: moxitech Date: Mon, 27 May 2024 14:39:09 +0700 Subject: [PATCH] init --- .env | 31 ++++ .idea/.gitignore | 8 + .idea/modules.xml | 8 + .idea/node_rabbit_go.iml | 9 + Makefile | 3 + app/databases/clickhouse.go | 126 ++++++++++++++ app/databases/mysql.go | 109 ++++++++++++ app/databases/rabbitmq.go | 135 +++++++++++++++ app/entity/device.go | 17 ++ app/entity/device_uptime.go | 10 ++ app/internals/logger/logger.go | 116 +++++++++++++ app/lib/STOMPlib.go | 86 ++++++++++ app/lib/data_libs/config.go | 1 + app/lib/data_libs/multilinks.go | 137 +++++++++++++++ app/lib/data_libs/version.go | 188 ++++++++++++++++++++ app/lib/device_registry.go | 162 ++++++++++++++++++ app/lib/redis/device_status.go | 35 ++++ app/lib/request_controller.go | 86 ++++++++++ app/lib/task_executor.go | 245 ++++++++++++++++++++++++++ app/main.go | 295 ++++++++++++++++++++++++++++++++ func_header.txt | 91 ++++++++++ go.mod | 21 +++ go.sum | 76 ++++++++ qodana.yaml | 29 ++++ 24 files changed, 2024 insertions(+) create mode 100644 .env create mode 100644 .idea/.gitignore create mode 100644 .idea/modules.xml create mode 100644 .idea/node_rabbit_go.iml create mode 100644 Makefile create mode 100644 app/databases/clickhouse.go create mode 100644 app/databases/mysql.go create mode 100644 app/databases/rabbitmq.go create mode 100644 app/entity/device.go create mode 100644 app/entity/device_uptime.go create mode 100644 app/internals/logger/logger.go create mode 100644 app/lib/STOMPlib.go create mode 100644 app/lib/data_libs/config.go create mode 100644 app/lib/data_libs/multilinks.go create mode 100644 app/lib/data_libs/version.go create mode 100644 app/lib/device_registry.go create mode 100644 app/lib/redis/device_status.go create mode 100644 app/lib/request_controller.go create mode 100644 app/lib/task_executor.go create mode 100644 app/main.go create mode 100644 func_header.txt create mode 100644 go.mod create mode 100644 go.sum create mode 100644 qodana.yaml diff --git a/.env b/.env new file mode 100644 index 0000000..86c46fb --- /dev/null +++ b/.env @@ -0,0 +1,31 @@ + +# LOGGER +#SENTRY_ENVIRONMENT= + +# MYSQL +MARIADB_USER=root +MARIADB_PWD=1 +MARIADB_HOST=localhost +MARIADB_PORT=3306 +MARIADB_NAME=sistematics + +# Clickhouse +CLICKHOUSE_HOST=sis_clickhouse +CLICKHOUSE_PORT=8123 +CLICKHOUSE_USER= +CLICKHOUSE_PASSWORD= +CLICKHOUSE_DATABASE=sistematics +# RabbitMQ +RABBITMQSSL_KEYFILE= +RABBITMQSSL_EXT= +RABBITMQSSL_CACERTFILE= +RABBITMQSSL_CERTFILE= +RABBITMQ_EXTERNAL_PORT= +RABBITMQ_TCP_PORT= + +# STOMP +SERVICEBRIDGE_ENABLE= +RABBITMQ_HOST= +RABBITMQ_WS_PORT= +RABBITMQ_USER_PROD= +RABBITMQ_PASSWORD_PROD= \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..1c2fda5 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..c9619fb --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/node_rabbit_go.iml b/.idea/node_rabbit_go.iml new file mode 100644 index 0000000..338a266 --- /dev/null +++ b/.idea/node_rabbit_go.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..77c7cf0 --- /dev/null +++ b/Makefile @@ -0,0 +1,3 @@ +PHONY: dev +dev: + go run ./app/main.go \ No newline at end of file diff --git a/app/databases/clickhouse.go b/app/databases/clickhouse.go new file mode 100644 index 0000000..7b09181 --- /dev/null +++ b/app/databases/clickhouse.go @@ -0,0 +1,126 @@ +package databases + +import ( + "database/sql" + "fmt" + "log" + "os" + "strings" + "sync" + + _ "github.com/ClickHouse/clickhouse-go" +) + +type Clickhouse struct { + DB *sql.DB + TablesQueues map[string][]interface{} + NeedCountForTable map[string]int + TableTemplate map[string]string + mux sync.Mutex +} + +func NewClickhouse() *Clickhouse { + ch := &Clickhouse{ + DB: nil, + TablesQueues: make(map[string][]interface{}), + NeedCountForTable: map[string]int{ + "device_memory": 3000, + "device_logs": 3000, + "simcard_logs": 3000, + }, + TableTemplate: map[string]string{ + "device_traffic": "(device_id, iface, tx, rx, tx_max, rx_max, tx_traffic, rx_traffic, ts)", + "device_cpu_status": "(device_id, cpu, cpu_temp, mother_temp, ts)", + "device_loc_history": "(device_id, ts, long, lat, speed, course, altitude)", + "device_memory": "(device_id, ts, data)", + "device_logs": "(device_id, type, type_event, params, ts, ts_device)", + "simcard_logs": "(imsi, type, params, ts)", + "simcard_network_status": "(imsi, device_id, ts, signal, sig_lvl, state, roaming, iface, rsrp, rsrq, ecio, rscp, lte_band, sinr, temp, oper, oper_id, cell_id, lac_id)", + "simcard_speed": "(imsi, device_id, tx, rx, tx_max, rx_max, tx_traffic, rx_traffic, ts)", + "node_device_logs": "(dev_id, cmd_id, data, ts)", + "rstat_data": "(device_id, ts, rstat_serial, in, out, pass)", + "communication_status": "(device_id, ts, iface, rssi, rsrp, rsrq, ecio, rscp, lte_band, sinr, network_reg, temp)", + }, + } + + for table := range ch.NeedCountForTable { + ch.TablesQueues[table] = make([]interface{}, 0) + } + + return ch +} + +func (ch *Clickhouse) InitDB() error { + dsn := fmt.Sprintf("tcp://%s:%s?username=%s&password=%s&database=%s", + os.Getenv("CLICKHOUSE_HOST"), os.Getenv("CLICKHOUSE_PORT"), + os.Getenv("CLICKHOUSE_USER"), os.Getenv("CLICKHOUSE_PASSWORD"), + os.Getenv("CLICKHOUSE_DATABASE"), + ) + + db, err := sql.Open("clickhouse", dsn) + if err != nil { + return err + } + + ch.DB = db + return nil +} + +func (ch *Clickhouse) Insert(table string, data interface{}) error { + ch.mux.Lock() + ch.TablesQueues[table] = append(ch.TablesQueues[table], data) + shouldFlush := len(ch.TablesQueues[table]) >= ch.NeedCountForTable[table] + ch.mux.Unlock() + + if shouldFlush { + go func() { + if err := ch.FlushQueue(table); err != nil { + log.Printf("Clickhouse error: %s", err) + } + }() + } + return nil +} + +func (ch *Clickhouse) FlushQueue(table string) error { + ch.mux.Lock() + if ch.DB == nil { + ch.mux.Unlock() + return fmt.Errorf("database connection is not initialized") + } + + data := ch.TablesQueues[table] + ch.TablesQueues[table] = make([]interface{}, 0) + ch.mux.Unlock() + + query := fmt.Sprintf("INSERT INTO %s %s VALUES", table, ch.TableTemplate[table]) + var valueStrings []string + var valueArgs []interface{} + + for _, entry := range data { + values := entry.([]interface{}) + valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?)") + valueArgs = append(valueArgs, values...) + } + + query = query + strings.Join(valueStrings, ",") + + stmt, err := ch.DB.Prepare(query) + if err != nil { + return err + } + + defer func(stmt *sql.Stmt) { + err := stmt.Close() + if err != nil { + fmt.Printf("Ошибка закрытия подключения к БД") + } + }(stmt) + + _, err = stmt.Exec(valueArgs...) + if err != nil { + return err + } + + return nil +} diff --git a/app/databases/mysql.go b/app/databases/mysql.go new file mode 100644 index 0000000..0c9c805 --- /dev/null +++ b/app/databases/mysql.go @@ -0,0 +1,109 @@ +package databases + +import ( + "database/sql" + "fmt" + "log" + "os" + "sync" + + _ "github.com/go-sql-driver/mysql" +) + +type Database struct { + db *sql.DB + mux sync.Mutex +} + +// NewSQLDatabase создает новое соединение с базой данных +func NewSQLDatabase() *Database { + d := &Database{} + var err error + dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", + os.Getenv("MARIADB_USER"), + os.Getenv("MARIADB_PWD"), + os.Getenv("MARIADB_HOST"), + os.Getenv("MARIADB_PORT"), + os.Getenv("MARIADB_NAME"), + ) + d.db, err = sql.Open("mysql", dsn) + if err != nil { + log.Fatal(err) + } + err = d.db.Ping() + if err != nil { + log.Fatal(err) + } + return d +} + +// Query выполняет запрос к базе данных и возвращает строки результата +func (d *Database) Query(query string) (*sql.Rows, error) { + d.mux.Lock() + defer d.mux.Unlock() + + rows, err := d.db.Query(query) + if err != nil { + return nil, err + } + return rows, nil +} + +// Exec выполняет запрос к базе данных без возврата результата +func (d *Database) Exec(query string, args ...interface{}) error { + d.mux.Lock() + defer d.mux.Unlock() + + _, err := d.db.Exec(query, args...) + if err != nil { + return err + } + return nil +} + +// Insert выполняет вставку данных в указанную таблицу +func (d *Database) Insert(table string, columns []string, values []interface{}) error { + if len(columns) == 0 || len(values) == 0 { + return fmt.Errorf("columns and values must not be empty") + } + if len(columns) != len(values) { + return fmt.Errorf("the number of columns must match the number of values") + } + + columnsStr := fmt.Sprintf("(%s)", joinStrings(columns, ", ")) + valuesPlaceholders := fmt.Sprintf("(%s)", joinStrings(makePlaceholders(len(values)), ", ")) + + query := fmt.Sprintf("INSERT INTO %s %s VALUES %s", table, columnsStr, valuesPlaceholders) + return d.Exec(query, values...) +} + +// Close закрывает соединение с базой данных +func (d *Database) Close() { + d.mux.Lock() + defer d.mux.Unlock() + + if err := d.db.Close(); err != nil { + log.Println("Error closing the database connection:", err) + } +} + +// joinStrings объединяет строки с указанным разделителем +func joinStrings(elements []string, delimiter string) string { + result := "" + for i, element := range elements { + if i > 0 { + result += delimiter + } + result += element + } + return result +} + +// makePlaceholders создает строковые заполнители для значений +func makePlaceholders(n int) []string { + placeholders := make([]string, n) + for i := range placeholders { + placeholders[i] = "?" + } + return placeholders +} diff --git a/app/databases/rabbitmq.go b/app/databases/rabbitmq.go new file mode 100644 index 0000000..9f54f78 --- /dev/null +++ b/app/databases/rabbitmq.go @@ -0,0 +1,135 @@ +package databases + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "os" + "time" + + "github.com/streadway/amqp" +) + +type RabbitMQ struct { + Connection *amqp.Connection + MyselfQueue *amqp.Channel + RegQueue *amqp.Channel + DataQueue *amqp.Channel + RPCQueue *amqp.Channel + ModemRPCChannel *amqp.Channel +} + +func NewRabbitMQ() *RabbitMQ { + return &RabbitMQ{} +} + +func (rmq *RabbitMQ) Connect() error { + sslEnable := os.Getenv("RABBITMQSSL_KEYFILE") != "" && os.Getenv("RABBITMQSSL_EXT") == "true" + var tlsConfig *tls.Config + + if sslEnable { + caCert, err := os.ReadFile(os.Getenv("RABBITMQSSL_CACERTFILE")) + + if err != nil { + return err + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + clientCert, err := tls.LoadX509KeyPair(os.Getenv("RABBITMQSSL_CERTFILE"), os.Getenv("RABBITMQSSL_KEYFILE")) + if err != nil { + return err + } + + tlsConfig = &tls.Config{ + RootCAs: caCertPool, + Certificates: []tls.Certificate{clientCert}, + } + } + + connString := fmt.Sprintf("%s://%s:%s@%s:%s", + map[bool]string{true: "amqps", false: "amqp"}[sslEnable], + os.Getenv("RABBITMQ_USER_PROD"), + os.Getenv("RABBITMQ_PASSWORD_PROD"), + os.Getenv("RABBITMQ_HOST"), + map[bool]string{true: os.Getenv("RABBITMQ_EXTERNAL_PORT"), false: os.Getenv("RABBITMQ_TCP_PORT")}[sslEnable], + ) + + conn, err := amqp.DialConfig(connString, amqp.Config{ + TLSClientConfig: tlsConfig, + }) + if err != nil { + return err + } + + rmq.Connection = conn + + rmq.ModemRPCChannel, err = conn.Channel() + if err != nil { + return err + } + + rmq.RegQueue, err = conn.Channel() + if err != nil { + return err + } + + rmq.DataQueue, err = conn.Channel() + if err != nil { + return err + } + + rmq.RPCQueue, err = conn.Channel() + if err != nil { + return err + } + + _, err = rmq.RegQueue.QueueDeclare("reg_queue", true, false, false, false, nil) + if err != nil { + return err + } + + err = rmq.RegQueue.Qos(300, 0, false) + if err != nil { + return err + } + + _, err = rmq.DataQueue.QueueDeclare("data_queue", true, false, false, false, nil) + if err != nil { + return err + } + + rmq.DataQueue.Qos(300, 0, false) + + _, err = rmq.RPCQueue.QueueDeclare("rpc_queue", true, false, false, false, nil) + if err != nil { + return err + } + + rmq.RPCQueue.Qos(300, 0, false) + + return nil +} + +func (rmq *RabbitMQ) Delay(ms time.Duration) { + time.Sleep(ms * time.Millisecond) +} + +func (rmq *RabbitMQ) ReadyCheck() bool { + var trying int + for trying = 0; trying < 30; trying++ { + //TODO CHECk IF EXISTS + rmq.RegQueue.Consume("reg_queue", "", false, false, false, false, nil) + rmq.DataQueue.Consume("data_queue", "", false, false, false, false, nil) + rmq.RPCQueue.Consume("rpc_queue", "", false, false, false, false, nil) + + //if regOK && dataOK && rpcOK { + // return true + //} + + rmq.Delay(1000) + } + + return false +} diff --git a/app/entity/device.go b/app/entity/device.go new file mode 100644 index 0000000..bbb3ae9 --- /dev/null +++ b/app/entity/device.go @@ -0,0 +1,17 @@ +package entity + +type Device struct { + Id int + Device_id string + Fid string + Mid string + Visible int + Device_location_id int + Device_model_id int + Name string + Name2 string + Description string + Add_Desc string + //TODO ADD ETC ROW() + Ts_sync int64 +} diff --git a/app/entity/device_uptime.go b/app/entity/device_uptime.go new file mode 100644 index 0000000..ef24dfb --- /dev/null +++ b/app/entity/device_uptime.go @@ -0,0 +1,10 @@ +package entity + +type DeviceUptime struct { + Device_id int // Id устройства + Ts int // Время обновления + Status int // Статус (0 - выключен, 1 - работает) + Num_serv int // ??? + Uptime int // Uptime модема во время регистрации + Queue_size int // Размер очереди в момент подключения +} diff --git a/app/internals/logger/logger.go b/app/internals/logger/logger.go new file mode 100644 index 0000000..1c34824 --- /dev/null +++ b/app/internals/logger/logger.go @@ -0,0 +1,116 @@ +package logger + +import ( + "fmt" + "github.com/getsentry/sentry-go" + "os" +) + +var ( + colorsTags = map[string]string{ + "Reset": "\033[0m", + "Bright": "\033[1m", + "Dim": "\033[2m", + "Underscore": "\033[4m", + "Blink": "\033[5m", + "Reverse": "\033[7m", + "Hidden": "\033[8m", + + "FgBlack": "\033[30m", + "FgRed": "\033[31m", + "FgGreen": "\033[32m", + "FgYellow": "\033[33m", + "FgBlue": "\033[34m", + "FgMagenta": "\033[35m", + "FgCyan": "\033[36m", + "FgWhite": "\033[37m", + "FgGray": "\033[90m", + + "BgBlack": "\033[40m", + "BgRed": "\033[41m", + "BgGreen": "\033[42m", + "BgYellow": "\033[43m", + "BgBlue": "\033[44m", + "BgMagenta": "\033[45m", + "BgCyan": "\033[46m", + "BgWhite": "\033[47m", + "BgGray": "\033[100m", + } +) + +type Logger struct { + sentryEnabled bool +} + +func NewLogger() *Logger { + logger := &Logger{} + if env := os.Getenv("SENTRY_ENVIRONMENT"); env != "" { + err := sentry.Init(sentry.ClientOptions{ + Dsn: "https://7698f4c6119546638794f11e26ee11e9@sentry.sis.ski/11", + Environment: env, + TracesSampleRate: 1.0, + }) + if err != nil { + fmt.Printf("Sentry initialization failed: %v\n", err) + } else { + logger.sentryEnabled = true + } + } + return logger +} + +func (l *Logger) Info(message string) { + fmt.Printf("%sINFO%s: %s%s\n", colorsTags["BgWhite"]+colorsTags["FgBlack"], colorsTags["Reset"], message, colorsTags["Reset"]) +} + +func (l *Logger) Debug(message string) { + fmt.Printf("%sDEBUG%s: %s%s\n", colorsTags["BgBlue"], colorsTags["Reset"], message, colorsTags["Reset"]) +} + +func (l *Logger) Fatal(message string) { + fmt.Printf("%sFATAL%s: %s%s%s%s%s%s%s\n", colorsTags["Blink"]+colorsTags["BgRed"], colorsTags["Reset"], colorsTags["Bright"], colorsTags["FgMagenta"], message, colorsTags["Reset"], colorsTags["Reset"], colorsTags["Reset"]) + if l.sentryEnabled { + sentry.ConfigureScope(func(scope *sentry.Scope) { + scope.SetLevel(sentry.LevelFatal) + }) + sentry.CaptureMessage(message) + } +} + +func (l *Logger) Error(message string) { + fmt.Printf("%sERROR%s: %s%s%s\n", colorsTags["BgRed"], colorsTags["Reset"], colorsTags["FgRed"], message, colorsTags["Reset"]) + if l.sentryEnabled { + sentry.ConfigureScope(func(scope *sentry.Scope) { + scope.SetLevel(sentry.LevelError) + }) + sentry.CaptureMessage(message) + } +} + +func (l *Logger) Critical(message string) { + fmt.Printf("%sCRITICAL%s: %s%s%s%s%s\n", colorsTags["Blink"]+colorsTags["BgRed"], colorsTags["Reset"], colorsTags["FgMagenta"], message, colorsTags["Reset"], colorsTags["Reset"], colorsTags["Reset"]) + if l.sentryEnabled { + sentry.ConfigureScope(func(scope *sentry.Scope) { + scope.SetLevel("critical") + }) + sentry.CaptureMessage(message) + } +} + +func (l *Logger) Warning(message string) { + fmt.Printf("%sWARN%s: %s%s\n", colorsTags["BgYellow"]+colorsTags["FgBlack"], colorsTags["Reset"], message, colorsTags["Reset"]) + if l.sentryEnabled { + sentry.ConfigureScope(func(scope *sentry.Scope) { + scope.SetLevel(sentry.LevelWarning) + }) + sentry.CaptureMessage(message) + } +} + +func (l *Logger) Log(message string) { + fmt.Println(message) + sentry.ConfigureScope(func(scope *sentry.Scope) { + scope.SetLevel("log") + }) + sentry.CaptureMessage(message) +} diff --git a/app/lib/STOMPlib.go b/app/lib/STOMPlib.go new file mode 100644 index 0000000..37f09f2 --- /dev/null +++ b/app/lib/STOMPlib.go @@ -0,0 +1,86 @@ +package lib + +import ( + "crypto/md5" + "encoding/hex" + "encoding/json" + "fmt" + "github.com/go-stomp/stomp" + "golang.org/x/net/websocket" + "log" + "os" + "time" +) + +type STOMPlib struct { + client *stomp.Conn + waitMessages map[string]*time.Timer + state bool + brokerURL string +} + +func NewSTOMPlib() *STOMPlib { + s := &STOMPlib{ + client: nil, + waitMessages: make(map[string]*time.Timer), + state: os.Getenv("SERVICEBRIDGE_ENABLE") == "true", + brokerURL: fmt.Sprintf("ws://%s:%s/ws", + os.Getenv("RABBITMQ_HOST"), + os.Getenv("RABBITMQ_WS_PORT")), + } + + if s.state { + conn, err := websocket.Dial(s.brokerURL, "", "") + if err != nil { + log.Fatalf("Error connecting to WebSocket: %v", err) + } + /* + stomp.ConnOpt.Login(os.Getenv("RABBITMQ_USER_PROD"), os.Getenv("RABBITMQ_PASSWORD_PROD")), + //stomp.ConnOpt.ReconnectDelay(5*time.Second), + stomp.ConnOpt.HeartBeat(4*time.Second, 4*time.Second), + ////conn + */ + s.client, err = stomp.Connect(conn, + stomp.ConnOpt.Login(os.Getenv("RABBITMQ_USER_PROD"), os.Getenv("RABBITMQ_PASSWORD_PROD")), + stomp.ConnOpt.HeartBeat(4*time.Second, 4*time.Second), + //TODO + ) + + if err != nil { + log.Fatalf("Error creating STOMP client: %v", err) + } + } + + return s +} + +func (s *STOMPlib) SendMessage(topic, id string, message map[string]interface{}) { + if s.state && s.client != nil { + // Преобразование message в JSON + messageBytes, err := json.Marshal(message) + if err != nil { + fmt.Printf("[STOMP ERROR] JSON marshal error: %v\n", err) + return + } + messageStr := string(messageBytes) + + hash := md5.Sum(messageBytes) + hashStr := hex.EncodeToString(hash[:]) + if _, exists := s.waitMessages[hashStr]; exists { + return + } + + s.waitMessages[hashStr] = time.AfterFunc(1*time.Second, func() { + delete(s.waitMessages, hashStr) + err := s.client.Send( + fmt.Sprintf("/topic/%s.%s", topic, id), + "text/plain", + []byte(messageStr), + nil, + ) + if err != nil { + fmt.Printf("[STOMP ERROR] %v\n", err) + } + }) + } +} diff --git a/app/lib/data_libs/config.go b/app/lib/data_libs/config.go new file mode 100644 index 0000000..d3668a8 --- /dev/null +++ b/app/lib/data_libs/config.go @@ -0,0 +1 @@ +package data_libs diff --git a/app/lib/data_libs/multilinks.go b/app/lib/data_libs/multilinks.go new file mode 100644 index 0000000..7ba191c --- /dev/null +++ b/app/lib/data_libs/multilinks.go @@ -0,0 +1,137 @@ +package data_libs + +import ( + "fmt" + "node_rabbit_go/app/lib" + "sync" +) + +type Multilinks struct { + serverList sync.Map +} + +func NewMultilinks() *Multilinks { + return &Multilinks{} +} + +func (m *Multilinks) getPacket(requestController *lib.RequestController, data map[string]interface{}, devID string, timestamp int64) { + if data == nil { + return + } + + // Каналы для асинхронной обработки + serverListChan := make(chan map[string]int, 1) + deleteDoneChan := make(chan error, 1) + insertDoneChan := make(chan error, 1) + sendMessageChan := make(chan error, 1) + + // Горутины для выполнения асинхронных операций + go m.getServerList(requestController, serverListChan) + go m.deleteDeviceServer(requestController, devID, deleteDoneChan) + + serverList := <-serverListChan + for _, linkData := range data { + linkDataMap, ok := linkData.(map[string]interface{}) + if !ok { + continue + } + for _, keyData := range linkDataMap { + keyDataMap, ok := keyData.(map[string]interface{}) + if !ok { + continue + } + serverIP, ok := keyDataMap["server_ip"].(string) + if !ok { + continue + } + serverID, exists := serverList[serverIP] + if !exists { + go m.insertServer(requestController, serverIP, serverListChan) + serverID = <-serverListChan + } + + localIPs, ok := keyDataMap["local_ip"].([]interface{}) + if !ok { + continue + } + port, ok := keyDataMap["port"].(int) + if !ok { + continue + } + + for _, ip := range localIPs { + ipStr, ok := ip.(string) + if !ok { + continue + } + go m.insertDeviceServer(requestController, devID, serverID, ipStr, port, insertDoneChan) + <-insertDoneChan + } + + message := map[string]interface{}{ + "action": "UPDATE", + "subtopic": "device_update_main", + } + go requestController.STOMP.SendMessage("device_update_main", devID, message, sendMessageChan) + <-sendMessageChan + } + } + + if err := <-deleteDoneChan; err != nil { + //logger_and_exception_handler.Error(fmt.Sprintf("Error deleting device server: %v", err)) + } +} + +func (m *Multilinks) getServerList(requestController *lib.RequestController, resultChan chan map[string]int) { + query := "SELECT id, ip FROM servers WHERE 1" + rows, err := requestController.MySQL.Query(query) + if err != nil { + //logger_and_exception_handler.Error(fmt.Sprintf("Error fetching server list: %v", err)) + resultChan <- nil + return + } + //defer rows.Close() + + serverList := make(map[string]int) + for rows.Next() { + var id int + var ip string + if err := rows.Scan(&id, &ip); err != nil { + //logger_and_exception_handler.Error(fmt.Sprintf("Error scanning server row: %v", err)) + resultChan <- nil + return + } + serverList[ip] = id + } + + resultChan <- serverList +} + +func (m *Multilinks) deleteDeviceServer(requestController *lib.RequestController, devID string, resultChan chan error) { + query := fmt.Sprintf("DELETE FROM device_server WHERE device_id='%s'", devID) + _, err := requestController.MySQL.Query(query) + resultChan <- err +} + +func (m *Multilinks) insertServer(requestController *lib.RequestController, serverIP string, resultChan chan int) { + query := fmt.Sprintf("INSERT INTO servers (name, ip, name_short) VALUES ('NOT REGISTERED', '%s', 'NOT_REG') RETURNING id", serverIP) + row, _ := requestController.MySQL.Query(query) + + var id int + if err := row.Scan(&id); err != nil { + //logger_and_exception_handler.Error(fmt.Sprintf("Error inserting server: %v", err)) + resultChan <- 0 + return + } + + m.serverList.Store(serverIP, id) + resultChan <- id +} + +func (m *Multilinks) insertDeviceServer(requestController *lib.RequestController, devID string, serverID int, ip string, port int, resultChan chan error) { + query := fmt.Sprintf( + "INSERT INTO device_server (device_id, server_id, ip, port) VALUES ('%s', %d, '%s', %d) ON DUPLICATE KEY UPDATE device_id='%s', server_id=%d, ip='%s', port=%d", + devID, serverID, ip, port, devID, serverID, ip, port) + _, err := requestController.MySQL.Query(query) + resultChan <- err +} diff --git a/app/lib/data_libs/version.go b/app/lib/data_libs/version.go new file mode 100644 index 0000000..0950403 --- /dev/null +++ b/app/lib/data_libs/version.go @@ -0,0 +1,188 @@ +package data_libs + +import ( + "encoding/json" + "fmt" + "node_rabbit_go/app/lib" + "time" +) + +type Version struct{} + +func New() *Version { + return &Version{} +} + +func (v *Version) GetPacket(requestController *lib.RequestController, data map[string]interface{}, devID string, timestamp int64) { + selectResultChan := make(chan []struct { + Version string `db:"version"` + }, 1) + errChan := make(chan error, 1) + + go v.selectVersion(requestController, devID, selectResultChan, errChan) + + select { + case selectResult := <-selectResultChan: + var oldVer map[string]interface{} + if selectResult[0].Version != "" { + err := json.Unmarshal([]byte(selectResult[0].Version), &oldVer) + if err != nil { + //log.Logger.Error(fmt.Sprintf("JSON unmarshal error: %v", err)) + return + } + } + + detectChangesChan := make(chan bool) + go v.CheckChanges(oldVer, data, devID, requestController, detectChangesChan) + + detectChanges := <-detectChangesChan + if detectChanges { + oldVerJSON, _ := json.Marshal(oldVer) + newVerJSON, _ := json.Marshal(data) + + go v.insertVersionHistory(requestController, devID, timestamp, string(oldVerJSON), string(newVerJSON), errChan) + go v.updateDeviceVersion(requestController, devID, string(newVerJSON), errChan) + + requestController.STOMP.SendMessage("device_update_main", devID, map[string]interface{}{ + "action": "UPDATE", + "subtopic": "device_update_main", + }) + } + case err := <-errChan: + fmt.Printf("%s", err) + //logger_and_exception_handler.Error(fmt.Sprintf("SQL select error: %v", err)) + return + } +} + +func (v *Version) selectVersion(requestController *lib.RequestController, devID string, resultChan chan []struct { + Version string `db:"version"` +}, errChan chan error) { + query := fmt.Sprintf("SELECT version FROM device WHERE device_id=%d", devID) + rows, err := requestController.MySQL.Query(query) + if err != nil { + errChan <- err + return + } + //defer rows.Close() + + var selectResult []struct { + Version string `db:"version"` + } + + for rows.Next() { + var version string + if err := rows.Scan(&version); err != nil { + errChan <- err + return + } + selectResult = append(selectResult, struct { + Version string `db:"version"` + }{Version: version}) + } + + if err := rows.Err(); err != nil { + errChan <- err + return + } + + resultChan <- selectResult +} + +func (v *Version) insertVersionHistory(requestController *lib.RequestController, devID string, timestamp int64, oldVer, newVer string, errChan chan error) { + _, err := requestController.MySQL.Query(fmt.Sprintf("INSERT INTO device_version_history(device_id, ts, old_ver, new_ver) VALUES (%s, %s, %s, %s)", + devID, timestamp, oldVer, newVer)) + if err != nil { + errChan <- err + } +} + +func (v *Version) updateDeviceVersion(requestController *lib.RequestController, devID, newVer string, errChan chan error) { + _, err := requestController.MySQL.Query(fmt.Sprintf("UPDATE device SET version = %s WHERE device_id = %s", newVer, devID)) + if err != nil { + errChan <- err + } +} + +func (v *Version) CheckChanges(oldVer, newVer map[string]interface{}, deviceID string, requestController *lib.RequestController, resultChan chan bool) { + keysOld := getKeys(oldVer) + keysNew := getKeys(newVer) + + findLoraNew := indexOf(keysNew, "lora_id") + findOldLora := indexOf(keysOld, "lora_id") + + if findLoraNew != -1 && findOldLora == -1 { + go v.insertLora(requestController, newVer[keysNew[findLoraNew]].(string), deviceID) + } else if findOldLora != -1 && findLoraNew == -1 { + go v.updateLoraStatus(requestController, deviceID, oldVer[keysOld[findOldLora]].(string)) + } + + if len(keysNew) != len(keysOld) { + resultChan <- true + return + } + + diffRes := diff(keysNew, keysOld) + if len(diffRes) > 0 { + resultChan <- true + return + } + + for _, key := range keysNew { + if oldVer[key] != newVer[key] { + resultChan <- true + return + } + } + + resultChan <- false +} + +func (v *Version) insertLora(requestController *lib.RequestController, loraID, deviceID string) { + _, err := requestController.MySQL.Query(fmt.Sprintf("REPLACE INTO device_lora (lora_id, device_id, status, created_at, updated_at) VALUES (%s, %s, %s, %s, %s)", + loraID, deviceID, 1, time.Now().Unix(), time.Now().Unix())) + if err != nil { + //logger_and_exception_handler.Error(fmt.Sprintf("SQL replace error: %v", err)) + } +} + +func (v *Version) updateLoraStatus(requestController *lib.RequestController, deviceID, loraID string) { + _, err := requestController.MySQL.Query(fmt.Sprintf("UPDATE device_lora SET status=%s, updated_at=%s WHERE device_id=%s AND lora_id=%s", + 0, time.Now().Unix(), deviceID, loraID)) + if err != nil { + //logger_and_exception_handler.Error(fmt.Sprintf("SQL update error: %v", err)) + } +} + +func getKeys(m map[string]interface{}) []string { + keys := make([]string, 0, len(m)) + for key := range m { + keys = append(keys, key) + } + return keys +} + +func indexOf(slice []string, item string) int { + for i, v := range slice { + if v == item { + return i + } + } + return -1 +} + +func diff(slice1, slice2 []string) []string { + var diff []string + m := make(map[string]bool) + + for _, item := range slice2 { + m[item] = true + } + + for _, item := range slice1 { + if _, found := m[item]; !found { + diff = append(diff, item) + } + } + return diff +} diff --git a/app/lib/device_registry.go b/app/lib/device_registry.go new file mode 100644 index 0000000..07da117 --- /dev/null +++ b/app/lib/device_registry.go @@ -0,0 +1,162 @@ +package lib + +import ( + "fmt" + entity2 "node_rabbit_go/app/entity" + "strings" + "time" +) + +type DeviceRegistry struct { + DeviceRegistry map[string]entity2.Device + DeviceUptimeRegistry map[string]entity2.DeviceUptime + RequestController *RequestController + OnlineDevices []string + TempBeforeUpdateTSSYNC []string +} + +func NewDeviceRegistry(rc *RequestController) *DeviceRegistry { + dr := &DeviceRegistry{ + DeviceRegistry: make(map[string]entity2.Device), + DeviceUptimeRegistry: make(map[string]entity2.DeviceUptime), + RequestController: rc, + OnlineDevices: []string{}, + TempBeforeUpdateTSSYNC: []string{}, + } + go dr.initializeRegistry() + return dr +} + +func (dr *DeviceRegistry) initializeRegistry() { + getDeviceList, _ := dr.RequestController.MySQL.Query(`select * from device where 1`) + //defer getDeviceList.Close() + for getDeviceList.Next() { + var device entity2.Device + err := getDeviceList.Scan(&device.Id, &device.Name) + if err != nil { + fmt.Println("Error scanning device:", err) + continue + } + dr.DeviceRegistry[device.Id] = device + } + + getUptimeList, _ := dr.RequestController.MySQL.Query(`select * from device_uptime where 1`) + for getUptimeList.Next() { + var uptime entity2.DeviceUptime + err := getUptimeList.Scan(&uptime.Id, &uptime.Device_id) + if err != nil { + fmt.Println("Error scanning uptime:", err) + continue + } + dr.DeviceUptimeRegistry[uptime.Device_id] = uptime + } +} + +func (dr *DeviceRegistry) FindDevice(mid string) entity2.Device { + device, found := dr.DeviceRegistry[mid] + if !found { + rows, err := dr.RequestController.MySQL.Query(fmt.Sprintf("select * from device where mid = '%s'", mid)) + if err != nil { + fmt.Println("Error querying device:", err) + return entity2.Device{} // Предположим, что у вас есть структура для устройства + } + //defer rows.Close() + + if rows.Next() { + var d entity2.Device // Предположим, что у вас есть структура для устройства + err := rows.Scan(&d.Id, &d.Name) // Пример сканирования значений + if err != nil { + fmt.Println("Error scanning device:", err) + return entity2.Device{} // Предположим, что у вас есть структура для устройства + } + dr.DeviceRegistry[d.Mid] = d + return d + } + } + return device +} + +func (dr *DeviceRegistry) CheckOnline(device entity2.Device) { + tsNow := time.Now().Unix() + dr.UpdateDeviceTsSync(device.Id) + device.Ts_sync = tsNow +} + +func (dr *DeviceRegistry) WriteMM_AgentData(deviceID string, uptime string, queueSize string) { + if deviceID != "" && uptime != "" && queueSize != "" { + _, err := dr.RequestController.MySQL.Query(fmt.Sprintf("UPDATE `device_uptime` SET `uptime`='%s', `queue_size`='%s' WHERE `device_id`='%s'", uptime, queueSize, deviceID)) + if err != nil { + return + } + } +} + +func (dr *DeviceRegistry) UpdateDeviceTsSync(deviceID string) { + dr.MarkDeviceOnline([]string{deviceID}) + if !contains(dr.OnlineDevices, deviceID) { + dr.OnlineDevices = append(dr.OnlineDevices, deviceID) + } +} + +func (dr *DeviceRegistry) MarkDeviceOnline(deviceList []string) { + for _, deviceID := range deviceList { + uptime, _ := dr.FindUptime(deviceID) + //if uptime != nil + uptime.Status = 1 + + } +} + +func (dr *DeviceRegistry) MarkDeviceOffline(mid string) { + device := dr.FindDevice(mid) + uptime, _ := dr.FindUptime(device.Id) + uptime.Status = 0 +} + +func (dr *DeviceRegistry) FindUptime(id string) (entity2.DeviceUptime, error) { + record, found := dr.DeviceUptimeRegistry[id] + if !found { + rows, _ := dr.RequestController.MySQL.Query(fmt.Sprintf("select * from device_uptime where device_id = '%s'", id)) + + //defer rows.Close() + + if rows.Next() { + var uptime entity2.DeviceUptime + err := rows.Scan(&uptime.Id, &uptime.Device_id) + if err != nil { + fmt.Println("Error scanning uptime record:", err) + + } + dr.DeviceUptimeRegistry[uptime.Device_id] = uptime + return uptime, nil + } + } + return record, nil +} + +func (dr *DeviceRegistry) GlobalUpdateTsSync() { + dr.TempBeforeUpdateTSSYNC = append([]string{}, dr.OnlineDevices...) + dr.OnlineDevices = []string{} + if len(dr.TempBeforeUpdateTSSYNC) > 0 { + deviceIDsStr := strings.Join(dr.TempBeforeUpdateTSSYNC, ",") + _, err := dr.RequestController.MySQL.Query(fmt.Sprintf("UPDATE `device` SET `ts_sync` = %d WHERE `id` IN (%s)", time.Now().Unix(), deviceIDsStr)) + if err != nil { + fmt.Println("Error in GlobalUpdateTsSync") + } + + } + dr.TempBeforeUpdateTSSYNC = []string{} +} + +func (dr *DeviceRegistry) CheckDeviceOnline(deviceID string) bool { + return contains(dr.OnlineDevices, deviceID) || contains(dr.TempBeforeUpdateTSSYNC, deviceID) +} + +func contains(arr []string, val string) bool { + for _, item := range arr { + if item == val { + return true + } + } + return false +} diff --git a/app/lib/redis/device_status.go b/app/lib/redis/device_status.go new file mode 100644 index 0000000..46be092 --- /dev/null +++ b/app/lib/redis/device_status.go @@ -0,0 +1,35 @@ +package device_status + +import ( + "context" + "github.com/go-redis/redis/v8" +) + +// DeviceStatus структура устройства его статуса в Redis +type DeviceStatus struct { + Key string + Client *redis.Client +} + +// NewDeviceStatus конструктор, принимающий клиент Redis +func NewDeviceStatus(redisClient *redis.Client) *DeviceStatus { + return &DeviceStatus{ + Key: "device_status", + Client: redisClient, + } +} + +// GetOnline получает статус онлайн для указанного устройства из Redis. +func (ds *DeviceStatus) GetOnline(ctx context.Context, devID string) (string, error) { + return ds.Client.HGet(ctx, ds.Key, devID).Result() +} + +// SetOnline устанавливает статус онлайн для указанного устройства в Redis. +func (ds *DeviceStatus) SetOnline(ctx context.Context, devID string) error { + return ds.Client.HSet(ctx, ds.Key, devID, 0).Err() +} + +// SetOffline устанавливает статус оффлайн для указанного устройства в Redis. +func (ds *DeviceStatus) SetOffline(ctx context.Context, devID string) error { + return ds.Client.HDel(ctx, ds.Key, devID).Err() +} diff --git a/app/lib/request_controller.go b/app/lib/request_controller.go new file mode 100644 index 0000000..3d6edc2 --- /dev/null +++ b/app/lib/request_controller.go @@ -0,0 +1,86 @@ +package lib + +import ( + "encoding/json" + "fmt" + databases2 "node_rabbit_go/app/databases" + "node_rabbit_go/app/internals/logger" + "time" +) + +type RequestController struct { + MySQL *databases2.Database + LoggerAndExceptionHandler *logger.LoggerAndExceptionHandler + Clickhouse *databases2.Clickhouse + STOMP *STOMPlib + deviceDebugList []string +} + +func NewRequestController() *RequestController { + rc := &RequestController{ + MySQL: databases2.NewSQLDatabase(), + LoggerAndExceptionHandler: logger.NewLoggerAndExceptionHandler(), + Clickhouse: databases2.NewClickhouse(), + STOMP: NewSTOMPlib(), + deviceDebugList: []string{}, + } + // Запуск получения устройств + go rc.getDebugDeviceListPeriodically() + return rc +} + +func (rc *RequestController) WriteLog(devID string, data interface{}, ts time.Time) { + if rc.CheckDeviceDebugMode(devID) { + jsonData, err := json.Marshal(data) + if err != nil { + rc.LoggerAndExceptionHandler.Error(fmt.Sprintf("Error marshaling data: %s", err.Error())) + return + } + err = rc.Clickhouse.Insert("node_device_logs", map[string]interface{}{ + "dev_id": devID, + "cmd_id": 0, + "data": string(jsonData), + "ts": ts, + }) + if err != nil { + rc.LoggerAndExceptionHandler.Error(fmt.Sprintf("Error inserting log into ClickHouse:", err.Error())) + } + } +} + +func (rc *RequestController) getDebugDeviceList() error { + rows, err := rc.MySQL.Query("SELECT `id` FROM `device` WHERE `debug_mode`=1") + if err != nil { + rc.LoggerAndExceptionHandler.Error(fmt.Sprintf("Error querying debug devices:", err.Error())) + return err + } + + rc.deviceDebugList = []string{} + for rows.Next() { + var devID string + if err := rows.Scan(&devID); err != nil { + rc.LoggerAndExceptionHandler.Error(fmt.Sprintf("Error scanning debug device row:", err.Error())) + continue + } + rc.deviceDebugList = append(rc.deviceDebugList, devID) + } + return nil +} + +func (rc *RequestController) getDebugDeviceListPeriodically() { + for { + time.Sleep(2 * time.Second) + if err := rc.getDebugDeviceList(); err != nil { + rc.LoggerAndExceptionHandler.Error(err.Error()) + } + } +} + +func (rc *RequestController) CheckDeviceDebugMode(devID string) bool { + for _, id := range rc.deviceDebugList { + if id == devID { + return true + } + } + return false +} diff --git a/app/lib/task_executor.go b/app/lib/task_executor.go new file mode 100644 index 0000000..2953920 --- /dev/null +++ b/app/lib/task_executor.go @@ -0,0 +1,245 @@ +package lib + +// +//import ( +// "encoding/json" +// "fmt" +// "log" +// "math" +// "strconv" +// "sync" +// "time" +//) +// +//type TasksExecutor struct { +// RequestController *RequestController +// RabbitMQ *RabbitMQ +// DeviceRegistry *DeviceRegistry +//} +// +//func NewTasksExecutor(requestController *RequestController, rabbitMQ *RabbitMQ, deviceRegistry *DeviceRegistry) *TasksExecutor { +// return &TasksExecutor{ +// RequestController: requestController, +// RabbitMQ: rabbitMQ, +// DeviceRegistry: deviceRegistry, +// } +//} +// +//func (te *TasksExecutor) Start() { +// // Добавить ожидание коннекта к RabbitMQ +// wg := sync.WaitGroup{} +// wg.Add(1) +// go func() { +// defer wg.Done() +// for { +// time.Sleep(2 * time.Second) +// te.Check() +// } +// }() +// wg.Wait() +//} +// +//func (te *TasksExecutor) Check() { +// result, err := te.RequestController.MySQL.Query("SELECT * FROM `queue_task` WHERE (`status` = 0 OR `status` = 1 OR `status` = 2) AND `type`=1") +// if err != nil { +// log.Printf("Error querying tasks: %v", err) +// return +// } +// +// if len(result) == 0 { +// return +// } +// +// for _, task := range result { +// if task.TsScheduled > time.Now().Unix() { +// continue +// } +// +// if time.Now().Unix() > (task.Ts+(task.Timeout)) && task.Timeout != 0 { +// te.DeleteTask("Task timeout", task.Id) +// continue +// } +// +// if task.Status == 0 { +// go te.RunTask(task) +// } +// } +//} +// +//func (te *TasksExecutor) RunTask(task Task) { +// _, err := te.RequestController.MysqlQuery(fmt.Sprintf("UPDATE `queue_task` SET `status`=1, `ts_start`='%d' WHERE `id`=%d", time.Now().Unix(), task.Id)) +// if err != nil { +// log.Printf("Error updating task status: %v", err) +// return +// } +// +// switch task.NumCom { +// case 3, 8, 9, 16, 17, 18, 19, 20, 21: +// go te.SendTask(task) +// default: +// go te.DeleteTask("Unknown id_task", task.Id) +// } +//} +// +//func (te *TasksExecutor) DeleteTask(reason string, id int) { +// log.Printf("[TASK EXECUTOR] %s (id=%d)", reason, id) +// _, err := te.RequestController.MysqlQuery(fmt.Sprintf("UPDATE `queue_task` SET `status`=3, `ts_end`=%d WHERE `id`=%d", time.Now().Unix(), id)) +// if err != nil { +// log.Printf("Error deleting task: %v", err) +// } +//} +// +//func (te *TasksExecutor) SendTask(task Task) { +// channel, err := te.RabbitMQ.CreateQueue(fmt.Sprintf("rpc_%d", task.Mid)) +// if err != nil { +// log.Printf("Error creating queue: %v", err) +// return +// } +// +// options := map[string]interface{}{ +// "replyTo": "rpc_queue", +// "deliveryMode": 2, +// } +// +// task.Params = json.RawMessage(task.ParamsStr) +// if task.Timeout != 0 { +// options["expiration"] = task.Timeout * 1000 +// } +// +// err = channel.SendToQueue(fmt.Sprintf("rpc_%d", task.Mid), []byte(fmt.Sprintf(`{"task_id":%d,"num_com":%d,"params":%s}`, task.Id, task.NumCom, string(task.Params))), options) +// if err != nil { +// log.Printf("Error sending task to queue: %v", err) +// return +// } +// +// if task.NumCom == 17 { +// findDevice, err := te.DeviceRegistry.FindDevice(task.Mid) +// if err != nil { +// log.Printf("Error finding device: %v", err) +// return +// } +// if findDevice != nil { +// findDevice.Blocked = task.Params.Command +// _, err := te.RequestController.MysqlQuery(fmt.Sprintf("UPDATE `device` SET `blocked`= '%s' WHERE id = %d", task.Params.Command, task.DevId)) +// if err != nil { +// log.Printf("Error updating device: %v", err) +// } +// } +// } +//} +// +//func (te *TasksExecutor) TaskParse(packet Packet) { +// task, err := te.RequestController.MysqlQuery(fmt.Sprintf("SELECT * FROM `queue_task` WHERE `id`=%d", packet.TaskId)) +// if err != nil { +// log.Printf("Error querying task: %v", err) +// return +// } +// +// if packet.Status != nil { +// switch task[0].NumCom { +// case 16: +// te.RequestController.STOMP.SendMessage("selftest", task[0].Id, fmt.Sprintf("%v", packet)) +// } +// if *packet.Status >= 4 && *packet.Status <= 6 { +// _, err := te.RequestController.MysqlQuery(fmt.Sprintf("UPDATE `queue_task` SET `status` = %d, `ts_end` = %d WHERE `id`=%d", *packet.Status, time.Now().Unix(), packet.TaskId)) +// if err != nil { +// log.Printf("Error updating task status: %v", err) +// } +// } else { +// _, err := te.RequestController.MysqlQuery(fmt.Sprintf("UPDATE `queue_task` SET `status` = %d WHERE `id`=%d", *packet.Status, packet.TaskId)) +// if err != nil { +// log.Printf("Error updating task status: %v", err) +// } +// } +// } else { +// if len(task) == 0 { +// return +// } +// switch task[0].NumCom { +// case 3: +// case 8: +// case 9: +// case 16: +// te.RequestController.STOMP.SendMessage("selftest", task[0].Id, fmt.Sprintf("%v", packet)) +// if packet.Data != nil && packet.Data.Done { +// te.SelftestResult(packet.Data, packet.TaskId, task[0].DevId) +// } +// case 17: +// case 18: +// go te.RPCResult(packet) +// case 19: +// case 20: +// log.Printf("%d - %v", task[0].Id, packet) +// case 21: +// default: +// log.Printf("[TASK EXECUTOR] Incorrect num_com for task %d - %v", task[0].Id, packet) +// } +// } +//} +// +//func (te *TasksExecutor) RPCResult(data Packet) { +// data.Mid = nil +// _, err := te.RequestController.MysqlQuery("SET sql_mode = NO_BACKSLASH_ESCAPES;") +// if err != nil { +// log.Printf("Error setting SQL mode: %v", err) +// } +// te.RequestController.STOMP.SendMessage("rpc_result", data.TaskId, fmt.Sprintf("%v", data)) +// _, err = te.RequestController.MysqlQuery(fmt.Sprintf("UPDATE `queue_task` SET `log` = '%v', `ts_end`=%d WHERE `id`=%d", data, time.Now().Unix(), data.TaskId)) +// if err != nil { +// log.Printf("Error updating task status: %v", err) +// } +//} +// +//func (te *TasksExecutor) SelftestResult(data map[string]interface{}, taskId, deviceId int) { +// testingModes := []string{} +// for testingMode := range data { +// if testingMode != "done" { +// testingModes = append(testingModes, testingMode) +// } +// } +// +// result := map[string]map[string][]string{} +// for _, iface := range testingModes { +// result[iface] = map[string][]string{} +// for key, value := range data[iface].(map[string]interface{}) { +// result[iface][key] = []string{} +// if iface == "ping" { +// result[iface][key] = append(result[iface][key], fmt.Sprintf("%f ms", value.(float64))) +// } else { +// result[iface][key] = append(result[iface][key], fmt.Sprintf("%v", value)) +// } +// } +// } +// +// // Перепаршиваем данные в читаемый для фронта вид +// dataForStomp := map[string]map[string][]string{} +// for iface, modes := range result { +// dataForStomp[iface] = map[string][]string{} +// for mode, values := range modes { +// dataForStomp[iface][mode] = []string{} +// if mode != "latency" && mode != "packet_loss" { +// speedLimits := []int{0, 1024, 1024 * 1024, 1024 * 1024 * 1024, 1024 * 1024 * 1024 * 1024, 1024 * 1024 * 1024 * 1024 * 1024} +// speedNames := []string{"", "b/s", "Kb/s", "Mb/s", "Gb/s", "Tb/s"} +// for i := 1; i <= len(speedNames); i++ { +// if floatValue, err := strconv.ParseFloat(values[0], 64); err == nil { +// if floatValue >= float64(speedLimits[i-1]) && floatValue < float64(speedLimits[i]) { +// n := float64(speedLimits[i-1]) +// dataForStomp[iface][mode] = append(dataForStomp[iface][mode], fmt.Sprintf("%.2f %s", math.Floor((floatValue/n)*100)/100, speedNames[i])) +// break +// } +// } +// } +// } else if mode == "latency" { +// dataForStomp[iface][mode] = append(dataForStomp[iface][mode], fmt.Sprintf("%v ms", values[0])) +// } else { +// dataForStomp[iface][mode] = append(dataForStomp[iface][mode], values...) +// } +// } +// } +// +// te.RequestController.STOMP.SendMessage("selftest", taskId, fmt.Sprintf(`{"type":"result", "data": %v}`, dataForStomp)) +// _, err := te.RequestController.MysqlQuery(fmt.Sprintf("UPDATE queue_task SET log = '%v', ts_end = %d WHERE id = %d", result, time.Now().Unix(), taskId)) +// if err != nil { +// log.Printf("Error updating task status: %v", err) +// } +//} diff --git a/app/main.go b/app/main.go new file mode 100644 index 0000000..124cbab --- /dev/null +++ b/app/main.go @@ -0,0 +1,295 @@ +package main + +func main() { + +} + +// +//func main() { +// //LOAD FROM DOTENV VARS +// //err := godotenv.Load() +// //if err != nil { +// // fmt.Printf("Ошибка чтения .env файла %s", err) +// //} +// // === Logger Example [Using Sentry!] === +// //logger := logger2.NewLoggerAndExceptionHandler() +// //logger.Info("Hello World") +// //logger.Fatal("Fatal Msg") +// +// // === Device status example [Using Redis!] === +// //ctx := context.Background() +// //redisClient := redis.NewClient(&redis.Options{ +// // Addr: "localhost:6379", +// //}) +// //ds := device_status.NewDeviceStatus(redisClient) +// //err := ds.SetOnline(ctx, "device_1") +// //if err != nil { +// // fmt.Printf("Error while set online: %s", err) +// //} +// //status, err := ds.GetOnline(ctx, "device_1") +// //if err != nil { +// // fmt.Printf("Error while set offline: %s", err) +// //} +// //fmt.Printf("Device status: %s", status) +// //err = ds.SetOffline(ctx, "device_1") +// //if err != nil { +// // panic(err) +// //} +// +// /* DB TEST */ +// //database := databases.NewDatabase() +// //// Example query +// //rows, err := database.Query("SELECT * FROM users") +// //if err != nil { +// // log.Fatal(err) +// //} +// //// Process rows +// //for rows.Next() { +// // var id int +// // var name string +// // if err := rows.Scan(&id, &name); err != nil { +// // log.Fatal(err) +// // } +// // fmt.Println(id, name) +// //} +// //database.Close() +// +// // /* Clickhouse */ +// //ch := databases.NewClickhouse() +// //err := ch.InitDB() +// //if err != nil { +// // log.Fatal(err) +// //} +// // +// //go func() { +// // for { +// // time.Sleep(time.Second) +// // +// // for table, data := range ch.TablesQueues { +// // if len(data) >= ch.NeedCountForTable[table] { +// // err := ch.FlushQueue(table) +// // if err != nil { +// // log.Println("Error flushing queue:", err) +// // } +// // } +// // } +// // } +// //}() +// // +// //// Example usage: +// //data := []interface{}{"1", "2", "3"} +// //err = ch.Insert("device_traffic", data) +// //if err != nil { +// // log.Println("Error inserting data:", err) +// //} +// +// /* Request controller */ +// //rc := lib.NewRequestController() +// //devID := "1#1" +// //data := map[string]interface{}{ +// // "k": "v", +// //} +// //ts := time.Now() +// // +// //rc.WriteLog(devID, data, ts) +// +//} + +////////////////////////////////////////////////////////////////////////////////////// +//var libraries = map[string]interface{}{ +// "version": "./lib/data_libs.go/version", +// "gps": "./lib/data_libs.go/gps", +// "modemd_info": "./lib/data_libs.go/modemd_info", +// "traffic": "./lib/data_libs.go/traffic", +// "system_interfaces": "./lib/data_libs.go/system_interfaces", +// "hardware_status": "./lib/data_libs.go/hardware_status", +// "multilinks": "./lib/data_libs.go/multilinks", +// "iface_state": "./lib/data_libs.go/iface_state", +// "backup_state": "./lib/data_libs.go/backup_state", +// "mm_config": "./lib/data_libs.go/config", +// "log": "./lib/data_libs.go/log", +// "lora": "./lib/data_libs.go/lora", +// "rstat": "./lib/data_libs.go/rstat", +//} +// +//func main() { +// log := logger.NewLoggerAndExceptionHandler() +// log.Info("Start \"monitoring_consumer\" process") +// +// rabbitMQ := lib.NewRabbitMQ() +// requestController := lib.NewRequestController() +// deviceRegistry := lib.NewDeviceRegistry(requestController) +// +// err := rabbitMQ.Connect() +// if err != nil { +// log.Error(fmt.Sprintf("RabbitMQ connection error: %v", err)) +// os.Exit(1) +// } +// +// readyFlag := rabbitMQ.ReadyCheck() +// if !readyFlag { +// log.Error(fmt.Sprintf("RabbitMQ not ready: %v", err)) +// os.Exit(1) +// } +// +// go dataQueueConsumer(rabbitMQ, deviceRegistry, requestController) +// go regQueueConsumer(rabbitMQ, deviceRegistry, requestController) +// // T__EXECUTOR +// //if os.Getenv("NODE_TASK_PARSER") != "DISABLE" { +// // taskExecutor := newT (requestController, rabbitMQ, deviceRegistry) +// // go rpcQueueConsumer(rabbitMQ, taskExecutor, deviceRegistry) +// //} else { +// // logger_and_exception_handler.Info("Task parsing and sender disable by env variable") +// //} +// +// select {} +//} +// +//func dataQueueConsumer(rabbitMQ *lib.RabbitMQ, deviceRegistry *interface{}, requestController *lib.RequestController) { +// rabbitMQ.Consume(rabbitMQ.DataQueue, "data_queue", func(msg []byte, contentType string) { +// if contentType == "application/zlib" { +// buffer, err := base64.StdEncoding.DecodeString(string(msg)) +// if err != nil { +// logger_and_exception_handler.Error(fmt.Sprintf("[DATA QUEUE] Error decoding base64: %v", err)) +// return +// } +// +// zlibReader, err := zlib.NewReader(bytes.NewReader(buffer)) +// if err != nil { +// logger_and_exception_handler.Error(fmt.Sprintf("[DATA QUEUE] Error then zlib unpack: %v", err)) +// return +// } +// defer zlibReader.Close() +// +// unzippedData, err := io.ReadAll(zlibReader) +// if err != nil { +// logger_and_exception_handler.Error(fmt.Sprintf("[DATA QUEUE] Error reading unzipped data: %v", err)) +// return +// } +// +// var packets []map[string]interface{} +// err = json.Unmarshal(unzippedData, &packets) +// if err != nil { +// logger_and_exception_handler.Error(fmt.Sprintf("[DATA QUEUE] Unknown error: %v\n%s", err, unzippedData)) +// return +// } +// +// for _, packet := range packets { +// parsePacket(packet, deviceRegistry, requestController) +// } +// } else { +// var packet map[string]interface{} +// err := json.Unmarshal(msg, &packet) +// if err != nil { +// logger_and_exception_handler.Warning(fmt.Sprintf("%s\n%v\n%v", msg, err.Error(), err)) +// return +// } +// parsePacket(packet, deviceRegistry, requestController) +// } +// }) +//} +// +//func regQueueConsumer(rabbitMQ *lib.RabbitMQ, deviceRegistry *interface{}, requestController *lib.RequestController) { +// rabbitMQ.Consume(rabbitMQ.RegQueue, "reg_queue", func(msg []byte) { +// var info map[string]interface{} +// err := json.Unmarshal(msg, &info) +// if err != nil { +// logger_and_exception_handler.SentryCaptureException(err) +// return +// } +// +// findDevice, err := deviceRegistry.FindDevice(info["mid"].(string)) +// if err != nil { +// logger_and_exception_handler.SentryCaptureException(err) +// return +// } +// +// if findDevice == nil { +// query := `INSERT INTO device(\` + "`desc`, `group_id`, `mid`, `serial`, `add_ts`, `fid`)" + +// "SELECT \" \", 0, ?, ?, ?, CONCAT('00', first_available_fid) " + +// "FROM (" + +// "SELECT IFNULL(MIN(fid_numeric) + 1, 1) AS first_available_fid " + +// "FROM (" + +// "SELECT CAST(SUBSTRING(fid, 3) AS UNSIGNED) AS fid_numeric " + +// "FROM device WHERE fid LIKE '00%' " + +// "UNION ALL SELECT 0 AS fid_numeric) AS subquery " + +// "WHERE fid_numeric + 1 NOT IN (SELECT CAST(SUBSTRING(fid, 3) AS UNSIGNED) FROM device WHERE fid LIKE '00%') " + +// "UNION SELECT 1 AS first_available_fid FROM device WHERE NOT EXISTS (SELECT * FROM device WHERE fid LIKE '00%')) AS subquery;` +// +// _, err := requestController.MySQL.Query(query, info["mid"], info["data"].(map[string]interface{})["serial"], time.Now().Unix()) +// if err != nil { +// logger_and_exception_handler.SentryCaptureException(err) +// return +// } +// +// findDevice, err = deviceRegistry.FindDevice(info["mid"].(string)) +// if err != nil { +// logger_and_exception_handler.SentryCaptureException(err) +// return +// } +// } +// +// deviceRegistry.CheckOnline(findDevice) +// deviceRegistry.WriteMMAgentData(findDevice.DeviceID, info["data"].(map[string]interface{})["uptime"], info["data"].(map[string]interface{})["queue_size"]) +// requestController.STOMP.SendMessage("device_update_main", findDevice.DeviceID, map[string]interface{}{ +// "action": "UPDATE", +// "subtopic": "device_update_main", +// }) +// }) +//} +// +//func rpcQueueConsumer(rabbitMQ *lib.RabbitMQ, deviceRegistry *interface{}, requestController *lib.RequestController) { +// rabbitMQ.Consume(rabbitMQ.RPCQueue, "rpc_queue", func(msg []byte) { +// var packet map[string]interface{} +// err := json.Unmarshal(msg, &packet) +// if err != nil { +// logger_and_exception_handler.Warning(fmt.Sprintf("%s\n%v\n%v", msg, err.Error(), err)) +// return +// } +// +// findDevice, err := deviceRegistry.FindDevice(packet["mid"].(string)) +// if err != nil { +// logger_and_exception_handler.Warning(fmt.Sprintf("%s\n%v\n%v", msg, err.Error(), err)) +// return +// } +// +// deviceRegistry.CheckOnline(findDevice) +// taskExecutor.TaskParse(packet) +// }) +//} +// +//func parsePacket(packet map[string]interface{}, deviceRegistry *interface{}, requestController *request_controller.RequestController) { +// if packetType, ok := packet["type"].(string); ok { +// lib, exists := libraries[packetType] +// if !exists { +// logger_and_exception_handler.Warning(fmt.Sprintf("[DATA QUEUE] Undefined type in message - %v", packet)) +// return +// } +// +// findDevice, err := deviceRegistry.FindDevice(packet["mid"].(string)) +// if err != nil || findDevice == nil { +// logger_and_exception_handler.Warning(fmt.Sprintf("[DATA QUEUE] DEVICE %s not found in database. Return task in queue", packet["mid"])) +// return +// } +// +// deviceRegistry.CheckOnline(findDevice) +// requestController.WriteLog(findDevice.DeviceID, packet, time.Now().Unix()) +// +// if ts, ok := packet["ts"].(int64); ok && ts < 157784630 { +// packet["ts"] = time.Now().Unix() - ts +// } +// +// lib.GetPacket(requestController, packet["data"].([]interface{})[0], findDevice.DeviceID, packet["ts"].(int64)) +// } else { +// logger_and_exception_handler.Warning(fmt.Sprintf("[DATA QUEUE] PACKAGE %v doesn't have 'type' param", packet)) +// } +//} +// +//func init() { +// go func() { +// for { +// device_registry.GlobalUpdateTsSync() +// time.Sleep(10 * time.Second) +// } +// }() +//} diff --git a/func_header.txt b/func_header.txt new file mode 100644 index 0000000..a093cf3 --- /dev/null +++ b/func_header.txt @@ -0,0 +1,91 @@ +список функций device_status.go (Проверяет статус устройства): + func NewDeviceStatus(redisClient *redis.Client) *DeviceStatus + func (ds *DeviceStatus) GetOnline(ctx context.Context, devID string) (string, error) + func (ds *DeviceStatus) SetOnline(ctx context.Context, devID string) error + +Список функций и структур STOMPlib.go (Реализует подключение по STOMP): + type STOMPlib struct { + client *stomp.Conn + waitMessages map[string]*time.Timer + state bool + brokerURL string + } + func NewSTOMPlib() *STOMPlib + func (s *STOMPlib) SendMessage(topic, id, message string) + +Список функций и структур clickhouse.go (БД Clickhouse): + type Clickhouse struct { + DB *sql.DB + TablesQueues map[string][]interface{} + NeedCountForTable map[string]int + TableTemplate map[string]string + } + func NewClickhouse() *Clickhouse + func (ch *Clickhouse) InitDB() error + func (ch *Clickhouse) Insert(table string, data interface{}) error + func (ch *Clickhouse) FlushQueue(table string) error + +Список функций и структур mysql.go (БД Mysql): + type Database struct { + db *sql.DB + } + func NewSQLDatabase() *Database + func (d *Database) Query(query string) (*sql.Rows, error) + func (d *Database) Close() + +Список функций и структур logger.go (Aka "LoggerAndExceptionHandler", Логгер): + type LoggerAndExceptionHandler struct { + sentryEnabled bool + } + func NewLoggerAndExceptionHandler() *LoggerAndExceptionHandler + func (l *LoggerAndExceptionHandler) [Info || Debug || Fatal || Error || Critical || Warning || Log] (message string) + +Список функций и структур request_controller.go (Контроллер данных): + type RequestController struct { + MySQL *databases.Database + LoggerAndExceptionHandler *logger.LoggerAndExceptionHandler + Clickhouse *databases.Clickhouse + STOMP *STOMPlib + deviceDebugList []string + } + func NewRequestController() *RequestController + func (rc *RequestController) WriteLog(devID string, data interface{}, ts time.Time) + func (rc *RequestController) getDebugDeviceList() error + func (rc *RequestController) getDebugDeviceListPeriodically() + func (rc *RequestController) CheckDeviceDebugMode(devID string) bool + +Список функций и структур device_registry.go (Реестр устройств): + type Device struct { #$fakedata + id string + name string + mid string + device_id string + ts_sync int64 + } + type DeviceUptime struct { #$fakedata + id string + device_id string + status int + } + type DeviceRegistry struct { + DeviceRegistry map[string]Device + DeviceUptimeRegistry map[string]DeviceUptime + RequestController *RequestController + OnlineDevices []string + TempBeforeUpdateTSSYNC []string + } + func NewDeviceRegistry(rc *RequestController) *DeviceRegistry + func (dr *DeviceRegistry) initializeRegistry() + func (dr *DeviceRegistry) FindDevice(mid string) Device + func (dr *DeviceRegistry) CheckOnline(device Device) + func (dr *DeviceRegistry) WriteMM_AgentData(deviceID string, uptime string, queueSize string) + func (dr *DeviceRegistry) UpdateDeviceTsSync(deviceID string) + func (dr *DeviceRegistry) MarkDeviceOnline(deviceList []string) + func (dr *DeviceRegistry) MarkDeviceOffline(mid string) + func (dr *DeviceRegistry) FindUptime(id string) (DeviceUptime, error) + func (dr *DeviceRegistry) GlobalUpdateTsSync() + func (dr *DeviceRegistry) CheckDeviceOnline(deviceID string) bool + func contains(arr []string, val string) bool + + +// TODO Multilinks \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..425bc2d --- /dev/null +++ b/go.mod @@ -0,0 +1,21 @@ +module node_rabbit_go + +go 1.22 + +require ( + filippo.io/edwards25519 v1.1.0 // indirect + github.com/ClickHouse/clickhouse-go v1.5.4 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/getsentry/sentry-go v0.27.0 // indirect + github.com/go-redis/redis/v8 v8.11.5 // indirect + github.com/go-sql-driver/mysql v1.8.1 // indirect + github.com/go-stomp/stomp v2.1.4+incompatible // indirect + github.com/go-stomp/stomp/v3 v3.1.0 // indirect + github.com/joho/godotenv v1.5.1 // indirect + github.com/streadway/amqp v1.1.0 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..2adf0e0 --- /dev/null +++ b/go.sum @@ -0,0 +1,76 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV1coaaFcI0= +github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= +github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg= +github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= +github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/go-stomp/stomp v2.1.4+incompatible h1:D3SheUVDOz9RsjVWkoh/1iCOwD0qWjyeTZMUZ0EXg2Y= +github.com/go-stomp/stomp v2.1.4+incompatible/go.mod h1:VqCtqNZv1226A1/79yh+rMiFUcfY3R109np+7ke4n0c= +github.com/go-stomp/stomp/v3 v3.1.0 h1:JnvRJuua/fX2Lq5Ie5DXzrOL18dnzIUenCZXM6rr8/0= +github.com/go-stomp/stomp/v3 v3.1.0/go.mod h1:ztzZej6T2W4Y6FlD+Tb5n7HQP3/O5UNQiuC169pIp10= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/jjeffery/stomp v0.0.0-20160907031752-b994bda931e1 h1:DnN4Kb2KScs9mto+qjj6UJFEBf0im7pkUGWh4fG5+mQ= +github.com/jjeffery/stomp v0.0.0-20160907031752-b994bda931e1/go.mod h1:Dm2/JSV8osvkOOlPf9ztZHdKvbAJIJkQPBgX7v9MyOc= +github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= +github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= +golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/qodana.yaml b/qodana.yaml new file mode 100644 index 0000000..215d808 --- /dev/null +++ b/qodana.yaml @@ -0,0 +1,29 @@ +#-------------------------------------------------------------------------------# +# Qodana analysis is configured by qodana.yaml file # +# https://www.jetbrains.com/help/qodana/qodana-yaml.html # +#-------------------------------------------------------------------------------# +version: "1.0" + +#Specify inspection profile for code analysis +profile: + name: qodana.starter + +#Enable inspections +#include: +# - name: + +#Disable inspections +#exclude: +# - name: +# paths: +# - + +#Execute shell command before Qodana execution (Applied in CI/CD pipeline) +#bootstrap: sh ./prepare-qodana.sh + +#Install IDE plugins before Qodana execution (Applied in CI/CD pipeline) +#plugins: +# - id: #(plugin id can be found at https://plugins.jetbrains.com) + +#Specify Qodana linter for analysis (Applied in CI/CD pipeline) +linter: jetbrains/qodana-go:latest