// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. package nodes import ( "encoding/json" "errors" "github.com/TeaOSLab/EdgeCommon/pkg/reporterconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeReporter/internal/configs" teaconst "github.com/TeaOSLab/EdgeReporter/internal/const" "github.com/TeaOSLab/EdgeReporter/internal/events" "github.com/TeaOSLab/EdgeReporter/internal/remotelogs" "github.com/TeaOSLab/EdgeReporter/internal/rpc" "github.com/TeaOSLab/EdgeReporter/internal/utils" _ "github.com/iwind/TeaGo/bootstrap" "github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/maps" "github.com/iwind/gosock/pkg/gosock" "log" "net" "os" "os/exec" "runtime" "time" ) var sharedNodeConfig *reporterconfigs.NodeConfig var nodeTaskNotify = make(chan bool, 8) type ReportNode struct { sock *gosock.Sock RPC *rpc.RPCClient } func NewReportNode() *ReportNode { return &ReportNode{ sock: gosock.NewTmpSock(teaconst.ProcessName), } } func (this *ReportNode) Run() { // 本地Sock err := this.listenSock() if err != nil { logs.Println("[REPORT_NODE]" + err.Error()) return } // 触发事件 events.Notify(events.EventStart) // 启动 go this.start() // Hold住进程 select {} } // Daemon 守护程序 func (this *ReportNode) Daemon() { path := os.TempDir() + "/edge-reporter.sock" isDebug := lists.ContainsString(os.Args, "debug") isDebug = true for { conn, err := net.DialTimeout("unix", path, 1*time.Second) if err != nil { if isDebug { log.Println("[DAEMON]starting ...") } // 尝试启动 err = func() error { exe, err := os.Executable() if err != nil { return err } cmd := exec.Command(exe) err = cmd.Start() if err != nil { return err } err = cmd.Wait() if err != nil { return err } return nil }() if err != nil { if isDebug { log.Println("[DAEMON]", err) } time.Sleep(1 * time.Second) } else { time.Sleep(5 * time.Second) } } else { _ = conn.Close() time.Sleep(5 * time.Second) } } } // InstallSystemService 安装系统服务 func (this *ReportNode) InstallSystemService() error { shortName := teaconst.SystemdServiceName exe, err := os.Executable() if err != nil { return err } manager := utils.NewServiceManager(shortName, teaconst.ProductName) err = manager.Install(exe, []string{}) if err != nil { return err } return nil } // 监听本地sock func (this *ReportNode) listenSock() error { if runtime.GOOS == "windows" { return nil } // 检查是否在运行 if this.sock.IsListening() { reply, err := this.sock.Send(&gosock.Command{Code: "pid"}) if err == nil { return errors.New("error: the process is already running, pid: " + maps.NewMap(reply.Params).GetString("pid")) } else { return errors.New("error: the process is already running") } } // 启动监听 go func() { this.sock.OnCommand(func(cmd *gosock.Command) { switch cmd.Code { case "pid": _ = cmd.Reply(&gosock.Command{ Code: "pid", Params: map[string]interface{}{ "pid": os.Getpid(), }, }) case "info": exePath, _ := os.Executable() _ = cmd.Reply(&gosock.Command{ Code: "info", Params: map[string]interface{}{ "pid": os.Getpid(), "version": teaconst.Version, "path": exePath, }, }) case "stop": _ = cmd.ReplyOk() // 退出主进程 events.Notify(events.EventQuit) os.Exit(0) } }) err := this.sock.Listen() if err != nil { logs.Println("REPORT_NODE", err.Error()) } }() events.On(events.EventQuit, func() { logs.Println("REPORT_NODE", "quit unix sock") _ = this.sock.Close() }) return nil } // 启动 func (this *ReportNode) start() { remotelogs.Println("REPORT_NODE", "starting ...") apiConfig, err := configs.LoadAPIConfig() if err != nil { remotelogs.Println("REPORT_NODE", err.Error()) return } client, err := rpc.SharedRPC() if err != nil { remotelogs.Error("REPORT_NODE", err.Error()) return } this.RPC = client tryTimes := 0 var configJSON []byte for { resp, err := client.ReportNodeRPC().FindCurrentReportNodeConfig(client.Context(), &pb.FindCurrentReportNodeConfigRequest{}) if err != nil { tryTimes++ if tryTimes%10 == 0 { remotelogs.Error("NODE", err.Error()) } time.Sleep(1 * time.Second) // 不做长时间的无意义的重试 if tryTimes > 1000 { return } } else { configJSON = resp.ReportNodeJSON break } } if len(configJSON) == 0 { remotelogs.Error("NODE", "can not find node") return } var config = &reporterconfigs.NodeConfig{} err = json.Unmarshal(configJSON, config) if err != nil { remotelogs.Error("NODE", "decode config failed: "+err.Error()) return } err = config.Init() if err != nil { remotelogs.Error("NODE", "init config failed: "+err.Error()) return } sharedNodeConfig = config apiConfig.NumberId = config.Id remotelogs.Println("REPORT_NODE", "started") // 发送通知 events.Notify(events.EventLoad) // 连接API go NewAPIStream().Start() // 状态上报 go NewStatusManager().Start() // 执行任务 go NewTaskManager().Start() }