diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index f4c04a21cb7..a8cee8598fc 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -868,7 +868,7 @@ class CommandXPending : public Commander { } if (end_id != "+") { - auto s = ParseStreamEntryID(start_id, &options_.end_id); + auto s = ParseStreamEntryID(end_id, &options_.end_id); if (!s.IsOK()) { return s; } diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index e7082648b81..106c7f799ad 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -1758,7 +1758,9 @@ rocksdb::Status Stream::GetPendingEntries(engine::Context &ctx, StreamPendingOpt std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id); rocksdb::ReadOptions read_options = ctx.DefaultScanOptions(); - rocksdb::Slice upper_bound(end_key); + // Make upper_bound exclusive by appending to end_key, so iteration includes end_key + std::string upper_bound_key = end_key + std::string("\x00", 1); + rocksdb::Slice upper_bound(upper_bound_key); read_options.iterate_upper_bound = &upper_bound; rocksdb::Slice lower_bound(prefix_key); read_options.iterate_lower_bound = &lower_bound; diff --git a/tests/gocase/go.mod b/tests/gocase/go.mod index 7ec1c0e2b96..050ac452ac8 100644 --- a/tests/gocase/go.mod +++ b/tests/gocase/go.mod @@ -3,7 +3,6 @@ module github.com/apache/kvrocks/tests/gocase go 1.24.0 require ( - github.com/linxGnu/grocksdb v1.10.2 github.com/redis/go-redis/v9 v9.14.0 github.com/shirou/gopsutil/v4 v4.25.8 github.com/stretchr/testify v1.11.1 diff --git a/tests/gocase/go.sum b/tests/gocase/go.sum index fc666ae92e6..6ed90d2263f 100644 --- a/tests/gocase/go.sum +++ b/tests/gocase/go.sum @@ -8,8 +8,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/ebitengine/purego v0.8.4 h1:CF7LEKg5FFOsASUj0+QwaXf8Ht6TlFxg09+S9wz0omw= -github.com/ebitengine/purego v0.8.4/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/ebitengine/purego v0.9.0 h1:mh0zpKBIXDceC63hpvPuGLiJ8ZAa3DfrFTudmfi8A4k= github.com/ebitengine/purego v0.9.0/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= @@ -17,8 +15,6 @@ github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= -github.com/linxGnu/grocksdb v1.10.2 h1:y0dXsWYULY15/BZMcwAZzLd13ZuyA470vyoNzWwmqG0= -github.com/linxGnu/grocksdb v1.10.2/go.mod h1:C3CNe9UYc9hlEM2pC82AqiGS3LRW537u9LFV4wIZuHk= github.com/lufia/plan9stats v0.0.0-20250827001030-24949be3fa54 h1:mFWunSatvkQQDhpdyuFAYwyAan3hzCuma+Pz8sqvOfg= github.com/lufia/plan9stats v0.0.0-20250827001030-24949be3fa54/go.mod h1:autxFIvghDt3jPTLoqZ9OZ7s9qTGNAWmYCjVFWPX/zg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index e97cde87eab..4d8656f75dc 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -937,14 +937,14 @@ func TestStreamOffset(t *testing.T) { groupName := "test-group" consumerName := "test-consumer" require.NoError(t, rdb.Del(ctx, streamName).Err()) - //No such stream + // No such stream require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ Stream: streamName, ID: "1-0", Values: []string{"data", "a"}, }).Err()) - //no such group + // no such group require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err()) @@ -959,7 +959,7 @@ func TestStreamOffset(t *testing.T) { groupName := "test-group" consumerName := "test-consumer" require.NoError(t, rdb.Del(ctx, streamName).Err()) - //No such stream + // No such stream require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ Stream: streamName, @@ -967,7 +967,7 @@ func TestStreamOffset(t *testing.T) { Values: []string{"data", "a"}, }).Err()) - //no such group + // no such group expectedError := fmt.Sprintf("NOGROUP No such consumer group %s for key name %s", groupName, streamName) require.EqualError(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err(), expectedError) require.EqualError(t, rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Err(), expectedError) @@ -1070,14 +1070,14 @@ func TestStreamOffset(t *testing.T) { streamName := "test-stream" groupName := "test-group" require.NoError(t, rdb.Del(ctx, streamName).Err()) - //No such stream + // No such stream require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err()) require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ Stream: streamName, ID: "1-0", Values: []string{"data", "a"}, }).Err()) - //No such group + // No such group require.EqualError(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err(), fmt.Sprintf("NOGROUP No such consumer group %s for key name %s", groupName, streamName)) require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err()) @@ -1344,8 +1344,10 @@ func TestStreamOffset(t *testing.T) { require.NoError(t, err) require.Equal(t, []redis.XStream{{ Stream: streamName, - Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}, - {ID: "2-0", Values: map[string]interface{}{"field2": "data2"}}}, + Messages: []redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}, + {ID: "2-0", Values: map[string]interface{}{"field2": "data2"}}, + }, }}, r) require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ @@ -1375,8 +1377,10 @@ func TestStreamOffset(t *testing.T) { require.NoError(t, err) require.Equal(t, []redis.XStream{{ Stream: streamName, - Messages: []redis.XMessage{{ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}, - {ID: "2-0", Values: map[string]interface{}{"field2": "data2"}}}, + Messages: []redis.XMessage{ + {ID: "1-0", Values: map[string]interface{}{"field1": "data1"}}, + {ID: "2-0", Values: map[string]interface{}{"field2": "data2"}}, + }, }}, r) c := srv.NewClient() @@ -1691,7 +1695,6 @@ func TestStreamOffset(t *testing.T) { }) t.Run("XAUTOCLAIM can claim PEL items from another consume", func(t *testing.T) { - streamName := "mystream" groupName := "mygroup" var id1 string @@ -2361,6 +2364,138 @@ func TestStreamOffset(t *testing.T) { require.Greater(t, pendingEntry.Idle, time.Millisecond) require.Less(t, pendingEntry.Idle, 10*time.Second) }) + + t.Run("XPending test", func(t *testing.T) { + key := "stream_xpending_bug" + group := "group1" + + // Create stream and group + require.NoError(t, rdb.XGroupCreateMkStream(ctx, key, group, "0").Err()) + + // Add 5 messages + id1, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]interface{}{"k": "v1"}}).Result() + require.NoError(t, err) + id2, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]interface{}{"k": "v2"}}).Result() + require.NoError(t, err) + id3, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]interface{}{"k": "v3"}}).Result() + require.NoError(t, err) + id4, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]interface{}{"k": "v4"}}).Result() + require.NoError(t, err) + id5, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]interface{}{"k": "v5"}}).Result() + require.NoError(t, err) + + // Read to make them pending + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: group, + Consumer: "c1", + Streams: []string{key, ">"}, + Count: 10, + }).Result() + require.NoError(t, err) + + // Test 1: XPENDING key group - + 10 (all entries) + res, err := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: key, + Group: group, + Start: "-", + End: "+", + Count: 10, + }).Result() + require.NoError(t, err) + require.Len(t, res, 5) + require.Equal(t, id1, res[0].ID) + require.Equal(t, id5, res[4].ID) + + // Test 2: start_id == end_id (single entry) + res, err = rdb.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: key, + Group: group, + Start: id1, + End: id1, + Count: 10, + }).Result() + require.NoError(t, err) + require.Len(t, res, 1) + require.Equal(t, id1, res[0].ID) + + // Test 3: explicit range id1 to id3 + res, err = rdb.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: key, + Group: group, + Start: id1, + End: id3, + Count: 10, + }).Result() + require.NoError(t, err) + require.Len(t, res, 3) + require.Equal(t, id1, res[0].ID) + require.Equal(t, id2, res[1].ID) + require.Equal(t, id3, res[2].ID) + + // Test 4: explicit range id2 to id4 + res, err = rdb.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: key, + Group: group, + Start: id2, + End: id4, + Count: 10, + }).Result() + require.NoError(t, err) + require.Len(t, res, 3) + require.Equal(t, id2, res[0].ID) + require.Equal(t, id3, res[1].ID) + require.Equal(t, id4, res[2].ID) + + // Test 5: single entry in the middle + res, err = rdb.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: key, + Group: group, + Start: id3, + End: id3, + Count: 10, + }).Result() + require.NoError(t, err) + require.Len(t, res, 1) + require.Equal(t, id3, res[0].ID) + + // Test 6: last entry only + res, err = rdb.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: key, + Group: group, + Start: id5, + End: id5, + Count: 10, + }).Result() + require.NoError(t, err) + require.Len(t, res, 1) + require.Equal(t, id5, res[0].ID) + + // Test 7: range from id3 to end + res, err = rdb.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: key, + Group: group, + Start: id3, + End: "+", + Count: 10, + }).Result() + require.NoError(t, err) + require.Len(t, res, 3) + require.Equal(t, id3, res[0].ID) + require.Equal(t, id5, res[2].ID) + + // Test 8: range from start to id2 + res, err = rdb.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: key, + Group: group, + Start: "-", + End: id2, + Count: 10, + }).Result() + require.NoError(t, err) + require.Len(t, res, 2) + require.Equal(t, id1, res[0].ID) + require.Equal(t, id2, res[1].ID) + }) } func parseStreamEntryID(id string) (ts int64, seqNum int64) {