#!/usr/bin/env python3

# Belle II Software development — test_backends.py

"""
Unit tests for the backends.py classes and functions. This is a little difficult to test due to
the Batch system backends not really being testable when not running on specific hosts with bsub/qsub
installed. But we can try to implement tests using the Local multiprocessing backend and testing the
basic behaviour of the classes so that they will fail if we modify assumptions in the future.
"""
18from basf2 import find_file
19
20import unittest
21from unittest import TestCase
22import shutil
23from pathlib import Path
24
25from caf.backends import ArgumentsSplitter, Job, MaxFilesSplitter, MaxSubjobsSplitter
26from caf.backends import ArgumentsGenerator, range_arguments, SplitterError
27
# A testing directory (absolute) so that every test can write under one root and we can clean up easily
test_dir = Path("test_backends").absolute()
# A simple bash script shipped with the calibration examples, used as the payload of test Jobs
test_script = Path(find_file("calibration/examples/job_submission/test_script.sh")).absolute()
32
33
class TestJob(TestCase):
    """
    UnitTest for the `caf.backends.Job` class.

    Covers construction (manual attribute assignment and from a job dictionary),
    JSON round-tripping, status reporting, string-to-Path conversion, subjob
    splitting, and input sandbox copying.
    """

    def setUp(self):
        """
        Create useful objects for each test and the teardown.
        """
        # We deliberately use strings and not Path objects for the job attributes so that we can later
        # check that they are converted to Paths by the class itself.
        name1 = 'TestJob1'
        job1 = Job(name1)  # Set up this one manually, assigning attributes directly
        job1.working_dir = Path(test_dir, job1.name, "working_dir").absolute().as_posix()
        job1.output_dir = Path(test_dir, job1.name, "output_dir").absolute().as_posix()
        job1.cmd = ["bash", test_script.name]
        job1.input_sandbox_files = [test_script.as_posix()]

        # Job constructed attribute-by-attribute
        self.job1 = job1

        name2 = 'TestJob2'
        job_dict = {}
        job_dict["name"] = name2
        job_dict["working_dir"] = Path(test_dir, name2, "working_dir").as_posix()
        job_dict["output_dir"] = Path(test_dir, name2, "output_dir").as_posix()
        job_dict["output_patterns"] = []
        job_dict["cmd"] = ["bash", test_script.name]
        job_dict["args"] = []
        job_dict["input_sandbox_files"] = [test_script.as_posix()]
        job_dict["input_files"] = []
        job_dict["setup_cmds"] = []
        job_dict["backend_args"] = {}
        job_dict["subjobs"] = [{"id": i, "input_files": [], "args": [str(i)]} for i in range(4)]

        # The raw job dictionary, kept so tests can compare it against Job.job_dict
        self.job2_dict = job_dict

        # Job constructed from a dictionary
        self.job2 = Job(name2, job_dict=job_dict)

        # Create a directory just in case we need it for each test so that we can delete everything easily at the end.
        # exist_ok=False so we notice if a previous run's tearDown failed to clean up.
        test_dir.mkdir(parents=True, exist_ok=False)

    def test_dict_setup(self):
        """
        Test that a Job built from a dictionary reproduces that dictionary, including subjobs.
        """
        # If this test fails you will need to see the diff of a large dictionary
        self.maxDiff = None
        self.assertEqual(len(self.job2.subjobs), 4)
        self.assertEqual(self.job2_dict, self.job2.job_dict)
        # Removing a subjob from both sides should keep them equal
        self.job2_dict["subjobs"].pop()
        del self.job2.subjobs[3]
        self.assertEqual(self.job2_dict, self.job2.job_dict)

    # NOTE(review): the following `def` line was lost in extraction; the name is
    # inferred from the docstring/body — confirm against the original file.
    def test_json_serialisation(self):
        """
        Test job with json serialisation: dumping to JSON and loading back must round-trip the job dictionary.
        """
        json_path = Path(test_dir, "job2.json")
        self.job2.dump_to_json(json_path)
        job2_copy = Job.from_json(json_path)
        self.assertEqual(self.job2.job_dict, job2_copy.job_dict)

    def test_status(self):
        """
        The Jobs haven't been run so they should be in the 'init' status.
        They also shouldn't throw exceptions due to missing result objects.
        """
        self.assertEqual(self.job1.status, "init")
        self.assertEqual(self.job2.status, "init")
        self.assertFalse(self.job1.ready())
        self.assertFalse(self.job2.ready())
        self.assertEqual(self.job1.update_status(), "init")
        self.assertEqual(self.job2.update_status(), "init")
        for subjob in self.job2.subjobs.values():
            self.assertEqual(subjob.status, "init")
            self.assertFalse(subjob.ready())
            self.assertEqual(subjob.update_status(), "init")

    # NOTE(review): the following `def` line was lost in extraction; the name is
    # inferred from the docstring/body — confirm against the original file.
    def test_path_object_conversion(self):
        """
        Make sure that the two ways of setting up Job objects correctly converted attributes to be Paths instead of strings.
        """
        self.assertIsInstance(self.job1.output_dir, Path)
        self.assertIsInstance(self.job1.working_dir, Path)
        for path in self.job1.input_sandbox_files:
            self.assertIsInstance(path, Path)
        for path in self.job1.input_files:
            self.assertIsInstance(path, Path)

        self.assertIsInstance(self.job2.output_dir, Path)
        self.assertIsInstance(self.job2.working_dir, Path)
        for path in self.job2.input_sandbox_files:
            self.assertIsInstance(path, Path)
        for path in self.job2.input_files:
            self.assertIsInstance(path, Path)

        for subjob in self.job2.subjobs.values():
            self.assertIsInstance(subjob.output_dir, Path)
            self.assertIsInstance(subjob.working_dir, Path)

    # NOTE(review): the following `def` line was lost in extraction; the name is
    # inferred from the docstring/body — confirm against the original file.
    def test_subjob_splitting(self):
        """
        Test the creation of SubJobs and assignment of input data files via splitter classes.
        """
        self.assertIsNone(self.job1.splitter)
        self.assertIsNone(self.job2.splitter)
        # Assigning these convenience attributes should implicitly create the corresponding splitter
        self.job1.max_files_per_subjob = 2
        self.assertIsInstance(self.job1.splitter, MaxFilesSplitter)
        self.assertEqual(self.job1.splitter.max_files_per_subjob, 2)
        self.job1.max_subjobs = 3
        self.assertIsInstance(self.job1.splitter, MaxSubjobsSplitter)
        self.assertEqual(self.job1.splitter.max_subjobs, 3)

        # Generate some empty input files
        for i in range(5):
            input_file = Path(test_dir, f"{i}.txt")
            input_file.touch(exist_ok=False)
            self.job1.input_files.append(input_file)

        self.job1.splitter = MaxFilesSplitter(max_files_per_subjob=2)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 3)  # Did the splitter create the number of jobs we expect?
        for subjob in self.job1.subjobs.values():
            # 5 files into groups of <=2 means each subjob holds 1 or 2 files
            self.assertIn(len(subjob.input_files), (1, 2))

        self.job1.subjobs = {}
        self.job1.splitter = MaxSubjobsSplitter(max_subjobs=4)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 4)  # Did the splitter create the number of jobs we expect?
        for subjob in self.job1.subjobs.values():
            self.assertIn(len(subjob.input_files), (1, 2))

        # Does the ArgumentsSplitter create jobs
        self.job1.subjobs = {}
        arg_gen = ArgumentsGenerator(range_arguments, 3, stop=12, step=2)
        self.job1.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=10)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 5)
        for subjob, arg in zip(self.job1.subjobs.values(), range(3, 12, 2)):
            # Does each subjob receive the correct setup
            self.assertEqual(self.job1.input_files, subjob.input_files)
            self.assertEqual(arg, subjob.args[0])

        # Does max_subjobs prevent infinite subjob numbers
        self.job1.subjobs = {}
        self.job1.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=2)
        self.assertRaises(SplitterError, self.job1.splitter.create_subjobs, self.job1)

    # NOTE(review): the following `def` line was lost in extraction; the name is
    # inferred from the docstring/body — confirm against the original file.
    def test_input_sandbox_files(self):
        """
        Does the copy of files/directories for the input sandbox work correctly?
        """
        # We create a directory to add to the Job's input sandbox list, to test if directories + contents are copied.
        input_sandbox_dir = Path(test_dir, "test_input_sandbox_dir")
        input_sandbox_dir.mkdir(parents=True, exist_ok=False)
        for i in range(2):
            input_file = Path(input_sandbox_dir, f"{i}.txt")
            input_file.touch(exist_ok=False)
        # Instead of identifying every file, we just use the whole directory
        self.job1.input_sandbox_files.append(input_sandbox_dir)
        # Manually create the working dir first (normally done by the Backend)
        self.job1.working_dir.mkdir(parents=True, exist_ok=False)
        self.job1.copy_input_sandbox_files_to_working_dir()

        # We expect the original script and the above extra files + parent directory to be copied to the working directory.
        expected_paths = []
        expected_paths.append(Path(self.job1.working_dir, test_script.name))
        expected_paths.append(Path(self.job1.working_dir, "test_input_sandbox_dir"))
        for i in range(2):
            path = Path(self.job1.working_dir, "test_input_sandbox_dir", f"{i}.txt")
            expected_paths.append(path)

        # Now check that every path in the working directory is one we expect to be there
        for p in self.job1.working_dir.rglob("*"):
            self.assertIn(p, expected_paths)

    def tearDown(self):
        """
        Removes files/directories that were created during these tests.
        """
        shutil.rmtree(test_dir)
216
217
def main():
    """
    Entry point: discover and run all unit tests defined in this module.
    """
    unittest.main()


if __name__ == '__main__':
    main()
# Doxygen cross-reference notes (extraction residue, preserved as comments):
# - maxDiff: set to None so that, if a test fails, the full diff of a large
#   dictionary is shown.
# - job2_dict: the raw job dictionary used to construct job2.
# - job2: the Job set up from a dictionary.