From a0594f006822b0b5c6458564b50224952f340427 Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Mon, 16 Jun 2025 09:01:57 -0400 Subject: [PATCH 1/2] Simplified ListWriter await logic. --- .../trace/common/writer/ListWriter.java | 100 ++++++++---------- 1 file changed, 47 insertions(+), 53 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java index 6393547c6ef..ed70068ab87 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java @@ -1,36 +1,27 @@ package datadog.trace.common.writer; +import static java.util.concurrent.TimeUnit.SECONDS; + import datadog.trace.core.DDSpan; import datadog.trace.core.MetadataConsumer; -import datadog.trace.core.tagprocessor.PeerServiceCalculator; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** List writer used by tests mostly */ public class ListWriter extends CopyOnWriteArrayList> implements Writer { - private static final Logger log = LoggerFactory.getLogger(ListWriter.class); + private static final Filter ACCEPT_ALL = trace -> true; - public static final Filter ACCEPT_ALL = - new Filter() { - @Override - public boolean accept(List trace) { - return true; - } - }; - - private final List latches = new ArrayList<>(); private final AtomicInteger traceCount = new AtomicInteger(); private final TraceStructureWriter structureWriter = new TraceStructureWriter(true); + private final Object monitor = new Object(); - private final PeerServiceCalculator peerServiceCalculator = new PeerServiceCalculator(); private Filter filter = ACCEPT_ALL; public List firstTrace() { @@ -47,30 +38,41 @@ public void write(List trace) { // remotely realistic so the test actually test something span.processTagsAndBaggage(MetadataConsumer.NO_OP); } + + add(trace); + structureWriter.write(trace); + traceCount.incrementAndGet(); - synchronized (latches) { - add(trace); - for (final CountDownLatch latch : latches) { - if (size() >= latch.getCount()) { - while (latch.getCount() > 0) { - latch.countDown(); - } - } - } + synchronized (monitor) { + monitor.notifyAll(); } - structureWriter.write(trace); } - public boolean waitForTracesMax(final int number, int seconds) - throws InterruptedException, TimeoutException { - final CountDownLatch latch = new CountDownLatch(number); - synchronized (latches) { - if (size() >= number) { + private boolean awaitUntilDeadline(long timeout, TimeUnit unit, BooleanSupplier predicate) + throws InterruptedException { + long deadline = System.currentTimeMillis() + unit.toMillis(timeout); + + while (true) { + if (predicate.getAsBoolean()) { return true; } - latches.add(latch); + + long now = System.currentTimeMillis(); + long waitTime = deadline - now; + if (waitTime <= 0) { + break; + } + + synchronized (monitor) { + monitor.wait(waitTime); + } } - return latch.await(seconds, TimeUnit.SECONDS); + + return false; + } + + public boolean waitForTracesMax(final int number, int seconds) throws InterruptedException { + return awaitUntilDeadline(seconds, SECONDS, () -> traceCount.get() >= number); } public void waitForTraces(final int number) throws InterruptedException, TimeoutException { @@ -88,24 +90,17 @@ public void waitForTraces(final int number) throws InterruptedException, Timeout } public void waitUntilReported(final DDSpan span) throws InterruptedException, TimeoutException { - waitUntilReported(span, 20, TimeUnit.SECONDS); + waitUntilReported(span, 20, SECONDS); } public void waitUntilReported(final DDSpan span, int timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - while (true) { - final CountDownLatch latch = new CountDownLatch(size() + 1); - synchronized (latches) { - latches.add(latch); - } - if (isReported(span)) { - return; - } - if (!latch.await(timeout, unit)) { - String msg = "Timeout waiting for span to be reported: " + span; - log.warn(msg); - throw new TimeoutException(msg); - } + boolean reported = awaitUntilDeadline(timeout, unit, () -> isReported(span)); + + if (!reported) { + String msg = "Timeout waiting for span to be reported: " + span; + log.warn(msg); + throw new TimeoutException(msg); } } @@ -142,17 +137,16 @@ public boolean flush() { return true; } + @Override + public void clear() { + super.clear(); + + traceCount.set(0); + } + @Override public void close() { clear(); - synchronized (latches) { - for (final CountDownLatch latch : latches) { - while (latch.getCount() > 0) { - latch.countDown(); - } - } - latches.clear(); - } } @Override From b461bc236695401beb015f0150b1f392caee4fa7 Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Mon, 16 Jun 2025 09:25:07 -0400 Subject: [PATCH 2/2] Refactored to use nanoTime --- .../datadog/trace/common/writer/ListWriter.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java index ed70068ab87..03bed3c7174 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java @@ -1,5 +1,7 @@ package datadog.trace.common.writer; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import datadog.trace.core.DDSpan; @@ -50,21 +52,24 @@ public void write(List trace) { private boolean awaitUntilDeadline(long timeout, TimeUnit unit, BooleanSupplier predicate) throws InterruptedException { - long deadline = System.currentTimeMillis() + unit.toMillis(timeout); + final long deadline = System.nanoTime() + unit.toNanos(timeout); while (true) { if (predicate.getAsBoolean()) { return true; } - long now = System.currentTimeMillis(); - long waitTime = deadline - now; - if (waitTime <= 0) { + long now = System.nanoTime(); + long remaining = deadline - now; + if (remaining <= 0) { break; } + long millis = NANOSECONDS.toMillis(remaining); + long nanos = remaining - MILLISECONDS.toNanos(millis); + synchronized (monitor) { - monitor.wait(waitTime); + monitor.wait(millis, (int) nanos); } }