# Belle II Software development
# test_backends.py
1#!/usr/bin/env python3
2
3# disable doxygen check for this file
4# @cond
5
6
13
14"""
15Unit tests for the backends.py classes and functions. This is a little difficult to test due to
16the Batch system backends not really being testable when not running on specific hosts with bsub/qsub
17installed. But we can try to implement tests using the Local multiprocessing backend and testing the
18basic behaviour of the classes so that they will fail if we modify assumptions in the future.
19"""
20
21from basf2 import find_file
22
23import unittest
24from unittest import TestCase
25import shutil
26from pathlib import Path
27
28from caf.backends import ArgumentsSplitter, Job, MaxFilesSplitter, MaxSubjobsSplitter
29from caf.backends import ArgumentsGenerator, range_arguments, SplitterError
30
31# A testing directory so that we can do cleanup
32test_dir = Path("test_backends").absolute()
33# A simple bash script for testing jobs
34test_script = Path(find_file("calibration/examples/job_submission/test_script.sh")).absolute()
35
36
class TestJob(TestCase):
    """
    UnitTest for the `caf.backends.Job` class
    """

    def setUp(self):
        """
        Create useful objects for each test and the teardown
        """
        # We will deliberately use strings and not Path objects for the job attributes so that we can later
        # check that they are converted to Paths by the class itself.
        name1 = 'TestJob1'
        job1 = Job(name1)  # Set up this one manually setting attributes
        job1.working_dir = Path(test_dir, job1.name, "working_dir").absolute().as_posix()
        job1.output_dir = Path(test_dir, job1.name, "output_dir").absolute().as_posix()
        job1.cmd = ["bash", test_script.name]
        job1.input_sandbox_files = [test_script.as_posix()]
        self.job1 = job1

        name2 = 'TestJob2'
        # Build the same configuration as a single dict literal to exercise the
        # Job(job_dict=...) constructor path.
        job_dict = {
            "name": name2,
            "working_dir": Path(test_dir, name2, "working_dir").as_posix(),
            "output_dir": Path(test_dir, name2, "output_dir").as_posix(),
            "output_patterns": [],
            "cmd": ["bash", test_script.name],
            "args": [],
            "input_sandbox_files": [test_script.as_posix()],
            "input_files": [],
            "setup_cmds": [],
            "backend_args": {},
            "subjobs": [{"id": i, "input_files": [], "args": [str(i)]} for i in range(4)],
        }
        self.job2_dict = job_dict
        self.job2 = Job(name2, job_dict=job_dict)  # Set up this one from a dictionary

        # Create a directory just in case we need it for each test so that we can delete everything easily at the end
        test_dir.mkdir(parents=True, exist_ok=False)

    def test_dict_setup(self):
        """
        Does a Job built from a dictionary reproduce that dictionary via `job_dict`,
        including after removing a subjob?
        """
        self.maxDiff = None  # If this test fails you will need to see the diff of a large dictionary
        self.assertEqual(len(self.job2.subjobs), 4)
        self.assertEqual(self.job2_dict, self.job2.job_dict)
        self.job2_dict["subjobs"].pop()
        del self.job2.subjobs[3]
        self.assertEqual(self.job2_dict, self.job2.job_dict)

    def test_job_json_serialise(self):
        """
        Does dumping a Job to JSON and loading it back give an equivalent Job?
        """
        json_path = Path(test_dir, "job2.json")
        self.job2.dump_to_json(json_path)
        job2_copy = Job.from_json(json_path)
        self.assertEqual(self.job2.job_dict, job2_copy.job_dict)

    def test_status(self):
        """
        The Jobs haven't been run so they should be in the 'init' status.
        They also shouldn't throw exceptions due to missing result objects.
        """
        self.assertEqual(self.job1.status, "init")
        self.assertEqual(self.job2.status, "init")
        self.assertFalse(self.job1.ready())
        self.assertFalse(self.job2.ready())
        self.assertEqual(self.job1.update_status(), "init")
        self.assertEqual(self.job2.update_status(), "init")
        for subjob in self.job2.subjobs.values():
            self.assertEqual(subjob.status, "init")
            self.assertFalse(subjob.ready())
            self.assertEqual(subjob.update_status(), "init")

    def test_path_object_conversion(self):
        """
        Make sure that the two ways of setting up Job objects correctly converted attributes to be Paths instead of strings.
        """
        for job in (self.job1, self.job2):
            self.assertIsInstance(job.output_dir, Path)
            self.assertIsInstance(job.working_dir, Path)
            for path in job.input_sandbox_files:
                self.assertIsInstance(path, Path)
            for path in job.input_files:
                self.assertIsInstance(path, Path)

        for subjob in self.job2.subjobs.values():
            self.assertIsInstance(subjob.output_dir, Path)
            self.assertIsInstance(subjob.working_dir, Path)

    def test_subjob_splitting(self):
        """
        Test the creation of SubJobs and assignment of input data files via splitter classes.
        """
        self.assertIsNone(self.job1.splitter)
        self.assertIsNone(self.job2.splitter)
        # Setting these attributes should implicitly create the corresponding splitter for job1
        self.job1.max_files_per_subjob = 2
        self.assertIsInstance(self.job1.splitter, MaxFilesSplitter)
        self.assertEqual(self.job1.splitter.max_files_per_subjob, 2)
        self.job1.max_subjobs = 3
        self.assertIsInstance(self.job1.splitter, MaxSubjobsSplitter)
        self.assertEqual(self.job1.splitter.max_subjobs, 3)

        # Generate some empty input files
        for i in range(5):
            input_file = Path(test_dir, f"{i}.txt")
            input_file.touch(exist_ok=False)
            self.job1.input_files.append(input_file)

        self.job1.splitter = MaxFilesSplitter(max_files_per_subjob=2)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 3)  # Did the splitter create the number of jobs we expect?
        for subjob in self.job1.subjobs.values():
            # 5 files split 2 at a time -> each subjob gets either 2 files or the 1 leftover
            self.assertIn(len(subjob.input_files), (1, 2))

        self.job1.subjobs = {}
        self.job1.splitter = MaxSubjobsSplitter(max_subjobs=4)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 4)  # Did the splitter create the number of jobs we expect?
        for subjob in self.job1.subjobs.values():
            self.assertIn(len(subjob.input_files), (1, 2))

        # Does the ArgumentSplitter create jobs
        self.job1.subjobs = {}
        arg_gen = ArgumentsGenerator(range_arguments, 3, stop=12, step=2)
        self.job1.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=10)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 5)
        for subjob, arg in zip(self.job1.subjobs.values(), range(3, 12, 2)):
            # Does each subjob receive the correct setup
            self.assertEqual(self.job1.input_files, subjob.input_files)
            self.assertEqual(arg, subjob.args[0])

        # Does max_jobs prevent infinite subjob numbers
        self.job1.subjobs = {}
        self.job1.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=2)
        self.assertRaises(SplitterError, self.job1.splitter.create_subjobs, self.job1)

    def test_input_sandbox_copy(self):
        """
        Does the copy of files/directories for the input sandbox work correctly?
        """
        # We create a directory to add to the Job's input sandbox list, to test if directories + contents are copied.
        input_sandbox_dir = Path(test_dir, "test_input_sandbox_dir")
        input_sandbox_dir.mkdir(parents=True, exist_ok=False)
        for i in range(2):
            input_file = Path(input_sandbox_dir, f"{i}.txt")
            input_file.touch(exist_ok=False)
        # Instead of identifying every file, we just use the whole directory
        self.job1.input_sandbox_files.append(input_sandbox_dir)
        # Manually create the working dir first (normally done by the Backend)
        self.job1.working_dir.mkdir(parents=True, exist_ok=False)
        self.job1.copy_input_sandbox_files_to_working_dir()

        # We expect the original script and the above extra files + parent directory to be copied to the working directory.
        expected_paths = [
            Path(self.job1.working_dir, test_script.name),
            Path(self.job1.working_dir, "test_input_sandbox_dir"),
        ]
        for i in range(2):
            expected_paths.append(Path(self.job1.working_dir, "test_input_sandbox_dir", f"{i}.txt"))

        # Now check that every path in the working directory is one we expect to be there
        for p in self.job1.working_dir.rglob("*"):
            self.assertIn(p, expected_paths)

    def tearDown(self):
        """
        Removes files/directories that were created during these tests
        """
        shutil.rmtree(test_dir)
209
210
def main():
    """Entry point: discover and run the unit tests in this module via unittest."""
    unittest.main()
213
214
# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    main()
217
218# @endcond
# Definition main.py:1