diff --git a/index/keys.go b/index/keys.go index c0b66ea..9d3362e 100644 --- a/index/keys.go +++ b/index/keys.go @@ -2,13 +2,15 @@ package index import ( "encoding/binary" + "fmt" "hash/fnv" - art "github.com/plar/go-adaptive-radix-tree/v2" - "github.com/klev-dev/klevdb/message" + art "github.com/plar/go-adaptive-radix-tree/v2" ) +var ErrKeyNotFound = fmt.Errorf("key: %w", message.ErrNotFound) + func KeyHash(key []byte) uint64 { hasher := fnv.New64a() hasher.Write(key) @@ -41,5 +43,5 @@ func Keys(keys art.Tree, keyHash []byte) ([]int64, error) { return v.([]int64), nil } - return nil, message.ErrNotFound + return nil, ErrKeyNotFound } diff --git a/index/keys_test.go b/index/keys_test.go index d499c49..e9872c3 100644 --- a/index/keys_test.go +++ b/index/keys_test.go @@ -12,7 +12,7 @@ func TestKeys(t *testing.T) { t.Run("Empty", func(t *testing.T) { keys := art.New() pos, err := Keys(keys, KeyHashEncoded(1)) - require.ErrorIs(t, message.ErrNotFound, err) + require.ErrorIs(t, err, message.ErrNotFound) require.Empty(t, pos) }) @@ -27,7 +27,7 @@ func TestKeys(t *testing.T) { require.ElementsMatch(t, []int64{item.Position}, pos) pos, err = Keys(keys, KeyHashEncoded(321)) - require.ErrorIs(t, message.ErrNotFound, err) + require.ErrorIs(t, err, message.ErrNotFound) require.Empty(t, pos) }) @@ -48,7 +48,7 @@ func TestKeys(t *testing.T) { require.ElementsMatch(t, []int64{item3.Position}, pos) pos, err = Keys(keys, KeyHashEncoded(321)) - require.ErrorIs(t, message.ErrNotFound, err) + require.ErrorIs(t, err, message.ErrNotFound) require.Empty(t, pos) }) } diff --git a/index/offset.go b/index/offset.go index b5a69d5..f4115ba 100644 --- a/index/offset.go +++ b/index/offset.go @@ -6,11 +6,14 @@ import ( "github.com/klev-dev/klevdb/message" ) -var ErrIndexEmpty = fmt.Errorf("%w: no items", message.ErrInvalidOffset) +var ErrOffsetIndexEmpty = fmt.Errorf("%w: no offset items", message.ErrInvalidOffset) +var ErrOffsetBeforeStart = fmt.Errorf("%w: offset before start", message.ErrNotFound) +var ErrOffsetAfterEnd = fmt.Errorf("%w: offset after end", message.ErrInvalidOffset) +var ErrOffsetNotFound = fmt.Errorf("%w: offset not found", message.ErrNotFound) func Consume(items []Item, offset int64) (int64, int64, error) { if len(items) == 0 { - return 0, 0, ErrIndexEmpty + return 0, 0, ErrOffsetIndexEmpty } switch offset { @@ -32,7 +35,7 @@ func Consume(items []Item, offset int64) (int64, int64, error) { endItem := items[endIndex] switch { case offset > endItem.Offset: - return 0, 0, message.ErrInvalidOffset + return 0, 0, ErrOffsetAfterEnd case offset == endItem.Offset: return endItem.Position, endItem.Position, nil } @@ -55,7 +58,7 @@ func Consume(items []Item, offset int64) (int64, int64, error) { func Get(items []Item, offset int64) (int64, error) { if len(items) == 0 { - return 0, ErrIndexEmpty + return 0, ErrOffsetIndexEmpty } switch offset { @@ -69,7 +72,7 @@ func Get(items []Item, offset int64) (int64, error) { beginItem := items[beginIndex] switch { case offset < beginItem.Offset: - return 0, message.ErrNotFound + return 0, ErrOffsetBeforeStart case offset == beginItem.Offset: return beginItem.Position, nil } @@ -78,7 +81,7 @@ func Get(items []Item, offset int64) (int64, error) { endItem := items[endIndex] switch { case offset > endItem.Offset: - return 0, message.ErrNotFound + return 0, ErrOffsetAfterEnd case offset == endItem.Offset: return endItem.Position, nil } @@ -96,5 +99,5 @@ func Get(items []Item, offset int64) (int64, error) { } } - return 0, message.ErrNotFound + return 0, ErrOffsetNotFound } diff --git a/index/offset_test.go b/index/offset_test.go index 204120f..62dc9d2 100644 --- a/index/offset_test.go +++ b/index/offset_test.go @@ -25,20 +25,20 @@ func TestConsume(t *testing.T) { err error }{ // empty tests - {items: nil, offset: 0, err: ErrIndexEmpty}, + {items: nil, offset: 0, err: ErrOffsetIndexEmpty}, // single item tests {items: []int64{1}, offset: message.OffsetOldest, position: 1, max: 1}, {items: []int64{1}, offset: message.OffsetNewest, position: 1, max: 1}, {items: []int64{1}, offset: 0, position: 1, max: 1}, {items: []int64{1}, offset: 1, position: 1, max: 1}, - {items: []int64{1}, offset: 2, err: message.ErrInvalidOffset}, + {items: []int64{1}, offset: 2, err: ErrOffsetAfterEnd}, // continuous tests {items: []int64{1, 2, 3}, offset: message.OffsetOldest, position: 1, max: 3}, {items: []int64{1, 2, 3}, offset: message.OffsetNewest, position: 3, max: 3}, {items: []int64{1, 2, 3}, offset: 0, position: 1, max: 3}, {items: []int64{1, 2, 3}, offset: 1, position: 1, max: 3}, {items: []int64{1, 2, 3}, offset: 3, position: 3, max: 3}, - {items: []int64{1, 2, 3}, offset: 4, err: message.ErrInvalidOffset}, + {items: []int64{1, 2, 3}, offset: 4, err: ErrOffsetAfterEnd}, // gaps tests {items: []int64{1, 3}, offset: message.OffsetOldest, position: 1, max: 3}, {items: []int64{1, 3}, offset: message.OffsetNewest, position: 3, max: 3}, @@ -46,7 +46,7 @@ func TestConsume(t *testing.T) { {items: []int64{1, 3}, offset: 1, position: 1, max: 3}, {items: []int64{1, 3}, offset: 2, position: 3, max: 3}, {items: []int64{1, 3}, offset: 3, position: 3, max: 3}, - {items: []int64{1, 3}, offset: 4, err: message.ErrInvalidOffset}, + {items: []int64{1, 3}, offset: 4, err: ErrOffsetAfterEnd}, {items: []int64{1, 3, 5}, offset: message.OffsetOldest, position: 1, max: 5}, {items: []int64{1, 3, 5}, offset: message.OffsetNewest, position: 5, max: 5}, {items: []int64{1, 3, 5}, offset: 0, position: 1, max: 5}, @@ -55,7 +55,7 @@ func TestConsume(t *testing.T) { {items: []int64{1, 3, 5}, offset: 3, position: 3, max: 5}, {items: []int64{1, 3, 5}, offset: 4, position: 5, max: 5}, {items: []int64{1, 3, 5}, offset: 5, position: 5, max: 5}, - {items: []int64{1, 3, 5}, offset: 6, err: message.ErrInvalidOffset}, + {items: []int64{1, 3, 5}, offset: 6, err: ErrOffsetAfterEnd}, } for _, tc := range tests { @@ -76,37 +76,37 @@ func TestGet(t *testing.T) { err error }{ // empty tests - {items: nil, offset: 0, err: ErrIndexEmpty}, + {items: nil, offset: 0, err: ErrOffsetIndexEmpty}, // single item tests {items: []int64{1}, offset: message.OffsetOldest, position: 1}, {items: []int64{1}, offset: message.OffsetNewest, position: 1}, - {items: []int64{1}, offset: 0, err: message.ErrNotFound}, + {items: []int64{1}, offset: 0, err: ErrOffsetBeforeStart}, {items: []int64{1}, offset: 1, position: 1}, - {items: []int64{1}, offset: 2, err: message.ErrNotFound}, + {items: []int64{1}, offset: 2, err: ErrOffsetAfterEnd}, // continuous tests {items: []int64{1, 2, 3}, offset: message.OffsetOldest, position: 1}, {items: []int64{1, 2, 3}, offset: message.OffsetNewest, position: 3}, - {items: []int64{1, 2, 3}, offset: 0, err: message.ErrNotFound}, + {items: []int64{1, 2, 3}, offset: 0, err: ErrOffsetBeforeStart}, {items: []int64{1, 2, 3}, offset: 1, position: 1}, {items: []int64{1, 2, 3}, offset: 3, position: 3}, - {items: []int64{1, 2, 3}, offset: 4, err: message.ErrNotFound}, + {items: []int64{1, 2, 3}, offset: 4, err: ErrOffsetAfterEnd}, // gaps tests {items: []int64{1, 3}, offset: message.OffsetOldest, position: 1}, {items: []int64{1, 3}, offset: message.OffsetNewest, position: 3}, - {items: []int64{1, 3}, offset: 0, err: message.ErrNotFound}, + {items: []int64{1, 3}, offset: 0, err: ErrOffsetBeforeStart}, {items: []int64{1, 3}, offset: 1, position: 1}, - {items: []int64{1, 3}, offset: 2, err: message.ErrNotFound}, + {items: []int64{1, 3}, offset: 2, err: ErrOffsetNotFound}, {items: []int64{1, 3}, offset: 3, position: 3}, - {items: []int64{1, 3}, offset: 4, err: message.ErrNotFound}, + {items: []int64{1, 3}, offset: 4, err: ErrOffsetAfterEnd}, {items: []int64{1, 3, 5}, offset: message.OffsetOldest, position: 1}, {items: []int64{1, 3, 5}, offset: message.OffsetNewest, position: 5}, - {items: []int64{1, 3, 5}, offset: 0, err: message.ErrNotFound}, + {items: []int64{1, 3, 5}, offset: 0, err: ErrOffsetBeforeStart}, {items: []int64{1, 3, 5}, offset: 1, position: 1}, - {items: []int64{1, 3, 5}, offset: 2, err: message.ErrNotFound}, + {items: []int64{1, 3, 5}, offset: 2, err: ErrOffsetNotFound}, {items: []int64{1, 3, 5}, offset: 3, position: 3}, - {items: []int64{1, 3, 5}, offset: 4, err: message.ErrNotFound}, + {items: []int64{1, 3, 5}, offset: 4, err: ErrOffsetNotFound}, {items: []int64{1, 3, 5}, offset: 5, position: 5}, - {items: []int64{1, 3, 5}, offset: 6, err: message.ErrNotFound}, + {items: []int64{1, 3, 5}, offset: 6, err: ErrOffsetAfterEnd}, } for _, tc := range tests { diff --git a/index/times.go b/index/times.go index 7d9acbe..24438c1 100644 --- a/index/times.go +++ b/index/times.go @@ -1,21 +1,27 @@ package index import ( + "errors" + "fmt" "sort" "github.com/klev-dev/klevdb/message" ) +var ErrTimeIndexEmpty = fmt.Errorf("%w: no time items", message.ErrInvalidOffset) +var ErrTimeBeforeStart = errors.New("time before start") +var ErrTimeAfterEnd = errors.New("time after end") + func Time(items []Item, ts int64) (int64, error) { if len(items) == 0 { - return 0, ErrIndexEmpty + return 0, ErrTimeIndexEmpty } beginIndex := 0 beginItem := items[beginIndex] switch { case ts < beginItem.Timestamp: - return 0, message.ErrInvalidOffset + return 0, ErrTimeBeforeStart case ts == beginItem.Timestamp: return beginItem.Position, nil } @@ -24,7 +30,7 @@ func Time(items []Item, ts int64) (int64, error) { endItem := items[endIndex] switch { case endItem.Timestamp < ts: - return 0, message.ErrNotFound + return 0, ErrTimeAfterEnd } foundIndex := sort.Search(len(items), func(midIndex int) bool { diff --git a/index/times_test.go b/index/times_test.go index 301555d..96b9693 100644 --- a/index/times_test.go +++ b/index/times_test.go @@ -4,7 +4,6 @@ import ( "fmt" "testing" - "github.com/klev-dev/klevdb/message" "github.com/stretchr/testify/require" ) @@ -21,19 +20,19 @@ func TestTime(t *testing.T) { t.Run("Empty", func(t *testing.T) { items := gen() _, err := Time(items, 1) - require.ErrorIs(t, ErrIndexEmpty, err) + require.ErrorIs(t, ErrTimeIndexEmpty, err) }) t.Run("Before", func(t *testing.T) { items := gen(1) _, err := Time(items, 0) - require.ErrorIs(t, message.ErrInvalidOffset, err) + require.ErrorIs(t, ErrTimeBeforeStart, err) }) t.Run("After", func(t *testing.T) { items := gen(1) _, err := Time(items, 2) - require.ErrorIs(t, message.ErrNotFound, err) + require.ErrorIs(t, ErrTimeAfterEnd, err) }) t.Run("Exact", func(t *testing.T) { diff --git a/log.go b/log.go index f237fae..b352b32 100644 --- a/log.go +++ b/log.go @@ -18,9 +18,9 @@ import ( ) var errNoKeyIndex = fmt.Errorf("%w by key", ErrNoIndex) -var errKeyNotFound = fmt.Errorf("key %w", ErrNotFound) +var errKeyNotFound = fmt.Errorf("key %w", message.ErrNotFound) var errNoTimeIndex = fmt.Errorf("%w by time", ErrNoIndex) -var errTimeNotFound = fmt.Errorf("time %w", ErrNotFound) +var errTimeNotFound = fmt.Errorf("time %w", message.ErrNotFound) var errDeleteRelative = fmt.Errorf("%w: delete relative offsets", message.ErrInvalidOffset) // Open create a log based on a dir and set of options @@ -196,15 +196,13 @@ func (l *log) Consume(offset int64, maxCount int64) (int64, []message.Message, e l.readersMu.RLock() defer l.readersMu.RUnlock() - rdr, index := segment.Consume(l.readers, offset) + rdr, segmentIndex := segment.Consume(l.readers, offset) nextOffset, msgs, err := rdr.Consume(offset, maxCount) - if err != nil && err == message.ErrInvalidOffset { - if index < len(l.readers)-1 { - // this is after the end, consume starting the next one - next := l.readers[index+1] - return next.Consume(message.OffsetOldest, maxCount) - } + if err == index.ErrOffsetAfterEnd && segmentIndex < len(l.readers)-1 { + // this is after the end, consume starting the next one + next := l.readers[segmentIndex+1] + return next.Consume(message.OffsetOldest, maxCount) } return nextOffset, msgs, err @@ -243,12 +241,16 @@ func (l *log) Get(offset int64) (message.Message, error) { l.readersMu.RLock() defer l.readersMu.RUnlock() - rdr, _, err := segment.Get(l.readers, offset) + rdr, segmentIndex, err := segment.Get(l.readers, offset) if err != nil { return message.Invalid, err } - return rdr.Get(offset) + msg, err := rdr.Get(offset) + if err == index.ErrOffsetAfterEnd && segmentIndex < len(l.readers)-1 { + return msg, index.ErrOffsetNotFound + } + return msg, err } func (l *log) GetByKey(key []byte) (message.Message, error) { @@ -267,7 +269,7 @@ func (l *log) GetByKey(key []byte) (message.Message, error) { switch msg, err := rdr.GetByKey(key, hash); { case err == nil: return msg, nil - case err == message.ErrNotFound: + case err == index.ErrKeyNotFound: // not in this segment, try the rest default: return message.Invalid, err @@ -302,19 +304,18 @@ func (l *log) GetByTime(start time.Time) (message.Message, error) { switch msg, err := rdr.GetByTime(ts); { case err == nil: return msg, nil - case err == message.ErrInvalidOffset: + case err == index.ErrTimeBeforeStart: // not in this segment, try the rest if i == 0 { return rdr.Get(message.OffsetOldest) } - case err == message.ErrNotFound: + case err == index.ErrTimeAfterEnd: // time is between end of this and begin next if i < len(l.readers)-1 { nextRdr := l.readers[i+1] return nextRdr.Get(message.OffsetOldest) } - - return message.Invalid, err + return message.Invalid, errTimeNotFound default: return message.Invalid, err } diff --git a/reader.go b/reader.go index fd8749f..5d94c3f 100644 --- a/reader.go +++ b/reader.go @@ -118,25 +118,25 @@ func (r *reader) Consume(offset, maxCount int64) (int64, []message.Message, erro } func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int64, []message.Message, error) { - index, err := r.getIndex() + ix, err := r.getIndex() if err != nil { return OffsetInvalid, nil, err } if offset == OffsetNewest { - nextOffset, err := index.GetNextOffset() + nextOffset, err := ix.GetNextOffset() if err != nil { return OffsetInvalid, nil, err } return nextOffset, nil, nil } - positions, err := index.Keys(keyHash) + positions, err := ix.Keys(keyHash) switch { case err == nil: break - case err == message.ErrNotFound: - nextOffset, err := index.GetNextOffset() + case err == index.ErrKeyNotFound: + nextOffset, err := ix.GetNextOffset() if err != nil { return OffsetInvalid, nil, err } @@ -169,7 +169,7 @@ func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int6 } if len(msgs) == 0 { - nextOffset, err := index.GetNextOffset() + nextOffset, err := ix.GetNextOffset() if err != nil { return OffsetInvalid, nil, err } @@ -200,12 +200,12 @@ func (r *reader) Get(offset int64) (message.Message, error) { } func (r *reader) GetByKey(key, keyHash []byte) (message.Message, error) { - index, err := r.getIndex() + ix, err := r.getIndex() if err != nil { return message.Invalid, err } - positions, err := index.Keys(keyHash) + positions, err := ix.Keys(keyHash) if err != nil { return message.Invalid, err } @@ -226,7 +226,7 @@ func (r *reader) GetByKey(key, keyHash []byte) (message.Message, error) { } } - return message.Invalid, message.ErrNotFound + return message.Invalid, index.ErrKeyNotFound } func (r *reader) GetByTime(ts int64) (message.Message, error) { @@ -435,24 +435,15 @@ func (ix *readerIndex) GetNextOffset() (int64, error) { func (ix *readerIndex) Consume(offset int64) (int64, int64, int64, error) { position, maxPosition, err := index.Consume(ix.items, offset) - if err != nil && ix.head { - switch { - case err == index.ErrIndexEmpty: - if offset <= ix.nextOffset { - return -1, -1, ix.nextOffset, nil - } - case err == message.ErrInvalidOffset: - if offset == ix.nextOffset { - return -1, -1, ix.nextOffset, nil - } - } + if (err == index.ErrOffsetIndexEmpty || err == index.ErrOffsetAfterEnd) && ix.head && offset <= ix.nextOffset { + return -1, -1, ix.nextOffset, nil } return position, maxPosition, offset, err } func (ix *readerIndex) Get(offset int64) (int64, error) { position, err := index.Get(ix.items, offset) - if err == message.ErrNotFound && ix.head && offset >= ix.nextOffset { + if err == index.ErrOffsetAfterEnd && ix.head && offset >= ix.nextOffset { return -1, message.ErrInvalidOffset } return position, err diff --git a/segment/index.go b/segment/index.go index 7458241..8180103 100644 --- a/segment/index.go +++ b/segment/index.go @@ -1,6 +1,8 @@ package segment import ( + "fmt" + "github.com/klev-dev/klevdb/message" ) @@ -47,6 +49,9 @@ func Consume[S ~[]O, O Offsetter](segments S, offset int64) (O, int) { return segments[beginIndex], beginIndex } +var ErrOffsetRelative = fmt.Errorf("%w: get relative offset", message.ErrInvalidOffset) +var ErrOffsetBeforeStart = fmt.Errorf("%w: offset before start", message.ErrNotFound) + func Get[S ~[]O, O Offsetter](segments S, offset int64) (O, int, error) { switch offset { case message.OffsetOldest: @@ -61,9 +66,9 @@ func Get[S ~[]O, O Offsetter](segments S, offset int64) (O, int, error) { case offset < beginSegment.GetOffset(): var v O if beginSegment.GetOffset() == 0 { - return v, -1, message.ErrInvalidOffset + return v, -1, ErrOffsetRelative } - return v, -1, message.ErrNotFound + return v, -1, ErrOffsetBeforeStart case offset == beginSegment.GetOffset(): return beginSegment, 0, nil } diff --git a/writer.go b/writer.go index eda6765..867ff2f 100644 --- a/writer.go +++ b/writer.go @@ -272,14 +272,10 @@ func (ix *writerIndex) Consume(offset int64) (int64, int64, int64, error) { defer ix.mu.RUnlock() position, maxPosition, err := index.Consume(ix.items, offset) - if err == index.ErrIndexEmpty { + if err == index.ErrOffsetIndexEmpty || err == index.ErrOffsetAfterEnd { if nextOffset := ix.nextOffset.Load(); offset <= nextOffset { return -1, -1, nextOffset, nil } - } else if err == message.ErrInvalidOffset { - if nextOffset := ix.nextOffset.Load(); offset == nextOffset { - return -1, -1, nextOffset, nil - } } return position, maxPosition, offset, err } @@ -289,7 +285,7 @@ func (ix *writerIndex) Get(offset int64) (int64, error) { defer ix.mu.RUnlock() position, err := index.Get(ix.items, offset) - if err == message.ErrNotFound { + if err == index.ErrOffsetAfterEnd { if nextOffset := ix.nextOffset.Load(); offset >= nextOffset { return 0, message.ErrInvalidOffset }