127 lines
3.4 KiB
Go
127 lines
3.4 KiB
Go
package databases
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
|
|
_ "github.com/ClickHouse/clickhouse-go"
|
|
)
|
|
|
|
type Clickhouse struct {
|
|
DB *sql.DB
|
|
TablesQueues map[string][]interface{}
|
|
NeedCountForTable map[string]int
|
|
TableTemplate map[string]string
|
|
mux sync.Mutex
|
|
}
|
|
|
|
func NewClickhouse() *Clickhouse {
|
|
ch := &Clickhouse{
|
|
DB: nil,
|
|
TablesQueues: make(map[string][]interface{}),
|
|
NeedCountForTable: map[string]int{
|
|
"device_memory": 3000,
|
|
"device_logs": 3000,
|
|
"simcard_logs": 3000,
|
|
},
|
|
TableTemplate: map[string]string{
|
|
"device_traffic": "(device_id, iface, tx, rx, tx_max, rx_max, tx_traffic, rx_traffic, ts)",
|
|
"device_cpu_status": "(device_id, cpu, cpu_temp, mother_temp, ts)",
|
|
"device_loc_history": "(device_id, ts, long, lat, speed, course, altitude)",
|
|
"device_memory": "(device_id, ts, data)",
|
|
"device_logs": "(device_id, type, type_event, params, ts, ts_device)",
|
|
"simcard_logs": "(imsi, type, params, ts)",
|
|
"simcard_network_status": "(imsi, device_id, ts, signal, sig_lvl, state, roaming, iface, rsrp, rsrq, ecio, rscp, lte_band, sinr, temp, oper, oper_id, cell_id, lac_id)",
|
|
"simcard_speed": "(imsi, device_id, tx, rx, tx_max, rx_max, tx_traffic, rx_traffic, ts)",
|
|
"node_device_logs": "(dev_id, cmd_id, data, ts)",
|
|
"rstat_data": "(device_id, ts, rstat_serial, in, out, pass)",
|
|
"communication_status": "(device_id, ts, iface, rssi, rsrp, rsrq, ecio, rscp, lte_band, sinr, network_reg, temp)",
|
|
},
|
|
}
|
|
|
|
for table := range ch.NeedCountForTable {
|
|
ch.TablesQueues[table] = make([]interface{}, 0)
|
|
}
|
|
|
|
return ch
|
|
}
|
|
|
|
func (ch *Clickhouse) InitDB() error {
|
|
dsn := fmt.Sprintf("tcp://%s:%s?username=%s&password=%s&database=%s",
|
|
os.Getenv("CLICKHOUSE_HOST"), os.Getenv("CLICKHOUSE_PORT"),
|
|
os.Getenv("CLICKHOUSE_USER"), os.Getenv("CLICKHOUSE_PASSWORD"),
|
|
os.Getenv("CLICKHOUSE_DATABASE"),
|
|
)
|
|
|
|
db, err := sql.Open("clickhouse", dsn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ch.DB = db
|
|
return nil
|
|
}
|
|
|
|
func (ch *Clickhouse) Insert(table string, data interface{}) error {
|
|
ch.mux.Lock()
|
|
ch.TablesQueues[table] = append(ch.TablesQueues[table], data)
|
|
shouldFlush := len(ch.TablesQueues[table]) >= ch.NeedCountForTable[table]
|
|
ch.mux.Unlock()
|
|
|
|
if shouldFlush {
|
|
go func() {
|
|
if err := ch.FlushQueue(table); err != nil {
|
|
log.Printf("Clickhouse error: %s", err)
|
|
}
|
|
}()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ch *Clickhouse) FlushQueue(table string) error {
|
|
ch.mux.Lock()
|
|
if ch.DB == nil {
|
|
ch.mux.Unlock()
|
|
return fmt.Errorf("database connection is not initialized")
|
|
}
|
|
|
|
data := ch.TablesQueues[table]
|
|
ch.TablesQueues[table] = make([]interface{}, 0)
|
|
ch.mux.Unlock()
|
|
|
|
query := fmt.Sprintf("INSERT INTO %s %s VALUES", table, ch.TableTemplate[table])
|
|
var valueStrings []string
|
|
var valueArgs []interface{}
|
|
|
|
for _, entry := range data {
|
|
values := entry.([]interface{})
|
|
valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?)")
|
|
valueArgs = append(valueArgs, values...)
|
|
}
|
|
|
|
query = query + strings.Join(valueStrings, ",")
|
|
|
|
stmt, err := ch.DB.Prepare(query)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func(stmt *sql.Stmt) {
|
|
err := stmt.Close()
|
|
if err != nil {
|
|
fmt.Printf("Ошибка закрытия подключения к БД")
|
|
}
|
|
}(stmt)
|
|
|
|
_, err = stmt.Exec(valueArgs...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|