// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. package nodes import ( "encoding/json" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/reporterconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeReporter/internal/remotelogs" "github.com/TeaOSLab/EdgeReporter/internal/rpc" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/types" "net" "sync" "time" ) type TaskManager struct { } func NewTaskManager() *TaskManager { return &TaskManager{} } func (this *TaskManager) Start() { // 监控时间没必要过短,因为我们不需要那么敏感 var ticker = time.NewTicker(3 * time.Minute) if Tea.IsTesting() { ticker = time.NewTicker(1 * time.Minute) } // 立即执行一次 err := this.Loop() if err != nil { remotelogs.Error("TASK_MANAGER", err.Error()) } // 循环执行 for range ticker.C { err := this.Loop() if err != nil { remotelogs.Error("TASK_MANAGER", err.Error()) } } } func (this *TaskManager) Loop() error { rpcClient, err := rpc.SharedRPC() if err != nil { return err } resp, err := rpcClient.ReportNodeRPC().FindReportNodeTasks(rpcClient.Context(), &pb.FindReportNodeTasksRequest{}) if err != nil { return err } { var tasksJSON = resp.IpAddrTasksJSON if len(tasksJSON) > 0 { var ipTasks = []*reporterconfigs.IPTask{} err = json.Unmarshal(tasksJSON, &ipTasks) if err != nil { return err } if len(ipTasks) > 0 { var queue = make(chan *reporterconfigs.IPTask, len(ipTasks)) for _, task := range ipTasks { queue <- task } var pbResults = []*pb.ReportResult{} var concurrent = 32 var wg = &sync.WaitGroup{} wg.Add(concurrent) for i := 0; i < concurrent; i++ { go func() { defer wg.Done() for { select { case task := <-queue: desc, costMs, err := this.runIPTask(task) var errString = "" if err != nil { errString = err.Error() } // 级别 var level reporterconfigs.ReportLevel if err != nil { level = reporterconfigs.ReportLevelBroken } else if costMs > 3000 { // level = reporterconfigs.ReportLevelBad } else if costMs > 1000 { level = reporterconfigs.ReportLevelNormal } else { level = reporterconfigs.ReportLevelGood } pbResults = append(pbResults, &pb.ReportResult{ Type: reporterconfigs.TaskTypeIPAddr, TargetId: task.AddrId, TargetDesc: desc, IsOk: err == nil, CostMs: float32(costMs), Error: errString, Level: level, }) default: return } } }() } wg.Wait() if len(pbResults) > 0 { _, err = rpcClient.ReportResultRPC().UpdateReportResults(rpcClient.Context(), &pb.UpdateReportResultsRequest{ReportResults: pbResults}) if err != nil { remotelogs.Error("TASK_MANAGER", "upload report results failed: "+err.Error()) } } } } } return nil } func (this *TaskManager) runIPTask(task *reporterconfigs.IPTask) (desc string, costMs float64, err error) { before := time.Now() desc = configutils.QuoteIP(task.IP) + ":" + types.String(task.Port) conn, err := net.DialTimeout("tcp", desc, 5*time.Second) if err != nil { return desc, time.Since(before).Seconds() * 1000, err } _ = conn.Close() return desc, time.Since(before).Seconds() * 1000, nil }