From 3a6800607ebcd4148180d7a8e9da3e7223026f5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Thu, 19 Jun 2025 22:12:19 +0100 Subject: [PATCH 1/5] datastore: Upgrade solrj version. #TASK-6217 --- .../datastore/solr/SolrFacetToFacetFieldsConverter.java | 4 ++-- pom.xml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/commons-datastore/commons-datastore-solr/src/main/java/org/opencb/commons/datastore/solr/SolrFacetToFacetFieldsConverter.java b/commons-datastore/commons-datastore-solr/src/main/java/org/opencb/commons/datastore/solr/SolrFacetToFacetFieldsConverter.java index e3405d35..54cf313e 100644 --- a/commons-datastore/commons-datastore-solr/src/main/java/org/opencb/commons/datastore/solr/SolrFacetToFacetFieldsConverter.java +++ b/commons-datastore/commons-datastore-solr/src/main/java/org/opencb/commons/datastore/solr/SolrFacetToFacetFieldsConverter.java @@ -103,14 +103,14 @@ private static void parseBuckets(SimpleOrderedMap solrFacets, FacetField List buckets = new ArrayList<>(); for (SimpleOrderedMap solrBucket: solrBuckets) { - int count = 0; + long count = 0; String value = ""; FacetField subfield; List subfields = new ArrayList<>(); for (int i = 0; i < solrBucket.size(); i++) { String fullname = solrBucket.getName(i); if ("count".equals(fullname)) { - count = (int) solrBucket.getVal(i); + count = ((Number) solrBucket.getVal(i)).longValue(); } else if ("val".equals(fullname)) { value = solrBucket.getVal(i).toString(); } else { diff --git a/pom.xml b/pom.xml index e89c2f07..69a77cc3 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ 1.7.36 1.7.7 4.11.4 - 8.8.2 + 8.11.3 1.69 1.10.12 2.4.0 From 2a8b096f0cca17b21e3ea8fc8815aac3a4027f1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Fri, 11 Jul 2025 11:38:44 +0100 Subject: [PATCH 2/5] lib: Allow DataWriter concat. #TASK-6217 --- .../org/opencb/commons/io/DataWriter.java | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/commons-lib/src/main/java/org/opencb/commons/io/DataWriter.java b/commons-lib/src/main/java/org/opencb/commons/io/DataWriter.java index 5e80ddc5..fc86358a 100644 --- a/commons-lib/src/main/java/org/opencb/commons/io/DataWriter.java +++ b/commons-lib/src/main/java/org/opencb/commons/io/DataWriter.java @@ -98,4 +98,61 @@ public void post() throws Exception { }; } + default DataWriter then(DataWriter nextWriter) { + return then(nextWriter.asTask()); + } + + default DataWriter then(Task nextTask) { + return new DataWriter() { + @Override + public boolean open() { + return DataWriter.this.open(); + } + + @Override + public boolean pre() { + boolean res = DataWriter.this.pre(); + try { + nextTask.pre(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + return res; + } + + @Override + public boolean close() { + return DataWriter.this.close(); + } + + @Override + public boolean post() { + boolean res = DataWriter.this.post(); + try { + nextTask.post(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + return res; + } + + @Override + public boolean write(List batch) { + boolean res = DataWriter.this.write(batch); + try { + nextTask.apply(batch); + return res; + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + } + } From 1be186d2c8e482a4ce88252536c647e8073dd717 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Fri, 11 Jul 2025 11:39:07 +0100 Subject: [PATCH 3/5] lib: ParallelTaskRunner should print stats even on shutdown. #TASK-6217 --- .../commons/run/ParallelTaskRunner.java | 44 ++++++++++++++----- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/commons-lib/src/main/java/org/opencb/commons/run/ParallelTaskRunner.java b/commons-lib/src/main/java/org/opencb/commons/run/ParallelTaskRunner.java index ac1d8174..c192864a 100644 --- a/commons-lib/src/main/java/org/opencb/commons/run/ParallelTaskRunner.java +++ b/commons-lib/src/main/java/org/opencb/commons/run/ParallelTaskRunner.java @@ -56,6 +56,8 @@ public class ParallelTaskRunner { private static final int RETRY_AWAIT_TERMINATION_TIMEOUT = 50; private static final int MAX_SHUTDOWN_RETRIES = 300; private static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("0.000"); + private long startTime; + private boolean interrupted; @FunctionalInterface @Deprecated @@ -272,6 +274,7 @@ public ParallelTaskRunner(DataReader reader, Supplier reader, List(tasks); check(); + interrupted = false; } private void check() { @@ -308,6 +312,8 @@ private void check() { } private void init() { + startTime = System.nanoTime(); + interrupted = false; finishedTasks = 0; if (reader != null) { readBlockingQueue = new ArrayBlockingQueue<>(config.capacity); @@ -338,11 +344,31 @@ public void run() throws ExecutionException { } public void run(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException { - long start = System.nanoTime(); //If there is any InterruptionException, finish as quick as possible. - boolean interrupted = false; init(); + Thread hook = new Thread(() -> { + logger.warn("Shutdown hook called! Aborting ParallelTaskRunner execution!"); + logTimes(); + }); + Runtime.getRuntime().addShutdownHook(hook); + try { + start(timeout, unit); + } finally { + Runtime.getRuntime().removeShutdownHook(hook); + logTimes(); + } + + if (config.abortOnFail && !exceptions.isEmpty()) { + throw buildExecutionException("Error while running ParallelTaskRunner. Found " + exceptions.size() + " exceptions.", + exceptions); + } + if (interrupted) { + throw interruptions.get(0); + } + } + + private void start(long timeout, TimeUnit unit) throws ExecutionException { long auxTime = System.nanoTime(); if (reader != null) { reader.open(); @@ -449,7 +475,9 @@ public void run(long timeout, TimeUnit unit) throws ExecutionException, Interrup writer.close(); } timeWriting += System.nanoTime() - auxTime; + } + public void logTimes() { logger.info(toString()); if (reader != null) { logger.info("read: timeReading = " + durationToString(timeReading)); @@ -464,19 +492,11 @@ public void run(long timeout, TimeUnit unit) throws ExecutionException, Interrup if (writer != null) { logger.info("task: timeBlockedAtPutWrite = " + durationToString(timeBlockedAtPutWrite) + " (total)" + ", ~" + durationToString(timeBlockedAtPutWrite / config.numTasks) + " (per thread)"); - logger.info("write: timeBlockedWatingDataToWrite = " + durationToString(timeBlockedAtTakeWrite)); + logger.info("write: timeBlockedWaitingDataToWrite = " + durationToString(timeBlockedAtTakeWrite)); logger.info("write: timeWriting = " + durationToString(timeWriting)); } - logger.info("total: = " + durationToString(System.nanoTime() - start)); - - if (config.abortOnFail && !exceptions.isEmpty()) { - throw buildExecutionException("Error while running ParallelTaskRunner. Found " + exceptions.size() + " exceptions.", - exceptions); - } - if (interrupted) { - throw interruptions.get(0); - } + logger.info("total: = " + durationToString(System.nanoTime() - startTime)); } private ExecutionException buildExecutionException(String message, List exceptions) { From d1304c81e6f83d52b0d6f17bd038805cb38a51d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Fri, 11 Jul 2025 11:40:29 +0100 Subject: [PATCH 4/5] lib: Add progress rate to ProgressLogger. #TASK-6217 --- .../org/opencb/commons/ProgressLogger.java | 129 ++++++++++++++++-- 1 file changed, 116 insertions(+), 13 deletions(-) diff --git a/commons-lib/src/main/java/org/opencb/commons/ProgressLogger.java b/commons-lib/src/main/java/org/opencb/commons/ProgressLogger.java index 26102216..398ec424 100644 --- a/commons-lib/src/main/java/org/opencb/commons/ProgressLogger.java +++ b/commons-lib/src/main/java/org/opencb/commons/ProgressLogger.java @@ -16,11 +16,13 @@ package org.opencb.commons; +import org.apache.commons.lang3.tuple.Pair; import org.opencb.commons.run.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.DecimalFormat; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; @@ -43,33 +45,43 @@ public class ProgressLogger { private final String message; private final int numLinesLog; + private final long logFrequencyMillis; + private boolean progressRateEnabled = true; + private long progressRateWindowSizeSeconds; + private boolean progressRateMillionHours = false; // If true, progress rate is in millions of elements per hour private long totalCount; private boolean isApproximated; // Total count is an approximated value private final AtomicReference> futureTotalCount = new AtomicReference<>(); private final AtomicLong count; + private final long startTime; + private final LinkedList> times = new LinkedList<>(); private double batchSize; private Logger logger = LoggerFactory.getLogger(ProgressLogger.class); public ProgressLogger(String message) { - this(message, 0, null, 200); + this(message, 0, null, 200, 0); + } + + public ProgressLogger(String message, long logFrequency, TimeUnit timeUnit) { + this(message, 0, null, 0, timeUnit.toMillis(logFrequency)); } public ProgressLogger(String message, long totalCount) { - this(message, totalCount, null, 200); + this(message, totalCount, null, 200, 0); } public ProgressLogger(String message, long totalCount, int numLinesLog) { - this(message, totalCount, null, numLinesLog); + this(message, totalCount, null, numLinesLog, 0); } public ProgressLogger(String message, Future futureTotalCount) { - this(message, 0, futureTotalCount, 200); + this(message, 0, futureTotalCount, 200, 0); } public ProgressLogger(String message, Future futureTotalCount, int numLinesLog) { - this(message, 0, futureTotalCount, numLinesLog); + this(message, 0, futureTotalCount, numLinesLog, 0); } /** @@ -79,10 +91,10 @@ public ProgressLogger(String message, Future futureTotalCount, int numLine * @param numLinesLog Number of lines to print */ public ProgressLogger(String message, Callable totalCountCallable, int numLinesLog) { - this(message, 0, getFuture(totalCountCallable), numLinesLog); + this(message, 0, getFuture(totalCountCallable), numLinesLog, 0); } - private ProgressLogger(String message, long totalCount, Future futureTotalCount, int numLinesLog) { + private ProgressLogger(String message, long totalCount, Future futureTotalCount, int numLinesLog, long logFrequencyMillis) { if (message.endsWith(" ")) { this.message = message; } else { @@ -92,12 +104,21 @@ private ProgressLogger(String message, long totalCount, Future futureTotal this.totalCount = totalCount; this.futureTotalCount.set(futureTotalCount); this.count = new AtomicLong(); - if (totalCount == 0) { - batchSize = DEFAULT_BATCH_SIZE; + if (logFrequencyMillis > 0) { + this.logFrequencyMillis = logFrequencyMillis; + batchSize = 0; } else { - updateBatchSize(); + // Avoid not logging for too long. Log at least once a minute by default + this.logFrequencyMillis = TimeUnit.MINUTES.toMillis(1); + if (totalCount == 0) { + batchSize = DEFAULT_BATCH_SIZE; + } else { + updateBatchSize(); + } } isApproximated = false; + startTime = System.currentTimeMillis(); + progressRateWindowSizeSeconds = 60; } @@ -118,6 +139,25 @@ public ProgressLogger setApproximateTotalCount(long aproximateTotalCount) { return this; } + public ProgressLogger setProgressRateWindowSize(int progressRateWindowSize, TimeUnit timeUnit) { + this.progressRateWindowSizeSeconds = timeUnit.toSeconds(progressRateWindowSize); + return this; + } + + public ProgressLogger setProgressRateAtMillionsPerHours() { + return setProgressRateAtMillionsPerHours(true); + } + + public ProgressLogger setProgressRateAtMillionsPerHours(boolean progressRateMillionHours) { + this.progressRateMillionHours = progressRateMillionHours; + return this; + } + + public ProgressLogger disableProgressRate() { + this.progressRateEnabled = false; + return this; + } + public void increment(long delta) { increment(delta, "", null); } @@ -135,12 +175,36 @@ private void increment(long delta, String message, Supplier supplier) { long count = previousCount + delta; updateFutureTotalCount(); - if ((int) (previousCount / batchSize) != (int) (count / batchSize) || count == totalCount && delta > 0) { - log(count, supplier == null ? message : supplier.get()); + long currentTimeMillis = System.currentTimeMillis(); + if (shouldLog(delta, previousCount, count, currentTimeMillis)) { + log(count, supplier == null ? message : supplier.get(), currentTimeMillis); } } - protected synchronized void log(long count, String extraMessage) { + private boolean shouldLog(long delta, long previousCount, long count, long currentTimeMillis) { + if (batchSize > 0) { + if ((int) (previousCount / batchSize) != (int) (count / batchSize)) { + return true; + } + } + if (logFrequencyMillis > 0) { + long lastLogTime = times.isEmpty() ? startTime : times.getLast().getRight(); + if (currentTimeMillis - lastLogTime > logFrequencyMillis) { + return true; + } + } + if (count == totalCount && delta > 0) { + return true; + } + return false; + } + + protected synchronized void log(long count, String extraMessage, long currentTimeMillis) { + times.add(Pair.of(count, currentTimeMillis)); + if (times.size() > 5 && times.get(0).getRight() < currentTimeMillis - progressRateWindowSizeSeconds * 1000) { + // Remove old points that are outside the progress rate window + times.removeFirst(); + } long totalCount = getTotalCount(); StringBuilder sb = new StringBuilder(message).append(count); @@ -152,6 +216,45 @@ protected synchronized void log(long count, String extraMessage) { } sb.append(totalCount).append(' ').append(DECIMAL_FORMAT.format(((float) (count)) / totalCount)); } + if (progressRateEnabled) { + float elapsedTime = (float) (currentTimeMillis - startTime) / 1000; + float progressRate = count / elapsedTime; // elements per second + boolean addRelativeTime = times.size() > 5 && elapsedTime > progressRateWindowSizeSeconds; + float relativeTime; + float relativeProgressRate; // elements per second + if (addRelativeTime) { + int idx = 5; + do { + Pair relativePoint = times.get(times.size() - idx); + relativeTime = (float) (currentTimeMillis - relativePoint.getRight()) / 1000; + relativeProgressRate = (count - relativePoint.getLeft()) / relativeTime; + } while (relativeTime < progressRateWindowSizeSeconds && idx++ < times.size()); + + } else { + relativeTime = 0; + relativeProgressRate = 0; + } + String progressRateUnits; + String rateFormat; + if (progressRateMillionHours) { + progressRateUnits = "M/h"; + rateFormat = "%.2f"; + progressRate = (progressRate / 1_000_000) * 3600; // Convert to millions per hour + relativeProgressRate = (relativeProgressRate / 1_000_000) * 3600; // Convert to millions per hour + } else { + progressRateUnits = "elements/s"; + rateFormat = "%.0f"; + } + sb.append(" in ") + .append(String.format("%.2f", elapsedTime)).append("s (") + .append(String.format(rateFormat, progressRate)).append(" " + progressRateUnits + ")"); + if (addRelativeTime) { + sb.append(", (") + .append(String.format(rateFormat, relativeProgressRate)).append(" " + progressRateUnits + " in last ") + .append(String.format("%.2f", relativeTime)).append("s") + .append(')'); + } + } if (!extraMessage.isEmpty() && (!extraMessage.startsWith(" ") && !extraMessage.startsWith(",") && !extraMessage.startsWith("."))) { sb.append(' '); } From ea7ff4e28f1ec04a94f5e7ce1af64b4067fdce14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Thu, 28 Aug 2025 12:51:59 +0100 Subject: [PATCH 5/5] lib: Add "getCount" to ProgressLogger. #TASK-6217 --- .../org/opencb/commons/ProgressLogger.java | 10 +++++----- .../java/org/opencb/commons/io/DataReader.java | 18 ++++++++++++++---- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/commons-lib/src/main/java/org/opencb/commons/ProgressLogger.java b/commons-lib/src/main/java/org/opencb/commons/ProgressLogger.java index 398ec424..b87f3691 100644 --- a/commons-lib/src/main/java/org/opencb/commons/ProgressLogger.java +++ b/commons-lib/src/main/java/org/opencb/commons/ProgressLogger.java @@ -205,7 +205,7 @@ protected synchronized void log(long count, String extraMessage, long currentTim // Remove old points that are outside the progress rate window times.removeFirst(); } - long totalCount = getTotalCount(); + long totalCount = this.totalCount; StringBuilder sb = new StringBuilder(message).append(count); if (totalCount > 0) { @@ -284,10 +284,6 @@ private void updateFutureTotalCount() { } } - private long getTotalCount() { - return this.totalCount; - } - private void updateBatchSize() { batchSize = Math.max((double) totalCount / numLinesLog, MIN_BATCH_SIZE); } @@ -299,6 +295,10 @@ private static Future getFuture(Callable totalCountCallable) { return future; } + public long getCount() { + return count.get(); + } + public Task asTask() { return asTask(null); } diff --git a/commons-lib/src/main/java/org/opencb/commons/io/DataReader.java b/commons-lib/src/main/java/org/opencb/commons/io/DataReader.java index 2eee368b..e203fa7f 100644 --- a/commons-lib/src/main/java/org/opencb/commons/io/DataReader.java +++ b/commons-lib/src/main/java/org/opencb/commons/io/DataReader.java @@ -18,10 +18,8 @@ import org.opencb.commons.run.Task; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; +import java.util.*; +import java.util.function.Consumer; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -163,4 +161,16 @@ public T next() { }; } + @Override + default void forEach(Consumer action) { + forEach(action, 1); + } + + default void forEach(Consumer action, int batchSize) { + Objects.requireNonNull(action); + for (Iterator iterator = this.iterator(batchSize); iterator.hasNext();) { + T t = iterator.next(); + action.accept(t); + } + } }