17 from time
import sleep, time
18 from unittest
import TestCase
23 ZMQ_TEST_FOR_LOOPS = 5
26 ZMQ_TEST_MAX_FAILURES = 1
31 Base class for all HLT ZMQ tests helping to start the needed programs,
32 create ZMQ sockets and send/receive messages easily.
33 Has also some functionality for asserting test-correctness
38 needed_programs = dict()
43 Get a free port number by reusing ZMQ's function for this.
45 socket = HLTZMQTestCase.ctx.socket(zmq.ROUTER)
46 port = socket.bind_to_random_port(
"tcp://*")
52 Custom setUp function to go into a temporary folder
53 and start the needed programs.
64 self.
started_programsstarted_programs[name] = subprocess.Popen(command, start_new_session=
True)
68 atexit.register(self.
tearDowntearDown)
72 Custom tearDown function to kill the started programs if still present
73 and remove the temporary folder again.
77 os.killpg(process.pid, signal.SIGKILL)
86 Check if a given program is still running.
89 pid, sts = process._try_wait(os.WNOHANG)
90 assert pid == process.pid
or pid == 0
95 Test helper to assert the given program has terminated - at least after timeout in seconds has passed.
96 Checks every "minimal_delay seconds.
98 endtime = time() + timeout
103 remaining = endtime - time()
104 self.assertFalse(remaining <= 0)
110 Assert that a given program is still running.
115 def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False):
117 Create and return a ZMQ socket with the given type and identity and
118 bind or connect it to localhost and the given port.
120 socket = HLTZMQTestCase.ctx.socket(socket_type)
121 socket.rcvtimeo = 10000
124 socket.setsockopt_string(zmq.IDENTITY, identity)
127 port = socket.bind_to_random_port(
"tcp://*")
130 socket.bind(f
"tcp://*:{port}")
133 raise RuntimeError(
"Cannot connect to unknown port")
135 socket.connect(f
"tcp://localhost:{port}")
142 Shortcut to create a ROUTER type socket with the typical parameters
143 binding to the given port.
145 return HLTZMQTestCase.create_socket(port, socket_type=zmq.ROUTER, identity=
"", bind=
True)
148 def send(socket, message_type, first_data=b"", second_data=b"", identity=""):
150 Send a message consisting of the message type, the first and the second data
151 either to the identity if given or without identity if omitted.
154 socket.send_multipart([identity.encode(), message_type.encode(), first_data, second_data])
156 socket.send_multipart([message_type.encode(), first_data, second_data])
161 Try to receive a message from the socket (or throw an assertion error if none comes after the set timeout
165 return socket.recv_multipart()
166 except zmq.error.Again:
167 raise AssertionError(
"No answer from socket")
171 Ask the given socket for a monitoring JSON and make sure, the value related to "search_key"
172 is set to "search_value" - at least after the given timeout.
173 The search key can should be in the form "<category>.<key>".
175 end_time = time() + timeout
177 while time() < end_time:
178 HLTZMQTestCase.send(socket,
"m")
181 dict_monitoring = json.loads(answer[1])
182 for parent_key, parent_dict
in dict_monitoring.items():
183 for key, value
in parent_dict.items():
184 monitoring[parent_key +
"." + key] = value
186 if search_key
in monitoring
and monitoring[search_key] == search_value:
190 if search_key
not in monitoring:
191 raise AssertionError(f
"Monitoring did not have a result with key {search_key}")
193 raise AssertionError(
194 f
"Monitoring did not show the result {search_value} for {search_key}, instead {monitoring[search_key]}")
196 raise AssertionError(
"Monitoring did not answer in time.")
202 Assert that the next message received on the socket has the given message type.
203 If final is set to True, also assert that there is no additional message on the socket.
204 Use router only for router sockets.
206 answer = HLTZMQTestCase.recv(socket)
210 self.assertEqual(answer[type_index], message_type.encode())
217 Deprecated copy of "assertIsAndGet".
219 return self.
assertIsAndGetassertIsAndGet(socket, message_type, final=final, router=router)
223 Assert that there is no pending message to be received on the socket.
225 self.assertFalse(socket.poll(0))
229 Assert that - at least after the given timeout - the output file
230 is present. If unlink is set to True, remove the file after checking.
232 endtime = time() + timeout
235 if os.path.exists(output_file):
237 os.unlink(output_file)
240 remaining = endtime - time()
241 self.assertFalse(remaining <= 0)
247 Assert that after the timeout the given file is not present
248 (a.k.a. no process has created it)
251 self.assertFalse(os.path.exists(output_file))
256 As the collectors are mostly equal, use a common base test case class
259 final_collector =
False
262 """Setup port numbers and necessary programs"""
270 command =
"b2hlt_finalcollector" if self.
final_collectorfinal_collector
else "b2hlt_collector"
275 "--input", f
"tcp://*:{self.input_port}",
276 "--output", f
"tcp://{output}:{self.output_port}",
277 "--monitor", f
"tcp://*:{self.monitoring_port}"
284 """create the output socket depending if final collector or not"""
287 output_socket.send(b
"")
288 self.
recvrecv(output_socket)
291 self.
sendsend(output_socket,
"r")
295 """get a signal from the socket depending if final collector or not"""
299 self.
assertIsMsgTypeassertIsMsgType(output_socket, signal_type, final=
True)
302 """get an event from the socket depending if final collector or not"""
304 self.
recvrecv(output_socket)
315 self.
sendsend(input_socket,
"h")
319 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.registered_workers", 0)
326 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.registered_workers", 1)
330 self.
sendsend(second_input_socket,
"h")
332 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.registered_workers", 2)
335 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 0)
336 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
False)
339 self.
sendsend(input_socket,
"l")
341 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 1)
342 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
False)
346 self.
sendsend(second_input_socket,
"l")
348 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 2)
349 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
True)
354 self.
sendsend(input_socket,
"l")
356 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 2)
357 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
True)
361 self.
sendsend(monitoring_socket,
"n")
362 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 0)
363 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
False)
366 self.
sendsend(input_socket,
"l")
368 self.
sendsend(second_input_socket,
"l")
374 self.
sendsend(monitoring_socket,
"n")
375 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 0)
376 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
False)
379 self.
sendsend(input_socket,
"l")
383 self.
sendsend(second_input_socket,
"d", b
"other_socket")
385 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.registered_workers", 1)
389 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 1)
390 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
True)
393 self.
sendsend(second_input_socket,
"h")
395 self.
sendsend(monitoring_socket,
"n")
396 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 0)
397 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
False)
398 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.registered_workers", 2)
401 self.
sendsend(second_input_socket,
"d", b
"other_socket")
403 self.
sendsend(input_socket,
"l")
405 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.registered_workers", 1)
408 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 1)
409 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
True)
412 self.
sendsend(second_input_socket,
"h")
414 self.
sendsend(monitoring_socket,
"n")
415 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_stop_messages", 0)
416 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_stop_messages",
False)
417 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.registered_workers", 2)
421 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_terminate_messages", 0)
422 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_terminate_messages",
False)
425 self.
sendsend(input_socket,
"x")
427 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_terminate_messages", 1)
428 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_terminate_messages",
False)
432 self.
sendsend(input_socket,
"x")
434 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_terminate_messages", 1)
435 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_terminate_messages",
False)
439 self.
sendsend(monitoring_socket,
"n")
440 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_terminate_messages", 0)
441 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.all_terminate_messages",
False)
444 self.
sendsend(input_socket,
"x")
446 self.
sendsend(second_input_socket,
"x")
463 self.
sendsend(input_socket,
"h")
468 self.
sendsend(second_input_socket,
"l")
480 self.
sendsend(output_socket,
"r")
482 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.ready_queue_size", 2)
483 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.registered_workers", 1)
488 self.
sendsend(second_output_socket,
"r")
489 self.
sendsend(second_output_socket,
"r")
491 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.ready_queue_size", 4)
492 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.registered_workers", 2)
496 self.
sendsend(input_socket,
"h")
498 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.registered_workers", 1)
501 self.
sendsend(second_input_socket,
"h")
503 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.registered_workers", 2)
506 self.
sendsend(input_socket,
"u", b
"event data")
514 self.
sendsend(second_input_socket,
"u", b
"event data")
522 self.
sendsend(input_socket,
"u", b
"event data")
527 self.
get_eventget_event(second_output_socket)
532 self.
sendsend(input_socket,
"l")
540 self.
sendsend(second_input_socket,
"l")
545 self.
get_signalget_signal(second_output_socket,
"l")
550 self.
sendsend(input_socket,
"x")
557 self.
sendsend(second_input_socket,
"x")
562 self.
get_signalget_signal(second_output_socket,
"x")
def testWrongRegistration(self)
def get_event(self, output_socket)
def get_signal(self, output_socket, signal_type)
def create_output_socket(self)
def testHelloAndMessageTransmission(self)
bool final_collector
final_collector
def testEventPropagation(self)
monitoring_port
monitoring_port
def assertIsMsgType(self, socket, message_type, final=True, router=False)
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
started_programs
dict for all started programs
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def create_router_socket(port)
def assertNothingMore(self, socket)
def _is_running(self, name)
def assertIsRunning(self, name)
def assertNotHasOutputFile(self, output_file, timeout=0.5)
def assertIsAndGet(self, socket, message_type, final=True, router=False)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
previous_dir
remember current working directory
test_dir
use a temporary folder for testing