Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.ooc.stream.StreamContext;
import org.apache.sysds.runtime.util.IndexRange;

import java.util.HashMap;
Expand Down Expand Up @@ -119,9 +118,10 @@ public void processInstruction( ExecutionContext ec ) {
});

// global reduce
submitOOCTask(() -> {
IndexedMatrixValue partial;
while ((partial = qLocal.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) {
addOutStream(qOut);
submitOOCTasks(qLocal, callback -> {
IndexedMatrixValue partial = callback.get();
synchronized(aggTracker) {
long idx = aggun.isRowAggregate() ? partial.getIndexes().getRowIndex() : partial.getIndexes()
.getColumnIndex();

Expand Down Expand Up @@ -150,8 +150,7 @@ public void processInstruction( ExecutionContext ec ) {
corrs.remove(idx);
}
}
qOut.closeInput();
}, new StreamContext().addOutStream(qOut));
}).thenRun(qOut::closeInput);
}
// full aggregation
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1196,7 +1196,7 @@ protected <T> CompletableFuture<Void> submitOOCTasks(OOCStream<T> queue, Consume
}

protected CompletableFuture<Void> submitOOCTask(Runnable r, StreamContext ctx) {
ExecutorService pool = CommonThreadPool.get();
ExecutorService pool = CommonThreadPool.getDynamicPool();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not make a difference, The CommonThreadPool.get() should be the right way to get our shared threadpool that does get killed with the Java process.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should make a difference because submitOOCTask may be used for blocking work (e.g. dequeue()). If we now use a fixed sized thread pool (which especially causes problems in environments with fewer enabled CPUs thus smaller number of workers on the default thread pool), this may block scheduled tasks from making progress that are needed for those workers to unblock.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay did not know you had more tasks than threads in the pool.

final CompletableFuture<Void> future = new CompletableFuture<>();
try {
COMPUTE_IN_FLIGHT.incrementAndGet();
Expand All @@ -1220,9 +1220,6 @@ protected CompletableFuture<Void> submitOOCTask(Runnable r, StreamContext ctx) {
COMPUTE_IN_FLIGHT.decrementAndGet();
throw new DMLRuntimeException(ex);
}
finally {
pool.shutdown();
}

return future;
}
Expand Down
Loading