diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java index 6393547c6ef..03bed3c7174 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java @@ -1,36 +1,29 @@ package datadog.trace.common.writer; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + import datadog.trace.core.DDSpan; import datadog.trace.core.MetadataConsumer; -import datadog.trace.core.tagprocessor.PeerServiceCalculator; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** List writer used by tests mostly */ public class ListWriter extends CopyOnWriteArrayList> implements Writer { - private static final Logger log = LoggerFactory.getLogger(ListWriter.class); + private static final Filter ACCEPT_ALL = trace -> true; - public static final Filter ACCEPT_ALL = - new Filter() { - @Override - public boolean accept(List trace) { - return true; - } - }; - - private final List latches = new ArrayList<>(); private final AtomicInteger traceCount = new AtomicInteger(); private final TraceStructureWriter structureWriter = new TraceStructureWriter(true); + private final Object monitor = new Object(); - private final PeerServiceCalculator peerServiceCalculator = new PeerServiceCalculator(); private Filter filter = ACCEPT_ALL; public List firstTrace() { @@ -47,30 +40,44 @@ public void write(List trace) { // remotely realistic so the test actually test something span.processTagsAndBaggage(MetadataConsumer.NO_OP); } + + add(trace); + structureWriter.write(trace); + traceCount.incrementAndGet(); - synchronized (latches) { - add(trace); - for (final CountDownLatch latch : latches) { - if (size() >= latch.getCount()) { - while (latch.getCount() > 0) { - latch.countDown(); - } - } - } + synchronized (monitor) { + monitor.notifyAll(); } - structureWriter.write(trace); } - public boolean waitForTracesMax(final int number, int seconds) - throws InterruptedException, TimeoutException { - final CountDownLatch latch = new CountDownLatch(number); - synchronized (latches) { - if (size() >= number) { + private boolean awaitUntilDeadline(long timeout, TimeUnit unit, BooleanSupplier predicate) + throws InterruptedException { + final long deadline = System.nanoTime() + unit.toNanos(timeout); + + while (true) { + if (predicate.getAsBoolean()) { return true; } - latches.add(latch); + + long now = System.nanoTime(); + long remaining = deadline - now; + if (remaining <= 0) { + break; + } + + long millis = NANOSECONDS.toMillis(remaining); + long nanos = remaining - MILLISECONDS.toNanos(millis); + + synchronized (monitor) { + monitor.wait(millis, (int) nanos); + } } - return latch.await(seconds, TimeUnit.SECONDS); + + return false; + } + + public boolean waitForTracesMax(final int number, int seconds) throws InterruptedException { + return awaitUntilDeadline(seconds, SECONDS, () -> traceCount.get() >= number); } public void waitForTraces(final int number) throws InterruptedException, TimeoutException { @@ -88,24 +95,17 @@ public void waitForTraces(final int number) throws InterruptedException, Timeout } public void waitUntilReported(final DDSpan span) throws InterruptedException, TimeoutException { - waitUntilReported(span, 20, TimeUnit.SECONDS); + waitUntilReported(span, 20, SECONDS); } public void waitUntilReported(final DDSpan span, int timeout, TimeUnit unit) throws InterruptedException, TimeoutException { - while (true) { - final CountDownLatch latch = new CountDownLatch(size() + 1); - synchronized (latches) { - latches.add(latch); - } - if (isReported(span)) { - return; - } - if (!latch.await(timeout, unit)) { - String msg = "Timeout waiting for span to be reported: " + span; - log.warn(msg); - throw new TimeoutException(msg); - } + boolean reported = awaitUntilDeadline(timeout, unit, () -> isReported(span)); + + if (!reported) { + String msg = "Timeout waiting for span to be reported: " + span; + log.warn(msg); + throw new TimeoutException(msg); } } @@ -142,17 +142,16 @@ public boolean flush() { return true; } + @Override + public void clear() { + super.clear(); + + traceCount.set(0); + } + @Override public void close() { clear(); - synchronized (latches) { - for (final CountDownLatch latch : latches) { - while (latch.getCount() > 0) { - latch.countDown(); - } - } - latches.clear(); - } } @Override