17from time
import sleep, time
18from unittest
import TestCase
26ZMQ_TEST_MAX_FAILURES = 1
31 Base class for all HLT ZMQ tests helping to start the needed programs,
32 create ZMQ sockets and send/receive messages easily.
33 Also has some functionality for asserting test correctness.
38 needed_programs = dict()
43 Get a free port number by reusing ZMQ's function for this.
45 socket = HLTZMQTestCase.ctx.socket(zmq.ROUTER)
46 port = socket.bind_to_random_port("tcp://*")
52 Custom setUp function to go into a temporary folder
53 and start the needed programs.
64 self.
started_programs[name] = subprocess.Popen(command, start_new_session=
True)
72 Custom tearDown function to kill the started programs if still present
73 and remove the temporary folder again.
77 os.killpg(process.pid, signal.SIGKILL)
86 Check if a given program is still running.
89 pid, sts = process._try_wait(os.WNOHANG)
90 assert pid == process.pid
or pid == 0
95 Test helper to assert the given program has terminated - at the latest after timeout seconds have passed.
96 Checks every minimum_delay seconds.
98 endtime = time() + timeout
103 remaining = endtime - time()
104 self.assertFalse(remaining <= 0)
110 Assert that a given program is still running.
115 def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False):
117 Create and return a ZMQ socket with the given type and identity and
118 bind or connect it to localhost and the given port.
120 socket = HLTZMQTestCase.ctx.socket(socket_type)
121 socket.rcvtimeo = 10000
124 socket.setsockopt_string(zmq.IDENTITY, identity)
127 port = socket.bind_to_random_port(
"tcp://*")
130 socket.bind(f
"tcp://*:{port}")
133 raise RuntimeError(
"Cannot connect to unknown port")
135 socket.connect(f
"tcp://localhost:{port}")
142 Shortcut to create a ROUTER type socket with the typical parameters
143 binding to the given port.
145 return HLTZMQTestCase.create_socket(port, socket_type=zmq.ROUTER, identity=
"", bind=
True)
148 def send(socket, message_type, first_data=b"", second_data=b"", identity=""):
150 Send a message consisting of the message type, the first and the second data
151 either to the identity if given or without identity if omitted.
154 socket.send_multipart([identity.encode(), message_type.encode(), first_data, second_data])
156 socket.send_multipart([message_type.encode(), first_data, second_data])
161 Try to receive a message from the socket (
or throw an assertion error
if none comes after the set timeout
165 return socket.recv_multipart()
166 except zmq.error.Again:
167 raise AssertionError(
"No answer from socket")
171 Ask the given socket for a monitoring JSON and make sure the value related to "search_key"
172 is set to "search_value" - at the latest after the given timeout.
173 The search key should be in the form "<category>.<key>".
175 end_time = time() + timeout
177 while time() < end_time:
178 HLTZMQTestCase.send(socket,
"m")
181 dict_monitoring = json.loads(answer[1])
182 for parent_key, parent_dict
in dict_monitoring.items():
183 for key, value
in parent_dict.items():
184 monitoring[parent_key +
"." + key] = value
186 if search_key
in monitoring
and monitoring[search_key] == search_value:
190 if search_key
not in monitoring:
191 raise AssertionError(f
"Monitoring did not have a result with key {search_key}")
193 raise AssertionError(
194 f
"Monitoring did not show the result {search_value} for {search_key}, instead {monitoring[search_key]}")
196 raise AssertionError(
"Monitoring did not answer in time.")
202 Assert that the next message received on the socket has the given message type.
203 If final is set to True, also assert that there is no additional message on the socket.
204 Use router only for router sockets.
206 answer = HLTZMQTestCase.recv(socket)
210 self.assertEqual(answer[type_index], message_type.encode())
217 Deprecated copy of "assertIsAndGet".
219 return self.
assertIsAndGet(socket, message_type, final=final, router=router)
223 Assert that there is no pending message to be received on the socket.
225 self.assertFalse(socket.poll(0))
229 Assert that - at the latest after the given timeout - the output file
230 is present. If unlink is set to True, remove the file after checking.
232 endtime = time() + timeout
235 if os.path.exists(output_file):
237 os.unlink(output_file)
240 remaining = endtime - time()
241 self.assertFalse(remaining <= 0)
247 Assert that after the timeout the given file is not present
248 (a.k.a. no process has created it)
251 self.assertFalse(os.path.exists(output_file))
256 As the collectors are mostly equal, use a common base test case class
259 final_collector = False
262 """Setup port numbers and necessary programs"""
270 command =
"b2hlt_finalcollector" if self.
final_collector else "b2hlt_collector"
275 "--input", f
"tcp://*:{self.input_port}",
276 "--output", f
"tcp://{output}:{self.output_port}",
277 "--monitor", f
"tcp://*:{self.monitoring_port}"
284 """create the output socket depending if final collector or not"""
287 output_socket.send(b
"")
288 self.
recv(output_socket)
291 self.
send(output_socket,
"r")
295 """get a signal from the socket depending if final collector or not"""
302 """get an event from the socket depending if final collector or not"""
304 self.
recv(output_socket)
315 self.
send(input_socket,
"h")
330 self.
send(second_input_socket,
"h")
339 self.
send(input_socket,
"l")
346 self.
send(second_input_socket,
"l")
354 self.
send(input_socket,
"l")
361 self.
send(monitoring_socket,
"n")
366 self.
send(input_socket,
"l")
368 self.
send(second_input_socket,
"l")
374 self.
send(monitoring_socket,
"n")
379 self.
send(input_socket,
"l")
383 self.
send(second_input_socket,
"d", b
"other_socket")
393 self.
send(second_input_socket,
"h")
395 self.
send(monitoring_socket,
"n")
401 self.
send(second_input_socket,
"d", b
"other_socket")
403 self.
send(input_socket,
"l")
412 self.
send(second_input_socket,
"h")
414 self.
send(monitoring_socket,
"n")
421 self.
assertMonitoring(monitoring_socket,
"input.received_terminate_messages", 0)
422 self.
assertMonitoring(monitoring_socket,
"input.all_terminate_messages",
False)
425 self.
send(input_socket,
"x")
427 self.
assertMonitoring(monitoring_socket,
"input.received_terminate_messages", 1)
428 self.
assertMonitoring(monitoring_socket,
"input.all_terminate_messages",
False)
432 self.
send(input_socket,
"x")
434 self.
assertMonitoring(monitoring_socket,
"input.received_terminate_messages", 1)
435 self.
assertMonitoring(monitoring_socket,
"input.all_terminate_messages",
False)
439 self.
send(monitoring_socket,
"n")
440 self.
assertMonitoring(monitoring_socket,
"input.received_terminate_messages", 0)
441 self.
assertMonitoring(monitoring_socket,
"input.all_terminate_messages",
False)
444 self.
send(input_socket,
"x")
446 self.
send(second_input_socket,
"x")
463 self.
send(input_socket,
"h")
468 self.
send(second_input_socket,
"l")
480 self.
send(output_socket,
"r")
488 self.
send(second_output_socket,
"r")
489 self.
send(second_output_socket,
"r")
496 self.
send(input_socket,
"h")
501 self.
send(second_input_socket,
"h")
506 self.
send(input_socket,
"u", b
"event data")
514 self.
send(second_input_socket,
"u", b
"event data")
522 self.
send(input_socket,
"u", b
"event data")
532 self.
send(input_socket,
"l")
540 self.
send(second_input_socket,
"l")
550 self.
send(input_socket,
"x")
557 self.
send(second_input_socket,
"x")
def testWrongRegistration(self)
def get_event(self, output_socket)
def get_signal(self, output_socket, signal_type)
def create_output_socket(self)
def testHelloAndMessageTransmission(self)
bool final_collector
final_collector
def testEventPropagation(self)
monitoring_port
monitoring_port
def assertIsMsgType(self, socket, message_type, final=True, router=False)
dict needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
started_programs
dict for all started programs
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def create_router_socket(port)
def assertNothingMore(self, socket)
def _is_running(self, name)
def assertIsRunning(self, name)
def assertNotHasOutputFile(self, output_file, timeout=0.5)
def assertIsAndGet(self, socket, message_type, final=True, router=False)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
previous_dir
remember current working directory
test_dir
use a temporary folder for testing