// Package clickhouse provides read-only, paginated list queries against the
// logs_ingest table so that access-log listings can be served from ClickHouse first.
package clickhouse
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||
)
|
||
|
||
// LogsIngestRow is one row of the ClickHouse logs_ingest table. It is the
// element type of List results and the input of RowToPB.
type LogsIngestRow struct {
	// Timestamp is the value of the timestamp column.
	Timestamp time.Time

	// Identifiers read from the node_id, cluster_id and server_id columns.
	NodeId, ClusterId, ServerId uint64

	// Request line and client address (host, ip, method, path columns).
	Host, IP, Method, Path string

	// Status is the status column (List's HasError filter treats >= 400 as an error).
	Status uint16

	// BytesIn and BytesOut come from the bytes_in and bytes_out columns.
	BytesIn, BytesOut uint64

	// CostMs is the cost_ms column; RowToPB divides it by 1000 to get seconds.
	CostMs uint32

	// UA, Referer, LogType and TraceId map to the ua, referer, log_type and
	// trace_id columns. TraceId doubles as the pagination cursor.
	UA, Referer, LogType, TraceId string

	// Firewall identifiers (firewall_* columns).
	FirewallPolicyId, FirewallRuleGroupId, FirewallRuleSetId, FirewallRuleId uint64

	// Raw header/body columns. RowToPB decodes the header fields as JSON objects
	// of string values.
	RequestHeaders, RequestBody, ResponseHeaders, ResponseBody string
}
|
||
|
||
// ListFilter carries the conditions of a list query (kept aligned with
// ListHTTPAccessLogsRequest).
type ListFilter struct {
	// Day is required; rows are matched with toDate(timestamp) = Day.
	Day string
	// HourFrom and HourTo are optional inclusive bounds on toHour(timestamp).
	// A bound is ignored when it does not parse as an integer.
	HourFrom, HourTo string
	// Size is the page size; List falls back to 20 when it is <= 0 and caps it at 1000.
	Size int64
	// Reverse switches List from paging back in history (false) to fetching
	// rows newer than the cursor (true).
	Reverse bool
	// HasError keeps rows with status >= 400; HasFirewallPolicy keeps rows
	// with firewall_policy_id > 0.
	HasError, HasFirewallPolicy bool
	// Exact-match filters, each applied only when its value is > 0.
	FirewallPolicyId, NodeId, ClusterId int64
	// LastRequestId is the trace_id cursor returned by the previous List call.
	LastRequestId string
	// ServerIds and NodeIds become IN (...) filters when non-empty.
	ServerIds, NodeIds []int64

	// Search conditions.

	// Keyword is matched as a substring against host, path, ip and ua.
	Keyword string
	// Ip is matched exactly against the ip column.
	Ip string
	// Domain is matched as a substring against the host column.
	Domain string
}
|
||
|
||
// LogsIngestStore 封装对 logs_ingest 的只读列表查询
|
||
type LogsIngestStore struct {
|
||
client *Client
|
||
}
|
||
|
||
// NewLogsIngestStore 创建 store,内部使用共享 Client
|
||
func NewLogsIngestStore() *LogsIngestStore {
|
||
return &LogsIngestStore{client: NewClient()}
|
||
}
|
||
|
||
// Client 返回底层 Client,供调用方判断 IsConfigured()
|
||
func (s *LogsIngestStore) Client() *Client {
|
||
return s.client
|
||
}
|
||
|
||
// List 按条件分页查询 logs_ingest,返回行、下一页游标(trace_id)与错误
|
||
func (s *LogsIngestStore) List(ctx context.Context, f ListFilter) (rows []*LogsIngestRow, nextCursor string, err error) {
|
||
if !s.client.IsConfigured() {
|
||
return nil, "", fmt.Errorf("clickhouse: not configured")
|
||
}
|
||
if f.Day == "" {
|
||
return nil, "", fmt.Errorf("clickhouse: day required")
|
||
}
|
||
table := "logs_ingest"
|
||
if s.client.cfg.Database != "" && s.client.cfg.Database != "default" {
|
||
table = quoteIdent(s.client.cfg.Database) + "." + quoteIdent("logs_ingest")
|
||
} else {
|
||
table = quoteIdent(table)
|
||
}
|
||
|
||
conditions := []string{"toDate(timestamp) = '" + escapeString(f.Day) + "'"}
|
||
if f.HourFrom != "" {
|
||
if _, err := strconv.Atoi(f.HourFrom); err == nil {
|
||
conditions = append(conditions, "toHour(timestamp) >= "+f.HourFrom)
|
||
}
|
||
}
|
||
if f.HourTo != "" {
|
||
if _, err := strconv.Atoi(f.HourTo); err == nil {
|
||
conditions = append(conditions, "toHour(timestamp) <= "+f.HourTo)
|
||
}
|
||
}
|
||
if len(f.ServerIds) > 0 {
|
||
parts := make([]string, 0, len(f.ServerIds))
|
||
for _, id := range f.ServerIds {
|
||
parts = append(parts, strconv.FormatInt(id, 10))
|
||
}
|
||
conditions = append(conditions, "server_id IN ("+strings.Join(parts, ",")+")")
|
||
}
|
||
if len(f.NodeIds) > 0 {
|
||
parts := make([]string, 0, len(f.NodeIds))
|
||
for _, id := range f.NodeIds {
|
||
parts = append(parts, strconv.FormatInt(id, 10))
|
||
}
|
||
conditions = append(conditions, "node_id IN ("+strings.Join(parts, ",")+")")
|
||
}
|
||
if f.NodeId > 0 {
|
||
conditions = append(conditions, "node_id = "+strconv.FormatInt(f.NodeId, 10))
|
||
}
|
||
if f.ClusterId > 0 {
|
||
conditions = append(conditions, "cluster_id = "+strconv.FormatInt(f.ClusterId, 10))
|
||
}
|
||
if f.HasFirewallPolicy {
|
||
conditions = append(conditions, "firewall_policy_id > 0")
|
||
}
|
||
if f.HasError {
|
||
conditions = append(conditions, "status >= 400")
|
||
}
|
||
if f.FirewallPolicyId > 0 {
|
||
conditions = append(conditions, "firewall_policy_id = "+strconv.FormatInt(f.FirewallPolicyId, 10))
|
||
}
|
||
|
||
// 搜索条件
|
||
if f.Keyword != "" {
|
||
keyword := escapeString(f.Keyword)
|
||
// 在 host, path, ip, ua 中模糊搜索
|
||
conditions = append(conditions, fmt.Sprintf("(host LIKE '%%%s%%' OR path LIKE '%%%s%%' OR ip LIKE '%%%s%%' OR ua LIKE '%%%s%%')", keyword, keyword, keyword, keyword))
|
||
}
|
||
if f.Ip != "" {
|
||
conditions = append(conditions, "ip = '"+escapeString(f.Ip)+"'")
|
||
}
|
||
if f.Domain != "" {
|
||
conditions = append(conditions, "host LIKE '%"+escapeString(f.Domain)+"%'")
|
||
}
|
||
|
||
// 游标分页:使用 trace_id 作为游标
|
||
// Reverse=false:历史向后翻页,查询更早的数据
|
||
// Reverse=true:实时增量拉新,查询更新的数据
|
||
if f.LastRequestId != "" {
|
||
if f.Reverse {
|
||
conditions = append(conditions, "trace_id > '"+escapeString(f.LastRequestId)+"'")
|
||
} else {
|
||
conditions = append(conditions, "trace_id < '"+escapeString(f.LastRequestId)+"'")
|
||
}
|
||
}
|
||
|
||
where := strings.Join(conditions, " AND ")
|
||
// 默认按时间倒序(最新的在前面),与前端默认行为一致
|
||
orderDir := "DESC"
|
||
if f.Reverse {
|
||
orderDir = "ASC"
|
||
}
|
||
limit := f.Size
|
||
if limit <= 0 {
|
||
limit = 20
|
||
}
|
||
if limit > 1000 {
|
||
limit = 1000
|
||
}
|
||
orderBy := fmt.Sprintf("timestamp %s, trace_id %s", orderDir, orderDir)
|
||
|
||
query := fmt.Sprintf("SELECT timestamp, node_id, cluster_id, server_id, host, ip, method, path, status, bytes_in, bytes_out, cost_ms, ua, referer, log_type, trace_id, firewall_policy_id, firewall_rule_group_id, firewall_rule_set_id, firewall_rule_id, request_headers, request_body, response_headers, response_body FROM %s WHERE %s ORDER BY %s LIMIT %d",
|
||
table, where, orderBy, limit+1)
|
||
|
||
var rawRows []map[string]interface{}
|
||
if err = s.client.Query(ctx, query, &rawRows); err != nil {
|
||
return nil, "", err
|
||
}
|
||
|
||
rows = make([]*LogsIngestRow, 0, len(rawRows))
|
||
for _, m := range rawRows {
|
||
r := mapToLogsIngestRow(m)
|
||
if r != nil {
|
||
rows = append(rows, r)
|
||
}
|
||
}
|
||
if !f.Reverse {
|
||
if len(rows) > int(limit) {
|
||
nextCursor = rows[limit].TraceId
|
||
rows = rows[:limit]
|
||
}
|
||
return rows, nextCursor, nil
|
||
}
|
||
|
||
if len(rows) > int(limit) {
|
||
rows = rows[:limit]
|
||
}
|
||
if len(rows) > 0 {
|
||
nextCursor = rows[len(rows)-1].TraceId
|
||
}
|
||
|
||
// 实时模式统一返回为“最新在前”,与前端显示和 MySQL 语义一致。
|
||
for left, right := 0, len(rows)-1; left < right; left, right = left+1, right-1 {
|
||
rows[left], rows[right] = rows[right], rows[left]
|
||
}
|
||
return rows, nextCursor, nil
|
||
}
|
||
|
||
// FindByTraceId 按 trace_id 查询单条日志详情
|
||
func (s *LogsIngestStore) FindByTraceId(ctx context.Context, traceId string) (*LogsIngestRow, error) {
|
||
if !s.client.IsConfigured() {
|
||
return nil, fmt.Errorf("clickhouse: not configured")
|
||
}
|
||
if traceId == "" {
|
||
return nil, nil
|
||
}
|
||
|
||
table := quoteIdent("logs_ingest")
|
||
query := fmt.Sprintf("SELECT timestamp, node_id, cluster_id, server_id, host, ip, method, path, status, bytes_in, bytes_out, cost_ms, ua, referer, log_type, trace_id, firewall_policy_id, firewall_rule_group_id, firewall_rule_set_id, firewall_rule_id, request_headers, request_body, response_headers, response_body FROM %s WHERE trace_id = '%s' LIMIT 1",
|
||
table, escapeString(traceId))
|
||
|
||
var rawRows []map[string]interface{}
|
||
if err := s.client.Query(ctx, query, &rawRows); err != nil {
|
||
return nil, err
|
||
}
|
||
if len(rawRows) == 0 {
|
||
return nil, nil
|
||
}
|
||
return mapToLogsIngestRow(rawRows[0]), nil
|
||
}
|
||
|
||
// quoteIdent wraps name in backticks for use as a SQL identifier, doubling any
// backtick that appears inside it.
func quoteIdent(name string) string {
	const tick = "`"
	escaped := strings.ReplaceAll(name, tick, tick+tick)
	return tick + escaped + tick
}
|
||
|
||
// escapeString makes s safe to embed between single quotes in a ClickHouse
// string literal.
//
// ClickHouse treats the backslash as an escape character inside string
// literals, so doubling quotes alone is not enough: an input ending in \'
// would turn into \'' and terminate the literal early. Backslashes are
// therefore doubled first, then single quotes are doubled.
func escapeString(s string) string {
	s = strings.ReplaceAll(s, `\`, `\\`)
	return strings.ReplaceAll(s, "'", "''")
}
|
||
|
||
func mapToLogsIngestRow(m map[string]interface{}) *LogsIngestRow {
|
||
r := &LogsIngestRow{}
|
||
u64 := func(key string) uint64 {
|
||
v, ok := m[key]
|
||
if !ok || v == nil {
|
||
return 0
|
||
}
|
||
switch x := v.(type) {
|
||
case float64:
|
||
return uint64(x)
|
||
case string:
|
||
n, _ := strconv.ParseUint(x, 10, 64)
|
||
return n
|
||
case json.Number:
|
||
n, _ := x.Int64()
|
||
return uint64(n)
|
||
}
|
||
return 0
|
||
}
|
||
u32 := func(key string) uint32 {
|
||
return uint32(u64(key))
|
||
}
|
||
str := func(key string) string {
|
||
if v, ok := m[key]; ok && v != nil {
|
||
if s, ok := v.(string); ok {
|
||
return s
|
||
}
|
||
}
|
||
return ""
|
||
}
|
||
ts := func(key string) time.Time {
|
||
v, ok := m[key]
|
||
if ok && v != nil {
|
||
switch x := v.(type) {
|
||
case string:
|
||
t, _ := time.Parse("2006-01-02 15:04:05", x)
|
||
return t
|
||
case float64:
|
||
return time.Unix(int64(x), 0)
|
||
case json.Number:
|
||
n, _ := x.Int64()
|
||
return time.Unix(n, 0)
|
||
}
|
||
}
|
||
return time.Time{}
|
||
}
|
||
|
||
r.Timestamp = ts("timestamp")
|
||
r.NodeId = u64("node_id")
|
||
r.ClusterId = u64("cluster_id")
|
||
r.ServerId = u64("server_id")
|
||
r.Host = str("host")
|
||
r.IP = str("ip")
|
||
r.Method = str("method")
|
||
r.Path = str("path")
|
||
r.Status = uint16(u64("status"))
|
||
r.BytesIn = u64("bytes_in")
|
||
r.BytesOut = u64("bytes_out")
|
||
r.CostMs = u32("cost_ms")
|
||
r.UA = str("ua")
|
||
r.Referer = str("referer")
|
||
r.LogType = str("log_type")
|
||
r.TraceId = str("trace_id")
|
||
r.FirewallPolicyId = u64("firewall_policy_id")
|
||
r.FirewallRuleGroupId = u64("firewall_rule_group_id")
|
||
r.FirewallRuleSetId = u64("firewall_rule_set_id")
|
||
r.FirewallRuleId = u64("firewall_rule_id")
|
||
r.RequestHeaders = str("request_headers")
|
||
r.RequestBody = str("request_body")
|
||
r.ResponseHeaders = str("response_headers")
|
||
r.ResponseBody = str("response_body")
|
||
return r
|
||
}
|
||
|
||
// RowToPB 将 logs_ingest 一行转为 pb.HTTPAccessLog(列表展示用+详情展示)
|
||
func RowToPB(r *LogsIngestRow) *pb.HTTPAccessLog {
|
||
if r == nil {
|
||
return nil
|
||
}
|
||
a := &pb.HTTPAccessLog{
|
||
RequestId: r.TraceId,
|
||
ServerId: int64(r.ServerId),
|
||
NodeId: int64(r.NodeId),
|
||
Timestamp: r.Timestamp.Unix(),
|
||
Host: r.Host,
|
||
RawRemoteAddr: r.IP,
|
||
RemoteAddr: r.IP,
|
||
RequestMethod: r.Method,
|
||
RequestPath: r.Path,
|
||
RequestURI: r.Path, // 前端使用 requestURI 显示完整路径
|
||
Scheme: "http", // 默认 http,日志中未存储实际值
|
||
Proto: "HTTP/1.1", // 默认值,日志中未存储实际值
|
||
Status: int32(r.Status),
|
||
RequestLength: int64(r.BytesIn),
|
||
BytesSent: int64(r.BytesOut),
|
||
RequestTime: float64(r.CostMs) / 1000,
|
||
UserAgent: r.UA,
|
||
Referer: r.Referer,
|
||
FirewallPolicyId: int64(r.FirewallPolicyId),
|
||
FirewallRuleGroupId: int64(r.FirewallRuleGroupId),
|
||
FirewallRuleSetId: int64(r.FirewallRuleSetId),
|
||
FirewallRuleId: int64(r.FirewallRuleId),
|
||
}
|
||
if r.TimeISO8601() != "" {
|
||
a.TimeISO8601 = r.TimeISO8601()
|
||
}
|
||
// TimeLocal: 用户友好的时间格式 (e.g., "2026-02-07 23:17:12")
|
||
if !r.Timestamp.IsZero() {
|
||
a.TimeLocal = r.Timestamp.Format("2006-01-02 15:04:05")
|
||
}
|
||
|
||
// 解析请求头 (JSON -> map[string]*pb.Strings)
|
||
// ClickHouse 中存储的是 map[string]string 格式
|
||
if r.RequestHeaders != "" {
|
||
var headers map[string]string
|
||
if err := json.Unmarshal([]byte(r.RequestHeaders), &headers); err == nil {
|
||
a.Header = make(map[string]*pb.Strings)
|
||
for k, v := range headers {
|
||
a.Header[k] = &pb.Strings{Values: []string{v}}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 解析响应头 (JSON -> map[string]*pb.Strings)
|
||
if r.ResponseHeaders != "" {
|
||
var headers map[string]string
|
||
if err := json.Unmarshal([]byte(r.ResponseHeaders), &headers); err == nil {
|
||
a.SentHeader = make(map[string]*pb.Strings)
|
||
for k, v := range headers {
|
||
a.SentHeader[k] = &pb.Strings{Values: []string{v}}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 请求体
|
||
if r.RequestBody != "" {
|
||
a.RequestBody = []byte(r.RequestBody)
|
||
}
|
||
|
||
return a
|
||
}
|
||
|
||
// TimeISO8601 便于 RowToPB 使用
|
||
func (r *LogsIngestRow) TimeISO8601() string {
|
||
if r.Timestamp.IsZero() {
|
||
return ""
|
||
}
|
||
return r.Timestamp.UTC().Format("2006-01-02T15:04:05Z07:00")
|
||
}
|