# Belle II Software release-05-01-25
# test_backends.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 """
5 Unit tests for the backends.py classes and functions. This is a little difficult to test due to
6 the Batch system backends not really being testable when not running on specific hosts with bsub/qsub
7 installed. But we can try to implement tests using the Local multiprocessing backend and testing the
8 basic behaviour of the classes so that they will fail if we modify assumptions in the future.
9 """
10 
11 from basf2 import find_file
12 
13 import unittest
14 from unittest import TestCase
15 import shutil
16 from pathlib import Path
17 
18 from caf.backends import ArgumentsSplitter, Job, MaxFilesSplitter, MaxSubjobsSplitter
19 from caf.backends import ArgumentsGenerator, range_arguments, SplitterError
20 
# Scratch directory for the tests below; created in setUp and removed in tearDown.
test_dir = Path("test_backends").absolute()
# A trivial bash script that serves as the payload command for test jobs.
test_script = Path(find_file("calibration/examples/job_submission/test_script.sh")).absolute()
25 
26 
class TestJob(TestCase):
    """
    UnitTest for the `caf.backends.Job` class
    """

    def setUp(self):
        """
        Create useful objects for each test and the teardown
        """
        # We will deliberately use strings and not Path objects for the job attributes so that we can later
        # check that they are converted to Paths by the class itself.
        name1 = 'TestJob1'
        job1 = Job(name1)  # Set up this one manually setting attributes
        job1.working_dir = Path(test_dir, job1.name, "working_dir").absolute().as_posix()
        job1.output_dir = Path(test_dir, job1.name, "output_dir").absolute().as_posix()
        job1.cmd = ["bash", test_script.name]
        job1.input_sandbox_files = [test_script.as_posix()]
        self.job1 = job1

        name2 = 'TestJob2'
        job_dict = {}
        job_dict["name"] = name2
        job_dict["working_dir"] = Path(test_dir, name2, "working_dir").as_posix()
        job_dict["output_dir"] = Path(test_dir, name2, "output_dir").as_posix()
        job_dict["output_patterns"] = []
        job_dict["cmd"] = ["bash", test_script.name]
        job_dict["args"] = []
        job_dict["input_sandbox_files"] = [test_script.as_posix()]
        job_dict["input_files"] = []
        job_dict["setup_cmds"] = []
        job_dict["backend_args"] = {}
        job_dict["subjobs"] = [{"id": i, "input_files": [], "args": [str(i)]} for i in range(4)]
        self.job2_dict = job_dict
        self.job2 = Job(name2, job_dict=job_dict)  # Set up this one from a dictionary

        # Create a directory just in case we need it for each test so that we can delete everything easily at the end
        test_dir.mkdir(parents=True, exist_ok=False)

    def test_dict_setup(self):
        """
        Does a Job built from a dictionary reproduce that dictionary via `Job.job_dict`,
        and does it stay in sync when subjobs are removed from both sides?
        """
        self.maxDiff = None  # If this test fails you will need to see the diff of a large dictionary
        self.assertEqual(len(self.job2.subjobs), 4)
        self.assertEqual(self.job2_dict, self.job2.job_dict)
        self.job2_dict["subjobs"].pop()
        del self.job2.subjobs[3]
        self.assertEqual(self.job2_dict, self.job2.job_dict)

    def test_job_json_serialise(self):
        """
        A Job dumped to JSON and loaded back should describe an identical job.
        """
        json_path = Path(test_dir, "job2.json")
        self.job2.dump_to_json(json_path)
        job2_copy = Job.from_json(json_path)
        self.assertEqual(self.job2.job_dict, job2_copy.job_dict)

    def test_status(self):
        """
        The Jobs haven't been run so they should be in the 'init' status.
        They also shouldn't throw exceptions due to missing result objects.
        """
        self.assertEqual(self.job1.status, "init")
        self.assertEqual(self.job2.status, "init")
        self.assertFalse(self.job1.ready())
        self.assertFalse(self.job2.ready())
        self.assertEqual(self.job1.update_status(), "init")
        self.assertEqual(self.job2.update_status(), "init")
        for subjob in self.job2.subjobs.values():
            self.assertEqual(subjob.status, "init")
            self.assertFalse(subjob.ready())
            self.assertEqual(subjob.update_status(), "init")

    def test_path_object_conversion(self):
        """
        Make sure that the two ways of setting up Job objects correctly converted attributes to be Paths instead of strings.
        """
        self.assertIsInstance(self.job1.output_dir, Path)
        self.assertIsInstance(self.job1.working_dir, Path)
        for path in self.job1.input_sandbox_files:
            self.assertIsInstance(path, Path)
        for path in self.job1.input_files:
            self.assertIsInstance(path, Path)

        self.assertIsInstance(self.job2.output_dir, Path)
        self.assertIsInstance(self.job2.working_dir, Path)
        for path in self.job2.input_sandbox_files:
            self.assertIsInstance(path, Path)
        for path in self.job2.input_files:
            self.assertIsInstance(path, Path)

        for subjob in self.job2.subjobs.values():
            self.assertIsInstance(subjob.output_dir, Path)
            self.assertIsInstance(subjob.working_dir, Path)

    def test_subjob_splitting(self):
        """
        Test the creation of SubJobs and assignment of input data files via splitter classes.
        """
        self.assertIsNone(self.job1.splitter)
        self.assertIsNone(self.job2.splitter)
        # Set the splitter for job1
        self.job1.max_files_per_subjob = 2
        self.assertIsInstance(self.job1.splitter, MaxFilesSplitter)
        self.assertEqual(self.job1.splitter.max_files_per_subjob, 2)
        self.job1.max_subjobs = 3
        self.assertIsInstance(self.job1.splitter, MaxSubjobsSplitter)
        self.assertEqual(self.job1.splitter.max_subjobs, 3)

        # Generate some empty input files
        for i in range(5):
            input_file = Path(test_dir, f"{i}.txt")
            input_file.touch(exist_ok=False)
            self.job1.input_files.append(input_file)

        self.job1.splitter = MaxFilesSplitter(max_files_per_subjob=2)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 3)  # Did the splitter create the number of jobs we expect?
        for i, subjob in self.job1.subjobs.items():
            self.assertTrue((len(subjob.input_files) == 2 or len(subjob.input_files) == 1))

        self.job1.subjobs = {}
        self.job1.splitter = MaxSubjobsSplitter(max_subjobs=4)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 4)  # Did the splitter create the number of jobs we expect?
        for i, subjob in self.job1.subjobs.items():
            self.assertTrue((len(subjob.input_files) == 2 or len(subjob.input_files) == 1))

        # Does the ArgumentSplitter create jobs
        self.job1.subjobs = {}
        arg_gen = ArgumentsGenerator(range_arguments, 3, stop=12, step=2)
        self.job1.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=10)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 5)
        for (i, subjob), arg in zip(self.job1.subjobs.items(), range(3, 12, 2)):
            # Does each subjob receive the correct setup
            self.assertEqual(self.job1.input_files, subjob.input_files)
            self.assertEqual(arg, subjob.args[0])

        # Does max_jobs prevent infinite subjob numbers
        self.job1.subjobs = {}
        self.job1.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=2)
        self.assertRaises(SplitterError, self.job1.splitter.create_subjobs, self.job1)

    def test_input_sandbox_copy(self):
        """
        Does the copy of files/directories for the input sandbox work correctly?
        """
        # We create a directory to add to the Job's input sandbox list, to test if directories + contents are copied.
        input_sandbox_dir = Path(test_dir, "test_input_sandbox_dir")
        input_sandbox_dir.mkdir(parents=True, exist_ok=False)
        for i in range(2):
            input_file = Path(input_sandbox_dir, f"{i}.txt")
            input_file.touch(exist_ok=False)
        # Instead of identifying every file, we just use the whole directory
        self.job1.input_sandbox_files.append(input_sandbox_dir)
        # Manually create the working dir first (normally done by the Backend)
        self.job1.working_dir.mkdir(parents=True, exist_ok=False)
        self.job1.copy_input_sandbox_files_to_working_dir()

        # We expect the original script and the above extra files + parent directory to be copied to the working directory.
        expected_paths = []
        expected_paths.append(Path(self.job1.working_dir, test_script.name))
        expected_paths.append(Path(self.job1.working_dir, "test_input_sandbox_dir"))
        for i in range(2):
            path = Path(self.job1.working_dir, "test_input_sandbox_dir", f"{i}.txt")
            expected_paths.append(path)

        # Now check that every path in the working directory is one we expect to be there
        for p in self.job1.working_dir.rglob("*"):
            self.assertIn(p, expected_paths)

    def tearDown(self):
        """
        Removes files/directories that were created during these tests
        """
        shutil.rmtree(test_dir)
199 
200 
def main():
    """Discover and run every test case defined in this module."""
    unittest.main()


if __name__ == '__main__':
    main()
test_backends.TestJob.tearDown
def tearDown(self)
Definition: test_backends.py:194
test_backends.TestJob.test_path_object_conversion
def test_path_object_conversion(self)
Definition: test_backends.py:95
main
int main(int argc, char **argv)
Run all tests.
Definition: test_main.cc:77
test_backends.TestJob.job1
job1
Definition: test_backends.py:44
test_backends.TestJob.test_input_sandbox_copy
def test_input_sandbox_copy(self)
Definition: test_backends.py:166
test_backends.TestJob.test_status
def test_status(self)
Definition: test_backends.py:79
test_backends.TestJob.job2_dict
job2_dict
Definition: test_backends.py:59
test_backends.TestJob
Definition: test_backends.py:27
test_backends.TestJob.maxDiff
maxDiff
Definition: test_backends.py:66
test_backends.TestJob.test_subjob_splitting
def test_subjob_splitting(self)
Definition: test_backends.py:117
test_backends.TestJob.setUp
def setUp(self)
Definition: test_backends.py:32
test_backends.TestJob.job2
job2
Definition: test_backends.py:60