package nodes import ( "fmt" "log" "strings" "sync" "time" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeHttpDNS/internal/rpc" ) type LoadedDomain struct { Domain *pb.HTTPDNSDomain Rules []*pb.HTTPDNSCustomRule } type LoadedApp struct { App *pb.HTTPDNSApp Domains map[string]*LoadedDomain // key: lower(domain) } type LoadedSnapshot struct { LoadedAt int64 NodeID int64 ClusterID int64 Clusters map[int64]*pb.HTTPDNSCluster Apps map[string]*LoadedApp // key: lower(appId) } type SnapshotManager struct { quitCh <-chan struct{} ticker *time.Ticker locker sync.RWMutex snapshot *LoadedSnapshot timezone string } func NewSnapshotManager(quitCh <-chan struct{}) *SnapshotManager { return &SnapshotManager{ quitCh: quitCh, ticker: time.NewTicker(30 * time.Second), } } func (m *SnapshotManager) Start() { defer m.ticker.Stop() if err := m.RefreshNow("startup"); err != nil { log.Println("[HTTPDNS_NODE][snapshot]initial refresh failed:", err.Error()) } for { select { case <-m.ticker.C: if err := m.RefreshNow("periodic"); err != nil { log.Println("[HTTPDNS_NODE][snapshot]periodic refresh failed:", err.Error()) } case <-m.quitCh: return } } } func (m *SnapshotManager) Current() *LoadedSnapshot { m.locker.RLock() defer m.locker.RUnlock() return m.snapshot } func (m *SnapshotManager) RefreshNow(reason string) error { rpcClient, err := rpc.SharedRPC() if err != nil { return err } nodeResp, err := rpcClient.HTTPDNSNodeRPC.FindHTTPDNSNode(rpcClient.Context(), &pb.FindHTTPDNSNodeRequest{ NodeId: 0, }) if err != nil { return err } if nodeResp.GetNode() == nil { return fmt.Errorf("httpdns node info not found") } clusterResp, err := rpcClient.HTTPDNSClusterRPC.FindAllHTTPDNSClusters(rpcClient.Context(), &pb.FindAllHTTPDNSClustersRequest{}) if err != nil { return err } clusters := map[int64]*pb.HTTPDNSCluster{} for _, cluster := range clusterResp.GetClusters() { if cluster == nil || cluster.GetId() <= 0 { continue } clusters[cluster.GetId()] = cluster } appResp, err := rpcClient.HTTPDNSAppRPC.FindAllHTTPDNSApps(rpcClient.Context(), &pb.FindAllHTTPDNSAppsRequest{}) if err != nil { return err } apps := map[string]*LoadedApp{} for _, app := range appResp.GetApps() { if app == nil || app.GetId() <= 0 || len(app.GetAppId()) == 0 { continue } domainResp, err := rpcClient.HTTPDNSDomainRPC.ListHTTPDNSDomainsWithAppId(rpcClient.Context(), &pb.ListHTTPDNSDomainsWithAppIdRequest{ AppDbId: app.GetId(), }) if err != nil { log.Println("[HTTPDNS_NODE][snapshot]list domains failed, appId:", app.GetAppId(), "err:", err.Error()) continue } domains := map[string]*LoadedDomain{} for _, domain := range domainResp.GetDomains() { if domain == nil || domain.GetId() <= 0 || len(domain.GetDomain()) == 0 { continue } ruleResp, err := rpcClient.HTTPDNSRuleRPC.ListHTTPDNSCustomRulesWithDomainId(rpcClient.Context(), &pb.ListHTTPDNSCustomRulesWithDomainIdRequest{ DomainId: domain.GetId(), }) if err != nil { log.Println("[HTTPDNS_NODE][snapshot]list rules failed, domain:", domain.GetDomain(), "err:", err.Error()) continue } domains[strings.ToLower(strings.TrimSpace(domain.GetDomain()))] = &LoadedDomain{ Domain: domain, Rules: ruleResp.GetRules(), } } apps[strings.ToLower(strings.TrimSpace(app.GetAppId()))] = &LoadedApp{ App: app, Domains: domains, } } snapshot := &LoadedSnapshot{ LoadedAt: time.Now().UnixNano(), NodeID: nodeResp.GetNode().GetId(), ClusterID: nodeResp.GetNode().GetClusterId(), Clusters: clusters, Apps: apps, } m.locker.Lock() m.snapshot = snapshot m.locker.Unlock() reportRuntimeLog("info", "config", "snapshot", "snapshot refreshed: "+reason, fmt.Sprintf("snapshot-%d", time.Now().UnixNano())) // timezone sync - prefer current node's cluster timezone var timeZone string if snapshot.ClusterID > 0 { if cluster := clusters[snapshot.ClusterID]; cluster != nil && len(cluster.GetTimeZone()) > 0 { timeZone = cluster.GetTimeZone() } } // fallback to any non-empty cluster timezone for compatibility if len(timeZone) == 0 { for _, cluster := range clusters { if cluster != nil && len(cluster.GetTimeZone()) > 0 { timeZone = cluster.GetTimeZone() break } } } if len(timeZone) == 0 { timeZone = "Asia/Shanghai" } if m.timezone != timeZone { location, err := time.LoadLocation(timeZone) if err != nil { log.Println("[HTTPDNS_NODE][TIMEZONE]change time zone failed:", err.Error()) } else { log.Println("[HTTPDNS_NODE][TIMEZONE]change time zone to '" + timeZone + "'") time.Local = location m.timezone = timeZone } } return nil }