package services import ( "context" "github.com/TeaOSLab/EdgeAPI/internal/clickhouse" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeAPI/internal/utils/regexputils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/lists" "sync" ) // HTTPAccessLogService 访问日志相关服务 type HTTPAccessLogService struct { BaseService } // CreateHTTPAccessLogs 创建访问日志 func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req *pb.CreateHTTPAccessLogsRequest) (*pb.CreateHTTPAccessLogsResponse, error) { // 校验请求 _, _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode) if err != nil { return nil, err } if len(req.HttpAccessLogs) == 0 { return &pb.CreateHTTPAccessLogsResponse{}, nil } var tx = this.NullTx() if this.canWriteAccessLogsToDB() { err = models.SharedHTTPAccessLogDAO.CreateHTTPAccessLogs(tx, req.HttpAccessLogs) if err != nil { return nil, err } } err = this.writeAccessLogsToPolicy(req.HttpAccessLogs) if err != nil { return nil, err } return &pb.CreateHTTPAccessLogsResponse{}, nil } // ListHTTPAccessLogs 列出单页访问日志(优先 ClickHouse,否则 MySQL;ClickHouse 路径下节点/集群批量查询避免 N+1) func (this *HTTPAccessLogService) ListHTTPAccessLogs(ctx context.Context, req *pb.ListHTTPAccessLogsRequest) (*pb.ListHTTPAccessLogsResponse, error) { _, userId, err := this.ValidateAdminAndUser(ctx, true) if err != nil { return nil, err } var tx = this.NullTx() if userId > 0 { req.UserId = userId if req.ServerId > 0 { err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId) if err != nil { return nil, err } } } store := clickhouse.NewLogsIngestStore() canReadFromClickHouse := this.shouldReadAccessLogsFromClickHouse() && store.Client().IsConfigured() && req.Day != "" canReadFromMySQL := this.shouldReadAccessLogsFromMySQL() if canReadFromClickHouse { resp, listErr := this.listHTTPAccessLogsFromClickHouse(ctx, tx, store, req, userId) if listErr == nil && resp != nil { return resp, nil } if !canReadFromMySQL { if listErr != nil { return nil, listErr } return &pb.ListHTTPAccessLogsResponse{ HttpAccessLogs: []*pb.HTTPAccessLog{}, AccessLogs: []*pb.HTTPAccessLog{}, HasMore: false, RequestId: "", }, nil } } if !canReadFromMySQL { return &pb.ListHTTPAccessLogsResponse{ HttpAccessLogs: []*pb.HTTPAccessLog{}, AccessLogs: []*pb.HTTPAccessLog{}, HasMore: false, RequestId: "", }, nil } accessLogs, requestId, hasMore, err := models.SharedHTTPAccessLogDAO.ListAccessLogs(tx, req.Partition, req.RequestId, req.Size, req.Day, req.HourFrom, req.HourTo, req.NodeClusterId, req.NodeId, req.ServerId, req.Reverse, req.HasError, req.FirewallPolicyId, req.FirewallRuleGroupId, req.FirewallRuleSetId, req.HasFirewallPolicy, req.UserId, req.Keyword, req.Ip, req.Domain) if err != nil { return nil, err } var result = []*pb.HTTPAccessLog{} var pbNodeMap = map[int64]*pb.Node{} var pbClusterMap = map[int64]*pb.NodeCluster{} for _, accessLog := range accessLogs { a, err := accessLog.ToPB() if err != nil { return nil, err } pbNode, ok := pbNodeMap[a.NodeId] if ok { a.Node = pbNode } else { node, err := models.SharedNodeDAO.FindEnabledNode(tx, a.NodeId) if err != nil { return nil, err } if node != nil { pbNode = &pb.Node{Id: int64(node.Id), Name: node.Name} var clusterId = int64(node.ClusterId) pbCluster, ok := pbClusterMap[clusterId] if !ok { cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(tx, clusterId) if err != nil { return nil, err } if cluster != nil { pbCluster = &pb.NodeCluster{Id: int64(cluster.Id), Name: cluster.Name} pbClusterMap[clusterId] = pbCluster } } if pbCluster != nil { pbNode.NodeCluster = pbCluster } pbNodeMap[a.NodeId] = pbNode a.Node = pbNode } } result = append(result, a) } return &pb.ListHTTPAccessLogsResponse{ HttpAccessLogs: result, AccessLogs: result, HasMore: hasMore, RequestId: requestId, }, nil } // listHTTPAccessLogsFromClickHouse 从 ClickHouse logs_ingest 查列表,并批量填充 Node/NodeCluster(避免 N+1) func (this *HTTPAccessLogService) listHTTPAccessLogsFromClickHouse(ctx context.Context, tx *dbs.Tx, store *clickhouse.LogsIngestStore, req *pb.ListHTTPAccessLogsRequest, userId int64) (*pb.ListHTTPAccessLogsResponse, error) { f := clickhouse.ListFilter{ Day: req.Day, HourFrom: req.HourFrom, HourTo: req.HourTo, Size: req.Size, Reverse: req.Reverse, HasError: req.HasError, HasFirewallPolicy: req.HasFirewallPolicy, FirewallPolicyId: req.FirewallPolicyId, NodeId: req.NodeId, ClusterId: req.NodeClusterId, LastRequestId: req.RequestId, Keyword: req.Keyword, Ip: req.Ip, Domain: req.Domain, } if req.ServerId > 0 { f.ServerIds = []int64{req.ServerId} } else if userId > 0 { serverIds, err := models.SharedServerDAO.FindAllEnabledServerIdsWithUserId(tx, userId) if err != nil { return nil, err } if len(serverIds) == 0 { return &pb.ListHTTPAccessLogsResponse{HttpAccessLogs: nil, AccessLogs: nil, HasMore: false, RequestId: ""}, nil } f.ServerIds = serverIds } if req.NodeClusterId > 0 { nodeIds, err := models.SharedNodeDAO.FindAllEnabledNodeIdsWithClusterId(tx, req.NodeClusterId) if err != nil { return nil, err } f.NodeIds = nodeIds } rows, nextCursor, err := store.List(ctx, f) if err != nil { return nil, err } if len(rows) == 0 { return &pb.ListHTTPAccessLogsResponse{HttpAccessLogs: []*pb.HTTPAccessLog{}, AccessLogs: []*pb.HTTPAccessLog{}, HasMore: false, RequestId: ""}, nil } result := make([]*pb.HTTPAccessLog, 0, len(rows)) nodeIdSet := make(map[int64]struct{}) for _, r := range rows { result = append(result, clickhouse.RowToPB(r)) nodeIdSet[int64(r.NodeId)] = struct{}{} } nodeIds := make([]int64, 0, len(nodeIdSet)) for id := range nodeIdSet { nodeIds = append(nodeIds, id) } nodes, err := models.SharedNodeDAO.FindEnabledBasicNodesWithIds(tx, nodeIds) if err != nil { return nil, err } clusterIds := make(map[int64]struct{}) for _, node := range nodes { if node.ClusterId > 0 { clusterIds[int64(node.ClusterId)] = struct{}{} } } clusterIdList := make([]int64, 0, len(clusterIds)) for cid := range clusterIds { clusterIdList = append(clusterIdList, cid) } clusters, _ := models.SharedNodeClusterDAO.FindEnabledNodeClustersWithIds(tx, clusterIdList) clusterMap := make(map[int64]*pb.NodeCluster) for _, c := range clusters { clusterMap[int64(c.Id)] = &pb.NodeCluster{Id: int64(c.Id), Name: c.Name} } pbNodeMap := make(map[int64]*pb.Node) for _, node := range nodes { pbNode := &pb.Node{Id: int64(node.Id), Name: node.Name} if c := clusterMap[int64(node.ClusterId)]; c != nil { pbNode.NodeCluster = c } pbNodeMap[int64(node.Id)] = pbNode } for _, a := range result { if n := pbNodeMap[a.NodeId]; n != nil { a.Node = n } } hasMore := false if !req.Reverse { hasMore = nextCursor != "" } return &pb.ListHTTPAccessLogsResponse{ HttpAccessLogs: result, AccessLogs: result, HasMore: hasMore, RequestId: nextCursor, }, nil } // FindHTTPAccessLog 查找单个日志 func (this *HTTPAccessLogService) FindHTTPAccessLog(ctx context.Context, req *pb.FindHTTPAccessLogRequest) (*pb.FindHTTPAccessLogResponse, error) { // 校验请求 _, userId, err := this.ValidateAdminAndUser(ctx, true) if err != nil { return nil, err } // 优先从 ClickHouse 查询 store := clickhouse.NewLogsIngestStore() canReadFromClickHouse := this.shouldReadAccessLogsFromClickHouse() && store.Client().IsConfigured() canReadFromMySQL := this.shouldReadAccessLogsFromMySQL() if canReadFromClickHouse { row, err := store.FindByTraceId(ctx, req.RequestId) if err != nil { if !canReadFromMySQL { return nil, err } } else if row != nil { // 检查权限 if userId > 0 { var tx = this.NullTx() err = models.SharedServerDAO.CheckUserServer(tx, userId, int64(row.ServerId)) if err != nil { return nil, err } } a := clickhouse.RowToPB(row) return &pb.FindHTTPAccessLogResponse{HttpAccessLog: a}, nil } } if !canReadFromMySQL { return &pb.FindHTTPAccessLogResponse{HttpAccessLog: nil}, nil } // 如果 ClickHouse 未配置或未找到,则回退到 MySQL var tx = this.NullTx() accessLog, err := models.SharedHTTPAccessLogDAO.FindAccessLogWithRequestId(tx, req.RequestId) if err != nil { return nil, err } if accessLog == nil { return &pb.FindHTTPAccessLogResponse{HttpAccessLog: nil}, nil } // 检查权限 if userId > 0 { err = models.SharedServerDAO.CheckUserServer(tx, userId, int64(accessLog.ServerId)) if err != nil { return nil, err } } a, err := accessLog.ToPB() if err != nil { return nil, err } return &pb.FindHTTPAccessLogResponse{HttpAccessLog: a}, nil } // FindHTTPAccessLogPartitions 查找日志分区 func (this *HTTPAccessLogService) FindHTTPAccessLogPartitions(ctx context.Context, req *pb.FindHTTPAccessLogPartitionsRequest) (*pb.FindHTTPAccessLogPartitionsResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } if !regexputils.YYYYMMDD.MatchString(req.Day) { return nil, errors.New("invalid 'day': " + req.Day) } var dbList = models.AllAccessLogDBs() if len(dbList) == 0 { return &pb.FindHTTPAccessLogPartitionsResponse{ Partitions: nil, }, nil } var partitions = []int32{} var locker sync.Mutex var wg = sync.WaitGroup{} wg.Add(len(dbList)) var lastErr error for _, db := range dbList { go func(db *dbs.DB) { defer wg.Done() names, err := models.SharedHTTPAccessLogManager.FindTableNames(db, req.Day) if err != nil { lastErr = err } for _, name := range names { var partition = models.SharedHTTPAccessLogManager.TablePartition(name) locker.Lock() if !lists.Contains(partitions, partition) { partitions = append(partitions, partition) } locker.Unlock() } }(db) } wg.Wait() if lastErr != nil { return nil, lastErr } var reversePartitions = []int32{} for i := len(partitions) - 1; i >= 0; i-- { reversePartitions = append(reversePartitions, partitions[i]) } return &pb.FindHTTPAccessLogPartitionsResponse{ Partitions: partitions, ReversePartitions: reversePartitions, }, nil }