package databases import ( "database/sql" "fmt" "log" "os" "strings" "sync" _ "github.com/ClickHouse/clickhouse-go" ) type Clickhouse struct { DB *sql.DB TablesQueues map[string][]interface{} NeedCountForTable map[string]int TableTemplate map[string]string mux sync.Mutex } func NewClickhouse() *Clickhouse { ch := &Clickhouse{ DB: nil, TablesQueues: make(map[string][]interface{}), NeedCountForTable: map[string]int{ "device_memory": 3000, "device_logs": 3000, "simcard_logs": 3000, }, TableTemplate: map[string]string{ "device_traffic": "(device_id, iface, tx, rx, tx_max, rx_max, tx_traffic, rx_traffic, ts)", "device_cpu_status": "(device_id, cpu, cpu_temp, mother_temp, ts)", "device_loc_history": "(device_id, ts, long, lat, speed, course, altitude)", "device_memory": "(device_id, ts, data)", "device_logs": "(device_id, type, type_event, params, ts, ts_device)", "simcard_logs": "(imsi, type, params, ts)", "simcard_network_status": "(imsi, device_id, ts, signal, sig_lvl, state, roaming, iface, rsrp, rsrq, ecio, rscp, lte_band, sinr, temp, oper, oper_id, cell_id, lac_id)", "simcard_speed": "(imsi, device_id, tx, rx, tx_max, rx_max, tx_traffic, rx_traffic, ts)", "node_device_logs": "(dev_id, cmd_id, data, ts)", "rstat_data": "(device_id, ts, rstat_serial, in, out, pass)", "communication_status": "(device_id, ts, iface, rssi, rsrp, rsrq, ecio, rscp, lte_band, sinr, network_reg, temp)", }, } for table := range ch.NeedCountForTable { ch.TablesQueues[table] = make([]interface{}, 0) } return ch } func (ch *Clickhouse) InitDB() error { dsn := fmt.Sprintf("tcp://%s:%s?username=%s&password=%s&database=%s", os.Getenv("CLICKHOUSE_HOST"), os.Getenv("CLICKHOUSE_PORT"), os.Getenv("CLICKHOUSE_USER"), os.Getenv("CLICKHOUSE_PASSWORD"), os.Getenv("CLICKHOUSE_DATABASE"), ) db, err := sql.Open("clickhouse", dsn) if err != nil { return err } ch.DB = db return nil } func (ch *Clickhouse) Insert(table string, data interface{}) error { ch.mux.Lock() ch.TablesQueues[table] = append(ch.TablesQueues[table], data) shouldFlush := len(ch.TablesQueues[table]) >= ch.NeedCountForTable[table] ch.mux.Unlock() if shouldFlush { go func() { if err := ch.FlushQueue(table); err != nil { log.Printf("Clickhouse error: %s", err) } }() } return nil } func (ch *Clickhouse) FlushQueue(table string) error { ch.mux.Lock() if ch.DB == nil { ch.mux.Unlock() return fmt.Errorf("database connection is not initialized") } data := ch.TablesQueues[table] ch.TablesQueues[table] = make([]interface{}, 0) ch.mux.Unlock() query := fmt.Sprintf("INSERT INTO %s %s VALUES", table, ch.TableTemplate[table]) var valueStrings []string var valueArgs []interface{} for _, entry := range data { values := entry.([]interface{}) valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?)") valueArgs = append(valueArgs, values...) } query = query + strings.Join(valueStrings, ",") stmt, err := ch.DB.Prepare(query) if err != nil { return err } defer func(stmt *sql.Stmt) { err := stmt.Close() if err != nil { fmt.Printf("Ошибка закрытия подключения к БД") } }(stmt) _, err = stmt.Exec(valueArgs...) if err != nil { return err } return nil }