Belle II Software  release-05-02-19
test_support.py
1 # This file contains utilities for the unit tests and is not needed in production.
2 import atexit
3 import json
4 import os
5 import shutil
6 import signal
7 import subprocess
8 import sys
9 import tempfile
10 from contextlib import contextmanager
11 from time import sleep, time
12 from typing import Dict, List
13 from unittest import TestCase
14 import zmq
15 
16 
class HLTZMQTestCase(TestCase):
    """
    Base class for all HLT ZMQ tests helping to start the needed programs,
    create ZMQ sockets and send/receive messages easily.
    Has also some functionality for asserting test-correctness.
    """

    # ZMQ context used for every socket created through this class
    ctx = zmq.Context()

    # The dict name -> cmd args of the programs to start, needs to be set in each test.
    needed_programs = dict()

    @staticmethod
    def get_free_port():
        """
        Get a free port number by reusing ZMQ's function for this.

        A temporary ROUTER socket is bound to a random port and closed again
        before the port number is returned, so the port is not reserved:
        it is only known to have been free at the time of the call.
        """
        socket = HLTZMQTestCase.ctx.socket(zmq.ROUTER)
        port = socket.bind_to_random_port("tcp://*")
        socket.close()
        return port

    def setUp(self):
        """
        Custom setUp function to go into a temporary folder
        and start the needed programs.

        Every entry of needed_programs is started in its own session and is
        asserted to be running right after the start.
        """
        # use a temporary folder for testing
        self.test_dir = tempfile.mkdtemp()

        # remember current working directory
        self.previous_dir = os.getcwd()
        os.chdir(self.test_dir)

        # dict for all started programs
        self.started_programs = dict()

        # Register the cleanup before anything is started: unittest does not
        # call tearDown when setUp fails, so without this a program that fails
        # to come up would leave its already started siblings (and the
        # temporary folder) behind.
        atexit.register(self.tearDown)

        for name, command in self.needed_programs.items():
            # start_new_session puts the program into its own process group,
            # which is what tearDown kills via os.killpg
            self.started_programs[name] = subprocess.Popen(command, start_new_session=True)
            self.assertIsRunning(name)

    def tearDown(self):
        """
        Custom tearDown function to kill the started programs if still present
        and remove the temporary folder again.
        """
        try:
            for name, process in self.started_programs.items():
                if self._is_running(name):
                    # kill the whole process group of the program
                    os.killpg(process.pid, signal.SIGKILL)
                    process.wait()
        finally:
            # Only drop our own handler (instead of clearing every atexit
            # handler of the process) and do it first, so a failing cleanup
            # is not repeated at interpreter exit.
            atexit.unregister(self.tearDown)
            os.chdir(self.previous_dir)
            shutil.rmtree(self.test_dir)

    def _is_running(self, name):
        """
        Check if a given program is still running.

        Returns True as long as the program started under the given name
        has not terminated.
        """
        # poll() is the public, non-blocking way to ask for the process state:
        # it returns None while the process is alive
        return self.started_programs[name].poll() is None

    def assertIsDown(self, name, timeout=5, minimum_delay=0.1):
        """
        Test helper to assert the given program has terminated - at least after timeout in seconds has passed.
        Checks every "minimum_delay" seconds.
        """
        endtime = time() + timeout
        while True:
            if not self._is_running(name):
                return

            remaining = endtime - time()
            self.assertFalse(remaining <= 0, f'Program "{name}" is still running after {timeout} seconds')

            sleep(minimum_delay)

    def assertIsRunning(self, name):
        """
        Assert that a given program is still running.
        """
        self.assertTrue(self._is_running(name), f'Program "{name}" is not running')

    @staticmethod
    def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False):
        """
        Create and return a ZMQ socket with the given type and identity and
        bind or connect it to localhost and the given port.

        If bind is True and port is None, the socket is bound to a random port
        and the tuple (socket, port) is returned instead of the socket alone.
        If bind is False, a port must be given, otherwise a RuntimeError is raised.
        An empty identity leaves the identity of the socket untouched.
        """
        socket = HLTZMQTestCase.ctx.socket(socket_type)
        # receive timeout (milliseconds in ZMQ); "recv" turns the timeout into a test failure
        socket.rcvtimeo = 10000
        # do not keep unsent messages around when the socket is closed
        socket.linger = 0
        if identity:
            socket.setsockopt_string(zmq.IDENTITY, identity)
        if bind:
            if port is None:
                port = socket.bind_to_random_port("tcp://*")
                # the caller cannot know the port otherwise, so hand it back as well
                return socket, port
            else:
                socket.bind(f"tcp://*:{port}")
        else:
            if port is None:
                raise RuntimeError("Cannot connect to unknown port")

            socket.connect(f"tcp://localhost:{port}")

        return socket

    @staticmethod
    def create_router_socket(port):
        """
        Shortcut to create a ROUTER type socket with the typical parameters
        binding to the given port.

        As in create_socket, passing None as port returns the tuple (socket, port).
        """
        return HLTZMQTestCase.create_socket(port, socket_type=zmq.ROUTER, identity="", bind=True)

    @staticmethod
    def send(socket, message_type, first_data=b"", second_data=b"", identity=""):
        """
        Send a message consisting of the message type, the first and the second data
        either to the identity if given or without identity if omitted.

        message_type and identity are strings and get encoded here,
        first_data and second_data are passed on unchanged.
        """
        if identity:
            socket.send_multipart([identity.encode(), message_type.encode(), first_data, second_data])
        else:
            socket.send_multipart([message_type.encode(), first_data, second_data])

    @staticmethod
    def recv(socket):
        """
        Try to receive a message from the socket (or throw an assertion error if none comes after the set timeout
        of the socket).
        """
        try:
            return socket.recv_multipart()
        except zmq.error.Again as error:
            raise AssertionError("No answer from socket") from error

    def assertMonitoring(self, socket, search_key, search_value, timeout=10):
        """
        Ask the given socket for a monitoring JSON and make sure, the value related to "search_key"
        is set to "search_value" - at least after the given timeout.
        The search key should be in the form "<category>.<key>".
        """
        end_time = time() + timeout
        # flattened "<category>.<key>" -> value view of all answers received so far
        monitoring = dict()
        while time() < end_time:
            HLTZMQTestCase.send(socket, "m")
            answer = self.assertIsAndGet(socket, "c")

            dict_monitoring = json.loads(answer[1])
            for parent_key, parent_dict in dict_monitoring.items():
                for key, value in parent_dict.items():
                    monitoring[parent_key + "." + key] = value

            if search_key in monitoring and monitoring[search_key] == search_value:
                break
        else:
            # only reached if the loop ran into the timeout without finding the value
            if monitoring:
                if search_key not in monitoring:
                    raise AssertionError(f"Monitoring did not have a result with key {search_key}")
                else:
                    raise AssertionError(
                        f"Monitoring did not show the result {search_value} for {search_key}, instead {monitoring[search_key]}")
            else:
                raise AssertionError("Monitoring did not answer in time.")

        self.assertNothingMore(socket)

    def assertIsAndGet(self, socket, message_type, final=True, router=False):
        """
        Assert that the next message received on the socket has the given message type
        and return the received (multipart) message.
        If final is set to True, also assert that there is no additional message on the socket.
        Use router only for router sockets.
        """
        answer = HLTZMQTestCase.recv(socket)
        # with router=True the message type is expected in the second part instead of the first
        type_index = 1 if router else 0
        self.assertEqual(answer[type_index], message_type.encode())
        if final:
            self.assertNothingMore(socket)
        return answer

    def assertIsMsgType(self, socket, message_type, final=True, router=False):
        """
        Deprecated copy of "assertIsAndGet".
        """
        return self.assertIsAndGet(socket, message_type, final=final, router=router)

    def assertNothingMore(self, socket):
        """
        Assert that there is no pending message to be received on the socket.
        """
        self.assertFalse(socket.poll(0))

    def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1):
        """
        Assert that - at least after the given timeout - the output file
        is present. If unlink is set to True, remove the file after checking.
        The check is repeated every "minimum_delay" seconds.
        """
        endtime = time() + timeout

        while True:
            if os.path.exists(output_file):
                if unlink:
                    os.unlink(output_file)
                return

            remaining = endtime - time()
            self.assertFalse(remaining <= 0, f'Output file "{output_file}" was not created within {timeout} seconds')

            sleep(minimum_delay)

    def assertNotHasOutputFile(self, output_file, timeout=0.5):
        """
        Assert that after the timeout the given file is not present
        (a.k.a. no process has created it)
        """
        sleep(timeout)
        self.assertFalse(os.path.exists(output_file))
