Files
waf-platform/EdgeAPI/internal/clickhouse/ns_logs_ingest_store.go
2026-02-13 22:36:17 +08:00

378 lines
9.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package clickhouse
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
// NSLogsIngestRow represents one row of the dns_logs_ingest table.
// Fields are filled by mapToNSLogsIngestRow from the snake_case column of the
// same name (e.g. RemoteAddr <- remote_addr, NSRouteCodes <- ns_route_codes).
type NSLogsIngestRow struct {
	Timestamp    time.Time // zero when the column is missing or its text form cannot be parsed
	RequestId    string    // also serves as the pagination cursor in List
	NodeId       uint64
	ClusterId    uint64
	DomainId     uint64
	RecordId     uint64
	RemoteAddr   string
	QuestionName string
	QuestionType string // the column List compares against NSListFilter.RecordType
	RecordName   string
	RecordType   string
	RecordValue  string
	Networking   string
	IsRecursive  bool
	Error        string
	NSRouteCodes []string // accepted as an array or as a JSON-encoded string array
	ContentJSON  string   // only selected by FindByRequestId; empty for rows returned by List
}
// NSListFilter holds the query conditions for NSLogsIngestStore.List.
type NSListFilter struct {
	// Day is required. It is normalized by normalizeDayNumber and matched
	// against toYYYYMMDD(timestamp); presumably "YYYYMMDD" or a similar
	// date form — confirm accepted formats in normalizeDayNumber.
	Day string
	// Size is the page size; List treats values <= 0 as 20 and caps it at 1000.
	Size int64
	// Reverse selects the paging direction: false returns rows whose
	// request_id is below LastRequestId (older), true returns rows whose
	// request_id is above it (newer).
	Reverse bool
	// LastRequestId is the cursor from a previous page. When empty, List
	// starts from the newest rows of the day (Reverse == false) or the
	// oldest ones (Reverse == true).
	LastRequestId string
	// The ID filters below are applied only when they are > 0.
	NSClusterId int64
	NSNodeId    int64
	NSDomainId  int64
	NSRecordId  int64
	// RecordType is matched exactly against the question_type column
	// (not record_type); ignored when empty.
	RecordType string
	// Keyword is a substring match (SQL LIKE '%...%') against remote_addr,
	// question_name, record_value or error; ignored when empty.
	// NOTE(review): '%' and '_' inside the keyword are presumably treated as
	// LIKE wildcards unless escapeString escapes them — confirm.
	Keyword string
}
// NSLogsIngestStore wraps a ClickHouse *Client and runs the DNS access-log
// queries against the dns_logs_ingest table.
type NSLogsIngestStore struct {
	client *Client
}

// NewNSLogsIngestStore builds a store backed by a Client obtained from NewClient.
func NewNSLogsIngestStore() *NSLogsIngestStore {
	store := new(NSLogsIngestStore)
	store.client = NewClient()
	return store
}

