10 from contextlib
import contextmanager
11 from time
import sleep, time
12 from typing
import Dict, List
13 from unittest
import TestCase
19 Base class for all HLT ZMQ tests helping to start the needed programs,
20 create ZMQ sockets and send/receive messages easily.
21 Has also some functionality for asserting test-correctness
26 needed_programs = dict()
31 Get a free port number by reusing ZMQ's function for this.
33 socket = HLTZMQTestCase.ctx.socket(zmq.ROUTER)
34 port = socket.bind_to_random_port(
"tcp://*")
40 Custom setUp function to go into a temporary folder
41 and start the needed programs.
52 self.
started_programs[name] = subprocess.Popen(command, start_new_session=
True)
60 Custom tearDown function to kill the started programs if still present
61 and remove the temporary folder again.
65 os.killpg(process.pid, signal.SIGKILL)
74 Check if a given program is still running.
77 pid, sts = process._try_wait(os.WNOHANG)
78 assert pid == process.pid
or pid == 0
83 Test helper to assert the given program has terminated - at least after timeout in seconds has passed.
84 Checks every "minimal_delay seconds.
86 endtime = time() + timeout
91 remaining = endtime - time()
92 self.assertFalse(remaining <= 0)
98 Assert that a given program is still running.
103 def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False):
105 Create and return a ZMQ socket with the given type and identity and
106 bind or connect it to localhost and the given port.
108 socket = HLTZMQTestCase.ctx.socket(socket_type)
109 socket.rcvtimeo = 10000
112 socket.setsockopt_string(zmq.IDENTITY, identity)
115 port = socket.bind_to_random_port(
"tcp://*")
118 socket.bind(f
"tcp://*:{port}")
121 raise RuntimeError(
"Cannot connect to unknown port")
123 socket.connect(f
"tcp://localhost:{port}")
130 Shortcut to create a ROUTER type socket with the typical parameters
131 binding to the given port.
133 return HLTZMQTestCase.create_socket(port, socket_type=zmq.ROUTER, identity=
"", bind=
True)
136 def send(socket, message_type, first_data=b"", second_data=b"", identity=""):
138 Send a message consisting of the message type, the first and the second data
139 either to the identity if given or without identity if omitted.
142 socket.send_multipart([identity.encode(), message_type.encode(), first_data, second_data])
144 socket.send_multipart([message_type.encode(), first_data, second_data])
149 Try to receive a message from the socket (or throw an assertion error if none comes after the set timeout
153 return socket.recv_multipart()
154 except zmq.error.Again:
155 raise AssertionError(
"No answer from socket")
159 Ask the given socket for a monitoring JSON and make sure, the value related to "search_key"
160 is set to "search_value" - at least after the given timeout.
161 The search key can should be in the form "<category>.<key>".
163 end_time = time() + timeout
165 while time() < end_time:
166 HLTZMQTestCase.send(socket,
"m")
169 dict_monitoring = json.loads(answer[1])
170 for parent_key, parent_dict
in dict_monitoring.items():
171 for key, value
in parent_dict.items():
172 monitoring[parent_key +
"." + key] = value
174 if search_key
in monitoring
and monitoring[search_key] == search_value:
178 if search_key
not in monitoring:
179 raise AssertionError(f
"Monitoring did not have a result with key {search_key}")
181 raise AssertionError(
182 f
"Monitoring did not show the result {search_value} for {search_key}, instead {monitoring[search_key]}")
184 raise AssertionError(f
"Monitoring did not answer in time.")
190 Assert that the next message received on the socket has the given message type.
191 If final is set to True, also assert that there is no additional message on the socket.
192 Use router only for router sockets.
194 answer = HLTZMQTestCase.recv(socket)
198 self.assertEqual(answer[type_index], message_type.encode())
205 Deprecated copy of "assertIsAndGet".
207 return self.
assertIsAndGet(socket, message_type, final=final, router=router)
211 Assert that there is no pending message to be received on the socket.
213 self.assertFalse(socket.poll(0))
217 Assert that - at least after the given timeout - the output file
218 is present. If unlink is set to True, remove the file after checking.
220 endtime = time() + timeout
223 if os.path.exists(output_file):
225 os.unlink(output_file)
228 remaining = endtime - time()
229 self.assertFalse(remaining <= 0)
235 Assert that after the timeout the given file is not present
236 (a.k.a. no process has created it)
239 self.assertFalse(os.path.exists(output_file))
244 As the collectors are mostly equal, use a common base test case class
247 final_collector =
False
250 """Setup port numbers and necessary programs"""
258 command =
"b2hlt_finalcollector" if self.
final_collector else "b2hlt_collector"
263 "--input", f
"tcp://*:{self.input_port}",
264 "--output", f
"tcp://{output}:{self.output_port}",
265 "--monitor", f
"tcp://*:{self.monitoring_port}"
272 """create the output socket depending if final collector or not"""
275 output_socket.send(b
"")
276 self.
recv(output_socket)
279 self.
send(output_socket,
"r")
283 """get a signal from the socket depending if final collector or not"""
290 """get an event from the socket depending if final collector or not"""
292 self.
recv(output_socket)
303 self.
send(input_socket,
"h")
318 self.
send(second_input_socket,
"h")
327 self.
send(input_socket,
"l")
334 self.
send(second_input_socket,
"l")
342 self.
send(input_socket,
"l")
349 self.
send(monitoring_socket,
"n")
354 self.
send(input_socket,
"l")
356 self.
send(second_input_socket,
"l")
362 self.
send(monitoring_socket,
"n")
367 self.
send(input_socket,
"l")
371 self.
send(second_input_socket,
"d", b
"other_socket")
381 self.
send(second_input_socket,
"h")
383 self.
send(monitoring_socket,
"n")
389 self.
send(second_input_socket,
"d", b
"other_socket")
391 self.
send(input_socket,
"l")
400 self.
send(second_input_socket,
"h")
402 self.
send(monitoring_socket,
"n")
409 self.
assertMonitoring(monitoring_socket,
"input.received_terminate_messages", 0)
410 self.
assertMonitoring(monitoring_socket,
"input.all_terminate_messages",
False)
413 self.
send(input_socket,
"x")
415 self.
assertMonitoring(monitoring_socket,
"input.received_terminate_messages", 1)
416 self.
assertMonitoring(monitoring_socket,
"input.all_terminate_messages",
False)
420 self.
send(input_socket,
"x")
422 self.
assertMonitoring(monitoring_socket,
"input.received_terminate_messages", 1)
423 self.
assertMonitoring(monitoring_socket,
"input.all_terminate_messages",
False)
427 self.
send(monitoring_socket,
"n")
428 self.
assertMonitoring(monitoring_socket,
"input.received_terminate_messages", 0)
429 self.
assertMonitoring(monitoring_socket,
"input.all_terminate_messages",
False)
432 self.
send(input_socket,
"x")
434 self.
send(second_input_socket,
"x")
451 self.
send(input_socket,
"h")
456 self.
send(second_input_socket,
"l")
468 self.
send(output_socket,
"r")
476 self.
send(second_output_socket,
"r")
477 self.
send(second_output_socket,
"r")
484 self.
send(input_socket,
"h")
489 self.
send(second_input_socket,
"h")
494 self.
send(input_socket,
"u", b
"event data")
502 self.
send(second_input_socket,
"u", b
"event data")
510 self.
send(input_socket,
"u", b
"event data")
520 self.
send(input_socket,
"l")
528 self.
send(second_input_socket,
"l")
538 self.
send(input_socket,
"x")
545 self.
send(second_input_socket,
"x")