// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . package readers import ( "github.com/TeaOSLab/EdgeNode/internal/utils/zero" "io" "os" "sync" ) type subFileReader struct { fp *os.File notify chan zero.Zero parentIsClosed bool } func newSubFileReader(fp *os.File) *subFileReader { return &subFileReader{ fp: fp, notify: make(chan zero.Zero, 2), } } func (this *subFileReader) Read(p []byte) (n int, err error) { n, err = this.fp.Read(p) if n > 0 { return n, nil } if err != nil { if this.parentIsClosed { return } if err != io.EOF { return } // try next read <-this.notify return this.Read(p) } return } func (this *subFileReader) Close() error { close(this.notify) return this.fp.Close() } func (this *subFileReader) NotifyRead() { select { case this.notify <- zero.Zero{}: default: } } func (this *subFileReader) NotifyClose() { select { case this.notify <- zero.Zero{}: default: } this.parentIsClosed = true } type ConcurrentFileReaders struct { filePath string isClosed bool subReaders []*subFileReader locker sync.RWMutex } func NewConcurrentFileReaders(filePath string) *ConcurrentFileReaders { return &ConcurrentFileReaders{ filePath: filePath, } } func (this *ConcurrentFileReaders) Get() (io.ReadCloser, error) { this.locker.Lock() if this.isClosed { this.locker.Unlock() return nil, os.ErrNotExist } this.locker.Unlock() fp, err := os.OpenFile(this.filePath, os.O_RDONLY, 0444) if err != nil { return nil, os.ErrNotExist } var subReader = newSubFileReader(fp) this.locker.Lock() this.subReaders = append(this.subReaders, subReader) this.locker.Unlock() return subReader, nil } func (this *ConcurrentFileReaders) NotifyRead() { this.locker.RLock() for _, subReader := range this.subReaders { subReader.NotifyRead() } this.locker.RUnlock() } func (this *ConcurrentFileReaders) NotifyClose() { this.locker.Lock() this.isClosed = true this.locker.Unlock() this.locker.RLock() for _, subReader := range this.subReaders { subReader.NotifyClose() } this.locker.RUnlock() }