Initial commit (code only without large binaries)

This commit is contained in:
robin
2026-02-15 18:58:44 +08:00
commit 35df75498f
9442 changed files with 1495866 additions and 0 deletions

View File

@@ -0,0 +1,147 @@
package clickhouse
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)
// Client 通过 HTTP 接口执行只读查询SELECT返回 JSONEachRow 解析为 map 或结构体
type Client struct {
cfg *Config
httpCli *http.Client
}
// NewClient 使用共享配置创建客户端
func NewClient() *Client {
cfg := SharedConfig()
transport := &http.Transport{}
if cfg != nil && strings.EqualFold(cfg.Scheme, "https") {
transport.TLSClientConfig = &tls.Config{
InsecureSkipVerify: cfg.TLSSkipVerify,
ServerName: cfg.TLSServerName,
}
}
return &Client{
cfg: cfg,
httpCli: &http.Client{
Timeout: 30 * time.Second,
Transport: transport,
},
}
}
// IsConfigured 是否已配置
func (c *Client) IsConfigured() bool {
return c.cfg != nil && c.cfg.IsConfigured()
}
// Query 执行 SELECT将每行 JSON 解析到 dest 切片dest 元素类型需为 *struct 或 map
func (c *Client) Query(ctx context.Context, query string, dest interface{}) error {
if !c.IsConfigured() {
return fmt.Errorf("clickhouse: not configured")
}
// 强制 JSONEachRow 便于解析
q := strings.TrimSpace(query)
if !strings.HasSuffix(strings.ToUpper(q), "FORMAT JSONEACHROW") {
query = q + " FORMAT JSONEachRow"
}
u := c.buildURL(query)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return err
}
if c.cfg.User != "" || c.cfg.Password != "" {
req.SetBasicAuth(c.cfg.User, c.cfg.Password)
}
resp, err := c.httpCli.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("clickhouse HTTP %d: %s", resp.StatusCode, string(body))
}
dec := json.NewDecoder(resp.Body)
return decodeRows(dec, dest)
}
// QueryRow 执行仅返回一行的查询,将结果解析到 dest*struct 或 *map
func (c *Client) QueryRow(ctx context.Context, query string, dest interface{}) error {
if !c.IsConfigured() {
return fmt.Errorf("clickhouse: not configured")
}
q := strings.TrimSpace(query)
if !strings.HasSuffix(strings.ToUpper(q), "FORMAT JSONEACHROW") {
query = q + " FORMAT JSONEachRow"
}
u := c.buildURL(query)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return err
}
if c.cfg.User != "" || c.cfg.Password != "" {
req.SetBasicAuth(c.cfg.User, c.cfg.Password)
}
resp, err := c.httpCli.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("clickhouse HTTP %d: %s", resp.StatusCode, string(body))
}
dec := json.NewDecoder(resp.Body)
return decodeOneRow(dec, dest)
}
func (c *Client) buildURL(query string) string {
scheme := "http"
if c.cfg != nil && strings.EqualFold(c.cfg.Scheme, "https") {
scheme = "https"
}
rawURL := fmt.Sprintf("%s://%s:%d/?query=%s&database=%s",
scheme, c.cfg.Host, c.cfg.Port, url.QueryEscape(query), url.QueryEscape(c.cfg.Database))
return rawURL
}
// decodeRows 将 JSONEachRow 流解析到 slice元素类型须为 *struct 或 *[]map[string]interface{}
func decodeRows(dec *json.Decoder, dest interface{}) error {
// dest 应为 *[]*SomeStruct 或 *[]map[string]interface{}
switch d := dest.(type) {
case *[]map[string]interface{}:
*d = (*d)[:0]
for {
var row map[string]interface{}
if err := dec.Decode(&row); err != nil {
if err == io.EOF {
return nil
}
return err
}
*d = append(*d, row)
}
default:
return fmt.Errorf("clickhouse: unsupported dest type for Query (use *[]map[string]interface{} or implement decoder)")
}
}
func decodeOneRow(dec *json.Decoder, dest interface{}) error {
switch d := dest.(type) {
case *map[string]interface{}:
if err := dec.Decode(d); err != nil {
return err
}
return nil
default:
return fmt.Errorf("clickhouse: unsupported dest type for QueryRow (use *map[string]interface{})")
}
}

