
Conversation

@navinko (Contributor) commented Dec 19, 2025

What changes were proposed in this pull request?

Replaced ByteArrayCodec with CodecBufferCodec inside the processTableSequentially method.
Added unit test for processTableSequentially.
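For illustration, a minimal sketch of the kind of swap described above (an assumption, not the actual patch: whether the key codec, the value codec, or both were replaced is not spelled out here):

import java.io.IOException;
import org.apache.hadoop.hdds.utils.db.ByteArrayCodec;
import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.hadoop.hdds.utils.db.CodecBufferCodec;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
import org.apache.hadoop.ozone.om.OMMetadataManager;

// Sketch only (hypothetical helper): opens the table with CodecBuffer values
// instead of byte[] copies, as discussed in this PR.
final class SequentialTableSketch {
  private SequentialTableSketch() { }

  static Table<byte[], CodecBuffer> openTable(OMMetadataManager omMetadataManager, String tableName)
      throws IOException {
    // Previously: getTable(tableName, ByteArrayCodec.get(), ByteArrayCodec.get(), NO_CACHE)
    return omMetadataManager.getStore()
        .getTable(tableName, ByteArrayCodec.get(), CodecBufferCodec.get(true),
            TableCache.CacheType.NO_CACHE);
  }
}

The discussion below covers whether the same swap can also be applied in processTableInParallel.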

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/HDDS-14155

How was this patch tested?

Validated with existing and new unit tests.
CI:
https://github.com/navinko/ozone/actions/runs/20353031938

@adoroszlai (Contributor) left a comment


Thanks @navinko for the patch.

Can it be replaced in processTableInParallel, too?

Table<String, byte[]> table = omMetadataManager.getStore()
.getTable(tableName, StringCodec.get(), ByteArrayCodec.get(), TableCache.CacheType.NO_CACHE);

If so, the import will also become unused:

@navinko (Contributor, Author) commented Dec 19, 2025

Thanks @adoroszlai for reviewing

Can it be replaced in processTableInParallel, too?

I see that processTableInParallel performs many background operations to make it work in parallel, which requires the key to be Comparable.
https://github.com/navinko/ozone/blob/1c817b0460f1e66e75aef0b8f5fde63c691f2a65/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java#L128

String keys are comparable, but CodecBuffer is not.
https://github.com/navinko/ozone/blob/1c817b0460f1e66e75aef0b8f5fde63c691f2a65/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java#L53

--> processTableSequentially works for non-string keys: the keys are just byte arrays (binary data), and with TableIterator we simply iterate over them. No operations such as comparing two binary rows are performed, and there is no range splitting. Hence CodecBuffer works here even though it does not implement Comparable.

Table<byte[], byte[]> table = omMetadataManager.getStore()
.getTable(tableName, ByteArrayCodec.get(), ByteArrayCodec.get(), TableCache.CacheType.NO_CACHE);

--> processTableInParallel: the keys are Strings, and in ParallelTableIteratorOperation.java there are operations like splitting the range across multiple workers and comparing keys, e.g. key.compareTo(startKey), since Strings are comparable.

Table<String, byte[]> table = omMetadataManager.getStore()
.getTable(tableName, StringCodec.get(), ByteArrayCodec.get(), TableCache.CacheType.NO_CACHE);

This seems to be the reason there is a separate flow for sequential vs parallel processing, and why CodecBuffer can be used in the existing implementation of processTableSequentially but not in processTableInParallel.
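To make the Comparable requirement concrete, here is a small generic sketch (plain Java, not the actual ParallelTableIteratorOperation code) of the kind of bounds check the parallel path performs on each key:

// Illustration only: a segment bounds check that requires K to be Comparable.
final class RangeCheckSketch {
  private RangeCheckSketch() { }

  // Middle segments cover [start, end); the last segment may include the end key.
  static <K extends Comparable<K>> boolean withinBounds(K key, K end, boolean inclusiveEnd) {
    int cmp = key.compareTo(end);
    return inclusiveEnd ? cmp <= 0 : cmp < 0;
  }

  public static void main(String[] args) {
    System.out.println(withinBounds("/vol1/bucket1/key-005", "/vol1/bucket1/key-100", false)); // true
    System.out.println(withinBounds("/vol1/bucket1/key-200", "/vol1/bucket1/key-100", false)); // false
    // byte[] and CodecBuffer keys cannot be used this way, since neither implements Comparable.
  }
}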

@adoroszlai (Contributor) commented Dec 19, 2025

String keys are comparable, but CodecBuffer is not.

I meant replacing ByteArrayCodec, not StringCodec.

    Table<String, CodecBuffer> table = omMetadataManager.getStore()
        .getTable(tableName, StringCodec.get(), CodecBufferCodec.get(true), TableCache.CacheType.NO_CACHE);

    ...

    try (ParallelTableIteratorOperation<String, CodecBuffer> parallelIter = new ParallelTableIteratorOperation<>(

@navinko (Contributor, Author) commented Dec 19, 2025

try (ParallelTableIteratorOperation<String, CodecBuffer> parallelIter = new ParallelTableIteratorOperation<>(

Please correct me if my understanding is wrong.
The CodecBufferCodec implementation is not thread-safe, hence we cannot use it for processTableInParallel.
https://github.com/navinko/ozone/blob/1c817b0460f1e66e75aef0b8f5fde63c691f2a65/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java#L34

@adoroszlai (Contributor) commented Dec 19, 2025

The CodecBufferCodec implementation is not thread-safe, hence we cannot use it for processTableInParallel

Is ByteArrayCodec thread-safe?

@navinko (Contributor, Author) commented Dec 20, 2025

The CodecBufferCodec implementation is not thread-safe, hence we cannot use it for processTableInParallel

Is ByteArrayCodec thread-safe?

Yes, ByteArrayCodec is thread-safe.

My bad, I am not entirely sure about the other implementation details of CodecBufferCodec, but it looks like CodecBufferCodec reuses CodecBuffer instances. So once CodecBufferCodec is used for values across multiple threads, a CodecBuffer could be modified by another thread in ParallelTableIteratorOperation.
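As a generic illustration of this concern (plain Java, not Ozone code, and only an assumption about how a reusing codec could misbehave): if a decoder hands out one reused mutable buffer, a worker thread can observe a later value than the one it was given.

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Demonstrates the hazard of sharing a single reused buffer across threads.
public final class SharedBufferHazard {
  private static final ByteBuffer REUSED = ByteBuffer.allocate(Long.BYTES);

  // Pretend "decode": writes the value into the one shared buffer and returns it.
  private static ByteBuffer decode(long value) {
    REUSED.clear();
    REUSED.putLong(value).flip();
    return REUSED; // same instance every time, no defensive copy
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService workers = Executors.newFixedThreadPool(2);
    for (long i = 0; i < 10_000; i++) {
      final long expected = i;
      final ByteBuffer decoded = decode(i); // "iterator" thread decodes
      workers.submit(() -> {
        long seen = decoded.getLong(0);     // worker may read a later value
        if (seen != expected) {
          System.out.println("expected " + expected + " but saw " + seen);
        }
      });
    }
    workers.shutdown();
    workers.awaitTermination(30, TimeUnit.SECONDS);
  }
}

Whether CodecBufferCodec actually behaves like this is the open question here; the sketch only shows why buffer reuse plus cross-thread handoff would be a problem.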

@adoroszlai adoroszlai requested a review from szetszwo December 20, 2025 12:18
@szetszwo szetszwo requested a review from swamirishi December 21, 2025 21:05
@szetszwo (Contributor) commented:
@swamirishi, it would be great if you could review this. Thank you in advance.

@swamirishi (Contributor) commented Dec 21, 2025

…splitting the range across multiple workers and comparing keys, e.g. key.compareTo(startKey), since Strings are comparable.
@navinko

CodecBufferCodec can also be used in the parallel function. As long as each table iterator is used by only one thread, we can use CodecBufferCodec there. If we make the values use CodecBufferCodec, it would in turn lead to using RDBStoreCodecBufferIterator; otherwise the parallel iterator currently uses RDBStoreByteArrayIterator, which is suboptimal. For this we need to get rid of the batching logic, which, as I saw during my previous benchmark, is unnecessary since we would never be blocked on IO from RocksDB. Let us get rid of this valueExecutor logic:

// Create team of 20 worker threads with UNLIMITED queue
this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>());

@swamirishi (Contributor) left a comment


@navinko Thanks for the patch. I have added a comment to remove the ByteArrayCodec from the parallel operation as well. Please check and let me know if you agree with it.

@swamirishi (Contributor) commented Dec 21, 2025

This should ideally work:

Index: hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
===================================================================
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java	(revision 13fd2c6028a63d55f211bff5c3e22355dac9dcdd)
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java	(date 1766358462652)
@@ -56,37 +56,26 @@
 
   // Thread Pools
   private final ExecutorService iteratorExecutor; // 5
-  private final ExecutorService valueExecutors; // 20
-
-  private final int maxNumberOfVals;
+  private final long maxNumberOfVals;
   private final OMMetadataManager metadataManager;
   private final int maxIteratorTasks;
-  private final int maxWorkerTasks;
   private final long logCountThreshold;
 
   private static final Logger LOG = LoggerFactory.getLogger(ParallelTableIteratorOperation.class);
 
   public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K, V> table, Codec<K> keyCodec,
-                                        int iteratorCount, int workerCount, int maxNumberOfValsInMemory,
-                                        long logThreshold) {
+                                        int iteratorCount, long logThreshold) {
     this.table = table;
     this.keyCodec = keyCodec;
     this.metadataManager = metadataManager;
     this.maxIteratorTasks = 2 * iteratorCount;  // Allow up to 10 pending iterator tasks
-    this.maxWorkerTasks = workerCount * 2;      // Allow up to 40 pending worker tasks
 
     // Create team of 5 iterator threads with UNLIMITED queue
     // LinkedBlockingQueue() with no size = can hold infinite pending tasks
     this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES,
                     new LinkedBlockingQueue<>());
-
-    // Create team of 20 worker threads with UNLIMITED queue
-    this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES,
-            new LinkedBlockingQueue<>());
-
-    // Calculate batch size per worker (e.g., 2000 / 20 = 100 keys per batch per worker)
-    this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount));
     this.logCountThreshold = logThreshold;
+    this.maxNumberOfVals = Math.max(1000, this.logCountThreshold / (iteratorCount));
   }
 
   private List<K> getBounds(K startKey, K endKey) throws IOException {
@@ -166,9 +155,6 @@
     // Queue to track iterator threads (5 threads creating work)
     Queue<Future<?>> iterFutures = new LinkedList<>();
 
-    // Queue to track worker threads (20 threads doing work)
-    Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>();
-
     AtomicLong keyCounter = new AtomicLong();
     AtomicLong prevLogCounter = new AtomicLong();
     Object logLock = new Object();
@@ -190,75 +176,42 @@
       iterFutures.add(iteratorExecutor.submit(() -> {
         try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter  = table.iterator()) {
           iter.seek(beg);
+          int count = 0;
           while (iter.hasNext()) {
-            List<Table.KeyValue<K, V>> keyValues = new ArrayList<>();
-            boolean reachedEnd = false;
-            while (iter.hasNext()) {
-              Table.KeyValue<K, V> kv = iter.next();
-              K key = kv.getKey();
-              
-              // Check if key is within this segment's range
-              boolean withinBounds;
-              if (inclusive) {
-                // Last segment: include everything from beg onwards (or until endKey if specified)
-                withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
-              } else {
-                // Middle segment: include keys in range [beg, end)
-                withinBounds = key.compareTo(end) < 0;
-              }
-              
-              if (withinBounds) {
-                keyValues.add(kv);
-              } else {
-                reachedEnd = true;
-                break;
-              }
-
-              // If batch is full (2000 keys), stop collecting
-              if (keyValues.size() >= maxNumberOfVals) {
-                break;
-              }
+            Table.KeyValue<K, V> kv = iter.next();
+            K key = kv.getKey();
+            // Check if key is within this segment's range
+            boolean withinBounds;
+            if (inclusive) {
+              // Last segment: include everything from beg onwards (or until endKey if specified)
+              withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
+            } else {
+              // Middle segment: include keys in range [beg, end)
+              withinBounds = key.compareTo(end) < 0;
+            }
+            if (!withinBounds) {
+              break;
             }
-
-            // ===== STEP 5: HAND BATCH TO WORKER THREAD =====
-            if (!keyValues.isEmpty()) {
-              // WAIT if worker queue is too full (max 39 pending tasks)
-              waitForQueueSize(workerFutures, maxWorkerTasks - 1);
-
-              // Submit batch to worker thread pool
-              workerFutures.add(valueExecutors.submit(() -> {
-                for (Table.KeyValue<K, V> kv : keyValues) {
-                  keyOperation.apply(kv);
-                }
-                keyCounter.addAndGet(keyValues.size());
-                if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
-                  synchronized (logLock) {
-                    if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
-                      long cnt = keyCounter.get();
-                      LOG.debug("Iterated through {} keys while performing task: {}", keyCounter.get(), taskName);
-                      prevLogCounter.set(cnt);
-                    }
+            keyOperation.apply(kv);
+            count++;
+            if (count % maxNumberOfVals == 0) {
+              keyCounter.addAndGet(count);
+              count = 0;
+              if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
+                synchronized (logLock) {
+                  if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
+                    long cnt = keyCounter.get();
+                    LOG.debug("Iterated through {} keys while performing task: {}", keyCounter.get(), taskName);
+                    prevLogCounter.set(cnt);
                   }
                 }
-                // Worker task done! Future is now complete.
-              }));
-            }
-            // If we reached the end of our segment, stop reading
-            if (reachedEnd) {
-              break;
+              }
             }
           }
+          keyCounter.addAndGet(count);
         } catch (IOException e) {
           LOG.error("IO error during parallel iteration on table {}", taskName, e);
           throw new RuntimeException("IO error during iteration", e);
-        } catch (InterruptedException e) {
-          LOG.warn("Parallel iteration interrupted for task {}", taskName, e);
-          Thread.currentThread().interrupt();
-          throw new RuntimeException("Iteration interrupted", e);
-        } catch (ExecutionException e) {
-          Throwable cause = e.getCause();
-          LOG.error("Task execution failed for {}: {}", taskName, cause.getMessage(), cause);
-          throw new RuntimeException("Task execution failed", cause);
         }
       }));
     }
@@ -266,8 +219,6 @@
     // ===== STEP 7: WAIT FOR EVERYONE TO FINISH =====
     // Wait for all 5 iterator threads to finish reading
     waitForQueueSize(iterFutures, 0);
-    // Wait for all 20 worker threads to finish processing
-    waitForQueueSize(workerFutures, 0);
     
     // Log final stats
     LOG.info("{}: Parallel iteration completed - Total keys processed: {}", taskName, keyCounter.get());
@@ -276,17 +227,12 @@
   @Override
   public void close() throws IOException {
     iteratorExecutor.shutdown();
-    valueExecutors.shutdown();
     try {
       if (!iteratorExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
         iteratorExecutor.shutdownNow();
       }
-      if (!valueExecutors.awaitTermination(60, TimeUnit.SECONDS)) {
-        valueExecutors.shutdownNow();
-      }
     } catch (InterruptedException e) {
       iteratorExecutor.shutdownNow();
-      valueExecutors.shutdownNow();
       Thread.currentThread().interrupt();
     }
   }
Index: hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
===================================================================
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java	(revision 13fd2c6028a63d55f211bff5c3e22355dac9dcdd)
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java	(date 1766358462633)
@@ -167,7 +167,7 @@
       
       try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
                new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable,
-                   StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, perWorkerThreshold)) {
+                   StringCodec.get(), maxIterators, perWorkerThreshold)) {
         keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
       }
 
Index: hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
===================================================================
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java	(revision 13fd2c6028a63d55f211bff5c3e22355dac9dcdd)
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java	(date 1766358462659)
@@ -182,7 +182,7 @@
 
     try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
              new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable,
-                 StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, perWorkerThreshold)) {
+                 StringCodec.get(), maxIterators, perWorkerThreshold)) {
       keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
     } catch (Exception ex) {
       LOG.error("Unable to populate File Size Count for {} in RocksDB.", taskName, ex);
Index: hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
===================================================================
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java	(revision 13fd2c6028a63d55f211bff5c3e22355dac9dcdd)
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java	(date 1766358462664)
@@ -35,6 +35,8 @@
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hadoop.hdds.utils.db.ByteArrayCodec;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecBufferCodec;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
 import org.apache.hadoop.hdds.utils.db.StringCodec;
@@ -192,8 +194,8 @@
   private void processTableInParallel(String tableName, OMMetadataManager omMetadataManager) throws Exception {
     int workerCount = 2;  // Only 2 workers needed for simple counting
     
-    Table<String, byte[]> table = omMetadataManager.getStore()
-        .getTable(tableName, StringCodec.get(), ByteArrayCodec.get(), TableCache.CacheType.NO_CACHE);
+    Table<String, CodecBuffer> table = omMetadataManager.getStore()
+        .getTable(tableName, StringCodec.get(), CodecBufferCodec.get(true), TableCache.CacheType.NO_CACHE);
     
     long estimatedCount = 100000;  // Default
     try {
@@ -205,9 +207,8 @@
     
     AtomicLong count = new AtomicLong(0);
 
-    try (ParallelTableIteratorOperation<String, byte[]> parallelIter = new ParallelTableIteratorOperation<>(
-        omMetadataManager, table, StringCodec.get(),
-        maxIterators, workerCount, maxKeysInMemory, loggingThreshold)) {
+    try (ParallelTableIteratorOperation<String, CodecBuffer> parallelIter = new ParallelTableIteratorOperation<>(
+        omMetadataManager, table, StringCodec.get(), maxIterators, loggingThreshold)) {
       
       parallelIter.performTaskOnTableVals(getTaskName(), null, null, kv -> {
         if (kv != null) {
