9from unittest
import main
20 event_data = open(basf2.find_file(
"daq/hbasf2/tests/out.raw"),
"br").read()
23 """Setup port numbers and necessary programs"""
34 "--input", f
"tcp://localhost:{self.input_port}",
35 "--output", f
"tcp://*:{self.output_port}",
36 "--monitor", f
"tcp://*:{self.monitoring_port}",
37 "--stopWaitingTime",
"1"]}
44 self.
assertMonitoring(monitoring_socket,
"input.socket_state",
"disconnected")
51 self.
send(output_socket,
"r")
57 self.
assertMonitoring(monitoring_socket,
"input.socket_state",
"disconnected")
62 identity, _ = self.
recv(input_socket)
71 self.
assertMonitoring(monitoring_socket,
"input.socket_state",
"disconnected")
77 identity, _ = self.
recv(input_socket)
84 input_socket.send_multipart([identity, self.
event_data])
85 input_socket.send_multipart([identity, self.
event_data])
86 input_socket.send_multipart([identity, self.
event_data])
87 input_socket.send_multipart([identity, self.
event_data])
97 self.
assertMonitoring(monitoring_socket,
"input.socket_state",
"disconnected")
107 identity, _ = self.
recv(input_socket)
119 self.
send(output_socket,
"r")
128 self.
assertMonitoring(monitoring_socket,
"output.ready_messages[socket]", 1)
134 input_socket.send_multipart([identity, self.
event_data])
142 self.
assertMonitoring(monitoring_socket,
"output.ready_messages[socket]", 0)
148 input_socket.send_multipart([identity, self.
event_data])
157 self.
assertMonitoring(monitoring_socket,
"output.ready_messages[socket]", 0)
163 self.
send(second_output_socket,
"r")
175 input_socket.send_multipart([identity, self.
event_data])
177 self.
send(second_output_socket,
"r")
186 self.
send(output_socket,
"r")
187 self.
send(output_socket,
"r")
188 self.
send(second_output_socket,
"r")
189 self.
send(second_output_socket,
"r")
206 self.
send(output_socket,
"r")
207 self.
send(second_output_socket,
"r")
213 self.
send(monitoring_socket,
"l")
219 self.
send(monitoring_socket,
"l")
226 identity, _ = self.
recv(input_socket)
227 input_socket.send_multipart([identity, self.
event_data])
228 input_socket.send_multipart([identity, self.
event_data])
234 self.
send(monitoring_socket,
"l")
240 self.
send(monitoring_socket,
"x")
249if __name__ ==
'__main__':
251 number_of_failures = 0
253 for i
in range(ZMQ_TEST_FOR_LOOPS):
256 except AssertionError:
257 number_of_failures += 1
260 message = f
'Number of failed for loops: {number_of_failures}/{ZMQ_TEST_FOR_LOOPS}'
261 if number_of_failures <= ZMQ_TEST_MAX_FAILURES:
262 basf2.B2INFO(message)
264 basf2.B2FATAL(message)
needed_programs
needed_programs
def testConnectAndDisconnect(self)
open event_data
event_data
monitoring_port
monitoring_port
def assertIsMsgType(self, socket, message_type, final=True, router=False)
dict needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def assertNothingMore(self, socket)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)