From 5542583a578b09833ca8420970cc0d4ca819e4a5 Mon Sep 17 00:00:00 2001 From: Jannik Lindemann Date: Mon, 15 Jun 2026 11:55:15 +0200 Subject: [PATCH] Fix Flaky Tests from Blocking Thread Pools --- .../ooc/AggregateUnaryOOCInstruction.java | 11 +++++------ .../runtime/instructions/ooc/OOCInstruction.java | 5 +---- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java index f0d4fd29af7..38b228ccb1d 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java @@ -35,7 +35,6 @@ import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.meta.DataCharacteristics; -import org.apache.sysds.runtime.ooc.stream.StreamContext; import org.apache.sysds.runtime.util.IndexRange; import java.util.HashMap; @@ -119,9 +118,10 @@ public void processInstruction( ExecutionContext ec ) { }); // global reduce - submitOOCTask(() -> { - IndexedMatrixValue partial; - while ((partial = qLocal.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) { + addOutStream(qOut); + submitOOCTasks(qLocal, callback -> { + IndexedMatrixValue partial = callback.get(); + synchronized(aggTracker) { long idx = aggun.isRowAggregate() ? partial.getIndexes().getRowIndex() : partial.getIndexes() .getColumnIndex(); @@ -150,8 +150,7 @@ public void processInstruction( ExecutionContext ec ) { corrs.remove(idx); } } - qOut.closeInput(); - }, new StreamContext().addOutStream(qOut)); + }).thenRun(qOut::closeInput); } // full aggregation else { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java index 679e7187e5e..859bca42dfe 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java @@ -1196,7 +1196,7 @@ protected CompletableFuture submitOOCTasks(OOCStream queue, Consume } protected CompletableFuture submitOOCTask(Runnable r, StreamContext ctx) { - ExecutorService pool = CommonThreadPool.get(); + ExecutorService pool = CommonThreadPool.getDynamicPool(); final CompletableFuture future = new CompletableFuture<>(); try { COMPUTE_IN_FLIGHT.incrementAndGet(); @@ -1220,9 +1220,6 @@ protected CompletableFuture submitOOCTask(Runnable r, StreamContext ctx) { COMPUTE_IN_FLIGHT.decrementAndGet(); throw new DMLRuntimeException(ex); } - finally { - pool.shutdown(); - } return future; }