Files
waf-platform/EdgeDNS/internal/nodes/dns_node.go

456 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//go:build plus
package nodes
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/iplibrary"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ddosconfigs"
"github.com/TeaOSLab/EdgeDNS/internal/agents"
"github.com/TeaOSLab/EdgeDNS/internal/configs"
teaconst "github.com/TeaOSLab/EdgeDNS/internal/const"
"github.com/TeaOSLab/EdgeDNS/internal/dbs"
"github.com/TeaOSLab/EdgeDNS/internal/events"
"github.com/TeaOSLab/EdgeDNS/internal/firewalls"
"github.com/TeaOSLab/EdgeDNS/internal/goman"
"github.com/TeaOSLab/EdgeDNS/internal/remotelogs"
"github.com/TeaOSLab/EdgeDNS/internal/rpc"
"github.com/TeaOSLab/EdgeDNS/internal/utils"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
"github.com/iwind/gosock/pkg/gosock"
"log"
"os"
"os/exec"
"os/signal"
"runtime"
"runtime/debug"
"syscall"
"time"
)
var DaemonIsOn = false
var DaemonPid = 0
var nodeTaskNotify = make(chan bool, 8)
var sharedDomainManager *DomainManager
var sharedRecordManager *RecordManager
var sharedRouteManager *RouteManager
var sharedKeyManager *KeyManager
var sharedNodeConfig = &dnsconfigs.NSNodeConfig{}
func NewDNSNode() *DNSNode {
return &DNSNode{
sock: gosock.NewTmpSock(teaconst.ProcessName),
}
}
type DNSNode struct {
sock *gosock.Sock
RPC *rpc.RPCClient
}
func (this *DNSNode) Start() {
// 设置netdns
// 这个需要放在所有网络访问的最前面
_ = os.Setenv("GODEBUG", "netdns=go")
// 判断是否在守护进程下
_, ok := os.LookupEnv("EdgeDaemon")
if ok {
remotelogs.Println("DNS_NODE", "start from daemon")
DaemonIsOn = true
DaemonPid = os.Getppid()
}
// 设置DNS解析库
err := os.Setenv("GODEBUG", "netdns=go")
if err != nil {
remotelogs.Error("DNS_NODE", "[DNS_RESOLVER]set env failed: "+err.Error())
}
// 处理异常
this.handlePanic()
// 监听signal
this.listenSignals()
// 本地Sock
err = this.listenSock()
if err != nil {
logs.Println("[DNS_NODE]" + err.Error())
return
}
// 启动IP库
remotelogs.Println("DNS_NODE", "initializing ip library ...")
err = iplibrary.InitPlus()
if err != nil {
remotelogs.Error("DNS_NODE", "initialize ip library failed: "+err.Error())
}
runtime.GC()
debug.FreeOSMemory()
// 触发事件
events.Notify(events.EventStart)
// 监控状态
go NewNodeStatusExecutor().Listen()
// 连接API
go NewAPIStream().Start()
// 启动
go this.start()
// Hold住进程
logs.Println("[DNS_NODE]started")
select {}
}
// Daemon 实现守护进程
func (this *DNSNode) Daemon() {
var isDebug = lists.ContainsString(os.Args, "debug")
for {
conn, err := this.sock.Dial()
if err != nil {
if isDebug {
log.Println("[DAEMON]starting ...")
}
// 尝试启动
err = func() error {
exe, err := os.Executable()
if err != nil {
return err
}
// 可以标记当前是从守护进程启动的
_ = os.Setenv("EdgeDaemon", "on")
_ = os.Setenv("EdgeBackground", "on")
var cmd = exec.Command(exe)
err = cmd.Start()
if err != nil {
return err
}
err = cmd.Wait()
if err != nil {
return err
}
return nil
}()
if err != nil {
if isDebug {
log.Println("[DAEMON]", err)
}
time.Sleep(1 * time.Second)
} else {
time.Sleep(5 * time.Second)
}
} else {
_ = conn.Close()
time.Sleep(5 * time.Second)
}
}
}
// Test 测试配置
func (this *DNSNode) Test() error {
// 检查是否能连接API
rpcClient, err := rpc.SharedRPC()
if err != nil {
return fmt.Errorf("test rpc failed: %w", err)
}
_, err = rpcClient.APINodeRPC.FindCurrentAPINodeVersion(rpcClient.Context(), &pb.FindCurrentAPINodeVersionRequest{})
if err != nil {
return fmt.Errorf("test rpc failed: %w", err)
}
return nil
}
// InstallSystemService 安装系统服务
func (this *DNSNode) InstallSystemService() error {
shortName := teaconst.SystemdServiceName
exe, err := os.Executable()
if err != nil {
return err
}
manager := utils.NewServiceManager(shortName, teaconst.ProductName)
err = manager.Install(exe, []string{})
if err != nil {
return err
}
return nil
}
// 监听一些信号
func (this *DNSNode) listenSignals() {
var queue = make(chan os.Signal, 8)
signal.Notify(queue, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGQUIT)
goman.New(func() {
for range queue {
time.Sleep(100 * time.Millisecond)
utils.Exit()
return
}
})
}
// 监听本地sock
func (this *DNSNode) listenSock() error {
// 检查是否在运行
if this.sock.IsListening() {
reply, err := this.sock.Send(&gosock.Command{Code: "pid"})
if err == nil {
return errors.New("error: the process is already running, pid: " + maps.NewMap(reply.Params).GetString("pid"))
} else {
return errors.New("error: the process is already running")
}
}
// 启动监听
go func() {
this.sock.OnCommand(func(cmd *gosock.Command) {
switch cmd.Code {
case "pid":
_ = cmd.Reply(&gosock.Command{
Code: "pid",
Params: map[string]interface{}{
"pid": os.Getpid(),
},
})
case "info":
exePath, _ := os.Executable()
_ = cmd.Reply(&gosock.Command{
Code: "info",
Params: map[string]interface{}{
"pid": os.Getpid(),
"version": teaconst.Version,
"path": exePath,
},
})
case "stop":
_ = cmd.ReplyOk()
// 退出主进程
events.Notify(events.EventQuit)
time.Sleep(100 * time.Millisecond)
os.Exit(0)
case "gc":
runtime.GC()
debug.FreeOSMemory()
_ = cmd.ReplyOk()
}
})
err := this.sock.Listen()
if err != nil {
logs.Println("NODE", err.Error())
}
}()
events.On(events.EventQuit, func() {
logs.Println("[DNS_NODE]", "quit unix sock")
_ = this.sock.Close()
})
return nil
}
// 启动
func (this *DNSNode) start() {
client, err := rpc.SharedRPC()
if err != nil {
remotelogs.Error("DNS_NODE", err.Error())
return
}
this.RPC = client
tryTimes := 0
var configJSON []byte
for {
resp, err := client.NSNodeRPC.FindCurrentNSNodeConfig(client.Context(), &pb.FindCurrentNSNodeConfigRequest{})
if err != nil {
tryTimes++
if tryTimes%10 == 0 {
remotelogs.Error("NODE", "read config from API failed: "+err.Error())
}
time.Sleep(1 * time.Second)
// 不做长时间的无意义的重试
if tryTimes > 1000 {
remotelogs.Error("NODE", "load failed: unable to read config from API")
return
}
} else {
configJSON = resp.NsNodeJSON
break
}
}
if len(configJSON) == 0 {
remotelogs.Error("NODE", "can not find node config")
return
}
var config = &dnsconfigs.NSNodeConfig{}
err = json.Unmarshal(configJSON, config)
if err != nil {
remotelogs.Error("NODE", "decode config failed: "+err.Error())
return
}
err = config.Init(context.TODO())
if err != nil {
remotelogs.Error("NODE", "init config failed: "+err.Error())
return
}
sharedNodeConfig = config
configs.SharedNodeConfig = config
events.Notify(events.EventReload)
sharedNodeConfigManager.reload(config)
apiConfig, _ := configs.SharedAPIConfig()
if apiConfig != nil {
apiConfig.NumberId = config.Id
}
var db = dbs.NewDB(Tea.Root + "/data/data-" + types.String(config.Id) + "-" + config.NodeId + "-v0.1.0.db")
err = db.Init()
if err != nil {
remotelogs.Error("NODE", "init database failed: "+err.Error())
return
}
go sharedNodeConfigManager.Start()
sharedDomainManager = NewDomainManager(db, config.ClusterId)
go sharedDomainManager.Start()
sharedRecordManager = NewRecordManager(db)
go sharedRecordManager.Start()
sharedRouteManager = NewRouteManager(db)
go sharedRouteManager.Start()
sharedKeyManager = NewKeyManager(db)
go sharedKeyManager.Start()
agents.SharedManager = agents.NewManager(db)
go agents.SharedManager.Start()
// 发送通知这里发送通知需要在DomainManager、RecordeManager等加载完成之后
time.Sleep(1 * time.Second)
events.Notify(events.EventLoaded)
// 启动循环
go this.loop()
}
// 更新配置Loop
func (this *DNSNode) loop() {
var ticker = time.NewTicker(60 * time.Second)
for {
select {
case <-ticker.C:
case <-nodeTaskNotify:
}
err := this.processTasks()
if err != nil {
if rpc.IsConnError(err) {
remotelogs.Debug("DNS_NODE", "process tasks: "+err.Error())
} else {
remotelogs.Error("DNS_NODE", "process tasks: "+err.Error())
}
}
}
}
// 处理任务
func (this *DNSNode) processTasks() error {
rpcClient, err := rpc.SharedRPC()
if err != nil {
return err
}
// 所有的任务
tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(rpcClient.Context(), &pb.FindNodeTasksRequest{})
if err != nil {
return err
}
for _, task := range tasksResp.NodeTasks {
switch task.Type {
case "nsConfigChanged":
sharedNodeConfigManager.NotifyChange()
case "nsDomainChanged":
sharedDomainManager.NotifyUpdate()
case "nsRecordChanged":
sharedRecordManager.NotifyUpdate()
case "nsRouteChanged":
sharedRouteManager.NotifyUpdate()
case "nsKeyChanged":
sharedKeyManager.NotifyUpdate()
case "nsDDoSProtectionChanged":
err := this.updateDDoS(rpcClient)
if err != nil {
remotelogs.Error("DNS_NODE", "apply DDoS config failed: "+err.Error())
}
}
_, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(rpcClient.Context(), &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id,
IsOk: true,
Error: "",
})
if err != nil {
return err
}
}
return nil
}
func (this *DNSNode) updateDDoS(rpcClient *rpc.RPCClient) error {
resp, err := rpcClient.NSNodeRPC.FindNSNodeDDoSProtection(rpcClient.Context(), &pb.FindNSNodeDDoSProtectionRequest{})
if err != nil {
return err
}
if len(resp.DdosProtectionJSON) == 0 {
if sharedNodeConfig != nil {
sharedNodeConfig.DDoSProtection = nil
}
} else {
var ddosProtectionConfig = &ddosconfigs.ProtectionConfig{}
err = json.Unmarshal(resp.DdosProtectionJSON, ddosProtectionConfig)
if err != nil {
return fmt.Errorf("decode DDoS protection config failed: %w", err)
}
if sharedNodeConfig != nil {
sharedNodeConfig.DDoSProtection = ddosProtectionConfig
}
err = firewalls.SharedDDoSProtectionManager.Apply(ddosProtectionConfig)
if err != nil {
// 不阻塞
remotelogs.Error("NODE", "apply DDoS protection failed: "+err.Error())
}
}
return nil
}