Files
waf-platform/EdgeAPI/internal/accesslogs/storage_manager.go
2026-02-10 23:43:05 +08:00

241 lines
6.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
//go:build plus
package accesslogs
import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/types"
"sync"
"time"
)
var SharedStorageManager = NewStorageManager()
type StorageManager struct {
storageMap map[int64]StorageInterface // policyId => Storage
publicPolicyId int64
disableDefaultDB bool
writeTargets *serverconfigs.AccessLogWriteTargets // 公用策略的写入目标
locker sync.Mutex
}
func NewStorageManager() *StorageManager {
return &StorageManager{
storageMap: map[int64]StorageInterface{},
}
}
func (this *StorageManager) Start() {
var ticker = time.NewTicker(1 * time.Minute)
if Tea.IsTesting() {
ticker = time.NewTicker(5 * time.Second)
}
// 启动时执行一次
var err = this.Loop()
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "update error: "+err.Error())
}
// 循环执行
for range ticker.C {
err := this.Loop()
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "update error: "+err.Error())
}
}
}
// Loop 更新
func (this *StorageManager) Loop() error {
var tx *dbs.Tx
// 共用策略
publicPolicyId, err := models.SharedHTTPAccessLogPolicyDAO.FindCurrentPublicPolicyId(tx)
if err != nil {
return err
}
this.publicPolicyId = publicPolicyId
// 所有策略
policies, err := models.SharedHTTPAccessLogPolicyDAO.FindAllEnabledAndOnPolicies(tx)
if err != nil {
return err
}
var foundPolicy = false
var policyIds = []int64{}
for _, policy := range policies {
if policy.IsOn {
policyIds = append(policyIds, int64(policy.Id))
if int64(policy.Id) == publicPolicyId {
this.disableDefaultDB = policy.DisableDefaultDB
this.writeTargets = serverconfigs.ParseWriteTargetsFromPolicy(policy.WriteTargets, policy.Type, policy.DisableDefaultDB)
foundPolicy = true
}
}
}
if !foundPolicy {
this.disableDefaultDB = false
this.writeTargets = nil
}
this.locker.Lock()
defer this.locker.Unlock()
// 关闭不用的
for policyId, storage := range this.storageMap {
if !lists.ContainsInt64(policyIds, policyId) {
err = storage.Close()
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "close '"+types.String(policyId)+"' failed: "+err.Error())
}
delete(this.storageMap, policyId)
remotelogs.Println("ACCESS_LOG_STORAGE_MANAGER", "remove '"+types.String(policyId)+"'")
}
}
for _, policy := range policies {
var policyId = int64(policy.Id)
storage, ok := this.storageMap[policyId]
if ok {
// 检查配置是否有变更
if types.Int(policy.Version) != storage.Version() {
err = storage.Close()
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "close policy '"+types.String(policyId)+"' failed: "+err.Error())
// 继续往下执行
}
if len(policy.Options) > 0 {
err = json.Unmarshal(policy.Options, storage.Config())
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "unmarshal policy '"+types.String(policyId)+"' config failed: "+err.Error())
storage.SetOk(false)
continue
}
}
storage.SetVersion(types.Int(policy.Version))
storage.SetFirewallOnly(policy.FirewallOnly == 1)
err = storage.Start()
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"' failed: "+err.Error())
continue
}
storage.SetOk(true)
remotelogs.Println("ACCESS_LOG_STORAGE_MANAGER", "restart policy '"+types.String(policyId)+"'")
}
} else {
storage, err = this.createStorage(policy.Type, policy.Options)
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "create policy '"+types.String(policyId)+"' failed: "+err.Error())
continue
}
storage.SetVersion(types.Int(policy.Version))
storage.SetFirewallOnly(policy.FirewallOnly == 1)
this.storageMap[policyId] = storage
err = storage.Start()
if err != nil {
remotelogs.Error("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"' failed: "+err.Error())
continue
}
storage.SetOk(true)
remotelogs.Println("ACCESS_LOG_STORAGE_MANAGER", "start policy '"+types.String(policyId)+"'")
}
}
return nil
}
func (this *StorageManager) DisableDefaultDB() bool {
return this.disableDefaultDB
}
// WriteMySQL 公用策略是否写入 MySQL以 writeTargets 为准,无则用 disableDefaultDB
func (this *StorageManager) WriteMySQL() bool {
if this.writeTargets != nil {
return this.writeTargets.MySQL
}
return !this.disableDefaultDB
}
// WriteClickHouse 公用策略是否写入 ClickHouse文件+Fluent Bit 或后续 API 直写)
func (this *StorageManager) WriteClickHouse() bool {
if this.writeTargets != nil {
return this.writeTargets.ClickHouse
}
return false
}
// WriteTargets 返回公用策略的写入目标(供节点配置注入等)
func (this *StorageManager) WriteTargets() *serverconfigs.AccessLogWriteTargets {
return this.writeTargets
}
func (this *StorageManager) createStorage(storageType string, optionsJSON []byte) (StorageInterface, error) {
if serverconfigs.IsFileBasedStorageType(storageType) {
storageType = serverconfigs.AccessLogStorageTypeFile
}
switch storageType {
case serverconfigs.AccessLogStorageTypeFile:
var config = &serverconfigs.AccessLogFileStorageConfig{}
if len(optionsJSON) > 0 {
err := json.Unmarshal(optionsJSON, config)
if err != nil {
return nil, err
}
}
return NewFileStorage(config), nil
case serverconfigs.AccessLogStorageTypeES:
var config = &serverconfigs.AccessLogESStorageConfig{}
if len(optionsJSON) > 0 {
err := json.Unmarshal(optionsJSON, config)
if err != nil {
return nil, err
}
}
return NewESStorage(config), nil
case serverconfigs.AccessLogStorageTypeTCP:
var config = &serverconfigs.AccessLogTCPStorageConfig{}
if len(optionsJSON) > 0 {
err := json.Unmarshal(optionsJSON, config)
if err != nil {
return nil, err
}
}
return NewTCPStorage(config), nil
case serverconfigs.AccessLogStorageTypeSyslog:
var config = &serverconfigs.AccessLogSyslogStorageConfig{}
if len(optionsJSON) > 0 {
err := json.Unmarshal(optionsJSON, config)
if err != nil {
return nil, err
}
}
return NewSyslogStorage(config), nil
case serverconfigs.AccessLogStorageTypeCommand:
var config = &serverconfigs.AccessLogCommandStorageConfig{}
if len(optionsJSON) > 0 {
err := json.Unmarshal(optionsJSON, config)
if err != nil {
return nil, err
}
}
return NewCommandStorage(config), nil
}
return nil, errors.New("invalid policy type '" + storageType + "'")
}