package ttlcache import ( "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" "math" "runtime" "sync" "sync/atomic" "time" ) var SharedInt64Cache = NewBigCache[int64]() // GCStats GC 统计信息 type GCStats struct { ExpiredCount int // 过期数量 TotalCount int // 总数量 GCTime int64 // GC 耗时(微秒) ExpireRate float64 // 过期率 } // Cache TTL缓存 // 最大的缓存时间为30 * 86400 // Piece数据结构: // // Piece1 | Piece2 | Piece3 | ... // [ Item1, Item2, ... ] | ... type Cache[T any] struct { isDestroyed bool pieces []*Piece[T] countPieces uint64 maxItems int maxPiecesPerGC int gcPieceIndex int // 优化:GC 统计和自适应 totalWrites int64 totalExpires int64 lastGCStats GCStats gcStatsLocker sync.Mutex // 优化:自适应 GC 配置 gcBaseInterval time.Duration gcMinInterval time.Duration gcMaxInterval time.Duration adaptiveGC bool gcSampleSize int } func NewBigCache[T any]() *Cache[T] { var delta = memutils.SystemMemoryGB() / 2 if delta <= 0 { delta = 1 } return NewCache[T](NewMaxItemsOption(delta * 1_000_000)) } // calculateOptimalPieces 计算最优分片数量 func calculateOptimalPieces(minPieces, maxPieces int) int { // 基础分片数 = CPU核心数 * 2 baseShards := runtime.NumCPU() * 2 // 根据内存调整(内存越大,分片越多) memoryGB := memutils.SystemMemoryGB() if memoryGB >= 64 { baseShards *= 2 // 大内存系统,增加分片 } else if memoryGB >= 32 { baseShards = baseShards * 3 / 2 // 中等内存 } // 限制范围 if baseShards < minPieces { baseShards = minPieces } else if baseShards > maxPieces { baseShards = maxPieces } // 对齐到 2 的幂次(优化取模运算) countPieces := 1 << uint(math.Ceil(math.Log2(float64(baseShards)))) // 确保在范围内 if countPieces < minPieces { countPieces = minPieces } else if countPieces > maxPieces { // 找到小于等于 maxPieces 的最大 2 的幂次 countPieces = 1 << uint(math.Floor(math.Log2(float64(maxPieces)))) } return countPieces } func NewCache[T any](opt ...OptionInterface) *Cache[T] { // 优化:动态计算分片数量 var countPieces = 256 var maxItems = 1_000_000 var minPieces = 64 var maxPieces = 1024 var totalMemory = memutils.SystemMemoryGB() if totalMemory < 2 { // 我们限制内存过小的服务能够使用的数量 maxItems = 500_000 } else { var delta = totalMemory / 4 if delta > 0 { maxItems *= delta } } // 优化:从配置读取分片数(如果未配置则自动计算) var piecesFromConfig = 0 for _, option := range opt { if option == nil { continue } switch o := option.(type) { case *PiecesOption: if o.Count > 0 { piecesFromConfig = o.Count } case *MaxItemsOption: if o.Count > 0 { maxItems = o.Count } case *MinPiecesOption: if o.Count > 0 { minPieces = o.Count } case *MaxPiecesOption: if o.Count > 0 { maxPieces = o.Count } case *GCConfigOption: // GC 配置在下面处理 } } // 如果配置中指定了分片数,使用配置值;否则自动计算 if piecesFromConfig > 0 { countPieces = piecesFromConfig } else { countPieces = calculateOptimalPieces(minPieces, maxPieces) } var maxPiecesPerGC = 4 var numCPU = runtime.NumCPU() / 2 if numCPU > maxPiecesPerGC { maxPiecesPerGC = numCPU } // 优化:默认 GC 配置 var gcBaseInterval = 2 * time.Second var gcMinInterval = 1 * time.Second var gcMaxInterval = 10 * time.Second var adaptiveGC = true var gcSampleSize = 100 // 从配置读取 GC 参数 for _, option := range opt { if gcOpt, ok := option.(*GCConfigOption); ok && gcOpt != nil { if gcOpt.BaseInterval > 0 { gcBaseInterval = gcOpt.BaseInterval } if gcOpt.MinInterval > 0 { gcMinInterval = gcOpt.MinInterval } if gcOpt.MaxInterval > 0 { gcMaxInterval = gcOpt.MaxInterval } adaptiveGC = gcOpt.Adaptive if gcOpt.SampleSize > 0 { gcSampleSize = gcOpt.SampleSize } } } var cache = &Cache[T]{ countPieces: uint64(countPieces), maxItems: maxItems, maxPiecesPerGC: maxPiecesPerGC, gcBaseInterval: gcBaseInterval, gcMinInterval: gcMinInterval, gcMaxInterval: gcMaxInterval, adaptiveGC: adaptiveGC, gcSampleSize: gcSampleSize, } for i := 0; i < countPieces; i++ { cache.pieces = append(cache.pieces, NewPiece[T](maxItems/countPieces)) } // Add to manager SharedManager.Add(cache) return cache } func (this *Cache[T]) Write(key string, value T, expiresAt int64) (ok bool) { if this.isDestroyed { return } var currentTimestamp = fasttime.Now().Unix() if expiresAt <= currentTimestamp { return } var maxExpiresAt = currentTimestamp + 30*86400 if expiresAt > maxExpiresAt { expiresAt = maxExpiresAt } var uint64Key = HashKeyString(key) var pieceIndex = uint64Key % this.countPieces // 优化:统计写入次数 atomic.AddInt64(&this.totalWrites, 1) return this.pieces[pieceIndex].Add(uint64Key, &Item[T]{ Value: value, expiresAt: expiresAt, }) } func (this *Cache[T]) IncreaseInt64(key string, delta T, expiresAt int64, extend bool) T { if this.isDestroyed { return any(0).(T) } var currentTimestamp = fasttime.Now().Unix() if expiresAt <= currentTimestamp { return any(0).(T) } var maxExpiresAt = currentTimestamp + 30*86400 if expiresAt > maxExpiresAt { expiresAt = maxExpiresAt } var uint64Key = HashKeyString(key) var pieceIndex = uint64Key % this.countPieces return this.pieces[pieceIndex].IncreaseInt64(uint64Key, delta, expiresAt, extend) } func (this *Cache[T]) Read(key string) (item *Item[T]) { var uint64Key = HashKeyString(key) return this.pieces[uint64Key%this.countPieces].Read(uint64Key) } func (this *Cache[T]) Delete(key string) { var uint64Key = HashKeyString(key) this.pieces[uint64Key%this.countPieces].Delete(uint64Key) } func (this *Cache[T]) Count() (count int) { for _, piece := range this.pieces { count += piece.Count() } return } // calculateGCInterval 计算自适应 GC 间隔 func (this *Cache[T]) calculateGCInterval() time.Duration { if !this.adaptiveGC { return this.gcBaseInterval } this.gcStatsLocker.Lock() expireRate := this.lastGCStats.ExpireRate this.gcStatsLocker.Unlock() // 根据过期率调整 if expireRate > 0.1 { // 过期率高,加快 GC return this.gcMinInterval } else if expireRate < 0.01 { // 过期率低,减慢 GC return this.gcMaxInterval } return this.gcBaseInterval } // selectPiecesForGC 选择需要清理的分片(优化:优先清理过期率高的分片) func (this *Cache[T]) selectPiecesForGC() []int { // 如果未启用自适应,使用原有轮询策略 if !this.adaptiveGC { var index = this.gcPieceIndex var selected []int for i := index; i < index+this.maxPiecesPerGC && i < int(this.countPieces); i++ { selected = append(selected, i) } return selected } // 优化:统计每个分片的过期率 type pieceExpireInfo struct { index int expireRate float64 } var pieceInfos []pieceExpireInfo for i, piece := range this.pieces { count := piece.Count() if count > 0 { // 采样检查过期率(避免全量遍历) expireCount := piece.SampleExpireRate(this.gcSampleSize) expireRate := float64(expireCount) / float64(this.gcSampleSize) if expireRate > 0 { pieceInfos = append(pieceInfos, pieceExpireInfo{ index: i, expireRate: expireRate, }) } } } // 按过期率排序(降序) for i := 0; i < len(pieceInfos)-1; i++ { for j := i + 1; j < len(pieceInfos); j++ { if pieceInfos[i].expireRate < pieceInfos[j].expireRate { pieceInfos[i], pieceInfos[j] = pieceInfos[j], pieceInfos[i] } } } // 选择过期率最高的前 N 个分片 maxPieces := this.maxPiecesPerGC if maxPieces > len(pieceInfos) { maxPieces = len(pieceInfos) } if maxPieces == 0 { // 如果没有找到过期项,使用轮询策略 var index = this.gcPieceIndex var selected []int for i := index; i < index+this.maxPiecesPerGC && i < int(this.countPieces); i++ { selected = append(selected, i) } return selected } var selected []int for i := 0; i < maxPieces; i++ { selected = append(selected, pieceInfos[i].index) } return selected } func (this *Cache[T]) GC() { var startTime = time.Now() var totalCount = 0 var expiredCount = 0 // 优化:选择需要清理的分片 selectedPieces := this.selectPiecesForGC() // 清理选中的分片 for _, pieceIndex := range selectedPieces { if pieceIndex >= int(this.countPieces) { continue } piece := this.pieces[pieceIndex] count := piece.Count() totalCount += count // 执行 GC expired := piece.GC() expiredCount += expired } // 更新索引(用于轮询策略) if !this.adaptiveGC || len(selectedPieces) == 0 { var index = this.gcPieceIndex index += this.maxPiecesPerGC if index >= int(this.countPieces) { index = 0 } this.gcPieceIndex = index } // 更新统计信息 var expireRate float64 if totalCount > 0 { expireRate = float64(expiredCount) / float64(totalCount) } this.gcStatsLocker.Lock() this.lastGCStats = GCStats{ ExpiredCount: expiredCount, TotalCount: totalCount, GCTime: time.Since(startTime).Microseconds(), ExpireRate: expireRate, } this.gcStatsLocker.Unlock() atomic.AddInt64(&this.totalExpires, int64(expiredCount)) } func (this *Cache[T]) Clean() { for _, piece := range this.pieces { piece.Clean() } } func (this *Cache[T]) Destroy() { SharedManager.Remove(this) this.isDestroyed = true for _, piece := range this.pieces { piece.Destroy() } }