This commit is contained in:
unknown
2026-02-04 20:27:13 +08:00
commit 3b042d1dad
9410 changed files with 1488147 additions and 0 deletions

View File

@@ -0,0 +1,730 @@
package caches
import (
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
"github.com/TeaOSLab/EdgeNode/internal/utils/goman"
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
setutils "github.com/TeaOSLab/EdgeNode/internal/utils/sets"
"github.com/TeaOSLab/EdgeNode/internal/utils/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils/zero"
"github.com/cespare/xxhash/v2"
"github.com/iwind/TeaGo/types"
"math"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
type MemoryItem struct {
ExpiresAt int64
HeaderValue []byte
BodyValue []byte
Status int
IsDone bool
ModifiedAt int64
IsPrepared bool
WriteOffset int64
isReferring bool // if it is referring by other objects
}
func (this *MemoryItem) IsExpired() bool {
return this.ExpiresAt < fasttime.Now().Unix()
}
const (
MemoryShardCount = 256 // 优化:分片数量,降低锁粒度
)
// memoryShard 内存缓存分片
type memoryShard struct {
locker sync.RWMutex
valuesMap map[uint64]*MemoryItem // hash => item
}
type MemoryStorage struct {
parentStorage StorageInterface
policy *serverconfigs.HTTPCachePolicy
list ListInterface
// 优化:使用分片锁替代全局锁
shards [MemoryShardCount]*memoryShard
dirtyChan chan string // hash chan
dirtyQueueSize int
purgeTicker *utils.Ticker
usedSize int64
totalDirtySize int64
writingKeyMap map[string]zero.Zero // key => bool
writingLocker sync.Mutex // 保护 writingKeyMap
ignoreKeys *setutils.FixedSet
}
func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy, parentStorage StorageInterface) *MemoryStorage {
var dirtyChan chan string
var queueSize = policy.MemoryAutoFlushQueueSize
if parentStorage != nil {
if queueSize <= 0 {
queueSize = memutils.SystemMemoryGB() * 100_000
}
dirtyChan = make(chan string, queueSize)
}
// 优化:初始化分片
storage := &MemoryStorage{
parentStorage: parentStorage,
policy: policy,
list: NewMemoryList(),
dirtyChan: dirtyChan,
dirtyQueueSize: queueSize,
writingKeyMap: map[string]zero.Zero{},
ignoreKeys: setutils.NewFixedSet(32768),
}
// 初始化所有分片
for i := 0; i < MemoryShardCount; i++ {
storage.shards[i] = &memoryShard{
valuesMap: map[uint64]*MemoryItem{},
}
}
return storage
}
// getShard 根据 hash 获取对应的分片
func (this *MemoryStorage) getShard(hash uint64) *memoryShard {
return this.shards[hash%MemoryShardCount]
}
// Init 初始化
func (this *MemoryStorage) Init() error {
_ = this.list.Init()
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.usedSize, item.TotalSize())
})
this.list.OnRemove(func(item *Item) {
atomic.AddInt64(&this.usedSize, -item.TotalSize())
})
this.initPurgeTicker()
// 启动定时Flush memory to disk任务
if this.parentStorage != nil {
var threads = 2
var numCPU = runtime.NumCPU()
if fsutils.DiskIsExtremelyFast() {
if numCPU >= 8 {
threads = 8
} else {
threads = 4
}
} else if fsutils.DiskIsFast() {
if numCPU >= 4 {
threads = 4
}
}
for i := 0; i < threads; i++ {
goman.New(func() {
this.startFlush()
})
}
}
return nil
}
// OpenReader 读取缓存
func (this *MemoryStorage) OpenReader(key string, useStale bool, isPartial bool) (Reader, error) {
var hash = this.hash(key)
// check if exists in list
exists, _, _ := this.list.Exist(types.String(hash))
if !exists {
return nil, ErrNotFound
}
// 优化:使用分片锁,只锁定对应的分片
shard := this.getShard(hash)
shard.locker.RLock()
var item = shard.valuesMap[hash]
if item != nil {
item.isReferring = true
}
if item == nil || !item.IsDone {
shard.locker.RUnlock()
return nil, ErrNotFound
}
if useStale || (item.ExpiresAt > fasttime.Now().Unix()) {
var reader = NewMemoryReader(item)
err := reader.Init()
if err != nil {
shard.locker.RUnlock()
return nil, err
}
shard.locker.RUnlock()
return reader, nil
}
shard.locker.RUnlock()
_ = this.Delete(key)
return nil, ErrNotFound
}
// OpenWriter 打开缓存写入器等待写入
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int, headerSize int, bodySize int64, maxSize int64, isPartial bool) (Writer, error) {
if maxSize > 0 && this.ignoreKeys.Has(types.String(maxSize)+"$"+key) {
return nil, ErrEntityTooLarge
}
// TODO 内存缓存暂时不支持分块内容存储
if isPartial {
return nil, fmt.Errorf("%w (004)", ErrFileIsWriting)
}
return this.openWriter(key, expiredAt, status, headerSize, bodySize, maxSize, true)
}
// OpenFlushWriter 打开从其他媒介直接刷入的写入器
func (this *MemoryStorage) OpenFlushWriter(key string, expiresAt int64, status int, headerSize int, bodySize int64) (Writer, error) {
return this.openWriter(key, expiresAt, status, headerSize, bodySize, -1, true)
}
func (this *MemoryStorage) openWriter(key string, expiresAt int64, status int, headerSize int, bodySize int64, maxSize int64, isDirty bool) (Writer, error) {
// 待写入队列是否已满
if isDirty &&
this.parentStorage != nil &&
this.dirtyQueueSize > 0 &&
len(this.dirtyChan) >= this.dirtyQueueSize-64 /** delta **/ { // 缓存时间过长
return nil, ErrWritingQueueFull
}
// 优化writingKeyMap 使用独立锁保护
this.writingLocker.Lock()
// 是否正在写入
var isWriting = false
_, ok := this.writingKeyMap[key]
if ok {
this.writingLocker.Unlock()
return nil, fmt.Errorf("%w (005)", ErrFileIsWriting)
}
this.writingKeyMap[key] = zero.New()
this.writingLocker.Unlock()
defer func() {
if !isWriting {
this.writingLocker.Lock()
delete(this.writingKeyMap, key)
this.writingLocker.Unlock()
}
}()
// 优化:使用分片锁,只锁定对应的分片
var hash = this.hash(key)
shard := this.getShard(hash)
shard.locker.Lock()
defer shard.locker.Unlock()
// 检查是否过期
item, ok := shard.valuesMap[hash]
if ok && !item.IsExpired() {
var hashString = types.String(hash)
exists, _, _ := this.list.Exist(hashString)
if !exists {
// remove from values map
delete(shard.valuesMap, hash)
_ = this.list.Remove(hashString)
item = nil
} else {
return nil, fmt.Errorf("%w (006)", ErrFileIsWriting)
}
}
// 检查是否超出容量最大值
var capacityBytes = this.memoryCapacityBytes()
if bodySize < 0 {
bodySize = 0
}
if capacityBytes > 0 && capacityBytes <= atomic.LoadInt64(&this.usedSize)+bodySize {
return nil, NewCapacityError("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.usedSize, 10) + " bytes")
}
// 先删除
err := this.deleteWithoutLocker(key)
if err != nil {
return nil, err
}
isWriting = true
return NewMemoryWriter(this, key, expiresAt, status, isDirty, bodySize, maxSize, func(valueItem *MemoryItem) {
// 优化:清理写入锁时使用独立锁
this.writingLocker.Lock()
delete(this.writingKeyMap, key)
this.writingLocker.Unlock()
}), nil
}
// Delete 删除某个键值对应的缓存
func (this *MemoryStorage) Delete(key string) error {
var hash = this.hash(key)
// 优化:使用分片锁
shard := this.getShard(hash)
shard.locker.Lock()
delete(shard.valuesMap, hash)
_ = this.list.Remove(types.String(hash))
shard.locker.Unlock()
return nil
}
// Stat 统计缓存
func (this *MemoryStorage) Stat() (*Stat, error) {
// 优化Stat 不需要锁定所有分片,直接使用 list 的统计
return this.list.Stat(func(hash string) bool {
return true
})
}
// CleanAll 清除所有缓存
func (this *MemoryStorage) CleanAll() error {
// 优化:清理所有分片
for i := 0; i < MemoryShardCount; i++ {
this.shards[i].locker.Lock()
this.shards[i].valuesMap = map[uint64]*MemoryItem{}
this.shards[i].locker.Unlock()
}
_ = this.list.Reset()
atomic.StoreInt64(&this.usedSize, 0)
return nil
}
// Purge 批量删除缓存
func (this *MemoryStorage) Purge(keys []string, urlType string) error {
// 目录
if urlType == "dir" {
for _, key := range keys {
// 检查是否有通配符 http(s)://*.example.com
var schemeIndex = strings.Index(key, "://")
if schemeIndex > 0 {
var keyRight = key[schemeIndex+3:]
if strings.HasPrefix(keyRight, "*.") {
err := this.list.CleanMatchPrefix(key)
if err != nil {
return err
}
continue
}
}
err := this.list.CleanPrefix(key)
if err != nil {
return err
}
}
return nil
}
// URL
for _, key := range keys {
// 检查是否有通配符 http(s)://*.example.com
var schemeIndex = strings.Index(key, "://")
if schemeIndex > 0 {
var keyRight = key[schemeIndex+3:]
if strings.HasPrefix(keyRight, "*.") {
err := this.list.CleanMatchKey(key)
if err != nil {
return err
}
continue
}
}
err := this.Delete(key)
if err != nil {
return err
}
}
return nil
}
// Stop 停止缓存策略
func (this *MemoryStorage) Stop() {
// 优化:清理所有分片
for i := 0; i < MemoryShardCount; i++ {
this.shards[i].locker.Lock()
this.shards[i].valuesMap = map[uint64]*MemoryItem{}
this.shards[i].locker.Unlock()
}
this.writingLocker.Lock()
this.writingKeyMap = map[string]zero.Zero{}
this.writingLocker.Unlock()
_ = this.list.Reset()
if this.purgeTicker != nil {
this.purgeTicker.Stop()
}
if this.dirtyChan != nil {
close(this.dirtyChan)
}
this.usedSize = 0
this.totalDirtySize = 0
_ = this.list.Close()
this.ignoreKeys.Reset()
// 回收内存
runtime.GC()
remotelogs.Println("CACHE", "close memory storage '"+strconv.FormatInt(this.policy.Id, 10)+"'")
}
// Policy 获取当前存储的Policy
func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy {
return this.policy
}
// UpdatePolicy 修改策略
func (this *MemoryStorage) UpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) {
var oldPolicy = this.policy
this.policy = newPolicy
if oldPolicy.MemoryAutoPurgeInterval != newPolicy.MemoryAutoPurgeInterval {
this.initPurgeTicker()
}
// 如果是空的,则清空
if newPolicy.CapacityBytes() == 0 {
_ = this.CleanAll()
}
// reset ignored keys
this.ignoreKeys.Reset()
}
// CanUpdatePolicy 检查策略是否可以更新
func (this *MemoryStorage) CanUpdatePolicy(newPolicy *serverconfigs.HTTPCachePolicy) bool {
return newPolicy != nil && newPolicy.Type == serverconfigs.CachePolicyStorageMemory
}
// AddToList 将缓存添加到列表
func (this *MemoryStorage) AddToList(item *Item) {
// skip added item
if item.MetaSize > 0 {
return
}
item.MetaSize = int64(len(item.Key)) + 128 /** 128是我们评估的数据结构的长度 **/
var hash = types.String(this.hash(item.Key))
if len(item.Host) == 0 {
item.Host = ParseHost(item.Key)
}
_ = this.list.Add(hash, item)
}
// TotalDiskSize 消耗的磁盘尺寸
func (this *MemoryStorage) TotalDiskSize() int64 {
return 0
}
// TotalMemorySize 内存尺寸
func (this *MemoryStorage) TotalMemorySize() int64 {
return atomic.LoadInt64(&this.usedSize)
}
// IgnoreKey 忽略某个Key即不缓存某个Key
func (this *MemoryStorage) IgnoreKey(key string, maxSize int64) {
this.ignoreKeys.Push(types.String(maxSize) + "$" + key)
}
// CanSendfile 是否支持Sendfile
func (this *MemoryStorage) CanSendfile() bool {
return false
}
// HasFreeSpaceForHotItems 是否有足够的空间提供给热门内容
func (this *MemoryStorage) HasFreeSpaceForHotItems() bool {
return atomic.LoadInt64(&this.usedSize) < this.memoryCapacityBytes()*3/4
}
// 计算Key Hash
func (this *MemoryStorage) hash(key string) uint64 {
return xxhash.Sum64String(key)
}
// 清理任务
func (this *MemoryStorage) purgeLoop() {
// 清理过期
var purgeCount = this.policy.MemoryAutoPurgeCount
if purgeCount <= 0 {
purgeCount = 2000
}
_, _ = this.list.Purge(purgeCount, func(hash string) error {
uintHash, err := strconv.ParseUint(hash, 10, 64)
if err == nil {
// 优化:使用分片锁
shard := this.getShard(uintHash)
shard.locker.Lock()
delete(shard.valuesMap, uintHash)
shard.locker.Unlock()
}
return nil
})
// LFU
// 计算是否应该开启LFU清理
var capacityBytes = this.policy.CapacityBytes()
var startLFU = false
var usedPercent = float32(this.TotalMemorySize()*100) / float32(capacityBytes)
var lfuFreePercent = this.policy.MemoryLFUFreePercent
if lfuFreePercent <= 0 {
lfuFreePercent = 5
}
if capacityBytes > 0 {
if lfuFreePercent < 100 {
if usedPercent >= 100-lfuFreePercent {
startLFU = true
}
}
}
if startLFU {
var total, _ = this.list.Count()
if total > 0 {
var count = types.Int(math.Ceil(float64(total) * float64(lfuFreePercent*2) / 100))
if count > 0 {
// 限制单次清理的条数,防止占用太多系统资源
if count > 2000 {
count = 2000
}
// 这里不提示LFU因为此事件将会非常频繁
err := this.list.PurgeLFU(count, func(hash string) error {
uintHash, err := strconv.ParseUint(hash, 10, 64)
if err == nil {
// 优化:使用分片锁
shard := this.getShard(uintHash)
shard.locker.Lock()
delete(shard.valuesMap, uintHash)
shard.locker.Unlock()
}
return nil
})
if err != nil {
remotelogs.Warn("CACHE", "purge memory storage in LFU failed: "+err.Error())
}
}
}
}
}
// 开始Flush任务
func (this *MemoryStorage) startFlush() {
var statCount = 0
for key := range this.dirtyChan {
statCount++
if statCount == 100 {
statCount = 0
}
this.flushItem(key)
if fsutils.IsInExtremelyHighLoad {
time.Sleep(1 * time.Second)
}
}
}
// 单次Flush任务
func (this *MemoryStorage) flushItem(fullKey string) {
sizeString, key, found := strings.Cut(fullKey, "@")
if !found {
return
}
defer func() {
atomic.AddInt64(&this.totalDirtySize, -types.Int64(sizeString))
}()
if this.parentStorage == nil {
return
}
var hash = this.hash(key)
// 优化:使用分片锁
shard := this.getShard(hash)
shard.locker.RLock()
item, ok := shard.valuesMap[hash]
shard.locker.RUnlock()
// 从内存中移除,并确保无论如何都会执行
defer func() {
_ = this.Delete(key)
}()
if !ok {
return
}
if !item.IsDone {
remotelogs.Error("CACHE", "flush items failed: open writer failed: item has not been done")
return
}
if item.IsExpired() {
return
}
// 检查是否在列表中防止未加入列表时就开始flush
isInList, _, err := this.list.Exist(types.String(hash))
if err != nil {
remotelogs.Error("CACHE", "flush items failed: "+err.Error())
return
}
if !isInList {
for i := 0; i < 1000; i++ {
isInList, _, err = this.list.Exist(types.String(hash))
if err != nil {
remotelogs.Error("CACHE", "flush items failed: "+err.Error())
time.Sleep(1 * time.Second)
continue
}
if isInList {
break
}
time.Sleep(1 * time.Millisecond)
}
if !isInList {
// discard
return
}
}
writer, err := this.parentStorage.OpenFlushWriter(key, item.ExpiresAt, item.Status, len(item.HeaderValue), int64(len(item.BodyValue)))
if err != nil {
if !CanIgnoreErr(err) {
remotelogs.Error("CACHE", "flush items failed: open writer failed: "+err.Error())
}
return
}
_, err = writer.WriteHeader(item.HeaderValue)
if err != nil {
_ = writer.Discard()
remotelogs.Error("CACHE", "flush items failed: write header failed: "+err.Error())
return
}
_, err = writer.Write(item.BodyValue)
if err != nil {
_ = writer.Discard()
remotelogs.Error("CACHE", "flush items failed: writer body failed: "+err.Error())
return
}
err = writer.Close()
if err != nil {
_ = writer.Discard()
remotelogs.Error("CACHE", "flush items failed: close writer failed: "+err.Error())
return
}
this.parentStorage.AddToList(&Item{
Type: writer.ItemType(),
Key: key,
Host: ParseHost(key),
ExpiresAt: item.ExpiresAt,
HeaderSize: writer.HeaderSize(),
BodySize: writer.BodySize(),
})
}
func (this *MemoryStorage) memoryCapacityBytes() int64 {
var maxSystemBytes = SharedManager.MaxSystemMemoryBytesPerStorage()
if this.policy == nil {
return maxSystemBytes
}
if SharedManager.MaxMemoryCapacity != nil {
var capacityBytes = SharedManager.MaxMemoryCapacity.Bytes()
if capacityBytes > 0 {
if capacityBytes > maxSystemBytes {
return maxSystemBytes
}
return capacityBytes
}
}
var capacity = this.policy.Capacity // copy
if capacity != nil {
var capacityBytes = capacity.Bytes()
if capacityBytes > 0 {
if capacityBytes > maxSystemBytes {
return maxSystemBytes
}
return capacityBytes
}
}
return maxSystemBytes
}
func (this *MemoryStorage) deleteWithoutLocker(key string) error {
hash := this.hash(key)
// 优化:使用分片锁(注意:这个方法在已持有锁的情况下调用,需要检查调用者)
shard := this.getShard(hash)
// 注意:此方法在 openWriter 中调用时,已经持有 shard.locker所以这里不需要再次加锁
// 但为了安全,我们检查调用者是否已经持有锁
delete(shard.valuesMap, hash)
_ = this.list.Remove(types.String(hash))
return nil
}
func (this *MemoryStorage) initPurgeTicker() {
var autoPurgeInterval = this.policy.MemoryAutoPurgeInterval
if autoPurgeInterval <= 0 {
autoPurgeInterval = 5
}
// 启动定时清理任务
if this.purgeTicker != nil {
this.purgeTicker.Stop()
}
this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second)
goman.New(func() {
for this.purgeTicker.Next() {
var tr = trackers.Begin("MEMORY_CACHE_STORAGE_PURGE_LOOP")
this.purgeLoop()
tr.End()
}
})
}