240 
241 
class BaseCollectorTestCase(HLTZMQTestCase):
    """
    As the collectors are mostly equal, use a common base test case class

    NOTE(review): the class header was reconstructed; the base class
    HLTZMQTestCase is derived from the helpers used below - confirm.
    """

    # Set to True in a derived test to start and test "b2hlt_finalcollector" instead of "b2hlt_collector"
    final_collector = False

    def setUp(self):
        """Setup port numbers and necessary programs"""
        # input_port
        self.input_port = HLTZMQTestCase.get_free_port()
        # output_port
        self.output_port = HLTZMQTestCase.get_free_port()
        # monitoring_port
        self.monitoring_port = HLTZMQTestCase.get_free_port()

        command = "b2hlt_finalcollector" if self.final_collector else "b2hlt_collector"
        # the final collector is given a "localhost" output address, the normal one "*"
        output = "localhost" if self.final_collector else "*"
        self.needed_programs = {
            "collector": [
                command,
                "--input", f"tcp://*:{self.input_port}",
                "--output", f"tcp://{output}:{self.output_port}",
                "--monitor", f"tcp://*:{self.monitoring_port}"
            ]
        }
        # programs are setup, call parent setup function now
        super().setUp()

    def create_output_socket(self):
        """create the output socket depending if final collector or not"""
        if self.final_collector:
            # bind a STREAM socket on the output port, send an empty message and
            # wait for an answer (presumably the connection handshake - TODO confirm)
            output_socket = self.create_socket(self.output_port, socket_type=zmq.STREAM, bind=True)
            output_socket.send(b"")
            self.recv(output_socket)
        else:
            # connect to the output port and send a first ready message
            output_socket = self.create_socket(self.output_port)
            self.send(output_socket, "r")
        return output_socket

    def get_signal(self, output_socket, signal_type):
        """get a signal from the socket depending if final collector or not"""
        if self.final_collector:
            # for the final collector only the absence of a message is checked, signal_type is not used
            self.assertNothingMore(output_socket)
        else:
            self.assertIsAndGet(output_socket, signal_type, final=True)

    def get_event(self, output_socket):
        """get an event from the socket depending if final collector or not"""
        if self.final_collector:
            self.recv(output_socket)
            self.assertNothingMore(output_socket)
        else:
            self.assertIsAndGet(output_socket, "u")

    def testHelloAndMessageTransmission(self):
        """test function"""
        monitoring_socket = self.create_socket(self.monitoring_port)

        # Register first worker
        input_socket = self.create_socket(self.input_port)
        self.send(input_socket, "h")

        # However the collector is only polling for ready now...
        self.assertNothingMore(input_socket)
        self.assertMonitoring(monitoring_socket, "input.registered_workers", 0)

        # To make it actually look for input messages, it needs at least a single worker
        output_socket = self.create_output_socket()

        # Now we can go on
        self.assertIsAndGet(input_socket, "c")
        self.assertMonitoring(monitoring_socket, "input.registered_workers", 1)

        # Register second worker
        second_input_socket = self.create_socket(self.input_port, identity="other_socket")
        self.send(second_input_socket, "h")
        self.assertIsAndGet(second_input_socket, "c")
        self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)

        # So far no stop messages should be there
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)

        # the first stop message should not trigger a transmission
        self.send(input_socket, "l")
        self.assertIsAndGet(input_socket, "c")
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
        self.assertNothingMore(output_socket)

        # The second stop message should
        self.send(second_input_socket, "l")
        self.assertIsAndGet(second_input_socket, "c")
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)

        self.get_signal(output_socket, "l")

        # Another stop message should not change anything
        self.send(input_socket, "l")
        self.assertIsAndGet(input_socket, "c")
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
        self.assertNothingMore(output_socket)

        # But if we reset...
        self.send(monitoring_socket, "n")
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)

        # .. it should
        self.send(input_socket, "l")
        self.assertIsAndGet(input_socket, "c")
        self.send(second_input_socket, "l")
        self.assertIsAndGet(second_input_socket, "c")

        self.get_signal(output_socket, "l")

        # Now we reset again
        self.send(monitoring_socket, "n")
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)

        # send just one stop message
        self.send(input_socket, "l")
        self.assertIsAndGet(input_socket, "c")

        # and unregister the second
        self.send(second_input_socket, "d", b"other_socket")
        self.assertIsAndGet(second_input_socket, "c")
        self.assertMonitoring(monitoring_socket, "input.registered_workers", 1)

        # which should also give us a stop message
        self.get_signal(output_socket, "l")
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)

        # lets register and reset it again
        self.send(second_input_socket, "h")
        self.assertIsAndGet(second_input_socket, "c")
        self.send(monitoring_socket, "n")
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
        self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)

        # and try the other way round: first unregister, then send stop
        self.send(second_input_socket, "d", b"other_socket")
        self.assertIsAndGet(second_input_socket, "c")
        self.send(input_socket, "l")
        self.assertIsAndGet(input_socket, "c")
        self.assertMonitoring(monitoring_socket, "input.registered_workers", 1)
        # which should also give us a stop message
        self.get_signal(output_socket, "l")
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)

        # reset the state
        self.send(second_input_socket, "h")
        self.assertIsAndGet(second_input_socket, "c")
        self.send(monitoring_socket, "n")
        self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
        self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
        self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)

        # The same applies to terminate messages:
        # Nothing at the beginning
        self.assertMonitoring(monitoring_socket, "input.received_terminate_messages", 0)
        self.assertMonitoring(monitoring_socket, "input.all_terminate_messages", False)

        # the first terminate message should not trigger a transmission
        self.send(input_socket, "x")
        self.assertIsAndGet(input_socket, "c")
        self.assertMonitoring(monitoring_socket, "input.received_terminate_messages", 1)
        self.assertMonitoring(monitoring_socket, "input.all_terminate_messages", False)
        self.assertNothingMore(output_socket)

        # Another terminate message should not change anything
        self.send(input_socket, "x")
        self.assertIsAndGet(input_socket, "c")
        self.assertMonitoring(monitoring_socket, "input.received_terminate_messages", 1)
        self.assertMonitoring(monitoring_socket, "input.all_terminate_messages", False)
        self.assertNothingMore(output_socket)

        # But if we reset...
        self.send(monitoring_socket, "n")
        self.assertMonitoring(monitoring_socket, "input.received_terminate_messages", 0)
        self.assertMonitoring(monitoring_socket, "input.all_terminate_messages", False)

        # ... and send again ...
        self.send(input_socket, "x")
        self.assertIsAndGet(input_socket, "c")
        self.send(second_input_socket, "x")
        self.assertIsAndGet(second_input_socket, "c")

        self.get_signal(output_socket, "x")

        # ... and the collector should have terminated
        self.assertIsDown("collector")

        # TODO: test and implement: timeout in wait for stop or terminate messages

    def testWrongRegistration(self):
        """test function"""
        # To make it actually look for input messages, it needs at least a single worker
        # (the socket is not used any further, but the reference keeps it alive during the test)
        output_socket = self.create_output_socket()

        # Register first worker
        input_socket = self.create_socket(self.input_port)
        self.send(input_socket, "h")
        self.assertIsAndGet(input_socket, "c")

        # Send with a second, unregistered worker
        second_input_socket = self.create_socket(self.input_port, identity="other_socket")
        self.send(second_input_socket, "l")

        # The collector should die
        self.assertIsDown("collector")

    def testEventPropagation(self):
        """test function"""
        monitoring_socket = self.create_socket(self.monitoring_port)

        # Send two ready messages from the first socket
        # (the first one is already sent in create_output_socket for the non-final collector)
        output_socket = self.create_output_socket()
        if not self.final_collector:
            self.send(output_socket, "r")

        self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 2)
        self.assertMonitoring(monitoring_socket, "output.registered_workers", 1)

        if not self.final_collector:
            # Send two ready message from the second socket (the last one is needed to keep the collector listening)
            second_output_socket = self.create_socket(self.output_port, identity="other_socket")
            self.send(second_output_socket, "r")
            self.send(second_output_socket, "r")

            self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 4)
            self.assertMonitoring(monitoring_socket, "output.registered_workers", 2)

        # Register two workers
        input_socket = self.create_socket(self.input_port)
        self.send(input_socket, "h")
        self.assertIsAndGet(input_socket, "c")
        self.assertMonitoring(monitoring_socket, "input.registered_workers", 1)

        second_input_socket = self.create_socket(self.input_port, identity="other_socket")
        self.send(second_input_socket, "h")
        self.assertIsAndGet(second_input_socket, "c")
        self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)

        # The first event should go to the first worker
        self.send(input_socket, "u", b"event data")
        self.assertIsAndGet(input_socket, "c")

        self.get_event(output_socket)
        if not self.final_collector:
            self.assertNothingMore(second_output_socket)

        # The second also
        self.send(second_input_socket, "u", b"event data")
        self.assertIsAndGet(second_input_socket, "c")

        self.get_event(output_socket)
        if not self.final_collector:
            self.assertNothingMore(second_output_socket)

        # But the third to the second worker
        self.send(input_socket, "u", b"event data")
        self.assertIsAndGet(input_socket, "c")

        if not self.final_collector:
            self.assertNothingMore(output_socket)
            self.get_event(second_output_socket)
        else:
            self.get_event(output_socket)

        # A stop message should be sent to all workers
        self.send(input_socket, "l")
        self.assertIsAndGet(input_socket, "c")

        # But only if it is complete...
        self.assertNothingMore(output_socket)
        if not self.final_collector:
            self.assertNothingMore(second_output_socket)

        self.send(second_input_socket, "l")
        self.assertIsAndGet(second_input_socket, "c")

        self.get_signal(output_socket, "l")
        if not self.final_collector:
            self.get_signal(second_output_socket, "l")

        # TODO: Test and implement: should not transmit events after stop

        # As well as a terminate message
        self.send(input_socket, "x")
        self.assertIsAndGet(input_socket, "c")

        self.assertNothingMore(output_socket)
        if not self.final_collector:
            self.assertNothingMore(second_output_socket)

        self.send(second_input_socket, "x")
        self.assertIsAndGet(second_input_socket, "c")

        self.get_signal(output_socket, "x")
        if not self.final_collector:
            self.get_signal(second_output_socket, "x")

        self.assertIsDown("collector")
