Files
waf-platform/EdgeDNS/internal/nodes/manager_node_config.go
2026-02-13 22:36:17 +08:00

209 lines
5.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import (
"bytes"
"context"
"encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeDNS/internal/accesslogs"
"github.com/TeaOSLab/EdgeDNS/internal/configs"
teaconst "github.com/TeaOSLab/EdgeDNS/internal/const"
"github.com/TeaOSLab/EdgeDNS/internal/events"
"github.com/TeaOSLab/EdgeDNS/internal/remotelogs"
"github.com/TeaOSLab/EdgeDNS/internal/rpc"
"github.com/TeaOSLab/EdgeDNS/internal/utils"
"sort"
"time"
)
// sharedNodeConfigManager is the package-level NodeConfigManager instance,
// created once when the package is initialized.
var sharedNodeConfigManager = NewNodeConfigManager()
// NodeConfigManager periodically pulls the NS node configuration over RPC and
// applies it to the running node (see Start, Loop and reload).
type NodeConfigManager struct {
// notifyChan receives a value from NotifyChange; Start wakes up on it.
notifyChan chan bool
// ticker is the periodic wake-up source that Start waits on.
ticker *time.Ticker
// timezone is the name of the time zone most recently applied by reload.
timezone string
// lastConfigJSON is the raw config payload recorded by Loop to detect changes.
lastConfigJSON []byte
// lastAPINodeVersion is a counter used by changeAPINodeAddrs to detect that a
// newer address change arrived while an endpoint test was still running.
lastAPINodeVersion int64
lastAPINodeAddrs []string // previous API node addresses
}
// NewNodeConfigManager builds a manager with a notification channel buffered
// for two pending requests and a ticker that fires every three minutes.
func NewNodeConfigManager() *NodeConfigManager {
	var manager = new(NodeConfigManager)
	manager.notifyChan = make(chan bool, 2)
	manager.ticker = time.NewTicker(3 * time.Minute)
	return manager
}
// Start runs the manager's main loop and never returns. Each iteration waits
// for either a ticker tick or a notification, then calls Loop. Errors from
// Loop are logged under the NODE_CONFIG_MANAGER tag: connection errors at
// debug level, everything else at error level.
func (this *NodeConfigManager) Start() {
	for {
		// Block until the next scheduled tick or an explicit notification.
		select {
		case <-this.ticker.C:
		case <-this.notifyChan:
		}

		loopErr := this.Loop()
		if loopErr == nil {
			continue
		}

		// Connection problems are only reported at debug level.
		if rpc.IsConnError(loopErr) {
			remotelogs.Debug("NODE_CONFIG_MANAGER", loopErr.Error())
			continue
		}
		remotelogs.Error("NODE_CONFIG_MANAGER", loopErr.Error())
	}
}
// Loop fetches the current NS node configuration through the shared RPC
// client and, when the payload differs from the last one recorded, decodes
// it, initializes it, publishes it to the shared config variables, reloads
// the node settings and emits the reload event.
//
// It returns nil when the payload is empty or unchanged, and the underlying
// error when obtaining the RPC client, the RPC call, JSON decoding or config
// initialization fails.
func (this *NodeConfigManager) Loop() error {
	client, err := rpc.SharedRPC()
	if err != nil {
		return err
	}

	resp, err := client.NSNodeRPC.FindCurrentNSNodeConfig(client.Context(), &pb.FindCurrentNSNodeConfigRequest{})
	if err != nil {
		return err
	}

	var configJSON = resp.NsNodeJSON
	if len(configJSON) == 0 {
		return nil
	}

	// Nothing to do when the payload is identical to the last applied one.
	if bytes.Equal(this.lastConfigJSON, configJSON) {
		return nil
	}

	var config = &dnsconfigs.NSNodeConfig{}
	err = json.Unmarshal(configJSON, config)
	if err != nil {
		return err
	}

	err = config.Init(context.TODO())
	if err != nil {
		return err
	}

	// Record the payload only after it has been decoded and initialized.
	// Recording it earlier would make a failed attempt look like "unchanged"
	// on every later call, so the config would never be retried.
	this.lastConfigJSON = configJSON

	sharedNodeConfig = config
	configs.SharedNodeConfig = config

	this.reload(config)
	events.Notify(events.EventReload)

	return nil
}
// NotifyChange queues a request for the manager to check the configuration.
// The send is non-blocking: if the notification channel cannot accept the
// value right now, the request is dropped.
func (this *NodeConfigManager) NotifyChange() {
	var pending = this.notifyChan
	select {
	case pending <- true:
		// Request queued.
	default:
		// Channel not ready; nothing more to do.
	}
}
// reload applies the settings of a freshly loaded node configuration:
// the Plus flag, the access log file writer, the process time zone and the
// API node addresses.
//
// A time zone that cannot be loaded is logged and skipped; the remaining
// settings are still applied.
func (this *NodeConfigManager) reload(config *dnsconfigs.NSNodeConfig) {
	teaconst.IsPlus = config.IsPlus

	// Access log file writer: update its directory and rotation settings, then
	// open or close it depending on the configured write targets.
	accesslogs.SharedDNSFileWriter().SetDirByPolicyPath(config.AccessLogFilePath)
	accesslogs.SharedDNSFileWriter().SetRotateConfig(config.AccessLogRotate)
	needWriteFile := config.AccessLogWriteTargets == nil || config.AccessLogWriteTargets.File || config.AccessLogWriteTargets.ClickHouse
	if needWriteFile {
		_ = accesslogs.SharedDNSFileWriter().EnsureInit()
	} else {
		_ = accesslogs.SharedDNSFileWriter().Close()
	}

	// Time zone: fall back to Asia/Shanghai when none is configured.
	var timeZone = config.TimeZone
	if len(timeZone) == 0 {
		timeZone = "Asia/Shanghai"
	}
	if this.timezone != timeZone {
		location, err := time.LoadLocation(timeZone)
		if err != nil {
			// Do not return here: an invalid time zone must not prevent the
			// API node addresses below from being updated.
			remotelogs.Error("TIMEZONE", "change time zone failed: "+err.Error())
		} else {
			remotelogs.Println("TIMEZONE", "change time zone to '"+timeZone+"'")
			time.Local = location
			this.timezone = timeZone
		}
	}

	// API node addresses are passed on even when empty, because the empty
	// case still needs to be handled by changeAPINodeAddrs.
	this.changeAPINodeAddrs(config.APINodeAddrs)
}
// changeAPINodeAddrs checks the API node addresses from the node
// configuration and, when the resulting address list differs from the one
// recorded last time, updates the RPC client configuration.
//
// Addresses that fail validation are logged and left out. A non-empty list is
// first tested in a separate goroutine so the caller is not blocked; the new
// endpoints are only written into the config when the test succeeds. An empty
// list triggers an immediate UpdateConfig with the loaded API config.
//
// NOTE(review): the goroutine reads lastAPINodeVersion and writes
// lastAPINodeAddrs without any synchronization visible in this file, while
// the caller may touch the same fields on a later invocation — confirm
// whether a mutex is needed.
func (this *NodeConfigManager) changeAPINodeAddrs(apiNodeAddrs []*serverconfigs.NetworkAddressConfig) {
	var addrs = []string{}
	for _, addr := range apiNodeAddrs {
		// Skip nil entries rather than dereferencing them below.
		if addr == nil {
			continue
		}
		err := addr.Init()
		if err != nil {
			remotelogs.Error("NODE", "changeAPINodeAddrs: validate api node address '"+configutils.QuoteIP(addr.Host)+":"+addr.PortRange+"' failed: "+err.Error())
			continue
		}
		addrs = append(addrs, addr.FullAddresses()...)
	}
	// Sort so the comparison below does not depend on the configured order.
	sort.Strings(addrs)

	if utils.EqualStrings(this.lastAPINodeAddrs, addrs) {
		return
	}
	this.lastAPINodeAddrs = addrs

	config, err := configs.LoadAPIConfig()
	if err != nil {
		remotelogs.Error("NODE", "changeAPINodeAddrs: "+err.Error())
		return
	}
	if config == nil {
		return
	}
	var oldEndpoints = config.RPCEndpoints

	rpcClient, err := rpc.SharedRPC()
	if err != nil {
		return
	}

	// Bump the version for every change that is about to be applied, including
	// an empty address list, so an endpoint test still running for an older
	// list recognizes that it is stale.
	this.lastAPINodeVersion++
	var v = this.lastAPINodeVersion

	if len(addrs) == 0 {
		err = rpcClient.UpdateConfig(config)
		if err != nil {
			remotelogs.Error("NODE", "changeAPINodeAddrs: update rpc config failed: "+err.Error())
		}
		return
	}

	// Test asynchronously to avoid blocking the caller.
	go func(v int64) {
		// Test the new API node addresses.
		var reachable = rpcClient.TestEndpoints(addrs)

		// A newer change arrived while testing: drop this result before
		// touching any shared state, so the newer state is not clobbered.
		if v != this.lastAPINodeVersion {
			return
		}

		if reachable {
			config.RPCEndpoints = addrs
		} else {
			config.RPCEndpoints = oldEndpoints
			this.lastAPINodeAddrs = nil // reset so the next update retries
		}

		// Use a goroutine-local error instead of the captured outer variable.
		if updateErr := rpcClient.UpdateConfig(config); updateErr != nil {
			remotelogs.Error("NODE", "changeAPINodeAddrs: update rpc config failed: "+updateErr.Error())
		}
	}(v)
}