Belle II Software development
test_runningupdate.py
1#!/usr/bin/env python3
2# pylama:ignore=W0212
3
4
11import unittest
12import re
13from unittest.mock import create_autospec
14
15import conditions_db
16from conditions_db.runningupdate import RunningTagUpdater, RunningTagUpdaterError, RunningTagUpdateMode as Mode
17from conditions_db.cli_management import command_tag_runningupdate
18
19
class TestMergeStaging(unittest.TestCase):
    """Test the class to calculate updates to running tags"""

    #: Basic definition of a running tag to be used later.
    RUNNING_BASE = {
        # gapless payload with different revisions
        "A": [(0, 0, 0, -1), (1, 0, 1, 10), (1, 11, 1, 11), (1, 12, -1, -1)],
        # simple open payload
        "B": [(0, 0, -1, -1)],
        # simple closed payload
        "C": [(0, 0, 0, -1)],
    }

    def make_payloads(self, start_revision=1, **payloads):
        """Create a list of payloads from all keyword arguments.

        The name of each payload is the name of the argument, the revision
        increases monotonically starting at ``start_revision`` for each
        argument, and the iovs are the values of the argument.

        To create a list with two payloads, A and B, with two validities for B
        and one for A one could use::

            make_payloads(A=[(0,0,1,2)], B=[(1,2,3,4), (3,5,6,7)])
        """
        result = []
        for name, iovs in payloads.items():
            # the revision counter restarts at start_revision for every name
            for rev, iov in enumerate(iovs, start_revision):
                result.append(conditions_db.PayloadInformation(0, name, rev, None, None, None, None, iov))
        return result

    def create_payloads_from_text(self, text):
        """
        Create payloads from a plain text list of payloads in the form of

            payloadN, rev i, valid from a,b to c,d

        Return a list of lists of payloads where each block of payload definitions
        without empty lines in between is returned as one list.
        """
        payloads = []
        all_payloads = []
        # The pattern matches either a payload definition line or an empty
        # line; for empty lines all capture groups are empty strings.
        pattern = r'^\s*(payload\d+), rev (\d+), valid from (\d+),(\d+) to ([0-9-]+),([0-9-]+)$|^\s*$'
        for match in re.findall(pattern, text, re.M):
            if not match[0]:
                # empty line: finish the current block if it has any content
                if payloads:
                    all_payloads.append(payloads)
                    payloads = []
                continue
            payloads.append(conditions_db.PayloadInformation(
                0, match[0], int(match[1]), None, None, None, None, tuple(map(int, match[2:]))
            ))

        # don't lose the last block if the text doesn't end with an empty line
        if payloads:
            all_payloads.append(payloads)

        return all_payloads

    def make_mock_db(self, running_state="RUNNING", staging_state="VALIDATED", running_payloads=None, staging_payloads=None):
        """Create a mock object that behaves close enough to the ConditionsDB class but returns
        fixed values for the globaltags "running" and "staging" for testing"""
        db = create_autospec(conditions_db.ConditionsDB)

        if running_payloads is None:
            running_payloads = []
        if staging_payloads is None:
            staging_payloads = []

        def taginfo(name):
            # any tag name other than "running"/"staging" yields None
            return {
                "running": {"name": "running", "globalTagStatus": {"name": running_state}},
                "staging": {"name": "staging", "globalTagStatus": {"name": staging_state}},
            }.get(name, None)

        def all_iovs(name):
            return {
                "running": running_payloads,
                "staging": staging_payloads}.get(name, None)

        db.get_globalTagInfo.side_effect = taginfo
        db.get_all_iovs.side_effect = all_iovs
        return db

    def parse_operations(self, operations):
        """Parse the operations into a single list of [operation type, name,
        revision, iov] for each operation"""
        return [[op, p.name, p.revision, p.iov] for (op, p) in operations]

    def test_states(self):
        """Test that we get errors if the tags don't exist or are in the wrong state"""
        db0 = self.make_mock_db()
        db1 = self.make_mock_db(running_state="PUBLISHED")
        db2 = self.make_mock_db(staging_state="OPEN")
        db3 = self.make_mock_db(running_state="INVALID", staging_state="PUBLISHED")

        with self.assertRaisesRegex(RunningTagUpdaterError, "not in RUNNING state"):
            RunningTagUpdater(db1, "running", "staging", (0, 0), Mode.ALLOW_CLOSED)
        with self.assertRaisesRegex(RunningTagUpdaterError, "not in VALIDATED state"):
            RunningTagUpdater(db2, "running", "staging", (0, 0), Mode.ALLOW_CLOSED)
        with self.assertRaisesRegex(RunningTagUpdaterError, "not in RUNNING state"):
            RunningTagUpdater(db3, "running", "staging", (0, 0), Mode.ALLOW_CLOSED)
        with self.assertRaisesRegex(RunningTagUpdaterError, "'funning' cannot be found"):
            RunningTagUpdater(db0, "funning", "staging", (0, 0), Mode.ALLOW_CLOSED)
        with self.assertRaisesRegex(RunningTagUpdaterError, "'caging' cannot be found"):
            RunningTagUpdater(db0, "running", "caging", (0, 0), Mode.ALLOW_CLOSED)
        with self.assertRaisesRegex(RunningTagUpdaterError, "'cunning' cannot be found"):
            RunningTagUpdater(db0, "cunning", "stacking", (0, 0), Mode.ALLOW_CLOSED)

    def test_overlaps(self):
        """Test that we get errors on overlaps in the staging"""
        db = self.make_mock_db()
        updater = RunningTagUpdater(db, "running", "staging", (0, 0), Mode.ALLOW_CLOSED)
        # no overlap between different payload names
        payloads = self.make_payloads(A=[(0, 0, 1, 0)], B=[(0, 1, 1, 0)])
        self.assertIsNone(updater._check_staging_tag("testoverlaps", payloads))

        # but otherwise we raise errors
        payloads = self.make_payloads(A=[(0, 0, 1, 0), (0, 1, 1, 0)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag("testoverlaps", payloads)
        self.assertEqual(ctx.exception.extra_vars['overlaps'], 1)

        # yes, also a tiny overlap is an overlap
        payloads = self.make_payloads(A=[(0, 0, 1, 0), (1, 0, 1, -1)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag("testoverlaps", payloads)
        self.assertEqual(ctx.exception.extra_vars['overlaps'], 1)

        # and we even raise multiple errors on multiple overlaps
        payloads = self.make_payloads(A=[(0, 0, 1, 0), (1, 0, 1, -1), (1, 1, 1, -1)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag("testoverlaps", payloads)
        self.assertEqual(ctx.exception.extra_vars['overlaps'], 2)

        # and we even raise multiple errors on multiple overlaps ...
        # also if the overlaps are not adjacent
        payloads = self.make_payloads(A=[(0, 0, 1, 0), (3, 0, -1, -1), (1, 0, 1, -1), (2, 0, 2, -1), (1, 1, 1, -1)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag("testoverlaps", payloads)
        self.assertEqual(ctx.exception.extra_vars['overlaps'], 2)

        # unless there's no overlap
        payloads = self.make_payloads(A=[(0, 0, 1, 0), (1, 1, 1, -1)])
        self.assertIsNone(updater._check_staging_tag("testoverlaps", payloads))

    def test_gaps(self):
        """Test that we get errors for gaps in the staging"""
        db = self.make_mock_db()
        updater = RunningTagUpdater(db, "running", "staging", (0, 0), Mode.ALLOW_CLOSED)

        # consecutive payloads should be merged
        payloads = self.make_payloads(A=[(0, 0, 1, 0), (1, 1, -1, -1)])
        self.assertIsNone(updater._check_staging_tag("testgaps", payloads))

        # and otherwise we want errors
        payloads = self.make_payloads(A=[(0, 0, 1, 0), (1, 2, 1, -1), (2, 2, -1, -1)], B=[(0, 0, 1, 2), (2, 1, 3, 3)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag("testgaps", payloads)
        self.assertEqual(ctx.exception.extra_vars['gaps'], 3)

    def test_closed(self):
        """Test that we get errors on iovs not being open in the staging"""
        db = self.make_mock_db()
        updater = RunningTagUpdater(db, "running", "staging", (0, 0), Mode.STRICT)

        # consecutive payloads should be merged but there could be gaps
        payloads = self.make_payloads(A=[(0, 0, 1, 0), (1, 1, 1, -1)], B=[(0, 0, 1, 2), (1, 3, 3, 3)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag("testclosed", payloads)
        self.assertEqual(ctx.exception.extra_vars['closed payloads'], 2)

        # we can fix one gap and still get an error
        payloads = self.make_payloads(A=[(0, 0, 1, -1), (2, 0, -1, -1)], B=[(0, 0, 1, 2), (1, 3, 3, 3)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag("testclosed", payloads)
        self.assertEqual(ctx.exception.extra_vars['closed payloads'], 1)

        # or have a different order ...
        payloads = self.make_payloads(A=[(0, 0, 1, -1), (2, 0, 99999, 999999)], B=[(10, 0, -1, -1), (0, 0, 9, -1)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag("testclosed", payloads)
        self.assertEqual(ctx.exception.extra_vars['closed payloads'], 1)

    def test_start_early(self):
        """Test that we get errors if the payloads in the staging tag start too early"""
        db = self.make_mock_db()
        updater = RunningTagUpdater(db, "running", "staging", (2, 5), Mode.ALLOW_CLOSED)

        # simple start early
        payloads = self.make_payloads(A=[(2, 0, -1, -1)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag("testearly", payloads)
        self.assertEqual(ctx.exception.extra_vars['starts too early'], 1)

        # but starting at 0,0 is an exception and is fine
        payloads = self.make_payloads(A=[(0, 0, -1, -1)])
        updater._check_staging_tag("testearly", payloads)

        # overlap and gap free and starting at 0,0 ... but still wrong because
        # the next payload starts early
        payloads = self.make_payloads(A=[(0, 0, 1, 0), (1, 1, 1, -1)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag("testearly", payloads)
        self.assertEqual(ctx.exception.extra_vars['starts too early'], 1)

        payloads = self.make_payloads(A=[(1, 1, 2, 4)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag("testearly", payloads)
        self.assertEqual(ctx.exception.extra_vars['starts too early'], 1)

        # but allow multiple payloads after the starting point
        payloads = self.make_payloads(A=[(0, 0, 2, 5), (2, 6, 2, 6), (2, 7, 2, -1), (3, 0, -1, -1)])
        updater._check_staging_tag("testearly", payloads)

        # check order dependence
        payloads = self.make_payloads(A=[(3, 0, -1, -1), (2, 4, 2, -1)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag("testearly", payloads)
        self.assertEqual(ctx.exception.extra_vars['starts too early'], 1)

    def test_simple_mode(self):
        """Test that we only allow trivial iovs in simple mode"""
        db = self.make_mock_db()
        updater = RunningTagUpdater(db, "running", "staging", (2, 5), Mode.SIMPLE)

        # One payload only but not the correct one
        payloads = self.make_payloads(A=[(1, 0, -1, -1)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag_simple("testsimple", payloads)
        self.assertEqual(ctx.exception.extra_vars['wrong validity'], 1)

        payloads = self.make_payloads(A=[(0, 1, -1, -1)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag_simple("testsimple", payloads)
        self.assertEqual(ctx.exception.extra_vars['wrong validity'], 1)

        payloads = self.make_payloads(A=[(0, 0, 1, -1)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag_simple("testsimple", payloads)
        self.assertEqual(ctx.exception.extra_vars['wrong validity'], 1)

        payloads = self.make_payloads(A=[(0, 0, 1, 1)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag_simple("testsimple", payloads)
        self.assertEqual(ctx.exception.extra_vars['wrong validity'], 1)

        # but 0,0,-1,-1 is fine
        payloads = self.make_payloads(A=[(0, 0, -1, -1)], B=[(0, 0, -1, -1)])
        updater._check_staging_tag_simple("testsimple", payloads)

        # but only one per payload
        payloads = self.make_payloads(A=[(0, 0, -1, -1), (0, 0, -1, -1), (1, 2, 3, 4)],
                                      B=[(0, 0, -1, -1)], C=[(0, 1, 2, 3)])
        with self.assertRaises(RunningTagUpdaterError) as ctx:
            updater._check_staging_tag_simple("testsimple", payloads)
        self.assertEqual(ctx.exception.extra_vars['wrong validity'], 2)
        self.assertEqual(ctx.exception.extra_vars['duplicate payloads'], 2)

    def test_running_check(self):
        """Test that we get errors if the first valid run conflicts with the running tag"""
        db = self.make_mock_db()
        updater = RunningTagUpdater(db, "running", "staging", (2, 5), Mode.STRICT)

        # only check we do is that payloads end before the first valid run or are open
        payloads = self.make_payloads(A=[(0, 0, 1, -1), (2, 0, 2, 4)], B=[(0, 0, -1, -1)])
        updater._check_running_tag("testrunning", payloads)

        # so we want an error even if anything closes on the same run
        payloads = self.make_payloads(A=[(0, 0, 1, -1), (2, 0, 2, 5)], B=[(0, 0, -1, -1)], C=[(0, 0, 10, -1)])
        with self.assertRaisesRegex(RunningTagUpdaterError, "Given first valid run conflicts with running tag") as ctx:
            updater._check_running_tag("testrunning", payloads)
        self.assertEqual(ctx.exception.extra_vars['payloads end after first valid run'], 2)

        # also complain if it starts after the first valid run
        payloads = self.make_payloads(A=[(2, 5, -1, -1)], B=[(0, 0, 2, 1), (2, 2, -1, -1)], C=[(10, 0, -1, -1)])
        with self.assertRaisesRegex(RunningTagUpdaterError, "Given first valid run conflicts with running tag") as ctx:
            updater._check_running_tag("testrunning", payloads)
        self.assertEqual(ctx.exception.extra_vars['payloads start after first valid run'], 2)

    def test_operations_simple(self):
        """Test the calculated operations if each staging payload has one open iov starting at 0,0"""
        running = self.make_payloads(**self.RUNNING_BASE)
        staging = self.make_payloads(
            A=[(0, 0, -1, -1)],
            B=[(0, 0, -1, -1)],
            C=[(0, 0, -1, -1)],
            start_revision=10)
        db = self.make_mock_db(running_payloads=running, staging_payloads=staging)
        updater = RunningTagUpdater(db, "running", "staging", (2, 0), Mode.STRICT)
        result = updater.calculate_update()
        self.assertEqual(self.parse_operations(result),
                         [
                             ['CLOSE', 'A', 4, (1, 12, 1, -1)],
                             ['CLOSE', 'B', 1, (0, 0, 1, -1)],
                             ['CREATE', 'A', 10, (2, 0, -1, -1)],
                             ['CREATE', 'B', 10, (2, 0, -1, -1)],
                             ['CREATE', 'C', 10, (2, 0, -1, -1)],
                         ])

    def test_operations_full(self):
        """
        In full mode we close B even if there's nothing in the staging tag
        to allow "removing" payloads from the set of valid ones
        """
        running = self.make_payloads(**self.RUNNING_BASE)
        staging = self.make_payloads(
            A=[(0, 0, -1, -1)],
            C=[(0, 0, -1, -1)],
            start_revision=10)
        db = self.make_mock_db(running_payloads=running, staging_payloads=staging)
        updater = RunningTagUpdater(db, "running", "staging", (2, 0), Mode.FULL_REPLACEMENT)
        result = updater.calculate_update()
        self.assertEqual(self.parse_operations(result),
                         [
                             ['CLOSE', 'A', 4, (1, 12, 1, -1)],
                             ['CLOSE', 'B', 1, (0, 0, 1, -1)],
                             ['CREATE', 'A', 10, (2, 0, -1, -1)],
                             ['CREATE', 'C', 10, (2, 0, -1, -1)],
                         ])

    def test_operations_ragged(self):
        """This time B doesn't start on the spot but a bit later. And A is a list of iovs to be preserved"""
        running = self.make_payloads(**self.RUNNING_BASE)
        staging = self.make_payloads(
            A=[(0, 0, 2, 5), (2, 6, 2, 10), (2, 11, -1, -1)],
            B=[(2, 8, -1, -1)],
            C=[(2, 0, -1, -1)],
            start_revision=10)
        db = self.make_mock_db(running_payloads=running, staging_payloads=staging)
        updater = RunningTagUpdater(db, "running", "staging", (2, 0), Mode.STRICT)
        result = updater.calculate_update()
        self.assertEqual(self.parse_operations(result),
                         [
                             ['CLOSE', 'A', 4, (1, 12, 1, -1)],
                             ['CLOSE', 'B', 1, (0, 0, 2, 7)],
                             ['CREATE', 'A', 10, (2, 0, 2, 5)],
                             ['CREATE', 'A', 11, (2, 6, 2, 10)],
                             ['CREATE', 'A', 12, (2, 11, -1, -1)],
                             ['CREATE', 'B', 10, (2, 8, -1, -1)],
                             ['CREATE', 'C', 10, (2, 0, -1, -1)],
                         ])

    def test_operations_extend(self):
        """If the staging payloads have same revision as the last one in running merge them"""
        running = self.make_payloads(**self.RUNNING_BASE)
        staging = self.make_payloads(
            A=[(0, 0, 2, 5), (2, 6, 2, 10), (2, 11, -1, -1)],
            B=[(2, 8, -1, -1)],
            C=[(2, 0, -1, -1)],
            start_revision=10)
        # we need to have the same revision for merging though so fudge a bit:
        # staging[0] is the first iov of A, staging[-2] is the only iov of B
        staging[0].revision = 4
        staging[-2].revision = 1
        db = self.make_mock_db(running_payloads=running, staging_payloads=staging)
        updater = RunningTagUpdater(db, "running", "staging", (2, 0), Mode.STRICT)
        result = updater.calculate_update()
        self.assertEqual(self.parse_operations(result),
                         [
                             ['CLOSE', 'A', 4, (1, 12, 2, 5)],
                             ['CREATE', 'A', 11, (2, 6, 2, 10)],
                             ['CREATE', 'A', 12, (2, 11, -1, -1)],
                             ['CREATE', 'C', 10, (2, 0, -1, -1)],
                         ])

    def test_operations_fix_open(self):
        """Test automatic opening of the last iov if necessary"""
        running = self.make_payloads(**self.RUNNING_BASE)
        staging = self.make_payloads(
            A=[(0, 0, 2, 5), (2, 6, 2, 10), (2, 11, 2, -1)],
            B=[(2, 8, 2, 10), (2, 11, 2, 28)],
            C=[(2, 0, 2, 100)],
            start_revision=10)
        db = self.make_mock_db(running_payloads=running, staging_payloads=staging)
        updater = RunningTagUpdater(db, "running", "staging", (2, 0), Mode.FIX_CLOSED)
        result = updater.calculate_update()
        self.assertEqual(self.parse_operations(result),
                         [
                             ['CLOSE', 'A', 4, (1, 12, 1, -1)],
                             ['CLOSE', 'B', 1, (0, 0, 2, 7)],
                             ['CREATE', 'A', 10, (2, 0, 2, 5)],
                             ['CREATE', 'A', 11, (2, 6, 2, 10)],
                             ['CREATE', 'A', 12, (2, 11, -1, -1)],
                             ['CREATE', 'B', 10, (2, 8, 2, 10)],
                             ['CREATE', 'B', 11, (2, 11, -1, -1)],
                             ['CREATE', 'C', 10, (2, 0, -1, -1)],
                         ])

    def test_operations_errors(self):
        """Test that merging fails if we have gaps or overlaps"""
        running = self.make_payloads(**self.RUNNING_BASE)
        staging = self.make_payloads(
            A=[(0, 0, 2, 5), (2, 5, 2, 10), (2, 11, -1, -1)],
            B=[(2, 8, 2, 10), (2, 12, -1, -1)],
            C=[(2, 0, -1, -1)],
            start_revision=10)
        db = self.make_mock_db(running_payloads=running, staging_payloads=staging)
        updater = RunningTagUpdater(db, "running", "staging", (2, 0), Mode.STRICT)
        with self.assertRaisesRegex(RunningTagUpdaterError, "tag 'staging' not fit for update") as ctx:
            updater.calculate_update()
        # check that we have one overlap and one gap using subset comparison logic
        self.assertLessEqual({'overlaps': 1, 'gaps': 1}.items(), ctx.exception.extra_vars.items())

    def test_doc_example(self):
        """Extract the example from the `b2conditionsdb tag runningupdate` docstring and run it"""
        running, staging, expected = self.create_payloads_from_text(command_tag_runningupdate.__doc__)
        # make a (shallow) copy of the running payloads just to be able to compare later
        result = running[:]
        db = self.make_mock_db(running_payloads=running, staging_payloads=staging)
        updater = RunningTagUpdater(db, "running", "staging", (1, 2), Mode.ALLOW_CLOSED)
        operations = updater.calculate_update()
        # check that the update does what we actually wrote in the doc. This will
        # fail if the docs are updated.
        self.assertEqual(self.parse_operations(operations),
                         [
                             ['CLOSE', 'payload1', 2, (1, 1, 1, 1)],
                             ['CLOSE', 'payload2', 1, (0, 1, 1, 4)],
                             ['CLOSE', 'payload4', 1, (0, 1, 1, 20)],
                             ['CREATE', 'payload1', 3, (1, 2, 1, 8)],
                             ['CREATE', 'payload1', 4, (1, 9, 1, 20)],
                             ['CREATE', 'payload2', 2, (1, 5, 1, 20)],
                             ['CREATE', 'payload3', 2, (1, 2, -1, -1)],
                         ])
        # and then apply the operations to the input list and make sure that is
        # what's written in the doc
        for op, payload in operations:
            if op == "CLOSE":
                # adjust the end of the iov
                i = result.index(payload)
                result[i].iov = result[i].iov[:2] + payload.iov[2:]
            else:
                # or just add the full payload
                result.append(payload)
        result.sort()
        self.assertEqual(expected, result)

    def test_doc_example_full(self):
        """Run the example from the `b2conditionsdb tag runningupdate` docstring in full replacement mode"""
        # the expected final state in the docstring is not checked in this mode
        running, staging, _ = self.create_payloads_from_text(command_tag_runningupdate.__doc__)
        db = self.make_mock_db(running_payloads=running, staging_payloads=staging)
        updater = RunningTagUpdater(db, "running", "staging", (1, 2), Mode.FULL_REPLACEMENT)
        operations = updater.calculate_update()
        # check that the update does what we actually wrote in the doc. This will
        # fail if the docs are updated.
        self.assertEqual(self.parse_operations(operations),
                         [
                             ['CLOSE', 'payload1', 2, (1, 1, 1, 1)],
                             ['CLOSE', 'payload2', 1, (0, 1, 1, 4)],
                             ['CLOSE', 'payload4', 1, (0, 1, 1, 20)],
                             ['CLOSE', 'payload5', 1, (0, 1, 1, 1)],
                             ['CREATE', 'payload1', 3, (1, 2, 1, 8)],
                             ['CREATE', 'payload1', 4, (1, 9, 1, 20)],
                             ['CREATE', 'payload2', 2, (1, 5, 1, 20)],
                             ['CREATE', 'payload3', 2, (1, 2, -1, -1)],
                         ])

    def test_doc_example_fixclosed(self):
        """Run the example from the `b2conditionsdb tag runningupdate` docstring in fix closed mode"""
        # the expected final state in the docstring is not checked in this mode
        running, staging, _ = self.create_payloads_from_text(command_tag_runningupdate.__doc__)
        db = self.make_mock_db(running_payloads=running, staging_payloads=staging)
        updater = RunningTagUpdater(db, "running", "staging", (1, 2), Mode.FIX_CLOSED)
        operations = updater.calculate_update()
        # check that the update does what we actually wrote in the doc. This will
        # fail if the docs are updated.
        self.assertEqual(self.parse_operations(operations),
                         [
                             ['CLOSE', 'payload1', 2, (1, 1, 1, 1)],
                             ['CLOSE', 'payload2', 1, (0, 1, 1, 4)],
                             ['CREATE', 'payload1', 3, (1, 2, 1, 8)],
                             ['CREATE', 'payload1', 4, (1, 9, -1, -1)],
                             ['CREATE', 'payload2', 2, (1, 5, -1, -1)],
                             ['CREATE', 'payload3', 2, (1, 2, -1, -1)],
                         ])
491
492
if __name__ == "__main__":
    # run all test cases defined in this module when executed as a script
    unittest.main()
def make_mock_db(self, running_state="RUNNING", staging_state="VALIDATED", running_payloads=None, staging_payloads=None)
dict RUNNING_BASE
Basic definition of a running tag to be used later.
def make_payloads(self, start_revision=1, **payloads)