Files
waf-platform/EdgeAPI/internal/rpc/services/service_http_access_log.go
2026-02-08 02:00:51 +08:00

348 lines
9.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package services
import (
"context"
"github.com/TeaOSLab/EdgeAPI/internal/clickhouse"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/regexputils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/lists"
"sync"
)
// HTTPAccessLogService provides the access-log related RPC services.
// It embeds BaseService.
type HTTPAccessLogService struct {
BaseService
}
// CreateHTTPAccessLogs stores a batch of access logs.
//
// The request is validated via rpcutils.ValidateRequest with
// rpcutils.UserTypeNode. An empty batch is answered immediately with an empty
// response. Otherwise the logs are written through SharedHTTPAccessLogDAO when
// canWriteAccessLogsToDB reports true, and are then always handed to
// writeAccessLogsToPolicy. The first error encountered is returned as-is.
func (this *HTTPAccessLogService) CreateHTTPAccessLogs(ctx context.Context, req *pb.CreateHTTPAccessLogsRequest) (*pb.CreateHTTPAccessLogsResponse, error) {
	// Validate the request.
	if _, _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode); err != nil {
		return nil, err
	}

	logs := req.HttpAccessLogs
	if len(logs) < 1 {
		return &pb.CreateHTTPAccessLogsResponse{}, nil
	}

	tx := this.NullTx()
	if this.canWriteAccessLogsToDB() {
		if err := models.SharedHTTPAccessLogDAO.CreateHTTPAccessLogs(tx, logs); err != nil {
			return nil, err
		}
	}

	if err := this.writeAccessLogsToPolicy(logs); err != nil {
		return nil, err
	}

	return &pb.CreateHTTPAccessLogsResponse{}, nil
}
// ListHTTPAccessLogs returns a single page of access logs.
//
// ClickHouse is preferred: when its client is configured and req.Day is set,
// the page is served by listHTTPAccessLogsFromClickHouse (which resolves nodes
// and clusters in batches). Otherwise the page is read through
// SharedHTTPAccessLogDAO and each log is decorated with its Node and
// NodeCluster.
//
// When the caller is a user (userId > 0) req.UserId is overwritten with the
// authenticated id, and if req.ServerId is set CheckUserServer must succeed.
//
// The response carries the same slice in both HttpAccessLogs and AccessLogs.
func (this *HTTPAccessLogService) ListHTTPAccessLogs(ctx context.Context, req *pb.ListHTTPAccessLogsRequest) (*pb.ListHTTPAccessLogsResponse, error) {
	_, userId, err := this.ValidateAdminAndUser(ctx, true)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	if userId > 0 {
		req.UserId = userId
		if req.ServerId > 0 {
			err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
			if err != nil {
				return nil, err
			}
		}
	}

	// Preferred path: ClickHouse.
	store := clickhouse.NewLogsIngestStore()
	if store.Client().IsConfigured() && req.Day != "" {
		resp, listErr := this.listHTTPAccessLogsFromClickHouse(ctx, tx, store, req, userId)
		if listErr != nil {
			return nil, listErr
		}
		if resp != nil {
			return resp, nil
		}
	}

	// Fallback path: relational access-log storage.
	accessLogs, requestId, hasMore, err := models.SharedHTTPAccessLogDAO.ListAccessLogs(tx, req.Partition, req.RequestId, req.Size, req.Day, req.HourFrom, req.HourTo, req.NodeClusterId, req.NodeId, req.ServerId, req.Reverse, req.HasError, req.FirewallPolicyId, req.FirewallRuleGroupId, req.FirewallRuleSetId, req.HasFirewallPolicy, req.UserId, req.Keyword, req.Ip, req.Domain)
	if err != nil {
		return nil, err
	}

	var result = make([]*pb.HTTPAccessLog, 0, len(accessLogs))

	// Per-request lookup caches. A nil value records that the lookup found
	// nothing, so a missing node or cluster is queried only once per page
	// instead of once per log row that references it.
	var pbNodeMap = map[int64]*pb.Node{}
	var pbClusterMap = map[int64]*pb.NodeCluster{}

	for _, accessLog := range accessLogs {
		a, err := accessLog.ToPB()
		if err != nil {
			return nil, err
		}

		pbNode, ok := pbNodeMap[a.NodeId]
		if !ok {
			node, err := models.SharedNodeDAO.FindEnabledNode(tx, a.NodeId)
			if err != nil {
				return nil, err
			}
			if node != nil {
				pbNode = &pb.Node{Id: int64(node.Id), Name: node.Name}

				var clusterId = int64(node.ClusterId)
				pbCluster, ok := pbClusterMap[clusterId]
				if !ok {
					cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(tx, clusterId)
					if err != nil {
						return nil, err
					}
					if cluster != nil {
						pbCluster = &pb.NodeCluster{Id: int64(cluster.Id), Name: cluster.Name}
					}
					pbClusterMap[clusterId] = pbCluster
				}
				if pbCluster != nil {
					pbNode.NodeCluster = pbCluster
				}
			}
			pbNodeMap[a.NodeId] = pbNode
		}

		// Leave a.Node untouched when the node could not be resolved.
		if pbNode != nil {
			a.Node = pbNode
		}

		result = append(result, a)
	}

	return &pb.ListHTTPAccessLogsResponse{
		HttpAccessLogs: result,
		AccessLogs:     result,
		HasMore:        hasMore,
		RequestId:      requestId,
	}, nil
}
// listHTTPAccessLogsFromClickHouse lists one page of access logs through the
// given ClickHouse logs-ingest store and fills in Node / NodeCluster for every
// row with batched lookups (one query for nodes, one for clusters) to avoid
// N+1 queries.
//
// Server scoping: if req.ServerId is set only that server is queried;
// otherwise, for a user caller (userId > 0), the query is limited to the
// user's enabled servers, and an empty response is returned without querying
// ClickHouse when the user has none. When req.NodeClusterId is set the filter
// additionally carries the cluster's enabled node ids.
//
// HasMore is only reported for forward (non-reverse) listing, where it is true
// when the store returned a non-empty next cursor. The cursor itself is always
// returned in RequestId.
func (this *HTTPAccessLogService) listHTTPAccessLogsFromClickHouse(ctx context.Context, tx *dbs.Tx, store *clickhouse.LogsIngestStore, req *pb.ListHTTPAccessLogsRequest, userId int64) (*pb.ListHTTPAccessLogsResponse, error) {
	f := clickhouse.ListFilter{
		Day:               req.Day,
		HourFrom:          req.HourFrom,
		HourTo:            req.HourTo,
		Size:              req.Size,
		Reverse:           req.Reverse,
		HasError:          req.HasError,
		HasFirewallPolicy: req.HasFirewallPolicy,
		FirewallPolicyId:  req.FirewallPolicyId,
		NodeId:            req.NodeId,
		ClusterId:         req.NodeClusterId,
		LastRequestId:     req.RequestId,
		Keyword:           req.Keyword,
		Ip:                req.Ip,
		Domain:            req.Domain,
	}

	if req.ServerId > 0 {
		f.ServerIds = []int64{req.ServerId}
	} else if userId > 0 {
		serverIds, err := models.SharedServerDAO.FindAllEnabledServerIdsWithUserId(tx, userId)
		if err != nil {
			return nil, err
		}
		if len(serverIds) == 0 {
			// The user owns no enabled servers, so there is nothing to list.
			return &pb.ListHTTPAccessLogsResponse{HttpAccessLogs: nil, AccessLogs: nil, HasMore: false, RequestId: ""}, nil
		}
		f.ServerIds = serverIds
	}

	if req.NodeClusterId > 0 {
		nodeIds, err := models.SharedNodeDAO.FindAllEnabledNodeIdsWithClusterId(tx, req.NodeClusterId)
		if err != nil {
			return nil, err
		}
		f.NodeIds = nodeIds
	}

	rows, nextCursor, err := store.List(ctx, f)
	if err != nil {
		return nil, err
	}
	if len(rows) == 0 {
		return &pb.ListHTTPAccessLogsResponse{HttpAccessLogs: []*pb.HTTPAccessLog{}, AccessLogs: []*pb.HTTPAccessLog{}, HasMore: false, RequestId: ""}, nil
	}

	// Convert rows and collect the distinct node ids they reference.
	result := make([]*pb.HTTPAccessLog, 0, len(rows))
	nodeIdSet := make(map[int64]struct{})
	for _, r := range rows {
		result = append(result, clickhouse.RowToPB(r))
		nodeIdSet[int64(r.NodeId)] = struct{}{}
	}
	nodeIds := make([]int64, 0, len(nodeIdSet))
	for id := range nodeIdSet {
		nodeIds = append(nodeIds, id)
	}

	// Batch-load the nodes.
	nodes, err := models.SharedNodeDAO.FindEnabledBasicNodesWithIds(tx, nodeIds)
	if err != nil {
		return nil, err
	}

	// Collect the distinct cluster ids of those nodes.
	clusterIds := make(map[int64]struct{})
	for _, node := range nodes {
		if node.ClusterId > 0 {
			clusterIds[int64(node.ClusterId)] = struct{}{}
		}
	}
	clusterIdList := make([]int64, 0, len(clusterIds))
	for cid := range clusterIds {
		clusterIdList = append(clusterIdList, cid)
	}

	// Batch-load the clusters. A lookup failure is reported to the caller,
	// like every other DAO error in this method, instead of being dropped.
	// The query is skipped entirely when no node references a cluster.
	clusterMap := make(map[int64]*pb.NodeCluster, len(clusterIdList))
	if len(clusterIdList) > 0 {
		clusters, err := models.SharedNodeClusterDAO.FindEnabledNodeClustersWithIds(tx, clusterIdList)
		if err != nil {
			return nil, err
		}
		for _, c := range clusters {
			clusterMap[int64(c.Id)] = &pb.NodeCluster{Id: int64(c.Id), Name: c.Name}
		}
	}

	pbNodeMap := make(map[int64]*pb.Node, len(nodes))
	for _, node := range nodes {
		pbNode := &pb.Node{Id: int64(node.Id), Name: node.Name}
		if c := clusterMap[int64(node.ClusterId)]; c != nil {
			pbNode.NodeCluster = c
		}
		pbNodeMap[int64(node.Id)] = pbNode
	}

	// Attach the resolved node (if any) to each log.
	for _, a := range result {
		if n := pbNodeMap[a.NodeId]; n != nil {
			a.Node = n
		}
	}

	hasMore := !req.Reverse && nextCursor != ""

	return &pb.ListHTTPAccessLogsResponse{
		HttpAccessLogs: result,
		AccessLogs:     result,
		HasMore:        hasMore,
		RequestId:      nextCursor,
	}, nil
}
// FindHTTPAccessLog looks up a single access log by req.RequestId.
//
// When the ClickHouse client is configured it is consulted first (via
// FindByTraceId); a hit is returned directly and a lookup error aborts the
// call. If ClickHouse is not configured or has no matching row, the lookup
// falls back to SharedHTTPAccessLogDAO. For a user caller (userId > 0) the
// log's server must pass CheckUserServer in either path. A response with a nil
// HttpAccessLog is returned when the fallback finds nothing.
func (this *HTTPAccessLogService) FindHTTPAccessLog(ctx context.Context, req *pb.FindHTTPAccessLogRequest) (*pb.FindHTTPAccessLogResponse, error) {
	// Validate the request.
	_, userId, err := this.ValidateAdminAndUser(ctx, true)
	if err != nil {
		return nil, err
	}

	// Try ClickHouse first.
	if chStore := clickhouse.NewLogsIngestStore(); chStore.Client().IsConfigured() {
		chRow, findErr := chStore.FindByTraceId(ctx, req.RequestId)
		if findErr != nil {
			return nil, findErr
		}
		if chRow != nil {
			// Permission check for user callers.
			if userId > 0 {
				if checkErr := models.SharedServerDAO.CheckUserServer(this.NullTx(), userId, int64(chRow.ServerId)); checkErr != nil {
					return nil, checkErr
				}
			}
			return &pb.FindHTTPAccessLogResponse{HttpAccessLog: clickhouse.RowToPB(chRow)}, nil
		}
	}

	// ClickHouse is not configured or had no match: fall back to the DAO.
	tx := this.NullTx()
	record, err := models.SharedHTTPAccessLogDAO.FindAccessLogWithRequestId(tx, req.RequestId)
	if err != nil {
		return nil, err
	}
	if record == nil {
		return &pb.FindHTTPAccessLogResponse{HttpAccessLog: nil}, nil
	}

	// Permission check for user callers.
	if userId > 0 {
		if err = models.SharedServerDAO.CheckUserServer(tx, userId, int64(record.ServerId)); err != nil {
			return nil, err
		}
	}

	pbLog, err := record.ToPB()
	if err != nil {
		return nil, err
	}
	return &pb.FindHTTPAccessLogResponse{HttpAccessLog: pbLog}, nil
}
// FindHTTPAccessLogPartitions finds the access-log partitions that exist for
// req.Day across all access-log databases.
//
// Only admins may call it (ValidateAdmin). req.Day must match
// regexputils.YYYYMMDD, otherwise an "invalid 'day'" error is returned. Every
// database returned by models.AllAccessLogDBs is queried concurrently; the
// distinct partitions are returned in Partitions and, in reversed order, in
// ReversePartitions. If any database lookup fails, one of the errors is
// returned and no partitions are reported.
func (this *HTTPAccessLogService) FindHTTPAccessLogPartitions(ctx context.Context, req *pb.FindHTTPAccessLogPartitionsRequest) (*pb.FindHTTPAccessLogPartitionsResponse, error) {
	_, err := this.ValidateAdmin(ctx)
	if err != nil {
		return nil, err
	}

	if !regexputils.YYYYMMDD.MatchString(req.Day) {
		return nil, errors.New("invalid 'day': " + req.Day)
	}

	var dbList = models.AllAccessLogDBs()
	if len(dbList) == 0 {
		return &pb.FindHTTPAccessLogPartitionsResponse{
			Partitions: nil,
		}, nil
	}

	var partitions = []int32{}
	var lastErr error

	// locker guards both partitions and lastErr, which are shared by all
	// worker goroutines below.
	var locker sync.Mutex
	var wg sync.WaitGroup
	wg.Add(len(dbList))

	for _, db := range dbList {
		go func(db *dbs.DB) {
			defer wg.Done()

			names, err := models.SharedHTTPAccessLogManager.FindTableNames(db, req.Day)
			if err != nil {
				// Record the failure under the lock: several goroutines may
				// fail at once, and an unsynchronised write is a data race.
				locker.Lock()
				lastErr = err
				locker.Unlock()
				return
			}

			for _, name := range names {
				var partition = models.SharedHTTPAccessLogManager.TablePartition(name)
				locker.Lock()
				if !lists.Contains(partitions, partition) {
					partitions = append(partitions, partition)
				}
				locker.Unlock()
			}
		}(db)
	}
	wg.Wait()

	if lastErr != nil {
		return nil, lastErr
	}

	// NOTE(review): with more than one database the append order above depends
	// on goroutine scheduling, so the order of Partitions is not deterministic
	// across calls — confirm whether callers expect sorted output.
	var reversePartitions = make([]int32, 0, len(partitions))
	for i := len(partitions) - 1; i >= 0; i-- {
		reversePartitions = append(reversePartitions, partitions[i])
	}

	return &pb.FindHTTPAccessLogPartitionsResponse{
		Partitions:        partitions,
		ReversePartitions: reversePartitions,
	}, nil
}