Belle II Software development
DistributorTestCase Class Reference
Inheritance diagram for DistributorTestCase:
HLTZMQTestCase

Public Member Functions

def setUp (self)
 
def testConnectAndDisconnect (self)
 
def testEvents (self)
 
def testEndRun (self)
 

Public Attributes

 input_port
 input_port
 
 output_port
 output_port
 
 monitoring_port
 monitoring_port
 
 needed_programs
 needed_programs
 

Static Public Attributes

event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
 Raw bytes read from daq/hbasf2/tests/out.raw; sent as the event payload on the input socket in the tests.
 

Detailed Description

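Test case for the HLT ZMQ distributor (b2hlt_distributor). It starts the distributor with freshly chosen input, output and monitoring ports and checks three things: that input connects and disconnects show up in the monitoring, that events are handed only to workers that sent a ready message, and that stop-run and terminate messages are forwarded to all workers.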

Definition at line 17 of file test_distributor.py.

Member Function Documentation

◆ setUp()

def setUp (   self)
Set up port numbers and necessary programs

Reimplemented from HLTZMQTestCase.

Definition at line 22 of file test_distributor.py.

22 def setUp(self):
23 """Setup port numbers and necessary programs"""
24
25 self.input_port = HLTZMQTestCase.get_free_port()
26
27 self.output_port = HLTZMQTestCase.get_free_port()
28
29 self.monitoring_port = HLTZMQTestCase.get_free_port()
30
31 self.needed_programs = {
32 "distributor": [
33 "b2hlt_distributor",
34 "--input", f"tcp://localhost:{self.input_port}",
35 "--output", f"tcp://*:{self.output_port}",
36 "--monitor", f"tcp://*:{self.monitoring_port}",
37 "--stopWaitingTime", "1"]}
38 super().setUp()
39

◆ testConnectAndDisconnect()

def testConnectAndDisconnect (   self)
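Test that connecting and disconnecting the input socket is reflected in the monitoring values (input.socket_state, input.socket_connects, input.socket_disconnects), and that sending events changes only input.received_events, not the connection counters.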

Definition at line 40 of file test_distributor.py.

40 def testConnectAndDisconnect(self):
41 """test function"""
42 monitoring_socket = self.create_socket(self.monitoring_port)
43
44 self.assertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
45 self.assertMonitoring(monitoring_socket, "input.socket_connects", 0)
46 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
47
48 # To make it actually look for input messages, it needs at least a single worker
49 output_socket = self.create_socket(self.output_port)
50 for _ in range(5):
51 self.send(output_socket, "r")
52
53 # Now we can go on
54 input_socket = self.create_socket(self.input_port, socket_type=zmq.STREAM, bind=True)
55
56 # Still no connection
57 self.assertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
58 self.assertMonitoring(monitoring_socket, "input.socket_connects", 0)
59 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
60
61 # This should initiate a connection
62 identity, _ = self.recv(input_socket)
63
64 self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
65 self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
66 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
67
68 # And we can also close it again
69 input_socket.close()
70
71 self.assertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
72 self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
73 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 1)
74
75 # or open
76 input_socket = self.create_socket(self.input_port, socket_type=zmq.STREAM, bind=True)
77 identity, _ = self.recv(input_socket)
78
79 self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
80 self.assertMonitoring(monitoring_socket, "input.socket_connects", 2)
81 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 1)
82
83 # sending an event should not change anything
84 input_socket.send_multipart([identity, self.event_data])
85 input_socket.send_multipart([identity, self.event_data])
86 input_socket.send_multipart([identity, self.event_data])
87 input_socket.send_multipart([identity, self.event_data])
88
89 self.assertMonitoring(monitoring_socket, "input.received_events", 4)
90
91 self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
92 self.assertMonitoring(monitoring_socket, "input.socket_connects", 2)
93 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 1)
94
95 input_socket.close()
96
97 self.assertMonitoring(monitoring_socket, "input.socket_state", "disconnected")
98 self.assertMonitoring(monitoring_socket, "input.socket_connects", 2)
99 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 2)
100

◆ testEndRun()

def testEndRun (   self)
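Test the end of a run: stop-run ("l") messages sent via the monitoring socket reach every registered worker, also when sent repeatedly; events arriving after a stop run are not passed on to the workers; and a terminate ("x") message is forwarded to the workers, after which the distributor goes down.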

Definition at line 194 of file test_distributor.py.

194 def testEndRun(self):
195 """test function"""
196 monitoring_socket = self.create_socket(self.monitoring_port)
197
198 # Start two workers
199 output_socket = self.create_socket(self.output_port)
200 second_output_socket = self.create_socket(self.output_port, identity="other_socket")
201
202 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
203 self.assertMonitoring(monitoring_socket, "output.registered_workers", 0)
204
205 # and register them by sending a ready
206 self.send(output_socket, "r")
207 self.send(second_output_socket, "r")
208
209 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 2)
210 self.assertMonitoring(monitoring_socket, "output.registered_workers", 2)
211
212 # a stop run should be sent to all workers
213 self.send(monitoring_socket, "l")
214
215 self.assertIsMsgType(output_socket, "l")
216 self.assertIsMsgType(second_output_socket, "l")
217
218 # also multiple times
219 self.send(monitoring_socket, "l")
220
221 self.assertIsMsgType(output_socket, "l")
222 self.assertIsMsgType(second_output_socket, "l")
223
224 # and if there are events in between they should not be transported
225 input_socket = self.create_socket(self.input_port, socket_type=zmq.STREAM, bind=True)
226 identity, _ = self.recv(input_socket)
227 input_socket.send_multipart([identity, self.event_data])
228 input_socket.send_multipart([identity, self.event_data])
229
230 self.assertNothingMore(output_socket)
231 self.assertNothingMore(second_output_socket)
232
233 # it shouldn't matter
234 self.send(monitoring_socket, "l")
235
236 self.assertIsMsgType(output_socket, "l")
237 self.assertIsMsgType(second_output_socket, "l")
238
239 # A terminate message should also be sent
240 self.send(monitoring_socket, "x")
241
242 self.assertIsMsgType(output_socket, "x")
243 self.assertIsMsgType(second_output_socket, "x")
244
245 # and the distributor should go down
246 self.assertIsDown("distributor")
247
248

◆ testEvents()

def testEvents (   self)
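Test event distribution: an event is delivered only to a worker that has sent a ready ("r") message, regardless of whether the ready message or the event arrives first, and the monitoring values output.ready_queue_size and output.registered_workers are updated accordingly.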

Definition at line 101 of file test_distributor.py.

101 def testEvents(self):
102 """test function"""
103 monitoring_socket = self.create_socket(self.monitoring_port)
104
105 # connect the input
106 input_socket = self.create_socket(self.input_port, socket_type=zmq.STREAM, bind=True)
107 identity, _ = self.recv(input_socket)
108
109 # Start two workers
110 output_socket = self.create_socket(self.output_port)
111 second_output_socket = self.create_socket(self.output_port, identity="other_socket")
112
113 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
114 self.assertMonitoring(monitoring_socket, "output.registered_workers", 0)
115
116 # Every ready should give us an event at exactly this worker, when there is an event
117
118 # 1. only send ready message
119 self.send(output_socket, "r")
120
121 # No event
122 self.assertNothingMore(output_socket)
123 self.assertNothingMore(second_output_socket)
124
125 # But ready worker
126 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 1)
127 self.assertMonitoring(monitoring_socket, "output.registered_workers", 1)
128 self.assertMonitoring(monitoring_socket, "output.ready_messages[socket]", 1)
129 self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
130 self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
131 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
132
133 # Now the event
134 input_socket.send_multipart([identity, self.event_data])
135
136 self.assertIsMsgType(output_socket, "u")
137 self.assertNothingMore(second_output_socket)
138
139 # And no ready worker
140 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
141 self.assertMonitoring(monitoring_socket, "output.registered_workers", 1)
142 self.assertMonitoring(monitoring_socket, "output.ready_messages[socket]", 0)
143 self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
144 self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
145 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
146
147 # 2. Try out sending the event first
148 input_socket.send_multipart([identity, self.event_data])
149
150 # Still no event
151 self.assertNothingMore(output_socket)
152 self.assertNothingMore(second_output_socket)
153
154 # And no updated monitoring
155 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
156 self.assertMonitoring(monitoring_socket, "output.registered_workers", 1)
157 self.assertMonitoring(monitoring_socket, "output.ready_messages[socket]", 0)
158 self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
159 self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
160 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
161
162 # but with a ready message
163 self.send(second_output_socket, "r")
164
165 self.assertNothingMore(output_socket)
166 self.assertIsMsgType(second_output_socket, "u")
167
168 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
169 self.assertMonitoring(monitoring_socket, "output.registered_workers", 2)
170 self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
171 self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
172 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
173
174 # and again
175 input_socket.send_multipart([identity, self.event_data])
176
177 self.send(second_output_socket, "r")
178 self.assertNothingMore(output_socket)
179 self.assertIsMsgType(second_output_socket, "u")
180
181 # As we have answered all ready messages, nothing should be in the queue
182 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
183 self.assertMonitoring(monitoring_socket, "output.registered_workers", 2)
184
185 # until we start sending more ready messages
186 self.send(output_socket, "r")
187 self.send(output_socket, "r")
188 self.send(second_output_socket, "r")
189 self.send(second_output_socket, "r")
190
191 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 4)
192 self.assertMonitoring(monitoring_socket, "output.registered_workers", 2)
193

Member Data Documentation

◆ event_data

event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
static

Raw bytes read from daq/hbasf2/tests/out.raw; sent as the event payload on the input socket in the tests.

Definition at line 20 of file test_distributor.py.

◆ input_port

input_port

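Free port obtained from HLTZMQTestCase.get_free_port(). The distributor is started with --input tcp://localhost:<input_port>, and the tests bind a ZMQ STREAM socket on this port to act as the event source.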

Definition at line 25 of file test_distributor.py.

◆ monitoring_port

monitoring_port

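Free port obtained from HLTZMQTestCase.get_free_port(). The distributor is started with --monitor tcp://*:<monitoring_port>, and the tests create their monitoring socket on this port.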

Definition at line 29 of file test_distributor.py.

◆ needed_programs

needed_programs

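Dictionary mapping a program name (here "distributor") to its command line: a list starting with b2hlt_distributor followed by its --input, --output, --monitor and --stopWaitingTime arguments.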

Definition at line 31 of file test_distributor.py.

◆ output_port

output_port

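Free port obtained from HLTZMQTestCase.get_free_port(). The distributor is started with --output tcp://*:<output_port>, and the tests create their worker sockets on this port.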

Definition at line 27 of file test_distributor.py.


The documentation for this class was generated from the following file: