9 from unittest
import main
20 event_data = open(basf2.find_file(
"daq/hbasf2/tests/out.raw"),
"br").read()
23 """Setup port numbers and necessary programs"""
25 self.
input_portinput_port = HLTZMQTestCase.get_free_port()
34 "--input", f
"tcp://localhost:{self.input_port}",
35 "--output", f
"tcp://*:{self.output_port}",
36 "--monitor", f
"tcp://*:{self.monitoring_port}",
37 "--stopWaitingTime",
"1"]}
44 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_state",
"disconnected")
45 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_connects", 0)
46 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_disconnects", 0)
51 self.
sendsend(output_socket,
"r")
57 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_state",
"disconnected")
58 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_connects", 0)
59 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_disconnects", 0)
62 identity, _ = self.
recvrecv(input_socket)
64 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_state",
"connected")
65 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_connects", 1)
66 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_disconnects", 0)
71 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_state",
"disconnected")
72 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_connects", 1)
73 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_disconnects", 1)
77 identity, _ = self.
recvrecv(input_socket)
79 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_state",
"connected")
80 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_connects", 2)
81 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_disconnects", 1)
84 input_socket.send_multipart([identity, self.
event_dataevent_data])
85 input_socket.send_multipart([identity, self.
event_dataevent_data])
86 input_socket.send_multipart([identity, self.
event_dataevent_data])
87 input_socket.send_multipart([identity, self.
event_dataevent_data])
89 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.received_events", 4)
91 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_state",
"connected")
92 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_connects", 2)
93 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_disconnects", 1)
97 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_state",
"disconnected")
98 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_connects", 2)
99 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_disconnects", 2)
107 identity, _ = self.
recvrecv(input_socket)
113 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.ready_queue_size", 0)
114 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.registered_workers", 0)
119 self.
sendsend(output_socket,
"r")
126 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.ready_queue_size", 1)
127 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.registered_workers", 1)
128 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.ready_messages[socket]", 1)
129 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_state",
"connected")
130 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_connects", 1)
131 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_disconnects", 0)
134 input_socket.send_multipart([identity, self.
event_dataevent_data])
140 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.ready_queue_size", 0)
141 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.registered_workers", 1)
142 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.ready_messages[socket]", 0)
143 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_state",
"connected")
144 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_connects", 1)
145 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_disconnects", 0)
148 input_socket.send_multipart([identity, self.
event_dataevent_data])
155 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.ready_queue_size", 0)
156 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.registered_workers", 1)
157 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.ready_messages[socket]", 0)
158 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_state",
"connected")
159 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_connects", 1)
160 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_disconnects", 0)
163 self.
sendsend(second_output_socket,
"r")
168 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.ready_queue_size", 0)
169 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.registered_workers", 2)
170 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_state",
"connected")
171 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_connects", 1)
172 self.
assertMonitoringassertMonitoring(monitoring_socket,
"input.socket_disconnects", 0)
175 input_socket.send_multipart([identity, self.
event_dataevent_data])
177 self.
sendsend(second_output_socket,
"r")
182 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.ready_queue_size", 0)
183 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.registered_workers", 2)
186 self.
sendsend(output_socket,
"r")
187 self.
sendsend(output_socket,
"r")
188 self.
sendsend(second_output_socket,
"r")
189 self.
sendsend(second_output_socket,
"r")
191 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.ready_queue_size", 4)
192 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.registered_workers", 2)
202 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.ready_queue_size", 0)
203 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.registered_workers", 0)
206 self.
sendsend(output_socket,
"r")
207 self.
sendsend(second_output_socket,
"r")
209 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.ready_queue_size", 2)
210 self.
assertMonitoringassertMonitoring(monitoring_socket,
"output.registered_workers", 2)
213 self.
sendsend(monitoring_socket,
"l")
219 self.
sendsend(monitoring_socket,
"l")
226 identity, _ = self.
recvrecv(input_socket)
227 input_socket.send_multipart([identity, self.
event_dataevent_data])
228 input_socket.send_multipart([identity, self.
event_dataevent_data])
234 self.
sendsend(monitoring_socket,
"l")
240 self.
sendsend(monitoring_socket,
"x")
249 if __name__ ==
'__main__':
251 number_of_failures = 0
253 for i
in range(ZMQ_TEST_FOR_LOOPS):
256 except AssertionError:
257 number_of_failures += 1
260 message = f
'Number of failed for loops: {number_of_failures}/{ZMQ_TEST_FOR_LOOPS}'
261 if number_of_failures <= ZMQ_TEST_MAX_FAILURES:
262 basf2.B2INFO(message)
264 basf2.B2FATAL(message)
needed_programs
needed_programs
def testConnectAndDisconnect(self)
monitoring_port
monitoring_port
def assertIsMsgType(self, socket, message_type, final=True, router=False)
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def assertNothingMore(self, socket)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)