Files
waf-platform/EdgeReporter/internal/nodes/task_manager.go

142 lines
3.4 KiB
Go

// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import (
"encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/reporterconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeReporter/internal/remotelogs"
"github.com/TeaOSLab/EdgeReporter/internal/rpc"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/types"
"net"
"sync"
"time"
)
type TaskManager struct {
}
func NewTaskManager() *TaskManager {
return &TaskManager{}
}
func (this *TaskManager) Start() {
// 监控时间没必要过短,因为我们不需要那么敏感
var ticker = time.NewTicker(3 * time.Minute)
if Tea.IsTesting() {
ticker = time.NewTicker(1 * time.Minute)
}
// 立即执行一次
err := this.Loop()
if err != nil {
remotelogs.Error("TASK_MANAGER", err.Error())
}
// 循环执行
for range ticker.C {
err := this.Loop()
if err != nil {
remotelogs.Error("TASK_MANAGER", err.Error())
}
}
}
func (this *TaskManager) Loop() error {
rpcClient, err := rpc.SharedRPC()
if err != nil {
return err
}
resp, err := rpcClient.ReportNodeRPC().FindReportNodeTasks(rpcClient.Context(), &pb.FindReportNodeTasksRequest{})
if err != nil {
return err
}
{
var tasksJSON = resp.IpAddrTasksJSON
if len(tasksJSON) > 0 {
var ipTasks = []*reporterconfigs.IPTask{}
err = json.Unmarshal(tasksJSON, &ipTasks)
if err != nil {
return err
}
if len(ipTasks) > 0 {
var queue = make(chan *reporterconfigs.IPTask, len(ipTasks))
for _, task := range ipTasks {
queue <- task
}
var pbResults = []*pb.ReportResult{}
var concurrent = 32
var wg = &sync.WaitGroup{}
wg.Add(concurrent)
for i := 0; i < concurrent; i++ {
go func() {
defer wg.Done()
for {
select {
case task := <-queue:
desc, costMs, err := this.runIPTask(task)
var errString = ""
if err != nil {
errString = err.Error()
}
// 级别
var level reporterconfigs.ReportLevel
if err != nil {
level = reporterconfigs.ReportLevelBroken
} else if costMs > 3000 { //
level = reporterconfigs.ReportLevelBad
} else if costMs > 1000 {
level = reporterconfigs.ReportLevelNormal
} else {
level = reporterconfigs.ReportLevelGood
}
pbResults = append(pbResults, &pb.ReportResult{
Type: reporterconfigs.TaskTypeIPAddr,
TargetId: task.AddrId,
TargetDesc: desc,
IsOk: err == nil,
CostMs: float32(costMs),
Error: errString,
Level: level,
})
default:
return
}
}
}()
}
wg.Wait()
if len(pbResults) > 0 {
_, err = rpcClient.ReportResultRPC().UpdateReportResults(rpcClient.Context(), &pb.UpdateReportResultsRequest{ReportResults: pbResults})
if err != nil {
remotelogs.Error("TASK_MANAGER", "upload report results failed: "+err.Error())
}
}
}
}
}
return nil
}
func (this *TaskManager) runIPTask(task *reporterconfigs.IPTask) (desc string, costMs float64, err error) {
before := time.Now()
desc = configutils.QuoteIP(task.IP) + ":" + types.String(task.Port)
conn, err := net.DialTimeout("tcp", desc, 5*time.Second)
if err != nil {
return desc, time.Since(before).Seconds() * 1000, err
}
_ = conn.Close()
return desc, time.Since(before).Seconds() * 1000, nil
}