// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. package nodes import ( "bytes" "context" "encoding/json" "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeDNS/internal/accesslogs" "github.com/TeaOSLab/EdgeDNS/internal/configs" teaconst "github.com/TeaOSLab/EdgeDNS/internal/const" "github.com/TeaOSLab/EdgeDNS/internal/events" "github.com/TeaOSLab/EdgeDNS/internal/remotelogs" "github.com/TeaOSLab/EdgeDNS/internal/rpc" "github.com/TeaOSLab/EdgeDNS/internal/utils" "sort" "time" ) var sharedNodeConfigManager = NewNodeConfigManager() type NodeConfigManager struct { notifyChan chan bool ticker *time.Ticker timezone string lastConfigJSON []byte lastAPINodeVersion int64 lastAPINodeAddrs []string // 以前的API节点地址 } func NewNodeConfigManager() *NodeConfigManager { return &NodeConfigManager{ notifyChan: make(chan bool, 2), ticker: time.NewTicker(3 * time.Minute), } } func (this *NodeConfigManager) Start() { for { select { case <-this.ticker.C: case <-this.notifyChan: } err := this.Loop() if err != nil { if rpc.IsConnError(err) { remotelogs.Debug("NODE_CONFIG_MANAGER", err.Error()) } else { remotelogs.Error("NODE_CONFIG_MANAGER", err.Error()) } } } } func (this *NodeConfigManager) Loop() error { client, err := rpc.SharedRPC() if err != nil { return err } resp, err := client.NSNodeRPC.FindCurrentNSNodeConfig(client.Context(), &pb.FindCurrentNSNodeConfigRequest{}) if err != nil { return err } var configJSON = resp.NsNodeJSON if len(configJSON) == 0 { return nil } // 检查是否有变化 if bytes.Equal(this.lastConfigJSON, configJSON) { return nil } this.lastConfigJSON = configJSON var config = &dnsconfigs.NSNodeConfig{} err = json.Unmarshal(configJSON, config) if err != nil { return err } err = config.Init(context.TODO()) if err != nil { return err } sharedNodeConfig.Store(config) configs.SharedNodeConfig.Store(config) this.reload(config) events.Notify(events.EventReload) return nil } func (this *NodeConfigManager) NotifyChange() { select { case this.notifyChan <- true: default: } } // 刷新配置 func (this *NodeConfigManager) reload(config *dnsconfigs.NSNodeConfig) { teaconst.IsPlus = config.IsPlus accesslogs.SharedDNSFileWriter().SetDirByPolicyPath(config.AccessLogFilePath) accesslogs.SharedDNSFileWriter().SetRotateConfig(config.AccessLogRotate) needWriteFile := config.AccessLogWriteTargets == nil || config.AccessLogWriteTargets.File || config.AccessLogWriteTargets.ClickHouse if needWriteFile { _ = accesslogs.SharedDNSFileWriter().EnsureInit() } else { _ = accesslogs.SharedDNSFileWriter().Close() } // timezone var timeZone = config.TimeZone if len(timeZone) == 0 { timeZone = "Asia/Shanghai" } if this.timezone != timeZone { location, err := time.LoadLocation(timeZone) if err != nil { remotelogs.Error("TIMEZONE", "change time zone failed: "+err.Error()) return } remotelogs.Println("TIMEZONE", "change time zone to '"+timeZone+"'") time.Local = location this.timezone = timeZone } // API Node地址,这里不限制是否为空,因为在为空时仍然要有对应的处理 this.changeAPINodeAddrs(config.APINodeAddrs) } // 检查API节点地址 func (this *NodeConfigManager) changeAPINodeAddrs(apiNodeAddrs []*serverconfigs.NetworkAddressConfig) { var addrs = []string{} for _, addr := range apiNodeAddrs { err := addr.Init() if err != nil { remotelogs.Error("NODE", "changeAPINodeAddrs: validate api node address '"+configutils.QuoteIP(addr.Host)+":"+addr.PortRange+"' failed: "+err.Error()) } else { addrs = append(addrs, addr.FullAddresses()...) } } sort.Strings(addrs) if utils.EqualStrings(this.lastAPINodeAddrs, addrs) { return } this.lastAPINodeAddrs = addrs config, err := configs.LoadAPIConfig() if err != nil { remotelogs.Error("NODE", "changeAPINodeAddrs: "+err.Error()) return } if config == nil { return } var oldEndpoints = config.RPCEndpoints rpcClient, err := rpc.SharedRPC() if err != nil { return } if len(addrs) > 0 { this.lastAPINodeVersion++ var v = this.lastAPINodeVersion // 异步检测,防止阻塞 go func(v int64) { defer func() { if r := recover(); r != nil { remotelogs.Error("NODE", fmt.Sprintf("goroutine panic: %v", r)) } }() // 测试新的API节点地址 if rpcClient.TestEndpoints(addrs) { config.RPCEndpoints = addrs } else { config.RPCEndpoints = oldEndpoints this.lastAPINodeAddrs = nil // 恢复为空,以便于下次更新重试 } // 检查测试中间有无新的变更 if v != this.lastAPINodeVersion { return } err = rpcClient.UpdateConfig(config) if err != nil { remotelogs.Error("NODE", "changeAPINodeAddrs: update rpc config failed: "+err.Error()) } }(v) return } err = rpcClient.UpdateConfig(config) if err != nil { remotelogs.Error("NODE", "changeAPINodeAddrs: update rpc config failed: "+err.Error()) } }