Belle II Software  release-08-01-10
test_support.py
1 
8 
9 # This file includes utilities for the unittests and is not needed during production cases
10 import atexit
11 import json
12 import os
13 import shutil
14 import signal
15 import subprocess
16 import tempfile
17 from time import sleep, time
18 from unittest import TestCase
19 import zmq
20 
21 
22 
# NOTE(review): neither constant is referenced in the visible part of this
# file; the meanings below are inferred from the names only - confirm against
# the tests importing them.

# Presumably the number of times a looped test body is repeated.
ZMQ_TEST_FOR_LOOPS = 5

# Presumably the number of failed repetitions tolerated before a looped test
# is reported as failed.
ZMQ_TEST_MAX_FAILURES = 1
27 
28 
class HLTZMQTestCase(TestCase):
    """
    Base class for all HLT ZMQ tests helping to start the needed programs,
    create ZMQ sockets and send/receive messages easily.
    Has also some functionality for asserting test-correctness
    """

    # The ZMQ context every socket created by the helpers below belongs to
    ctx = zmq.Context()

    # The dict name -> cmd args of the programs to start, needs to be set in each test.
    needed_programs = {}

    # NOTE(review): the "def" line of this helper was missing in the extracted
    # listing; name and (empty) parameter list are taken from the call
    # "HLTZMQTestCase.get_free_port()" found further down in this file.
    @staticmethod
    def get_free_port():
        """
        Get a free port number by reusing ZMQ's function for this.

        The probing socket is closed again before returning, so the port is
        only known to have been free at the time of the call.
        """
        socket = HLTZMQTestCase.ctx.socket(zmq.ROUTER)
        try:
            return socket.bind_to_random_port("tcp://*")
        finally:
            # always release the probe socket, also if binding failed
            socket.close()

    def setUp(self):
        """
        Custom setUp function to go into a temporary folder
        and start the needed programs.
        """
        # use a temporary folder for testing
        self.test_dir = tempfile.mkdtemp()
        # remember current working directory
        self.previous_dir = os.getcwd()
        os.chdir(self.test_dir)

        # dict for all started programs
        self.started_programs = {}

        # Register the clean-up before anything is launched: unittest does not
        # call tearDown when setUp itself fails, so without this a failing
        # start-up check below would leave child processes and the temporary
        # folder behind.
        atexit.register(self.tearDown)

        for name, command in self.needed_programs.items():
            # a new session makes the child the leader of its own process
            # group, which is what tearDown kills via os.killpg
            self.started_programs[name] = subprocess.Popen(command, start_new_session=True)
            self.assertIsRunning(name)

    def tearDown(self):
        """
        Custom tearDown function to kill the started programs if still present
        and remove the temporary folder again.
        """
        for name, process in self.started_programs.items():
            if self._is_running(name):
                os.killpg(process.pid, signal.SIGKILL)
                process.wait()
        os.chdir(self.previous_dir)
        shutil.rmtree(self.test_dir)

        # Only remove our own exit handler; clearing the whole atexit registry
        # would also drop handlers registered by unrelated code.
        atexit.unregister(self.tearDown)

    def _is_running(self, name):
        """
        Check if a given program is still running.

        Returns True as long as the process started under "name" has not
        terminated. Raises KeyError for a name that was never started.
        """
        return self.started_programs[name].poll() is None

    def assertIsDown(self, name, timeout=5, minimum_delay=0.1):
        """
        Test helper to assert the given program has terminated - at least after timeout in seconds has passed.
        Checks every "minimum_delay" seconds.
        """
        endtime = time() + timeout
        while self._is_running(name):
            self.assertGreater(endtime - time(), 0,
                               f"Program {name!r} is still running after {timeout} seconds")
            sleep(minimum_delay)

    def assertIsRunning(self, name):
        """
        Assert that a given program is still running.
        """
        self.assertTrue(self._is_running(name), f"Program {name!r} is not running")

    @staticmethod
    def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False):
        """
        Create and return a ZMQ socket with the given type and identity and
        bind or connect it to localhost and the given port.

        Return value: the socket - except when "bind" is set and "port" is
        None. In that case the socket is bound to a random port and the tuple
        (socket, port) is returned.
        Raises a RuntimeError if asked to connect (bind=False) without a port.
        """
        # check first, so no socket is created (and leaked) on the error path
        if not bind and port is None:
            raise RuntimeError("Cannot connect to unknown port")

        socket = HLTZMQTestCase.ctx.socket(socket_type)
        # receive timeout in milliseconds, turns a missing answer into zmq.error.Again
        socket.rcvtimeo = 10000
        # do not keep unsent messages around when the socket is closed
        socket.linger = 0
        if identity:
            socket.setsockopt_string(zmq.IDENTITY, identity)

        if not bind:
            socket.connect(f"tcp://localhost:{port}")
            return socket

        if port is None:
            port = socket.bind_to_random_port("tcp://*")
            return socket, port

        socket.bind(f"tcp://*:{port}")
        return socket

    # NOTE(review): the "def" line of this helper was missing in the extracted
    # listing. The single "port" parameter follows from the body, the name
    # "create_router_socket" was restored from the docstring - confirm.
    @staticmethod
    def create_router_socket(port):
        """
        Shortcut to create a ROUTER type socket with the typical parameters
        binding to the given port.

        As this forwards to create_socket with bind=True, passing None as port
        returns the tuple (socket, port) instead of just the socket.
        """
        return HLTZMQTestCase.create_socket(port, socket_type=zmq.ROUTER, identity="", bind=True)

    @staticmethod
    def send(socket, message_type, first_data=b"", second_data=b"", identity=""):
        """
        Send a message consisting of the message type, the first and the second data
        either to the identity if given or without identity if omitted.

        "message_type" and "identity" are strings and get encoded here,
        the two data parts have to be bytes already.
        """
        frames = [message_type.encode(), first_data, second_data]
        if identity:
            # the addressed identity goes in front of the payload frames
            frames.insert(0, identity.encode())
        socket.send_multipart(frames)

    @staticmethod
    def recv(socket):
        """
        Try to receive a message from the socket (or throw an assertion error if none comes after the set timeout
        of the socket).
        Returns the list of received message frames.
        """
        try:
            return socket.recv_multipart()
        except zmq.error.Again as error:
            raise AssertionError("No answer from socket") from error

    def assertMonitoring(self, socket, search_key, search_value, timeout=10):
        """
        Ask the given socket for a monitoring JSON and make sure, the value related to "search_key"
        is set to "search_value" - at least after the given timeout.
        The search key should be in the form "<category>.<key>".
        """
        end_time = time() + timeout
        # flattened view "<category>.<key>" -> value of all answers seen so far
        monitoring = {}
        while time() < end_time:
            HLTZMQTestCase.send(socket, "m")
            answer = self.assertIsAndGet(socket, "c")

            # the second frame of the answer carries the JSON document
            for parent_key, parent_dict in json.loads(answer[1]).items():
                for key, value in parent_dict.items():
                    monitoring[parent_key + "." + key] = value

            if search_key in monitoring and monitoring[search_key] == search_value:
                break
        else:
            # only reached when the loop ran into the timeout without a match
            if not monitoring:
                raise AssertionError("Monitoring did not answer in time.")
            if search_key not in monitoring:
                raise AssertionError(f"Monitoring did not have a result with key {search_key}")
            raise AssertionError(
                f"Monitoring did not show the result {search_value} for {search_key}, instead {monitoring[search_key]}")

        self.assertNothingMore(socket)

    def assertIsAndGet(self, socket, message_type, final=True, router=False):
        """
        Assert that the next message received on the socket has the given message type.
        If final is set to True, also assert that there is no additional message on the socket.
        Use router only for router sockets.
        Returns the received message frames.
        """
        answer = HLTZMQTestCase.recv(socket)
        # with router=True the type is expected in the second frame instead of the first
        type_index = 1 if router else 0
        self.assertEqual(answer[type_index], message_type.encode())
        if final:
            self.assertNothingMore(socket)
        return answer

    def assertIsMsgType(self, socket, message_type, final=True, router=False):
        """
        Deprecated copy of "assertIsAndGet".
        """
        return self.assertIsAndGet(socket, message_type, final=final, router=router)

    def assertNothingMore(self, socket):
        """
        Assert that there is no pending message to be received on the socket.
        """
        # poll(0) returns immediately instead of waiting for a message
        self.assertFalse(socket.poll(0), "Unexpected pending message on the socket")

    def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1):
        """
        Assert that - at least after the given timeout - the output file
        is present. If unlink is set to True, remove the file after checking.
        The file is looked for every "minimum_delay" seconds.
        """
        endtime = time() + timeout
        while not os.path.exists(output_file):
            self.assertGreater(endtime - time(), 0,
                               f"Output file {output_file} did not show up within {timeout} seconds")
            sleep(minimum_delay)

        if unlink:
            os.unlink(output_file)

    def assertNotHasOutputFile(self, output_file, timeout=0.5):
        """
        Assert that after the timeout the given file is not present
        (a.k.a. no process has created it)
        """
        sleep(timeout)
        self.assertFalse(os.path.exists(output_file))
