//go:build plus package nodes import ( "context" "encoding/json" "errors" "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ddosconfigs" "github.com/TeaOSLab/EdgeDNS/internal/agents" "github.com/TeaOSLab/EdgeDNS/internal/configs" teaconst "github.com/TeaOSLab/EdgeDNS/internal/const" "github.com/TeaOSLab/EdgeDNS/internal/dbs" "github.com/TeaOSLab/EdgeDNS/internal/events" "github.com/TeaOSLab/EdgeDNS/internal/firewalls" "github.com/TeaOSLab/EdgeDNS/internal/goman" "github.com/TeaOSLab/EdgeDNS/internal/remotelogs" "github.com/TeaOSLab/EdgeDNS/internal/rpc" "github.com/TeaOSLab/EdgeDNS/internal/utils" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/types" "github.com/iwind/gosock/pkg/gosock" "log" "os" "os/exec" "os/signal" "regexp" "runtime" "runtime/debug" "sync/atomic" "syscall" "time" ) var DaemonIsOn = false var DaemonPid = 0 var nodeTaskNotify = make(chan bool, 8) var sharedDomainManager *DomainManager var sharedRecordManager *RecordManager var sharedRouteManager *RouteManager var sharedKeyManager *KeyManager var sharedNodeConfig atomic.Pointer[dnsconfigs.NSNodeConfig] func init() { // 初始化默认空配置 sharedNodeConfig.Store(&dnsconfigs.NSNodeConfig{}) } // dnsNodeConfig 返回当前 DNS 节点配置(并发安全) func dnsNodeConfig() *dnsconfigs.NSNodeConfig { return sharedNodeConfig.Load() } func NewDNSNode() *DNSNode { return &DNSNode{ sock: gosock.NewTmpSock(teaconst.ProcessName), } } type DNSNode struct { sock *gosock.Sock RPC *rpc.RPCClient } func (this *DNSNode) Start() { // 设置netdns // 这个需要放在所有网络访问的最前面 _ = os.Setenv("GODEBUG", "netdns=go") // 判断是否在守护进程下 _, ok := os.LookupEnv("EdgeDaemon") if ok { remotelogs.Println("DNS_NODE", "start from daemon") DaemonIsOn = true DaemonPid = os.Getppid() } // 设置DNS解析库 err := os.Setenv("GODEBUG", "netdns=go") if err != nil { remotelogs.Error("DNS_NODE", "[DNS_RESOLVER]set env failed: "+err.Error()) } // 处理异常 this.handlePanic() // 监听signal this.listenSignals() // 本地Sock err = this.listenSock() if err != nil { logs.Println("[DNS_NODE]" + err.Error()) return } // 启动IP库 remotelogs.Println("DNS_NODE", "initializing ip library ...") err = iplibrary.InitPlus() if err != nil { remotelogs.Error("DNS_NODE", "initialize ip library failed: "+err.Error()) } runtime.GC() debug.FreeOSMemory() // 触发事件 events.Notify(events.EventStart) // 监控状态 goman.New(func() { NewNodeStatusExecutor().Listen() }) // 连接API goman.New(func() { NewAPIStream().Start() }) // 启动 goman.New(func() { this.start() }) // Hold住进程 logs.Println("[DNS_NODE]started") select {} } // Daemon 实现守护进程 func (this *DNSNode) Daemon() { var isDebug = lists.ContainsString(os.Args, "debug") for { conn, err := this.sock.Dial() if err != nil { if isDebug { log.Println("[DAEMON]starting ...") } // 尝试启动 err = func() error { exe, err := os.Executable() if err != nil { return err } // 可以标记当前是从守护进程启动的 _ = os.Setenv("EdgeDaemon", "on") _ = os.Setenv("EdgeBackground", "on") var cmd = exec.Command(exe) err = cmd.Start() if err != nil { return err } err = cmd.Wait() if err != nil { return err } return nil }() if err != nil { if isDebug { log.Println("[DAEMON]", err) } time.Sleep(1 * time.Second) } else { time.Sleep(5 * time.Second) } } else { _ = conn.Close() time.Sleep(5 * time.Second) } } } // Test 测试配置 func (this *DNSNode) Test() error { // 检查是否能连接API rpcClient, err := rpc.SharedRPC() if err != nil { return fmt.Errorf("test rpc failed: %w", err) } _, err = rpcClient.APINodeRPC.FindCurrentAPINodeVersion(rpcClient.Context(), &pb.FindCurrentAPINodeVersionRequest{}) if err != nil { return fmt.Errorf("test rpc failed: %w", err) } return nil } // InstallSystemService 安装系统服务 func (this *DNSNode) InstallSystemService() error { shortName := teaconst.SystemdServiceName exe, err := os.Executable() if err != nil { return err } manager := utils.NewServiceManager(shortName, teaconst.ProductName) err = manager.Install(exe, []string{}) if err != nil { return err } return nil } // 监听一些信号 func (this *DNSNode) listenSignals() { var queue = make(chan os.Signal, 8) signal.Notify(queue, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGQUIT) goman.New(func() { for range queue { time.Sleep(100 * time.Millisecond) utils.Exit() return } }) } // 监听本地sock func (this *DNSNode) listenSock() error { // 检查是否在运行 if this.sock.IsListening() { reply, err := this.sock.Send(&gosock.Command{Code: "pid"}) if err == nil { return errors.New("error: the process is already running, pid: " + maps.NewMap(reply.Params).GetString("pid")) } else { return errors.New("error: the process is already running") } } // 启动监听 goman.New(func() { this.sock.OnCommand(func(cmd *gosock.Command) { switch cmd.Code { case "pid": _ = cmd.Reply(&gosock.Command{ Code: "pid", Params: map[string]interface{}{ "pid": os.Getpid(), }, }) case "info": exePath, _ := os.Executable() _ = cmd.Reply(&gosock.Command{ Code: "info", Params: map[string]interface{}{ "pid": os.Getpid(), "version": teaconst.Version, "path": exePath, }, }) case "stop": _ = cmd.ReplyOk() // 退出主进程 events.Notify(events.EventQuit) time.Sleep(100 * time.Millisecond) os.Exit(0) case "gc": runtime.GC() debug.FreeOSMemory() _ = cmd.ReplyOk() } }) err := this.sock.Listen() if err != nil { logs.Println("NODE", err.Error()) } }) events.On(events.EventQuit, func() { logs.Println("[DNS_NODE]", "quit unix sock") _ = this.sock.Close() }) return nil } // 启动 func (this *DNSNode) start() { client, err := rpc.SharedRPC() if err != nil { remotelogs.Error("DNS_NODE", err.Error()) return } this.RPC = client tryTimes := 0 var configJSON []byte for { resp, err := client.NSNodeRPC.FindCurrentNSNodeConfig(client.Context(), &pb.FindCurrentNSNodeConfigRequest{}) if err != nil { tryTimes++ if tryTimes%10 == 0 { remotelogs.Error("NODE", "read config from API failed: "+err.Error()) } time.Sleep(1 * time.Second) // 不做长时间的无意义的重试 if tryTimes > 1000 { remotelogs.Error("NODE", "load failed: unable to read config from API") return } } else { configJSON = resp.NsNodeJSON break } } if len(configJSON) == 0 { remotelogs.Error("NODE", "can not find node config") return } var config = &dnsconfigs.NSNodeConfig{} err = json.Unmarshal(configJSON, config) if err != nil { remotelogs.Error("NODE", "decode config failed: "+err.Error()) return } err = config.Init(context.TODO()) if err != nil { remotelogs.Error("NODE", "init config failed: "+err.Error()) return } sharedNodeConfig.Store(config) configs.SharedNodeConfig.Store(config) events.Notify(events.EventReload) sharedNodeConfigManager.reload(config) apiConfig, _ := configs.SharedAPIConfig() if apiConfig != nil { apiConfig.NumberId = config.Id } // 验证 NodeId 防止路径遍历 var nodeIdRegexp = regexp.MustCompile(`^[a-zA-Z0-9_\-]+$`) if !nodeIdRegexp.MatchString(config.NodeId) { remotelogs.Error("NODE", "invalid NodeId: contains illegal characters") return } var db = dbs.NewDB(Tea.Root + "/data/data-" + types.String(config.Id) + "-" + config.NodeId + "-v0.1.0.db") err = db.Init() if err != nil { remotelogs.Error("NODE", "init database failed: "+err.Error()) return } goman.New(func() { sharedNodeConfigManager.Start() }) sharedDomainManager = NewDomainManager(db, config.ClusterId) goman.New(func() { sharedDomainManager.Start() }) sharedRecordManager = NewRecordManager(db) goman.New(func() { sharedRecordManager.Start() }) sharedRouteManager = NewRouteManager(db) goman.New(func() { sharedRouteManager.Start() }) sharedKeyManager = NewKeyManager(db) goman.New(func() { sharedKeyManager.Start() }) agents.SharedManager = agents.NewManager(db) goman.New(func() { agents.SharedManager.Start() }) // 等待所有 Manager 初始加载完成后再发送通知 <-sharedDomainManager.readyCh <-sharedRecordManager.readyCh <-sharedRouteManager.readyCh <-sharedKeyManager.readyCh <-agents.SharedManager.ReadyCh events.Notify(events.EventLoaded) // 启动循环 goman.New(func() { this.loop() }) } // 更新配置Loop func (this *DNSNode) loop() { var ticker = time.NewTicker(60 * time.Second) for { select { case <-ticker.C: case <-nodeTaskNotify: } err := this.processTasks() if err != nil { if rpc.IsConnError(err) { remotelogs.Debug("DNS_NODE", "process tasks: "+err.Error()) } else { remotelogs.Error("DNS_NODE", "process tasks: "+err.Error()) } } } } // 处理任务 func (this *DNSNode) processTasks() error { rpcClient, err := rpc.SharedRPC() if err != nil { return err } // 所有的任务 tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(rpcClient.Context(), &pb.FindNodeTasksRequest{}) if err != nil { return err } for _, task := range tasksResp.NodeTasks { switch task.Type { case "nsConfigChanged": sharedNodeConfigManager.NotifyChange() case "nsDomainChanged": sharedDomainManager.NotifyUpdate() case "nsRecordChanged": sharedRecordManager.NotifyUpdate() case "nsRouteChanged": sharedRouteManager.NotifyUpdate() case "nsKeyChanged": sharedKeyManager.NotifyUpdate() case "nsDDoSProtectionChanged": err := this.updateDDoS(rpcClient) if err != nil { remotelogs.Error("DNS_NODE", "apply DDoS config failed: "+err.Error()) } } _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(rpcClient.Context(), &pb.ReportNodeTaskDoneRequest{ NodeTaskId: task.Id, IsOk: true, Error: "", }) if err != nil { return err } } return nil } func (this *DNSNode) updateDDoS(rpcClient *rpc.RPCClient) error { resp, err := rpcClient.NSNodeRPC.FindNSNodeDDoSProtection(rpcClient.Context(), &pb.FindNSNodeDDoSProtectionRequest{}) if err != nil { return err } if len(resp.DdosProtectionJSON) == 0 { var cfg = dnsNodeConfig() if cfg != nil { cfg.DDoSProtection = nil } } else { var ddosProtectionConfig = &ddosconfigs.ProtectionConfig{} err = json.Unmarshal(resp.DdosProtectionJSON, ddosProtectionConfig) if err != nil { return fmt.Errorf("decode DDoS protection config failed: %w", err) } var cfg = dnsNodeConfig() if cfg != nil { cfg.DDoSProtection = ddosProtectionConfig } err = firewalls.SharedDDoSProtectionManager.Apply(ddosProtectionConfig) if err != nil { // 不阻塞 remotelogs.Error("NODE", "apply DDoS protection failed: "+err.Error()) } } return nil }