3 from unittest
import main
14 event_data = open(basf2.find_file(
"daq/hbasf2/tests/out.raw"),
"br").read()
17 """Setup port numbers and necessary programs"""
28 "--input", f
"tcp://localhost:{self.input_port}",
29 "--output", f
"tcp://*:{self.output_port}",
30 "--monitor", f
"tcp://*:{self.monitoring_port}",
31 "--stopWaitingTime",
"1"]}
38 self.
assertMonitoring(monitoring_socket,
"input.socket_state",
"disconnected")
45 self.
send(output_socket,
"r")
51 self.
assertMonitoring(monitoring_socket,
"input.socket_state",
"disconnected")
56 identity, _ = self.
recv(input_socket)
65 self.
assertMonitoring(monitoring_socket,
"input.socket_state",
"disconnected")
71 identity, _ = self.
recv(input_socket)
78 input_socket.send_multipart([identity, self.
event_data])
79 input_socket.send_multipart([identity, self.
event_data])
80 input_socket.send_multipart([identity, self.
event_data])
81 input_socket.send_multipart([identity, self.
event_data])
91 self.
assertMonitoring(monitoring_socket,
"input.socket_state",
"disconnected")
101 identity, _ = self.
recv(input_socket)
113 self.
send(output_socket,
"r")
122 self.
assertMonitoring(monitoring_socket,
"output.ready_messages[socket]", 1)
128 input_socket.send_multipart([identity, self.
event_data])
136 self.
assertMonitoring(monitoring_socket,
"output.ready_messages[socket]", 0)
142 input_socket.send_multipart([identity, self.
event_data])
151 self.
assertMonitoring(monitoring_socket,
"output.ready_messages[socket]", 0)
157 self.
send(second_output_socket,
"r")
169 input_socket.send_multipart([identity, self.
event_data])
171 self.
send(second_output_socket,
"r")
180 self.
send(output_socket,
"r")
181 self.
send(output_socket,
"r")
182 self.
send(second_output_socket,
"r")
183 self.
send(second_output_socket,
"r")
200 self.
send(output_socket,
"r")
201 self.
send(second_output_socket,
"r")
207 self.
send(monitoring_socket,
"l")
213 self.
send(monitoring_socket,
"l")
220 identity, _ = self.
recv(input_socket)
221 input_socket.send_multipart([identity, self.
event_data])
222 input_socket.send_multipart([identity, self.
event_data])
228 self.
send(monitoring_socket,
"l")
234 self.
send(monitoring_socket,
"x")
243 if __name__ ==
'__main__':