// Client exposes the ClickHouse client the store issues its queries through.
func (s *NSLogsIngestStore) Client() *Client {
	underlying := s.client
	return underlying
}
// List returns one page of DNS access logs for the day in f.Day, newest first.
//
// Paging is keyed on request_id. With f.Reverse == false the page holds rows
// whose request_id is below f.LastRequestId (older rows); with
// f.Reverse == true it holds rows whose request_id is above f.LastRequestId
// (newer rows). In both directions the returned page is in descending order.
//
// nextCursor is the request_id of the returned page's boundary row in the
// paging direction: the last row when moving forward, the first row when
// moving in reverse. Passing it back as LastRequestId with the same Reverse
// value continues without skipping any row. hasMore reports whether at least
// one further row exists beyond this page in that direction.
//
// content_json is not selected here to keep pages small; FindByRequestId
// returns the full record. An error is returned when the client is not
// configured, f.Day is empty or cannot be normalized, or the query fails.
func (s *NSLogsIngestStore) List(ctx context.Context, f NSListFilter) (rows []*NSLogsIngestRow, nextCursor string, hasMore bool, err error) {
	if !s.client.IsConfigured() {
		return nil, "", false, fmt.Errorf("clickhouse: not configured")
	}
	if f.Day == "" {
		return nil, "", false, fmt.Errorf("clickhouse: day required")
	}
	dayNumber, err := normalizeDayNumber(f.Day)
	if err != nil {
		return nil, "", false, err
	}

	table := quoteIdent("dns_logs_ingest")
	if s.client.cfg.Database != "" && s.client.cfg.Database != "default" {
		table = quoteIdent(s.client.cfg.Database) + "." + quoteIdent("dns_logs_ingest")
	}

	conditions := []string{"toYYYYMMDD(timestamp) = " + strconv.Itoa(dayNumber)}
	if f.NSClusterId > 0 {
		conditions = append(conditions, "cluster_id = "+strconv.FormatInt(f.NSClusterId, 10))
	}
	if f.NSNodeId > 0 {
		conditions = append(conditions, "node_id = "+strconv.FormatInt(f.NSNodeId, 10))
	}
	if f.NSDomainId > 0 {
		conditions = append(conditions, "domain_id = "+strconv.FormatInt(f.NSDomainId, 10))
	}
	if f.NSRecordId > 0 {
		conditions = append(conditions, "record_id = "+strconv.FormatInt(f.NSRecordId, 10))
	}
	if f.RecordType != "" {
		conditions = append(conditions, "question_type = '"+escapeString(f.RecordType)+"'")
	}
	if f.Keyword != "" {
		keyword := escapeString(f.Keyword)
		conditions = append(conditions, fmt.Sprintf("(remote_addr LIKE '%%%s%%' OR question_name LIKE '%%%s%%' OR record_value LIKE '%%%s%%' OR error LIKE '%%%s%%')", keyword, keyword, keyword, keyword))
	}

	// Cursor paging: forward (Reverse == false) walks towards older rows,
	// reverse walks towards newer rows.
	// NOTE(review): the cursor predicate compares only request_id while the
	// ORDER BY is (timestamp, request_id). Paging is gap-free only if
	// request_id sorts in the same order as timestamp — confirm against the
	// request ID generator.
	if f.LastRequestId != "" {
		if f.Reverse {
			conditions = append(conditions, "request_id > '"+escapeString(f.LastRequestId)+"'")
		} else {
			conditions = append(conditions, "request_id < '"+escapeString(f.LastRequestId)+"'")
		}
	}

	orderDir := "DESC"
	if f.Reverse {
		orderDir = "ASC"
	}

	limit := f.Size
	if limit <= 0 {
		limit = 20
	}
	if limit > 1000 {
		limit = 1000
	}

	// content_json is deliberately not selected to reduce transfer size while
	// paging. One extra row (limit+1) is fetched purely to detect hasMore.
	query := fmt.Sprintf(
		"SELECT timestamp, request_id, node_id, cluster_id, domain_id, record_id, remote_addr, question_name, question_type, record_name, record_type, record_value, networking, is_recursive, error, ns_route_codes FROM %s WHERE %s ORDER BY timestamp %s, request_id %s LIMIT %d",
		table,
		strings.Join(conditions, " AND "),
		orderDir,
		orderDir,
		limit+1,
	)

	var rawRows []map[string]interface{}
	if err = s.client.Query(ctx, query, &rawRows); err != nil {
		return nil, "", false, err
	}

	rows = make([]*NSLogsIngestRow, 0, len(rawRows))
	for _, rawRow := range rawRows {
		if row := mapToNSLogsIngestRow(rawRow); row != nil {
			rows = append(rows, row)
		}
	}

	hasMore = len(rows) > int(limit)
	if hasMore {
		// Drop the look-ahead row: it belongs to the next page. It must not be
		// used as the cursor, otherwise the next page's strict comparison
		// would skip it.
		rows = rows[:limit]
	}

	if f.Reverse {
		// Rows were fetched ascending; flip them so both directions return
		// the page newest first.
		for left, right := 0, len(rows)-1; left < right; left, right = left+1, right-1 {
			rows[left], rows[right] = rows[right], rows[left]
		}
	}

	// The cursor is the returned row closest to the next page.
	if len(rows) > 0 {
		if f.Reverse {
			nextCursor = rows[0].RequestId
		} else {
			nextCursor = rows[len(rows)-1].RequestId
		}
	}
	return rows, nextCursor, hasMore, nil
}
// FindByRequestId loads a single DNS access log, including content_json, by
// its request_id. It returns (nil, nil) when requestId is empty or when no
// row matches, and an error when the client is not configured or the query
// fails.
func (s *NSLogsIngestStore) FindByRequestId(ctx context.Context, requestId string) (*NSLogsIngestRow, error) {
	if !s.client.IsConfigured() {
		return nil, fmt.Errorf("clickhouse: not configured")
	}
	if len(requestId) == 0 {
		return nil, nil
	}

	// Qualify the table with the configured database unless it is the default one.
	var table string
	if db := s.client.cfg.Database; db == "" || db == "default" {
		table = quoteIdent("dns_logs_ingest")
	} else {
		table = quoteIdent(db) + "." + quoteIdent("dns_logs_ingest")
	}

	query := fmt.Sprintf(
		"SELECT timestamp, request_id, node_id, cluster_id, domain_id, record_id, remote_addr, question_name, question_type, record_name, record_type, record_value, networking, is_recursive, error, ns_route_codes, content_json FROM %s WHERE request_id = '%s' LIMIT 1",
		table,
		escapeString(requestId),
	)

	var results []map[string]interface{}
	queryErr := s.client.Query(ctx, query, &results)
	switch {
	case queryErr != nil:
		return nil, queryErr
	case len(results) == 0:
		return nil, nil
	default:
		return mapToNSLogsIngestRow(results[0]), nil
	}
}
// mapToNSLogsIngestRow converts one decoded ClickHouse JSON row into an
// NSLogsIngestRow. It never returns nil. Missing or null columns, and values
// of unexpected types, leave the corresponding field at its zero value.
//
// Numeric columns are accepted as float64, int, int64, uint64, json.Number
// (when the decoder uses UseNumber) or decimal strings (e.g. quoted 64-bit
// integers).
func mapToNSLogsIngestRow(row map[string]interface{}) *NSLogsIngestRow {
	// A type switch on a missing key sees a nil interface and matches no typed
	// case, so each helper below falls through to its zero value.
	u64 := func(key string) uint64 {
		switch typed := row[key].(type) {
		case uint64:
			return typed
		case int64:
			return uint64(typed)
		case int:
			return uint64(typed)
		case float64:
			return uint64(typed)
		case json.Number:
			// Parse the literal text first so values above math.MaxInt64 survive.
			if number, err := strconv.ParseUint(typed.String(), 10, 64); err == nil {
				return number
			}
			number, _ := typed.Int64()
			return uint64(number)
		case string:
			number, _ := strconv.ParseUint(typed, 10, 64)
			return number
		}
		return 0
	}

	str := func(key string) string {
		switch typed := row[key].(type) {
		case nil:
			return ""
		case string:
			return typed
		case json.Number:
			return typed.String()
		default:
			return fmt.Sprintf("%v", typed)
		}
	}

	// NOTE(review): time.Parse interprets zone-less layouts as UTC. Confirm that
	// ClickHouse emits these timestamps in UTC; otherwise time.ParseInLocation
	// with the server's zone is needed.
	timeLayouts := []string{
		"2006-01-02 15:04:05",
		time.RFC3339,
		"2006-01-02T15:04:05",
	}
	ts := func(key string) time.Time {
		switch typed := row[key].(type) {
		case string:
			for _, layout := range timeLayouts {
				if parsed, err := time.Parse(layout, typed); err == nil {
					return parsed
				}
			}
		case float64:
			return time.Unix(int64(typed), 0)
		case int64:
			return time.Unix(typed, 0)
		case json.Number:
			if seconds, err := typed.Int64(); err == nil {
				return time.Unix(seconds, 0)
			}
			if seconds, err := typed.Float64(); err == nil {
				return time.Unix(int64(seconds), 0)
			}
			// Unparseable: fall through to the zero time (not the Unix epoch),
			// so callers can detect it via IsZero.
		}
		return time.Time{}
	}

	boolValue := func(key string) bool {
		switch typed := row[key].(type) {
		case bool:
			return typed
		case float64:
			return typed > 0
		case int:
			return typed > 0
		case int64:
			return typed > 0
		case uint64:
			return typed > 0
		case json.Number:
			// Handle UseNumber decoding like u64/str/ts do; without this case,
			// UInt8 flags such as is_recursive would always read false.
			number, err := typed.Float64()
			return err == nil && number > 0
		case string:
			switch strings.ToLower(strings.TrimSpace(typed)) {
			case "1", "true", "yes":
				return true
			}
		}
		return false
	}

	stringArray := func(key string) []string {
		switch typed := row[key].(type) {
		case []string:
			return typed
		case []interface{}:
			values := make([]string, 0, len(typed))
			for _, item := range typed {
				if item != nil {
					values = append(values, fmt.Sprintf("%v", item))
				}
			}
			return values
		case string:
			if typed == "" {
				return nil
			}
			// Arrays may arrive JSON-encoded inside a string; a non-JSON string
			// is kept as a single-element array.
			var values []string
			if json.Unmarshal([]byte(typed), &values) == nil {
				return values
			}
			return []string{typed}
		}
		return nil
	}

	return &NSLogsIngestRow{
		Timestamp:    ts("timestamp"),
		RequestId:    str("request_id"),
		NodeId:       u64("node_id"),
		ClusterId:    u64("cluster_id"),
		DomainId:     u64("domain_id"),
		RecordId:     u64("record_id"),
		RemoteAddr:   str("remote_addr"),
		QuestionName: str("question_name"),
		QuestionType: str("question_type"),
		RecordName:   str("record_name"),
		RecordType:   str("record_type"),
		RecordValue:  str("record_value"),
		Networking:   str("networking"),
		IsRecursive:  boolValue("is_recursive"),
		Error:        str("error"),
		NSRouteCodes: stringArray("ns_route_codes"),
		ContentJSON:  str("content_json"),
	}
}
// NSRowToPB converts a ClickHouse row into a pb.NSAccessLog. A nil row yields nil.
//
// When row.ContentJSON is non-empty and decodes into a pb.NSAccessLog, the
// decoded message is returned with its RequestId overwritten by the row's
// request_id. Otherwise the message is assembled from the individual columns.
// In that case Timestamp and TimeLocal are set only when row.Timestamp is
// known (non-zero), so an unparsed timestamp yields 0 rather than the Unix
// value of Go's zero time (-62135596800).
func NSRowToPB(row *NSLogsIngestRow) *pb.NSAccessLog {
	if row == nil {
		return nil
	}

	if row.ContentJSON != "" {
		contentLog := &pb.NSAccessLog{}
		if json.Unmarshal([]byte(row.ContentJSON), contentLog) == nil {
			contentLog.RequestId = row.RequestId
			return contentLog
		}
		// Undecodable content_json: fall back to the column values below.
	}

	accessLog := &pb.NSAccessLog{
		NsNodeId:     int64(row.NodeId),
		NsDomainId:   int64(row.DomainId),
		NsRecordId:   int64(row.RecordId),
		NsRouteCodes: row.NSRouteCodes,
		RemoteAddr:   row.RemoteAddr,
		QuestionName: row.QuestionName,
		QuestionType: row.QuestionType,
		RecordName:   row.RecordName,
		RecordType:   row.RecordType,
		RecordValue:  row.RecordValue,
		Networking:   row.Networking,
		RequestId:    row.RequestId,
		Error:        row.Error,
		IsRecursive:  row.IsRecursive,
	}
	if !row.Timestamp.IsZero() {
		accessLog.Timestamp = row.Timestamp.Unix()
		accessLog.TimeLocal = row.Timestamp.Format("2/Jan/2006:15:04:05 -0700")
	}
	return accessLog
}