252 
253 
255  """
256  As the collectors are mostly equal, use a common base test case class
257  """
258 
259  final_collector = False
260 
def setUp(self):
    """Setup port numbers and necessary programs"""
    # one free port each for the input, output and monitoring address
    # handed to the collector below
    self.input_port = HLTZMQTestCase.get_free_port()
    self.output_port = HLTZMQTestCase.get_free_port()
    self.monitoring_port = HLTZMQTestCase.get_free_port()

    # executable name and host part of the output address both depend on
    # the final_collector flag
    if self.final_collector:
        command = "b2hlt_finalcollector"
        output = "localhost"
    else:
        command = "b2hlt_collector"
        output = "*"

    # must be stored as "needed_programs": this is the attribute the parent
    # setUp reads to decide which programs to start
    self.needed_programs = {
        "collector": [
            command,
            "--input", f"tcp://*:{self.input_port}",
            "--output", f"tcp://{output}:{self.output_port}",
            "--monitor", f"tcp://*:{self.monitoring_port}",
        ]
    }
    # programs are setup, call parent setup function now
    super().setUp()
282 
# NOTE(review): the "def" line was missing in the extracted listing; name and
# signature are taken from the calls "self.create_output_socket()" in the
# test functions of this file.
def create_output_socket(self):
    """
    create the output socket depending if final collector or not

    Final collector: bind a STREAM socket to the output port, send an empty
    frame and wait for one answer.
    Otherwise: connect a socket to the output port and send a single "r"
    message on it.
    Returns the created socket.
    """
    if not self.final_collector:
        output_socket = self.create_socket(self.output_port)
        self.send(output_socket, "r")
        return output_socket

    output_socket = self.create_socket(self.output_port, socket_type=zmq.STREAM, bind=True)
    output_socket.send(b"")
    self.recv(output_socket)
    return output_socket
