Skip to content

Commit 97458b8

Browse files
authored
[Fix] ClusterExecutor add dependency tests (#924)
* [Fix] ClusterExecutor add dependency tests * fix number of files for TestClusterExecutor * Test adding two depending tasks * fix number of cores * fix folder name * fix slurm test * fix number of files
1 parent 8871044 commit 97458b8

3 files changed

Lines changed: 73 additions & 0 deletions

File tree

tests/unit/executor/test_api.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,23 @@ def test_empty(self):
119119
cache_lst = get_cache_data(cache_directory="rather_this_dir")
120120
self.assertEqual(len(cache_lst), 1)
121121

122+
def test_executor_dependencies(self):
123+
with TestClusterExecutor(cache_directory="cache_dir") as exe:
124+
cloudpickle_register(ind=1)
125+
fs1 = exe.submit(add_with_sleep, 1, 1)
126+
fs2 = exe.submit(add_with_sleep, fs1, 1)
127+
fs3 = exe.submit(add_with_sleep, fs1, fs2)
128+
self.assertFalse(fs1.done())
129+
self.assertFalse(fs2.done())
130+
self.assertFalse(fs3.done())
131+
self.assertEqual(fs1.result(), 2)
132+
self.assertEqual(fs2.result(), 3)
133+
self.assertEqual(fs3.result(), 5)
134+
self.assertEqual(len(os.listdir("cache_dir")), 3)
135+
self.assertTrue(fs1.done())
136+
self.assertTrue(fs2.done())
137+
self.assertTrue(fs3.done())
138+
122139
def test_executor_dependency_plot(self):
123140
with TestClusterExecutor(
124141
plot_dependency_graph=True,

tests/unit/executor/test_flux_cluster.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ def stop_function():
4747
return True
4848

4949

50+
def add_with_sleep(parameter_1, parameter_2):
51+
sleep(1)
52+
return parameter_1 + parameter_2
53+
54+
5055
@unittest.skipIf(
5156
skip_flux_test or skip_mpi4py_test,
5257
"h5py or mpi4py or flux are not installed, so the h5py, flux and mpi4py tests are skipped.",
@@ -81,6 +86,27 @@ def test_executor_blockallocation(self):
8186
self.assertEqual(len(os.listdir("executorlib_cache")), 2)
8287
self.assertTrue(fs1.done())
8388

89+
def test_executor_dependencies(self):
90+
with FluxClusterExecutor(
91+
resource_dict={"cores": 1, "cwd": "executorlib_cache"},
92+
block_allocation=False,
93+
cache_directory="executorlib_cache",
94+
pmi_mode=pmi,
95+
) as exe:
96+
fs1 = exe.submit(add_with_sleep, 1, 1)
97+
fs2 = exe.submit(add_with_sleep, fs1, 1)
98+
fs3 = exe.submit(add_with_sleep, fs1, fs2)
99+
self.assertFalse(fs1.done())
100+
self.assertFalse(fs2.done())
101+
self.assertFalse(fs3.done())
102+
self.assertEqual(fs1.result(), 2)
103+
self.assertEqual(fs2.result(), 3)
104+
self.assertEqual(fs3.result(), 5)
105+
self.assertEqual(len(os.listdir("executorlib_cache")), 6)
106+
self.assertTrue(fs1.done())
107+
self.assertTrue(fs2.done())
108+
self.assertTrue(fs3.done())
109+
84110
def test_executor_blockallocation_echo(self):
85111
with FluxClusterExecutor(
86112
resource_dict={"cores": 1, "cwd": "executorlib_cache"},

tests/unit/executor/test_slurm_cluster.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import importlib
33
import unittest
44
import shutil
5+
from time import sleep
56

67
from executorlib import SlurmClusterExecutor
78
from executorlib.standalone.serialize import cloudpickle_register
@@ -34,6 +35,9 @@
3435
#SBATCH --chdir={{working_directory}}
3536
#SBATCH --get-user-env=L
3637
#SBATCH --ntasks={{cores}}
38+
{%- if dependency_list %}
39+
#SBATCH --dependency=afterok:{{ dependency_list | join(',') }}
40+
{%- endif %}
3741
3842
{{command}}
3943
"""
@@ -47,6 +51,11 @@ def mpi_funct(i):
4751
return i, size, rank
4852

4953

54+
def add_with_sleep(parameter_1, parameter_2):
55+
sleep(1)
56+
return parameter_1 + parameter_2
57+
58+
5059
@unittest.skipIf(
5160
skip_slurm_test or skip_mpi4py_test or skip_h5py_test,
5261
"h5py or mpi4py or SLRUM are not installed, so the h5py, slurm and mpi4py tests are skipped.",
@@ -66,6 +75,27 @@ def test_executor(self):
6675
self.assertEqual(len(os.listdir("executorlib_cache")), 3)
6776
self.assertTrue(fs1.done())
6877

78+
def test_executor_dependencies(self):
79+
with SlurmClusterExecutor(
80+
resource_dict={"cores": 1, "cwd": "executorlib_cache", "submission_template": submission_template},
81+
block_allocation=False,
82+
cache_directory="executorlib_cache",
83+
pmi_mode="pmi2",
84+
) as exe:
85+
fs1 = exe.submit(add_with_sleep, 1, 1)
86+
fs2 = exe.submit(add_with_sleep, fs1, 1)
87+
fs3 = exe.submit(add_with_sleep, fs1, fs2)
88+
self.assertFalse(fs1.done())
89+
self.assertFalse(fs2.done())
90+
self.assertFalse(fs3.done())
91+
self.assertEqual(fs1.result(), 2)
92+
self.assertEqual(fs2.result(), 3)
93+
self.assertEqual(fs3.result(), 5)
94+
self.assertEqual(len(os.listdir("executorlib_cache")), 5)
95+
self.assertTrue(fs1.done())
96+
self.assertTrue(fs2.done())
97+
self.assertTrue(fs3.done())
98+
6999
def test_executor_no_cwd(self):
70100
with SlurmClusterExecutor(
71101
resource_dict={"cores": 2, "submission_template": submission_template},

0 commit comments

Comments
 (0)