diff --git a/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java b/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java index f6fbc502e..fc92d84bd 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java +++ b/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java @@ -22,6 +22,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,7 @@ final class BulkWorkerThreadPool implements WorkerThreadPool private final StreamingWorkerThreadPool backupThreadPool; private volatile boolean isActive; + private final AtomicInteger activeThreads = new AtomicInteger(0); public BulkWorkerThreadPool( final WorkerFactory workerFactory, @@ -86,10 +88,12 @@ private void execute() = new BulkWorkerTaskProvider(task, workQueue); try { + activeThreads.incrementAndGet(); bulkWorker.processTasks(taskProvider); } catch (final RuntimeException ex) { LOG.warn("Bulk Worker threw unhandled exception", ex); } finally { + activeThreads.decrementAndGet(); // Re-submit the first task if it has not been consumed // NB: It's really faulty Worker logic to not consume at least // the one task. @@ -150,6 +154,11 @@ public int getBacklogSize() { return workQueue.size() + backupThreadPool.getBacklogSize(); } + + @Override + public int getApproxActiveCount() { + return activeThreads.get() + backupThreadPool.getApproxActiveCount(); + } @Override public void submitWorkerTask(final WorkerTaskImpl workerTask) diff --git a/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerThreadPool.java b/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerThreadPool.java index 0857f6130..62927f73d 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerThreadPool.java +++ b/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerThreadPool.java @@ -76,6 +76,11 @@ public int getBacklogSize() { return workQueue.size(); } + + @Override + public int getApproxActiveCount() { + return threadPoolExecutor.getActiveCount(); + } /** * Execute the specified task at some point in the future diff --git a/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java b/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java index e2f836c2a..22062d262 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java +++ b/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java @@ -78,6 +78,8 @@ public final class WorkerApplication extends Application { private final long startTime = System.currentTimeMillis(); private static final Logger LOG = LoggerFactory.getLogger(WorkerApplication.class); + private static final long SHUTDOWN_DURATION = 5 * 60 * 1000; // 5 minutes in milliseconds + private static final int SHUTDOWN_LOG_INTERVAL = 15_000; // 15 seconds in milliseconds /** * Entry point for the asynchronous micro-service worker framework. @@ -143,25 +145,28 @@ public void start() { @Override public void stop() { - LOG.info("Worker stop requested, allowing in-progress tasks to complete."); + LOG.info("Worker stop requested."); + workerQueue.shutdownIncoming(); - while (!wtp.isIdle()) { + + final long startTime = System.currentTimeMillis(); + + int backlogSize = wtp.getBacklogSize(); + while(backlogSize > 0 && System.currentTimeMillis() - startTime < SHUTDOWN_DURATION) { try { - //The grace period will expire and the process killed so no need for time limit here - LOG.trace("Awaiting the Worker Thread Pool to become idle, {} tasks in the backlog.", - wtp.getBacklogSize()); - Thread.sleep(1000); + LOG.debug("Allowing {} backlog tasks to complete, {} currently active.", backlogSize, wtp.getApproxActiveCount()); + Thread.sleep(SHUTDOWN_LOG_INTERVAL); + backlogSize = wtp.getBacklogSize(); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); - throw new RuntimeException(e); + break; } } - LOG.trace("Worker Thread Pool is idle."); wtp.shutdown(); try { - wtp.awaitTermination(10_000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.warn("Shutdown interrupted", e); + wtp.awaitTermination(5, TimeUnit.MINUTES); + } catch (final InterruptedException e) { + LOG.error("Worker stop interrupted, in-progress tasks may not have completed.", e); Thread.currentThread().interrupt(); } workerQueue.shutdown(); diff --git a/worker-core/src/main/java/com/github/workerframework/core/WorkerThreadPool.java b/worker-core/src/main/java/com/github/workerframework/core/WorkerThreadPool.java index 2406e79f8..6b4c839cd 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/WorkerThreadPool.java +++ b/worker-core/src/main/java/com/github/workerframework/core/WorkerThreadPool.java @@ -61,6 +61,8 @@ void awaitTermination(long timeout, TimeUnit unit) boolean isIdle(); int getBacklogSize(); + + int getApproxActiveCount(); /** * Execute the specified task at some point in the future diff --git a/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueue.java b/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueue.java index 58a0ebfc5..edfacf009 100644 --- a/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueue.java +++ b/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueue.java @@ -73,6 +73,7 @@ public final class RabbitWorkerQueue implements ManagedWorkerQueue private final RabbitWorkerQueueConfiguration config; private final int maxTasks; private static final Logger LOG = LoggerFactory.getLogger(RabbitWorkerQueue.class); + private boolean incomingShutdownPermanent = false; /** * Setup a new RabbitWorkerQueue. @@ -214,6 +215,7 @@ public String getPausedQueue() * {@inheritDoc} * * The incoming queues will all be cancelled so the consumer will fall back to idle. + * This is permanent, and attempts to reconnectIncoming will fail. */ @Override public void shutdownIncoming() @@ -224,6 +226,7 @@ public void shutdownIncoming() try { incomingChannel.basicCancel(consumerTag); consumerTag = null; + incomingShutdownPermanent = true; } catch (IOException e) { metrics.incremementErrors(); LOG.warn("Failed to cancel consumer {}", consumerTag, e); @@ -277,6 +280,9 @@ public void disconnectIncoming() public void reconnectIncoming() { LOG.debug("Reconnecting incoming queues"); + if(incomingShutdownPermanent) { + throw new IllegalStateException("Queue is permanently shutdown"); + } synchronized (consumerLock) { if (consumerTag == null && incomingChannel.isOpen()) { try { diff --git a/worker-test/src/test/java/com/github/workerframework/workertest/ShutdownDeveloperTest.java b/worker-test/src/test/java/com/github/workerframework/workertest/ShutdownDeveloperTest.java index 272bfbb6b..5e4a6166b 100644 --- a/worker-test/src/test/java/com/github/workerframework/workertest/ShutdownDeveloperTest.java +++ b/worker-test/src/test/java/com/github/workerframework/workertest/ShutdownDeveloperTest.java @@ -45,7 +45,7 @@ public class ShutdownDeveloperTest extends TestWorkerTestBase { public void shutdownTest() throws IOException, TimeoutException, CodecException { // Usage instructions - // Comment out the iages for test worker 2 and 3 in this module's pom.xml + // Comment out the images for worker-test-2 and worker-test-no-valid-cert in this module's pom.xml // Use mvn docker:start to start test worker // Remove the @Ignore and run the test to create 100 test messages // From a terminal execute docker stop -t 300 CONTAINER_ID