From afdf250443eca04068c04407a6f0afe2931872a8 Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Thu, 11 Jun 2026 21:19:20 +0000 Subject: [PATCH 1/6] Raise Java test job timeout to 60 minutes The component.c** suite cannot finish within the 30-minute GitHub Actions job cap. The cap cancels the whole job before surefire's per-fork timeout (test-forkedProcessTimeout, 600s) can kill hung forks and run the remaining tests in the suite. Raise the job cap to 60 minutes so surefire's kill-and-continue has room to complete the suite instead of GitHub Actions cancelling the job mid-run. --- .github/workflows/javaTests.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/javaTests.yml b/.github/workflows/javaTests.yml index 0d6ac02fb82..c5c7b126110 100644 --- a/.github/workflows/javaTests.yml +++ b/.github/workflows/javaTests.yml @@ -50,7 +50,10 @@ concurrency: jobs: java_tests: runs-on: ubuntu-24.04 - timeout-minutes: 30 + # Job cap kept above the per-fork surefire timeout (test-forkedProcessTimeout, + # 600s) so surefire can kill hung forks and finish the remaining tests in a + # suite before GitHub Actions cancels the whole job. + timeout-minutes: 60 strategy: fail-fast: false matrix: From c7126ef65e5504bf05d244c65ee894c77cdf721f Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Fri, 12 Jun 2026 16:36:14 +0000 Subject: [PATCH 2/6] Prevent leaked federated workers from hanging test forks In-JVM federated workers (used by the lineage federated tests) ran on non-daemon threads, and their Netty boss/worker event loops were also non-daemon. When a test failed before calling shutdownThreads(...), the leaked worker kept the surefire fork JVM alive until the CI job timeout cancelled the whole job. - FederatedWorker: create the Netty event-loop groups with daemon threads. 
A standalone worker process keeps the JVM alive via its main thread, so this only affects in-JVM (test) workers, where it ensures that a leaked worker can no longer block JVM exit. - AutomatedTestBase: mark the in-JVM worker wrapper threads as daemon. - TestUtils.shutdownThread: bound the join() so cleanup cannot block indefinitely if a worker ignores the interrupt. - LineageFedReuseAlg, FedFullReuseTest, FedUDFReuseTest: shut down workers in a finally block so they are reaped on the failure path. --- .../federated/FederatedWorker.java | 10 ++- .../apache/sysds/test/AutomatedTestBase.java | 6 ++ .../java/org/apache/sysds/test/TestUtils.java | 12 ++- .../functions/lineage/FedFullReuseTest.java | 88 ++++++++++--------- .../functions/lineage/FedUDFReuseTest.java | 74 ++++++++-------- .../functions/lineage/LineageFedReuseAlg.java | 6 +- 6 files changed, 110 insertions(+), 86 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java index 55f2f17cd8a..91937500766 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java @@ -65,6 +65,7 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; +import io.netty.util.concurrent.DefaultThreadFactory; @SuppressWarnings("deprecation") public class FederatedWorker { @@ -99,9 +100,14 @@ private void run() { LOG.info("Setting up Federated Worker on port " + _port); int par_conn = ConfigurationManager.getDMLConfig().getIntValue(DMLConfig.FEDERATED_PAR_CONN); final int EVENT_LOOP_THREADS = (par_conn > 0) ? par_conn : InfrastructureAnalyzer.getLocalParallelism(); - NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); + // Use daemon threads for the Netty event loops. 
When the worker runs in-JVM (e.g. in tests) + // this guarantees a leaked worker can never keep the JVM alive after the owning thread is + // gone. In a standalone worker process the main thread keeps the JVM alive and drives the + // lifecycle, so daemon event loops have no effect on production shutdown. + NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, + new DefaultThreadFactory("fed-worker-boss", true)); ThreadPoolExecutor workerTPE = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, - new SynchronousQueue(true)); + new SynchronousQueue(true), new DefaultThreadFactory("fed-worker-pool", true)); NioEventLoopGroup workerGroup = new NioEventLoopGroup(EVENT_LOOP_THREADS, workerTPE); final boolean ssl = ConfigurationManager.isFederatedSSL(); diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java index 150a358bdf0..2ff98c921ea 100644 --- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java +++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java @@ -1939,6 +1939,9 @@ private static Thread spawnLocalFedWorkerThread(int port, String[] otherArgs) { LOG.error("Exception in startup of federated worker", e); } }); + // Daemon so a worker left running by a failed/forgetful test cannot keep the + // surefire fork JVM alive and stall CI until the job-level timeout. + t.setDaemon(true); t.start(); return t; } @@ -1979,6 +1982,9 @@ public static Thread startLocalFedWorkerWithArgs(String[] args) { LOG.error("Exception in startup of federated worker on port " + port, e); } }); + // Daemon so a worker left running by a failed/forgetful test cannot keep the + // surefire fork JVM alive and stall CI until the job-level timeout. 
+ t.setDaemon(true); t.start(); FederatedWorkerUtils.waitForWorker(t, port, FED_WORKER_WAIT); return t; diff --git a/src/test/java/org/apache/sysds/test/TestUtils.java b/src/test/java/org/apache/sysds/test/TestUtils.java index 5ebc243dd44..2394a5d7633 100644 --- a/src/test/java/org/apache/sysds/test/TestUtils.java +++ b/src/test/java/org/apache/sysds/test/TestUtils.java @@ -3489,15 +3489,23 @@ public static void shutdownThreads(Process... ts) { } } + /** Upper bound (ms) on how long {@link #shutdownThread(Thread)} waits for a worker to stop. */ + private static final long THREAD_SHUTDOWN_JOIN_MS = 30_000; + public static void shutdownThread(Thread t) { // kill the worker if( t != null ) { t.interrupt(); try { - t.join(); + // Bounded join: workers are daemon threads, so even if one ignores the interrupt + // we must not block cleanup (and the JVM) indefinitely waiting for it. + t.join(THREAD_SHUTDOWN_JOIN_MS); + if( t.isAlive() ) + LOG.warn("Federated worker thread " + t.getName() + + " did not stop within " + THREAD_SHUTDOWN_JOIN_MS + "ms; leaving it as a daemon."); } catch (InterruptedException e) { - e.printStackTrace(); + Thread.currentThread().interrupt(); } } } diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/FedFullReuseTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/FedFullReuseTest.java index 0c46cd68ea5..4852220861e 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/FedFullReuseTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/FedFullReuseTest.java @@ -105,50 +105,52 @@ public void federatedReuse(String test) { Lineage.resetInternalState(); Thread[] workers = startLocalFedWorkerThreads(new int[] {port1, port2}, otherargs, FED_WORKER_WAIT); - TestConfiguration config = availableTestConfigurations.get(test); - loadTestConfiguration(config); - - // Run reference dml script with normal matrix. Reuse of ba+*. 
- fullDMLScriptName = HOME + test + "Reference.dml"; - programArgs = new String[] {"-stats", "-lineage", "reuse_full", - "-nvargs", "X1=" + input("X1"), "X2=" + input("X2"), "Y1=" + input("Y1"), - "Y2=" + input("Y2"), "Z=" + expected("Z")}; - runTest(true, false, null, -1); - long mmCount = Statistics.getCPHeavyHitterCount(Opcodes.MMULT.toString()); - - // Run actual dml script with federated matrix - // The fed workers reuse ba+* - fullDMLScriptName = HOME + test + ".dml"; - programArgs = new String[] {"-stats","-lineage", "reuse_full", - "-nvargs", "X1=" + TestUtils.federatedAddress(port1, input("X1")), - "X2=" + TestUtils.federatedAddress(port2, input("X2")), - "Y1=" + TestUtils.federatedAddress(port1, input("Y1")), - "Y2=" + TestUtils.federatedAddress(port2, input("Y2")), "r=" + rows, "c=" + cols, "Z=" + output("Z")}; - runTest(true, false, null, -1); - long mmCount_fed = Statistics.getCPHeavyHitterCount(Opcodes.MMULT.toString()); - long fedMMCount = Statistics.getCPHeavyHitterCount("fed_ba+*"); - - // compare results - compareResults(1e-9); - // compare matrix multiplication count - // #federated execution of ba+* = #threads times #non-federated execution of ba+* (after reuse) - Assert.assertTrue("Violated reuse count: "+mmCount_fed+" == "+mmCount*2, - mmCount_fed == mmCount * 2); // #threads = 2 - switch(test) { - case TEST_NAME1: - // If the o/p is federated, fed_ba+* will be called everytime - // but the workers should be able to reuse ba+* - assertTrue(fedMMCount > mmCount_fed); - break; - case TEST_NAME2: - // If the o/p is non-federated, fed_ba+* will be called once - // and each worker will call ba+* once. - assertTrue(fedMMCount < mmCount_fed); - break; + try { + TestConfiguration config = availableTestConfigurations.get(test); + loadTestConfiguration(config); + + // Run reference dml script with normal matrix. Reuse of ba+*. 
+ fullDMLScriptName = HOME + test + "Reference.dml"; + programArgs = new String[] {"-stats", "-lineage", "reuse_full", + "-nvargs", "X1=" + input("X1"), "X2=" + input("X2"), "Y1=" + input("Y1"), + "Y2=" + input("Y2"), "Z=" + expected("Z")}; + runTest(true, false, null, -1); + long mmCount = Statistics.getCPHeavyHitterCount(Opcodes.MMULT.toString()); + + // Run actual dml script with federated matrix + // The fed workers reuse ba+* + fullDMLScriptName = HOME + test + ".dml"; + programArgs = new String[] {"-stats","-lineage", "reuse_full", + "-nvargs", "X1=" + TestUtils.federatedAddress(port1, input("X1")), + "X2=" + TestUtils.federatedAddress(port2, input("X2")), + "Y1=" + TestUtils.federatedAddress(port1, input("Y1")), + "Y2=" + TestUtils.federatedAddress(port2, input("Y2")), "r=" + rows, "c=" + cols, "Z=" + output("Z")}; + runTest(true, false, null, -1); + long mmCount_fed = Statistics.getCPHeavyHitterCount(Opcodes.MMULT.toString()); + long fedMMCount = Statistics.getCPHeavyHitterCount("fed_ba+*"); + + // compare results + compareResults(1e-9); + // compare matrix multiplication count + // #federated execution of ba+* = #threads times #non-federated execution of ba+* (after reuse) + Assert.assertTrue("Violated reuse count: "+mmCount_fed+" == "+mmCount*2, + mmCount_fed == mmCount * 2); // #threads = 2 + switch(test) { + case TEST_NAME1: + // If the o/p is federated, fed_ba+* will be called everytime + // but the workers should be able to reuse ba+* + assertTrue(fedMMCount > mmCount_fed); + break; + case TEST_NAME2: + // If the o/p is non-federated, fed_ba+* will be called once + // and each worker will call ba+* once. 
+ assertTrue(fedMMCount < mmCount_fed); + break; + } + } + finally { + TestUtils.shutdownThreads(workers); } - - - TestUtils.shutdownThreads(workers); } } diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/FedUDFReuseTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/FedUDFReuseTest.java index 0cf9d972719..eca3628a89b 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/FedUDFReuseTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/FedUDFReuseTest.java @@ -110,43 +110,45 @@ private void runTriUDFReuse(ExecMode execMode) { Lineage.resetInternalState(); Thread[] workers = startLocalFedWorkerThreads(new int[] {port1, port2, port3, port4}, otherargs, FED_WORKER_WAIT); - rtplatform = execMode; - if(rtplatform == ExecMode.SPARK) { - System.out.println(7); - DMLScript.USE_LOCAL_SPARK_CONFIG = true; + try { + rtplatform = execMode; + if(rtplatform == ExecMode.SPARK) { + System.out.println(7); + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + } + TestConfiguration config = availableTestConfigurations.get(TEST_NAME); + loadTestConfiguration(config); + + // Run reference dml script with normal matrix + fullDMLScriptName = HOME + TEST_NAME + "Reference.dml"; + programArgs = new String[] {"-lineage", "reuse_full", "-stats", "100", "-args", + input("X1"), input("X2"), input("X3"), input("X4"), + Boolean.toString(rowPartitioned).toUpperCase(), expected("S")}; + runTest(null); + + // Run actual dml script with federated matrix + fullDMLScriptName = HOME + TEST_NAME + ".dml"; + programArgs = new String[] {"-lineage", "reuse_full", "-stats", "100", "-nvargs", + "in_X1=" + TestUtils.federatedAddress(port1, input("X1")), + "in_X2=" + TestUtils.federatedAddress(port2, input("X2")), + "in_X3=" + TestUtils.federatedAddress(port3, input("X3")), + "in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "rows=" + rows, "cols=" + cols, + "rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S")}; + + 
runTest(null); + + // compare via files + compareResults(1e-9); + // check if lowertri is federated + Assert.assertTrue(heavyHittersContainsString("fed_lowertri")); + // assert reuse count + Assert.assertTrue(LineageCacheStatistics.getInstHits() > 0); + } + finally { + TestUtils.shutdownThreads(workers); + rtplatform = platformOld; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; } - TestConfiguration config = availableTestConfigurations.get(TEST_NAME); - loadTestConfiguration(config); - - // Run reference dml script with normal matrix - fullDMLScriptName = HOME + TEST_NAME + "Reference.dml"; - programArgs = new String[] {"-lineage", "reuse_full", "-stats", "100", "-args", - input("X1"), input("X2"), input("X3"), input("X4"), - Boolean.toString(rowPartitioned).toUpperCase(), expected("S")}; - runTest(null); - - // Run actual dml script with federated matrix - fullDMLScriptName = HOME + TEST_NAME + ".dml"; - programArgs = new String[] {"-lineage", "reuse_full", "-stats", "100", "-nvargs", - "in_X1=" + TestUtils.federatedAddress(port1, input("X1")), - "in_X2=" + TestUtils.federatedAddress(port2, input("X2")), - "in_X3=" + TestUtils.federatedAddress(port3, input("X3")), - "in_X4=" + TestUtils.federatedAddress(port4, input("X4")), "rows=" + rows, "cols=" + cols, - "rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S")}; - - runTest(null); - - // compare via files - compareResults(1e-9); - // check if lowertri is federated - Assert.assertTrue(heavyHittersContainsString("fed_lowertri")); - // assert reuse count - Assert.assertTrue(LineageCacheStatistics.getInstHits() > 0); - - TestUtils.shutdownThreads(workers); - - rtplatform = platformOld; - DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; } } diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageFedReuseAlg.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageFedReuseAlg.java index b8a66196670..4fe5cc0921a 100644 --- 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageFedReuseAlg.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageFedReuseAlg.java @@ -69,6 +69,7 @@ public void federatedLmPipeline(ExecMode execMode, boolean contSplits, String TE getAndLoadTestConfiguration(TEST_NAME); String HOME = SCRIPT_DIR + TEST_DIR; + Thread[] workers = null; try { // generated lm data MatrixBlock X = MatrixBlock.randOperations(rows, cols, 1.0, 0, 1, "uniform", 7); @@ -93,7 +94,7 @@ public void federatedLmPipeline(ExecMode execMode, boolean contSplits, String TE int port3 = getRandomAvailablePort(); int port4 = getRandomAvailablePort(); String[] otherargs = new String[] {"-lineage", "reuse_full"}; - Thread[] workers = startLocalFedWorkerThreads(new int[] {port1, port2}, otherargs, FED_WORKER_WAIT); + workers = startLocalFedWorkerThreads(new int[] {port1, port2}, otherargs, FED_WORKER_WAIT); TestConfiguration config = availableTestConfigurations.get(TEST_NAME); loadTestConfiguration(config); @@ -134,10 +135,9 @@ public void federatedLmPipeline(ExecMode execMode, boolean contSplits, String TE assertTrue(fed_tsmmCount > fed_tsmmCount_reuse); assertTrue(mmCount > mmCount_reuse); assertTrue(fed_mmCount > fed_mmCount_reuse); - - TestUtils.shutdownThreads(workers); } finally { + TestUtils.shutdownThreads(workers); resetExecMode(oldExec); ColumnEncoderRecode.SORT_RECODE_MAP = oldSort; } From d82adbefd997b2c54a57143943e4787e7cc5a00a Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Mon, 15 Jun 2026 09:27:00 +0000 Subject: [PATCH 3/6] Shorten daemon event-loop comment in FederatedWorker --- .../runtime/controlprogram/federated/FederatedWorker.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java index 91937500766..fc8989053bc 100644 --- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java @@ -100,10 +100,7 @@ private void run() { LOG.info("Setting up Federated Worker on port " + _port); int par_conn = ConfigurationManager.getDMLConfig().getIntValue(DMLConfig.FEDERATED_PAR_CONN); final int EVENT_LOOP_THREADS = (par_conn > 0) ? par_conn : InfrastructureAnalyzer.getLocalParallelism(); - // Use daemon threads for the Netty event loops. When the worker runs in-JVM (e.g. in tests) - // this guarantees a leaked worker can never keep the JVM alive after the owning thread is - // gone. In a standalone worker process the main thread keeps the JVM alive and drives the - // lifecycle, so daemon event loops have no effect on production shutdown. + // Daemon event loops so a leaked in-JVM (test) worker cannot block JVM exit. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("fed-worker-boss", true)); ThreadPoolExecutor workerTPE = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, From e2f478616983538c7b900623d6a0dc192f902048 Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Mon, 15 Jun 2026 11:48:14 +0000 Subject: [PATCH 4/6] Use daemon threads for CommonThreadPool fallback pools CommonThreadPool.get(k) returns a ForkJoinPool (daemon threads) for main/PARFOR/FedExec threads, but falls back to a plain Executors.newFixedThreadPool(k) for any other caller thread, and getDynamicPool() uses Executors.newCachedThreadPool(); both default to non-daemon threads. Under parallel test execution the test thread is not the master thread, so component tests that compress directly hit the fallback path. A pool left unshut (e.g. stored as a field or skipped on an exception path) then keeps its non-daemon threads alive and blocks the surefire fork JVM from exiting, stalling the suite until the CI job timeout. 
Make both fallback pools use daemon threads, matching the ForkJoinPool paths. Daemon-ness only affects JVM shutdown: pending work keeps the submitting thread alive, so this cannot terminate in-flight tasks early. --- .../sysds/runtime/util/CommonThreadPool.java | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/util/CommonThreadPool.java b/src/main/java/org/apache/sysds/runtime/util/CommonThreadPool.java index 3ee08da0def..156a7820f86 100644 --- a/src/main/java/org/apache/sysds/runtime/util/CommonThreadPool.java +++ b/src/main/java/org/apache/sysds/runtime/util/CommonThreadPool.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -141,11 +142,26 @@ else if(mainThread || threadName.contains("PARFOR") || threadName.contains("FedE incorrectPoolUse = true; } - return Executors.newFixedThreadPool(k); + return Executors.newFixedThreadPool(k, daemonThreadFactory()); } } + /** + * Thread factory that produces daemon threads. The ForkJoinPool-backed pools already use daemon + * threads; the fallback {@link Executors#newFixedThreadPool} and {@link Executors#newCachedThreadPool} + * pools default to non-daemon threads, which can keep the JVM (e.g. a surefire test fork) alive + * if a caller forgets to shut the pool down. Making them daemon keeps that behavior uniform. + */ + private static ThreadFactory daemonThreadFactory() { + final ThreadFactory base = Executors.defaultThreadFactory(); + return r -> { + Thread t = base.newThread(r); + t.setDaemon(true); + return t; + }; + } + /** * Invoke the collection of tasks and shutdown the pool upon job termination. 
* @@ -180,7 +196,7 @@ public synchronized static ExecutorService getDynamicPool() { // It is guaranteed not to be shut down because of the synchronized barrier return asyncPool; else { - asyncPool = Executors.newCachedThreadPool(); + asyncPool = Executors.newCachedThreadPool(daemonThreadFactory()); return asyncPool; } } From ae14a165f390f6572d188b77adfa97f5ac4a7a40 Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Mon, 15 Jun 2026 14:08:45 +0000 Subject: [PATCH 5/6] Revert Java test job timeout to 30 minutes Restore the 30-minute job cap now that the underlying thread-leak hangs are fixed (daemon Netty event loops, daemon CommonThreadPool fallback pools, and bounded test worker cleanup). The 600s per-fork surefire timeout still operates well within this cap, so a genuinely hung fork is killed by surefire before GitHub Actions cancels the job, restoring fast feedback on real hangs. --- .github/workflows/javaTests.yml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/.github/workflows/javaTests.yml b/.github/workflows/javaTests.yml index c5c7b126110..0d4c71e946b 100644 --- a/.github/workflows/javaTests.yml +++ b/.github/workflows/javaTests.yml @@ -51,9 +51,8 @@ jobs: java_tests: runs-on: ubuntu-24.04 # Job cap kept above the per-fork surefire timeout (test-forkedProcessTimeout, - # 600s) so surefire can kill hung forks and finish the remaining tests in a - # suite before GitHub Actions cancels the whole job. - timeout-minutes: 60 + # 600s) so surefire can kill a hung fork before GitHub Actions cancels the job. 
+ timeout-minutes: 30 strategy: fail-fast: false matrix: From a8fc528e821849c1b1c04c8d87e4b620a4db4505 Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Tue, 16 Jun 2026 10:05:25 +0000 Subject: [PATCH 6/6] Restore interrupt status in swallowed InterruptedException handlers Re-assert the interrupt flag (Thread.currentThread().interrupt()) instead of swallowing InterruptedException or only printing a stack trace, so the cancellation signal is not lost for callers up the stack. - TestUtils.shutdownThread(Process): log and restore interrupt instead of printStackTrace, matching the Thread overload - OOCMatrixIOHandler: restore interrupt in shutdown and scheduleEviction instead of silently ignoring - FederatedBackendPerformanceTest: split the InterruptedException/ ExecutionException multi-catch so the test fails instead of swallowing, and restore interrupt on interruption - Base, FederatedMatrixScalarOperationsTest, GenMatrices, dev release Utility: restore interrupt instead of printStackTrace/swallowing --- .../src/test/java/org/apache/sysds/validation/Utility.java | 1 + .../apache/sysds/runtime/ooc/cache/OOCMatrixIOHandler.java | 6 ++++-- .../apache/sysds/performance/generators/GenMatrices.java | 2 +- src/test/java/org/apache/sysds/test/TestUtils.java | 3 ++- .../monitoring/FederatedBackendPerformanceTest.java | 7 +++++-- .../part5/FederatedMatrixScalarOperationsTest.java | 2 +- src/test/java/org/apache/sysds/test/usertest/Base.java | 2 +- 7 files changed, 15 insertions(+), 8 deletions(-) diff --git a/dev/release/src/test/java/org/apache/sysds/validation/Utility.java b/dev/release/src/test/java/org/apache/sysds/validation/Utility.java index 34f5eeaf92b..da23b8e40d0 100644 --- a/dev/release/src/test/java/org/apache/sysds/validation/Utility.java +++ b/dev/release/src/test/java/org/apache/sysds/validation/Utility.java @@ -185,6 +185,7 @@ public static int runCommand(String [] command, String strCurDir, String strOutp try { exitValue = process.waitFor(); } catch 
(InterruptedException ie) { + Thread.currentThread().interrupt(); debugPrint(Constants.DEBUG_ERROR, "Program interrunpted: " + ie); } debugPrint(Constants.DEBUG_CODE, "Program '" + String.join(" ", command) + "' exited with exit status " + exitValue, strOutputFile); diff --git a/src/main/java/org/apache/sysds/runtime/ooc/cache/OOCMatrixIOHandler.java b/src/main/java/org/apache/sysds/runtime/ooc/cache/OOCMatrixIOHandler.java index 3146439165f..7509b669701 100644 --- a/src/main/java/org/apache/sysds/runtime/ooc/cache/OOCMatrixIOHandler.java +++ b/src/main/java/org/apache/sysds/runtime/ooc/cache/OOCMatrixIOHandler.java @@ -148,7 +148,8 @@ public void shutdown() { _q[i].close(); } } - catch(InterruptedException ignored) { + catch(InterruptedException e) { + Thread.currentThread().interrupt(); } } _writeExec.getQueue().clear(); @@ -174,7 +175,8 @@ public CompletableFuture scheduleEviction(BlockEntry block) { int i = (int)(q % WRITER_SIZE); _q[i].enqueueIfOpen(new Tuple2<>(block, future)); } - catch(InterruptedException ignored) { + catch(InterruptedException e) { + Thread.currentThread().interrupt(); } return future; diff --git a/src/test/java/org/apache/sysds/performance/generators/GenMatrices.java b/src/test/java/org/apache/sysds/performance/generators/GenMatrices.java index f96233ae6fe..9ae1a0f1048 100644 --- a/src/test/java/org/apache/sysds/performance/generators/GenMatrices.java +++ b/src/test/java/org/apache/sysds/performance/generators/GenMatrices.java @@ -72,7 +72,7 @@ public void generate(int N) throws InterruptedException { } } catch(InterruptedException e) { - e.printStackTrace(); + Thread.currentThread().interrupt(); } }); } diff --git a/src/test/java/org/apache/sysds/test/TestUtils.java b/src/test/java/org/apache/sysds/test/TestUtils.java index 2394a5d7633..683d355e05c 100644 --- a/src/test/java/org/apache/sysds/test/TestUtils.java +++ b/src/test/java/org/apache/sysds/test/TestUtils.java @@ -3522,7 +3522,8 @@ public static void shutdownThread(Process t) { 
forciblyDestroyed.waitFor(); // Wait until it's definitely terminated } } catch (InterruptedException e) { - e.printStackTrace(); + LOG.warn("Interrupted while shutting down federated worker process", e); + Thread.currentThread().interrupt(); } } } diff --git a/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedBackendPerformanceTest.java b/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedBackendPerformanceTest.java index df886fc0086..5de429a3c53 100644 --- a/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedBackendPerformanceTest.java +++ b/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedBackendPerformanceTest.java @@ -91,8 +91,11 @@ public void testBackendPerformance() throws InterruptedException { taskFutures.forEach(res -> { try { Assert.assertEquals("Stats parsed correctly", res.get().statusCode(), 200); - } catch (InterruptedException | ExecutionException e) { - e.printStackTrace(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + Assert.fail("Interrupted while fetching statistics: " + e.getMessage()); + } catch (ExecutionException e) { + Assert.fail("Failed to fetch statistics: " + e.getMessage()); } }); diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/part5/FederatedMatrixScalarOperationsTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/part5/FederatedMatrixScalarOperationsTest.java index 84b906b9a49..4c5cc1682cf 100644 --- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/part5/FederatedMatrixScalarOperationsTest.java +++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/part5/FederatedMatrixScalarOperationsTest.java @@ -209,7 +209,7 @@ private void runGenericTest(String dmlFile, int scalar) { compareResults(); } catch(InterruptedException e) { - e.printStackTrace(); + Thread.currentThread().interrupt(); assert (false); } 
finally { diff --git a/src/test/java/org/apache/sysds/test/usertest/Base.java b/src/test/java/org/apache/sysds/test/usertest/Base.java index 4a7f64824d5..fd9ac743eeb 100644 --- a/src/test/java/org/apache/sysds/test/usertest/Base.java +++ b/src/test/java/org/apache/sysds/test/usertest/Base.java @@ -98,7 +98,7 @@ public static Pair runThread(String[] args) { t.join(); } catch(InterruptedException e) { - e.printStackTrace(); + Thread.currentThread().interrupt(); } System.setOut(old);