View File

@@ -0,0 +1,134 @@
// Package clickhouse 提供 ClickHouse 只读客户端,用于查询 logs_ingestFluent Bit 写入)。
// 配置优先从后台页面edgeSysSettings.clickhouseConfig读取其次 api.yaml最后环境变量。
package clickhouse
import (
"github.com/TeaOSLab/EdgeAPI/internal/configs"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"os"
"strconv"
"strings"
"sync"
)
const (
envHost = "CLICKHOUSE_HOST"
envPort = "CLICKHOUSE_PORT"
envUser = "CLICKHOUSE_USER"
envPassword = "CLICKHOUSE_PASSWORD"
envDatabase = "CLICKHOUSE_DATABASE"
envScheme = "CLICKHOUSE_SCHEME"
defaultPort = 8443
defaultDB = "default"
defaultScheme = "https"
)
var (
sharedConfig *Config
configOnce sync.Once
configLocker sync.Mutex
)
// Config ClickHouse 连接配置(仅查询,不从代码写库)
type Config struct {
Host string
Port int
User string
Password string
Database string
Scheme string
TLSSkipVerify bool
TLSServerName string
}
// SharedConfig 返回全局配置(优先从后台 DB 读取,其次 api.yaml最后环境变量
func SharedConfig() *Config {
configLocker.Lock()
defer configLocker.Unlock()
if sharedConfig != nil {
return sharedConfig
}
sharedConfig = loadConfig()
return sharedConfig
}
// ResetSharedConfig 清空缓存,下次 SharedConfig() 时重新从 DB/文件/环境变量加载(后台保存 ClickHouse 配置后调用)
func ResetSharedConfig() {
configLocker.Lock()
defer configLocker.Unlock()
sharedConfig = nil
}
func loadConfig() *Config {
cfg := &Config{Port: defaultPort, Database: defaultDB, Scheme: defaultScheme, TLSSkipVerify: true}
// 1) 优先从后台页面配置DB读取
if models.SharedSysSettingDAO != nil {
if dbCfg, err := models.SharedSysSettingDAO.ReadClickHouseConfig(nil); err == nil && dbCfg != nil && dbCfg.Host != "" {
cfg.Host = dbCfg.Host
cfg.Port = dbCfg.Port
cfg.User = dbCfg.User
cfg.Password = dbCfg.Password
cfg.Database = dbCfg.Database
cfg.Scheme = normalizeScheme(dbCfg.Scheme)
cfg.TLSSkipVerify = true
cfg.TLSServerName = ""
if cfg.Port <= 0 {
cfg.Port = defaultPort
}
if cfg.Database == "" {
cfg.Database = defaultDB
}
return cfg
}
}
// 2) 其次 api.yaml
apiConfig, err := configs.SharedAPIConfig()
if err == nil && apiConfig != nil && apiConfig.ClickHouse != nil && apiConfig.ClickHouse.Host != "" {
ch := apiConfig.ClickHouse
cfg.Host = ch.Host
cfg.Port = ch.Port
cfg.User = ch.User
cfg.Password = ch.Password
cfg.Database = ch.Database
cfg.Scheme = normalizeScheme(ch.Scheme)
cfg.TLSSkipVerify = true
cfg.TLSServerName = ""
if cfg.Port <= 0 {
cfg.Port = defaultPort
}
if cfg.Database == "" {
cfg.Database = defaultDB
}
return cfg
}
// 3) 最后环境变量
cfg.Host = os.Getenv(envHost)
cfg.User = os.Getenv(envUser)
cfg.Password = os.Getenv(envPassword)
cfg.Database = os.Getenv(envDatabase)
if cfg.Database == "" {
cfg.Database = defaultDB
}
cfg.Scheme = normalizeScheme(os.Getenv(envScheme))
cfg.TLSServerName = ""
if p := os.Getenv(envPort); p != "" {
if v, err := strconv.Atoi(p); err == nil {
cfg.Port = v
}
}
cfg.TLSSkipVerify = true
return cfg
}
func normalizeScheme(scheme string) string {
s := strings.ToLower(strings.TrimSpace(scheme))
if s == "https" {
return "https"
}
return defaultScheme
}
// IsConfigured 是否已配置Host 非空即视为启用 ClickHouse 查询)
func (c *Config) IsConfigured() bool {
return c != nil && c.Host != ""
}

