Belle II Software  release-08-01-10
test_distributor.py
1 
8 
9 from unittest import main
10 import basf2
11 
12 import zmq
13 
14 from zmq_daq.test_support import HLTZMQTestCase, ZMQ_TEST_FOR_LOOPS, ZMQ_TEST_MAX_FAILURES
15 
16 
# NOTE(review): the ``class`` statement is missing from this listing (the body
# started directly with the docstring). The base class follows from the
# imports and the ``HLTZMQTestCase``/``super()`` usage below; the class name is
# reconstructed from the file name -- confirm against the repository.
class DistributorTestCase(HLTZMQTestCase):
    """Test case"""

    # Raw event payload sent to the distributor by the tests. It is read once
    # while the class body executes; the context manager makes sure the file
    # handle is closed again instead of being left to the garbage collector.
    with open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br") as _event_file:
        event_data = _event_file.read()
    # Do not leave the (closed) file object behind as a class attribute.
    del _event_file
21 
22  def setUp(self):
23  """Setup port numbers and necessary programs"""
24 
25  self.input_portinput_port = HLTZMQTestCase.get_free_port()
26 
27  self.output_portoutput_port = HLTZMQTestCase.get_free_port()
28 
29  self.monitoring_portmonitoring_port = HLTZMQTestCase.get_free_port()
30 
31  self.needed_programsneeded_programsneeded_programs = {
32  "distributor": [
33  "b2hlt_distributor",
34  "--input", f"tcp://localhost:{self.input_port}",
35  "--output", f"tcp://*:{self.output_port}",
36  "--monitor", f"tcp://*:{self.monitoring_port}",
37  "--stopWaitingTime", "1"]}
38  super().setUp()
39 
41  """test function"""
42  monitoring_socket = self.create_socketcreate_socket(self.monitoring_portmonitoring_port)
43 
44  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
45  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_connects", 0)
46  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
47 
48  # To make it actually look for input messages, it needs at least a single worker
49  output_socket = self.create_socketcreate_socket(self.output_portoutput_port)
50  for _ in range(5):
51  self.sendsend(output_socket, "r")
52 
53  # Now we can go on
54  input_socket = self.create_socketcreate_socket(self.input_portinput_port, socket_type=zmq.STREAM, bind=True)
55 
56  # Still no connection
57  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
58  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_connects", 0)
59  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
60 
61  # This should initiate a connection
62  identity, _ = self.recvrecv(input_socket)
63 
64  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_state", "connected")
65  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_connects", 1)
66  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
67 
68  # And we can also close it again
69  input_socket.close()
70 
71  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
72  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_connects", 1)
73  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_disconnects", 1)
74 
75  # or open
76  input_socket = self.create_socketcreate_socket(self.input_portinput_port, socket_type=zmq.STREAM, bind=True)
77  identity, _ = self.recvrecv(input_socket)
78 
79  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_state", "connected")
80  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_connects", 2)
81  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_disconnects", 1)
82 
83  # sending an event should not change anything
84  input_socket.send_multipart([identity, self.event_dataevent_data])
85  input_socket.send_multipart([identity, self.event_dataevent_data])
86  input_socket.send_multipart([identity, self.event_dataevent_data])
87  input_socket.send_multipart([identity, self.event_dataevent_data])
88 
89  self.assertMonitoringassertMonitoring(monitoring_socket, "input.received_events", 4)
90 
91  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_state", "connected")
92  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_connects", 2)
93  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_disconnects", 1)
94 
95  input_socket.close()
96 
97  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
98  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_connects", 2)
99  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_disconnects", 2)
100 
101  def testEvents(self):
102  """test function"""
103  monitoring_socket = self.create_socketcreate_socket(self.monitoring_portmonitoring_port)
104 
105  # connect the input
106  input_socket = self.create_socketcreate_socket(self.input_portinput_port, socket_type=zmq.STREAM, bind=True)
107  identity, _ = self.recvrecv(input_socket)
108 
109  # Start two workers
110  output_socket = self.create_socketcreate_socket(self.output_portoutput_port)
111  second_output_socket = self.create_socketcreate_socket(self.output_portoutput_port, identity="other_socket")
112 
113  self.assertMonitoringassertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
114  self.assertMonitoringassertMonitoring(monitoring_socket, "output.registered_workers", 0)
115 
116  # Every ready should give us an event at exactly this worker, when there is an event
117 
118  # 1. only send ready message
119  self.sendsend(output_socket, "r")
120 
121  # No event
122  self.assertNothingMoreassertNothingMore(output_socket)
123  self.assertNothingMoreassertNothingMore(second_output_socket)
124 
125  # But ready worker
126  self.assertMonitoringassertMonitoring(monitoring_socket, "output.ready_queue_size", 1)
127  self.assertMonitoringassertMonitoring(monitoring_socket, "output.registered_workers", 1)
128  self.assertMonitoringassertMonitoring(monitoring_socket, "output.ready_messages[socket]", 1)
129  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_state", "connected")
130  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_connects", 1)
131  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
132 
133  # Now the event
134  input_socket.send_multipart([identity, self.event_dataevent_data])
135 
136  self.assertIsMsgTypeassertIsMsgType(output_socket, "u")
137  self.assertNothingMoreassertNothingMore(second_output_socket)
138 
139  # And no ready worker
140  self.assertMonitoringassertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
141  self.assertMonitoringassertMonitoring(monitoring_socket, "output.registered_workers", 1)
142  self.assertMonitoringassertMonitoring(monitoring_socket, "output.ready_messages[socket]", 0)
143  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_state", "connected")
144  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_connects", 1)
145  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
146 
147  # 2. Try out sending the event first
148  input_socket.send_multipart([identity, self.event_dataevent_data])
149 
150  # Still no event
151  self.assertNothingMoreassertNothingMore(output_socket)
152  self.assertNothingMoreassertNothingMore(second_output_socket)
153 
154  # And no updated monitoring
155  self.assertMonitoringassertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
156  self.assertMonitoringassertMonitoring(monitoring_socket, "output.registered_workers", 1)
157  self.assertMonitoringassertMonitoring(monitoring_socket, "output.ready_messages[socket]", 0)
158  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_state", "connected")
159  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_connects", 1)
160  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
161 
162  # but with a ready message
163  self.sendsend(second_output_socket, "r")
164 
165  self.assertNothingMoreassertNothingMore(output_socket)
166  self.assertIsMsgTypeassertIsMsgType(second_output_socket, "u")
167 
168  self.assertMonitoringassertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
169  self.assertMonitoringassertMonitoring(monitoring_socket, "output.registered_workers", 2)
170  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_state", "connected")
171  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_connects", 1)
172  self.assertMonitoringassertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
173 
174  # and again
175  input_socket.send_multipart([identity, self.event_dataevent_data])
176 
177  self.sendsend(second_output_socket, "r")
178  self.assertNothingMoreassertNothingMore(output_socket)
179  self.assertIsMsgTypeassertIsMsgType(second_output_socket, "u")
180 
181  # As we have answered all ready messages, nothing should be in the queue
182  self.assertMonitoringassertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
183  self.assertMonitoringassertMonitoring(monitoring_socket, "output.registered_workers", 2)
184 
185  # until we start sending more ready messages
186  self.sendsend(output_socket, "r")
187  self.sendsend(output_socket, "r")
188  self.sendsend(second_output_socket, "r")
189  self.sendsend(second_output_socket, "r")
190 
191  self.assertMonitoringassertMonitoring(monitoring_socket, "output.ready_queue_size", 4)
192  self.assertMonitoringassertMonitoring(monitoring_socket, "output.registered_workers", 2)
193 
194  def testEndRun(self):
195  """test function"""
196  monitoring_socket = self.create_socketcreate_socket(self.monitoring_portmonitoring_port)
197 
198  # Start two workers
199  output_socket = self.create_socketcreate_socket(self.output_portoutput_port)
200  second_output_socket = self.create_socketcreate_socket(self.output_portoutput_port, identity="other_socket")
201 
202  self.assertMonitoringassertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
203  self.assertMonitoringassertMonitoring(monitoring_socket, "output.registered_workers", 0)
204 
205  # and register them by sending a ready
206  self.sendsend(output_socket, "r")
207  self.sendsend(second_output_socket, "r")
208 
209  self.assertMonitoringassertMonitoring(monitoring_socket, "output.ready_queue_size", 2)
210  self.assertMonitoringassertMonitoring(monitoring_socket, "output.registered_workers", 2)
211 
212  # a stop run should be sent to all workers
213  self.sendsend(monitoring_socket, "l")
214 
215  self.assertIsMsgTypeassertIsMsgType(output_socket, "l")
216  self.assertIsMsgTypeassertIsMsgType(second_output_socket, "l")
217 
218  # also multiple times
219  self.sendsend(monitoring_socket, "l")
220 
221  self.assertIsMsgTypeassertIsMsgType(output_socket, "l")
222  self.assertIsMsgTypeassertIsMsgType(second_output_socket, "l")
223 
224  # and if there are events in between they should not be transported
225  input_socket = self.create_socketcreate_socket(self.input_portinput_port, socket_type=zmq.STREAM, bind=True)
226  identity, _ = self.recvrecv(input_socket)
227  input_socket.send_multipart([identity, self.event_dataevent_data])
228  input_socket.send_multipart([identity, self.event_dataevent_data])
229 
230  self.assertNothingMoreassertNothingMore(output_socket)
231  self.assertNothingMoreassertNothingMore(second_output_socket)
232 
233  # it shouldn't matter
234  self.sendsend(monitoring_socket, "l")
235 
236  self.assertIsMsgTypeassertIsMsgType(output_socket, "l")
237  self.assertIsMsgTypeassertIsMsgType(second_output_socket, "l")
238 
239  # A terminate message should also be sent
240  self.sendsend(monitoring_socket, "x")
241 
242  self.assertIsMsgTypeassertIsMsgType(output_socket, "x")
243  self.assertIsMsgTypeassertIsMsgType(second_output_socket, "x")
244 
245  # and the distributor should go down
246  self.assertIsDownassertIsDown("distributor")
247 
248 
if __name__ == '__main__':
    # Run the complete test suite ZMQ_TEST_FOR_LOOPS times and count the rounds
    # that did not pass; the script only ends with B2FATAL when more than
    # ZMQ_TEST_MAX_FAILURES rounds failed.
    number_of_failures = 0

    for _ in range(ZMQ_TEST_FOR_LOOPS):
        try:
            # exit=False keeps unittest from calling sys.exit() after a round
            # and hands back the program object holding the test result.
            program = main(exit=False)
        except AssertionError:
            number_of_failures += 1
        else:
            # unittest records failing tests in the result instead of raising,
            # so the result has to be inspected to notice a failed round.
            if not program.result.wasSuccessful():
                number_of_failures += 1

    message = f'Number of failed for loops: {number_of_failures}/{ZMQ_TEST_FOR_LOOPS}'
    if number_of_failures <= ZMQ_TEST_MAX_FAILURES:
        basf2.B2INFO(message)
    else:
        basf2.B2FATAL(message)
# Cross-references emitted by the documentation generator for names used above
# (kept for reference only, not part of the test code):
# def assertIsMsgType(self, socket, message_type, final=True, router=False)
# def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
# needed_programs
# The dict name -> cmd args of the programs to start, needs to be set in each test.
# Definition: test_support.py:38
# def assertMonitoring(self, socket, search_key, search_value, timeout=10)
# def send(socket, message_type, first_data=b"", second_data=b"", identity="")
# def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
# Definition: test_support.py:93
# Definition: main.py:1