package clickhouse import ( "context" "encoding/json" "fmt" "strconv" "strings" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" ) const httpDNSAccessLogsTable = "httpdns_access_logs_ingest" type HTTPDNSAccessLogRow struct { RequestId string ClusterId int64 NodeId int64 AppId string AppName string Domain string QType string ClientIP string ClientRegion string Carrier string SDKVersion string OS string ResultIPs string Status string ErrorCode string CostMs int32 CreatedAt int64 Day string Summary string } type HTTPDNSAccessLogListFilter struct { Day string ClusterId int64 NodeId int64 AppId string AppIds []string Domain string Status string Keyword string Offset int64 Size int64 } type HTTPDNSAccessLogsStore struct { client *Client } func NewHTTPDNSAccessLogsStore() *HTTPDNSAccessLogsStore { return &HTTPDNSAccessLogsStore{client: NewClient()} } func (s *HTTPDNSAccessLogsStore) Client() *Client { return s.client } func (s *HTTPDNSAccessLogsStore) Insert(ctx context.Context, logs []*pb.HTTPDNSAccessLog) error { if len(logs) == 0 { return nil } if !s.client.IsConfigured() { return fmt.Errorf("clickhouse: not configured") } rows := make([]map[string]interface{}, 0, len(logs)) for _, item := range logs { if item == nil { continue } rows = append(rows, map[string]interface{}{ "request_id": item.GetRequestId(), "cluster_id": item.GetClusterId(), "node_id": item.GetNodeId(), "app_id": item.GetAppId(), "app_name": item.GetAppName(), "domain": item.GetDomain(), "qtype": item.GetQtype(), "client_ip": item.GetClientIP(), "client_region": item.GetClientRegion(), "carrier": item.GetCarrier(), "sdk_version": item.GetSdkVersion(), "os": item.GetOs(), "result_ips": item.GetResultIPs(), "status": item.GetStatus(), "error_code": item.GetErrorCode(), "cost_ms": item.GetCostMs(), "created_at": item.GetCreatedAt(), "day": item.GetDay(), "summary": item.GetSummary(), }) } query := fmt.Sprintf("INSERT INTO %s (request_id, cluster_id, node_id, app_id, app_name, domain, qtype, client_ip, client_region, carrier, sdk_version, os, result_ips, status, error_code, cost_ms, created_at, day, summary)", s.tableName()) return s.client.InsertJSONEachRow(ctx, query, rows) } func (s *HTTPDNSAccessLogsStore) Count(ctx context.Context, f HTTPDNSAccessLogListFilter) (int64, error) { if !s.client.IsConfigured() { return 0, fmt.Errorf("clickhouse: not configured") } conditions := s.buildConditions(f) query := fmt.Sprintf("SELECT count() AS count FROM %s", s.tableName()) if len(conditions) > 0 { query += " WHERE " + strings.Join(conditions, " AND ") } row := map[string]interface{}{} if err := s.client.QueryRow(ctx, query, &row); err != nil { return 0, err } return toInt64(row["count"]), nil } func (s *HTTPDNSAccessLogsStore) List(ctx context.Context, f HTTPDNSAccessLogListFilter) ([]*HTTPDNSAccessLogRow, error) { if !s.client.IsConfigured() { return nil, fmt.Errorf("clickhouse: not configured") } size := f.Size if size <= 0 { size = 20 } if size > 1000 { size = 1000 } offset := f.Offset if offset < 0 { offset = 0 } conditions := s.buildConditions(f) query := fmt.Sprintf("SELECT request_id, cluster_id, node_id, app_id, app_name, domain, qtype, client_ip, client_region, carrier, sdk_version, os, result_ips, status, error_code, cost_ms, created_at, day, summary FROM %s", s.tableName()) if len(conditions) > 0 { query += " WHERE " + strings.Join(conditions, " AND ") } query += " ORDER BY created_at DESC, request_id DESC" query += fmt.Sprintf(" LIMIT %d OFFSET %d", size, offset) rawRows := []map[string]interface{}{} if err := s.client.Query(ctx, query, &rawRows); err != nil { return nil, err } result := make([]*HTTPDNSAccessLogRow, 0, len(rawRows)) for _, row := range rawRows { result = append(result, &HTTPDNSAccessLogRow{ RequestId: toString(row["request_id"]), ClusterId: toInt64(row["cluster_id"]), NodeId: toInt64(row["node_id"]), AppId: toString(row["app_id"]), AppName: toString(row["app_name"]), Domain: toString(row["domain"]), QType: toString(row["qtype"]), ClientIP: toString(row["client_ip"]), ClientRegion: toString(row["client_region"]), Carrier: toString(row["carrier"]), SDKVersion: toString(row["sdk_version"]), OS: toString(row["os"]), ResultIPs: toString(row["result_ips"]), Status: toString(row["status"]), ErrorCode: toString(row["error_code"]), CostMs: int32(toInt64(row["cost_ms"])), CreatedAt: toInt64(row["created_at"]), Day: toString(row["day"]), Summary: toString(row["summary"]), }) } return result, nil } func HTTPDNSRowToPB(row *HTTPDNSAccessLogRow) *pb.HTTPDNSAccessLog { if row == nil { return nil } return &pb.HTTPDNSAccessLog{ RequestId: row.RequestId, ClusterId: row.ClusterId, NodeId: row.NodeId, AppId: row.AppId, AppName: row.AppName, Domain: row.Domain, Qtype: row.QType, ClientIP: row.ClientIP, ClientRegion: row.ClientRegion, Carrier: row.Carrier, SdkVersion: row.SDKVersion, Os: row.OS, ResultIPs: row.ResultIPs, Status: row.Status, ErrorCode: row.ErrorCode, CostMs: row.CostMs, CreatedAt: row.CreatedAt, Day: row.Day, Summary: row.Summary, } } func (s *HTTPDNSAccessLogsStore) buildConditions(f HTTPDNSAccessLogListFilter) []string { conditions := []string{} if day := strings.TrimSpace(f.Day); day != "" { conditions = append(conditions, "day = '"+escapeString(day)+"'") } if f.ClusterId > 0 { conditions = append(conditions, "cluster_id = "+strconv.FormatInt(f.ClusterId, 10)) } if f.NodeId > 0 { conditions = append(conditions, "node_id = "+strconv.FormatInt(f.NodeId, 10)) } if appID := strings.TrimSpace(f.AppId); appID != "" { conditions = append(conditions, "app_id = '"+escapeString(appID)+"'") } else if len(f.AppIds) > 0 { validAppIds := make([]string, 0, len(f.AppIds)) for _, appID := range f.AppIds { appID = strings.TrimSpace(appID) if len(appID) == 0 { continue } validAppIds = append(validAppIds, "'"+escapeString(appID)+"'") } if len(validAppIds) == 0 { conditions = append(conditions, "1 = 0") } else { conditions = append(conditions, "app_id IN ("+strings.Join(validAppIds, ",")+")") } } if domain := strings.TrimSpace(f.Domain); domain != "" { conditions = append(conditions, "domain = '"+escapeString(domain)+"'") } if status := strings.TrimSpace(f.Status); status != "" { conditions = append(conditions, "status = '"+escapeString(status)+"'") } if keyword := strings.TrimSpace(f.Keyword); keyword != "" { kw := escapeString(keyword) conditions = append(conditions, "(summary LIKE '%"+kw+"%' OR app_name LIKE '%"+kw+"%' OR client_ip LIKE '%"+kw+"%' OR result_ips LIKE '%"+kw+"%')") } return conditions } func (s *HTTPDNSAccessLogsStore) tableName() string { if s.client != nil && s.client.cfg != nil && s.client.cfg.Database != "" && s.client.cfg.Database != "default" { return quoteIdent(s.client.cfg.Database) + "." + quoteIdent(httpDNSAccessLogsTable) } return quoteIdent(httpDNSAccessLogsTable) } func toString(value interface{}) string { if value == nil { return "" } switch v := value.(type) { case string: return v case json.Number: return v.String() default: return fmt.Sprintf("%v", v) } } func toInt64(value interface{}) int64 { if value == nil { return 0 } switch v := value.(type) { case int: return int64(v) case int32: return int64(v) case int64: return v case uint32: return int64(v) case uint64: return int64(v) case float64: return int64(v) case json.Number: n, _ := v.Int64() return n case string: n, _ := strconv.ParseInt(v, 10, 64) return n default: return 0 } }