From 6c5dd25ca2b59654699549cbdeda706009e10875 Mon Sep 17 00:00:00 2001 From: neelkanth-kaushik Date: Mon, 17 Nov 2025 15:07:58 +0530 Subject: [PATCH 01/16] Patch for Github issue #524 --- .../analytics/internal/AnalyticsClient.java | 48 +++++++++++++++++-- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index f7560004..7921a255 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -25,7 +25,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -67,6 +69,7 @@ public class AnalyticsClient { private final ScheduledExecutorService flushScheduler; private final AtomicBoolean isShutDown; private final String writeKey; + private volatile Future looperFuture; public static AnalyticsClient create( HttpUrl uploadUrl, @@ -130,7 +133,9 @@ public AnalyticsClient( this.currentQueueSizeInBytes = 0; - if (!isShutDown.get()) looperExecutor.submit(new Looper()); + if (!isShutDown.get()) { + this.looperFuture = looperExecutor.submit(new Looper()); + } flushScheduler = Executors.newScheduledThreadPool(1, threadFactory); flushScheduler.scheduleAtFixedRate( @@ -218,6 +223,8 @@ public void shutdown() { // we can shutdown the flush scheduler without worrying flushScheduler.shutdownNow(); + // Wait for the looper to complete processing before shutting down executors + waitForLooperCompletion(); shutdownAndWait(looperExecutor, "looper"); shutdownAndWait(networkExecutor, "network"); @@ -226,6 +233,29 @@ public void shutdown() { } } + /** + * Wait for the looper to complete processing all messages before proceeding with shutdown. + * This prevents the race condition where the network executor is shut down before the looper + * finishes submitting all batches. + */ + private void waitForLooperCompletion() { + if (looperFuture != null) { + try { + // Wait for the looper to complete processing the STOP message and finish + // Use a reasonable timeout to avoid hanging indefinitely + looperFuture.get(5, TimeUnit.SECONDS); + log.print(VERBOSE, "Looper completed successfully."); + } catch (Exception e) { + log.print(ERROR, e, "Error waiting for looper to complete: %s", e.getMessage()); + // Cancel the looper if it's taking too long or if there's an error + if (!looperFuture.isDone()) { + looperFuture.cancel(true); + log.print(VERBOSE, "Looper was cancelled due to timeout or error."); + } + } + } + } + public void shutdownAndWait(ExecutorService executor, String name) { try { executor.shutdown(); @@ -299,8 +329,20 @@ public void run() { "Batching %s message(s) into batch %s.", batch.batch().size(), batch.sequence()); - networkExecutor.submit( - BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); + try { + networkExecutor.submit( + BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); + } catch (RejectedExecutionException e) { + log.print(ERROR, e, + "Failed to submit batch %s to network executor during shutdown. 
Batch will be lost.", + batch.sequence()); + // Notify callbacks about the failure + for (Message msg : batch.batch()) { + for (Callback callback : callbacks) { + callback.failure(msg, e); + } + } + } currentBatchSize.set(0); messages.clear(); From 5a18f2ff7ec02799b19cee4f7fbdd4a963d02567 Mon Sep 17 00:00:00 2001 From: neelkanth-kaushik Date: Mon, 17 Nov 2025 15:17:53 +0530 Subject: [PATCH 02/16] Patch for Github issue #524 --- .../java/com/segment/analytics/internal/AnalyticsClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 7921a255..c29ce02d 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -25,7 +25,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; From e3259215903dfc6a24543df3cfb1cab0cfbd6498 Mon Sep 17 00:00:00 2001 From: neelkanth-kaushik Date: Mon, 17 Nov 2025 15:20:01 +0530 Subject: [PATCH 03/16] Patch for Github issue #524 --- .../java/com/segment/analytics/internal/AnalyticsClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index c29ce02d..7921a255 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -25,7 +25,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; From ee88f451446e6be676b9e1e858b6749fc00a000c Mon Sep 17 00:00:00 2001 From: neelkanth-kaushik Date: Mon, 17 Nov 2025 15:27:27 +0530 Subject: [PATCH 04/16] Fixed formatting issues identified by spotless --- .../segment/analytics/internal/AnalyticsClient.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 7921a255..ba64646a 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -234,9 +234,9 @@ public void shutdown() { } /** - * Wait for the looper to complete processing all messages before proceeding with shutdown. - * This prevents the race condition where the network executor is shut down before the looper - * finishes submitting all batches. + * Wait for the looper to complete processing all messages before proceeding with shutdown. This + * prevents the race condition where the network executor is shut down before the looper finishes + * submitting all batches. 
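   *
   * <p>A minimal sketch of the same wait-then-cancel idea in isolation, with placeholder
   * names ({@code executor}, {@code work}, {@code task}) rather than this class's actual fields:
   *
   * <pre>{@code
   * Future<?> task = executor.submit(work);
   * try {
   *   task.get(5, TimeUnit.SECONDS);   // block until the submitted task finishes draining
   * } catch (Exception e) {
   *   if (!task.isDone()) {
   *     task.cancel(true);             // interrupt it rather than hang the shutdown
   *   }
   * }
   * }</pre>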
*/ private void waitForLooperCompletion() { if (looperFuture != null) { @@ -333,8 +333,10 @@ public void run() { networkExecutor.submit( BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); } catch (RejectedExecutionException e) { - log.print(ERROR, e, - "Failed to submit batch %s to network executor during shutdown. Batch will be lost.", + log.print( + ERROR, + e, + "Failed to submit batch %s to network executor during shutdown. Batch will be lost.", batch.sequence()); // Notify callbacks about the failure for (Message msg : batch.batch()) { From 9b752596156c55fea6a58a1587e5a6ba20a9ab2f Mon Sep 17 00:00:00 2001 From: neelkanth-kaushik Date: Tue, 25 Nov 2025 14:04:19 +0530 Subject: [PATCH 05/16] Fixing github issue 524 --- .../analytics/internal/AnalyticsClient.java | 39 ++++++++++++++----- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index ba64646a..90124905 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -234,8 +234,7 @@ public void shutdown() { } /** - * Wait for the looper to complete processing all messages before proceeding with shutdown. This - * prevents the race condition where the network executor is shut down before the looper finishes + * Wait for the looper to complete processing all messages before proceedin with shutdown. This prevents the race condition where the network executor is shut down before the looper finishes * submitting all batches. */ private void waitForLooperCompletion() { @@ -257,18 +256,38 @@ private void waitForLooperCompletion() { } public void shutdownAndWait(ExecutorService executor, String name) { + boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper"); try { executor.shutdown(); - final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS); - - log.print( - VERBOSE, - "%s executor %s.", - name, - executorTerminated ? "terminated normally" : "timed out"); + boolean terminated = executor.awaitTermination(1, TimeUnit.MILLISECONDS); + if (terminated) { + log.print(VERBOSE, "%s executor terminated normally.", name); + return; + } + if (isLooperExecutor) { + // not terminated within timeout -> force shutdown + log.print(VERBOSE, "%s did not terminate in %d ms; requesting shutdownNow().", name, 1); + List dropped = executor.shutdownNow(); // interrupts running tasks + log.print(VERBOSE, "%s shutdownNow returned %d queued tasks that never started.", name, dropped.size()); + + // optional short wait to give interrupted tasks a chance to exit + boolean terminatedAfterForce = executor.awaitTermination(1, TimeUnit.MILLISECONDS); + log.print(VERBOSE, "%s executor %s after shutdownNow().", name, + terminatedAfterForce ? 
"terminated" : "still running (did not terminate)"); + + if (!terminatedAfterForce) { + // final warning — investigate tasks that ignore interrupts + log.print(ERROR, "%s executor still did not terminate; tasks may be ignoring interrupts.", name); + } + } } catch (InterruptedException e) { + // Preserve interrupt status and attempt forceful shutdown log.print(ERROR, e, "Interrupted while stopping %s executor.", name); Thread.currentThread().interrupt(); + if (isLooperExecutor) { + List dropped = executor.shutdownNow(); + log.print(VERBOSE, "%s shutdownNow invoked after interrupt; %d tasks returned.", name, dropped.size()); + } } } @@ -522,4 +541,4 @@ private static int getBatchDefaultSize(int contextSize, int currentMessageNumber + String.valueOf(Integer.MAX_VALUE).length(); } } -} +} \ No newline at end of file From bc4e6da807f25a97e01eac12754826a9be0eae0e Mon Sep 17 00:00:00 2001 From: neelkanth-kaushik Date: Tue, 25 Nov 2025 14:16:17 +0530 Subject: [PATCH 06/16] Fixed Github Issue #524 --- .../analytics/internal/AnalyticsClient.java | 110 +++++++++++------- 1 file changed, 71 insertions(+), 39 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 90124905..267ae304 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -159,7 +159,8 @@ public int messageSizeInBytes(Message message) { private Boolean isBackPressuredAfterSize(int incomingSize) { int POISON_BYTE_SIZE = messageSizeInBytes(FlushMessage.POISON); int sizeAfterAdd = this.currentQueueSizeInBytes + incomingSize + POISON_BYTE_SIZE; - // Leave a 10% buffer since the unsynchronized enqueue could add multiple at a time + // Leave a 10% buffer since the unsynchronized enqueue could add multiple at a + // time return sizeAfterAdd >= Math.min(this.maximumQueueByteSize, BATCH_MAX_SIZE) * 0.9; } @@ -174,12 +175,14 @@ public void enqueue(Message message) { } try { - // @jorgen25 message here could be regular msg, POISON or STOP. Only do regular logic if its + // @jorgen25 message here could be regular msg, POISON or STOP. Only do regular + // logic if its // valid message if (message != StopMessage.STOP && message != FlushMessage.POISON) { int messageByteSize = messageSizeInBytes(message); - // @jorgen25 check if message is below 32kb limit for individual messages, no need to check + // @jorgen25 check if message is below 32kb limit for individual messages, no + // need to check // for extra characters if (messageByteSize <= MSG_MAX_SIZE) { if (isBackPressuredAfterSize(messageByteSize)) { @@ -234,7 +237,9 @@ public void shutdown() { } /** - * Wait for the looper to complete processing all messages before proceedin with shutdown. This prevents the race condition where the network executor is shut down before the looper finishes + * Wait for the looper to complete processing all messages before proceedin with + * shutdown. This prevents the race condition where the network executor is shut + * down before the looper finishes * submitting all batches. */ private void waitForLooperCompletion() { @@ -292,7 +297,8 @@ public void shutdownAndWait(ExecutorService executor, String name) { } /** - * Looper runs on a background thread and takes messages from the queue. Once it collects enough + * Looper runs on a background thread and takes messages from the queue. 
Once it + * collects enough * messages, it triggers a flush. */ class Looper implements Runnable { @@ -320,18 +326,20 @@ public void run() { log.print(VERBOSE, "Flushing messages."); } } else { - // we do +1 because we are accounting for this new message we just took from the queue + // we do +1 because we are accounting for this new message we just took from the + // queue // which is not in list yet - // need to check if this message is going to make us go over the limit considering + // need to check if this message is going to make us go over the limit + // considering // default batch size as well - int defaultBatchSize = - BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1); + int defaultBatchSize = BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1); int msgSize = messageSizeInBytes(message); if (currentBatchSize.get() + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) { messages.add(message); currentBatchSize.addAndGet(msgSize); } else { - // put message that did not make the cut this time back on the queue, we already took + // put message that did not make the cut this time back on the queue, we already + // took // this message if we dont put it back its lost // we take care of that after submitting the batch batchSizeLimitReached = true; @@ -368,7 +376,8 @@ public void run() { currentBatchSize.set(0); messages.clear(); if (batchSizeLimitReached) { - // If this is true that means the last message that would make us go over the limit + // If this is true that means the last message that would make us go over the + // limit // was not added, // add it to the now cleared messages list so its not lost messages.add(message); @@ -385,12 +394,11 @@ public void run() { } static class BatchUploadTask implements Runnable { - private static final Backo BACKO = - Backo.builder() // - .base(TimeUnit.SECONDS, 15) // - .cap(TimeUnit.HOURS, 1) // - .jitter(1) // - .build(); + private static final Backo BACKO = Backo.builder() // + .base(TimeUnit.SECONDS, 15) // + .cap(TimeUnit.HOURS, 1) // + .jitter(1) // + .build(); private final AnalyticsClient client; private final Backo backo; @@ -416,7 +424,10 @@ private void notifyCallbacksWithException(Batch batch, Exception exception) { } } - /** Returns {@code true} to indicate a batch should be retried. {@code false} otherwise. */ + /** + * Returns {@code true} to indicate a batch should be retried. {@code false} + * otherwise. + */ boolean upload() { client.log.print(VERBOSE, "Uploading batch %s.", batch.sequence()); @@ -469,7 +480,8 @@ public void run() { int attempt = 0; for (; attempt <= maxRetries; attempt++) { boolean retry = upload(); - if (!retry) return; + if (!retry) + return; try { backo.sleep(attempt); } catch (InterruptedException e) { @@ -492,45 +504,65 @@ private static boolean is5xx(int status) { public static class BatchUtility { /** - * Method to determine what is the expected default size of the batch regardless of messages + * Method to determine what is the expected default size of the batch regardless + * of messages * - *
Sample batch: + *
+ * Sample batch: * {"batch":[{"type":"alias","messageId":"fc9198f9-d827-47fb-96c8-095bd3405d93","timestamp":"Nov * 18, 2021, 2:45:07 * PM","userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", - * "messageId":"3ce6f88c-36cb-4991-83f8-157e10261a89","timestamp":"Nov 18, 2021, 2:45:07 + * "messageId":"3ce6f88c-36cb-4991-83f8-157e10261a89","timestamp":"Nov 18, 2021, + * 2:45:07 * PM","userId":"jorgen25", * "integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", - * "messageId":"a328d339-899a-4a14-9835-ec91e303ac4d","timestamp":"Nov 18, 2021, 2:45:07 PM", + * "messageId":"a328d339-899a-4a14-9835-ec91e303ac4d","timestamp":"Nov 18, 2021, + * 2:45:07 PM", * "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", - * "messageId":"57b0ceb4-a1cf-4599-9fba-0a44c7041004","timestamp":"Nov 18, 2021, 2:45:07 PM", + * "messageId":"57b0ceb4-a1cf-4599-9fba-0a44c7041004","timestamp":"Nov 18, 2021, + * 2:45:07 PM", * "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"}], - * "sentAt":"Nov 18, 2021, 2:45:07 PM","context":{"library":{"name":"analytics-java", + * "sentAt":"Nov 18, 2021, 2:45:07 + * PM","context":{"library":{"name":"analytics-java", * "version":"3.1.3"}},"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} * - *
total size of batch : 932 + *
+ * total size of batch : 932 * - *
BREAKDOWN: {"batch":[MESSAGE1,MESSAGE2,MESSAGE3,MESSAGE4],"sentAt":"MMM dd, yyyy, HH:mm:ss + *
+ * BREAKDOWN: {"batch":[MESSAGE1,MESSAGE2,MESSAGE3,MESSAGE4],"sentAt":"MMM dd, + * yyyy, HH:mm:ss * tt","context":CONTEXT,"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} * - *
so we need to account for: 1 -message size: 189 * 4 = 756 2 -context object size = 55 in - * this sample -> 756 + 55 = 811 3 -Metadata (This has the sent data/sequence characters) + - * extra chars (these are chars like "batch":[] or "context": etc and will be pretty much the - * same length in every batch -> size is 73 --> 811 + 73 = 884 (well 72 actually, char 73 is the - * sequence digit which we account for in point 5) 4 -Commas between each message, the total - * number of commas is number_of_msgs - 1 = 3 -> 884 + 3 = 887 (sample is 886 because the hour - * in sentData this time happens to be 2:45 but it could be 12:45 5 -Sequence Number increments + *
+ * so we need to account for: 1 -message size: 189 * 4 = 756 2 -context object + * size = 55 in + * this sample -> 756 + 55 = 811 3 -Metadata (This has the sent data/sequence + * characters) + + * extra chars (these are chars like "batch":[] or "context": etc and will be + * pretty much the + * same length in every batch -> size is 73 --> 811 + 73 = 884 (well 72 + * actually, char 73 is the + * sequence digit which we account for in point 5) 4 -Commas between each + * message, the total + * number of commas is number_of_msgs - 1 = 3 -> 884 + 3 = 887 (sample is 886 + * because the hour + * in sentData this time happens to be 2:45 but it could be 12:45 5 -Sequence + * Number increments * with every batch created * - *
so formulae to determine the expected default size of the batch is + *
+ * so formulae to determine the expected default size of the batch is * - * @return: defaultSize = messages size + context size + metadata size + comma number + sequence - * digits + writekey + buffer + * @return: defaultSize = messages size + context size + metadata size + comma + * number + sequence + * digits + writekey + buffer * @return */ private static int getBatchDefaultSize(int contextSize, int currentMessageNumber) { - // sample data: {"batch":[],"sentAt":"MMM dd, yyyy, HH:mm:ss tt","context":,"sequence":1, - // "writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - 119 + // sample data: {"batch":[],"sentAt":"MMM dd, yyyy, HH:mm:ss + // tt","context":,"sequence":1, + // "writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - 119 // Don't need to squeeze everything possible into a batch, adding a buffer int metadataExtraCharsSize = 119 + 1024; int commaNumber = currentMessageNumber - 1; From 58ddfcb1c292721145301ca60320e9737eb0eb8a Mon Sep 17 00:00:00 2001 From: neelkanth-kaushik Date: Tue, 25 Nov 2025 14:46:45 +0530 Subject: [PATCH 07/16] Fixed Github Issue #524 --- .../analytics/internal/AnalyticsClient.java | 940 +++++++++--------- 1 file changed, 455 insertions(+), 485 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 267ae304..7efb616f 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -34,543 +34,513 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import okhttp3.HttpUrl; +import org.springframework.util.StringUtils; import retrofit2.Call; import retrofit2.Response; public class AnalyticsClient { - private static final Map CONTEXT; - private static final int BATCH_MAX_SIZE = 1024 * 500; - private static final int MSG_MAX_SIZE = 1024 * 32; - private static final Charset ENCODING = StandardCharsets.UTF_8; - private Gson gsonInstance; - private static final String instanceId = UUID.randomUUID().toString(); - - static { - Map library = new LinkedHashMap<>(); - library.put("name", "analytics-java"); - library.put("version", AnalyticsVersion.get()); - Map context = new LinkedHashMap<>(); - context.put("library", Collections.unmodifiableMap(library)); - context.put("instanceId", instanceId); - CONTEXT = Collections.unmodifiableMap(context); - } - - private final BlockingQueue messageQueue; - private final HttpUrl uploadUrl; - private final SegmentService service; - private final int size; - private final int maximumRetries; - private final int maximumQueueByteSize; - private int currentQueueSizeInBytes; - private final Log log; - private final List callbacks; - private final ExecutorService networkExecutor; - private final ExecutorService looperExecutor; - private final ScheduledExecutorService flushScheduler; - private final AtomicBoolean isShutDown; - private final String writeKey; - private volatile Future looperFuture; - - public static AnalyticsClient create( - HttpUrl uploadUrl, - SegmentService segmentService, - int queueCapacity, - int flushQueueSize, - long flushIntervalInMillis, - int maximumRetries, - int maximumQueueSizeInBytes, - Log log, - ThreadFactory threadFactory, - ExecutorService networkExecutor, - List callbacks, - String writeKey, - Gson gsonInstance) { - return new AnalyticsClient( - new LinkedBlockingQueue(queueCapacity), - uploadUrl, - segmentService, - 
flushQueueSize, - flushIntervalInMillis, - maximumRetries, - maximumQueueSizeInBytes, - log, - threadFactory, - networkExecutor, - callbacks, - new AtomicBoolean(false), - writeKey, - gsonInstance); - } - - public AnalyticsClient( - BlockingQueue messageQueue, - HttpUrl uploadUrl, - SegmentService service, - int maxQueueSize, - long flushIntervalInMillis, - int maximumRetries, - int maximumQueueSizeInBytes, - Log log, - ThreadFactory threadFactory, - ExecutorService networkExecutor, - List callbacks, - AtomicBoolean isShutDown, - String writeKey, - Gson gsonInstance) { - this.messageQueue = messageQueue; - this.uploadUrl = uploadUrl; - this.service = service; - this.size = maxQueueSize; - this.maximumRetries = maximumRetries; - this.maximumQueueByteSize = maximumQueueSizeInBytes; - this.log = log; - this.callbacks = callbacks; - this.looperExecutor = Executors.newSingleThreadExecutor(threadFactory); - this.networkExecutor = networkExecutor; - this.isShutDown = isShutDown; - this.writeKey = writeKey; - this.gsonInstance = gsonInstance; - - this.currentQueueSizeInBytes = 0; - - if (!isShutDown.get()) { - this.looperFuture = looperExecutor.submit(new Looper()); + private static final Map CONTEXT; + private static final int BATCH_MAX_SIZE = 1024 * 500; + private static final int MSG_MAX_SIZE = 1024 * 32; + private static final Charset ENCODING = StandardCharsets.UTF_8; + private Gson gsonInstance; + private static final String instanceId = UUID.randomUUID().toString(); + + static { + Map library = new LinkedHashMap<>(); + library.put("name", "analytics-java"); + library.put("version", AnalyticsVersion.get()); + Map context = new LinkedHashMap<>(); + context.put("library", Collections.unmodifiableMap(library)); + context.put("instanceId", instanceId); + CONTEXT = Collections.unmodifiableMap(context); } - flushScheduler = Executors.newScheduledThreadPool(1, threadFactory); - flushScheduler.scheduleAtFixedRate( - new Runnable() { - @Override - public void run() { - flush(); - } - }, - flushIntervalInMillis, - flushIntervalInMillis, - TimeUnit.MILLISECONDS); - } - - public int messageSizeInBytes(Message message) { - String stringifiedMessage = gsonInstance.toJson(message); - - return stringifiedMessage.getBytes(ENCODING).length; - } - - private Boolean isBackPressuredAfterSize(int incomingSize) { - int POISON_BYTE_SIZE = messageSizeInBytes(FlushMessage.POISON); - int sizeAfterAdd = this.currentQueueSizeInBytes + incomingSize + POISON_BYTE_SIZE; - // Leave a 10% buffer since the unsynchronized enqueue could add multiple at a - // time - return sizeAfterAdd >= Math.min(this.maximumQueueByteSize, BATCH_MAX_SIZE) * 0.9; - } - - public boolean offer(Message message) { - return messageQueue.offer(message); - } - - public void enqueue(Message message) { - if (message != StopMessage.STOP && isShutDown.get()) { - log.print(ERROR, "Attempt to enqueue a message when shutdown has been called %s.", message); - return; + private final BlockingQueue messageQueue; + private final HttpUrl uploadUrl; + private final SegmentService service; + private final int size; + private final int maximumRetries; + private final int maximumQueueByteSize; + private int currentQueueSizeInBytes; + private final Log log; + private final List callbacks; + private final ExecutorService networkExecutor; + private final ExecutorService looperExecutor; + private final ScheduledExecutorService flushScheduler; + private final AtomicBoolean isShutDown; + private final String writeKey; + private volatile Future looperFuture; + + public 
static AnalyticsClient create( + HttpUrl uploadUrl, + SegmentService segmentService, + int queueCapacity, + int flushQueueSize, + long flushIntervalInMillis, + int maximumRetries, + int maximumQueueSizeInBytes, + Log log, + ThreadFactory threadFactory, + ExecutorService networkExecutor, + List callbacks, + String writeKey, + Gson gsonInstance) { + return new AnalyticsClient( + new LinkedBlockingQueue(queueCapacity), + uploadUrl, + segmentService, + flushQueueSize, + flushIntervalInMillis, + maximumRetries, + maximumQueueSizeInBytes, + log, + threadFactory, + networkExecutor, + callbacks, + new AtomicBoolean(false), + writeKey, + gsonInstance); } - try { - // @jorgen25 message here could be regular msg, POISON or STOP. Only do regular - // logic if its - // valid message - if (message != StopMessage.STOP && message != FlushMessage.POISON) { - int messageByteSize = messageSizeInBytes(message); - - // @jorgen25 check if message is below 32kb limit for individual messages, no - // need to check - // for extra characters - if (messageByteSize <= MSG_MAX_SIZE) { - if (isBackPressuredAfterSize(messageByteSize)) { - this.currentQueueSizeInBytes = messageByteSize; - messageQueue.put(FlushMessage.POISON); - messageQueue.put(message); - - log.print(VERBOSE, "Maximum storage size has been hit Flushing..."); - } else { - messageQueue.put(message); - this.currentQueueSizeInBytes += messageByteSize; - } - } else { - log.print( - ERROR, "Message was above individual limit. MessageId: %s", message.messageId()); - throw new IllegalArgumentException( - "Message was above individual limit. MessageId: " + message.messageId()); + public AnalyticsClient( + BlockingQueue messageQueue, + HttpUrl uploadUrl, + SegmentService service, + int maxQueueSize, + long flushIntervalInMillis, + int maximumRetries, + int maximumQueueSizeInBytes, + Log log, + ThreadFactory threadFactory, + ExecutorService networkExecutor, + List callbacks, + AtomicBoolean isShutDown, + String writeKey, + Gson gsonInstance) { + this.messageQueue = messageQueue; + this.uploadUrl = uploadUrl; + this.service = service; + this.size = maxQueueSize; + this.maximumRetries = maximumRetries; + this.maximumQueueByteSize = maximumQueueSizeInBytes; + this.log = log; + this.callbacks = callbacks; + this.looperExecutor = Executors.newSingleThreadExecutor(threadFactory); + this.networkExecutor = networkExecutor; + this.isShutDown = isShutDown; + this.writeKey = writeKey; + this.gsonInstance = gsonInstance; + + this.currentQueueSizeInBytes = 0; + + if (!isShutDown.get()) { + this.looperFuture = looperExecutor.submit(new Looper()); } - } else { - messageQueue.put(message); - } - } catch (InterruptedException e) { - log.print(ERROR, e, "Interrupted while adding message %s.", message); - Thread.currentThread().interrupt(); - } - } - public void flush() { - if (!isShutDown.get()) { - enqueue(FlushMessage.POISON); + flushScheduler = Executors.newScheduledThreadPool(1, threadFactory); + flushScheduler.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + flush(); + } + }, + flushIntervalInMillis, + flushIntervalInMillis, + TimeUnit.MILLISECONDS); } - } - - public void shutdown() { - if (isShutDown.compareAndSet(false, true)) { - final long start = System.currentTimeMillis(); - // first let's tell the system to stop - enqueue(StopMessage.STOP); + public int messageSizeInBytes(Message message) { + String stringifiedMessage = gsonInstance.toJson(message); - // we can shutdown the flush scheduler without worrying - flushScheduler.shutdownNow(); + 
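    // Measure the UTF-8 byte length of the serialized JSON; this is the unit used by the
    // queue-size accounting and by the per-message and per-batch size limits.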
return stringifiedMessage.getBytes(ENCODING).length; + } - // Wait for the looper to complete processing before shutting down executors - waitForLooperCompletion(); - shutdownAndWait(looperExecutor, "looper"); - shutdownAndWait(networkExecutor, "network"); + private Boolean isBackPressuredAfterSize(int incomingSize) { + int POISON_BYTE_SIZE = messageSizeInBytes(FlushMessage.POISON); + int sizeAfterAdd = this.currentQueueSizeInBytes + incomingSize + POISON_BYTE_SIZE; + // Leave a 10% buffer since the unsynchronized enqueue could add multiple at a time + return sizeAfterAdd >= Math.min(this.maximumQueueByteSize, BATCH_MAX_SIZE) * 0.9; + } - log.print( - VERBOSE, "Analytics client shut down in %s ms", (System.currentTimeMillis() - start)); + public boolean offer(Message message) { + return messageQueue.offer(message); } - } - - /** - * Wait for the looper to complete processing all messages before proceedin with - * shutdown. This prevents the race condition where the network executor is shut - * down before the looper finishes - * submitting all batches. - */ - private void waitForLooperCompletion() { - if (looperFuture != null) { - try { - // Wait for the looper to complete processing the STOP message and finish - // Use a reasonable timeout to avoid hanging indefinitely - looperFuture.get(5, TimeUnit.SECONDS); - log.print(VERBOSE, "Looper completed successfully."); - } catch (Exception e) { - log.print(ERROR, e, "Error waiting for looper to complete: %s", e.getMessage()); - // Cancel the looper if it's taking too long or if there's an error - if (!looperFuture.isDone()) { - looperFuture.cancel(true); - log.print(VERBOSE, "Looper was cancelled due to timeout or error."); + + public void enqueue(Message message) { + if (message != StopMessage.STOP && isShutDown.get()) { + log.print(ERROR, "Attempt to enqueue a message when shutdown has been called %s.", message); + return; } - } - } - } - - public void shutdownAndWait(ExecutorService executor, String name) { - boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper"); - try { - executor.shutdown(); - boolean terminated = executor.awaitTermination(1, TimeUnit.MILLISECONDS); - if (terminated) { - log.print(VERBOSE, "%s executor terminated normally.", name); - return; - } - if (isLooperExecutor) { - // not terminated within timeout -> force shutdown - log.print(VERBOSE, "%s did not terminate in %d ms; requesting shutdownNow().", name, 1); - List dropped = executor.shutdownNow(); // interrupts running tasks - log.print(VERBOSE, "%s shutdownNow returned %d queued tasks that never started.", name, dropped.size()); - - // optional short wait to give interrupted tasks a chance to exit - boolean terminatedAfterForce = executor.awaitTermination(1, TimeUnit.MILLISECONDS); - log.print(VERBOSE, "%s executor %s after shutdownNow().", name, - terminatedAfterForce ? "terminated" : "still running (did not terminate)"); - - if (!terminatedAfterForce) { - // final warning — investigate tasks that ignore interrupts - log.print(ERROR, "%s executor still did not terminate; tasks may be ignoring interrupts.", name); + + try { + // @jorgen25 message here could be regular msg, POISON or STOP. 
Only do regular logic if its + // valid message + if (message != StopMessage.STOP && message != FlushMessage.POISON) { + int messageByteSize = messageSizeInBytes(message); + + // @jorgen25 check if message is below 32kb limit for individual messages, no need to check + // for extra characters + if (messageByteSize <= MSG_MAX_SIZE) { + if (isBackPressuredAfterSize(messageByteSize)) { + this.currentQueueSizeInBytes = messageByteSize; + messageQueue.put(FlushMessage.POISON); + messageQueue.put(message); + + log.print(VERBOSE, "Maximum storage size has been hit Flushing..."); + } else { + messageQueue.put(message); + this.currentQueueSizeInBytes += messageByteSize; + } + } else { + log.print( + ERROR, "Message was above individual limit. MessageId: %s", message.messageId()); + throw new IllegalArgumentException( + "Message was above individual limit. MessageId: " + message.messageId()); + } + } else { + messageQueue.put(message); + } + } catch (InterruptedException e) { + log.print(ERROR, e, "Interrupted while adding message %s.", message); + Thread.currentThread().interrupt(); } - } - } catch (InterruptedException e) { - // Preserve interrupt status and attempt forceful shutdown - log.print(ERROR, e, "Interrupted while stopping %s executor.", name); - Thread.currentThread().interrupt(); - if (isLooperExecutor) { - List dropped = executor.shutdownNow(); - log.print(VERBOSE, "%s shutdownNow invoked after interrupt; %d tasks returned.", name, dropped.size()); - } } - } - - /** - * Looper runs on a background thread and takes messages from the queue. Once it - * collects enough - * messages, it triggers a flush. - */ - class Looper implements Runnable { - private boolean stop; - - public Looper() { - this.stop = false; + + public void flush() { + if (!isShutDown.get()) { + enqueue(FlushMessage.POISON); + } } - @Override - public void run() { - LinkedList messages = new LinkedList<>(); - AtomicInteger currentBatchSize = new AtomicInteger(); - boolean batchSizeLimitReached = false; - int contextSize = gsonInstance.toJson(CONTEXT).getBytes(ENCODING).length; - try { - while (!stop) { - Message message = messageQueue.take(); - - if (message == StopMessage.STOP) { - log.print(VERBOSE, "Stopping the Looper"); - stop = true; - } else if (message == FlushMessage.POISON) { - if (!messages.isEmpty()) { - log.print(VERBOSE, "Flushing messages."); - } - } else { - // we do +1 because we are accounting for this new message we just took from the - // queue - // which is not in list yet - // need to check if this message is going to make us go over the limit - // considering - // default batch size as well - int defaultBatchSize = BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1); - int msgSize = messageSizeInBytes(message); - if (currentBatchSize.get() + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) { - messages.add(message); - currentBatchSize.addAndGet(msgSize); - } else { - // put message that did not make the cut this time back on the queue, we already - // took - // this message if we dont put it back its lost - // we take care of that after submitting the batch - batchSizeLimitReached = true; - } - } + public void shutdown() { + if (isShutDown.compareAndSet(false, true)) { + final long start = System.currentTimeMillis(); - Boolean isBlockingSignal = message == FlushMessage.POISON || message == StopMessage.STOP; - Boolean isOverflow = messages.size() >= size; + // first let's tell the system to stop + enqueue(StopMessage.STOP); + + // we can shutdown the flush scheduler without worrying + 
flushScheduler.shutdownNow(); + + // Wait for the looper to complete processing before shutting down executors + waitForLooperCompletion(); + shutdownAndWait(looperExecutor, "looper"); + shutdownAndWait(networkExecutor, "network"); - if (!messages.isEmpty() && (isOverflow || isBlockingSignal || batchSizeLimitReached)) { - Batch batch = Batch.create(CONTEXT, new ArrayList<>(messages), writeKey); log.print( - VERBOSE, - "Batching %s message(s) into batch %s.", - batch.batch().size(), - batch.sequence()); + VERBOSE, "Analytics client shut down in %s ms", (System.currentTimeMillis() - start)); + } + } + + /** + * Wait for the looper to complete processing all messages before proceeding with shutdown. This + * prevents the race condition where the network executor is shut down before the looper finishes + * submitting all batches. + */ + private void waitForLooperCompletion() { + if (looperFuture != null) { try { - networkExecutor.submit( - BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); - } catch (RejectedExecutionException e) { - log.print( - ERROR, - e, - "Failed to submit batch %s to network executor during shutdown. Batch will be lost.", - batch.sequence()); - // Notify callbacks about the failure - for (Message msg : batch.batch()) { - for (Callback callback : callbacks) { - callback.failure(msg, e); + // Wait for the looper to complete processing the STOP message and finish + // Use a reasonable timeout to avoid hanging indefinitely + looperFuture.get(5, TimeUnit.SECONDS); + log.print(VERBOSE, "Looper completed successfully."); + } catch (Exception e) { + log.print(ERROR, e, "Error waiting for looper to complete: %s", e.getMessage()); + // Cancel the looper if it's taking too long or if there's an error + if (!looperFuture.isDone()) { + looperFuture.cancel(true); + log.print(VERBOSE, "Looper was cancelled due to timeout or error."); } - } - } - - currentBatchSize.set(0); - messages.clear(); - if (batchSizeLimitReached) { - // If this is true that means the last message that would make us go over the - // limit - // was not added, - // add it to the now cleared messages list so its not lost - messages.add(message); } - batchSizeLimitReached = false; - } } - } catch (InterruptedException e) { - log.print(DEBUG, "Looper interrupted while polling for messages."); - Thread.currentThread().interrupt(); - } - log.print(VERBOSE, "Looper stopped"); - } - } - - static class BatchUploadTask implements Runnable { - private static final Backo BACKO = Backo.builder() // - .base(TimeUnit.SECONDS, 15) // - .cap(TimeUnit.HOURS, 1) // - .jitter(1) // - .build(); - - private final AnalyticsClient client; - private final Backo backo; - final Batch batch; - private final int maxRetries; - - static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetries) { - return new BatchUploadTask(client, BACKO, batch, maxRetries); - } - - BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch, int maxRetries) { - this.client = client; - this.batch = batch; - this.backo = backo; - this.maxRetries = maxRetries; } - private void notifyCallbacksWithException(Batch batch, Exception exception) { - for (Message message : batch.batch()) { - for (Callback callback : client.callbacks) { - callback.failure(message, exception); + public void shutdownAndWait(ExecutorService executor, String name) { + boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper"); + try { + executor.shutdown(); + boolean terminated = executor.awaitTermination(1, TimeUnit.MILLISECONDS); + 
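      // Brief grace period for an orderly drain; if it elapses, the looper executor falls
      // through to the forced shutdownNow() path below, while other executors are simply
      // left to finish on their own after shutdown().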
if (terminated) { + log.print(VERBOSE, "%s executor terminated normally.", name); + return; + } + if (isLooperExecutor) { + // not terminated within timeout -> force shutdown + log.print(VERBOSE, "%s did not terminate in %d ms; requesting shutdownNow().", name, 1); + List dropped = executor.shutdownNow(); // interrupts running tasks + log.print(VERBOSE, "%s shutdownNow returned %d queued tasks that never started.", name, dropped.size()); + + // optional short wait to give interrupted tasks a chance to exit + boolean terminatedAfterForce = executor.awaitTermination(1, TimeUnit.MILLISECONDS); + log.print(VERBOSE, "%s executor %s after shutdownNow().", name, + terminatedAfterForce ? "terminated" : "still running (did not terminate)"); + + if (!terminatedAfterForce) { + // final warning — investigate tasks that ignore interrupts + log.print(ERROR, "%s executor still did not terminate; tasks may be ignoring interrupts.", name); + } + } + } catch (InterruptedException e) { + // Preserve interrupt status and attempt forceful shutdown + log.print(ERROR, e, "Interrupted while stopping %s executor.", name); + Thread.currentThread().interrupt(); + if (isLooperExecutor) { + List dropped = executor.shutdownNow(); + log.print(VERBOSE, "%s shutdownNow invoked after interrupt; %d tasks returned.", name, dropped.size()); + } } - } } /** - * Returns {@code true} to indicate a batch should be retried. {@code false} - * otherwise. + * Looper runs on a background thread and takes messages from the queue. Once it collects enough + * messages, it triggers a flush. */ - boolean upload() { - client.log.print(VERBOSE, "Uploading batch %s.", batch.sequence()); - - try { - Call call = client.service.upload(client.uploadUrl, batch); - Response response = call.execute(); + class Looper implements Runnable { + private boolean stop; - if (response.isSuccessful()) { - client.log.print(VERBOSE, "Uploaded batch %s.", batch.sequence()); + public Looper() { + this.stop = false; + } - for (Message message : batch.batch()) { - for (Callback callback : client.callbacks) { - callback.success(message); + @Override + public void run() { + LinkedList messages = new LinkedList<>(); + AtomicInteger currentBatchSize = new AtomicInteger(); + boolean batchSizeLimitReached = false; + int contextSize = gsonInstance.toJson(CONTEXT).getBytes(ENCODING).length; + try { + while (!stop) { + Message message = messageQueue.take(); + + if (message == StopMessage.STOP) { + log.print(VERBOSE, "Stopping the Looper"); + stop = true; + } else if (message == FlushMessage.POISON) { + if (!messages.isEmpty()) { + log.print(VERBOSE, "Flushing messages."); + } + } else { + // we do +1 because we are accounting for this new message we just took from the queue + // which is not in list yet + // need to check if this message is going to make us go over the limit considering + // default batch size as well + int defaultBatchSize = + BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1); + int msgSize = messageSizeInBytes(message); + if (currentBatchSize.get() + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) { + messages.add(message); + currentBatchSize.addAndGet(msgSize); + } else { + // put message that did not make the cut this time back on the queue, we already took + // this message if we dont put it back its lost + // we take care of that after submitting the batch + batchSizeLimitReached = true; + } + } + + Boolean isBlockingSignal = message == FlushMessage.POISON || message == StopMessage.STOP; + Boolean isOverflow = messages.size() >= size; + 
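          // A batch is submitted when the message count fills up, when a flush or stop
          // signal arrives, or when the byte-size limit was hit while adding this message.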
+ if (!messages.isEmpty() && (isOverflow || isBlockingSignal || batchSizeLimitReached)) { + Batch batch = Batch.create(CONTEXT, new ArrayList<>(messages), writeKey); + log.print( + VERBOSE, + "Batching %s message(s) into batch %s.", + batch.batch().size(), + batch.sequence()); + try { + networkExecutor.submit( + BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); + } catch (RejectedExecutionException e) { + log.print( + ERROR, + e, + "Failed to submit batch %s to network executor during shutdown. Batch will be lost.", + batch.sequence()); + // Notify callbacks about the failure + for (Message msg : batch.batch()) { + for (Callback callback : callbacks) { + callback.failure(msg, e); + } + } + } + + currentBatchSize.set(0); + messages.clear(); + if (batchSizeLimitReached) { + // If this is true that means the last message that would make us go over the limit + // was not added, + // add it to the now cleared messages list so its not lost + messages.add(message); + } + batchSizeLimitReached = false; + } + } + } catch (InterruptedException e) { + log.print(DEBUG, "Looper interrupted while polling for messages."); + Thread.currentThread().interrupt(); } - } + log.print(VERBOSE, "Looper stopped"); + } + } - return false; + static class BatchUploadTask implements Runnable { + private static final Backo BACKO = + Backo.builder() // + .base(TimeUnit.SECONDS, 15) // + .cap(TimeUnit.HOURS, 1) // + .jitter(1) // + .build(); + + private final AnalyticsClient client; + private final Backo backo; + final Batch batch; + private final int maxRetries; + + static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetries) { + return new BatchUploadTask(client, BACKO, batch, maxRetries); } - int status = response.code(); - if (is5xx(status)) { - client.log.print( - DEBUG, "Could not upload batch %s due to server error. Retrying.", batch.sequence()); - return true; - } else if (status == 429) { - client.log.print( - DEBUG, "Could not upload batch %s due to rate limiting. Retrying.", batch.sequence()); - return true; + BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch, int maxRetries) { + this.client = client; + this.batch = batch; + this.backo = backo; + this.maxRetries = maxRetries; } - client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence()); - notifyCallbacksWithException(batch, new IOException(response.errorBody().string())); + private void notifyCallbacksWithException(Batch batch, Exception exception) { + for (Message message : batch.batch()) { + for (Callback callback : client.callbacks) { + callback.failure(message, exception); + } + } + } - return false; - } catch (IOException error) { - client.log.print(DEBUG, error, "Could not upload batch %s. Retrying.", batch.sequence()); + /** Returns {@code true} to indicate a batch should be retried. {@code false} otherwise. */ + boolean upload() { + client.log.print(VERBOSE, "Uploading batch %s.", batch.sequence()); - return true; - } catch (Exception exception) { - client.log.print(DEBUG, "Could not upload batch %s. 
Giving up.", batch.sequence()); + try { + Call call = client.service.upload(client.uploadUrl, batch); + Response response = call.execute(); - notifyCallbacksWithException(batch, exception); + if (response.isSuccessful()) { + client.log.print(VERBOSE, "Uploaded batch %s.", batch.sequence()); - return false; - } - } + for (Message message : batch.batch()) { + for (Callback callback : client.callbacks) { + callback.success(message); + } + } - @Override - public void run() { - int attempt = 0; - for (; attempt <= maxRetries; attempt++) { - boolean retry = upload(); - if (!retry) - return; - try { - backo.sleep(attempt); - } catch (InterruptedException e) { - client.log.print( - DEBUG, "Thread interrupted while backing off for batch %s.", batch.sequence()); - return; + return false; + } + + int status = response.code(); + if (is5xx(status)) { + client.log.print( + DEBUG, "Could not upload batch %s due to server error. Retrying.", batch.sequence()); + return true; + } else if (status == 429) { + client.log.print( + DEBUG, "Could not upload batch %s due to rate limiting. Retrying.", batch.sequence()); + return true; + } + + client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence()); + notifyCallbacksWithException(batch, new IOException(response.errorBody().string())); + + return false; + } catch (IOException error) { + client.log.print(DEBUG, error, "Could not upload batch %s. Retrying.", batch.sequence()); + + return true; + } catch (Exception exception) { + client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence()); + + notifyCallbacksWithException(batch, exception); + + return false; + } } - } - client.log.print(ERROR, "Could not upload batch %s. Retries exhausted.", batch.sequence()); - notifyCallbacksWithException( - batch, new IOException(Integer.toString(attempt) + " retries exhausted")); - } + @Override + public void run() { + int attempt = 0; + for (; attempt <= maxRetries; attempt++) { + boolean retry = upload(); + if (!retry) return; + try { + backo.sleep(attempt); + } catch (InterruptedException e) { + client.log.print( + DEBUG, "Thread interrupted while backing off for batch %s.", batch.sequence()); + return; + } + } - private static boolean is5xx(int status) { - return status >= 500 && status < 600; - } - } + client.log.print(ERROR, "Could not upload batch %s. Retries exhausted.", batch.sequence()); + notifyCallbacksWithException( + batch, new IOException(Integer.toString(attempt) + " retries exhausted")); + } - public static class BatchUtility { + private static boolean is5xx(int status) { + return status >= 500 && status < 600; + } + } - /** - * Method to determine what is the expected default size of the batch regardless - * of messages - * - *
- * Sample batch: - * {"batch":[{"type":"alias","messageId":"fc9198f9-d827-47fb-96c8-095bd3405d93","timestamp":"Nov - * 18, 2021, 2:45:07 - * PM","userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", - * "messageId":"3ce6f88c-36cb-4991-83f8-157e10261a89","timestamp":"Nov 18, 2021, - * 2:45:07 - * PM","userId":"jorgen25", - * "integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", - * "messageId":"a328d339-899a-4a14-9835-ec91e303ac4d","timestamp":"Nov 18, 2021, - * 2:45:07 PM", - * "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", - * "messageId":"57b0ceb4-a1cf-4599-9fba-0a44c7041004","timestamp":"Nov 18, 2021, - * 2:45:07 PM", - * "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"}], - * "sentAt":"Nov 18, 2021, 2:45:07 - * PM","context":{"library":{"name":"analytics-java", - * "version":"3.1.3"}},"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - * - *
- * total size of batch : 932 - * - *
- * BREAKDOWN: {"batch":[MESSAGE1,MESSAGE2,MESSAGE3,MESSAGE4],"sentAt":"MMM dd, - * yyyy, HH:mm:ss - * tt","context":CONTEXT,"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - * - *
- * so we need to account for: 1 -message size: 189 * 4 = 756 2 -context object - * size = 55 in - * this sample -> 756 + 55 = 811 3 -Metadata (This has the sent data/sequence - * characters) + - * extra chars (these are chars like "batch":[] or "context": etc and will be - * pretty much the - * same length in every batch -> size is 73 --> 811 + 73 = 884 (well 72 - * actually, char 73 is the - * sequence digit which we account for in point 5) 4 -Commas between each - * message, the total - * number of commas is number_of_msgs - 1 = 3 -> 884 + 3 = 887 (sample is 886 - * because the hour - * in sentData this time happens to be 2:45 but it could be 12:45 5 -Sequence - * Number increments - * with every batch created - * - *
- * so formulae to determine the expected default size of the batch is - * - * @return: defaultSize = messages size + context size + metadata size + comma - * number + sequence - * digits + writekey + buffer - * @return - */ - private static int getBatchDefaultSize(int contextSize, int currentMessageNumber) { - // sample data: {"batch":[],"sentAt":"MMM dd, yyyy, HH:mm:ss - // tt","context":,"sequence":1, - // "writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - 119 - // Don't need to squeeze everything possible into a batch, adding a buffer - int metadataExtraCharsSize = 119 + 1024; - int commaNumber = currentMessageNumber - 1; - - return contextSize - + metadataExtraCharsSize - + commaNumber - + String.valueOf(Integer.MAX_VALUE).length(); + public static class BatchUtility { + + /** + * Method to determine what is the expected default size of the batch regardless of messages + * + *
Sample batch: + * {"batch":[{"type":"alias","messageId":"fc9198f9-d827-47fb-96c8-095bd3405d93","timestamp":"Nov + * 18, 2021, 2:45:07 + * PM","userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", + * "messageId":"3ce6f88c-36cb-4991-83f8-157e10261a89","timestamp":"Nov 18, 2021, 2:45:07 + * PM","userId":"jorgen25", + * "integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", + * "messageId":"a328d339-899a-4a14-9835-ec91e303ac4d","timestamp":"Nov 18, 2021, 2:45:07 PM", + * "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", + * "messageId":"57b0ceb4-a1cf-4599-9fba-0a44c7041004","timestamp":"Nov 18, 2021, 2:45:07 PM", + * "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"}], + * "sentAt":"Nov 18, 2021, 2:45:07 PM","context":{"library":{"name":"analytics-java", + * "version":"3.1.3"}},"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} + * + *
total size of batch : 932 + * + *
BREAKDOWN: {"batch":[MESSAGE1,MESSAGE2,MESSAGE3,MESSAGE4],"sentAt":"MMM dd, yyyy, HH:mm:ss + * tt","context":CONTEXT,"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} + * + *
so we need to account for: 1 -message size: 189 * 4 = 756 2 -context object size = 55 in + * this sample -> 756 + 55 = 811 3 -Metadata (This has the sent data/sequence characters) + + * extra chars (these are chars like "batch":[] or "context": etc and will be pretty much the + * same length in every batch -> size is 73 --> 811 + 73 = 884 (well 72 actually, char 73 is the + * sequence digit which we account for in point 5) 4 -Commas between each message, the total + * number of commas is number_of_msgs - 1 = 3 -> 884 + 3 = 887 (sample is 886 because the hour + * in sentData this time happens to be 2:45 but it could be 12:45 5 -Sequence Number increments + * with every batch created + * + *
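   *
   * <p>A minimal sketch of that arithmetic for the 4-message sample above (189-byte
   * messages, 55-byte context); the local names are illustrative only:
   *
   * <pre>{@code
   * int total = 4 * 189;  // message bodies            -> 756
   * total += 55;          // context object            -> 811
   * total += 73;          // fixed metadata characters -> 884
   * total += 4 - 1;       // commas between messages   -> 887
   * // plus up to String.valueOf(Integer.MAX_VALUE).length() digits for the sequence number
   * }</pre>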
so formulae to determine the expected default size of the batch is + * + * @return: defaultSize = messages size + context size + metadata size + comma number + sequence + * digits + writekey + buffer + * @return + */ + private static int getBatchDefaultSize(int contextSize, int currentMessageNumber) { + // sample data: {"batch":[],"sentAt":"MMM dd, yyyy, HH:mm:ss tt","context":,"sequence":1, + // "writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - 119 + // Don't need to squeeze everything possible into a batch, adding a buffer + int metadataExtraCharsSize = 119 + 1024; + int commaNumber = currentMessageNumber - 1; + + return contextSize + + metadataExtraCharsSize + + commaNumber + + String.valueOf(Integer.MAX_VALUE).length(); + } } - } } \ No newline at end of file From 8add4dca929a5af59572a055f7f50501b8d6ff32 Mon Sep 17 00:00:00 2001 From: neelkanth-kaushik Date: Tue, 25 Nov 2025 14:56:37 +0530 Subject: [PATCH 08/16] Fixed spotless issues --- .../java/com/segment/analytics/internal/AnalyticsClient.java | 1 - 1 file changed, 1 deletion(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 7efb616f..696f5105 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import okhttp3.HttpUrl; -import org.springframework.util.StringUtils; import retrofit2.Call; import retrofit2.Response; From 90c25394db105ebf5b83a39b7a1c55feceffc3b0 Mon Sep 17 00:00:00 2001 From: neelkanth-kaushik Date: Wed, 26 Nov 2025 11:07:12 +0530 Subject: [PATCH 09/16] Fixed spotless issues --- .../analytics/internal/AnalyticsClient.java | 906 +++++++++--------- 1 file changed, 453 insertions(+), 453 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 696f5105..e34edf21 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -38,508 +38,508 @@ import retrofit2.Response; public class AnalyticsClient { - private static final Map CONTEXT; - private static final int BATCH_MAX_SIZE = 1024 * 500; - private static final int MSG_MAX_SIZE = 1024 * 32; - private static final Charset ENCODING = StandardCharsets.UTF_8; - private Gson gsonInstance; - private static final String instanceId = UUID.randomUUID().toString(); - - static { - Map library = new LinkedHashMap<>(); - library.put("name", "analytics-java"); - library.put("version", AnalyticsVersion.get()); - Map context = new LinkedHashMap<>(); - context.put("library", Collections.unmodifiableMap(library)); - context.put("instanceId", instanceId); - CONTEXT = Collections.unmodifiableMap(context); + private static final Map CONTEXT; + private static final int BATCH_MAX_SIZE = 1024 * 500; + private static final int MSG_MAX_SIZE = 1024 * 32; + private static final Charset ENCODING = StandardCharsets.UTF_8; + private Gson gsonInstance; + private static final String instanceId = UUID.randomUUID().toString(); + + static { + Map library = new LinkedHashMap<>(); + library.put("name", "analytics-java"); + library.put("version", AnalyticsVersion.get()); + Map context = new LinkedHashMap<>(); + 
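      // Static context attached to every batch: library name/version plus a per-process
      // random instanceId.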
context.put("library", Collections.unmodifiableMap(library)); + context.put("instanceId", instanceId); + CONTEXT = Collections.unmodifiableMap(context); + } + + private final BlockingQueue messageQueue; + private final HttpUrl uploadUrl; + private final SegmentService service; + private final int size; + private final int maximumRetries; + private final int maximumQueueByteSize; + private int currentQueueSizeInBytes; + private final Log log; + private final List callbacks; + private final ExecutorService networkExecutor; + private final ExecutorService looperExecutor; + private final ScheduledExecutorService flushScheduler; + private final AtomicBoolean isShutDown; + private final String writeKey; + private volatile Future looperFuture; + + public static AnalyticsClient create( + HttpUrl uploadUrl, + SegmentService segmentService, + int queueCapacity, + int flushQueueSize, + long flushIntervalInMillis, + int maximumRetries, + int maximumQueueSizeInBytes, + Log log, + ThreadFactory threadFactory, + ExecutorService networkExecutor, + List callbacks, + String writeKey, + Gson gsonInstance) { + return new AnalyticsClient( + new LinkedBlockingQueue(queueCapacity), + uploadUrl, + segmentService, + flushQueueSize, + flushIntervalInMillis, + maximumRetries, + maximumQueueSizeInBytes, + log, + threadFactory, + networkExecutor, + callbacks, + new AtomicBoolean(false), + writeKey, + gsonInstance); + } + + public AnalyticsClient( + BlockingQueue messageQueue, + HttpUrl uploadUrl, + SegmentService service, + int maxQueueSize, + long flushIntervalInMillis, + int maximumRetries, + int maximumQueueSizeInBytes, + Log log, + ThreadFactory threadFactory, + ExecutorService networkExecutor, + List callbacks, + AtomicBoolean isShutDown, + String writeKey, + Gson gsonInstance) { + this.messageQueue = messageQueue; + this.uploadUrl = uploadUrl; + this.service = service; + this.size = maxQueueSize; + this.maximumRetries = maximumRetries; + this.maximumQueueByteSize = maximumQueueSizeInBytes; + this.log = log; + this.callbacks = callbacks; + this.looperExecutor = Executors.newSingleThreadExecutor(threadFactory); + this.networkExecutor = networkExecutor; + this.isShutDown = isShutDown; + this.writeKey = writeKey; + this.gsonInstance = gsonInstance; + + this.currentQueueSizeInBytes = 0; + + if (!isShutDown.get()) { + this.looperFuture = looperExecutor.submit(new Looper()); } - private final BlockingQueue messageQueue; - private final HttpUrl uploadUrl; - private final SegmentService service; - private final int size; - private final int maximumRetries; - private final int maximumQueueByteSize; - private int currentQueueSizeInBytes; - private final Log log; - private final List callbacks; - private final ExecutorService networkExecutor; - private final ExecutorService looperExecutor; - private final ScheduledExecutorService flushScheduler; - private final AtomicBoolean isShutDown; - private final String writeKey; - private volatile Future looperFuture; - - public static AnalyticsClient create( - HttpUrl uploadUrl, - SegmentService segmentService, - int queueCapacity, - int flushQueueSize, - long flushIntervalInMillis, - int maximumRetries, - int maximumQueueSizeInBytes, - Log log, - ThreadFactory threadFactory, - ExecutorService networkExecutor, - List callbacks, - String writeKey, - Gson gsonInstance) { - return new AnalyticsClient( - new LinkedBlockingQueue(queueCapacity), - uploadUrl, - segmentService, - flushQueueSize, - flushIntervalInMillis, - maximumRetries, - maximumQueueSizeInBytes, - log, - threadFactory, - 
networkExecutor, - callbacks, - new AtomicBoolean(false), - writeKey, - gsonInstance); + flushScheduler = Executors.newScheduledThreadPool(1, threadFactory); + flushScheduler.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + flush(); + } + }, + flushIntervalInMillis, + flushIntervalInMillis, + TimeUnit.MILLISECONDS); + } + + public int messageSizeInBytes(Message message) { + String stringifiedMessage = gsonInstance.toJson(message); + + return stringifiedMessage.getBytes(ENCODING).length; + } + + private Boolean isBackPressuredAfterSize(int incomingSize) { + int POISON_BYTE_SIZE = messageSizeInBytes(FlushMessage.POISON); + int sizeAfterAdd = this.currentQueueSizeInBytes + incomingSize + POISON_BYTE_SIZE; + // Leave a 10% buffer since the unsynchronized enqueue could add multiple at a time + return sizeAfterAdd >= Math.min(this.maximumQueueByteSize, BATCH_MAX_SIZE) * 0.9; + } + + public boolean offer(Message message) { + return messageQueue.offer(message); + } + + public void enqueue(Message message) { + if (message != StopMessage.STOP && isShutDown.get()) { + log.print(ERROR, "Attempt to enqueue a message when shutdown has been called %s.", message); + return; } - public AnalyticsClient( - BlockingQueue messageQueue, - HttpUrl uploadUrl, - SegmentService service, - int maxQueueSize, - long flushIntervalInMillis, - int maximumRetries, - int maximumQueueSizeInBytes, - Log log, - ThreadFactory threadFactory, - ExecutorService networkExecutor, - List callbacks, - AtomicBoolean isShutDown, - String writeKey, - Gson gsonInstance) { - this.messageQueue = messageQueue; - this.uploadUrl = uploadUrl; - this.service = service; - this.size = maxQueueSize; - this.maximumRetries = maximumRetries; - this.maximumQueueByteSize = maximumQueueSizeInBytes; - this.log = log; - this.callbacks = callbacks; - this.looperExecutor = Executors.newSingleThreadExecutor(threadFactory); - this.networkExecutor = networkExecutor; - this.isShutDown = isShutDown; - this.writeKey = writeKey; - this.gsonInstance = gsonInstance; - - this.currentQueueSizeInBytes = 0; - - if (!isShutDown.get()) { - this.looperFuture = looperExecutor.submit(new Looper()); + try { + // @jorgen25 message here could be regular msg, POISON or STOP. Only do regular logic if its + // valid message + if (message != StopMessage.STOP && message != FlushMessage.POISON) { + int messageByteSize = messageSizeInBytes(message); + + // @jorgen25 check if message is below 32kb limit for individual messages, no need to check + // for extra characters + if (messageByteSize <= MSG_MAX_SIZE) { + if (isBackPressuredAfterSize(messageByteSize)) { + this.currentQueueSizeInBytes = messageByteSize; + messageQueue.put(FlushMessage.POISON); + messageQueue.put(message); + + log.print(VERBOSE, "Maximum storage size has been hit Flushing..."); + } else { + messageQueue.put(message); + this.currentQueueSizeInBytes += messageByteSize; + } + } else { + log.print( + ERROR, "Message was above individual limit. MessageId: %s", message.messageId()); + throw new IllegalArgumentException( + "Message was above individual limit. 
MessageId: " + message.messageId()); } - - flushScheduler = Executors.newScheduledThreadPool(1, threadFactory); - flushScheduler.scheduleAtFixedRate( - new Runnable() { - @Override - public void run() { - flush(); - } - }, - flushIntervalInMillis, - flushIntervalInMillis, - TimeUnit.MILLISECONDS); + } else { + messageQueue.put(message); + } + } catch (InterruptedException e) { + log.print(ERROR, e, "Interrupted while adding message %s.", message); + Thread.currentThread().interrupt(); } + } - public int messageSizeInBytes(Message message) { - String stringifiedMessage = gsonInstance.toJson(message); - - return stringifiedMessage.getBytes(ENCODING).length; + public void flush() { + if (!isShutDown.get()) { + enqueue(FlushMessage.POISON); } + } - private Boolean isBackPressuredAfterSize(int incomingSize) { - int POISON_BYTE_SIZE = messageSizeInBytes(FlushMessage.POISON); - int sizeAfterAdd = this.currentQueueSizeInBytes + incomingSize + POISON_BYTE_SIZE; - // Leave a 10% buffer since the unsynchronized enqueue could add multiple at a time - return sizeAfterAdd >= Math.min(this.maximumQueueByteSize, BATCH_MAX_SIZE) * 0.9; - } + public void shutdown() { + if (isShutDown.compareAndSet(false, true)) { + final long start = System.currentTimeMillis(); - public boolean offer(Message message) { - return messageQueue.offer(message); - } + // first let's tell the system to stop + enqueue(StopMessage.STOP); - public void enqueue(Message message) { - if (message != StopMessage.STOP && isShutDown.get()) { - log.print(ERROR, "Attempt to enqueue a message when shutdown has been called %s.", message); - return; - } + // we can shutdown the flush scheduler without worrying + flushScheduler.shutdownNow(); - try { - // @jorgen25 message here could be regular msg, POISON or STOP. Only do regular logic if its - // valid message - if (message != StopMessage.STOP && message != FlushMessage.POISON) { - int messageByteSize = messageSizeInBytes(message); - - // @jorgen25 check if message is below 32kb limit for individual messages, no need to check - // for extra characters - if (messageByteSize <= MSG_MAX_SIZE) { - if (isBackPressuredAfterSize(messageByteSize)) { - this.currentQueueSizeInBytes = messageByteSize; - messageQueue.put(FlushMessage.POISON); - messageQueue.put(message); - - log.print(VERBOSE, "Maximum storage size has been hit Flushing..."); - } else { - messageQueue.put(message); - this.currentQueueSizeInBytes += messageByteSize; - } - } else { - log.print( - ERROR, "Message was above individual limit. MessageId: %s", message.messageId()); - throw new IllegalArgumentException( - "Message was above individual limit. MessageId: " + message.messageId()); - } - } else { - messageQueue.put(message); - } - } catch (InterruptedException e) { - log.print(ERROR, e, "Interrupted while adding message %s.", message); - Thread.currentThread().interrupt(); + // Wait for the looper to complete processing before shutting down executors + waitForLooperCompletion(); + shutdownAndWait(looperExecutor, "looper"); + shutdownAndWait(networkExecutor, "network"); + + log.print( + VERBOSE, "Analytics client shut down in %s ms", (System.currentTimeMillis() - start)); + } + } + + /** + * Wait for the looper to complete processing all messages before proceeding with shutdown. This + * prevents the race condition where the network executor is shut down before the looper finishes + * submitting all batches. 
+ */ + private void waitForLooperCompletion() { + if (looperFuture != null) { + try { + // Wait for the looper to complete processing the STOP message and finish + // Use a reasonable timeout to avoid hanging indefinitely + looperFuture.get(5, TimeUnit.SECONDS); + log.print(VERBOSE, "Looper completed successfully."); + } catch (Exception e) { + log.print(ERROR, e, "Error waiting for looper to complete: %s", e.getMessage()); + // Cancel the looper if it's taking too long or if there's an error + if (!looperFuture.isDone()) { + looperFuture.cancel(true); + log.print(VERBOSE, "Looper was cancelled due to timeout or error."); } + } } - - public void flush() { - if (!isShutDown.get()) { - enqueue(FlushMessage.POISON); + } + + public void shutdownAndWait(ExecutorService executor, String name) { + boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper"); + try { + executor.shutdown(); + boolean terminated = executor.awaitTermination(1, TimeUnit.MILLISECONDS); + if (terminated) { + log.print(VERBOSE, "%s executor terminated normally.", name); + return; + } + if (isLooperExecutor) { + // not terminated within timeout -> force shutdown + log.print(VERBOSE, "%s did not terminate in %d ms; requesting shutdownNow().", name, 1); + List dropped = executor.shutdownNow(); // interrupts running tasks + log.print(VERBOSE, "%s shutdownNow returned %d queued tasks that never started.", name, dropped.size()); + + // optional short wait to give interrupted tasks a chance to exit + boolean terminatedAfterForce = executor.awaitTermination(1, TimeUnit.MILLISECONDS); + log.print(VERBOSE, "%s executor %s after shutdownNow().", name, + terminatedAfterForce ? "terminated" : "still running (did not terminate)"); + + if (!terminatedAfterForce) { + // final warning — investigate tasks that ignore interrupts + log.print(ERROR, "%s executor still did not terminate; tasks may be ignoring interrupts.", name); } + } + } catch (InterruptedException e) { + // Preserve interrupt status and attempt forceful shutdown + log.print(ERROR, e, "Interrupted while stopping %s executor.", name); + Thread.currentThread().interrupt(); + if (isLooperExecutor) { + List dropped = executor.shutdownNow(); + log.print(VERBOSE, "%s shutdownNow invoked after interrupt; %d tasks returned.", name, dropped.size()); + } } + } - public void shutdown() { - if (isShutDown.compareAndSet(false, true)) { - final long start = System.currentTimeMillis(); + /** + * Looper runs on a background thread and takes messages from the queue. Once it collects enough + * messages, it triggers a flush. 
+ */ + class Looper implements Runnable { + private boolean stop; - // first let's tell the system to stop - enqueue(StopMessage.STOP); + public Looper() { + this.stop = false; + } - // we can shutdown the flush scheduler without worrying - flushScheduler.shutdownNow(); + @Override + public void run() { + LinkedList messages = new LinkedList<>(); + AtomicInteger currentBatchSize = new AtomicInteger(); + boolean batchSizeLimitReached = false; + int contextSize = gsonInstance.toJson(CONTEXT).getBytes(ENCODING).length; + try { + while (!stop) { + Message message = messageQueue.take(); + + if (message == StopMessage.STOP) { + log.print(VERBOSE, "Stopping the Looper"); + stop = true; + } else if (message == FlushMessage.POISON) { + if (!messages.isEmpty()) { + log.print(VERBOSE, "Flushing messages."); + } + } else { + // we do +1 because we are accounting for this new message we just took from the queue + // which is not in list yet + // need to check if this message is going to make us go over the limit considering + // default batch size as well + int defaultBatchSize = + BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1); + int msgSize = messageSizeInBytes(message); + if (currentBatchSize.get() + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) { + messages.add(message); + currentBatchSize.addAndGet(msgSize); + } else { + // put message that did not make the cut this time back on the queue, we already took + // this message if we dont put it back its lost + // we take care of that after submitting the batch + batchSizeLimitReached = true; + } + } - // Wait for the looper to complete processing before shutting down executors - waitForLooperCompletion(); - shutdownAndWait(looperExecutor, "looper"); - shutdownAndWait(networkExecutor, "network"); + Boolean isBlockingSignal = message == FlushMessage.POISON || message == StopMessage.STOP; + Boolean isOverflow = messages.size() >= size; + if (!messages.isEmpty() && (isOverflow || isBlockingSignal || batchSizeLimitReached)) { + Batch batch = Batch.create(CONTEXT, new ArrayList<>(messages), writeKey); log.print( - VERBOSE, "Analytics client shut down in %s ms", (System.currentTimeMillis() - start)); - } - } - - /** - * Wait for the looper to complete processing all messages before proceeding with shutdown. This - * prevents the race condition where the network executor is shut down before the looper finishes - * submitting all batches. - */ - private void waitForLooperCompletion() { - if (looperFuture != null) { + VERBOSE, + "Batching %s message(s) into batch %s.", + batch.batch().size(), + batch.sequence()); try { - // Wait for the looper to complete processing the STOP message and finish - // Use a reasonable timeout to avoid hanging indefinitely - looperFuture.get(5, TimeUnit.SECONDS); - log.print(VERBOSE, "Looper completed successfully."); - } catch (Exception e) { - log.print(ERROR, e, "Error waiting for looper to complete: %s", e.getMessage()); - // Cancel the looper if it's taking too long or if there's an error - if (!looperFuture.isDone()) { - looperFuture.cancel(true); - log.print(VERBOSE, "Looper was cancelled due to timeout or error."); + networkExecutor.submit( + BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); + } catch (RejectedExecutionException e) { + log.print( + ERROR, + e, + "Failed to submit batch %s to network executor during shutdown. 
Batch will be lost.", + batch.sequence()); + // Notify callbacks about the failure + for (Message msg : batch.batch()) { + for (Callback callback : callbacks) { + callback.failure(msg, e); } + } } - } - } - public void shutdownAndWait(ExecutorService executor, String name) { - boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper"); - try { - executor.shutdown(); - boolean terminated = executor.awaitTermination(1, TimeUnit.MILLISECONDS); - if (terminated) { - log.print(VERBOSE, "%s executor terminated normally.", name); - return; - } - if (isLooperExecutor) { - // not terminated within timeout -> force shutdown - log.print(VERBOSE, "%s did not terminate in %d ms; requesting shutdownNow().", name, 1); - List dropped = executor.shutdownNow(); // interrupts running tasks - log.print(VERBOSE, "%s shutdownNow returned %d queued tasks that never started.", name, dropped.size()); - - // optional short wait to give interrupted tasks a chance to exit - boolean terminatedAfterForce = executor.awaitTermination(1, TimeUnit.MILLISECONDS); - log.print(VERBOSE, "%s executor %s after shutdownNow().", name, - terminatedAfterForce ? "terminated" : "still running (did not terminate)"); - - if (!terminatedAfterForce) { - // final warning — investigate tasks that ignore interrupts - log.print(ERROR, "%s executor still did not terminate; tasks may be ignoring interrupts.", name); - } - } - } catch (InterruptedException e) { - // Preserve interrupt status and attempt forceful shutdown - log.print(ERROR, e, "Interrupted while stopping %s executor.", name); - Thread.currentThread().interrupt(); - if (isLooperExecutor) { - List dropped = executor.shutdownNow(); - log.print(VERBOSE, "%s shutdownNow invoked after interrupt; %d tasks returned.", name, dropped.size()); + currentBatchSize.set(0); + messages.clear(); + if (batchSizeLimitReached) { + // If this is true that means the last message that would make us go over the limit + // was not added, + // add it to the now cleared messages list so its not lost + messages.add(message); } + batchSizeLimitReached = false; + } } + } catch (InterruptedException e) { + log.print(DEBUG, "Looper interrupted while polling for messages."); + Thread.currentThread().interrupt(); + } + log.print(VERBOSE, "Looper stopped"); } - - /** - * Looper runs on a background thread and takes messages from the queue. Once it collects enough - * messages, it triggers a flush. 
- */ - class Looper implements Runnable { - private boolean stop; - - public Looper() { - this.stop = false; - } - - @Override - public void run() { - LinkedList messages = new LinkedList<>(); - AtomicInteger currentBatchSize = new AtomicInteger(); - boolean batchSizeLimitReached = false; - int contextSize = gsonInstance.toJson(CONTEXT).getBytes(ENCODING).length; - try { - while (!stop) { - Message message = messageQueue.take(); - - if (message == StopMessage.STOP) { - log.print(VERBOSE, "Stopping the Looper"); - stop = true; - } else if (message == FlushMessage.POISON) { - if (!messages.isEmpty()) { - log.print(VERBOSE, "Flushing messages."); - } - } else { - // we do +1 because we are accounting for this new message we just took from the queue - // which is not in list yet - // need to check if this message is going to make us go over the limit considering - // default batch size as well - int defaultBatchSize = - BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1); - int msgSize = messageSizeInBytes(message); - if (currentBatchSize.get() + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) { - messages.add(message); - currentBatchSize.addAndGet(msgSize); - } else { - // put message that did not make the cut this time back on the queue, we already took - // this message if we dont put it back its lost - // we take care of that after submitting the batch - batchSizeLimitReached = true; - } - } - - Boolean isBlockingSignal = message == FlushMessage.POISON || message == StopMessage.STOP; - Boolean isOverflow = messages.size() >= size; - - if (!messages.isEmpty() && (isOverflow || isBlockingSignal || batchSizeLimitReached)) { - Batch batch = Batch.create(CONTEXT, new ArrayList<>(messages), writeKey); - log.print( - VERBOSE, - "Batching %s message(s) into batch %s.", - batch.batch().size(), - batch.sequence()); - try { - networkExecutor.submit( - BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); - } catch (RejectedExecutionException e) { - log.print( - ERROR, - e, - "Failed to submit batch %s to network executor during shutdown. 
Batch will be lost.", - batch.sequence()); - // Notify callbacks about the failure - for (Message msg : batch.batch()) { - for (Callback callback : callbacks) { - callback.failure(msg, e); - } - } - } - - currentBatchSize.set(0); - messages.clear(); - if (batchSizeLimitReached) { - // If this is true that means the last message that would make us go over the limit - // was not added, - // add it to the now cleared messages list so its not lost - messages.add(message); - } - batchSizeLimitReached = false; - } - } - } catch (InterruptedException e) { - log.print(DEBUG, "Looper interrupted while polling for messages."); - Thread.currentThread().interrupt(); - } - log.print(VERBOSE, "Looper stopped"); - } + } + + static class BatchUploadTask implements Runnable { + private static final Backo BACKO = + Backo.builder() // + .base(TimeUnit.SECONDS, 15) // + .cap(TimeUnit.HOURS, 1) // + .jitter(1) // + .build(); + + private final AnalyticsClient client; + private final Backo backo; + final Batch batch; + private final int maxRetries; + + static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetries) { + return new BatchUploadTask(client, BACKO, batch, maxRetries); } - static class BatchUploadTask implements Runnable { - private static final Backo BACKO = - Backo.builder() // - .base(TimeUnit.SECONDS, 15) // - .cap(TimeUnit.HOURS, 1) // - .jitter(1) // - .build(); - - private final AnalyticsClient client; - private final Backo backo; - final Batch batch; - private final int maxRetries; - - static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetries) { - return new BatchUploadTask(client, BACKO, batch, maxRetries); - } + BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch, int maxRetries) { + this.client = client; + this.batch = batch; + this.backo = backo; + this.maxRetries = maxRetries; + } - BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch, int maxRetries) { - this.client = client; - this.batch = batch; - this.backo = backo; - this.maxRetries = maxRetries; + private void notifyCallbacksWithException(Batch batch, Exception exception) { + for (Message message : batch.batch()) { + for (Callback callback : client.callbacks) { + callback.failure(message, exception); } + } + } - private void notifyCallbacksWithException(Batch batch, Exception exception) { - for (Message message : batch.batch()) { - for (Callback callback : client.callbacks) { - callback.failure(message, exception); - } - } - } + /** Returns {@code true} to indicate a batch should be retried. {@code false} otherwise. */ + boolean upload() { + client.log.print(VERBOSE, "Uploading batch %s.", batch.sequence()); - /** Returns {@code true} to indicate a batch should be retried. {@code false} otherwise. 
*/ - boolean upload() { - client.log.print(VERBOSE, "Uploading batch %s.", batch.sequence()); + try { + Call call = client.service.upload(client.uploadUrl, batch); + Response response = call.execute(); - try { - Call call = client.service.upload(client.uploadUrl, batch); - Response response = call.execute(); + if (response.isSuccessful()) { + client.log.print(VERBOSE, "Uploaded batch %s.", batch.sequence()); - if (response.isSuccessful()) { - client.log.print(VERBOSE, "Uploaded batch %s.", batch.sequence()); + for (Message message : batch.batch()) { + for (Callback callback : client.callbacks) { + callback.success(message); + } + } - for (Message message : batch.batch()) { - for (Callback callback : client.callbacks) { - callback.success(message); - } - } + return false; + } - return false; - } + int status = response.code(); + if (is5xx(status)) { + client.log.print( + DEBUG, "Could not upload batch %s due to server error. Retrying.", batch.sequence()); + return true; + } else if (status == 429) { + client.log.print( + DEBUG, "Could not upload batch %s due to rate limiting. Retrying.", batch.sequence()); + return true; + } - int status = response.code(); - if (is5xx(status)) { - client.log.print( - DEBUG, "Could not upload batch %s due to server error. Retrying.", batch.sequence()); - return true; - } else if (status == 429) { - client.log.print( - DEBUG, "Could not upload batch %s due to rate limiting. Retrying.", batch.sequence()); - return true; - } + client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence()); + notifyCallbacksWithException(batch, new IOException(response.errorBody().string())); - client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence()); - notifyCallbacksWithException(batch, new IOException(response.errorBody().string())); + return false; + } catch (IOException error) { + client.log.print(DEBUG, error, "Could not upload batch %s. Retrying.", batch.sequence()); - return false; - } catch (IOException error) { - client.log.print(DEBUG, error, "Could not upload batch %s. Retrying.", batch.sequence()); + return true; + } catch (Exception exception) { + client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence()); - return true; - } catch (Exception exception) { - client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence()); + notifyCallbacksWithException(batch, exception); - notifyCallbacksWithException(batch, exception); + return false; + } + } - return false; - } + @Override + public void run() { + int attempt = 0; + for (; attempt <= maxRetries; attempt++) { + boolean retry = upload(); + if (!retry) return; + try { + backo.sleep(attempt); + } catch (InterruptedException e) { + client.log.print( + DEBUG, "Thread interrupted while backing off for batch %s.", batch.sequence()); + return; } + } - @Override - public void run() { - int attempt = 0; - for (; attempt <= maxRetries; attempt++) { - boolean retry = upload(); - if (!retry) return; - try { - backo.sleep(attempt); - } catch (InterruptedException e) { - client.log.print( - DEBUG, "Thread interrupted while backing off for batch %s.", batch.sequence()); - return; - } - } - - client.log.print(ERROR, "Could not upload batch %s. Retries exhausted.", batch.sequence()); - notifyCallbacksWithException( - batch, new IOException(Integer.toString(attempt) + " retries exhausted")); - } + client.log.print(ERROR, "Could not upload batch %s. 
Retries exhausted.", batch.sequence()); + notifyCallbacksWithException( + batch, new IOException(Integer.toString(attempt) + " retries exhausted")); + } - private static boolean is5xx(int status) { - return status >= 500 && status < 600; - } + private static boolean is5xx(int status) { + return status >= 500 && status < 600; } + } - public static class BatchUtility { - - /** - * Method to determine what is the expected default size of the batch regardless of messages - * - *
Sample batch: - * {"batch":[{"type":"alias","messageId":"fc9198f9-d827-47fb-96c8-095bd3405d93","timestamp":"Nov - * 18, 2021, 2:45:07 - * PM","userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", - * "messageId":"3ce6f88c-36cb-4991-83f8-157e10261a89","timestamp":"Nov 18, 2021, 2:45:07 - * PM","userId":"jorgen25", - * "integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", - * "messageId":"a328d339-899a-4a14-9835-ec91e303ac4d","timestamp":"Nov 18, 2021, 2:45:07 PM", - * "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", - * "messageId":"57b0ceb4-a1cf-4599-9fba-0a44c7041004","timestamp":"Nov 18, 2021, 2:45:07 PM", - * "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"}], - * "sentAt":"Nov 18, 2021, 2:45:07 PM","context":{"library":{"name":"analytics-java", - * "version":"3.1.3"}},"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - * - *
total size of batch : 932 - * - *
BREAKDOWN: {"batch":[MESSAGE1,MESSAGE2,MESSAGE3,MESSAGE4],"sentAt":"MMM dd, yyyy, HH:mm:ss - * tt","context":CONTEXT,"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - * - *
so we need to account for: 1 -message size: 189 * 4 = 756 2 -context object size = 55 in - * this sample -> 756 + 55 = 811 3 -Metadata (This has the sent data/sequence characters) + - * extra chars (these are chars like "batch":[] or "context": etc and will be pretty much the - * same length in every batch -> size is 73 --> 811 + 73 = 884 (well 72 actually, char 73 is the - * sequence digit which we account for in point 5) 4 -Commas between each message, the total - * number of commas is number_of_msgs - 1 = 3 -> 884 + 3 = 887 (sample is 886 because the hour - * in sentData this time happens to be 2:45 but it could be 12:45 5 -Sequence Number increments - * with every batch created - * - *
so formulae to determine the expected default size of the batch is - * - * @return: defaultSize = messages size + context size + metadata size + comma number + sequence - * digits + writekey + buffer - * @return - */ - private static int getBatchDefaultSize(int contextSize, int currentMessageNumber) { - // sample data: {"batch":[],"sentAt":"MMM dd, yyyy, HH:mm:ss tt","context":,"sequence":1, - // "writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - 119 - // Don't need to squeeze everything possible into a batch, adding a buffer - int metadataExtraCharsSize = 119 + 1024; - int commaNumber = currentMessageNumber - 1; - - return contextSize - + metadataExtraCharsSize - + commaNumber - + String.valueOf(Integer.MAX_VALUE).length(); - } + public static class BatchUtility { + + /** + * Method to determine what is the expected default size of the batch regardless of messages + * + *
Sample batch: + * {"batch":[{"type":"alias","messageId":"fc9198f9-d827-47fb-96c8-095bd3405d93","timestamp":"Nov + * 18, 2021, 2:45:07 + * PM","userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", + * "messageId":"3ce6f88c-36cb-4991-83f8-157e10261a89","timestamp":"Nov 18, 2021, 2:45:07 + * PM","userId":"jorgen25", + * "integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", + * "messageId":"a328d339-899a-4a14-9835-ec91e303ac4d","timestamp":"Nov 18, 2021, 2:45:07 PM", + * "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias", + * "messageId":"57b0ceb4-a1cf-4599-9fba-0a44c7041004","timestamp":"Nov 18, 2021, 2:45:07 PM", + * "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"}], + * "sentAt":"Nov 18, 2021, 2:45:07 PM","context":{"library":{"name":"analytics-java", + * "version":"3.1.3"}},"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} + * + *
total size of batch : 932 + * + *
BREAKDOWN: {"batch":[MESSAGE1,MESSAGE2,MESSAGE3,MESSAGE4],"sentAt":"MMM dd, yyyy, HH:mm:ss + * tt","context":CONTEXT,"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} + * + *
so we need to account for: 1 -message size: 189 * 4 = 756 2 -context object size = 55 in + * this sample -> 756 + 55 = 811 3 -Metadata (This has the sent data/sequence characters) + + * extra chars (these are chars like "batch":[] or "context": etc and will be pretty much the + * same length in every batch -> size is 73 --> 811 + 73 = 884 (well 72 actually, char 73 is the + * sequence digit which we account for in point 5) 4 -Commas between each message, the total + * number of commas is number_of_msgs - 1 = 3 -> 884 + 3 = 887 (sample is 886 because the hour + * in sentData this time happens to be 2:45 but it could be 12:45 5 -Sequence Number increments + * with every batch created + * + *
so formulae to determine the expected default size of the batch is + * + * @return: defaultSize = messages size + context size + metadata size + comma number + sequence + * digits + writekey + buffer + * @return + */ + private static int getBatchDefaultSize(int contextSize, int currentMessageNumber) { + // sample data: {"batch":[],"sentAt":"MMM dd, yyyy, HH:mm:ss tt","context":,"sequence":1, + // "writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - 119 + // Don't need to squeeze everything possible into a batch, adding a buffer + int metadataExtraCharsSize = 119 + 1024; + int commaNumber = currentMessageNumber - 1; + + return contextSize + + metadataExtraCharsSize + + commaNumber + + String.valueOf(Integer.MAX_VALUE).length(); } + } } \ No newline at end of file From 09f292fe76ac9419cf491296093c7247f35489f6 Mon Sep 17 00:00:00 2001 From: neelkanth-kaushik Date: Wed, 26 Nov 2025 11:11:15 +0530 Subject: [PATCH 10/16] Reverted changes --- .../analytics/internal/AnalyticsClient.java | 84 +++---------------- 1 file changed, 10 insertions(+), 74 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index e34edf21..23df3daa 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -25,9 +25,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -69,7 +67,6 @@ public class AnalyticsClient { private final ScheduledExecutorService flushScheduler; private final AtomicBoolean isShutDown; private final String writeKey; - private volatile Future looperFuture; public static AnalyticsClient create( HttpUrl uploadUrl, @@ -133,9 +130,7 @@ public AnalyticsClient( this.currentQueueSizeInBytes = 0; - if (!isShutDown.get()) { - this.looperFuture = looperExecutor.submit(new Looper()); - } + if (!isShutDown.get()) looperExecutor.submit(new Looper()); flushScheduler = Executors.newScheduledThreadPool(1, threadFactory); flushScheduler.scheduleAtFixedRate( @@ -223,8 +218,6 @@ public void shutdown() { // we can shutdown the flush scheduler without worrying flushScheduler.shutdownNow(); - // Wait for the looper to complete processing before shutting down executors - waitForLooperCompletion(); shutdownAndWait(looperExecutor, "looper"); shutdownAndWait(networkExecutor, "network"); @@ -233,62 +226,19 @@ public void shutdown() { } } - /** - * Wait for the looper to complete processing all messages before proceeding with shutdown. This - * prevents the race condition where the network executor is shut down before the looper finishes - * submitting all batches. 
- */ - private void waitForLooperCompletion() { - if (looperFuture != null) { - try { - // Wait for the looper to complete processing the STOP message and finish - // Use a reasonable timeout to avoid hanging indefinitely - looperFuture.get(5, TimeUnit.SECONDS); - log.print(VERBOSE, "Looper completed successfully."); - } catch (Exception e) { - log.print(ERROR, e, "Error waiting for looper to complete: %s", e.getMessage()); - // Cancel the looper if it's taking too long or if there's an error - if (!looperFuture.isDone()) { - looperFuture.cancel(true); - log.print(VERBOSE, "Looper was cancelled due to timeout or error."); - } - } - } - } - public void shutdownAndWait(ExecutorService executor, String name) { - boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper"); try { executor.shutdown(); - boolean terminated = executor.awaitTermination(1, TimeUnit.MILLISECONDS); - if (terminated) { - log.print(VERBOSE, "%s executor terminated normally.", name); - return; - } - if (isLooperExecutor) { - // not terminated within timeout -> force shutdown - log.print(VERBOSE, "%s did not terminate in %d ms; requesting shutdownNow().", name, 1); - List dropped = executor.shutdownNow(); // interrupts running tasks - log.print(VERBOSE, "%s shutdownNow returned %d queued tasks that never started.", name, dropped.size()); - - // optional short wait to give interrupted tasks a chance to exit - boolean terminatedAfterForce = executor.awaitTermination(1, TimeUnit.MILLISECONDS); - log.print(VERBOSE, "%s executor %s after shutdownNow().", name, - terminatedAfterForce ? "terminated" : "still running (did not terminate)"); - - if (!terminatedAfterForce) { - // final warning — investigate tasks that ignore interrupts - log.print(ERROR, "%s executor still did not terminate; tasks may be ignoring interrupts.", name); - } - } + final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS); + + log.print( + VERBOSE, + "%s executor %s.", + name, + executorTerminated ? "terminated normally" : "timed out"); } catch (InterruptedException e) { - // Preserve interrupt status and attempt forceful shutdown log.print(ERROR, e, "Interrupted while stopping %s executor.", name); Thread.currentThread().interrupt(); - if (isLooperExecutor) { - List dropped = executor.shutdownNow(); - log.print(VERBOSE, "%s shutdownNow invoked after interrupt; %d tasks returned.", name, dropped.size()); - } } } @@ -349,22 +299,8 @@ public void run() { "Batching %s message(s) into batch %s.", batch.batch().size(), batch.sequence()); - try { - networkExecutor.submit( - BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); - } catch (RejectedExecutionException e) { - log.print( - ERROR, - e, - "Failed to submit batch %s to network executor during shutdown. 
Batch will be lost.", - batch.sequence()); - // Notify callbacks about the failure - for (Message msg : batch.batch()) { - for (Callback callback : callbacks) { - callback.failure(msg, e); - } - } - } + networkExecutor.submit( + BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); currentBatchSize.set(0); messages.clear(); From d9753a6ffe614e1f3d416ce705f9771530479245 Mon Sep 17 00:00:00 2001 From: neelkanth-kaushik Date: Wed, 26 Nov 2025 11:12:45 +0530 Subject: [PATCH 11/16] Reverted changes with spotless fixes --- .../java/com/segment/analytics/internal/AnalyticsClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 23df3daa..f7560004 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -478,4 +478,4 @@ private static int getBatchDefaultSize(int contextSize, int currentMessageNumber + String.valueOf(Integer.MAX_VALUE).length(); } } -} \ No newline at end of file +} From ecd53c0faa0277a423d8dfeb0ee4f643181a2353 Mon Sep 17 00:00:00 2001 From: neelkanth-kaushik Date: Wed, 26 Nov 2025 11:15:20 +0530 Subject: [PATCH 12/16] Changes with spotless fixes --- .../analytics/internal/AnalyticsClient.java | 98 +++++++++++++++++-- 1 file changed, 88 insertions(+), 10 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index f7560004..2b924a5c 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -25,7 +25,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -67,6 +69,7 @@ public class AnalyticsClient { private final ScheduledExecutorService flushScheduler; private final AtomicBoolean isShutDown; private final String writeKey; + private volatile Future looperFuture; public static AnalyticsClient create( HttpUrl uploadUrl, @@ -130,7 +133,9 @@ public AnalyticsClient( this.currentQueueSizeInBytes = 0; - if (!isShutDown.get()) looperExecutor.submit(new Looper()); + if (!isShutDown.get()) { + this.looperFuture = looperExecutor.submit(new Looper()); + } flushScheduler = Executors.newScheduledThreadPool(1, threadFactory); flushScheduler.scheduleAtFixedRate( @@ -218,6 +223,8 @@ public void shutdown() { // we can shutdown the flush scheduler without worrying flushScheduler.shutdownNow(); + // Wait for the looper to complete processing before shutting down executors + waitForLooperCompletion(); shutdownAndWait(looperExecutor, "looper"); shutdownAndWait(networkExecutor, "network"); @@ -226,19 +233,76 @@ public void shutdown() { } } + /** + * Wait for the looper to complete processing all messages before proceeding with shutdown. This + * prevents the race condition where the network executor is shut down before the looper finishes + * submitting all batches. 
+ */ + private void waitForLooperCompletion() { + if (looperFuture != null) { + try { + // Wait for the looper to complete processing the STOP message and finish + // Use a reasonable timeout to avoid hanging indefinitely + looperFuture.get(5, TimeUnit.SECONDS); + log.print(VERBOSE, "Looper completed successfully."); + } catch (Exception e) { + log.print(ERROR, e, "Error waiting for looper to complete: %s", e.getMessage()); + // Cancel the looper if it's taking too long or if there's an error + if (!looperFuture.isDone()) { + looperFuture.cancel(true); + log.print(VERBOSE, "Looper was cancelled due to timeout or error."); + } + } + } + } + public void shutdownAndWait(ExecutorService executor, String name) { + boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper"); try { executor.shutdown(); - final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS); - - log.print( - VERBOSE, - "%s executor %s.", - name, - executorTerminated ? "terminated normally" : "timed out"); + boolean terminated = executor.awaitTermination(1, TimeUnit.MILLISECONDS); + if (terminated) { + log.print(VERBOSE, "%s executor terminated normally.", name); + return; + } + if (isLooperExecutor) { + // not terminated within timeout -> force shutdown + log.print(VERBOSE, "%s did not terminate in %d ms; requesting shutdownNow().", name, 1); + List dropped = executor.shutdownNow(); // interrupts running tasks + log.print( + VERBOSE, + "%s shutdownNow returned %d queued tasks that never started.", + name, + dropped.size()); + + // optional short wait to give interrupted tasks a chance to exit + boolean terminatedAfterForce = executor.awaitTermination(1, TimeUnit.MILLISECONDS); + log.print( + VERBOSE, + "%s executor %s after shutdownNow().", + name, + terminatedAfterForce ? "terminated" : "still running (did not terminate)"); + + if (!terminatedAfterForce) { + // final warning — investigate tasks that ignore interrupts + log.print( + ERROR, + "%s executor still did not terminate; tasks may be ignoring interrupts.", + name); + } + } } catch (InterruptedException e) { + // Preserve interrupt status and attempt forceful shutdown log.print(ERROR, e, "Interrupted while stopping %s executor.", name); Thread.currentThread().interrupt(); + if (isLooperExecutor) { + List dropped = executor.shutdownNow(); + log.print( + VERBOSE, + "%s shutdownNow invoked after interrupt; %d tasks returned.", + name, + dropped.size()); + } } } @@ -299,8 +363,22 @@ public void run() { "Batching %s message(s) into batch %s.", batch.batch().size(), batch.sequence()); - networkExecutor.submit( - BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); + try { + networkExecutor.submit( + BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); + } catch (RejectedExecutionException e) { + log.print( + ERROR, + e, + "Failed to submit batch %s to network executor during shutdown. 
Batch will be lost.", + batch.sequence()); + // Notify callbacks about the failure + for (Message msg : batch.batch()) { + for (Callback callback : callbacks) { + callback.failure(msg, e); + } + } + } currentBatchSize.set(0); messages.clear(); From 9c87d7b445389f059c6f1794f07af53204406b99 Mon Sep 17 00:00:00 2001 From: neelkanth-kaushik Date: Wed, 26 Nov 2025 12:05:47 +0530 Subject: [PATCH 13/16] Changed MILLISECONDS to SECONDS --- .../java/com/segment/analytics/internal/AnalyticsClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 2b924a5c..9d320f3e 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -260,7 +260,7 @@ public void shutdownAndWait(ExecutorService executor, String name) { boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper"); try { executor.shutdown(); - boolean terminated = executor.awaitTermination(1, TimeUnit.MILLISECONDS); + boolean terminated = executor.awaitTermination(1, TimeUnit.SECONDS); if (terminated) { log.print(VERBOSE, "%s executor terminated normally.", name); return; @@ -276,7 +276,7 @@ public void shutdownAndWait(ExecutorService executor, String name) { dropped.size()); // optional short wait to give interrupted tasks a chance to exit - boolean terminatedAfterForce = executor.awaitTermination(1, TimeUnit.MILLISECONDS); + boolean terminatedAfterForce = executor.awaitTermination(1, TimeUnit.SECONDS); log.print( VERBOSE, "%s executor %s after shutdownNow().", From 7c29fef107f2a1354fc0a6c9945c2d774cdda537 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Tue, 2 Dec 2025 16:50:40 -0500 Subject: [PATCH 14/16] minor cleanup for copilot suggestions --- .../analytics/internal/AnalyticsClient.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 9d320f3e..187365ea 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -44,6 +44,8 @@ public class AnalyticsClient { private static final Charset ENCODING = StandardCharsets.UTF_8; private Gson gsonInstance; private static final String instanceId = UUID.randomUUID().toString(); + private static final int WAIT_FOR_THREAD_COMPLETE_MS = 5000; + private static final int TERMINATION_TIMEOUT_MS = 1000; static { Map library = new LinkedHashMap<>(); @@ -243,10 +245,10 @@ private void waitForLooperCompletion() { try { // Wait for the looper to complete processing the STOP message and finish // Use a reasonable timeout to avoid hanging indefinitely - looperFuture.get(5, TimeUnit.SECONDS); + looperFuture.get(WAIT_FOR_THREAD_COMPLETE_MS, TimeUnit.MILLISECONDS); log.print(VERBOSE, "Looper completed successfully."); } catch (Exception e) { - log.print(ERROR, e, "Error waiting for looper to complete: %s", e.getMessage()); + log.print(ERROR, e, "Error waiting for looper to complete."); // Cancel the looper if it's taking too long or if there's an error if (!looperFuture.isDone()) { looperFuture.cancel(true); @@ -260,14 +262,14 @@ public void shutdownAndWait(ExecutorService executor, String 
name) { boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper"); try { executor.shutdown(); - boolean terminated = executor.awaitTermination(1, TimeUnit.SECONDS); + boolean terminated = executor.awaitTermination(TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS); if (terminated) { log.print(VERBOSE, "%s executor terminated normally.", name); return; } - if (isLooperExecutor) { + if (isLooperExecutor) { // Handle looper - network could be passed in and should finish on its own // not terminated within timeout -> force shutdown - log.print(VERBOSE, "%s did not terminate in %d ms; requesting shutdownNow().", name, 1); + log.print(VERBOSE, "%s did not terminate in %d ms; requesting shutdownNow().", name, TERMINATION_TIMEOUT_MS); List dropped = executor.shutdownNow(); // interrupts running tasks log.print( VERBOSE, @@ -276,7 +278,7 @@ public void shutdownAndWait(ExecutorService executor, String name) { dropped.size()); // optional short wait to give interrupted tasks a chance to exit - boolean terminatedAfterForce = executor.awaitTermination(1, TimeUnit.SECONDS); + boolean terminatedAfterForce = executor.awaitTermination(TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS); log.print( VERBOSE, "%s executor %s after shutdownNow().", From b6cf766480613fea64570601bef596dd5435fc5c Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Tue, 2 Dec 2025 18:43:23 -0500 Subject: [PATCH 15/16] Shoulda checked the history, reverting to seconds --- .../segment/analytics/internal/AnalyticsClient.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 187365ea..8919600f 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -44,8 +44,8 @@ public class AnalyticsClient { private static final Charset ENCODING = StandardCharsets.UTF_8; private Gson gsonInstance; private static final String instanceId = UUID.randomUUID().toString(); - private static final int WAIT_FOR_THREAD_COMPLETE_MS = 5000; - private static final int TERMINATION_TIMEOUT_MS = 1000; + private static final int WAIT_FOR_THREAD_COMPLETE_S = 5; + private static final int TERMINATION_TIMEOUT_S = 1; static { Map library = new LinkedHashMap<>(); @@ -245,7 +245,7 @@ private void waitForLooperCompletion() { try { // Wait for the looper to complete processing the STOP message and finish // Use a reasonable timeout to avoid hanging indefinitely - looperFuture.get(WAIT_FOR_THREAD_COMPLETE_MS, TimeUnit.MILLISECONDS); + looperFuture.get(WAIT_FOR_THREAD_COMPLETE_S, TimeUnit.SECONDS); log.print(VERBOSE, "Looper completed successfully."); } catch (Exception e) { log.print(ERROR, e, "Error waiting for looper to complete."); @@ -262,14 +262,14 @@ public void shutdownAndWait(ExecutorService executor, String name) { boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper"); try { executor.shutdown(); - boolean terminated = executor.awaitTermination(TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + boolean terminated = executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS); if (terminated) { log.print(VERBOSE, "%s executor terminated normally.", name); return; } if (isLooperExecutor) { // Handle looper - network could be passed in and should finish on its own // not terminated within timeout -> force shutdown - 
log.print(VERBOSE, "%s did not terminate in %d ms; requesting shutdownNow().", name, TERMINATION_TIMEOUT_MS); + log.print(VERBOSE, "%s did not terminate in %d seconds; requesting shutdownNow().", name, TERMINATION_TIMEOUT_S); List dropped = executor.shutdownNow(); // interrupts running tasks log.print( VERBOSE, @@ -278,7 +278,7 @@ public void shutdownAndWait(ExecutorService executor, String name) { dropped.size()); // optional short wait to give interrupted tasks a chance to exit - boolean terminatedAfterForce = executor.awaitTermination(TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + boolean terminatedAfterForce = executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS); log.print( VERBOSE, "%s executor %s after shutdownNow().", From b1af68cf8e3cf6f1d303bbf23880da80f81610dc Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Tue, 2 Dec 2025 18:51:47 -0500 Subject: [PATCH 16/16] fixing formatting --- .../segment/analytics/internal/AnalyticsClient.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 8919600f..2430cd1e 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -267,9 +267,13 @@ public void shutdownAndWait(ExecutorService executor, String name) { log.print(VERBOSE, "%s executor terminated normally.", name); return; } - if (isLooperExecutor) { // Handle looper - network could be passed in and should finish on its own + if (isLooperExecutor) { // Handle looper - network should finish on its own // not terminated within timeout -> force shutdown - log.print(VERBOSE, "%s did not terminate in %d seconds; requesting shutdownNow().", name, TERMINATION_TIMEOUT_S); + log.print( + VERBOSE, + "%s did not terminate in %d seconds; requesting shutdownNow().", + name, + TERMINATION_TIMEOUT_S); List dropped = executor.shutdownNow(); // interrupts running tasks log.print( VERBOSE, @@ -278,7 +282,8 @@ public void shutdownAndWait(ExecutorService executor, String name) { dropped.size()); // optional short wait to give interrupted tasks a chance to exit - boolean terminatedAfterForce = executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS); + boolean terminatedAfterForce = + executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS); log.print( VERBOSE, "%s executor %s after shutdownNow().",
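Taken together, the series settles on one shutdown ordering: submit the Looper once and keep its Future; on shutdown(), enqueue STOP, stop the flush scheduler, wait on the looper Future with a bounded timeout, then shut down the looper and network executors in that order, reserving shutdownNow() for the looper. The sketch below reduces that ordering to a self-contained example; the class, queue, and batch names are illustrative only and none of this code appears in the patches.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class ShutdownOrderingSketch {
  public static void main(String[] args) throws Exception {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ExecutorService looper = Executors.newSingleThreadExecutor();
    ExecutorService network = Executors.newFixedThreadPool(2);

    // Looper stand-in: drains the queue and hands batches to the network pool until it sees STOP.
    Future<?> looperFuture =
        looper.submit(
            () -> {
              try {
                for (String msg = queue.take(); !"STOP".equals(msg); msg = queue.take()) {
                  final String batch = msg;
                  try {
                    network.submit(() -> System.out.println("uploading " + batch));
                  } catch (RejectedExecutionException e) {
                    // network pool already torn down; report instead of losing the batch silently
                    System.err.println("dropped " + batch + ": " + e);
                  }
                }
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            });

    for (int i = 0; i < 5; i++) {
      queue.put("batch-" + i);
    }
    queue.put("STOP");

    // Shutdown ordering from the patches: wait for the looper before touching the executors.
    looperFuture.get(5, TimeUnit.SECONDS);
    looper.shutdown();
    if (!looper.awaitTermination(1, TimeUnit.SECONDS)) {
      looper.shutdownNow(); // only the looper-style executor is ever forced
    }
    network.shutdown();
    network.awaitTermination(1, TimeUnit.SECONDS);
  }
}

Keeping the network pool on a plain shutdown() mirrors shutdownAndWait() above: in-flight uploads get a chance to finish, and only the producer-style looper executor is ever interrupted.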