Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ class CommandXPending : public Commander {
}

if (end_id != "+") {
auto s = ParseStreamEntryID(start_id, &options_.end_id);
auto s = ParseStreamEntryID(end_id, &options_.end_id);
if (!s.IsOK()) {
return s;
}
Expand Down
4 changes: 3 additions & 1 deletion src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1758,7 +1758,9 @@ rocksdb::Status Stream::GetPendingEntries(engine::Context &ctx, StreamPendingOpt
std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id);

rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::Slice upper_bound(end_key);
// Make upper_bound exclusive by appending to end_key, so iteration includes end_key
std::string upper_bound_key = end_key + std::string("\x00", 1);
rocksdb::Slice upper_bound(upper_bound_key);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix_key);
read_options.iterate_lower_bound = &lower_bound;
Expand Down
1 change: 0 additions & 1 deletion tests/gocase/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/apache/kvrocks/tests/gocase
go 1.24.0

require (
github.com/linxGnu/grocksdb v1.10.2
github.com/redis/go-redis/v9 v9.14.0
github.com/shirou/gopsutil/v4 v4.25.8
github.com/stretchr/testify v1.11.1
Expand Down
4 changes: 0 additions & 4 deletions tests/gocase/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,13 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/ebitengine/purego v0.8.4 h1:CF7LEKg5FFOsASUj0+QwaXf8Ht6TlFxg09+S9wz0omw=
github.com/ebitengine/purego v0.8.4/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
github.com/ebitengine/purego v0.9.0 h1:mh0zpKBIXDceC63hpvPuGLiJ8ZAa3DfrFTudmfi8A4k=
github.com/ebitengine/purego v0.9.0/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/linxGnu/grocksdb v1.10.2 h1:y0dXsWYULY15/BZMcwAZzLd13ZuyA470vyoNzWwmqG0=
github.com/linxGnu/grocksdb v1.10.2/go.mod h1:C3CNe9UYc9hlEM2pC82AqiGS3LRW537u9LFV4wIZuHk=
github.com/lufia/plan9stats v0.0.0-20250827001030-24949be3fa54 h1:mFWunSatvkQQDhpdyuFAYwyAan3hzCuma+Pz8sqvOfg=
github.com/lufia/plan9stats v0.0.0-20250827001030-24949be3fa54/go.mod h1:autxFIvghDt3jPTLoqZ9OZ7s9qTGNAWmYCjVFWPX/zg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
157 changes: 146 additions & 11 deletions tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,14 +937,14 @@ func TestStreamOffset(t *testing.T) {
groupName := "test-group"
consumerName := "test-consumer"
require.NoError(t, rdb.Del(ctx, streamName).Err())
//No such stream
// No such stream
require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "1-0",
Values: []string{"data", "a"},
}).Err())
//no such group
// no such group
require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err())

Expand All @@ -959,15 +959,15 @@ func TestStreamOffset(t *testing.T) {
groupName := "test-group"
consumerName := "test-consumer"
require.NoError(t, rdb.Del(ctx, streamName).Err())
//No such stream
// No such stream
require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "1-0",
Values: []string{"data", "a"},
}).Err())

//no such group
// no such group
expectedError := fmt.Sprintf("NOGROUP No such consumer group %s for key name %s", groupName, streamName)
require.EqualError(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err(), expectedError)
require.EqualError(t, rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Err(), expectedError)
Expand Down Expand Up @@ -1070,14 +1070,14 @@ func TestStreamOffset(t *testing.T) {
streamName := "test-stream"
groupName := "test-group"
require.NoError(t, rdb.Del(ctx, streamName).Err())
//No such stream
// No such stream
require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "1-0",
Values: []string{"data", "a"},
}).Err())
//No such group
// No such group
require.EqualError(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err(),
fmt.Sprintf("NOGROUP No such consumer group %s for key name %s", groupName, streamName))
require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err())
Expand Down Expand Up @@ -1344,8 +1344,10 @@ func TestStreamOffset(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []redis.XStream{{
Stream: streamName,
Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}},
{ID: "2-0", Values: map[string]interface{}{"field2": "data2"}}},
Messages: []redis.XMessage{
{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}},
{ID: "2-0", Values: map[string]interface{}{"field2": "data2"}},
},
}}, r)

