v1.5.1 增强程序稳定性
This commit is contained in:
@@ -70,10 +70,10 @@ func (this *DB) Inspect(fn func(key []byte, value []byte)) error {
|
||||
if valueErr != nil {
|
||||
return valueErr
|
||||
}
|
||||
fn(it.Key(), value)
|
||||
fn(append([]byte(nil), it.Key()...), append([]byte(nil), value...))
|
||||
}
|
||||
|
||||
return nil
|
||||
return it.Error()
|
||||
}
|
||||
|
||||
// Truncate the database
|
||||
|
||||
10
EdgeNode/internal/utils/kvstore/panic.go
Normal file
10
EdgeNode/internal/utils/kvstore/panic.go
Normal file
@@ -0,0 +1,10 @@
|
||||
package kvstore
|
||||
|
||||
import "fmt"
|
||||
|
||||
func wrapRecoveredPanic(message string, panicErr any) error {
|
||||
if resultErr, ok := panicErr.(error); ok {
|
||||
return fmt.Errorf("%s: %w", message, resultErr)
|
||||
}
|
||||
return fmt.Errorf("%s: %v", message, panicErr)
|
||||
}
|
||||
@@ -5,7 +5,6 @@ package kvstore
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
byteutils "github.com/TeaOSLab/EdgeNode/internal/utils/byte"
|
||||
)
|
||||
|
||||
@@ -66,6 +65,9 @@ func (this *Query[T]) SetTable(table *Table[T]) *Query[T] {
|
||||
|
||||
func (this *Query[T]) SetTx(tx *Tx[T]) *Query[T] {
|
||||
this.tx = tx
|
||||
if tx != nil {
|
||||
this.table = tx.table
|
||||
}
|
||||
return this
|
||||
}
|
||||
|
||||
@@ -128,7 +130,12 @@ func (this *Query[T]) FieldPrefix(fieldName string, fieldPrefix string) *Query[T
|
||||
}
|
||||
|
||||
func (this *Query[T]) FieldOffset(fieldOffset []byte) *Query[T] {
|
||||
this.fieldOffsetKey = fieldOffset
|
||||
if len(fieldOffset) == 0 {
|
||||
this.fieldOffsetKey = nil
|
||||
return this
|
||||
}
|
||||
|
||||
this.fieldOffsetKey = append([]byte(nil), fieldOffset...)
|
||||
return this
|
||||
}
|
||||
|
||||
@@ -172,28 +179,12 @@ func (this *Query[T]) FindAll(fn IteratorFunc[T]) (err error) {
|
||||
defer func() {
|
||||
var panicErr = recover()
|
||||
if panicErr != nil {
|
||||
resultErr, ok := panicErr.(error)
|
||||
if ok {
|
||||
err = fmt.Errorf("execute query failed: %w", resultErr)
|
||||
}
|
||||
err = wrapRecoveredPanic("execute query failed", panicErr)
|
||||
}
|
||||
}()
|
||||
|
||||
if this.tx != nil {
|
||||
defer func() {
|
||||
_ = this.tx.Close()
|
||||
}()
|
||||
|
||||
var itErr error
|
||||
if len(this.fieldName) == 0 {
|
||||
itErr = this.iterateKeys(fn)
|
||||
} else {
|
||||
itErr = this.iterateFields(fn)
|
||||
}
|
||||
if itErr != nil {
|
||||
return itErr
|
||||
}
|
||||
return this.tx.Commit()
|
||||
return this.findAllWithTx(this.tx, fn)
|
||||
}
|
||||
|
||||
if this.table != nil {
|
||||
@@ -205,19 +196,29 @@ func (this *Query[T]) FindAll(fn IteratorFunc[T]) (err error) {
|
||||
}
|
||||
|
||||
return txFn(func(tx *Tx[T]) error {
|
||||
this.tx = tx
|
||||
|
||||
if len(this.fieldName) == 0 {
|
||||
return this.iterateKeys(fn)
|
||||
}
|
||||
return this.iterateFields(fn)
|
||||
return this.findAllWithTx(tx, fn)
|
||||
})
|
||||
}
|
||||
|
||||
return errors.New("current query require 'table' or 'tx'")
|
||||
}
|
||||
|
||||
func (this *Query[T]) iterateKeys(fn IteratorFunc[T]) error {
|
||||
func (this *Query[T]) findAllWithTx(tx *Tx[T], fn IteratorFunc[T]) error {
|
||||
if tx == nil {
|
||||
return errors.New("current query require valid tx")
|
||||
}
|
||||
|
||||
if this.table == nil {
|
||||
this.table = tx.table
|
||||
}
|
||||
|
||||
if len(this.fieldName) == 0 {
|
||||
return this.iterateKeys(tx, fn)
|
||||
}
|
||||
return this.iterateFields(tx, fn)
|
||||
}
|
||||
|
||||
func (this *Query[T]) iterateKeys(tx *Tx[T], fn IteratorFunc[T]) error {
|
||||
if this.limit == 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -262,7 +263,7 @@ func (this *Query[T]) iterateKeys(fn IteratorFunc[T]) error {
|
||||
|
||||
var hasOffsetKey = len(this.offsetKey) > 0
|
||||
|
||||
it, itErr := this.tx.NewIterator(opt)
|
||||
it, itErr := tx.NewIterator(opt)
|
||||
if itErr != nil {
|
||||
return itErr
|
||||
}
|
||||
@@ -297,7 +298,7 @@ func (this *Query[T]) iterateKeys(fn IteratorFunc[T]) error {
|
||||
}
|
||||
}
|
||||
|
||||
goNext, callbackErr := fn(this.tx, Item[T]{
|
||||
goNext, callbackErr := fn(tx, Item[T]{
|
||||
Key: string(keyBytes[prefixLen:]),
|
||||
Value: value,
|
||||
})
|
||||
@@ -346,10 +347,10 @@ func (this *Query[T]) iterateKeys(fn IteratorFunc[T]) error {
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return it.Error()
|
||||
}
|
||||
|
||||
func (this *Query[T]) iterateFields(fn IteratorFunc[T]) error {
|
||||
func (this *Query[T]) iterateFields(tx *Tx[T], fn IteratorFunc[T]) error {
|
||||
if this.limit == 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -390,7 +391,7 @@ func (this *Query[T]) iterateFields(fn IteratorFunc[T]) error {
|
||||
opt.UpperBound = byteutils.Append(prefix, 0xFF)
|
||||
}
|
||||
|
||||
it, itErr := this.tx.NewIterator(opt)
|
||||
it, itErr := tx.NewIterator(opt)
|
||||
if itErr != nil {
|
||||
return itErr
|
||||
}
|
||||
@@ -427,10 +428,10 @@ func (this *Query[T]) iterateFields(fn IteratorFunc[T]) error {
|
||||
|
||||
var resultItem = Item[T]{
|
||||
Key: string(keyBytes),
|
||||
FieldKey: fieldKeyBytes,
|
||||
FieldKey: append([]byte(nil), fieldKeyBytes...),
|
||||
}
|
||||
if !this.keysOnly {
|
||||
value, getErr := this.table.getWithKeyBytes(this.tx, this.table.FullKeyBytes(keyBytes))
|
||||
value, getErr := this.table.getWithKeyBytes(tx, this.table.FullKeyBytes(keyBytes))
|
||||
if getErr != nil {
|
||||
if IsNotFound(getErr) {
|
||||
return true, nil
|
||||
@@ -441,7 +442,7 @@ func (this *Query[T]) iterateFields(fn IteratorFunc[T]) error {
|
||||
resultItem.Value = value
|
||||
}
|
||||
|
||||
goNextItem, err = fn(this.tx, resultItem)
|
||||
goNextItem, err = fn(tx, resultItem)
|
||||
if err != nil {
|
||||
if IsSkipError(err) {
|
||||
return true, nil
|
||||
@@ -487,7 +488,7 @@ func (this *Query[T]) iterateFields(fn IteratorFunc[T]) error {
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return it.Error()
|
||||
}
|
||||
|
||||
func (this *Query[T]) matchOperators(fieldValueBytes []byte) bool {
|
||||
|
||||
225
EdgeNode/internal/utils/kvstore/regression_test.go
Normal file
225
EdgeNode/internal/utils/kvstore/regression_test.go
Normal file
@@ -0,0 +1,225 @@
|
||||
package kvstore_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/kvstore"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func openIsolatedTable[T any](t *testing.T, tableName string, encoder kvstore.ValueEncoder[T]) *kvstore.Table[T] {
|
||||
storeName := fmt.Sprintf("test-%d", time.Now().UnixNano())
|
||||
|
||||
store, err := kvstore.OpenStore(storeName)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Cleanup(func() {
|
||||
_ = store.Close()
|
||||
_ = kvstore.RemoveStore(store.Path())
|
||||
})
|
||||
|
||||
db, err := store.NewDB("db1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
table, err := kvstore.NewTable[T](tableName, encoder)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db.AddTable(table)
|
||||
return table
|
||||
}
|
||||
|
||||
func TestQuery_FieldKeyIsStableAcrossPaging(t *testing.T) {
|
||||
table := openIsolatedTable[*testCachedItem](t, "cache_items", &testCacheItemEncoder[*testCachedItem]{})
|
||||
|
||||
err := table.AddFields("expiresAt")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for i, key := range []string{"a1", "a2", "a3"} {
|
||||
err = table.Set(key, &testCachedItem{
|
||||
Hash: key,
|
||||
URL: "https://example.com/" + key,
|
||||
ExpiresAt: int64(i + 1),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
var firstFieldKey []byte
|
||||
var firstFieldKeySnapshot []byte
|
||||
var count int
|
||||
|
||||
err = table.Query().
|
||||
FieldAsc("expiresAt").
|
||||
Limit(2).
|
||||
FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (bool, error) {
|
||||
switch count {
|
||||
case 0:
|
||||
firstFieldKey = item.FieldKey
|
||||
firstFieldKeySnapshot = append([]byte(nil), item.FieldKey...)
|
||||
case 1:
|
||||
if !bytes.Equal(firstFieldKey, firstFieldKeySnapshot) {
|
||||
t.Fatalf("field key mutated during iteration: got %q want %q", firstFieldKey, firstFieldKeySnapshot)
|
||||
}
|
||||
}
|
||||
count++
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var keys []string
|
||||
err = table.Query().
|
||||
FieldAsc("expiresAt").
|
||||
FieldOffset(firstFieldKey).
|
||||
Limit(2).
|
||||
FindAll(func(tx *kvstore.Tx[*testCachedItem], item kvstore.Item[*testCachedItem]) (bool, error) {
|
||||
keys = append(keys, item.Key)
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(keys) != 2 || keys[0] != "a2" || keys[1] != "a3" {
|
||||
t.Fatalf("unexpected paged keys: %v", keys)
|
||||
}
|
||||
}
|
||||
|
||||
func TestQuery_FindAll_StringPanicReturnsError(t *testing.T) {
|
||||
table := openIsolatedTable[string](t, "users", kvstore.NewStringValueEncoder[string]())
|
||||
|
||||
err := table.Set("a1", "value-1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = table.Query().
|
||||
Limit(1).
|
||||
FindAll(func(tx *kvstore.Tx[string], item kvstore.Item[string]) (bool, error) {
|
||||
panic("boom")
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "execute query failed: boom") {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestQuery_ReusesFreshTableTransactionEachRun(t *testing.T) {
|
||||
table := openIsolatedTable[string](t, "users", kvstore.NewStringValueEncoder[string]())
|
||||
|
||||
for _, key := range []string{"a1", "a2", "a3"} {
|
||||
err := table.Set(key, "value-"+key)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
query := table.Query().Limit(1)
|
||||
for i := 0; i < 2; i++ {
|
||||
var keys []string
|
||||
err := query.FindAll(func(tx *kvstore.Tx[string], item kvstore.Item[string]) (bool, error) {
|
||||
keys = append(keys, item.Key)
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(keys) != 1 || keys[0] != "a1" {
|
||||
t.Fatalf("unexpected result on run %d: %v", i, keys)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestQuery_UsesExistingTxWithoutClosingIt(t *testing.T) {
|
||||
table := openIsolatedTable[string](t, "users", kvstore.NewStringValueEncoder[string]())
|
||||
|
||||
for _, key := range []string{"a1", "a2", "a3"} {
|
||||
err := table.Set(key, "value-"+key)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
err := table.ReadTx(func(tx *kvstore.Tx[string]) error {
|
||||
var keys []string
|
||||
err := tx.Query().
|
||||
Limit(2).
|
||||
FindAll(func(queryTx *kvstore.Tx[string], item kvstore.Item[string]) (bool, error) {
|
||||
if queryTx != tx {
|
||||
return false, fmt.Errorf("query did not reuse the current tx")
|
||||
}
|
||||
keys = append(keys, item.Key)
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(keys) != 2 {
|
||||
return fmt.Errorf("unexpected query size: %d", len(keys))
|
||||
}
|
||||
|
||||
_, err = tx.Get("a1")
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDB_InspectProvidesStableBuffers(t *testing.T) {
|
||||
table := openIsolatedTable[string](t, "users", kvstore.NewStringValueEncoder[string]())
|
||||
|
||||
for _, pair := range []struct {
|
||||
key string
|
||||
value string
|
||||
}{
|
||||
{key: "a1", value: "value-1"},
|
||||
{key: "a2", value: "value-2"},
|
||||
} {
|
||||
err := table.Set(pair.key, pair.value)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
var firstKey []byte
|
||||
var firstValue []byte
|
||||
var firstKeySnapshot []byte
|
||||
var firstValueSnapshot []byte
|
||||
var count int
|
||||
|
||||
err := table.DB().Inspect(func(key []byte, value []byte) {
|
||||
switch count {
|
||||
case 0:
|
||||
firstKey = key
|
||||
firstValue = value
|
||||
firstKeySnapshot = append([]byte(nil), key...)
|
||||
firstValueSnapshot = append([]byte(nil), value...)
|
||||
case 1:
|
||||
if !bytes.Equal(firstKey, firstKeySnapshot) {
|
||||
t.Fatalf("inspect key mutated after next iteration: got %q want %q", firstKey, firstKeySnapshot)
|
||||
}
|
||||
if !bytes.Equal(firstValue, firstValueSnapshot) {
|
||||
t.Fatalf("inspect value mutated after next iteration: got %q want %q", firstValue, firstValueSnapshot)
|
||||
}
|
||||
}
|
||||
count++
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
@@ -116,11 +116,11 @@ func OpenStoreDir(dir string, storeName string) (*Store, error) {
|
||||
}
|
||||
|
||||
var storeOnce = &sync.Once{}
|
||||
var defaultSore *Store
|
||||
var defaultStore *Store
|
||||
|
||||
func DefaultStore() (*Store, error) {
|
||||
if defaultSore != nil {
|
||||
return defaultSore, nil
|
||||
if defaultStore != nil {
|
||||
return defaultStore, nil
|
||||
}
|
||||
|
||||
var resultErr error
|
||||
@@ -137,10 +137,10 @@ func DefaultStore() (*Store, error) {
|
||||
remotelogs.Error("KV", resultErr.Error())
|
||||
return
|
||||
}
|
||||
defaultSore = store
|
||||
defaultStore = store
|
||||
})
|
||||
|
||||
return defaultSore, resultErr
|
||||
return defaultStore, resultErr
|
||||
}
|
||||
|
||||
func (this *Store) Path() string {
|
||||
|
||||
@@ -298,7 +298,7 @@ func (this *Table[T]) Count() (int64, error) {
|
||||
count++
|
||||
}
|
||||
|
||||
return count, err
|
||||
return count, it.Error()
|
||||
}
|
||||
|
||||
func (this *Table[T]) FullKey(realKey string) []byte {
|
||||
@@ -325,9 +325,14 @@ func (this *Table[T]) DecodeFieldKey(fieldName string, fieldKey []byte) (fieldVa
|
||||
return
|
||||
}
|
||||
|
||||
var fieldValueLen = binary.BigEndian.Uint16(fieldKey[l-2:])
|
||||
var fieldValueLen = int(binary.BigEndian.Uint16(fieldKey[l-2:]))
|
||||
var data = fieldKey[baseLen-4 : l-2]
|
||||
|
||||
if fieldValueLen+2 > len(data) {
|
||||
err = errors.New("invalid field value length")
|
||||
return
|
||||
}
|
||||
|
||||
fieldValue = data[:fieldValueLen]
|
||||
key = data[fieldValueLen+2: /** separator length **/]
|
||||
|
||||
|
||||
@@ -4,10 +4,13 @@ package kvstore
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/cockroachdb/pebble"
|
||||
)
|
||||
|
||||
var commitBatch = func(batch *pebble.Batch, opt *pebble.WriteOptions) error {
|
||||
return batch.Commit(opt)
|
||||
}
|
||||
|
||||
type Tx[T any] struct {
|
||||
table *Table[T]
|
||||
readOnly bool
|
||||
@@ -129,12 +132,9 @@ func (this *Tx[T]) commit(opt *pebble.WriteOptions) (err error) {
|
||||
defer func() {
|
||||
var panicErr = recover()
|
||||
if panicErr != nil {
|
||||
resultErr, ok := panicErr.(error)
|
||||
if ok {
|
||||
err = fmt.Errorf("commit batch failed: %w", resultErr)
|
||||
}
|
||||
err = wrapRecoveredPanic("commit batch failed", panicErr)
|
||||
}
|
||||
}()
|
||||
|
||||
return this.batch.Commit(opt)
|
||||
return commitBatch(this.batch, opt)
|
||||
}
|
||||
|
||||
27
EdgeNode/internal/utils/kvstore/tx_internal_test.go
Normal file
27
EdgeNode/internal/utils/kvstore/tx_internal_test.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package kvstore
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/cockroachdb/pebble"
|
||||
)
|
||||
|
||||
func TestTx_commit_StringPanicReturnsError(t *testing.T) {
|
||||
oldCommitBatch := commitBatch
|
||||
commitBatch = func(batch *pebble.Batch, opt *pebble.WriteOptions) error {
|
||||
panic("boom")
|
||||
}
|
||||
defer func() {
|
||||
commitBatch = oldCommitBatch
|
||||
}()
|
||||
|
||||
tx := &Tx[string]{}
|
||||
err := tx.commit(DefaultWriteOptions)
|
||||
if err == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "commit batch failed: boom") {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user