// Package clickhouse 提供 logs_ingest 表的只读查询(列表分页),用于访问日志列表优先走 ClickHouse。 package clickhouse import ( "context" "encoding/json" "fmt" "strconv" "strings" "time" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" ) // LogsIngestRow 对应 ClickHouse logs_ingest 表的一行(用于 List 结果与 RowToPB) type LogsIngestRow struct { Timestamp time.Time NodeId uint64 ClusterId uint64 ServerId uint64 Host string IP string Method string Path string Status uint16 BytesIn uint64 BytesOut uint64 CostMs uint32 UA string Referer string LogType string TraceId string FirewallPolicyId uint64 FirewallRuleGroupId uint64 FirewallRuleSetId uint64 FirewallRuleId uint64 RequestHeaders string RequestBody string ResponseHeaders string ResponseBody string } // ListFilter 列表查询条件(与 ListHTTPAccessLogsRequest 对齐) type ListFilter struct { Day string HourFrom string HourTo string Size int64 Reverse bool HasError bool HasFirewallPolicy bool FirewallPolicyId int64 NodeId int64 ClusterId int64 LastRequestId string ServerIds []int64 NodeIds []int64 // 搜索条件 Keyword string Ip string Domain string } // LogsIngestStore 封装对 logs_ingest 的只读列表查询 type LogsIngestStore struct { client *Client } // NewLogsIngestStore 创建 store,内部使用共享 Client func NewLogsIngestStore() *LogsIngestStore { return &LogsIngestStore{client: NewClient()} } // Client 返回底层 Client,供调用方判断 IsConfigured() func (s *LogsIngestStore) Client() *Client { return s.client } // List 按条件分页查询 logs_ingest,返回行、下一页游标(trace_id)与错误 func (s *LogsIngestStore) List(ctx context.Context, f ListFilter) (rows []*LogsIngestRow, nextCursor string, err error) { if !s.client.IsConfigured() { return nil, "", fmt.Errorf("clickhouse: not configured") } if f.Day == "" { return nil, "", fmt.Errorf("clickhouse: day required") } dayNumber, err := normalizeDayNumber(f.Day) if err != nil { return nil, "", err } table := "logs_ingest" if s.client.cfg.Database != "" && s.client.cfg.Database != "default" { table = quoteIdent(s.client.cfg.Database) + "." + quoteIdent("logs_ingest") } else { table = quoteIdent(table) } conditions := []string{"toYYYYMMDD(timestamp) = " + strconv.Itoa(dayNumber)} if f.HourFrom != "" { if _, err := strconv.Atoi(f.HourFrom); err == nil { conditions = append(conditions, "toHour(timestamp) >= "+f.HourFrom) } } if f.HourTo != "" { if _, err := strconv.Atoi(f.HourTo); err == nil { conditions = append(conditions, "toHour(timestamp) <= "+f.HourTo) } } if len(f.ServerIds) > 0 { parts := make([]string, 0, len(f.ServerIds)) for _, id := range f.ServerIds { parts = append(parts, strconv.FormatInt(id, 10)) } conditions = append(conditions, "server_id IN ("+strings.Join(parts, ",")+")") } if len(f.NodeIds) > 0 { parts := make([]string, 0, len(f.NodeIds)) for _, id := range f.NodeIds { parts = append(parts, strconv.FormatInt(id, 10)) } conditions = append(conditions, "node_id IN ("+strings.Join(parts, ",")+")") } if f.NodeId > 0 { conditions = append(conditions, "node_id = "+strconv.FormatInt(f.NodeId, 10)) } if f.ClusterId > 0 { conditions = append(conditions, "cluster_id = "+strconv.FormatInt(f.ClusterId, 10)) } if f.HasFirewallPolicy { conditions = append(conditions, "firewall_policy_id > 0") } if f.HasError { conditions = append(conditions, "status >= 400") } if f.FirewallPolicyId > 0 { conditions = append(conditions, "firewall_policy_id = "+strconv.FormatInt(f.FirewallPolicyId, 10)) } // 搜索条件 if f.Keyword != "" { keyword := escapeString(f.Keyword) // 在 host, path, ip, ua 中模糊搜索 conditions = append(conditions, fmt.Sprintf("(host LIKE '%%%s%%' OR path LIKE '%%%s%%' OR ip LIKE '%%%s%%' OR ua LIKE '%%%s%%')", keyword, keyword, keyword, keyword)) } if f.Ip != "" { conditions = append(conditions, "ip = '"+escapeString(f.Ip)+"'") } if f.Domain != "" { conditions = append(conditions, "host LIKE '%"+escapeString(f.Domain)+"%'") } // 游标分页:使用 trace_id 作为游标 // Reverse=false:历史向后翻页,查询更早的数据 // Reverse=true:实时增量拉新,查询更新的数据 if f.LastRequestId != "" { if f.Reverse { conditions = append(conditions, "trace_id > '"+escapeString(f.LastRequestId)+"'") } else { conditions = append(conditions, "trace_id < '"+escapeString(f.LastRequestId)+"'") } } where := strings.Join(conditions, " AND ") // 默认按时间倒序(最新的在前面),与前端默认行为一致 orderDir := "DESC" if f.Reverse { orderDir = "ASC" } limit := f.Size if limit <= 0 { limit = 20 } if limit > 1000 { limit = 1000 } orderBy := fmt.Sprintf("timestamp %s, trace_id %s", orderDir, orderDir) query := fmt.Sprintf("SELECT timestamp, node_id, cluster_id, server_id, host, ip, method, path, status, bytes_in, bytes_out, cost_ms, ua, referer, log_type, trace_id, firewall_policy_id, firewall_rule_group_id, firewall_rule_set_id, firewall_rule_id, request_headers, request_body, response_headers, response_body FROM %s WHERE %s ORDER BY %s LIMIT %d", table, where, orderBy, limit+1) var rawRows []map[string]interface{} if err = s.client.Query(ctx, query, &rawRows); err != nil { return nil, "", err } rows = make([]*LogsIngestRow, 0, len(rawRows)) for _, m := range rawRows { r := mapToLogsIngestRow(m) if r != nil { rows = append(rows, r) } } if !f.Reverse { if len(rows) > int(limit) { nextCursor = rows[limit].TraceId rows = rows[:limit] } return rows, nextCursor, nil } if len(rows) > int(limit) { rows = rows[:limit] } if len(rows) > 0 { nextCursor = rows[len(rows)-1].TraceId } // 实时模式统一返回为“最新在前”,与前端显示和 MySQL 语义一致。 for left, right := 0, len(rows)-1; left < right; left, right = left+1, right-1 { rows[left], rows[right] = rows[right], rows[left] } return rows, nextCursor, nil } // FindByTraceId 按 trace_id 查询单条日志详情 func (s *LogsIngestStore) FindByTraceId(ctx context.Context, traceId string) (*LogsIngestRow, error) { if !s.client.IsConfigured() { return nil, fmt.Errorf("clickhouse: not configured") } if traceId == "" { return nil, nil } table := quoteIdent("logs_ingest") query := fmt.Sprintf("SELECT timestamp, node_id, cluster_id, server_id, host, ip, method, path, status, bytes_in, bytes_out, cost_ms, ua, referer, log_type, trace_id, firewall_policy_id, firewall_rule_group_id, firewall_rule_set_id, firewall_rule_id, request_headers, request_body, response_headers, response_body FROM %s WHERE trace_id = '%s' LIMIT 1", table, escapeString(traceId)) var rawRows []map[string]interface{} if err := s.client.Query(ctx, query, &rawRows); err != nil { return nil, err } if len(rawRows) == 0 { return nil, nil } return mapToLogsIngestRow(rawRows[0]), nil } func quoteIdent(name string) string { return "`" + strings.ReplaceAll(name, "`", "``") + "`" } func escapeString(s string) string { return strings.ReplaceAll(s, "'", "''") } func normalizeDayNumber(day string) (int, error) { normalized := strings.TrimSpace(day) if normalized == "" { return 0, fmt.Errorf("clickhouse: day required") } normalized = strings.ReplaceAll(normalized, "-", "") if len(normalized) != 8 { return 0, fmt.Errorf("clickhouse: invalid day '%s'", day) } dayNumber, err := strconv.Atoi(normalized) if err != nil { return 0, fmt.Errorf("clickhouse: invalid day '%s'", day) } return dayNumber, nil } func mapToLogsIngestRow(m map[string]interface{}) *LogsIngestRow { r := &LogsIngestRow{} u64 := func(key string) uint64 { v, ok := m[key] if !ok || v == nil { return 0 } switch x := v.(type) { case float64: return uint64(x) case string: n, _ := strconv.ParseUint(x, 10, 64) return n case json.Number: n, _ := x.Int64() return uint64(n) } return 0 } u32 := func(key string) uint32 { return uint32(u64(key)) } str := func(key string) string { if v, ok := m[key]; ok && v != nil { if s, ok := v.(string); ok { return s } } return "" } ts := func(key string) time.Time { v, ok := m[key] if ok && v != nil { switch x := v.(type) { case string: t, _ := time.Parse("2006-01-02 15:04:05", x) return t case float64: return time.Unix(int64(x), 0) case json.Number: n, _ := x.Int64() return time.Unix(n, 0) } } return time.Time{} } r.Timestamp = ts("timestamp") r.NodeId = u64("node_id") r.ClusterId = u64("cluster_id") r.ServerId = u64("server_id") r.Host = str("host") r.IP = str("ip") r.Method = str("method") r.Path = str("path") r.Status = uint16(u64("status")) r.BytesIn = u64("bytes_in") r.BytesOut = u64("bytes_out") r.CostMs = u32("cost_ms") r.UA = str("ua") r.Referer = str("referer") r.LogType = str("log_type") r.TraceId = str("trace_id") r.FirewallPolicyId = u64("firewall_policy_id") r.FirewallRuleGroupId = u64("firewall_rule_group_id") r.FirewallRuleSetId = u64("firewall_rule_set_id") r.FirewallRuleId = u64("firewall_rule_id") r.RequestHeaders = str("request_headers") r.RequestBody = str("request_body") r.ResponseHeaders = str("response_headers") r.ResponseBody = str("response_body") return r } // RowToPB 将 logs_ingest 一行转为 pb.HTTPAccessLog(列表展示用+详情展示) func RowToPB(r *LogsIngestRow) *pb.HTTPAccessLog { if r == nil { return nil } a := &pb.HTTPAccessLog{ RequestId: r.TraceId, ServerId: int64(r.ServerId), NodeId: int64(r.NodeId), Timestamp: r.Timestamp.Unix(), Host: r.Host, RawRemoteAddr: r.IP, RemoteAddr: r.IP, RequestMethod: r.Method, RequestPath: r.Path, RequestURI: r.Path, // 前端使用 requestURI 显示完整路径 Scheme: "http", // 默认 http,日志中未存储实际值 Proto: "HTTP/1.1", // 默认值,日志中未存储实际值 Status: int32(r.Status), RequestLength: int64(r.BytesIn), BytesSent: int64(r.BytesOut), RequestTime: float64(r.CostMs) / 1000, UserAgent: r.UA, Referer: r.Referer, FirewallPolicyId: int64(r.FirewallPolicyId), FirewallRuleGroupId: int64(r.FirewallRuleGroupId), FirewallRuleSetId: int64(r.FirewallRuleSetId), FirewallRuleId: int64(r.FirewallRuleId), } if r.TimeISO8601() != "" { a.TimeISO8601 = r.TimeISO8601() } // TimeLocal: 用户友好的时间格式 (e.g., "2026-02-07 23:17:12") if !r.Timestamp.IsZero() { a.TimeLocal = r.Timestamp.Format("2006-01-02 15:04:05") } // 解析请求头 (JSON -> map[string]*pb.Strings) // ClickHouse 中存储的是 map[string]string 格式 if r.RequestHeaders != "" { var headers map[string]string if err := json.Unmarshal([]byte(r.RequestHeaders), &headers); err == nil { a.Header = make(map[string]*pb.Strings) for k, v := range headers { a.Header[k] = &pb.Strings{Values: []string{v}} } } } // 解析响应头 (JSON -> map[string]*pb.Strings) if r.ResponseHeaders != "" { var headers map[string]string if err := json.Unmarshal([]byte(r.ResponseHeaders), &headers); err == nil { a.SentHeader = make(map[string]*pb.Strings) for k, v := range headers { a.SentHeader[k] = &pb.Strings{Values: []string{v}} } } } // 请求体 if r.RequestBody != "" { a.RequestBody = []byte(r.RequestBody) } return a } // TimeISO8601 便于 RowToPB 使用 func (r *LogsIngestRow) TimeISO8601() string { if r.Timestamp.IsZero() { return "" } return r.Timestamp.UTC().Format("2006-01-02T15:04:05Z07:00") }