Belle II Software development
test_distributor.py
1
8
9from unittest import main
10import basf2
11
12import zmq
13
14from zmq_daq.test_support import HLTZMQTestCase, ZMQ_TEST_FOR_LOOPS, ZMQ_TEST_MAX_FAILURES
15
16
18 """Test case"""
19
# Raw event payload that the tests push through the input socket.
# Read once at class-definition time; the context manager closes the file
# handle right away instead of leaving it to the garbage collector.
with open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br") as _event_file:
    event_data = _event_file.read()
21
def setUp(self):
    """Pick free ports, declare the distributor program and run the base-class setUp."""
    # Ports are requested in the order input, output, monitoring.
    self.input_port = HLTZMQTestCase.get_free_port()
    self.output_port = HLTZMQTestCase.get_free_port()
    self.monitoring_port = HLTZMQTestCase.get_free_port()

    # Maps a program name to the command line used to start it.
    # NOTE(review): presumably consumed by the base-class setUp below - confirm in test_support.
    self.needed_programs = {
        "distributor": [
            "b2hlt_distributor",
            "--input", f"tcp://localhost:{self.input_port}",
            "--output", f"tcp://*:{self.output_port}",
            "--monitor", f"tcp://*:{self.monitoring_port}",
            "--stopWaitingTime", "1",
        ],
    }
    super().setUp()
39
41 """test function"""
42 monitoring_socket = self.create_socket(self.monitoring_port)
43
44 self.assertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
45 self.assertMonitoring(monitoring_socket, "input.socket_connects", 0)
46 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
47
48 # To make it actually look for input messages, it needs at least a single worker
49 output_socket = self.create_socket(self.output_port)
50 for _ in range(5):
51 self.send(output_socket, "r")
52
53 # Now we can go on
54 input_socket = self.create_socket(self.input_port, socket_type=zmq.STREAM, bind=True)
55
56 # Still no connection
57 self.assertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
58 self.assertMonitoring(monitoring_socket, "input.socket_connects", 0)
59 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
60
61 # This should initiate a connection
62 identity, _ = self.recv(input_socket)
63
64 self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
65 self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
66 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
67
68 # And we can also close it again
69 input_socket.close()
70
71 self.assertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
72 self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
73 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 1)
74
75 # or open
76 input_socket = self.create_socket(self.input_port, socket_type=zmq.STREAM, bind=True)
77 identity, _ = self.recv(input_socket)
78
79 self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
80 self.assertMonitoring(monitoring_socket, "input.socket_connects", 2)
81 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 1)
82
83 # sending an event should not change anything
84 input_socket.send_multipart([identity, self.event_data])
85 input_socket.send_multipart([identity, self.event_data])
86 input_socket.send_multipart([identity, self.event_data])
87 input_socket.send_multipart([identity, self.event_data])
88
89 self.assertMonitoring(monitoring_socket, "input.received_events", 4)
90
91 self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
92 self.assertMonitoring(monitoring_socket, "input.socket_connects", 2)
93 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 1)
94
95 input_socket.close()
96
97 self.assertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
98 self.assertMonitoring(monitoring_socket, "input.socket_connects", 2)
99 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 2)
100
def testEvents(self):
    """Check which worker receives an event depending on the order of ready messages and events."""
    monitor = self.create_socket(self.monitoring_port)

    def expect_monitoring(*pairs):
        # Assert every (key, value) pair on the monitoring socket, in the given order.
        for key, value in pairs:
            self.assertMonitoring(monitor, key, value)

    # The input connection state that is expected to stay unchanged throughout this test.
    input_connected_once = (
        ("input.socket_state", "connected"),
        ("input.socket_connects", 1),
        ("input.socket_disconnects", 0),
    )

    # Connect the input side first.
    data_in = self.create_socket(self.input_port, socket_type=zmq.STREAM, bind=True)
    identity, _ = self.recv(data_in)

    def push_event():
        # Hand one raw event to the distributor via the input connection.
        data_in.send_multipart([identity, self.event_data])

    # Two workers, not yet registered because neither has sent a ready message.
    worker_a = self.create_socket(self.output_port)
    worker_b = self.create_socket(self.output_port, identity="other_socket")

    expect_monitoring(
        ("output.ready_queue_size", 0),
        ("output.registered_workers", 0),
    )

    # Case 1: ready message first, event afterwards.
    self.send(worker_a, "r")

    # Nothing to deliver yet ...
    self.assertNothingMore(worker_a)
    self.assertNothingMore(worker_b)

    # ... but the worker is queued as ready.
    expect_monitoring(
        ("output.ready_queue_size", 1),
        ("output.registered_workers", 1),
        ("output.ready_messages[socket]", 1),
        *input_connected_once,
    )

    push_event()

    # The event goes to the worker that announced readiness, and only to it.
    self.assertIsMsgType(worker_a, "u")
    self.assertNothingMore(worker_b)

    # The ready slot has been consumed.
    expect_monitoring(
        ("output.ready_queue_size", 0),
        ("output.registered_workers", 1),
        ("output.ready_messages[socket]", 0),
        *input_connected_once,
    )

    # Case 2: event first, ready message afterwards.
    push_event()

    # Without a ready worker nothing is delivered ...
    self.assertNothingMore(worker_a)
    self.assertNothingMore(worker_b)

    # ... and the monitoring values stay as they were.
    expect_monitoring(
        ("output.ready_queue_size", 0),
        ("output.registered_workers", 1),
        ("output.ready_messages[socket]", 0),
        *input_connected_once,
    )

    # The ready message of the second worker releases the event to that worker.
    self.send(worker_b, "r")

    self.assertNothingMore(worker_a)
    self.assertIsMsgType(worker_b, "u")

    expect_monitoring(
        ("output.ready_queue_size", 0),
        ("output.registered_workers", 2),
        *input_connected_once,
    )

    # Same again with the second worker.
    push_event()

    self.send(worker_b, "r")
    self.assertNothingMore(worker_a)
    self.assertIsMsgType(worker_b, "u")

    # Every ready message has been answered, so the queue is empty.
    expect_monitoring(
        ("output.ready_queue_size", 0),
        ("output.registered_workers", 2),
    )

    # Additional ready messages without events pile up in the queue.
    for worker in (worker_a, worker_a, worker_b, worker_b):
        self.send(worker, "r")

    expect_monitoring(
        ("output.ready_queue_size", 4),
        ("output.registered_workers", 2),
    )
