package metrics import ( "encoding/json" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/types" "os" ) func MigrateSQLiteTaskToKV(item *serverconfigs.MetricItemConfig) error { var itemIdString = types.String(item.Id) var sqlitePath = Tea.Root + "/data/metric." + itemIdString + ".db" _, err := os.Stat(sqlitePath) if err != nil { if os.IsNotExist(err) { return nil } return err } dst := NewKVTask(item) err = dst.Init() if err != nil { return err } err = dst.Truncate() if err != nil { return err } remotelogs.Println("METRIC", "migrating sqlite task '"+itemIdString+"' to kvstore ...") src := NewSQLiteTask(item) err = src.Init() if err != nil { return err } defer closeSQLiteTask(src) var offset = 0 const size = 1000 for { rows, queryErr := src.db.Query(`SELECT "hash", "keys", "value", "time", "serverId" FROM "`+src.statTableName+`" WHERE "version"=? ORDER BY "id" ASC LIMIT ?, ?`, item.Version, offset, size) if queryErr != nil { return queryErr } var countRows int for rows.Next() { countRows++ var hash string var keysData []byte var value float64 var timeString string var serverId int64 scanErr := rows.Scan(&hash, &keysData, &value, &timeString, &serverId) if scanErr != nil { _ = rows.Close() return scanErr } var keys []string if len(keysData) > 0 { unmarshalErr := json.Unmarshal(keysData, &keys) if unmarshalErr != nil { _ = rows.Close() return unmarshalErr } } insertErr := dst.InsertStat(&Stat{ ServerId: serverId, Keys: keys, Hash: hash, Value: int64(value), Time: timeString, }) if insertErr != nil { _ = rows.Close() return insertErr } } err = rows.Err() if err != nil { _ = rows.Close() return err } closeErr := rows.Close() if closeErr != nil { return closeErr } if countRows < size { break } offset += size } err = dst.Flush() if err != nil { return err } err = removeMetricSQLiteFiles(sqlitePath) if err != nil { return err } remotelogs.Println("METRIC", "migrated sqlite task '"+itemIdString+"' to kvstore") return nil } func closeSQLiteTask(task *SQLiteTask) { if task == nil { return } for _, stmt := range []*dbs.Stmt{ task.insertStatStmt, task.deleteByVersionStmt, task.deleteByExpiresTimeStmt, task.selectTopStmt, task.sumStmt, } { if stmt != nil { _ = stmt.Close() } } if task.db != nil { _ = task.db.Close() } } func removeMetricSQLiteFiles(path string) error { for _, filename := range []string{path, path + "-shm", path + "-wal"} { err := os.Remove(filename) if err != nil && !os.IsNotExist(err) { return err } } return nil }