zmq_daq.test_support.HLTZMQTestCase.test_dir
test_dir
use a temporary folder for testing
Definition: test_support.py:44
zmq_daq.test_support.BaseCollectorTestCase
Definition: test_support.py:242
zmq_daq.test_support.HLTZMQTestCase.get_free_port
def get_free_port()
Definition: test_support.py:29
zmq_daq.test_support.BaseCollectorTestCase.input_port
input_port
input_port
Definition: test_support.py:252
zmq_daq.test_support.HLTZMQTestCase.assertNothingMore
def assertNothingMore(self, socket)
Definition: test_support.py:209
zmq_daq.test_support.HLTZMQTestCase
Definition: test_support.py:17
zmq_daq.test_support.HLTZMQTestCase.needed_programs
needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
Definition: test_support.py:26
zmq_daq.test_support.HLTZMQTestCase.assertIsDown
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
Definition: test_support.py:81
zmq_daq.test_support.HLTZMQTestCase.recv
def recv(socket)
Definition: test_support.py:147
zmq_daq.test_support.HLTZMQTestCase.assertIsMsgType
def assertIsMsgType(self, socket, message_type, final=True, router=False)
Definition: test_support.py:203
zmq_daq.test_support.HLTZMQTestCase.create_socket
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
Definition: test_support.py:103
zmq_daq.test_support.BaseCollectorTestCase.output_port
output_port
output_port
Definition: test_support.py:254
zmq_daq.test_support.BaseCollectorTestCase.get_event
def get_event(self, output_socket)
Definition: test_support.py:289
zmq_daq.test_support.HLTZMQTestCase.tearDown
def tearDown(self)
Definition: test_support.py:58
zmq_daq.test_support.HLTZMQTestCase.assertNotHasOutputFile
def assertNotHasOutputFile(self, output_file, timeout=0.5)
Definition: test_support.py:233
zmq_daq.test_support.HLTZMQTestCase.assertHasOutputFile
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
Definition: test_support.py:215
zmq_daq.test_support.HLTZMQTestCase.assertIsRunning
def assertIsRunning(self, name)
Definition: test_support.py:96
zmq_daq.test_support.BaseCollectorTestCase.testHelloAndMessageTransmission
def testHelloAndMessageTransmission(self)
Definition: test_support.py:297
zmq_daq.test_support.BaseCollectorTestCase.final_collector
bool final_collector
final_collector
Definition: test_support.py:247
zmq_daq.test_support.HLTZMQTestCase.send
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
Definition: test_support.py:136
zmq_daq.test_support.BaseCollectorTestCase.get_signal
def get_signal(self, output_socket, signal_type)
Definition: test_support.py:282
zmq_daq.test_support.HLTZMQTestCase.started_programs
started_programs
dict for all started programs
Definition: test_support.py:50
zmq_daq.test_support.BaseCollectorTestCase.testEventPropagation
def testEventPropagation(self)
Definition: test_support.py:461
zmq_daq.test_support.HLTZMQTestCase.setUp
def setUp(self)
Definition: test_support.py:38
zmq_daq.test_support.BaseCollectorTestCase.setUp
def setUp(self)
Definition: test_support.py:249
zmq_daq.test_support.BaseCollectorTestCase.testWrongRegistration
def testWrongRegistration(self)
Definition: test_support.py:444
zmq_daq.test_support.BaseCollectorTestCase.create_output_socket
def create_output_socket(self)
Definition: test_support.py:271
zmq_daq.test_support.BaseCollectorTestCase.monitoring_port
monitoring_port
monitoring_port
Definition: test_support.py:256
zmq_daq.test_support.HLTZMQTestCase.create_router_socket
def create_router_socket(port)
Definition: test_support.py:128
zmq_daq.test_support.HLTZMQTestCase.previous_dir
previous_dir
remember current working directory
Definition: test_support.py:46
zmq_daq.test_support.HLTZMQTestCase._is_running
def _is_running(self, name)
Definition: test_support.py:72
zmq_daq.test_support.HLTZMQTestCase.assertMonitoring
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
Definition: test_support.py:157
zmq_daq.test_support.HLTZMQTestCase.assertIsAndGet
def assertIsAndGet(self, socket, message_type, final=True, router=False)
Definition: test_support.py:188