View File

@@ -0,0 +1,413 @@
// Package clickhouse 提供 logs_ingest 表的只读查询(列表分页),用于访问日志列表优先走 ClickHouse。
package clickhouse
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
// LogsIngestRow 对应 ClickHouse logs_ingest 表的一行(用于 List 结果与 RowToPB
type LogsIngestRow struct {
Timestamp time.Time
NodeId uint64
ClusterId uint64
ServerId uint64
Host string
IP string
Method string
Path string
Status uint16
BytesIn uint64
BytesOut uint64
CostMs uint32
UA string
Referer string
LogType string
TraceId string
FirewallPolicyId uint64
FirewallRuleGroupId uint64
FirewallRuleSetId uint64
FirewallRuleId uint64
RequestHeaders string
RequestBody string
ResponseHeaders string
ResponseBody string
}
// ListFilter 列表查询条件(与 ListHTTPAccessLogsRequest 对齐)
type ListFilter struct {
Day string
HourFrom string
HourTo string
Size int64
Reverse bool
HasError bool
HasFirewallPolicy bool
FirewallPolicyId int64
NodeId int64
ClusterId int64
LastRequestId string
ServerIds []int64
NodeIds []int64
// 搜索条件
Keyword string
Ip string
Domain string
}
// LogsIngestStore 封装对 logs_ingest 的只读列表查询
type LogsIngestStore struct {
client *Client
}
// NewLogsIngestStore 创建 store内部使用共享 Client
func NewLogsIngestStore() *LogsIngestStore {
return &LogsIngestStore{client: NewClient()}
}
// Client 返回底层 Client供调用方判断 IsConfigured()
func (s *LogsIngestStore) Client() *Client {
return s.client
}
// List 按条件分页查询 logs_ingest返回行、下一页游标trace_id与错误
func (s *LogsIngestStore) List(ctx context.Context, f ListFilter) (rows []*LogsIngestRow, nextCursor string, err error) {
if !s.client.IsConfigured() {
return nil, "", fmt.Errorf("clickhouse: not configured")
}
if f.Day == "" {
return nil, "", fmt.Errorf("clickhouse: day required")
}
dayNumber, err := normalizeDayNumber(f.Day)
if err != nil {
return nil, "", err
}
table := "logs_ingest"
if s.client.cfg.Database != "" && s.client.cfg.Database != "default" {
table = quoteIdent(s.client.cfg.Database) + "." + quoteIdent("logs_ingest")
} else {
table = quoteIdent(table)
}
conditions := []string{"toYYYYMMDD(timestamp) = " + strconv.Itoa(dayNumber)}
if f.HourFrom != "" {
if _, err := strconv.Atoi(f.HourFrom); err == nil {
conditions = append(conditions, "toHour(timestamp) >= "+f.HourFrom)
}
}
if f.HourTo != "" {
if _, err := strconv.Atoi(f.HourTo); err == nil {
conditions = append(conditions, "toHour(timestamp) <= "+f.HourTo)
}
}
if len(f.ServerIds) > 0 {
parts := make([]string, 0, len(f.ServerIds))
for _, id := range f.ServerIds {
parts = append(parts, strconv.FormatInt(id, 10))
}
conditions = append(conditions, "server_id IN ("+strings.Join(parts, ",")+")")
}
if len(f.NodeIds) > 0 {
parts := make([]string, 0, len(f.NodeIds))
for _, id := range f.NodeIds {
parts = append(parts, strconv.FormatInt(id, 10))
}
conditions = append(conditions, "node_id IN ("+strings.Join(parts, ",")+")")
}
if f.NodeId > 0 {
conditions = append(conditions, "node_id = "+strconv.FormatInt(f.NodeId, 10))
}
if f.ClusterId > 0 {
conditions = append(conditions, "cluster_id = "+strconv.FormatInt(f.ClusterId, 10))
}
if f.HasFirewallPolicy {
conditions = append(conditions, "firewall_policy_id > 0")
}
if f.HasError {
conditions = append(conditions, "status >= 400")
}
if f.FirewallPolicyId > 0 {
conditions = append(conditions, "firewall_policy_id = "+strconv.FormatInt(f.FirewallPolicyId, 10))
}
// 搜索条件
if f.Keyword != "" {
keyword := escapeString(f.Keyword)
// 在 host, path, ip, ua 中模糊搜索
conditions = append(conditions, fmt.Sprintf("(host LIKE '%%%s%%' OR path LIKE '%%%s%%' OR ip LIKE '%%%s%%' OR ua LIKE '%%%s%%')", keyword, keyword, keyword, keyword))
}
if f.Ip != "" {
conditions = append(conditions, "ip = '"+escapeString(f.Ip)+"'")
}
if f.Domain != "" {
conditions = append(conditions, "host LIKE '%"+escapeString(f.Domain)+"%'")
}
// 游标分页:使用 trace_id 作为游标
// Reverse=false历史向后翻页查询更早的数据
// Reverse=true实时增量拉新查询更新的数据
if f.LastRequestId != "" {
if f.Reverse {
conditions = append(conditions, "trace_id > '"+escapeString(f.LastRequestId)+"'")
} else {
conditions = append(conditions, "trace_id < '"+escapeString(f.LastRequestId)+"'")
}
}
where := strings.Join(conditions, " AND ")
// 默认按时间倒序(最新的在前面),与前端默认行为一致
orderDir := "DESC"
if f.Reverse {
orderDir = "ASC"
}
limit := f.Size
if limit <= 0 {
limit = 20
}
if limit > 1000 {
limit = 1000
}
orderBy := fmt.Sprintf("timestamp %s, trace_id %s", orderDir, orderDir)
// 列表查询不 SELECT 大字段request_headers / request_body / response_headers / response_body
// 避免每次翻页读取 GB 级数据。详情查看时通过 FindByTraceId 单独获取。
query := fmt.Sprintf("SELECT timestamp, node_id, cluster_id, server_id, host, ip, method, path, status, bytes_in, bytes_out, cost_ms, ua, referer, log_type, trace_id, firewall_policy_id, firewall_rule_group_id, firewall_rule_set_id, firewall_rule_id FROM %s WHERE %s ORDER BY %s LIMIT %d",
table, where, orderBy, limit+1)
var rawRows []map[string]interface{}
if err = s.client.Query(ctx, query, &rawRows); err != nil {
return nil, "", err
}
rows = make([]*LogsIngestRow, 0, len(rawRows))
for _, m := range rawRows {
r := mapToLogsIngestRow(m)
if r != nil {
rows = append(rows, r)
}
}
if !f.Reverse {
if len(rows) > int(limit) {
nextCursor = rows[limit].TraceId
rows = rows[:limit]
}
return rows, nextCursor, nil
}
if len(rows) > int(limit) {
rows = rows[:limit]
}
if len(rows) > 0 {
nextCursor = rows[len(rows)-1].TraceId
}
// 实时模式统一返回为“最新在前”,与前端显示和 MySQL 语义一致。
for left, right := 0, len(rows)-1; left < right; left, right = left+1, right-1 {
rows[left], rows[right] = rows[right], rows[left]
}
return rows, nextCursor, nil
}
// FindByTraceId 按 trace_id 查询单条日志详情
func (s *LogsIngestStore) FindByTraceId(ctx context.Context, traceId string) (*LogsIngestRow, error) {
if !s.client.IsConfigured() {
return nil, fmt.Errorf("clickhouse: not configured")
}
if traceId == "" {
return nil, nil
}
table := quoteIdent("logs_ingest")
query := fmt.Sprintf("SELECT timestamp, node_id, cluster_id, server_id, host, ip, method, path, status, bytes_in, bytes_out, cost_ms, ua, referer, log_type, trace_id, firewall_policy_id, firewall_rule_group_id, firewall_rule_set_id, firewall_rule_id, request_headers, request_body, response_headers, response_body FROM %s WHERE trace_id = '%s' LIMIT 1",
table, escapeString(traceId))
var rawRows []map[string]interface{}
if err := s.client.Query(ctx, query, &rawRows); err != nil {
return nil, err
}
if len(rawRows) == 0 {
return nil, nil
}
return mapToLogsIngestRow(rawRows[0]), nil
}
func quoteIdent(name string) string {
return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}
func escapeString(s string) string {
return strings.ReplaceAll(s, "'", "''")
}
func normalizeDayNumber(day string) (int, error) {
normalized := strings.TrimSpace(day)
if normalized == "" {
return 0, fmt.Errorf("clickhouse: day required")
}
normalized = strings.ReplaceAll(normalized, "-", "")
if len(normalized) != 8 {
return 0, fmt.Errorf("clickhouse: invalid day '%s'", day)
}
dayNumber, err := strconv.Atoi(normalized)
if err != nil {
return 0, fmt.Errorf("clickhouse: invalid day '%s'", day)
}
return dayNumber, nil
}
func mapToLogsIngestRow(m map[string]interface{}) *LogsIngestRow {
r := &LogsIngestRow{}
u64 := func(key string) uint64 {
v, ok := m[key]
if !ok || v == nil {
return 0
}
switch x := v.(type) {
case float64:
return uint64(x)
case string:
n, _ := strconv.ParseUint(x, 10, 64)
return n
case json.Number:
n, _ := x.Int64()
return uint64(n)
}
return 0
}
u32 := func(key string) uint32 {
return uint32(u64(key))
}
str := func(key string) string {
if v, ok := m[key]; ok && v != nil {
if s, ok := v.(string); ok {
return s
}
}
return ""
}
ts := func(key string) time.Time {
v, ok := m[key]
if ok && v != nil {
switch x := v.(type) {
case string:
t, _ := time.Parse("2006-01-02 15:04:05", x)
return t
case float64:
return time.Unix(int64(x), 0)
case json.Number:
n, _ := x.Int64()
return time.Unix(n, 0)
}
}
return time.Time{}
}
r.Timestamp = ts("timestamp")
r.NodeId = u64("node_id")
r.ClusterId = u64("cluster_id")
r.ServerId = u64("server_id")
r.Host = str("host")
r.IP = str("ip")
r.Method = str("method")
r.Path = str("path")
r.Status = uint16(u64("status"))
r.BytesIn = u64("bytes_in")
r.BytesOut = u64("bytes_out")
r.CostMs = u32("cost_ms")
r.UA = str("ua")
r.Referer = str("referer")
r.LogType = str("log_type")
r.TraceId = str("trace_id")
r.FirewallPolicyId = u64("firewall_policy_id")
r.FirewallRuleGroupId = u64("firewall_rule_group_id")
r.FirewallRuleSetId = u64("firewall_rule_set_id")
r.FirewallRuleId = u64("firewall_rule_id")
r.RequestHeaders = str("request_headers")
r.RequestBody = str("request_body")
r.ResponseHeaders = str("response_headers")
r.ResponseBody = str("response_body")
return r
}
// RowToPB 将 logs_ingest 一行转为 pb.HTTPAccessLog列表展示用+详情展示)
func RowToPB(r *LogsIngestRow) *pb.HTTPAccessLog {
if r == nil {
return nil
}
a := &pb.HTTPAccessLog{
RequestId: r.TraceId,
ServerId: int64(r.ServerId),
NodeId: int64(r.NodeId),
Timestamp: r.Timestamp.Unix(),
Host: r.Host,
RawRemoteAddr: r.IP,
RemoteAddr: r.IP,
RequestMethod: r.Method,
RequestPath: r.Path,
RequestURI: r.Path, // 前端使用 requestURI 显示完整路径
Scheme: "http", // 默认 http日志中未存储实际值
Proto: "HTTP/1.1", // 默认值,日志中未存储实际值
Status: int32(r.Status),
RequestLength: int64(r.BytesIn),
BytesSent: int64(r.BytesOut),
RequestTime: float64(r.CostMs) / 1000,
UserAgent: r.UA,
Referer: r.Referer,
FirewallPolicyId: int64(r.FirewallPolicyId),
FirewallRuleGroupId: int64(r.FirewallRuleGroupId),
FirewallRuleSetId: int64(r.FirewallRuleSetId),
FirewallRuleId: int64(r.FirewallRuleId),
}
if r.TimeISO8601() != "" {
a.TimeISO8601 = r.TimeISO8601()
}
// TimeLocal: 用户友好的时间格式 (e.g., "2026-02-07 23:17:12")
if !r.Timestamp.IsZero() {
a.TimeLocal = r.Timestamp.Format("2006-01-02 15:04:05")
}
// 解析请求头 (JSON -> map[string]*pb.Strings)
// ClickHouse 中存储的是 map[string]string 格式
if r.RequestHeaders != "" {
var headers map[string]string
if err := json.Unmarshal([]byte(r.RequestHeaders), &headers); err == nil {
a.Header = make(map[string]*pb.Strings)
for k, v := range headers {
a.Header[k] = &pb.Strings{Values: []string{v}}
}
}
}
// 解析响应头 (JSON -> map[string]*pb.Strings)
if r.ResponseHeaders != "" {
var headers map[string]string
if err := json.Unmarshal([]byte(r.ResponseHeaders), &headers); err == nil {
a.SentHeader = make(map[string]*pb.Strings)
for k, v := range headers {
a.SentHeader[k] = &pb.Strings{Values: []string{v}}
}
}
}
// 请求体
if r.RequestBody != "" {
a.RequestBody = []byte(r.RequestBody)
}
return a
}
// TimeISO8601 便于 RowToPB 使用
func (r *LogsIngestRow) TimeISO8601() string {
if r.Timestamp.IsZero() {
return ""
}
return r.Timestamp.UTC().Format("2006-01-02T15:04:05Z07:00")
}