293 
def get_signal(self, output_socket, signal_type):
    """
    get a signal from the socket depending if final collector or not

    Final collector: only assert that nothing is pending on the output
    socket, "signal_type" is not looked at.
    Otherwise: assert that the next (and only) message on the socket has
    the type "signal_type".
    """
    if self.final_collector:
        self.assertNothingMore(output_socket)
        return

    self.assertIsMsgType(output_socket, signal_type, final=True)
300 
def get_event(self, output_socket):
    """
    get an event from the socket depending if final collector or not

    Final collector: receive one message (its content is not checked) and
    assert that nothing else is pending.
    Otherwise: assert that the next (and only) message has the type "u".
    """
    if self.final_collector:
        self.recv(output_socket)
        self.assertNothingMore(output_socket)
        return

    self.assertIsMsgType(output_socket, "u")
308 
310  """test function"""
311  monitoring_socket = self.create_socketcreate_socket(self.monitoring_portmonitoring_port)
312 
313  # Register first worker
314  input_socket = self.create_socketcreate_socket(self.input_portinput_port)
315  self.sendsend(input_socket, "h")
316 
317  # However the collector is only polling for ready now...
318  self.assertNothingMoreassertNothingMore(input_socket)
319  self.assertMonitoringassertMonitoring(monitoring_socket, "input.registered_workers", 0)
320 
321  # To make it actually look for input messages, it needs at least a single worker
322  output_socket = self.create_output_socketcreate_output_socket()
323 
324  # Now we can go on
325  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
326  self.assertMonitoringassertMonitoring(monitoring_socket, "input.registered_workers", 1)
327 
328  # Register second worker
329  second_input_socket = self.create_socketcreate_socket(self.input_portinput_port, identity="other_socket")
330  self.sendsend(second_input_socket, "h")
331  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
332  self.assertMonitoringassertMonitoring(monitoring_socket, "input.registered_workers", 2)
333 
334  # So far no stop messages should be there
335  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
336  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", False)
337 
338  # the first stop message should not trigger a transmission
339  self.sendsend(input_socket, "l")
340  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
341  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
342  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", False)
343  self.assertNothingMoreassertNothingMore(output_socket)
344 
345  # The second stop message should
346  self.sendsend(second_input_socket, "l")
347  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
348  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
349  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", True)
350 
351  self.get_signalget_signal(output_socket, "l")
352 
353  # Another stop message should not change anything
354  self.sendsend(input_socket, "l")
355  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
356  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
357  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", True)
358  self.assertNothingMoreassertNothingMore(output_socket)
359 
360  # But if we reset...
361  self.sendsend(monitoring_socket, "n")
362  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
363  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", False)
364 
365  # .. it should
366  self.sendsend(input_socket, "l")
367  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
368  self.sendsend(second_input_socket, "l")
369  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
370 
371  self.get_signalget_signal(output_socket, "l")
372 
373  # Now we reset again
374  self.sendsend(monitoring_socket, "n")
375  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
376  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", False)
377 
378  # send just one stop message
379  self.sendsend(input_socket, "l")
380  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
381 
382  # and unregister the second
383  self.sendsend(second_input_socket, "d", b"other_socket")
384  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
385  self.assertMonitoringassertMonitoring(monitoring_socket, "input.registered_workers", 1)
386 
387  # which should also give us a stop message
388  self.get_signalget_signal(output_socket, "l")
389  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
390  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", True)
391 
392  # lets register and reset it again
393  self.sendsend(second_input_socket, "h")
394  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
395  self.sendsend(monitoring_socket, "n")
396  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
397  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", False)
398  self.assertMonitoringassertMonitoring(monitoring_socket, "input.registered_workers", 2)
399 
400  # and try the other way round: first unregister, then send stop
401  self.sendsend(second_input_socket, "d", b"other_socket")
402  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
403  self.sendsend(input_socket, "l")
404  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
405  self.assertMonitoringassertMonitoring(monitoring_socket, "input.registered_workers", 1)
406  # which should also give us a stop message
407  self.get_signalget_signal(output_socket, "l")
408  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
409  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", True)
410 
411  # reset the state
412  self.sendsend(second_input_socket, "h")
413  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
414  self.sendsend(monitoring_socket, "n")
415  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
416  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_stop_messages", False)
417  self.assertMonitoringassertMonitoring(monitoring_socket, "input.registered_workers", 2)
418 
419  # The same applies to terminate messages:
420  # Nothing at the beginning
421  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_terminate_messages", 0)
422  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_terminate_messages", False)
423 
424  # the first terminate message should not trigger a transmission
425  self.sendsend(input_socket, "x")
426  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
427  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_terminate_messages", 1)
428  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_terminate_messages", False)
429  self.assertNothingMoreassertNothingMore(output_socket)
430 
431  # Another terminate message should not change anything
432  self.sendsend(input_socket, "x")
433  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
434  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_terminate_messages", 1)
435  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_terminate_messages", False)
436  self.assertNothingMoreassertNothingMore(output_socket)
437 
438  # But if we reset...
439  self.sendsend(monitoring_socket, "n")
440  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_terminate_messages", 0)
441  self.assertMonitoringassertMonitoring(monitoring_socket, "input.all_terminate_messages", False)
442 
443  # ... and send again ...
444  self.sendsend(input_socket, "x")
445  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
446  self.sendsend(second_input_socket, "x")
447  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
448 
449  self.get_signalget_signal(output_socket, "x")
450 
451  # ... and the collector should have terminated
452  self.assertIsDownassertIsDown("collector")
453 
454  # TODO: test and implement: timeout in wait for stop or terminate messages
455 
457  """test function"""
458  # To make it actually look for input messages, it needs at least a single worker
459  self.create_output_socketcreate_output_socket()
460 
461  # Register first worker
462  input_socket = self.create_socketcreate_socket(self.input_portinput_port)
463  self.sendsend(input_socket, "h")
464  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
465 
466  # Send with a second, unregistered worker
467  second_input_socket = self.create_socketcreate_socket(self.input_portinput_port, identity="other_socket")
468  self.sendsend(second_input_socket, "l")
469 
470  # The collector should die
471  self.assertIsDownassertIsDown("collector")
472 
474  """test function"""
475  monitoring_socket = self.create_socketcreate_socket(self.monitoring_portmonitoring_port)
476 
477  # Send two ready messages from the first socket
478  output_socket = self.create_output_socketcreate_output_socket()
479  if not self.final_collectorfinal_collector:
480  self.sendsend(output_socket, "r")
481 
482  self.assertMonitoringassertMonitoring(monitoring_socket, "output.ready_queue_size", 2)
483  self.assertMonitoringassertMonitoring(monitoring_socket, "output.registered_workers", 1)
484 
485  if not self.final_collectorfinal_collector:
486  # Send two ready message from the second socket (the last one is needed to keep the collector listening)
487  second_output_socket = self.create_socketcreate_socket(self.output_portoutput_port, identity="other_socket")
488  self.sendsend(second_output_socket, "r")
489  self.sendsend(second_output_socket, "r")
490 
491  self.assertMonitoringassertMonitoring(monitoring_socket, "output.ready_queue_size", 4)
492  self.assertMonitoringassertMonitoring(monitoring_socket, "output.registered_workers", 2)
493 
494  # Register two workers
495  input_socket = self.create_socketcreate_socket(self.input_portinput_port)
496  self.sendsend(input_socket, "h")
497  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
498  self.assertMonitoringassertMonitoring(monitoring_socket, "input.registered_workers", 1)
499 
500  second_input_socket = self.create_socketcreate_socket(self.input_portinput_port, identity="other_socket")
501  self.sendsend(second_input_socket, "h")
502  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
503  self.assertMonitoringassertMonitoring(monitoring_socket, "input.registered_workers", 2)
504 
505  # The first event should go to the first worker
506  self.sendsend(input_socket, "u", b"event data")
507  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
508 
509  self.get_eventget_event(output_socket)
510  if not self.final_collectorfinal_collector:
511  self.assertNothingMoreassertNothingMore(second_output_socket)
512 
513  # The second also
514  self.sendsend(second_input_socket, "u", b"event data")
515  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
516 
517  self.get_eventget_event(output_socket)
518  if not self.final_collectorfinal_collector:
519  self.assertNothingMoreassertNothingMore(second_output_socket)
520 
521  # But the third to the second worker
522  self.sendsend(input_socket, "u", b"event data")
523  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
524 
525  if not self.final_collectorfinal_collector:
526  self.assertNothingMoreassertNothingMore(output_socket)
527  self.get_eventget_event(second_output_socket)
528  else:
529  self.get_eventget_event(output_socket)
530 
531  # A stop message should be sent to all workers
532  self.sendsend(input_socket, "l")
533  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
534 
535  # But only if it is complete...
536  self.assertNothingMoreassertNothingMore(output_socket)
537  if not self.final_collectorfinal_collector:
538  self.assertNothingMoreassertNothingMore(second_output_socket)
539 
540  self.sendsend(second_input_socket, "l")
541  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
542 
543  self.get_signalget_signal(output_socket, "l")
544  if not self.final_collectorfinal_collector:
545  self.get_signalget_signal(second_output_socket, "l")
546 
547  # TODO: Test and implement: should not transmit events after stop
548 
549  # As well as a terminate message
550  self.sendsend(input_socket, "x")
551  self.assertIsMsgTypeassertIsMsgType(input_socket, "c")
552 
553  self.assertNothingMoreassertNothingMore(output_socket)
554  if not self.final_collectorfinal_collector:
555  self.assertNothingMoreassertNothingMore(second_output_socket)
556 
557  self.sendsend(second_input_socket, "x")
558  self.assertIsMsgTypeassertIsMsgType(second_input_socket, "c")
559 
560  self.get_signalget_signal(output_socket, "x")
561  if not self.final_collectorfinal_collector:
562  self.get_signalget_signal(second_output_socket, "x")
563 
564  self.assertIsDownassertIsDown("collector")
def get_signal(self, output_socket, signal_type)
def assertIsMsgType(self, socket, message_type, final=True, router=False)
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
Definition: test_support.py:38
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
started_programs
dict for all started programs
Definition: test_support.py:62
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def assertNotHasOutputFile(self, output_file, timeout=0.5)
def assertIsAndGet(self, socket, message_type, final=True, router=False)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
Definition: test_support.py:93
previous_dir
remember current working directory
Definition: test_support.py:58
test_dir
use a temporary folder for testing
Definition: test_support.py:56