# Belle II Software release-06-00-14
# test_backends.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# NOTE(review): the standard basf2 license header block was lost during
# extraction — restore the project's LGPL header here before committing.

"""
Unit tests for the backends.py classes and functions. This is a little difficult to test due to
the Batch system backends not really being testable when not running on specific hosts with bsub/qsub
installed. But we can try to implement tests using the Local multiprocessing backend and testing the
basic behaviour of the classes so that they will fail if we modify assumptions in the future.
"""

import shutil
import unittest
from pathlib import Path
from unittest import TestCase

from basf2 import find_file

from caf.backends import ArgumentsSplitter, Job, MaxFilesSplitter, MaxSubjobsSplitter
from caf.backends import ArgumentsGenerator, range_arguments, SplitterError

# A testing directory so that we can do cleanup
test_dir = Path("test_backends").absolute()
# A simple bash script for testing jobs
test_script = Path(find_file("calibration/examples/job_submission/test_script.sh")).absolute()
33 
34 
class TestJob(TestCase):
    """
    UnitTest for the `caf.backends.Job` class
    """

    def setUp(self):
        """
        Create useful objects for each test and the teardown
        """
        # We will deliberately use strings and not Path objects for the job attributes so that we can later
        # check that they are converted to Paths by the class itself.
        name1 = 'TestJob1'
        job1 = Job(name1)  # Set up this one manually setting attributes
        job1.working_dir = Path(test_dir, job1.name, "working_dir").absolute().as_posix()
        job1.output_dir = Path(test_dir, job1.name, "output_dir").absolute().as_posix()
        job1.cmd = ["bash", test_script.name]
        job1.input_sandbox_files = [test_script.as_posix()]
        # Job built attribute-by-attribute
        self.job1 = job1

        name2 = 'TestJob2'
        job_dict = {}
        job_dict["name"] = name2
        job_dict["working_dir"] = Path(test_dir, name2, "working_dir").as_posix()
        job_dict["output_dir"] = Path(test_dir, name2, "output_dir").as_posix()
        job_dict["output_patterns"] = []
        job_dict["cmd"] = ["bash", test_script.name]
        job_dict["args"] = []
        job_dict["input_sandbox_files"] = [test_script.as_posix()]
        job_dict["input_files"] = []
        job_dict["setup_cmds"] = []
        job_dict["backend_args"] = {}
        job_dict["subjobs"] = [{"id": i, "input_files": [], "args": [str(i)]} for i in range(4)]
        # The raw dictionary used to build job2, kept so tests can compare it to Job.job_dict
        self.job2_dict = job_dict
        self.job2 = Job(name2, job_dict=job_dict)  # Set up this one from a dictionary

        # Create a directory just in case we need it for each test so that we can delete everything easily at the end
        test_dir.mkdir(parents=True, exist_ok=False)

    def test_dict_setup(self):
        """
        A Job built from a dictionary should reproduce that dictionary via `job_dict`,
        and stay in sync when a subjob is removed from both representations.
        """
        self.maxDiff = None  # If this test fails you will need to see the diff of a large dictionary
        self.assertEqual(len(self.job2.subjobs), 4)
        self.assertEqual(self.job2_dict, self.job2.job_dict)
        self.job2_dict["subjobs"].pop()
        del self.job2.subjobs[3]
        self.assertEqual(self.job2_dict, self.job2.job_dict)

    def test_job_json_serialise(self):
        """
        A Job dumped to JSON and loaded back should describe an identical job.
        """
        json_path = Path(test_dir, "job2.json")
        self.job2.dump_to_json(json_path)
        job2_copy = Job.from_json(json_path)
        self.assertEqual(self.job2.job_dict, job2_copy.job_dict)

    def test_status(self):
        """
        The Jobs haven't been run so they should be in the 'init' status.
        They also shouldn't throw exceptions due to missing result objects.
        """
        self.assertEqual(self.job1.status, "init")
        self.assertEqual(self.job2.status, "init")
        self.assertFalse(self.job1.ready())
        self.assertFalse(self.job2.ready())
        self.assertEqual(self.job1.update_status(), "init")
        self.assertEqual(self.job2.update_status(), "init")
        for subjob in self.job2.subjobs.values():
            self.assertEqual(subjob.status, "init")
            self.assertFalse(subjob.ready())
            self.assertEqual(subjob.update_status(), "init")

    def test_path_object_conversion(self):
        """
        Make sure that the two ways of setting up Job objects correctly converted attributes to be Paths instead of strings.
        """
        self.assertIsInstance(self.job1.output_dir, Path)
        self.assertIsInstance(self.job1.working_dir, Path)
        for path in self.job1.input_sandbox_files:
            self.assertIsInstance(path, Path)
        for path in self.job1.input_files:
            self.assertIsInstance(path, Path)

        self.assertIsInstance(self.job2.output_dir, Path)
        self.assertIsInstance(self.job2.working_dir, Path)
        for path in self.job2.input_sandbox_files:
            self.assertIsInstance(path, Path)
        for path in self.job2.input_files:
            self.assertIsInstance(path, Path)

        for subjob in self.job2.subjobs.values():
            self.assertIsInstance(subjob.output_dir, Path)
            self.assertIsInstance(subjob.working_dir, Path)

    def test_subjob_splitting(self):
        """
        Test the creation of SubJobs and assignment of input data files via splitter classes.
        """
        self.assertIsNone(self.job1.splitter)
        self.assertIsNone(self.job2.splitter)
        # Set the splitter for job1 (assigning these attributes implicitly creates the splitter)
        self.job1.max_files_per_subjob = 2
        self.assertIsInstance(self.job1.splitter, MaxFilesSplitter)
        self.assertEqual(self.job1.splitter.max_files_per_subjob, 2)
        self.job1.max_subjobs = 3
        self.assertIsInstance(self.job1.splitter, MaxSubjobsSplitter)
        self.assertEqual(self.job1.splitter.max_subjobs, 3)

        # Generate some empty input files
        for i in range(5):
            input_file = Path(test_dir, f"{i}.txt")
            input_file.touch(exist_ok=False)
            self.job1.input_files.append(input_file)

        self.job1.splitter = MaxFilesSplitter(max_files_per_subjob=2)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 3)  # Did the splitter create the number of jobs we expect?
        for i, subjob in self.job1.subjobs.items():
            self.assertTrue((len(subjob.input_files) == 2 or len(subjob.input_files) == 1))

        self.job1.subjobs = {}
        self.job1.splitter = MaxSubjobsSplitter(max_subjobs=4)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 4)  # Did the splitter create the number of jobs we expect?
        for i, subjob in self.job1.subjobs.items():
            self.assertTrue((len(subjob.input_files) == 2 or len(subjob.input_files) == 1))

        # Does the ArgumentSplitter create jobs
        self.job1.subjobs = {}
        arg_gen = ArgumentsGenerator(range_arguments, 3, stop=12, step=2)
        self.job1.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=10)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 5)
        for (i, subjob), arg in zip(self.job1.subjobs.items(), range(3, 12, 2)):
            # Does each subjob receive the correct setup
            self.assertEqual(self.job1.input_files, subjob.input_files)
            self.assertEqual(arg, subjob.args[0])

        # Does max_jobs prevent infinite subjob numbers
        self.job1.subjobs = {}
        self.job1.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=2)
        self.assertRaises(SplitterError, self.job1.splitter.create_subjobs, self.job1)

    def test_input_sandbox_copy(self):
        """
        Does the copy of files/directories for the input sandbox work correctly?
        """
        # We create a directory to add to the Job's input sandbox list, to test if directories + contents are copied.
        input_sandbox_dir = Path(test_dir, "test_input_sandbox_dir")
        input_sandbox_dir.mkdir(parents=True, exist_ok=False)
        for i in range(2):
            input_file = Path(input_sandbox_dir, f"{i}.txt")
            input_file.touch(exist_ok=False)
        # Instead of identifying every file, we just use the whole directory
        self.job1.input_sandbox_files.append(input_sandbox_dir)
        # Manually create the working dir first (normally done by the Backend)
        self.job1.working_dir.mkdir(parents=True, exist_ok=False)
        self.job1.copy_input_sandbox_files_to_working_dir()

        # We expect the original script and the above extra files + parent directory to be copied to the working directory.
        expected_paths = []
        expected_paths.append(Path(self.job1.working_dir, test_script.name))
        expected_paths.append(Path(self.job1.working_dir, "test_input_sandbox_dir"))
        for i in range(2):
            path = Path(self.job1.working_dir, "test_input_sandbox_dir", f"{i}.txt")
            expected_paths.append(path)

        # Now check that every path in the working directory is one we expect to be there
        for p in self.job1.working_dir.rglob("*"):
            self.assertIn(p, expected_paths)

    def tearDown(self):
        """
        Removes files/directories that were created during these tests
        """
        shutil.rmtree(test_dir)
207 
208 
def main():
    """Discover and run all unit tests in this module via unittest."""
    unittest.main()


if __name__ == '__main__':
    main()
# NOTE(review): extraction residue removed here — the three orphaned method
# signatures (test_subjob_splitting / test_path_object_conversion /
# test_input_sandbox_copy) belong inside TestJob above, and the trailing
# C++ lines came from an unrelated file (test_main.cc).