View File

@@ -0,0 +1,377 @@
package clickhouse
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
// NSLogsIngestRow 对应 dns_logs_ingest 表一行.
type NSLogsIngestRow struct {
Timestamp time.Time
RequestId string
NodeId uint64
ClusterId uint64
DomainId uint64
RecordId uint64
RemoteAddr string
QuestionName string
QuestionType string
RecordName string
RecordType string
RecordValue string
Networking string
IsRecursive bool
Error string
NSRouteCodes []string
ContentJSON string
}
// NSListFilter DNS 日志查询过滤条件.
type NSListFilter struct {
Day string
Size int64
Reverse bool
LastRequestId string
NSClusterId int64
NSNodeId int64
NSDomainId int64
NSRecordId int64
RecordType string
Keyword string
}
// NSLogsIngestStore DNS ClickHouse 查询封装.
type NSLogsIngestStore struct {
client *Client
}
// NewNSLogsIngestStore 创建 NSLogsIngestStore.
func NewNSLogsIngestStore() *NSLogsIngestStore {
return &NSLogsIngestStore{client: NewClient()}
}
// Client 返回底层 client.
func (s *NSLogsIngestStore) Client() *Client {
return s.client
}
// List 列出 DNS 访问日志,返回列表、下一游标与是否有更多.
func (s *NSLogsIngestStore) List(ctx context.Context, f NSListFilter) (rows []*NSLogsIngestRow, nextCursor string, hasMore bool, err error) {
if !s.client.IsConfigured() {
return nil, "", false, fmt.Errorf("clickhouse: not configured")
}
if f.Day == "" {
return nil, "", false, fmt.Errorf("clickhouse: day required")
}
dayNumber, err := normalizeDayNumber(f.Day)
if err != nil {
return nil, "", false, err
}
table := quoteIdent("dns_logs_ingest")
if s.client.cfg.Database != "" && s.client.cfg.Database != "default" {
table = quoteIdent(s.client.cfg.Database) + "." + quoteIdent("dns_logs_ingest")
}
conditions := []string{"toYYYYMMDD(timestamp) = " + strconv.Itoa(dayNumber)}
if f.NSClusterId > 0 {
conditions = append(conditions, "cluster_id = "+strconv.FormatInt(f.NSClusterId, 10))
}
if f.NSNodeId > 0 {
conditions = append(conditions, "node_id = "+strconv.FormatInt(f.NSNodeId, 10))
}
if f.NSDomainId > 0 {
conditions = append(conditions, "domain_id = "+strconv.FormatInt(f.NSDomainId, 10))
}
if f.NSRecordId > 0 {
conditions = append(conditions, "record_id = "+strconv.FormatInt(f.NSRecordId, 10))
}
if f.RecordType != "" {
conditions = append(conditions, "question_type = '"+escapeString(f.RecordType)+"'")
}
if f.Keyword != "" {
keyword := escapeString(f.Keyword)
conditions = append(conditions, fmt.Sprintf("(remote_addr LIKE '%%%s%%' OR question_name LIKE '%%%s%%' OR record_value LIKE '%%%s%%' OR error LIKE '%%%s%%')", keyword, keyword, keyword, keyword))
}
// 游标分页reverse=false 查更旧reverse=true 查更新。
if f.LastRequestId != "" {
if f.Reverse {
conditions = append(conditions, "request_id > '"+escapeString(f.LastRequestId)+"'")
} else {
conditions = append(conditions, "request_id < '"+escapeString(f.LastRequestId)+"'")
}
}
orderDir := "DESC"
if f.Reverse {
orderDir = "ASC"
}
limit := f.Size
if limit <= 0 {
limit = 20
}
if limit > 1000 {
limit = 1000
}
// 列表查询不 SELECT content_json 大字段,减少翻页时的数据传输量。
// 详情查看时通过 FindByRequestId 单独获取完整信息。
query := fmt.Sprintf(
"SELECT timestamp, request_id, node_id, cluster_id, domain_id, record_id, remote_addr, question_name, question_type, record_name, record_type, record_value, networking, is_recursive, error, ns_route_codes FROM %s WHERE %s ORDER BY timestamp %s, request_id %s LIMIT %d",
table,
strings.Join(conditions, " AND "),
orderDir,
orderDir,
limit+1,
)
var rawRows []map[string]interface{}
if err = s.client.Query(ctx, query, &rawRows); err != nil {
return nil, "", false, err
}
rows = make([]*NSLogsIngestRow, 0, len(rawRows))
for _, rawRow := range rawRows {
row := mapToNSLogsIngestRow(rawRow)
if row != nil {
rows = append(rows, row)
}
}
hasMore = len(rows) > int(limit)
if hasMore {
nextCursor = rows[limit].RequestId
rows = rows[:limit]
} else if len(rows) > 0 {
nextCursor = rows[len(rows)-1].RequestId
}
if f.Reverse {
for left, right := 0, len(rows)-1; left < right; left, right = left+1, right-1 {
rows[left], rows[right] = rows[right], rows[left]
}
if len(rows) > 0 {
nextCursor = rows[0].RequestId
}
}
return rows, nextCursor, hasMore, nil
}
// FindByRequestId 按 request_id 查单条 DNS 日志.
func (s *NSLogsIngestStore) FindByRequestId(ctx context.Context, requestId string) (*NSLogsIngestRow, error) {
if !s.client.IsConfigured() {
return nil, fmt.Errorf("clickhouse: not configured")
}
if requestId == "" {
return nil, nil
}
table := quoteIdent("dns_logs_ingest")
if s.client.cfg.Database != "" && s.client.cfg.Database != "default" {
table = quoteIdent(s.client.cfg.Database) + "." + quoteIdent("dns_logs_ingest")
}
query := fmt.Sprintf(
"SELECT timestamp, request_id, node_id, cluster_id, domain_id, record_id, remote_addr, question_name, question_type, record_name, record_type, record_value, networking, is_recursive, error, ns_route_codes, content_json FROM %s WHERE request_id = '%s' LIMIT 1",
table,
escapeString(requestId),
)
var rawRows []map[string]interface{}
if err := s.client.Query(ctx, query, &rawRows); err != nil {
return nil, err
}
if len(rawRows) == 0 {
return nil, nil
}
return mapToNSLogsIngestRow(rawRows[0]), nil
}
func mapToNSLogsIngestRow(row map[string]interface{}) *NSLogsIngestRow {
result := &NSLogsIngestRow{}
u64 := func(key string) uint64 {
value, ok := row[key]
if !ok || value == nil {
return 0
}
switch typed := value.(type) {
case float64:
return uint64(typed)
case string:
number, _ := strconv.ParseUint(typed, 10, 64)
return number
case json.Number:
number, _ := typed.Int64()
return uint64(number)
case int64:
return uint64(typed)
case uint64:
return typed
}
return 0
}
str := func(key string) string {
value, ok := row[key]
if !ok || value == nil {
return ""
}
switch typed := value.(type) {
case string:
return typed
case json.Number:
return typed.String()
default:
return fmt.Sprintf("%v", typed)
}
}
ts := func(key string) time.Time {
value, ok := row[key]
if !ok || value == nil {
return time.Time{}
}
switch typed := value.(type) {
case string:
if typed == "" {
return time.Time{}
}
layouts := []string{
"2006-01-02 15:04:05",
time.RFC3339,
"2006-01-02T15:04:05",
}
for _, layout := range layouts {
if parsed, err := time.Parse(layout, typed); err == nil {
return parsed
}
}
case float64:
return time.Unix(int64(typed), 0)
case json.Number:
number, _ := typed.Int64()
return time.Unix(number, 0)
}
return time.Time{}
}
boolValue := func(key string) bool {
value, ok := row[key]
if !ok || value == nil {
return false
}
switch typed := value.(type) {
case bool:
return typed
case float64:
return typed > 0
case int64:
return typed > 0
case uint64:
return typed > 0
case string:
switch strings.ToLower(strings.TrimSpace(typed)) {
case "1", "true", "yes":
return true
}
}
return false
}
parseStringArray := func(key string) []string {
value, ok := row[key]
if !ok || value == nil {
return nil
}
switch typed := value.(type) {
case []string:
return typed
case []interface{}:
result := make([]string, 0, len(typed))
for _, one := range typed {
if one == nil {
continue
}
result = append(result, fmt.Sprintf("%v", one))
}
return result
case string:
if typed == "" {
return nil
}
var result []string
if json.Unmarshal([]byte(typed), &result) == nil {
return result
}
return []string{typed}
}
return nil
}
result.Timestamp = ts("timestamp")
result.RequestId = str("request_id")
result.NodeId = u64("node_id")
result.ClusterId = u64("cluster_id")
result.DomainId = u64("domain_id")
result.RecordId = u64("record_id")
result.RemoteAddr = str("remote_addr")
result.QuestionName = str("question_name")
result.QuestionType = str("question_type")
result.RecordName = str("record_name")
result.RecordType = str("record_type")
result.RecordValue = str("record_value")
result.Networking = str("networking")
result.IsRecursive = boolValue("is_recursive")
result.Error = str("error")
result.NSRouteCodes = parseStringArray("ns_route_codes")
result.ContentJSON = str("content_json")
return result
}
// NSRowToPB 将 ClickHouse 行转换为 pb.NSAccessLog.
func NSRowToPB(row *NSLogsIngestRow) *pb.NSAccessLog {
if row == nil {
return nil
}
log := &pb.NSAccessLog{
NsNodeId: int64(row.NodeId),
NsDomainId: int64(row.DomainId),
NsRecordId: int64(row.RecordId),
NsRouteCodes: row.NSRouteCodes,
RemoteAddr: row.RemoteAddr,
QuestionName: row.QuestionName,
QuestionType: row.QuestionType,
RecordName: row.RecordName,
RecordType: row.RecordType,
RecordValue: row.RecordValue,
Networking: row.Networking,
Timestamp: row.Timestamp.Unix(),
RequestId: row.RequestId,
Error: row.Error,
IsRecursive: row.IsRecursive,
}
if !row.Timestamp.IsZero() {
log.TimeLocal = row.Timestamp.Format("2/Jan/2006:15:04:05 -0700")
}
if row.ContentJSON != "" {
contentLog := &pb.NSAccessLog{}
if json.Unmarshal([]byte(row.ContentJSON), contentLog) == nil {
contentLog.RequestId = row.RequestId
return contentLog
}
}
return log
}