require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Expand Down Expand Up @@ -1375,8 +1377,10 @@ func TestStreamOffset(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []redis.XStream{{
Stream: streamName,
Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}},
{ID: "2-0", Values: map[string]interface{}{"field2": "data2"}}},
Messages: []redis.XMessage{
{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}},
{ID: "2-0", Values: map[string]interface{}{"field2": "data2"}},
},
}}, r)

c := srv.NewClient()
Expand Down Expand Up @@ -1691,7 +1695,6 @@ func TestStreamOffset(t *testing.T) {
})

t.Run("XAUTOCLAIM can claim PEL items from another consume", func(t *testing.T) {

streamName := "mystream"
groupName := "mygroup"
var id1 string
Expand Down Expand Up @@ -2361,6 +2364,138 @@ func TestStreamOffset(t *testing.T) {
require.Greater(t, pendingEntry.Idle, time.Millisecond)
require.Less(t, pendingEntry.Idle, 10*time.Second)
})

t.Run("XPending test", func(t *testing.T) {
key := "stream_xpending_bug"
group := "group1"

// Create stream and group
require.NoError(t, rdb.XGroupCreateMkStream(ctx, key, group, "0").Err())

// Add 5 messages
id1, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]interface{}{"k": "v1"}}).Result()
require.NoError(t, err)
id2, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]interface{}{"k": "v2"}}).Result()
require.NoError(t, err)
id3, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]interface{}{"k": "v3"}}).Result()
require.NoError(t, err)
id4, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]interface{}{"k": "v4"}}).Result()
require.NoError(t, err)
id5, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]interface{}{"k": "v5"}}).Result()
require.NoError(t, err)

// Read to make them pending
_, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: group,
Consumer: "c1",
Streams: []string{key, ">"},
Count: 10,
}).Result()
require.NoError(t, err)

// Test 1: XPENDING key group - + 10 (all entries)
res, err := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: key,
Group: group,
Start: "-",
End: "+",
Count: 10,
}).Result()
require.NoError(t, err)
require.Len(t, res, 5)
require.Equal(t, id1, res[0].ID)
require.Equal(t, id5, res[4].ID)

// Test 2: start_id == end_id (single entry)
res, err = rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: key,
Group: group,
Start: id1,
End: id1,
Count: 10,
}).Result()
require.NoError(t, err)
require.Len(t, res, 1)
require.Equal(t, id1, res[0].ID)

// Test 3: explicit range id1 to id3
res, err = rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: key,
Group: group,
Start: id1,
End: id3,
Count: 10,
}).Result()
require.NoError(t, err)
require.Len(t, res, 3)
require.Equal(t, id1, res[0].ID)
require.Equal(t, id2, res[1].ID)
require.Equal(t, id3, res[2].ID)

// Test 4: explicit range id2 to id4
res, err = rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: key,
Group: group,
Start: id2,
End: id4,
Count: 10,
}).Result()
require.NoError(t, err)
require.Len(t, res, 3)
require.Equal(t, id2, res[0].ID)
require.Equal(t, id3, res[1].ID)
require.Equal(t, id4, res[2].ID)

// Test 5: single entry in the middle
res, err = rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: key,
Group: group,
Start: id3,
End: id3,
Count: 10,
}).Result()
require.NoError(t, err)
require.Len(t, res, 1)
require.Equal(t, id3, res[0].ID)

// Test 6: last entry only
res, err = rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: key,
Group: group,
Start: id5,
End: id5,
Count: 10,
}).Result()
require.NoError(t, err)
require.Len(t, res, 1)
require.Equal(t, id5, res[0].ID)

// Test 7: range from id3 to end
res, err = rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: key,
Group: group,
Start: id3,
End: "+",
Count: 10,
}).Result()
require.NoError(t, err)
require.Len(t, res, 3)
require.Equal(t, id3, res[0].ID)
require.Equal(t, id5, res[2].ID)

// Test 8: range from start to id2
res, err = rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: key,
Group: group,
Start: "-",
End: id2,
Count: 10,
}).Result()
require.NoError(t, err)
require.Len(t, res, 2)
require.Equal(t, id1, res[0].ID)
require.Equal(t, id2, res[1].ID)
})
}

func parseStreamEntryID(id string) (ts int64, seqNum int64) {
Expand Down
Loading