diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index f7560004..2430cd1e 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -25,7 +25,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -42,6 +44,8 @@ public class AnalyticsClient { private static final Charset ENCODING = StandardCharsets.UTF_8; private Gson gsonInstance; private static final String instanceId = UUID.randomUUID().toString(); + private static final int WAIT_FOR_THREAD_COMPLETE_S = 5; + private static final int TERMINATION_TIMEOUT_S = 1; static { Map library = new LinkedHashMap<>(); @@ -67,6 +71,7 @@ public class AnalyticsClient { private final ScheduledExecutorService flushScheduler; private final AtomicBoolean isShutDown; private final String writeKey; + private volatile Future looperFuture; public static AnalyticsClient create( HttpUrl uploadUrl, @@ -130,7 +135,9 @@ public AnalyticsClient( this.currentQueueSizeInBytes = 0; - if (!isShutDown.get()) looperExecutor.submit(new Looper()); + if (!isShutDown.get()) { + this.looperFuture = looperExecutor.submit(new Looper()); + } flushScheduler = Executors.newScheduledThreadPool(1, threadFactory); flushScheduler.scheduleAtFixedRate( @@ -218,6 +225,8 @@ public void shutdown() { // we can shutdown the flush scheduler without worrying flushScheduler.shutdownNow(); + // Wait for the looper to complete processing before shutting down executors + waitForLooperCompletion(); shutdownAndWait(looperExecutor, "looper"); shutdownAndWait(networkExecutor, "network"); @@ -226,19 +235,81 @@ public void shutdown() { } } + /** + * Wait for the looper to complete processing all messages before proceeding with shutdown. This + * prevents the race condition where the network executor is shut down before the looper finishes + * submitting all batches. + */ + private void waitForLooperCompletion() { + if (looperFuture != null) { + try { + // Wait for the looper to complete processing the STOP message and finish + // Use a reasonable timeout to avoid hanging indefinitely + looperFuture.get(WAIT_FOR_THREAD_COMPLETE_S, TimeUnit.SECONDS); + log.print(VERBOSE, "Looper completed successfully."); + } catch (Exception e) { + log.print(ERROR, e, "Error waiting for looper to complete."); + // Cancel the looper if it's taking too long or if there's an error + if (!looperFuture.isDone()) { + looperFuture.cancel(true); + log.print(VERBOSE, "Looper was cancelled due to timeout or error."); + } + } + } + } + public void shutdownAndWait(ExecutorService executor, String name) { + boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper"); try { executor.shutdown(); - final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS); - - log.print( - VERBOSE, - "%s executor %s.", - name, - executorTerminated ? "terminated normally" : "timed out"); + boolean terminated = executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS); + if (terminated) { + log.print(VERBOSE, "%s executor terminated normally.", name); + return; + } + if (isLooperExecutor) { // Handle looper - network should finish on its own + // not terminated within timeout -> force shutdown + log.print( + VERBOSE, + "%s did not terminate in %d seconds; requesting shutdownNow().", + name, + TERMINATION_TIMEOUT_S); + List dropped = executor.shutdownNow(); // interrupts running tasks + log.print( + VERBOSE, + "%s shutdownNow returned %d queued tasks that never started.", + name, + dropped.size()); + + // optional short wait to give interrupted tasks a chance to exit + boolean terminatedAfterForce = + executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS); + log.print( + VERBOSE, + "%s executor %s after shutdownNow().", + name, + terminatedAfterForce ? "terminated" : "still running (did not terminate)"); + + if (!terminatedAfterForce) { + // final warning — investigate tasks that ignore interrupts + log.print( + ERROR, + "%s executor still did not terminate; tasks may be ignoring interrupts.", + name); + } + } } catch (InterruptedException e) { + // Preserve interrupt status and attempt forceful shutdown log.print(ERROR, e, "Interrupted while stopping %s executor.", name); Thread.currentThread().interrupt(); + if (isLooperExecutor) { + List dropped = executor.shutdownNow(); + log.print( + VERBOSE, + "%s shutdownNow invoked after interrupt; %d tasks returned.", + name, + dropped.size()); + } } } @@ -299,8 +370,22 @@ public void run() { "Batching %s message(s) into batch %s.", batch.batch().size(), batch.sequence()); - networkExecutor.submit( - BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); + try { + networkExecutor.submit( + BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); + } catch (RejectedExecutionException e) { + log.print( + ERROR, + e, + "Failed to submit batch %s to network executor during shutdown. Batch will be lost.", + batch.sequence()); + // Notify callbacks about the failure + for (Message msg : batch.batch()) { + for (Callback callback : callbacks) { + callback.failure(msg, e); + } + } + } currentBatchSize.set(0); messages.clear();