From c28317ee070bacbcd24684eb99788cc5aa8cd0e8 Mon Sep 17 00:00:00 2001 From: robin Date: Tue, 10 Feb 2026 23:43:05 +0800 Subject: [PATCH] =?UTF-8?q?dns=20clickhouse=E6=94=B9=E9=80=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../internal/accesslogs/storage_manager.go | 2 +- .../db/models/http_access_log_policy_dao.go | 14 +- EdgeAPI/internal/db/models/node_dao.go | 2 +- .../internal/db/models/ns_node_dao_plus.go | 2 +- .../service_http_access_log_policy_plus.go | 42 +++-- .../default/servers/accesslogs/createPopup.go | 12 +- .../default/servers/accesslogs/index.go | 6 +- .../servers/accesslogs/policyutils/utils.go | 6 +- .../default/servers/accesslogs/update.go | 12 +- .../pkg/serverconfigs/access_log_storages.go | 79 ++++++---- .../serverconfigs/access_log_write_targets.go | 12 +- .../internal/nodes/http_access_log_queue.go | 2 +- 日志策略逻辑梳理与问题清单.md | 148 ++++++++++++++++++ 13 files changed, 280 insertions(+), 59 deletions(-) create mode 100644 日志策略逻辑梳理与问题清单.md diff --git a/EdgeAPI/internal/accesslogs/storage_manager.go b/EdgeAPI/internal/accesslogs/storage_manager.go index 7639c47..9167b57 100644 --- a/EdgeAPI/internal/accesslogs/storage_manager.go +++ b/EdgeAPI/internal/accesslogs/storage_manager.go @@ -80,7 +80,7 @@ func (this *StorageManager) Loop() error { if int64(policy.Id) == publicPolicyId { this.disableDefaultDB = policy.DisableDefaultDB - this.writeTargets = serverconfigs.ResolveWriteTargetsByType(policy.Type, policy.DisableDefaultDB) + this.writeTargets = serverconfigs.ParseWriteTargetsFromPolicy(policy.WriteTargets, policy.Type, policy.DisableDefaultDB) foundPolicy = true } } diff --git a/EdgeAPI/internal/db/models/http_access_log_policy_dao.go b/EdgeAPI/internal/db/models/http_access_log_policy_dao.go index 57ab440..e597e3c 100644 --- a/EdgeAPI/internal/db/models/http_access_log_policy_dao.go +++ b/EdgeAPI/internal/db/models/http_access_log_policy_dao.go @@ -108,7 +108,6 @@ func (this *HTTPAccessLogPolicyDAO) FindAllEnabledAndOnPolicies(tx *dbs.Tx) (res // CreatePolicy 创建策略 func (this *HTTPAccessLogPolicyDAO) CreatePolicy(tx *dbs.Tx, name string, policyType string, optionsJSON []byte, condsJSON []byte, isPublic bool, firewallOnly bool, disableDefaultDB bool, writeTargetsJSON []byte) (policyId int64, err error) { - _ = writeTargetsJSON var op = NewHTTPAccessLogPolicyOperator() op.Name = name op.Type = policyType @@ -122,14 +121,17 @@ func (this *HTTPAccessLogPolicyDAO) CreatePolicy(tx *dbs.Tx, name string, policy op.IsOn = true op.FirewallOnly = firewallOnly op.DisableDefaultDB = disableDefaultDB - op.WriteTargets = dbs.SQL("NULL") + if len(writeTargetsJSON) > 0 { + op.WriteTargets = writeTargetsJSON + } else { + op.WriteTargets = "{}" + } op.State = HTTPAccessLogPolicyStateEnabled return this.SaveInt64(tx, op) } // UpdatePolicy 修改策略 func (this *HTTPAccessLogPolicyDAO) UpdatePolicy(tx *dbs.Tx, policyId int64, name string, policyType string, optionsJSON []byte, condsJSON []byte, isPublic bool, firewallOnly bool, disableDefaultDB bool, writeTargetsJSON []byte, isOn bool) error { - _ = writeTargetsJSON if policyId <= 0 { return errors.New("invalid policyId") } @@ -167,7 +169,11 @@ func (this *HTTPAccessLogPolicyDAO) UpdatePolicy(tx *dbs.Tx, policyId int64, nam op.IsPublic = isPublic op.FirewallOnly = firewallOnly op.DisableDefaultDB = disableDefaultDB - op.WriteTargets = dbs.SQL("NULL") + if len(writeTargetsJSON) > 0 { + op.WriteTargets = writeTargetsJSON + } else { + op.WriteTargets = "{}" + } op.IsOn = isOn return this.Save(tx, op) } diff --git a/EdgeAPI/internal/db/models/node_dao.go b/EdgeAPI/internal/db/models/node_dao.go index 7805a72..cb0452e 100644 --- a/EdgeAPI/internal/db/models/node_dao.go +++ b/EdgeAPI/internal/db/models/node_dao.go @@ -1174,7 +1174,7 @@ func (this *NodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64, dataMap *shared if publicPolicyId > 0 { publicPolicy, _ := SharedHTTPAccessLogPolicyDAO.FindEnabledHTTPAccessLogPolicy(tx, publicPolicyId) if publicPolicy != nil { - config.GlobalServerConfig.HTTPAccessLog.WriteTargets = serverconfigs.ResolveWriteTargetsByType(publicPolicy.Type, publicPolicy.DisableDefaultDB) + config.GlobalServerConfig.HTTPAccessLog.WriteTargets = serverconfigs.ParseWriteTargetsFromPolicy(publicPolicy.WriteTargets, publicPolicy.Type, publicPolicy.DisableDefaultDB) config.GlobalServerConfig.HTTPAccessLog.FilePath = ParseHTTPAccessLogPolicyFilePath(publicPolicy) } } diff --git a/EdgeAPI/internal/db/models/ns_node_dao_plus.go b/EdgeAPI/internal/db/models/ns_node_dao_plus.go index 0e59852..b074929 100644 --- a/EdgeAPI/internal/db/models/ns_node_dao_plus.go +++ b/EdgeAPI/internal/db/models/ns_node_dao_plus.go @@ -477,7 +477,7 @@ func (this *NSNodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64) (*dnsconfigs. if publicPolicyId > 0 { publicPolicy, _ := SharedHTTPAccessLogPolicyDAO.FindEnabledHTTPAccessLogPolicy(tx, publicPolicyId) if publicPolicy != nil { - config.AccessLogWriteTargets = serverconfigs.ResolveWriteTargetsByType(publicPolicy.Type, publicPolicy.DisableDefaultDB) + config.AccessLogWriteTargets = serverconfigs.ParseWriteTargetsFromPolicy(publicPolicy.WriteTargets, publicPolicy.Type, publicPolicy.DisableDefaultDB) config.AccessLogFilePath = ParseHTTPAccessLogPolicyFilePath(publicPolicy) } } diff --git a/EdgeAPI/internal/rpc/services/service_http_access_log_policy_plus.go b/EdgeAPI/internal/rpc/services/service_http_access_log_policy_plus.go index ab37ae6..32de70a 100644 --- a/EdgeAPI/internal/rpc/services/service_http_access_log_policy_plus.go +++ b/EdgeAPI/internal/rpc/services/service_http_access_log_policy_plus.go @@ -4,6 +4,7 @@ package services import ( "context" + "encoding/json" "errors" "github.com/TeaOSLab/EdgeAPI/internal/accesslogs" "github.com/TeaOSLab/EdgeAPI/internal/db/models" @@ -15,18 +16,23 @@ type HTTPAccessLogPolicyService struct { BaseService } -func (this *HTTPAccessLogPolicyService) normalizeStorageTypeAndTargets(policyType string, writeTargetsJSON []byte, disableDefaultDB bool) (string, []byte) { - _ = writeTargetsJSON - _ = disableDefaultDB - - // 兼容旧前端/缓存可能传来的历史类型编码 - switch policyType { - case "clickhouse": - policyType = serverconfigs.AccessLogStorageTypeFileClickhouse - case "mysql_clickhouse": - policyType = serverconfigs.AccessLogStorageTypeFileMySQLClickhouse +func (this *HTTPAccessLogPolicyService) normalizeStorageTypeAndTargets(policyType string, disableDefaultDB bool) (string, []byte, error) { + normalizedType := serverconfigs.NormalizeAccessLogStorageType(policyType) + targets := serverconfigs.BuildWriteTargetsByStorageType(normalizedType, disableDefaultDB) + if targets == nil { + return "", nil, errors.New("invalid access log storage type") } - return policyType, nil + + writeTargetsMap := map[string]bool{ + "file": targets.File, + "mysql": targets.MySQL, + "clickhouse": targets.ClickHouse, + } + writeTargetsJSON, err := json.Marshal(writeTargetsMap) + if err != nil { + return "", nil, err + } + return normalizedType, writeTargetsJSON, nil } // CountAllHTTPAccessLogPolicies 计算访问日志策略数量 @@ -68,7 +74,7 @@ func (this *HTTPAccessLogPolicyService) ListHTTPAccessLogPolicies(ctx context.Co IsPublic: policy.IsPublic, FirewallOnly: policy.FirewallOnly == 1, DisableDefaultDB: policy.DisableDefaultDB, - WriteTargetsJSON: nil, + WriteTargetsJSON: policy.WriteTargets, }) } return &pb.ListHTTPAccessLogPoliciesResponse{HttpAccessLogPolicies: pbPolicies}, nil @@ -91,7 +97,10 @@ func (this *HTTPAccessLogPolicyService) CreateHTTPAccessLogPolicy(ctx context.Co } } - policyType, writeTargetsJSON := this.normalizeStorageTypeAndTargets(req.Type, req.WriteTargetsJSON, req.DisableDefaultDB) + policyType, writeTargetsJSON, err := this.normalizeStorageTypeAndTargets(req.Type, req.DisableDefaultDB) + if err != nil { + return nil, err + } // 创建 policyId, err := models.SharedHTTPAccessLogPolicyDAO.CreatePolicy(tx, req.Name, policyType, req.OptionsJSON, req.CondsJSON, req.IsPublic, req.FirewallOnly, req.DisableDefaultDB, writeTargetsJSON) @@ -118,7 +127,10 @@ func (this *HTTPAccessLogPolicyService) UpdateHTTPAccessLogPolicy(ctx context.Co } } - policyType, writeTargetsJSON := this.normalizeStorageTypeAndTargets(req.Type, req.WriteTargetsJSON, req.DisableDefaultDB) + policyType, writeTargetsJSON, err := this.normalizeStorageTypeAndTargets(req.Type, req.DisableDefaultDB) + if err != nil { + return nil, err + } // 保存修改 err = models.SharedHTTPAccessLogPolicyDAO.UpdatePolicy(tx, req.HttpAccessLogPolicyId, req.Name, policyType, req.OptionsJSON, req.CondsJSON, req.IsPublic, req.FirewallOnly, req.DisableDefaultDB, writeTargetsJSON, req.IsOn) @@ -153,7 +165,7 @@ func (this *HTTPAccessLogPolicyService) FindHTTPAccessLogPolicy(ctx context.Cont IsPublic: policy.IsPublic, FirewallOnly: policy.FirewallOnly == 1, DisableDefaultDB: policy.DisableDefaultDB, - WriteTargetsJSON: nil, + WriteTargetsJSON: policy.WriteTargets, }}, nil } diff --git a/EdgeAdmin/internal/web/actions/default/servers/accesslogs/createPopup.go b/EdgeAdmin/internal/web/actions/default/servers/accesslogs/createPopup.go index 228c5ef..23d1d69 100644 --- a/EdgeAdmin/internal/web/actions/default/servers/accesslogs/createPopup.go +++ b/EdgeAdmin/internal/web/actions/default/servers/accesslogs/createPopup.go @@ -88,6 +88,10 @@ func (this *CreatePopupAction) RunPost(params struct { Require("请选择存储类型") baseType, _ := serverconfigs.ParseStorageTypeAndWriteTargets(params.Type) + _, writeTargets := serverconfigs.ParseStorageTypeAndWriteTargets(params.Type) + if writeTargets == nil { + writeTargets = &serverconfigs.AccessLogWriteTargets{File: true, MySQL: true} + } storedType := baseType if serverconfigs.IsFileBasedStorageType(params.Type) { storedType = params.Type @@ -178,6 +182,12 @@ func (this *CreatePopupAction) RunPost(params struct { this.ErrorPage(err) return } + writeTargetsMap := map[string]bool{ + "file": writeTargets.File, + "mysql": writeTargets.MySQL, + "clickhouse": writeTargets.ClickHouse, + } + writeTargetsJSON, _ := json.Marshal(writeTargetsMap) createResp, err := this.RPC().HTTPAccessLogPolicyRPC().CreateHTTPAccessLogPolicy(this.AdminContext(), &pb.CreateHTTPAccessLogPolicyRequest{ Name: params.Name, Type: storedType, @@ -186,7 +196,7 @@ func (this *CreatePopupAction) RunPost(params struct { IsPublic: params.IsPublic, FirewallOnly: params.FirewallOnly, DisableDefaultDB: params.DisableDefaultDB, - WriteTargetsJSON: nil, + WriteTargetsJSON: writeTargetsJSON, }) if err != nil { this.ErrorPage(err) diff --git a/EdgeAdmin/internal/web/actions/default/servers/accesslogs/index.go b/EdgeAdmin/internal/web/actions/default/servers/accesslogs/index.go index d71a87a..4cded29 100644 --- a/EdgeAdmin/internal/web/actions/default/servers/accesslogs/index.go +++ b/EdgeAdmin/internal/web/actions/default/servers/accesslogs/index.go @@ -46,7 +46,11 @@ func (this *IndexAction) RunGet(params struct{}) { return } } - typeDisplay := policy.Type + writeTargets := serverconfigs.ParseWriteTargetsFromPolicy(policy.WriteTargetsJSON, policy.Type, policy.DisableDefaultDB) + typeDisplay := serverconfigs.ComposeStorageTypeDisplay(policy.Type, writeTargets) + if typeDisplay == "" { + typeDisplay = policy.Type + } policyMaps = append(policyMaps, maps.Map{ "id": policy.Id, "name": policy.Name, diff --git a/EdgeAdmin/internal/web/actions/default/servers/accesslogs/policyutils/utils.go b/EdgeAdmin/internal/web/actions/default/servers/accesslogs/policyutils/utils.go index 69b74ba..bf27397 100644 --- a/EdgeAdmin/internal/web/actions/default/servers/accesslogs/policyutils/utils.go +++ b/EdgeAdmin/internal/web/actions/default/servers/accesslogs/policyutils/utils.go @@ -36,7 +36,11 @@ func InitPolicy(parent *actionutils.ParentAction, policyId int64) error { } } - typeDisplay := policy.Type + writeTargets := serverconfigs.ParseWriteTargetsFromPolicy(policy.WriteTargetsJSON, policy.Type, policy.DisableDefaultDB) + typeDisplay := serverconfigs.ComposeStorageTypeDisplay(policy.Type, writeTargets) + if typeDisplay == "" { + typeDisplay = policy.Type + } parent.Data["policy"] = maps.Map{ "id": policy.Id, diff --git a/EdgeAdmin/internal/web/actions/default/servers/accesslogs/update.go b/EdgeAdmin/internal/web/actions/default/servers/accesslogs/update.go index fe6f192..d20ef06 100644 --- a/EdgeAdmin/internal/web/actions/default/servers/accesslogs/update.go +++ b/EdgeAdmin/internal/web/actions/default/servers/accesslogs/update.go @@ -107,6 +107,10 @@ func (this *UpdateAction) RunPost(params struct { Require("请选择存储类型") baseType, _ := serverconfigs.ParseStorageTypeAndWriteTargets(params.Type) + _, writeTargets := serverconfigs.ParseStorageTypeAndWriteTargets(params.Type) + if writeTargets == nil { + writeTargets = &serverconfigs.AccessLogWriteTargets{File: true, MySQL: true} + } storedType := baseType if serverconfigs.IsFileBasedStorageType(params.Type) { storedType = params.Type @@ -204,6 +208,12 @@ func (this *UpdateAction) RunPost(params struct { this.ErrorPage(err) return } + writeTargetsMap := map[string]bool{ + "file": writeTargets.File, + "mysql": writeTargets.MySQL, + "clickhouse": writeTargets.ClickHouse, + } + writeTargetsJSON, _ := json.Marshal(writeTargetsMap) _, err = this.RPC().HTTPAccessLogPolicyRPC().UpdateHTTPAccessLogPolicy(this.AdminContext(), &pb.UpdateHTTPAccessLogPolicyRequest{ HttpAccessLogPolicyId: params.PolicyId, Name: params.Name, @@ -214,7 +224,7 @@ func (this *UpdateAction) RunPost(params struct { IsPublic: params.IsPublic, FirewallOnly: params.FirewallOnly, DisableDefaultDB: params.DisableDefaultDB, - WriteTargetsJSON: nil, + WriteTargetsJSON: writeTargetsJSON, }) if err != nil { this.ErrorPage(err) diff --git a/EdgeCommon/pkg/serverconfigs/access_log_storages.go b/EdgeCommon/pkg/serverconfigs/access_log_storages.go index 275f318..595a0ee 100644 --- a/EdgeCommon/pkg/serverconfigs/access_log_storages.go +++ b/EdgeCommon/pkg/serverconfigs/access_log_storages.go @@ -52,9 +52,60 @@ func IsFileBasedStorageType(code string) bool { } } +// NormalizeAccessLogStorageType 统一存储类型编码(兼容历史别名)。 +func NormalizeAccessLogStorageType(storageType string) string { + switch storageType { + case "clickhouse": + return AccessLogStorageTypeFileClickhouse + case "mysql_clickhouse": + return AccessLogStorageTypeFileMySQLClickhouse + default: + return storageType + } +} + +// BuildWriteTargetsByStorageType 按 type(唯一真源)构建写入目标。 +// disableDefaultDB 用于控制是否保留 MySQL 写入。 +func BuildWriteTargetsByStorageType(storageType string, disableDefaultDB bool) *AccessLogWriteTargets { + storageType = NormalizeAccessLogStorageType(storageType) + + targets := &AccessLogWriteTargets{} + switch storageType { + case AccessLogStorageTypeFile: + targets.File = true + case AccessLogStorageTypeFileMySQL: + targets.File = true + targets.MySQL = true + case AccessLogStorageTypeFileClickhouse: + targets.File = true + targets.ClickHouse = true + case AccessLogStorageTypeFileMySQLClickhouse: + targets.File = true + targets.MySQL = true + targets.ClickHouse = true + case AccessLogStorageTypeES, AccessLogStorageTypeTCP, AccessLogStorageTypeSyslog, AccessLogStorageTypeCommand: + // 保持现有兼容语义:非 file 类型默认写 MySQL(除非停用默认数据库) + targets.MySQL = true + default: + // 兜底保持可用 + targets.File = true + targets.MySQL = true + } + + if disableDefaultDB { + targets.MySQL = false + } + + if !targets.File && !targets.MySQL && !targets.ClickHouse { + targets.File = true + } + return targets +} + // ParseStorageTypeAndWriteTargets 从下拉框选中的类型解析出「实际存储类型」与「写入目标」 // 用于创建/更新策略:options 按 baseType 填(如 file),writeTargets 按组合填。 func ParseStorageTypeAndWriteTargets(selectedType string) (baseType string, writeTargets *AccessLogWriteTargets) { + selectedType = NormalizeAccessLogStorageType(selectedType) writeTargets = &AccessLogWriteTargets{} switch selectedType { case AccessLogStorageTypeFile: @@ -84,35 +135,9 @@ func ParseStorageTypeAndWriteTargets(selectedType string) (baseType string, writ return baseType, writeTargets } -// ResolveWriteTargetsByType 仅根据策略类型与 disableDefaultDB 计算写入目标(不依赖 writeTargets 字段) -func ResolveWriteTargetsByType(policyType string, disableDefaultDB bool) *AccessLogWriteTargets { - t := &AccessLogWriteTargets{} - switch policyType { - case AccessLogStorageTypeFileMySQL: - t.File = true - t.MySQL = true - case AccessLogStorageTypeFileClickhouse: - t.File = true - t.ClickHouse = true - case AccessLogStorageTypeFileMySQLClickhouse: - t.File = true - t.MySQL = true - t.ClickHouse = true - case AccessLogStorageTypeFile: - t.File = true - t.MySQL = !disableDefaultDB - default: - t.MySQL = !disableDefaultDB - } - if !t.File && !t.MySQL && !t.ClickHouse { - t.File = true - t.MySQL = true - } - return t -} - // ComposeStorageTypeDisplay 根据策略的 Type + WriteTargets 得到下拉框显示用的类型 code(用于编辑页回显) func ComposeStorageTypeDisplay(policyType string, writeTargets *AccessLogWriteTargets) string { + policyType = NormalizeAccessLogStorageType(policyType) if policyType != AccessLogStorageTypeFile { return policyType } diff --git a/EdgeCommon/pkg/serverconfigs/access_log_write_targets.go b/EdgeCommon/pkg/serverconfigs/access_log_write_targets.go index be19ade..6b8357d 100644 --- a/EdgeCommon/pkg/serverconfigs/access_log_write_targets.go +++ b/EdgeCommon/pkg/serverconfigs/access_log_write_targets.go @@ -6,15 +6,16 @@ package serverconfigs type AccessLogWriteTargets struct { File bool `yaml:"file" json:"file"` // 写本地 JSON 文件(供 Fluent Bit → ClickHouse 或自用) MySQL bool `yaml:"mysql" json:"mysql"` // 写 MySQL 默认库按日分表 - ClickHouse bool `yaml:"clickhouse" json:"clickhouse"` // 需要落 ClickHouse(文件+Fluent Bit 或 API 直写) + ClickHouse bool `yaml:"clickhouse" json:"clickhouse"` // 标记需要 ClickHouse 查询链路(写入由文件+Fluent Bit 异步完成) } -// NeedReportToAPI 是否需要上报到 API(写 MySQL 或 API 直写 ClickHouse 时需要) +// NeedReportToAPI 是否需要上报到 API。 +// 与 DNS 语义一致:仅 MySQL 打开时才上报 API,ClickHouse-only 不上报。 func (t *AccessLogWriteTargets) NeedReportToAPI() bool { if t == nil { return true // 兼容:未配置时保持原行为,上报 } - return t.MySQL || t.ClickHouse + return t.MySQL } // NeedWriteFile 节点是否需要写本地文件 @@ -25,8 +26,9 @@ func (t *AccessLogWriteTargets) NeedWriteFile() bool { return t.File } -// ParseWriteTargetsFromPolicy 兼容入口:当前统一按 type 推导写入目标,不再依赖 writeTargets 字段 +// ParseWriteTargetsFromPolicy 从策略字段解析写入目标。 +// 当前以 type 为唯一真源,writeTargetsJSON 参数仅保留函数签名兼容。 func ParseWriteTargetsFromPolicy(writeTargetsJSON []byte, policyType string, disableDefaultDB bool) *AccessLogWriteTargets { _ = writeTargetsJSON - return ResolveWriteTargetsByType(policyType, disableDefaultDB) + return BuildWriteTargetsByStorageType(policyType, disableDefaultDB) } diff --git a/EdgeNode/internal/nodes/http_access_log_queue.go b/EdgeNode/internal/nodes/http_access_log_queue.go index 6cfa84d..91b383f 100644 --- a/EdgeNode/internal/nodes/http_access_log_queue.go +++ b/EdgeNode/internal/nodes/http_access_log_queue.go @@ -117,7 +117,7 @@ Loop: } } - // 发送到 API(写 MySQL 或 API 直写 ClickHouse 时需要) + // 发送到 API(仅写 MySQL 时需要) if !needReportAPI { return nil } diff --git a/日志策略逻辑梳理与问题清单.md b/日志策略逻辑梳理与问题清单.md new file mode 100644 index 0000000..3f8c56e --- /dev/null +++ b/日志策略逻辑梳理与问题清单.md @@ -0,0 +1,148 @@ +# 日志策略逻辑梳理与问题清单(当前基线:`E:\AI_PRODUCT\waf-platform`) + +## 1. 结论摘要 + +- 当前链路是 **`type` + `writeTargets` 双字段共同决定行为**。 +- 运行时真正用于读写判断的是 `writeTargets`(`ParseWriteTargetsFromPolicy` 解析结果)。 +- HTTP 与 DNS 都已接入“公用策略”下发,DNS 也已支持 ClickHouse 读取。 +- 目前存在多处逻辑不一致,核心风险是:**页面显示、数据库值、实际读写行为可能不同步**。 + +## 2. 关键入口文件 + +- 类型与组合映射:`EdgeCommon/pkg/serverconfigs/access_log_storages.go` +- 写入目标定义/解析:`EdgeCommon/pkg/serverconfigs/access_log_write_targets.go` +- 策略创建/更新(Admin):`EdgeAdmin/internal/web/actions/default/servers/accesslogs/createPopup.go`、`EdgeAdmin/internal/web/actions/default/servers/accesslogs/update.go` +- 策略保存(API):`EdgeAPI/internal/rpc/services/service_http_access_log_policy_plus.go` +- 策略落库(DAO):`EdgeAPI/internal/db/models/http_access_log_policy_dao.go` +- 公用策略运行时缓存:`EdgeAPI/internal/accesslogs/storage_manager.go` +- HTTP 节点队列:`EdgeNode/internal/nodes/http_access_log_queue.go` +- DNS 节点队列:`EdgeDNS/internal/nodes/ns_access_log_queue.go` +- 节点配置下发:`EdgeAPI/internal/db/models/node_dao.go`、`EdgeAPI/internal/db/models/ns_node_dao_plus.go` +- HTTP 查询服务:`EdgeAPI/internal/rpc/services/service_http_access_log.go` +- DNS 查询服务:`EdgeAPI/internal/rpc/services/nameservers/service_ns_access_log.go` +- CH 查询实现:`EdgeAPI/internal/clickhouse/logs_ingest_store.go`、`EdgeAPI/internal/clickhouse/ns_logs_ingest_store.go` + +## 3. 数据模型与语义 + +`edgeHTTPAccessLogPolicies` 关键字段: + +- `type`:`file` / `file_mysql` / `file_clickhouse` / `file_mysql_clickhouse` / `es` / `tcp` / `syslog` / `command` +- `writeTargets`:JSON(`file/mysql/clickhouse` 三个布尔值) +- `disableDefaultDB`:停用默认数据库存储(兼容旧语义) + +当前实际规则: + +1. Admin 侧根据下拉 `type` 生成 `writeTargetsJSON`。 +2. API 原样落库(仅做少量历史 type 别名兼容)。 +3. 运行时使用 `ParseWriteTargetsFromPolicy(writeTargets, type, disableDefaultDB)` 得到最终写入目标。 + +## 4. 端到端链路(当前行为) + +### 4.1 策略创建/更新 + +- 创建与更新都会调用 `ParseStorageTypeAndWriteTargets`,并同时提交 `type` 与 `writeTargetsJSON`。 +- `file_clickhouse` / `file_mysql_clickhouse` 在 UI 上隐藏了手填路径输入,依赖旧值或默认目录回退。 +- DAO 更新时,只有 `writeTargetsJSON` 非空才会覆盖 `writeTargets` 字段。 + +### 4.2 HTTP 写入链路 + +- Node 侧: + - `needWriteFile = writeTargets == nil || writeTargets.NeedWriteFile()` + - `needReportAPI = writeTargets == nil || writeTargets.NeedReportToAPI()` +- API 侧: + - `CreateHTTPAccessLogs` 里是否写 MySQL 由 `canWriteAccessLogsToDB() -> WriteMySQL()` 决定。 + - 同时调用 `writeAccessLogsToPolicy()`,把日志再交给公用策略存储引擎处理(如 file/es/tcp/syslog/command)。 +- 查询侧: + - `shouldReadAccessLogsFromClickHouse()` 为真且 CH 配置可用时优先读 CH。 + - CH 失败后,按 `shouldReadAccessLogsFromMySQL()` 回退 MySQL。 + +### 4.3 DNS 写入链路 + +- DNS 节点: + - `needWriteFile = targets == nil || targets.File || targets.ClickHouse` + - `needReportAPI = targets == nil || targets.MySQL` + - 即 CH-only 下 DNS 只写本地文件,不上报 API。 +- DNS API 查询: + - 与 HTTP 一样优先 CH,再按策略回退 MySQL。 + +### 4.4 节点路径更新机制 + +- API 下发公用策略的 `AccessLogFilePath` 与 `AccessLogWriteTargets` 到 HTTP/DNS 节点配置。 +- Node/DNS 收到新配置后会 `SetDirByPolicyPath(...)` 并 `EnsureInit/Reopen/Close`,可自动切换目录。 +- 空路径时会回退到: + - HTTP:`EDGE_LOG_DIR` 或默认 `/var/log/edge/edge-node` + - DNS:`EDGE_DNS_LOG_DIR` 或默认 `/var/log/edge/edge-dns` + +## 5. 行为矩阵(按当前代码) + +- `file` + - 写文件:是 + - 写 MySQL:否(仅当 `writeTargets.mysql=true` 才会写) + - 读:优先 CH(若开启),否则按 MySQL 开关 +- `file_mysql` + - 写文件:是 + - 写 MySQL:是 + - 读:MySQL 可读;若 CH 也开则优先 CH +- `file_clickhouse` + - 写文件:是 + - 写 MySQL:否(理论上) + - 读:优先 CH;若 CH 不可用且 mysql=false,则返回空 +- `file_mysql_clickhouse` + - 写文件:是 + - 写 MySQL:是 + - 读:优先 CH,失败回退 MySQL +- `es/tcp/syslog/command` + - 仍会由 `writeTargets` 决定是否 MySQL(当前解析默认给 MySQL=true) + - 另外会通过策略引擎输出到对应目标 + +## 6. 逻辑问题清单(按优先级) + +### P0:`type` 与 `writeTargets` 双真源,容易漂移 + +- 页面展示与回显会参考 `type`,实际写读判断优先看 `writeTargets`。 +- 一旦两者不一致,会出现“UI 看起来是 ClickHouse,实际还在写/读 MySQL”。 + +### P0:`disableDefaultDB` 在新链路中容易失效 + +- `WriteMySQL()` 优先看 `writeTargets.MySQL`,只有 `writeTargets` 为空才回退 `disableDefaultDB`。 +- 由于 Admin 基本总会提交 `writeTargetsJSON`,`disableDefaultDB` 常常不会真正生效。 + +### P1:HTTP 与 DNS 在 CH-only 场景上报 API 语义不一致 + +- HTTP:`NeedReportToAPI()` = `MySQL || ClickHouse`,CH-only 仍会上报 API。 +- DNS:CH-only 不上报 API,仅写文件给 Fluent Bit。 +- 高并发下会带来不必要的 API 压力与行为差异。 + +### P1:`file_clickhouse` 可能出现空路径,策略引擎会启动失败 + +- `FileStorage.Start()` 要求 `path` 非空。 +- 但 UI 在 clickhouse 组合类型隐藏路径输入,若 `options.path` 为空,策略引擎会报错(虽然节点本地写文件仍可回退目录工作)。 + +### P1:HTTP 可能出现“节点写文件 + API 再写文件”的重复路径 + +- `CreateHTTPAccessLogs` 无论是否写 MySQL,都会 `writeAccessLogsToPolicy()`。 +- 公用策略若为 file*,API 侧 `StorageManager.createStorage()` 会创建 `FileStorage` 并再次落文件。 +- 若目标是“仅节点写文件供 Fluent Bit 采集”,这会引入额外重复写入。 + +### P2:DNS `requestId` 生成算法有重复风险 + +- `ns_access_log_queue.go` 里 `timestamp/requestId` 为 `loop()` 局部变量,每轮 tick 重置。 +- 同秒跨批次可能冲突,影响游标分页与去重。 + +### P2:UI 文案分支存在不可达条件 + +- `createPopup.html` / `update.html` 在 `file|file_mysql` 区块内嵌了 clickhouse 条件文案分支,实际不会触发。 +- 不影响功能,但会增加理解成本。 + +## 7. 建议修复顺序 + +1. 先统一单一真源(建议 API 层统一按 `type` 规范化并覆盖 `writeTargets`)。 +2. 明确 `disableDefaultDB` 与 `writeTargets` 的优先级,避免“配置项在 UI 可选但不生效”。 +3. 统一 HTTP/DNS 在 CH-only 的上报语义(建议都走“节点文件 + Fluent Bit”,API 不再接收该流量)。 +4. 修复 file_clickhouse 空路径策略启动失败(要求路径 or 统一默认路径回填到 options)。 +5. 修复 DNS requestId 生成(全局原子递增或更高精度时间戳方案)。 + +## 8. 当前可用性判断 + +- 系统“可运行”,但配置行为存在歧义,且在高并发下会放大成本和排障难度。 +- 若目标是稳定的高吞吐日志链路,建议优先处理 P0/P1 问题后再继续线上放量。