diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java index 4118e191319..31a84c66bb0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java @@ -567,7 +567,18 @@ private long getLedgerToRereplicateFromHierarchy(String parent, long depth) return -1; } - Collections.shuffle(children); + if (depth == 3) { + // Avoid workers wanting to get the same ledger lock. + Collections.shuffle(children); + } else { + // Sort children in descending order for depths 0-2 to prioritize larger ledger IDs + // The hierarchical path is built based on hexadecimal representation of ledger IDs + // Larger ledger IDs will appear first in hexadecimal string comparison + // By using reverse order sorting, we ensure that paths containing larger ledger IDs + // are visited first during directory traversal, implementing the business requirement + // of "larger ledger IDs should be processed first" + Collections.sort(children, Collections.reverseOrder()); + } while (children.size() > 0) { String tryChild = children.get(0); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java index 3e418226e61..d9f515365f2 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java @@ -30,6 +30,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -43,6 +44,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import lombok.Cleanup; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; @@ -171,6 +173,54 @@ public Long call() { }); } + /** + * Test that larger ledgerIds are processed first when getting ledgers to replicate. + * This verifies the sorting behavior in getLedgerToReplicateFromHierarchy. + * Specifically tests the Collections.sort(children, Collections.reverseOrder()) at line 564. + */ + @Test + public void testLargerLedgerIdProcessedFirst() throws Exception { + // 4294950639 (0xffffbeefL): /ledgers/0000/0000/ffff/beef/urL4294950639 + // 4207853295 (0xdeadbeefL): /ledgers/0000/0000/dead/beef/urL4207853295 + // 3735928559 (0xbeefcafeL): /ledgers/0000/0000/beef/cafe/urL3735928559 + // 3203386110 (0xfacebeefL): /ledgers/0000/0000/face/beef/urL3203386110 + Set testLedgers = new HashSet<>(); + testLedgers.add(0xffffbeefL); + testLedgers.add(0xdeadbeefL); + testLedgers.add(0xbeefcafeL); + testLedgers.add(0xfacebeefL); + + String missingReplica = "localhost:3181"; + + @Cleanup + LedgerUnderreplicationManager manager = lmf1.newLedgerUnderreplicationManager(); + + for (Long ledgerId : testLedgers) { + manager.markLedgerUnderreplicated(ledgerId, missingReplica); + } + + List processedLedgers = new ArrayList<>(); + for (int i = 0; i < testLedgers.size(); i++) { + Future future = getLedgerToReplicate(manager); + Long ledgerId = future.get(5, TimeUnit.SECONDS); + processedLedgers.add(ledgerId); + } + + List expectedLedgers = new ArrayList<>(testLedgers) + .stream().sorted(Collections.reverseOrder()).collect(Collectors.toList()); + + assertEquals("Should have processed all test ledgers", expectedLedgers, processedLedgers); + for (Long ledgerId : processedLedgers) { + assertTrue("Processed ledger should be in test set", testLedgers.contains(ledgerId)); + } + + Long firstProcessed = processedLedgers.get(0); + Long expectedFirst = 0xffffbeefL; + + assertEquals("Larger ledger ID should be processed first", expectedFirst, firstProcessed); + } + + /** * Test basic interactions with the ledger underreplication * manager. @@ -675,8 +725,8 @@ public void testHierarchyCleanup() throws Exception { final LedgerUnderreplicationManager replicaMgr = lmf1 .newLedgerUnderreplicationManager(); // 4 ledgers, 2 in the same hierarchy - long[] ledgers = { 0x00000000deadbeefL, 0x00000000deadbeeeL, - 0x00000000beefcafeL, 0x00000000cafed00dL }; + long[] ledgers = {0x00000000beefcafeL, 0x00000000cafed00dL, + 0x00000000deadbeeeL, 0x00000000deadbeefL}; for (long l : ledgers) { replicaMgr.markLedgerUnderreplicated(l, "localhost:3181");