9 from unittest 
import main
 
   20     event_data = open(basf2.find_file(
"daq/hbasf2/tests/out.raw"), 
"br").read()
 
   23         """Setup port numbers and necessary programs""" 
   25         self.
input_portinput_port = HLTZMQTestCase.get_free_port()
 
   34                 "--input", f
"tcp://localhost:{self.input_port}",
 
   35                 "--output", f
"tcp://*:{self.output_port}",
 
   36                 "--monitor", f
"tcp://*:{self.monitoring_port}",
 
   37                 "--stopWaitingTime", 
"1"]}
 
   44         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_state", 
"disconnected")
 
   45         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_connects", 0)
 
   46         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_disconnects", 0)
 
   51             self.
sendsend(output_socket, 
"r")
 
   57         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_state", 
"disconnected")
 
   58         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_connects", 0)
 
   59         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_disconnects", 0)
 
   62         identity, _ = self.
recvrecv(input_socket)
 
   64         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_state", 
"connected")
 
   65         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_connects", 1)
 
   66         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_disconnects", 0)
 
   71         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_state", 
"disconnected")
 
   72         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_connects", 1)
 
   73         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_disconnects", 1)
 
   77         identity, _ = self.
recvrecv(input_socket)
 
   79         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_state", 
"connected")
 
   80         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_connects", 2)
 
   81         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_disconnects", 1)
 
   84         input_socket.send_multipart([identity, self.
event_dataevent_data])
 
   85         input_socket.send_multipart([identity, self.
event_dataevent_data])
 
   86         input_socket.send_multipart([identity, self.
event_dataevent_data])
 
   87         input_socket.send_multipart([identity, self.
event_dataevent_data])
 
   89         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.received_events", 4)
 
   91         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_state", 
"connected")
 
   92         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_connects", 2)
 
   93         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_disconnects", 1)
 
   97         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_state", 
"disconnected")
 
   98         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_connects", 2)
 
   99         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_disconnects", 2)
 
  107         identity, _ = self.
recvrecv(input_socket)
 
  113         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.ready_queue_size", 0)
 
  114         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.registered_workers", 0)
 
  119         self.
sendsend(output_socket, 
"r")
 
  126         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.ready_queue_size", 1)
 
  127         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.registered_workers", 1)
 
  128         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.ready_messages[socket]", 1)
 
  129         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_state", 
"connected")
 
  130         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_connects", 1)
 
  131         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_disconnects", 0)
 
  134         input_socket.send_multipart([identity, self.
event_dataevent_data])
 
  140         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.ready_queue_size", 0)
 
  141         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.registered_workers", 1)
 
  142         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.ready_messages[socket]", 0)
 
  143         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_state", 
"connected")
 
  144         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_connects", 1)
 
  145         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_disconnects", 0)
 
  148         input_socket.send_multipart([identity, self.
event_dataevent_data])
 
  155         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.ready_queue_size", 0)
 
  156         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.registered_workers", 1)
 
  157         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.ready_messages[socket]", 0)
 
  158         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_state", 
"connected")
 
  159         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_connects", 1)
 
  160         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_disconnects", 0)
 
  163         self.
sendsend(second_output_socket, 
"r")
 
  168         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.ready_queue_size", 0)
 
  169         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.registered_workers", 2)
 
  170         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_state", 
"connected")
 
  171         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_connects", 1)
 
  172         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"input.socket_disconnects", 0)
 
  175         input_socket.send_multipart([identity, self.
event_dataevent_data])
 
  177         self.
sendsend(second_output_socket, 
"r")
 
  182         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.ready_queue_size", 0)
 
  183         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.registered_workers", 2)
 
  186         self.
sendsend(output_socket, 
"r")
 
  187         self.
sendsend(output_socket, 
"r")
 
  188         self.
sendsend(second_output_socket, 
"r")
 
  189         self.
sendsend(second_output_socket, 
"r")
 
  191         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.ready_queue_size", 4)
 
  192         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.registered_workers", 2)
 
  202         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.ready_queue_size", 0)
 
  203         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.registered_workers", 0)
 
  206         self.
sendsend(output_socket, 
"r")
 
  207         self.
sendsend(second_output_socket, 
"r")
 
  209         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.ready_queue_size", 2)
 
  210         self.
assertMonitoringassertMonitoring(monitoring_socket, 
"output.registered_workers", 2)
 
  213         self.
sendsend(monitoring_socket, 
"l")
 
  219         self.
sendsend(monitoring_socket, 
"l")
 
  226         identity, _ = self.
recvrecv(input_socket)
 
  227         input_socket.send_multipart([identity, self.
event_dataevent_data])
 
  228         input_socket.send_multipart([identity, self.
event_dataevent_data])
 
  234         self.
sendsend(monitoring_socket, 
"l")
 
  240         self.
sendsend(monitoring_socket, 
"x")
 
  249 if __name__ == 
'__main__':
 
  251     number_of_failures = 0
 
  253     for i 
in range(ZMQ_TEST_FOR_LOOPS):
 
  256         except AssertionError:
 
  257             number_of_failures += 1
 
  260     message = f
'Number of failed for loops: {number_of_failures}/{ZMQ_TEST_FOR_LOOPS}' 
  261     if number_of_failures <= ZMQ_TEST_MAX_FAILURES:
 
  262         basf2.B2INFO(message)
 
  264         basf2.B2FATAL(message)
 
needed_programs
needed_programs
def testConnectAndDisconnect(self)
monitoring_port
monitoring_port
def assertIsMsgType(self, socket, message_type, final=True, router=False)
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def assertNothingMore(self, socket)
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)