#!/usr/bin/env python3
# Belle II Software development: test_backends.py

# disable doxygen check for this file
# @cond


"""
Unit tests for the backends.py classes and functions. This is a little difficult to test due to
the Batch system backends not really being testable when not running on specific hosts with bsub/qsub
installed. But we can try to implement tests using the Local multiprocessing backend and testing the
basic behaviour of the classes so that they will fail if we modify assumptions in the future.
"""
20
21from basf2 import find_file
22
23import unittest
24from unittest import TestCase
25import shutil
26from pathlib import Path
27
28from caf.backends import ArgumentsSplitter, Job, MaxFilesSplitter, MaxSubjobsSplitter
29from caf.backends import ArgumentsGenerator, range_arguments, SplitterError
30
# A testing directory (resolved to an absolute path from the current working directory)
# so that every test can clean everything up in one go during tearDown.
test_dir = Path("test_backends").absolute()
# A simple bash script used as the payload command of the test jobs.
# NOTE(review): located via basf2's find_file — assumes a full basf2 installation; confirm
# when running outside the Belle II software environment.
test_script = Path(find_file("calibration/examples/job_submission/test_script.sh")).absolute()
35
36
class TestJob(TestCase):
    """
    UnitTest for the `caf.backends.Job` class.

    Each test gets a fresh pair of Job objects: ``job1`` built by direct attribute
    assignment, and ``job2`` built from a dictionary (the same structure returned by
    `Job.job_dict`). A clean ``test_dir`` is created per test and removed in `tearDown`.
    """

    def setUp(self):
        """
        Create useful objects for each test and the test directory removed by the teardown.
        """
        # We deliberately use strings and not Path objects for the job attributes so that we
        # can later check that they are converted to Paths by the class itself.
        name1 = 'TestJob1'
        job1 = Job(name1)  # Set up this one manually, assigning attributes directly
        job1.working_dir = Path(test_dir, job1.name, "working_dir").absolute().as_posix()
        job1.output_dir = Path(test_dir, job1.name, "output_dir").absolute().as_posix()
        job1.cmd = ["bash", test_script.name]
        job1.input_sandbox_files = [test_script.as_posix()]
        self.job1 = job1

        # Set up this one from a dictionary, mirroring the Job.job_dict structure
        name2 = 'TestJob2'
        job_dict = {
            "name": name2,
            "working_dir": Path(test_dir, name2, "working_dir").as_posix(),
            "output_dir": Path(test_dir, name2, "output_dir").as_posix(),
            "output_patterns": [],
            "cmd": ["bash", test_script.name],
            "args": [],
            "input_sandbox_files": [test_script.as_posix()],
            "input_files": [],
            "setup_cmds": [],
            "backend_args": {},
            "subjobs": [{"id": i, "input_files": [], "args": [str(i)]} for i in range(4)],
        }
        self.job2_dict = job_dict
        self.job2 = Job(name2, job_dict=job_dict)

        # Create a directory just in case we need it for each test so that we can delete
        # everything easily at the end
        test_dir.mkdir(parents=True, exist_ok=False)

    def test_dict_setup(self):
        """
        A Job constructed from a dictionary should reproduce that dictionary via
        `Job.job_dict`, and removing a subjob should keep both representations in sync.
        """
        self.maxDiff = None  # If this test fails you will need to see the diff of a large dictionary
        self.assertEqual(len(self.job2.subjobs), 4)
        self.assertEqual(self.job2_dict, self.job2.job_dict)
        # Remove the last subjob from both the input dict and the Job, then re-compare
        self.job2_dict["subjobs"].pop()
        del self.job2.subjobs[3]
        self.assertEqual(self.job2_dict, self.job2.job_dict)

    def test_job_json_serialise(self):
        """
        Round-trip a Job through its JSON serialisation and check nothing is lost.
        """
        json_path = Path(test_dir, "job2.json")
        self.job2.dump_to_json(json_path)
        job2_copy = Job.from_json(json_path)
        self.assertEqual(self.job2.job_dict, job2_copy.job_dict)

    def test_status(self):
        """
        The Jobs haven't been run so they should be in the 'init' status. They also
        shouldn't throw exceptions due to missing result objects.
        """
        self.assertEqual(self.job1.status, "init")
        self.assertEqual(self.job2.status, "init")
        self.assertFalse(self.job1.ready())
        self.assertFalse(self.job2.ready())
        self.assertEqual(self.job1.update_status(), "init")
        self.assertEqual(self.job2.update_status(), "init")
        for subjob in self.job2.subjobs.values():
            self.assertEqual(subjob.status, "init")
            self.assertFalse(subjob.ready())
            self.assertEqual(subjob.update_status(), "init")

    def test_path_object_conversion(self):
        """
        Make sure that the two ways of setting up Job objects correctly converted
        attributes to be Paths instead of strings.
        """
        for job in (self.job1, self.job2):
            self.assertIsInstance(job.output_dir, Path)
            self.assertIsInstance(job.working_dir, Path)
            for path in job.input_sandbox_files:
                self.assertIsInstance(path, Path)
            for path in job.input_files:
                self.assertIsInstance(path, Path)

        # SubJobs created from the input dictionary must also hold Path attributes
        for subjob in self.job2.subjobs.values():
            self.assertIsInstance(subjob.output_dir, Path)
            self.assertIsInstance(subjob.working_dir, Path)

    def test_subjob_splitting(self):
        """
        Test the creation of SubJobs and assignment of input data files via splitter classes.
        """
        self.assertIsNone(self.job1.splitter)
        self.assertIsNone(self.job2.splitter)
        # Assigning these convenience attributes should implicitly create the matching splitter
        self.job1.max_files_per_subjob = 2
        self.assertIsInstance(self.job1.splitter, MaxFilesSplitter)
        self.assertEqual(self.job1.splitter.max_files_per_subjob, 2)
        self.job1.max_subjobs = 3
        self.assertIsInstance(self.job1.splitter, MaxSubjobsSplitter)
        self.assertEqual(self.job1.splitter.max_subjobs, 3)

        # Generate some empty input files
        for i in range(5):
            input_file = Path(test_dir, f"{i}.txt")
            input_file.touch(exist_ok=False)
            self.job1.input_files.append(input_file)

        # MaxFilesSplitter: 5 files at <= 2 per subjob -> 3 subjobs expected
        self.job1.splitter = MaxFilesSplitter(max_files_per_subjob=2)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 3)  # Did the splitter create the number of jobs we expect?
        for subjob in self.job1.subjobs.values():
            self.assertIn(len(subjob.input_files), (1, 2))

        # MaxSubjobsSplitter: 5 files spread over at most 4 subjobs
        self.job1.subjobs = {}
        self.job1.splitter = MaxSubjobsSplitter(max_subjobs=4)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 4)  # Did the splitter create the number of jobs we expect?
        for subjob in self.job1.subjobs.values():
            self.assertIn(len(subjob.input_files), (1, 2))

        # Does the ArgumentsSplitter create jobs?
        self.job1.subjobs = {}
        arg_gen = ArgumentsGenerator(range_arguments, 3, stop=12, step=2)
        self.job1.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=10)
        self.job1.splitter.create_subjobs(self.job1)
        self.assertEqual(len(self.job1.subjobs), 5)
        for subjob, arg in zip(self.job1.subjobs.values(), range(3, 12, 2)):
            # Does each subjob receive the correct setup?
            self.assertEqual(self.job1.input_files, subjob.input_files)
            self.assertEqual(arg, subjob.args[0])

        # Does max_subjobs prevent infinite subjob numbers?
        self.job1.subjobs = {}
        self.job1.splitter = ArgumentsSplitter(arguments_generator=arg_gen, max_subjobs=2)
        self.assertRaises(SplitterError, self.job1.splitter.create_subjobs, self.job1)

    def test_input_sandbox_copy(self):
        """
        Does the copy of files/directories for the input sandbox work correctly?
        """
        # We create a directory to add to the Job's input sandbox list, to test if
        # directories + contents are copied.
        input_sandbox_dir = Path(test_dir, "test_input_sandbox_dir")
        input_sandbox_dir.mkdir(parents=True, exist_ok=False)
        for i in range(2):
            input_file = Path(input_sandbox_dir, f"{i}.txt")
            input_file.touch(exist_ok=False)
        # Instead of identifying every file, we just use the whole directory
        self.job1.input_sandbox_files.append(input_sandbox_dir)
        # Manually create the working dir first (normally done by the Backend)
        self.job1.working_dir.mkdir(parents=True, exist_ok=False)
        self.job1.copy_input_sandbox_files_to_working_dir()

        # We expect the original script and the above extra files + parent directory to be
        # copied to the working directory.
        expected_paths = [
            Path(self.job1.working_dir, test_script.name),
            Path(self.job1.working_dir, "test_input_sandbox_dir"),
        ]
        for i in range(2):
            expected_paths.append(Path(self.job1.working_dir, "test_input_sandbox_dir", f"{i}.txt"))

        # Check that the working directory contains *exactly* the expected paths. The previous
        # version only asserted that no unexpected paths exist, which would also pass if the
        # copy had silently done nothing.
        self.assertCountEqual(list(self.job1.working_dir.rglob("*")), expected_paths)

    def tearDown(self):
        """
        Removes files/directories that were created during these tests.
        """
        shutil.rmtree(test_dir)
208
209
def main():
    """Entry point: hand control to unittest's command-line test runner."""
    unittest.main()


if __name__ == '__main__':
    main()
216
217# @endcond
218