Files
2026-02-04 20:27:13 +08:00

252 lines
5.1 KiB
Go

// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import (
"encoding/json"
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/reporterconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeReporter/internal/configs"
teaconst "github.com/TeaOSLab/EdgeReporter/internal/const"
"github.com/TeaOSLab/EdgeReporter/internal/events"
"github.com/TeaOSLab/EdgeReporter/internal/remotelogs"
"github.com/TeaOSLab/EdgeReporter/internal/rpc"
"github.com/TeaOSLab/EdgeReporter/internal/utils"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/gosock/pkg/gosock"
"log"
"net"
"os"
"os/exec"
"runtime"
"time"
)
var sharedNodeConfig *reporterconfigs.NodeConfig
var nodeTaskNotify = make(chan bool, 8)
type ReportNode struct {
sock *gosock.Sock
RPC *rpc.RPCClient
}
func NewReportNode() *ReportNode {
return &ReportNode{
sock: gosock.NewTmpSock(teaconst.ProcessName),
}
}
func (this *ReportNode) Run() {
// 本地Sock
err := this.listenSock()
if err != nil {
logs.Println("[REPORT_NODE]" + err.Error())
return
}
// 触发事件
events.Notify(events.EventStart)
// 启动
go this.start()
// Hold住进程
select {}
}
// Daemon 守护程序
func (this *ReportNode) Daemon() {
path := os.TempDir() + "/edge-reporter.sock"
isDebug := lists.ContainsString(os.Args, "debug")
isDebug = true
for {
conn, err := net.DialTimeout("unix", path, 1*time.Second)
if err != nil {
if isDebug {
log.Println("[DAEMON]starting ...")
}
// 尝试启动
err = func() error {
exe, err := os.Executable()
if err != nil {
return err
}
cmd := exec.Command(exe)
err = cmd.Start()
if err != nil {
return err
}
err = cmd.Wait()
if err != nil {
return err
}
return nil
}()
if err != nil {
if isDebug {
log.Println("[DAEMON]", err)
}
time.Sleep(1 * time.Second)
} else {
time.Sleep(5 * time.Second)
}
} else {
_ = conn.Close()
time.Sleep(5 * time.Second)
}
}
}
// InstallSystemService 安装系统服务
func (this *ReportNode) InstallSystemService() error {
shortName := teaconst.SystemdServiceName
exe, err := os.Executable()
if err != nil {
return err
}
manager := utils.NewServiceManager(shortName, teaconst.ProductName)
err = manager.Install(exe, []string{})
if err != nil {
return err
}
return nil
}
// 监听本地sock
func (this *ReportNode) listenSock() error {
if runtime.GOOS == "windows" {
return nil
}
// 检查是否在运行
if this.sock.IsListening() {
reply, err := this.sock.Send(&gosock.Command{Code: "pid"})
if err == nil {
return errors.New("error: the process is already running, pid: " + maps.NewMap(reply.Params).GetString("pid"))
} else {
return errors.New("error: the process is already running")
}
}
// 启动监听
go func() {
this.sock.OnCommand(func(cmd *gosock.Command) {
switch cmd.Code {
case "pid":
_ = cmd.Reply(&gosock.Command{
Code: "pid",
Params: map[string]interface{}{
"pid": os.Getpid(),
},
})
case "info":
exePath, _ := os.Executable()
_ = cmd.Reply(&gosock.Command{
Code: "info",
Params: map[string]interface{}{
"pid": os.Getpid(),
"version": teaconst.Version,
"path": exePath,
},
})
case "stop":
_ = cmd.ReplyOk()
// 退出主进程
events.Notify(events.EventQuit)
os.Exit(0)
}
})
err := this.sock.Listen()
if err != nil {
logs.Println("REPORT_NODE", err.Error())
}
}()
events.On(events.EventQuit, func() {
logs.Println("REPORT_NODE", "quit unix sock")
_ = this.sock.Close()
})
return nil
}
// 启动
func (this *ReportNode) start() {
remotelogs.Println("REPORT_NODE", "starting ...")
apiConfig, err := configs.LoadAPIConfig()
if err != nil {
remotelogs.Println("REPORT_NODE", err.Error())
return
}
client, err := rpc.SharedRPC()
if err != nil {
remotelogs.Error("REPORT_NODE", err.Error())
return
}
this.RPC = client
tryTimes := 0
var configJSON []byte
for {
resp, err := client.ReportNodeRPC().FindCurrentReportNodeConfig(client.Context(), &pb.FindCurrentReportNodeConfigRequest{})
if err != nil {
tryTimes++
if tryTimes%10 == 0 {
remotelogs.Error("NODE", err.Error())
}
time.Sleep(1 * time.Second)
// 不做长时间的无意义的重试
if tryTimes > 1000 {
return
}
} else {
configJSON = resp.ReportNodeJSON
break
}
}
if len(configJSON) == 0 {
remotelogs.Error("NODE", "can not find node")
return
}
var config = &reporterconfigs.NodeConfig{}
err = json.Unmarshal(configJSON, config)
if err != nil {
remotelogs.Error("NODE", "decode config failed: "+err.Error())
return
}
err = config.Init()
if err != nil {
remotelogs.Error("NODE", "init config failed: "+err.Error())
return
}
sharedNodeConfig = config
apiConfig.NumberId = config.Id
remotelogs.Println("REPORT_NODE", "started")
// 发送通知
events.Notify(events.EventLoad)
// 连接API
go NewAPIStream().Start()
// 状态上报
go NewStatusManager().Start()
// 执行任务
go NewTaskManager().Start()
}