Files
waf-platform/EdgeAPI/internal/clickhouse/logs_ingest_store.go
2026-02-07 20:30:31 +08:00

286 lines
7.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package clickhouse provides read-only, paginated list queries against the logs_ingest table, so that access-log listing can be served from ClickHouse first.
package clickhouse
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
// LogsIngestRow is one row of the ClickHouse logs_ingest table. It is the
// element type returned by List and the input of RowToPB. Each field is filled
// by mapToLogsIngestRow from the snake_case column of the same name.
type LogsIngestRow struct {
	Timestamp time.Time
	NodeId uint64
	ClusterId uint64
	ServerId uint64
	Host string
	IP string
	Method string
	Path string
	Status uint16
	BytesIn uint64
	BytesOut uint64
	// CostMs is divided by 1000 in RowToPB to produce RequestTime.
	CostMs uint32
	UA string
	Referer string
	LogType string
	// TraceId is what List returns as the paging cursor and what RowToPB
	// exposes as RequestId.
	TraceId string
	FirewallPolicyId uint64
	FirewallRuleGroupId uint64
	FirewallRuleSetId uint64
	FirewallRuleId uint64
	// The header and body fields are read from ClickHouse as plain strings;
	// RowToPB does not copy them into the protobuf message.
	RequestHeaders string
	RequestBody string
	ResponseHeaders string
	ResponseBody string
}
// ListFilter holds the conditions for a List query (aligned with
// ListHTTPAccessLogsRequest).
type ListFilter struct {
	// Day is required; List compares it with toDate(timestamp).
	Day string
	// HourFrom and HourTo are inclusive bounds on toHour(timestamp). List
	// skips a bound that is empty or does not parse as an integer.
	HourFrom string
	HourTo string
	// Size is the page size; List uses 20 when it is <= 0 and caps it at 1000.
	Size int64
	// Reverse selects descending instead of ascending order.
	Reverse bool
	// HasError is meant to restrict results to failed requests.
	// NOTE(review): confirm that List applies it.
	HasError bool
	// HasFirewallPolicy restricts results to rows with firewall_policy_id > 0.
	HasFirewallPolicy bool
	// FirewallPolicyId, NodeId and ClusterId each add an equality condition
	// when greater than zero.
	FirewallPolicyId int64
	NodeId int64
	ClusterId int64
	// LastRequestId is presumably the cursor from a previous page (List
	// returns a trace_id as its next cursor).
	// NOTE(review): confirm how List consumes it.
	LastRequestId string
	// ServerIds and NodeIds each add an IN (...) condition when non-empty.
	ServerIds []int64
	NodeIds []int64
}
// LogsIngestStore wraps read-only list queries against logs_ingest.
type LogsIngestStore struct {
	// client is the ClickHouse client that List sends its query through.
	client *Client
}
// NewLogsIngestStore builds a store whose client comes from NewClient.
// NOTE(review): the original comment describes that client as shared; NewClient
// is defined elsewhere, so confirm there whether instances are reused.
func NewLogsIngestStore() *LogsIngestStore {
	store := new(LogsIngestStore)
	store.client = NewClient()
	return store
}
// Client returns the underlying ClickHouse client so that callers can check
// IsConfigured() on it.
func (s *LogsIngestStore) Client() *Client {
	return s.client
}
// List runs one paginated, read-only query against logs_ingest and returns the
// matching rows, the cursor for the following page, and any error.
//
// Filters: f.Day is required and is compared with toDate(timestamp). HourFrom
// and HourTo are inclusive bounds on toHour(timestamp); a bound that is empty
// or not an integer is skipped rather than rejected. ServerIds and NodeIds
// become IN lists; NodeId, ClusterId and FirewallPolicyId become equality
// checks when > 0; HasFirewallPolicy requires firewall_policy_id > 0; HasError
// requires status >= 400. All conditions are AND-ed.
//
// Paging: Size defaults to 20 when <= 0 and is capped at 1000. One row more
// than the page size is requested; when it comes back, its trace_id is
// returned as nextCursor and the row itself is dropped from rows. An empty
// nextCursor means no further row was seen.
//
// NOTE(review): f.LastRequestId is not read here, so a cursor handed back by a
// previous call does not affect the next query — confirm how callers are
// expected to advance through pages.
//
// Errors: returned when the client is not configured, when Day is empty, or
// when the underlying query fails (the query error is wrapped with %w).
func (s *LogsIngestStore) List(ctx context.Context, f ListFilter) (rows []*LogsIngestRow, nextCursor string, err error) {
	if !s.client.IsConfigured() {
		return nil, "", fmt.Errorf("clickhouse: not configured")
	}
	if f.Day == "" {
		return nil, "", fmt.Errorf("clickhouse: day required")
	}

	// Qualify the table name unless the default database is in use.
	table := quoteIdent("logs_ingest")
	if db := s.client.cfg.Database; db != "" && db != "default" {
		table = quoteIdent(db) + "." + table
	}

	conditions := []string{"toDate(timestamp) = '" + escapeString(f.Day) + "'"}

	// Hour bounds are best-effort: an unparsable value is ignored. The parsed
	// integer (not the caller's raw text) is what goes into the SQL.
	if f.HourFrom != "" {
		if hour, convErr := strconv.Atoi(f.HourFrom); convErr == nil {
			conditions = append(conditions, "toHour(timestamp) >= "+strconv.Itoa(hour))
		}
	}
	if f.HourTo != "" {
		if hour, convErr := strconv.Atoi(f.HourTo); convErr == nil {
			conditions = append(conditions, "toHour(timestamp) <= "+strconv.Itoa(hour))
		}
	}

	// addIDList appends "<column> IN (id,id,...)" when ids is non-empty.
	addIDList := func(column string, ids []int64) {
		if len(ids) == 0 {
			return
		}
		parts := make([]string, len(ids))
		for i, id := range ids {
			parts[i] = strconv.FormatInt(id, 10)
		}
		conditions = append(conditions, column+" IN ("+strings.Join(parts, ",")+")")
	}
	addIDList("server_id", f.ServerIds)
	addIDList("node_id", f.NodeIds)

	if f.NodeId > 0 {
		conditions = append(conditions, "node_id = "+strconv.FormatInt(f.NodeId, 10))
	}
	if f.ClusterId > 0 {
		conditions = append(conditions, "cluster_id = "+strconv.FormatInt(f.ClusterId, 10))
	}
	if f.HasError {
		// NOTE(review): "error" is taken to mean an HTTP status of 400 or
		// above — confirm this matches the non-ClickHouse access-log listing.
		conditions = append(conditions, "status >= 400")
	}
	if f.HasFirewallPolicy {
		conditions = append(conditions, "firewall_policy_id > 0")
	}
	if f.FirewallPolicyId > 0 {
		conditions = append(conditions, "firewall_policy_id = "+strconv.FormatInt(f.FirewallPolicyId, 10))
	}
	where := strings.Join(conditions, " AND ")

	orderDir := "ASC"
	if f.Reverse {
		orderDir = "DESC"
	}

	limit := f.Size
	if limit <= 0 {
		limit = 20
	}
	if limit > 1000 {
		limit = 1000
	}

	orderBy := fmt.Sprintf("timestamp %s, node_id %s, server_id %s, trace_id %s", orderDir, orderDir, orderDir, orderDir)
	// limit+1: the extra row only signals that another page exists.
	query := fmt.Sprintf("SELECT timestamp, node_id, cluster_id, server_id, host, ip, method, path, status, bytes_in, bytes_out, cost_ms, ua, referer, log_type, trace_id, firewall_policy_id, firewall_rule_group_id, firewall_rule_set_id, firewall_rule_id, request_headers, request_body, response_headers, response_body FROM %s WHERE %s ORDER BY %s LIMIT %d",
		table, where, orderBy, limit+1)

	var rawRows []map[string]interface{}
	if err = s.client.Query(ctx, query, &rawRows); err != nil {
		return nil, "", fmt.Errorf("clickhouse: query logs_ingest: %w", err)
	}

	rows = make([]*LogsIngestRow, 0, len(rawRows))
	for _, m := range rawRows {
		if r := mapToLogsIngestRow(m); r != nil {
			rows = append(rows, r)
		}
	}
	if int64(len(rows)) > limit {
		nextCursor = rows[limit].TraceId
		rows = rows[:limit]
	}
	return rows, nextCursor, nil
}
// quoteIdent wraps name in backticks and doubles every backtick inside it.
func quoteIdent(name string) string {
	var b strings.Builder
	b.Grow(len(name) + 2)
	b.WriteByte('`')
	for i := 0; i < len(name); i++ {
		if name[i] == '`' {
			// A backtick in the identifier is written twice.
			b.WriteByte('`')
		}
		b.WriteByte(name[i])
	}
	b.WriteByte('`')
	return b.String()
}
// escapeString escapes s for use inside a single-quoted ClickHouse string
// literal. Backslashes are doubled first, because ClickHouse treats a
// backslash inside a literal as an escape character (so an unescaped `\'`
// would swallow the quote and let the literal be terminated early). Single
// quotes are then doubled.
func escapeString(s string) string {
	s = strings.ReplaceAll(s, `\`, `\\`)
	return strings.ReplaceAll(s, "'", "''")
}
// mapToLogsIngestRow converts one decoded result row (column name → value)
// into a LogsIngestRow. Conversion is best-effort: a column that is missing,
// nil, of an unexpected dynamic type, or unparsable leaves the corresponding
// field at its zero value. The returned pointer is never nil.
func mapToLogsIngestRow(m map[string]interface{}) *LogsIngestRow {
	// u64 reads an unsigned integer column that may have been decoded as a
	// float64, a string, or a json.Number.
	u64 := func(key string) uint64 {
		switch x := m[key].(type) {
		case float64:
			// Converting a negative, NaN or too-large float to uint64 is
			// implementation-dependent in Go, so clamp explicitly.
			if !(x > 0) {
				return 0
			}
			if x >= 1<<64 {
				return ^uint64(0)
			}
			return uint64(x)
		case string:
			// Error deliberately ignored: invalid text yields 0, overflow
			// yields the saturated maximum that ParseUint returns.
			n, _ := strconv.ParseUint(x, 10, 64)
			return n
		case json.Number:
			// Parsed as unsigned so that values above MaxInt64 survive and
			// negative values become 0 instead of wrapping around.
			n, _ := strconv.ParseUint(x.String(), 10, 64)
			return n
		}
		return 0
	}
	// str reads a string column; any non-string value yields "".
	str := func(key string) string {
		s, _ := m[key].(string)
		return s
	}
	// ts reads the timestamp either as "YYYY-MM-DD hh:mm:ss" text or as a
	// number of Unix seconds.
	ts := func(key string) time.Time {
		switch x := m[key].(type) {
		case string:
			// On a parse failure time.Parse returns the zero time, which is
			// what callers see for an unreadable timestamp.
			t, _ := time.Parse("2006-01-02 15:04:05", x)
			return t
		case float64:
			return time.Unix(int64(x), 0)
		case json.Number:
			n, _ := x.Int64()
			return time.Unix(n, 0)
		}
		return time.Time{}
	}

	return &LogsIngestRow{
		Timestamp:           ts("timestamp"),
		NodeId:              u64("node_id"),
		ClusterId:           u64("cluster_id"),
		ServerId:            u64("server_id"),
		Host:                str("host"),
		IP:                  str("ip"),
		Method:              str("method"),
		Path:                str("path"),
		Status:              uint16(u64("status")),
		BytesIn:             u64("bytes_in"),
		BytesOut:            u64("bytes_out"),
		CostMs:              uint32(u64("cost_ms")),
		UA:                  str("ua"),
		Referer:             str("referer"),
		LogType:             str("log_type"),
		TraceId:             str("trace_id"),
		FirewallPolicyId:    u64("firewall_policy_id"),
		FirewallRuleGroupId: u64("firewall_rule_group_id"),
		FirewallRuleSetId:   u64("firewall_rule_set_id"),
		FirewallRuleId:      u64("firewall_rule_id"),
		RequestHeaders:      str("request_headers"),
		RequestBody:         str("request_body"),
		ResponseHeaders:     str("response_headers"),
		ResponseBody:        str("response_body"),
	}
}
// RowToPB converts a logs_ingest row into a pb.HTTPAccessLog for list display.
// A nil row yields nil. Only the fields assigned below are copied; ClusterId,
// LogType and the request/response headers and bodies are left unset.
func RowToPB(r *LogsIngestRow) *pb.HTTPAccessLog {
	if r == nil {
		return nil
	}

	out := new(pb.HTTPAccessLog)

	// Identity and time.
	out.RequestId = r.TraceId
	out.ServerId = int64(r.ServerId)
	out.NodeId = int64(r.NodeId)
	out.Timestamp = r.Timestamp.Unix()

	// Request line and peer; the single IP column feeds both address fields.
	out.Host = r.Host
	out.RawRemoteAddr = r.IP
	out.RemoteAddr = r.IP
	out.RequestMethod = r.Method
	out.RequestPath = r.Path

	// Response and sizes; CostMs is scaled down by 1000 for RequestTime.
	out.Status = int32(r.Status)
	out.RequestLength = int64(r.BytesIn)
	out.BytesSent = int64(r.BytesOut)
	out.RequestTime = float64(r.CostMs) / 1000

	out.UserAgent = r.UA
	out.Referer = r.Referer

	// Firewall match information.
	out.FirewallPolicyId = int64(r.FirewallPolicyId)
	out.FirewallRuleGroupId = int64(r.FirewallRuleGroupId)
	out.FirewallRuleSetId = int64(r.FirewallRuleSetId)
	out.FirewallRuleId = int64(r.FirewallRuleId)

	// TimeISO8601 stays empty when the row has no timestamp.
	if iso := r.TimeISO8601(); iso != "" {
		out.TimeISO8601 = iso
	}
	return out
}
// TimeISO8601 returns the row's timestamp converted to UTC and formatted with
// the layout "2006-01-02T15:04:05Z07:00", or "" when the timestamp is the zero
// time. RowToPB uses it to fill the TimeISO8601 field.
func (r *LogsIngestRow) TimeISO8601() string {
	if ts := r.Timestamp; !ts.IsZero() {
		return ts.UTC().Format("2006-01-02T15:04:05Z07:00")
	}
	return ""
}