Files
waf-platform/EdgeHttpDNS/internal/nodes/snapshot_manager.go
2026-02-27 10:35:22 +08:00

161 lines
3.8 KiB
Go

package nodes
import (
"fmt"
"log"
"strings"
"sync"
"time"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeHttpDNS/internal/rpc"
)
type LoadedDomain struct {
Domain *pb.HTTPDNSDomain
Rules []*pb.HTTPDNSCustomRule
}
type LoadedApp struct {
App *pb.HTTPDNSApp
Domains map[string]*LoadedDomain // key: lower(domain)
}
type LoadedSnapshot struct {
LoadedAt int64
NodeID int64
ClusterID int64
Clusters map[int64]*pb.HTTPDNSCluster
Apps map[string]*LoadedApp // key: lower(appId)
}
type SnapshotManager struct {
quitCh <-chan struct{}
ticker *time.Ticker
locker sync.RWMutex
snapshot *LoadedSnapshot
}
func NewSnapshotManager(quitCh <-chan struct{}) *SnapshotManager {
return &SnapshotManager{
quitCh: quitCh,
ticker: time.NewTicker(30 * time.Second),
}
}
func (m *SnapshotManager) Start() {
defer m.ticker.Stop()
if err := m.RefreshNow("startup"); err != nil {
log.Println("[HTTPDNS_NODE][snapshot]initial refresh failed:", err.Error())
}
for {
select {
case <-m.ticker.C:
if err := m.RefreshNow("periodic"); err != nil {
log.Println("[HTTPDNS_NODE][snapshot]periodic refresh failed:", err.Error())
}
case <-m.quitCh:
return
}
}
}
func (m *SnapshotManager) Current() *LoadedSnapshot {
m.locker.RLock()
defer m.locker.RUnlock()
return m.snapshot
}
func (m *SnapshotManager) RefreshNow(reason string) error {
rpcClient, err := rpc.SharedRPC()
if err != nil {
return err
}
nodeResp, err := rpcClient.HTTPDNSNodeRPC.FindHTTPDNSNode(rpcClient.Context(), &pb.FindHTTPDNSNodeRequest{
NodeId: 0,
})
if err != nil {
return err
}
if nodeResp.GetNode() == nil {
return fmt.Errorf("httpdns node info not found")
}
clusterResp, err := rpcClient.HTTPDNSClusterRPC.FindAllHTTPDNSClusters(rpcClient.Context(), &pb.FindAllHTTPDNSClustersRequest{})
if err != nil {
return err
}
clusters := map[int64]*pb.HTTPDNSCluster{}
for _, cluster := range clusterResp.GetClusters() {
if cluster == nil || cluster.GetId() <= 0 {
continue
}
clusters[cluster.GetId()] = cluster
}
appResp, err := rpcClient.HTTPDNSAppRPC.FindAllHTTPDNSApps(rpcClient.Context(), &pb.FindAllHTTPDNSAppsRequest{})
if err != nil {
return err
}
apps := map[string]*LoadedApp{}
for _, app := range appResp.GetApps() {
if app == nil || app.GetId() <= 0 || len(app.GetAppId()) == 0 {
continue
}
domainResp, err := rpcClient.HTTPDNSDomainRPC.ListHTTPDNSDomainsWithAppId(rpcClient.Context(), &pb.ListHTTPDNSDomainsWithAppIdRequest{
AppDbId: app.GetId(),
})
if err != nil {
log.Println("[HTTPDNS_NODE][snapshot]list domains failed, appId:", app.GetAppId(), "err:", err.Error())
continue
}
domains := map[string]*LoadedDomain{}
for _, domain := range domainResp.GetDomains() {
if domain == nil || domain.GetId() <= 0 || len(domain.GetDomain()) == 0 {
continue
}
ruleResp, err := rpcClient.HTTPDNSRuleRPC.ListHTTPDNSCustomRulesWithDomainId(rpcClient.Context(), &pb.ListHTTPDNSCustomRulesWithDomainIdRequest{
DomainId: domain.GetId(),
})
if err != nil {
log.Println("[HTTPDNS_NODE][snapshot]list rules failed, domain:", domain.GetDomain(), "err:", err.Error())
continue
}
domains[strings.ToLower(strings.TrimSpace(domain.GetDomain()))] = &LoadedDomain{
Domain: domain,
Rules: ruleResp.GetRules(),
}
}
apps[strings.ToLower(strings.TrimSpace(app.GetAppId()))] = &LoadedApp{
App: app,
Domains: domains,
}
}
snapshot := &LoadedSnapshot{
LoadedAt: time.Now().Unix(),
NodeID: nodeResp.GetNode().GetId(),
ClusterID: nodeResp.GetNode().GetClusterId(),
Clusters: clusters,
Apps: apps,
}
m.locker.Lock()
m.snapshot = snapshot
m.locker.Unlock()
reportRuntimeLog("info", "config", "snapshot", "snapshot refreshed: "+reason, fmt.Sprintf("snapshot-%d", time.Now().UnixNano()))
return nil
}