// Package accesslogs 提供边缘节点访问日志落盘(JSON Lines),供 Fluent Bit 采集写入 ClickHouse。 package accesslogs import ( "encoding/json" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" ) // LogType 与 Fluent Bit / logs_ingest 的 log_type 一致 const ( LogTypeAccess = "access" LogTypeWAF = "waf" LogTypeError = "error" ) // 请求/响应 body 落盘最大长度(字节),超出截断,避免单条过大 const maxBodyLen = 512 * 1024 // IngestLog 单行 JSON 结构与方案文档、ClickHouse logs_ingest 表字段对齐 type IngestLog struct { Timestamp int64 `json:"timestamp"` NodeId int64 `json:"node_id"` ClusterId int64 `json:"cluster_id"` ServerId int64 `json:"server_id"` Host string `json:"host"` IP string `json:"ip"` Method string `json:"method"` Path string `json:"path"` Status int32 `json:"status"` BytesIn int64 `json:"bytes_in"` BytesOut int64 `json:"bytes_out"` CostMs int64 `json:"cost_ms"` UA string `json:"ua"` Referer string `json:"referer"` LogType string `json:"log_type"` TraceId string `json:"trace_id,omitempty"` FirewallPolicyId int64 `json:"firewall_policy_id,omitempty"` FirewallRuleGroupId int64 `json:"firewall_rule_group_id,omitempty"` FirewallRuleSetId int64 `json:"firewall_rule_set_id,omitempty"` FirewallRuleId int64 `json:"firewall_rule_id,omitempty"` RequestHeaders string `json:"request_headers,omitempty"` RequestBody string `json:"request_body,omitempty"` ResponseHeaders string `json:"response_headers,omitempty"` ResponseBody string `json:"response_body,omitempty"` Attrs map[string]string `json:"attrs,omitempty"` } // stringsMapToJSON 将 map[string]*Strings 转为 JSON 字符串,便于落盘与 ClickHouse 存储 func stringsMapToJSON(m map[string]*pb.Strings) string { if len(m) == 0 { return "" } out := make(map[string]string, len(m)) for k, v := range m { if v != nil && len(v.Values) > 0 { out[k] = v.Values[0] } } if len(out) == 0 { return "" } b, _ := json.Marshal(out) return string(b) } // truncateBody 截断 body 到最大长度,避免单条过大 func truncateBody(b []byte) string { if len(b) == 0 { return "" } s := string(b) if len(s) > maxBodyLen { return s[:maxBodyLen] } return s } // buildRequestBody 将查询串与请求体合并写入 request_body 字段(不新增字段) func buildRequestBody(l *pb.HTTPAccessLog) string { q := l.GetQueryString() body := l.GetRequestBody() if len(q) == 0 && len(body) == 0 { return "" } if len(body) == 0 { return truncateBody([]byte(q)) } combined := make([]byte, 0, len(q)+1+len(body)) combined = append(combined, q...) combined = append(combined, '\n') combined = append(combined, body...) return truncateBody(combined) } // FromHTTPAccessLog 从 pb.HTTPAccessLog 转为 IngestLog,并决定 log_type func FromHTTPAccessLog(l *pb.HTTPAccessLog, clusterId int64) (ingest IngestLog, logType string) { ingest = IngestLog{ Timestamp: l.GetTimestamp(), NodeId: l.GetNodeId(), ClusterId: clusterId, ServerId: l.GetServerId(), Host: l.GetHost(), IP: l.GetRawRemoteAddr(), Method: l.GetRequestMethod(), Path: l.GetRequestURI(), // 使用 RequestURI 以包含查询参数 Status: l.GetStatus(), BytesIn: l.GetRequestLength(), BytesOut: l.GetBytesSent(), CostMs: int64(l.GetRequestTime() * 1000), UA: l.GetUserAgent(), Referer: l.GetReferer(), TraceId: l.GetRequestId(), FirewallPolicyId: l.GetFirewallPolicyId(), FirewallRuleGroupId: l.GetFirewallRuleGroupId(), FirewallRuleSetId: l.GetFirewallRuleSetId(), FirewallRuleId: l.GetFirewallRuleId(), RequestHeaders: stringsMapToJSON(l.GetHeader()), RequestBody: buildRequestBody(l), ResponseHeaders: stringsMapToJSON(l.GetSentHeader()), Attrs: l.GetAttrs(), } if ingest.IP == "" { ingest.IP = l.GetRemoteAddr() } // 响应 body 当前 pb 未提供,若后续扩展可在此赋值 // ingest.ResponseBody = ... // 与方案一致:waf > error > access;攻击日志通过 firewall_rule_id / firewall_policy_id 判断 if l.GetFirewallPolicyId() > 0 { logType = LogTypeWAF } else if len(l.GetErrors()) > 0 { logType = LogTypeError } else { logType = LogTypeAccess } ingest.LogType = logType return ingest, logType }