502 lines
11 KiB
Go
502 lines
11 KiB
Go
//go:build plus
|
|
|
|
package nodes
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs"
|
|
"github.com/TeaOSLab/EdgeCommon/pkg/iplibrary"
|
|
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
|
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ddosconfigs"
|
|
"github.com/TeaOSLab/EdgeDNS/internal/agents"
|
|
"github.com/TeaOSLab/EdgeDNS/internal/configs"
|
|
teaconst "github.com/TeaOSLab/EdgeDNS/internal/const"
|
|
"github.com/TeaOSLab/EdgeDNS/internal/dbs"
|
|
"github.com/TeaOSLab/EdgeDNS/internal/events"
|
|
"github.com/TeaOSLab/EdgeDNS/internal/firewalls"
|
|
"github.com/TeaOSLab/EdgeDNS/internal/goman"
|
|
"github.com/TeaOSLab/EdgeDNS/internal/remotelogs"
|
|
"github.com/TeaOSLab/EdgeDNS/internal/rpc"
|
|
"github.com/TeaOSLab/EdgeDNS/internal/utils"
|
|
"github.com/iwind/TeaGo/Tea"
|
|
"github.com/iwind/TeaGo/lists"
|
|
"github.com/iwind/TeaGo/logs"
|
|
"github.com/iwind/TeaGo/maps"
|
|
"github.com/iwind/TeaGo/types"
|
|
"github.com/iwind/gosock/pkg/gosock"
|
|
"log"
|
|
"os"
|
|
"os/exec"
|
|
"os/signal"
|
|
"regexp"
|
|
"runtime"
|
|
"runtime/debug"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
)
|
|
|
|
var DaemonIsOn = false
|
|
var DaemonPid = 0
|
|
var nodeTaskNotify = make(chan bool, 8)
|
|
|
|
var sharedDomainManager *DomainManager
|
|
var sharedRecordManager *RecordManager
|
|
var sharedRouteManager *RouteManager
|
|
var sharedKeyManager *KeyManager
|
|
|
|
var sharedNodeConfig atomic.Pointer[dnsconfigs.NSNodeConfig]
|
|
|
|
func init() {
|
|
// 初始化默认空配置
|
|
sharedNodeConfig.Store(&dnsconfigs.NSNodeConfig{})
|
|
}
|
|
|
|
// dnsNodeConfig 返回当前 DNS 节点配置(并发安全)
|
|
func dnsNodeConfig() *dnsconfigs.NSNodeConfig {
|
|
return sharedNodeConfig.Load()
|
|
}
|
|
|
|
func NewDNSNode() *DNSNode {
|
|
return &DNSNode{
|
|
sock: gosock.NewTmpSock(teaconst.ProcessName),
|
|
}
|
|
}
|
|
|
|
type DNSNode struct {
|
|
sock *gosock.Sock
|
|
|
|
RPC *rpc.RPCClient
|
|
}
|
|
|
|
func (this *DNSNode) Start() {
|
|
// 设置netdns
|
|
// 这个需要放在所有网络访问的最前面
|
|
_ = os.Setenv("GODEBUG", "netdns=go")
|
|
|
|
// 判断是否在守护进程下
|
|
_, ok := os.LookupEnv("EdgeDaemon")
|
|
if ok {
|
|
remotelogs.Println("DNS_NODE", "start from daemon")
|
|
DaemonIsOn = true
|
|
DaemonPid = os.Getppid()
|
|
}
|
|
|
|
// 设置DNS解析库
|
|
err := os.Setenv("GODEBUG", "netdns=go")
|
|
if err != nil {
|
|
remotelogs.Error("DNS_NODE", "[DNS_RESOLVER]set env failed: "+err.Error())
|
|
}
|
|
|
|
// 处理异常
|
|
this.handlePanic()
|
|
|
|
// 监听signal
|
|
this.listenSignals()
|
|
|
|
// 本地Sock
|
|
err = this.listenSock()
|
|
if err != nil {
|
|
logs.Println("[DNS_NODE]" + err.Error())
|
|
return
|
|
}
|
|
|
|
// 启动IP库
|
|
remotelogs.Println("DNS_NODE", "initializing ip library ...")
|
|
err = iplibrary.InitPlus()
|
|
if err != nil {
|
|
remotelogs.Error("DNS_NODE", "initialize ip library failed: "+err.Error())
|
|
}
|
|
runtime.GC()
|
|
debug.FreeOSMemory()
|
|
|
|
// 触发事件
|
|
events.Notify(events.EventStart)
|
|
|
|
// 监控状态
|
|
goman.New(func() {
|
|
NewNodeStatusExecutor().Listen()
|
|
})
|
|
|
|
// 连接API
|
|
goman.New(func() {
|
|
NewAPIStream().Start()
|
|
})
|
|
|
|
// 启动
|
|
goman.New(func() {
|
|
this.start()
|
|
})
|
|
|
|
// Hold住进程
|
|
logs.Println("[DNS_NODE]started")
|
|
select {}
|
|
}
|
|
|
|
// Daemon 实现守护进程
|
|
func (this *DNSNode) Daemon() {
|
|
var isDebug = lists.ContainsString(os.Args, "debug")
|
|
for {
|
|
conn, err := this.sock.Dial()
|
|
if err != nil {
|
|
if isDebug {
|
|
log.Println("[DAEMON]starting ...")
|
|
}
|
|
|
|
// 尝试启动
|
|
err = func() error {
|
|
exe, err := os.Executable()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 可以标记当前是从守护进程启动的
|
|
_ = os.Setenv("EdgeDaemon", "on")
|
|
_ = os.Setenv("EdgeBackground", "on")
|
|
|
|
var cmd = exec.Command(exe)
|
|
err = cmd.Start()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = cmd.Wait()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}()
|
|
|
|
if err != nil {
|
|
if isDebug {
|
|
log.Println("[DAEMON]", err)
|
|
}
|
|
time.Sleep(1 * time.Second)
|
|
} else {
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
} else {
|
|
_ = conn.Close()
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Test 测试配置
|
|
func (this *DNSNode) Test() error {
|
|
// 检查是否能连接API
|
|
rpcClient, err := rpc.SharedRPC()
|
|
if err != nil {
|
|
return fmt.Errorf("test rpc failed: %w", err)
|
|
}
|
|
_, err = rpcClient.APINodeRPC.FindCurrentAPINodeVersion(rpcClient.Context(), &pb.FindCurrentAPINodeVersionRequest{})
|
|
if err != nil {
|
|
return fmt.Errorf("test rpc failed: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// InstallSystemService 安装系统服务
|
|
func (this *DNSNode) InstallSystemService() error {
|
|
shortName := teaconst.SystemdServiceName
|
|
|
|
exe, err := os.Executable()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
manager := utils.NewServiceManager(shortName, teaconst.ProductName)
|
|
err = manager.Install(exe, []string{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// 监听一些信号
|
|
func (this *DNSNode) listenSignals() {
|
|
var queue = make(chan os.Signal, 8)
|
|
signal.Notify(queue, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGQUIT)
|
|
goman.New(func() {
|
|
for range queue {
|
|
time.Sleep(100 * time.Millisecond)
|
|
utils.Exit()
|
|
return
|
|
}
|
|
})
|
|
}
|
|
|
|
// 监听本地sock
|
|
func (this *DNSNode) listenSock() error {
|
|
// 检查是否在运行
|
|
if this.sock.IsListening() {
|
|
reply, err := this.sock.Send(&gosock.Command{Code: "pid"})
|
|
if err == nil {
|
|
return errors.New("error: the process is already running, pid: " + maps.NewMap(reply.Params).GetString("pid"))
|
|
} else {
|
|
return errors.New("error: the process is already running")
|
|
}
|
|
}
|
|
|
|
// 启动监听
|
|
goman.New(func() {
|
|
this.sock.OnCommand(func(cmd *gosock.Command) {
|
|
switch cmd.Code {
|
|
case "pid":
|
|
_ = cmd.Reply(&gosock.Command{
|
|
Code: "pid",
|
|
Params: map[string]interface{}{
|
|
"pid": os.Getpid(),
|
|
},
|
|
})
|
|
case "info":
|
|
exePath, _ := os.Executable()
|
|
_ = cmd.Reply(&gosock.Command{
|
|
Code: "info",
|
|
Params: map[string]interface{}{
|
|
"pid": os.Getpid(),
|
|
"version": teaconst.Version,
|
|
"path": exePath,
|
|
},
|
|
})
|
|
case "stop":
|
|
_ = cmd.ReplyOk()
|
|
|
|
// 退出主进程
|
|
events.Notify(events.EventQuit)
|
|
time.Sleep(100 * time.Millisecond)
|
|
os.Exit(0)
|
|
case "gc":
|
|
runtime.GC()
|
|
debug.FreeOSMemory()
|
|
_ = cmd.ReplyOk()
|
|
}
|
|
})
|
|
|
|
err := this.sock.Listen()
|
|
if err != nil {
|
|
logs.Println("NODE", err.Error())
|
|
}
|
|
})
|
|
|
|
events.On(events.EventQuit, func() {
|
|
logs.Println("[DNS_NODE]", "quit unix sock")
|
|
_ = this.sock.Close()
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// 启动
|
|
func (this *DNSNode) start() {
|
|
client, err := rpc.SharedRPC()
|
|
if err != nil {
|
|
remotelogs.Error("DNS_NODE", err.Error())
|
|
return
|
|
}
|
|
this.RPC = client
|
|
tryTimes := 0
|
|
var configJSON []byte
|
|
for {
|
|
resp, err := client.NSNodeRPC.FindCurrentNSNodeConfig(client.Context(), &pb.FindCurrentNSNodeConfigRequest{})
|
|
if err != nil {
|
|
tryTimes++
|
|
if tryTimes%10 == 0 {
|
|
remotelogs.Error("NODE", "read config from API failed: "+err.Error())
|
|
}
|
|
time.Sleep(1 * time.Second)
|
|
|
|
// 不做长时间的无意义的重试
|
|
if tryTimes > 1000 {
|
|
remotelogs.Error("NODE", "load failed: unable to read config from API")
|
|
return
|
|
}
|
|
} else {
|
|
configJSON = resp.NsNodeJSON
|
|
break
|
|
}
|
|
}
|
|
if len(configJSON) == 0 {
|
|
remotelogs.Error("NODE", "can not find node config")
|
|
return
|
|
}
|
|
var config = &dnsconfigs.NSNodeConfig{}
|
|
err = json.Unmarshal(configJSON, config)
|
|
if err != nil {
|
|
remotelogs.Error("NODE", "decode config failed: "+err.Error())
|
|
return
|
|
}
|
|
err = config.Init(context.TODO())
|
|
if err != nil {
|
|
remotelogs.Error("NODE", "init config failed: "+err.Error())
|
|
return
|
|
}
|
|
|
|
sharedNodeConfig.Store(config)
|
|
|
|
configs.SharedNodeConfig.Store(config)
|
|
events.Notify(events.EventReload)
|
|
|
|
sharedNodeConfigManager.reload(config)
|
|
|
|
apiConfig, _ := configs.SharedAPIConfig()
|
|
if apiConfig != nil {
|
|
apiConfig.NumberId = config.Id
|
|
}
|
|
|
|
// 验证 NodeId 防止路径遍历
|
|
var nodeIdRegexp = regexp.MustCompile(`^[a-zA-Z0-9_\-]+$`)
|
|
if !nodeIdRegexp.MatchString(config.NodeId) {
|
|
remotelogs.Error("NODE", "invalid NodeId: contains illegal characters")
|
|
return
|
|
}
|
|
|
|
var db = dbs.NewDB(Tea.Root + "/data/data-" + types.String(config.Id) + "-" + config.NodeId + "-v0.1.0.db")
|
|
err = db.Init()
|
|
if err != nil {
|
|
remotelogs.Error("NODE", "init database failed: "+err.Error())
|
|
return
|
|
}
|
|
|
|
goman.New(func() {
|
|
sharedNodeConfigManager.Start()
|
|
})
|
|
|
|
sharedDomainManager = NewDomainManager(db, config.ClusterId)
|
|
goman.New(func() {
|
|
sharedDomainManager.Start()
|
|
})
|
|
|
|
sharedRecordManager = NewRecordManager(db)
|
|
goman.New(func() {
|
|
sharedRecordManager.Start()
|
|
})
|
|
|
|
sharedRouteManager = NewRouteManager(db)
|
|
goman.New(func() {
|
|
sharedRouteManager.Start()
|
|
})
|
|
|
|
sharedKeyManager = NewKeyManager(db)
|
|
goman.New(func() {
|
|
sharedKeyManager.Start()
|
|
})
|
|
|
|
agents.SharedManager = agents.NewManager(db)
|
|
goman.New(func() {
|
|
agents.SharedManager.Start()
|
|
})
|
|
|
|
// 等待所有 Manager 初始加载完成后再发送通知
|
|
<-sharedDomainManager.readyCh
|
|
<-sharedRecordManager.readyCh
|
|
<-sharedRouteManager.readyCh
|
|
<-sharedKeyManager.readyCh
|
|
<-agents.SharedManager.ReadyCh
|
|
events.Notify(events.EventLoaded)
|
|
|
|
// 启动循环
|
|
goman.New(func() {
|
|
this.loop()
|
|
})
|
|
}
|
|
|
|
// 更新配置Loop
|
|
func (this *DNSNode) loop() {
|
|
var ticker = time.NewTicker(60 * time.Second)
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
case <-nodeTaskNotify:
|
|
}
|
|
|
|
err := this.processTasks()
|
|
if err != nil {
|
|
if rpc.IsConnError(err) {
|
|
remotelogs.Debug("DNS_NODE", "process tasks: "+err.Error())
|
|
} else {
|
|
remotelogs.Error("DNS_NODE", "process tasks: "+err.Error())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// 处理任务
|
|
func (this *DNSNode) processTasks() error {
|
|
rpcClient, err := rpc.SharedRPC()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 所有的任务
|
|
tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(rpcClient.Context(), &pb.FindNodeTasksRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, task := range tasksResp.NodeTasks {
|
|
switch task.Type {
|
|
case "nsConfigChanged":
|
|
sharedNodeConfigManager.NotifyChange()
|
|
case "nsDomainChanged":
|
|
sharedDomainManager.NotifyUpdate()
|
|
case "nsRecordChanged":
|
|
sharedRecordManager.NotifyUpdate()
|
|
case "nsRouteChanged":
|
|
sharedRouteManager.NotifyUpdate()
|
|
case "nsKeyChanged":
|
|
sharedKeyManager.NotifyUpdate()
|
|
case "nsDDoSProtectionChanged":
|
|
err := this.updateDDoS(rpcClient)
|
|
if err != nil {
|
|
remotelogs.Error("DNS_NODE", "apply DDoS config failed: "+err.Error())
|
|
}
|
|
}
|
|
|
|
_, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(rpcClient.Context(), &pb.ReportNodeTaskDoneRequest{
|
|
NodeTaskId: task.Id,
|
|
IsOk: true,
|
|
Error: "",
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (this *DNSNode) updateDDoS(rpcClient *rpc.RPCClient) error {
|
|
resp, err := rpcClient.NSNodeRPC.FindNSNodeDDoSProtection(rpcClient.Context(), &pb.FindNSNodeDDoSProtectionRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(resp.DdosProtectionJSON) == 0 {
|
|
var cfg = dnsNodeConfig()
|
|
if cfg != nil {
|
|
cfg.DDoSProtection = nil
|
|
}
|
|
} else {
|
|
var ddosProtectionConfig = &ddosconfigs.ProtectionConfig{}
|
|
err = json.Unmarshal(resp.DdosProtectionJSON, ddosProtectionConfig)
|
|
if err != nil {
|
|
return fmt.Errorf("decode DDoS protection config failed: %w", err)
|
|
}
|
|
|
|
var cfg = dnsNodeConfig()
|
|
if cfg != nil {
|
|
cfg.DDoSProtection = ddosProtectionConfig
|
|
}
|
|
|
|
err = firewalls.SharedDDoSProtectionManager.Apply(ddosProtectionConfig)
|
|
if err != nil {
|
|
// 不阻塞
|
|
remotelogs.Error("NODE", "apply DDoS protection failed: "+err.Error())
|
|
}
|
|
}
|
|
return nil
|
|
}
|