package nodes import ( "encoding/json" "log" "os" "runtime" "strconv" "strings" "time" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeHttpDNS/internal/configs" teaconst "github.com/TeaOSLab/EdgeHttpDNS/internal/const" "github.com/TeaOSLab/EdgeHttpDNS/internal/rpc" "github.com/TeaOSLab/EdgeHttpDNS/internal/utils" "github.com/iwind/TeaGo/maps" "github.com/shirou/gopsutil/v3/cpu" "github.com/shirou/gopsutil/v3/mem" ) type StatusManager struct { quitCh <-chan struct{} ticker *time.Ticker isFirstTime bool cpuUpdatedTime time.Time cpuLogicalCount int cpuPhysicalCount int } func NewStatusManager(quitCh <-chan struct{}) *StatusManager { return &StatusManager{ quitCh: quitCh, ticker: time.NewTicker(30 * time.Second), isFirstTime: true, cpuUpdatedTime: time.Now(), cpuLogicalCount: runtime.NumCPU(), } } func (m *StatusManager) Start() { defer m.ticker.Stop() m.update() for { select { case <-m.ticker.C: m.update() case <-m.quitCh: return } } } func (m *StatusManager) update() { status := m.collectStatus() statusJSON, err := json.Marshal(status) if err != nil { log.Println("[HTTPDNS_NODE][status]marshal status failed:", err.Error()) return } rpcClient, err := rpc.SharedRPC() if err != nil { log.Println("[HTTPDNS_NODE][status]rpc unavailable:", err.Error()) return } config, err := configs.SharedAPIConfig() if err != nil { log.Println("[HTTPDNS_NODE][status]load config failed:", err.Error()) return } nodeId, _ := strconv.ParseInt(config.NodeId, 10, 64) _, err = rpcClient.HTTPDNSNodeRPC.UpdateHTTPDNSNodeStatus(rpcClient.Context(), &pb.UpdateHTTPDNSNodeStatusRequest{ NodeId: nodeId, IsUp: true, IsInstalled: true, IsActive: true, StatusJSON: statusJSON, }) if err != nil { log.Println("[HTTPDNS_NODE][status]update status failed:", err.Error()) } m.reportNodeValues(rpcClient, status) m.isFirstTime = false } func (m *StatusManager) collectStatus() *nodeconfigs.NodeStatus { now := time.Now().Unix() status := &nodeconfigs.NodeStatus{ BuildVersion: teaconst.Version, BuildVersionCode: utils.VersionToLong(teaconst.Version), ConfigVersion: 0, OS: runtime.GOOS, Arch: runtime.GOARCH, CPULogicalCount: m.cpuLogicalCount, CPUPhysicalCount: m.cpuPhysicalCount, IsActive: true, ConnectionCount: 0, UpdatedAt: now, Timestamp: now, } rpcClient, err := rpc.SharedRPC() if err == nil { total, failed, avgCostSeconds := rpcClient.GetAndResetMetrics() if total > 0 { status.APISuccessPercent = float64(total-failed) * 100.0 / float64(total) status.APIAvgCostSeconds = avgCostSeconds } } hostname, _ := os.Hostname() status.Hostname = hostname exePath, _ := os.Executable() status.ExePath = exePath m.updateCPU(status) m.updateMemory(status) m.updateLoad(status) return status } func (m *StatusManager) updateCPU(status *nodeconfigs.NodeStatus) { duration := time.Duration(0) if m.isFirstTime { duration = 100 * time.Millisecond } percents, err := cpu.Percent(duration, false) if err == nil && len(percents) > 0 { status.CPUUsage = percents[0] / 100 } if time.Since(m.cpuUpdatedTime) >= 5*time.Minute || m.cpuLogicalCount <= 0 { m.cpuUpdatedTime = time.Now() if logicalCount, countErr := cpu.Counts(true); countErr == nil && logicalCount > 0 { m.cpuLogicalCount = logicalCount } if physicalCount, countErr := cpu.Counts(false); countErr == nil && physicalCount > 0 { m.cpuPhysicalCount = physicalCount } } status.CPULogicalCount = m.cpuLogicalCount if m.cpuPhysicalCount > 0 { status.CPUPhysicalCount = m.cpuPhysicalCount } } func (m *StatusManager) updateMemory(status *nodeconfigs.NodeStatus) { stat, err := mem.VirtualMemory() if err != nil || stat == nil || stat.Total == 0 { return } usedBytes := stat.Used if stat.Total > stat.Free+stat.Buffers+stat.Cached { usedBytes = stat.Total - stat.Free - stat.Buffers - stat.Cached } status.MemoryTotal = stat.Total status.MemoryUsage = float64(usedBytes) / float64(stat.Total) } func (m *StatusManager) updateLoad(status *nodeconfigs.NodeStatus) { load1m, load5m, load15m := readLoadAvg() status.Load1m = load1m status.Load5m = load5m status.Load15m = load15m } func (m *StatusManager) reportNodeValues(rpcClient *rpc.RPCClient, status *nodeconfigs.NodeStatus) { if rpcClient == nil || status == nil { return } m.createNodeValue(rpcClient, nodeconfigs.NodeValueItemCPU, maps.Map{ "usage": status.CPUUsage, "cores": status.CPULogicalCount, }) m.createNodeValue(rpcClient, nodeconfigs.NodeValueItemMemory, maps.Map{ "usage": status.MemoryUsage, "total": status.MemoryTotal, }) m.createNodeValue(rpcClient, nodeconfigs.NodeValueItemLoad, maps.Map{ "load1m": status.Load1m, "load5m": status.Load5m, "load15m": status.Load15m, }) } func (m *StatusManager) createNodeValue(rpcClient *rpc.RPCClient, item string, value maps.Map) { valueJSON, err := json.Marshal(value) if err != nil { log.Println("[HTTPDNS_NODE][status]marshal node value failed:", err.Error()) return } _, err = rpcClient.NodeValueRPC.CreateNodeValue(rpcClient.Context(), &pb.CreateNodeValueRequest{ Item: item, ValueJSON: valueJSON, CreatedAt: time.Now().Unix(), }) if err != nil { log.Println("[HTTPDNS_NODE][status]create node value failed:", item, err.Error()) } } func readLoadAvg() (float64, float64, float64) { data, err := os.ReadFile("/proc/loadavg") if err != nil { return 0, 0, 0 } parts := strings.Fields(strings.TrimSpace(string(data))) if len(parts) < 3 { return 0, 0, 0 } load1m, _ := strconv.ParseFloat(parts[0], 64) load5m, _ := strconv.ParseFloat(parts[1], 64) load15m, _ := strconv.ParseFloat(parts[2], 64) return load1m, load5m, load15m }