193
def testEndRun(self):
    """Check that stop ("l") and terminate ("x") messages are forwarded to all registered workers."""
    monitor = self.create_socket(self.monitoring_port)

    # Two workers, addressed in this fixed order everywhere below.
    workers = (
        self.create_socket(self.output_port),
        self.create_socket(self.output_port, identity="other_socket"),
    )

    def expect_worker_counts(ready, registered):
        # Check the ready-queue size first, then the number of registered workers.
        self.assertMonitoring(monitor, "output.ready_queue_size", ready)
        self.assertMonitoring(monitor, "output.registered_workers", registered)

    def signal_and_expect(message_type):
        # Send the message via the monitoring socket and expect it at each worker in turn.
        self.send(monitor, message_type)
        for worker in workers:
            self.assertIsMsgType(worker, message_type)

    expect_worker_counts(0, 0)

    # A ready message registers each worker.
    for worker in workers:
        self.send(worker, "r")

    expect_worker_counts(2, 2)

    # A stop signal reaches every worker, also when it is sent a second time.
    for _ in range(2):
        signal_and_expect("l")

    # Events arriving in between must not be handed to the workers.
    data_in = self.create_socket(self.input_port, socket_type=zmq.STREAM, bind=True)
    identity, _ = self.recv(data_in)
    for _ in range(2):
        data_in.send_multipart([identity, self.event_data])

    for worker in workers:
        self.assertNothingMore(worker)

    # Another stop signal is still forwarded as before.
    signal_and_expect("l")

    # A terminate message is forwarded as well ...
    signal_and_expect("x")

    # ... and makes the distributor process go down.
    self.assertIsDown("distributor")
247
248
if __name__ == '__main__':
    # Run the complete test module several times and tolerate a limited number
    # of failing rounds before reporting a fatal error.
    number_of_failures = 0

    for _ in range(ZMQ_TEST_FOR_LOOPS):
        try:
            # With exit=False unittest neither exits nor raises on failing tests;
            # the outcome is only available on the returned program's result.
            program = main(exit=False)
        except AssertionError:
            # Kept for backward compatibility in case an assertion does escape the runner.
            number_of_failures += 1
        else:
            if not program.result.wasSuccessful():
                number_of_failures += 1

    message = f'Number of failed for loops: {number_of_failures}/{ZMQ_TEST_FOR_LOOPS}'
    if number_of_failures <= ZMQ_TEST_MAX_FAILURES:
        basf2.B2INFO(message)
    else:
        basf2.B2FATAL(message)
def assertIsMsgType(self, socket, message_type, final=True, router=False)
dict needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
Definition: test_support.py:38
def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False)
def assertMonitoring(self, socket, search_key, search_value, timeout=10)
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
Definition: test_support.py:93
Definition: main.py:1