go_rabbitmq/app/databases/clickhouse.go

127 lines
3.4 KiB
Go

package databases
import (
"database/sql"
"fmt"
"log"
"os"
"strings"
"sync"
_ "github.com/ClickHouse/clickhouse-go"
)
type Clickhouse struct {
DB *sql.DB
TablesQueues map[string][]interface{}
NeedCountForTable map[string]int
TableTemplate map[string]string
mux sync.Mutex
}
func NewClickhouse() *Clickhouse {
ch := &Clickhouse{
DB: nil,
TablesQueues: make(map[string][]interface{}),
NeedCountForTable: map[string]int{
"device_memory": 3000,
"device_logs": 3000,
"simcard_logs": 3000,
},
TableTemplate: map[string]string{
"device_traffic": "(device_id, iface, tx, rx, tx_max, rx_max, tx_traffic, rx_traffic, ts)",
"device_cpu_status": "(device_id, cpu, cpu_temp, mother_temp, ts)",
"device_loc_history": "(device_id, ts, long, lat, speed, course, altitude)",
"device_memory": "(device_id, ts, data)",
"device_logs": "(device_id, type, type_event, params, ts, ts_device)",
"simcard_logs": "(imsi, type, params, ts)",
"simcard_network_status": "(imsi, device_id, ts, signal, sig_lvl, state, roaming, iface, rsrp, rsrq, ecio, rscp, lte_band, sinr, temp, oper, oper_id, cell_id, lac_id)",
"simcard_speed": "(imsi, device_id, tx, rx, tx_max, rx_max, tx_traffic, rx_traffic, ts)",
"node_device_logs": "(dev_id, cmd_id, data, ts)",
"rstat_data": "(device_id, ts, rstat_serial, in, out, pass)",
"communication_status": "(device_id, ts, iface, rssi, rsrp, rsrq, ecio, rscp, lte_band, sinr, network_reg, temp)",
},
}
for table := range ch.NeedCountForTable {
ch.TablesQueues[table] = make([]interface{}, 0)
}
return ch
}
func (ch *Clickhouse) InitDB() error {
dsn := fmt.Sprintf("tcp://%s:%s?username=%s&password=%s&database=%s",
os.Getenv("CLICKHOUSE_HOST"), os.Getenv("CLICKHOUSE_PORT"),
os.Getenv("CLICKHOUSE_USER"), os.Getenv("CLICKHOUSE_PASSWORD"),
os.Getenv("CLICKHOUSE_DATABASE"),
)
db, err := sql.Open("clickhouse", dsn)
if err != nil {
return err
}
ch.DB = db
return nil
}
func (ch *Clickhouse) Insert(table string, data interface{}) error {
ch.mux.Lock()
ch.TablesQueues[table] = append(ch.TablesQueues[table], data)
shouldFlush := len(ch.TablesQueues[table]) >= ch.NeedCountForTable[table]
ch.mux.Unlock()
if shouldFlush {
go func() {
if err := ch.FlushQueue(table); err != nil {
log.Printf("Clickhouse error: %s", err)
}
}()
}
return nil
}
func (ch *Clickhouse) FlushQueue(table string) error {
ch.mux.Lock()
if ch.DB == nil {
ch.mux.Unlock()
return fmt.Errorf("database connection is not initialized")
}
data := ch.TablesQueues[table]
ch.TablesQueues[table] = make([]interface{}, 0)
ch.mux.Unlock()
query := fmt.Sprintf("INSERT INTO %s %s VALUES", table, ch.TableTemplate[table])
var valueStrings []string
var valueArgs []interface{}
for _, entry := range data {
values := entry.([]interface{})
valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?)")
valueArgs = append(valueArgs, values...)
}
query = query + strings.Join(valueStrings, ",")
stmt, err := ch.DB.Prepare(query)
if err != nil {
return err
}
defer func(stmt *sql.Stmt) {
err := stmt.Close()
if err != nil {
fmt.Printf("Ошибка закрытия подключения к БД")
}
}(stmt)
_, err = stmt.Exec(valueArgs...)
if err != nil {
return err
}
return nil
}