# Belle II Software  release-08-01-10
# test_backends.py
1 #!/usr/bin/env python3
2 
3 # disable doxygen check for this file
4 # @cond
5 
6 
13 
14 """
15 Unit tests for the backends.py classes and functions. This is a little difficult to test due to
16 the Batch system backends not really being testable when not running on specific hosts with bsub/qsub
17 installed. But we can try to implement tests using the Local multiprocessing backend and testing the
18 basic behaviour of the classes so that they will fail if we modify assumptions in the future.
19 """
20 
21 from basf2 import find_file
22 
23 import unittest
24 from unittest import TestCase
25 import shutil
26 from pathlib import Path
27 
28 from caf.backends import ArgumentsSplitter, Job, MaxFilesSplitter, MaxSubjobsSplitter
29 from caf.backends import ArgumentsGenerator, range_arguments, SplitterError
30 
# A testing directory so that we can do cleanup
test_dir = Path("test_backends").absolute()
# A simple bash script for testing jobs
test_script = Path(find_file("calibration/examples/job_submission/test_script.sh")).absolute()
35 
36 
class TestJob(TestCase):
    """
    UnitTest for the `caf.backends.Job` class
    """

    def setUp(self):
        """
        Create useful objects for each test and the teardown
        """
        # We will deliberately use strings and not Path objects for the job attributes so that we can later
        # check that they are converted to Paths by the class itself.
        name1 = 'TestJob1'
        job1 = Job(name1)  # Set up this one manually setting attributes
        job1.working_dir = Path(test_dir, job1.name, "working_dir").absolute().as_posix()
        job1.output_dir = Path(test_dir, job1.name, "output_dir").absolute().as_posix()
        job1.cmd = ["bash", test_script.name]
        job1.input_sandbox_files = [test_script.as_posix()]
        self.job1 = job1

        name2 = 'TestJob2'
        job_dict = {}
        job_dict["name"] = name2
        job_dict["working_dir"] = Path(test_dir, name2, "working_dir").as_posix()
        job_dict["output_dir"] = Path(test_dir, name2, "output_dir").as_posix()
        job_dict["output_patterns"] = []
        job_dict["cmd"] = ["bash", test_script.name]
        job_dict["args"] = []
        job_dict["input_sandbox_files"] = [test_script.as_posix()]
        job_dict["input_files"] = []
        job_dict["setup_cmds"] = []
        job_dict["backend_args"] = {}
        job_dict["subjobs"] = [{"id": i, "input_files": [], "args": [str(i)]} for i in range(4)]
        self.job2_dict = job_dict
        self.job2 = Job(name2, job_dict=job_dict)  # Set up this one from a dictionary

        # Create a directory just in case we need it for each test so that we can delete everything easily at the end
        test_dir.mkdir(parents=True, exist_ok=False)

    def test_dict_setup(self):
        """
        A Job built from a dictionary should reproduce that dictionary via `job_dict`,
        and stay in sync when subjobs are removed from both representations.
        """
        self.maxDiff = None  # If this test fails you will need to see the diff of a large dictionary
        self.assertEqual(len(self.job2.subjobs), 4)
        self.assertEqual(self.job2_dict, self.job2.job_dict)
        # Remove one subjob from both the dict and the Job; they should still agree
        self.job2_dict["subjobs"].pop()
        del self.job2.subjobs[3]
        self.assertEqual(self.job2_dict, self.job2.job_dict)

    def test_job_json_serialise(self):
        """
        A Job dumped to JSON and loaded back should be equivalent to the original.
        """
        json_path = Path(test_dir, "job2.json")
        self.job2.dump_to_json(json_path)
        job2_copy = Job.from_json(json_path)
        self.assertEqual(self.job2.job_dict, job2_copy.job_dict)

    def test_status(self):
        """
        The Jobs haven't been run so they should be in the 'init' status.
        They also shouldn't throw exceptions due to missing result objects.
        """
        self.assertEqual(self.job1.status, "init")
        self.assertEqual(self.job2.status, "init")
        self.assertFalse(self.job1.ready())
        self.assertFalse(self.job2.ready())
        self.assertEqual(self.job1.update_status(), "init")
        self.assertEqual(self.job2.update_status(), "init")
        for subjob in self.job2.subjobs.values():
            self.assertEqual(subjob.status, "init")
            self.assertFalse(subjob.ready())
            self.assertEqual(subjob.update_status(), "init")

    def test_path_object_conversion(self):
        """
        Make sure that the two ways of setting up Job objects correctly converted attributes to be Paths instead of strings.
        """
        # Both jobs get the same checks, so loop instead of duplicating the assertions
        for job in (self.job1, self.job2):
            self.assertIsInstance(job.output_dir, Path)
            self.assertIsInstance(job.working_dir, Path)
            for path in job.input_sandbox_files:
                self.assertIsInstance(path, Path)
            for path in job.input_files:
                self.assertIsInstance(path, Path)

        # SubJobs created from the dictionary should also have Path attributes
        for subjob in self.job2.subjobs.values():
            self.assertIsInstance(subjob.output_dir, Path)
            self.assertIsInstance(subjob.working_dir, Path)

    def test_subjob_splitting(self):
        """
        Test the creation of SubJobs and assignment of input data files via splitter classes.
        """
        self.assertIsNone(self.job1.splitter)
        self.assertIsNone(self.job2.splitter)
        # Set the splitter for job1: assigning these attributes implicitly creates the splitter
        self.job1.max_files_per_subjob = 2
        self.assertIsInstance(self.job1.splitter, MaxFilesSplitter)
        self.assertEqual(self.job1.splitter.max_files_per_subjob, 2)
        self.job1.max_subjobs = 3
        self.assertIsInstance(self.job1.splitter, MaxSubjobsSplitter)
        self.assertEqual(self.job1.splitter.max_subjobs, 3)

        # Generate some empty input files
        for i in range(5):
            input_file = Path(test_dir, f"{i}.txt")
            input_file.touch(exist_ok=False)
            self.job1.input_files.append(input_file)

        self.job1.splitter = MaxFilesSplitter(max_files_per_subjob=2)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 3)  # Did the splitter create the number of jobs we expect?
        # 5 files split 2 at a time -> each subjob carries 2 files, or 1 for the remainder
        for subjob in self.job1.subjobs.values():
            self.assertIn(len(subjob.input_files), (1, 2))

        self.job1.subjobs = {}
        self.job1.splitter = MaxSubjobsSplitter(max_subjobs=4)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 4)  # Did the splitter create the number of jobs we expect?
        for subjob in self.job1.subjobs.values():
            self.assertIn(len(subjob.input_files), (1, 2))

        # Does the ArgumentSplitter create jobs
        self.job1.subjobs = {}
        arg_gen = ArgumentsGenerator(range_arguments, 3, stop=12, step=2)
        self.job1.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=10)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 5)
        for subjob, arg in zip(self.job1.subjobs.values(), range(3, 12, 2)):
            # Does each subjob receive the correct setup
            self.assertEqual(self.job1.input_files, subjob.input_files)
            self.assertEqual(arg, subjob.args[0])

        # Does max_jobs prevent infinite subjob numbers
        self.job1.subjobs = {}
        self.job1.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=2)
        self.assertRaises(SplitterError, self.job1.splitter.create_subjobs, self.job1)

    def test_input_sandbox_copy(self):
        """
        Does the copy of files/directories for the input sandbox work correctly?
        """
        # We create a directory to add to the Job's input sandbox list, to test if directories + contents are copied.
        input_sandbox_dir = Path(test_dir, "test_input_sandbox_dir")
        input_sandbox_dir.mkdir(parents=True, exist_ok=False)
        for i in range(2):
            input_file = Path(input_sandbox_dir, f"{i}.txt")
            input_file.touch(exist_ok=False)
        # Instead of identifying every file, we just use the whole directory
        self.job1.input_sandbox_files.append(input_sandbox_dir)
        # Manually create the working dir first (normally done by the Backend)
        self.job1.working_dir.mkdir(parents=True, exist_ok=False)
        self.job1.copy_input_sandbox_files_to_working_dir()

        # We expect the original script and the above extra files + parent directory to be copied to the working directory.
        expected_paths = [
            Path(self.job1.working_dir, test_script.name),
            Path(self.job1.working_dir, "test_input_sandbox_dir"),
        ]
        for i in range(2):
            expected_paths.append(Path(self.job1.working_dir, "test_input_sandbox_dir", f"{i}.txt"))

        # Now check that every path in the working directory is one we expect to be there
        for p in self.job1.working_dir.rglob("*"):
            self.assertIn(p, expected_paths)

    def tearDown(self):
        """
        Removes files/directories that were created during these tests
        """
        shutil.rmtree(test_dir)
209 
210 
def main():
    """Entry point: discover and run all unit tests in this module."""
    unittest.main()
213 
214 
# Only run the tests when executed directly, not when imported
if __name__ == '__main__':
    main()
217 
218 # @endcond