Belle II Software development
HLTTestCase Class Reference
Inheritance diagram for HLTTestCase:
HLTZMQTestCase

Public Member Functions

 setUp (self)
 
 testFullRun (self)
 
 tearDown (self)
 
 assertIsDown (self, name, timeout=5, minimum_delay=0.1)
 
 assertIsRunning (self, name)
 
 assertMonitoring (self, socket, search_key, search_value, timeout=10)
 
 assertIsAndGet (self, socket, message_type, final=True, router=False)
 
 assertIsMsgType (self, socket, message_type, final=True, router=False)
 
 assertNothingMore (self, socket)
 
 assertHasOutputFile (self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
 
 assertNotHasOutputFile (self, output_file, timeout=0.5)
 

Static Public Member Functions

 get_free_port ()
 
 create_socket (port, socket_type=zmq.DEALER, identity="socket", bind=False)
 
 create_router_socket (port)
 
 send (socket, message_type, first_data=b"", second_data=b"", identity="")
 
 recv (socket)
 

Public Attributes

 distributor_input_port = HLTZMQTestCase.get_free_port()
 distributor_input_port
 
 distributor_output_port = HLTZMQTestCase.get_free_port()
 distributor_output_port
 
 distributor_monitoring_port = HLTZMQTestCase.get_free_port()
 distributor_monitoring_port
 
 collector_input_port = HLTZMQTestCase.get_free_port()
 collector_input_port
 
 collector_output_port = HLTZMQTestCase.get_free_port()
 collector_output_port
 
 collector_monitoring_port = HLTZMQTestCase.get_free_port()
 collector_monitoring_port
 
 final_collector_input_port = HLTZMQTestCase.get_free_port()
 final_collector_input_port
 
 final_collector_output_port = HLTZMQTestCase.get_free_port()
 final_collector_output_port
 
 final_collector_monitoring_port = HLTZMQTestCase.get_free_port()
 final_collector_monitoring_port
 
 test_dir = tempfile.mkdtemp()
 use a temporary folder for testing
 
 previous_dir = os.getcwd()
 remember current working directory
 
 started_programs = dict()
 dict for all started programs
 
 tearDown = subprocess.Popen(command, start_new_session=True)
 

Static Public Attributes

 event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
 event_data
 
 ctx = zmq.Context()
 The ZMQ context.
 
 needed_programs = dict()
 The dict name -> cmd args of the programs to start, needs to be set in each test.
 

Protected Member Functions

 _is_running (self, name)
 

Detailed Description

Test case

Definition at line 18 of file test_hlt.py.

Member Function Documentation

◆ _is_running()

_is_running ( self,
name )
protectedinherited
Check if a given program is still running.

Definition at line 84 of file test_support.py.

84 def _is_running(self, name):
85 """
86 Check if a given program is still running.
87 """
88 process = self.started_programs[name]
89 pid, sts = process._try_wait(os.WNOHANG)
90 assert pid == process.pid or pid == 0
91 return pid == 0
92

◆ assertHasOutputFile()

assertHasOutputFile ( self,
output_file,
unlink = True,
timeout = 0.5,
minimum_delay = 0.1 )
inherited
Assert that - at least after the given timeout - the output file is present. If unlink is set to True, remove the file after checking.

Definition at line 227 of file test_support.py.

227 def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1):
228 """
229 Assert that - at least after the given timeout - the output file
230 is present. If unlink is set to True, remove the file after checking.
231 """
232 endtime = time() + timeout
233
234 while True:
235 if os.path.exists(output_file):
236 if unlink:
237 os.unlink(output_file)
238 return
239
240 remaining = endtime - time()
241 self.assertFalse(remaining <= 0)
242
243 sleep(minimum_delay)
244

◆ assertIsAndGet()

assertIsAndGet ( self,
socket,
message_type,
final = True,
router = False )
inherited
Assert that the next message received on the socket has the given message type. If final is set to True, also assert that there is no additional message on the socket. Use router only for router sockets.

Definition at line 200 of file test_support.py.

200 def assertIsAndGet(self, socket, message_type, final=True, router=False):
201 """
202 Assert that the next message received on the socket has the given message type.
203 If final is set to True, also assert that there is no additional message on the socket.
204 Use router only for router sockets.
205 """
206 answer = HLTZMQTestCase.recv(socket)
207 type_index = 0
208 if router:
209 type_index = 1
210 self.assertEqual(answer[type_index], message_type.encode())
211 if final:
212 self.assertNothingMore(socket)
213 return answer
214

◆ assertIsDown()

assertIsDown ( self,
name,
timeout = 5,
minimum_delay = 0.1 )
inherited
Test helper to assert the given program has terminated - at least after timeout in seconds has passed. Checks every "minimal_delay seconds.

Definition at line 93 of file test_support.py.

93 def assertIsDown(self, name, timeout=5, minimum_delay=0.1):
94 """
95 Test helper to assert the given program has terminated - at least after timeout in seconds has passed.
96 Checks every "minimal_delay seconds.
97 """
98 endtime = time() + timeout
99 while True:
100 if not self._is_running(name):
101 return
102
103 remaining = endtime - time()
104 self.assertFalse(remaining <= 0)
105
106 sleep(minimum_delay)
107

◆ assertIsMsgType()

assertIsMsgType ( self,
socket,
message_type,
final = True,
router = False )
inherited
Deprecated copy of "assertIsAndGet".

Definition at line 215 of file test_support.py.

215 def assertIsMsgType(self, socket, message_type, final=True, router=False):
216 """
217 Deprecated copy of "assertIsAndGet".
218 """
219 return self.assertIsAndGet(socket, message_type, final=final, router=router)
220

◆ assertIsRunning()

assertIsRunning ( self,
name )
inherited
Assert that a given program is still running.

Definition at line 108 of file test_support.py.

108 def assertIsRunning(self, name):
109 """
110 Assert that a given program is still running.
111 """
112 self.assertTrue(self._is_running(name))
113

◆ assertMonitoring()

assertMonitoring ( self,
socket,
search_key,
search_value,
timeout = 10 )
inherited
Ask the given socket for a monitoring JSON and make sure, the value related to "search_key" is set to "search_value" - at least after the given timeout. The search key can should be in the form "<category>.<key>".

Definition at line 169 of file test_support.py.

169 def assertMonitoring(self, socket, search_key, search_value, timeout=10):
170 """
171 Ask the given socket for a monitoring JSON and make sure, the value related to "search_key"
172 is set to "search_value" - at least after the given timeout.
173 The search key can should be in the form "<category>.<key>".
174 """
175 end_time = time() + timeout
176 monitoring = dict()
177 while time() < end_time:
178 HLTZMQTestCase.send(socket, "m")
179 answer = self.assertIsAndGet(socket, "c")
180
181 dict_monitoring = json.loads(answer[1])
182 for parent_key, parent_dict in dict_monitoring.items():
183 for key, value in parent_dict.items():
184 monitoring[parent_key + "." + key] = value
185
186 if search_key in monitoring and monitoring[search_key] == search_value:
187 break
188 else:
189 if monitoring:
190 if search_key not in monitoring:
191 raise AssertionError(f"Monitoring did not have a result with key {search_key}")
192 else:
193 raise AssertionError(
194 f"Monitoring did not show the result {search_value} for {search_key}, instead {monitoring[search_key]}")
195 else:
196 raise AssertionError("Monitoring did not answer in time.")
197
198 self.assertNothingMore(socket)
199

◆ assertNotHasOutputFile()

assertNotHasOutputFile ( self,
output_file,
timeout = 0.5 )
inherited
Assert that after the timeout the given file is not present (a.k.a. no process has created it)

Definition at line 245 of file test_support.py.

245 def assertNotHasOutputFile(self, output_file, timeout=0.5):
246 """
247 Assert that after the timeout the given file is not present
248 (a.k.a. no process has created it)
249 """
250 sleep(timeout)
251 self.assertFalse(os.path.exists(output_file))
252
253

◆ assertNothingMore()

assertNothingMore ( self,
socket )
inherited
Assert that there is no pending message to be received on the socket.

Definition at line 221 of file test_support.py.

221 def assertNothingMore(self, socket):
222 """
223 Assert that there is no pending message to be received on the socket.
224 """
225 self.assertFalse(socket.poll(0))
226

◆ create_router_socket()

create_router_socket ( port)
staticinherited
Shortcut to create a ROUTER type socket with the typical parameters binding to the given port.

Definition at line 140 of file test_support.py.

140 def create_router_socket(port):
141 """
142 Shortcut to create a ROUTER type socket with the typical parameters
143 binding to the given port.
144 """
145 return HLTZMQTestCase.create_socket(port, socket_type=zmq.ROUTER, identity="", bind=True)
146

◆ create_socket()

create_socket ( port,
socket_type = zmq.DEALER,
identity = "socket",
bind = False )
staticinherited
Create and return a ZMQ socket with the given type and identity and bind or connect it to localhost and the given port.

Definition at line 115 of file test_support.py.

115 def create_socket(port, socket_type=zmq.DEALER, identity="socket", bind=False):
116 """
117 Create and return a ZMQ socket with the given type and identity and
118 bind or connect it to localhost and the given port.
119 """
120 socket = HLTZMQTestCase.ctx.socket(socket_type)
121 socket.rcvtimeo = 10000
122 socket.linger = 0
123 if identity:
124 socket.setsockopt_string(zmq.IDENTITY, identity)
125 if bind:
126 if port is None:
127 port = socket.bind_to_random_port("tcp://*")
128 return socket, port
129 else:
130 socket.bind(f"tcp://*:{port}")
131 else:
132 if port is None:
133 raise RuntimeError("Cannot connect to unknown port")
134
135 socket.connect(f"tcp://localhost:{port}")
136
137 return socket
138

◆ get_free_port()

get_free_port ( )
staticinherited
Get a free port number by reusing ZMQ's function for this.

Definition at line 41 of file test_support.py.

41 def get_free_port():
42 """
43 Get a free port number by reusing ZMQ's function for this.
44 """
45 socket = HLTZMQTestCase.ctx.socket(zmq.ROUTER)
46 port = socket.bind_to_random_port("tcp://*")
47 socket.close()
48 return port
49

◆ recv()

recv ( socket)
staticinherited
Try to receive a message from the socket (or throw an assertion error if none comes after the set timeout of the socket).

Definition at line 159 of file test_support.py.

159 def recv(socket):
160 """
161 Try to receive a message from the socket (or throw an assertion error if none comes after the set timeout
162 of the socket).
163 """
164 try:
165 return socket.recv_multipart()
166 except zmq.error.Again:
167 raise AssertionError("No answer from socket")
168

◆ send()

send ( socket,
message_type,
first_data = b"",
second_data = b"",
identity = "" )
staticinherited
Send a message consisting of the message type, the first and the second data either to the identity if given or without identity if omitted.

Definition at line 148 of file test_support.py.

148 def send(socket, message_type, first_data=b"", second_data=b"", identity=""):
149 """
150 Send a message consisting of the message type, the first and the second data
151 either to the identity if given or without identity if omitted.
152 """
153 if identity:
154 socket.send_multipart([identity.encode(), message_type.encode(), first_data, second_data])
155 else:
156 socket.send_multipart([message_type.encode(), first_data, second_data])
157

◆ setUp()

setUp ( self)
Setup port numbers and necessary programs

Reimplemented from HLTZMQTestCase.

Definition at line 23 of file test_hlt.py.

23 def setUp(self):
24 """Setup port numbers and necessary programs"""
25
26 self.distributor_input_port = HLTZMQTestCase.get_free_port()
27
28 self.distributor_output_port = HLTZMQTestCase.get_free_port()
29
30 self.distributor_monitoring_port = HLTZMQTestCase.get_free_port()
31
32
33 self.collector_input_port = HLTZMQTestCase.get_free_port()
34
35 self.collector_output_port = HLTZMQTestCase.get_free_port()
36
37 self.collector_monitoring_port = HLTZMQTestCase.get_free_port()
38
39
40 self.final_collector_input_port = HLTZMQTestCase.get_free_port()
41
42 self.final_collector_output_port = HLTZMQTestCase.get_free_port()
43
44 self.final_collector_monitoring_port = HLTZMQTestCase.get_free_port()
45
46
47 self.needed_programs = {
48 "distributor": [
49 "b2hlt_distributor",
50 "--input", f"tcp://localhost:{self.distributor_input_port}",
51 "--output", f"tcp://*:{self.distributor_output_port}",
52 "--monitor", f"tcp://*:{self.distributor_monitoring_port}"
53 ],
54 "collector": [
55 "b2hlt_collector",
56 "--input", f"tcp://*:{self.collector_input_port}",
57 "--output", f"tcp://*:{self.collector_output_port}",
58 "--monitor", f"tcp://*:{self.collector_monitoring_port}"
59 ],
60 "final_collector": [
61 "b2hlt_finalcollector",
62 "--input", f"tcp://*:{self.final_collector_input_port}",
63 "--output", f"tcp://localhost:{self.final_collector_output_port}",
64 "--monitor", f"tcp://*:{self.final_collector_monitoring_port}"
65 ],
66 "worker": [
67 "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
68 "--input", f"tcp://localhost:{self.distributor_output_port}",
69 "--output", f"tcp://localhost:{self.collector_input_port}"
70 ],
71 "output_worker": [
72 "python3", basf2.find_file("daq/hbasf2/tests/passthrough.no_run_py"),
73 "--prefix", "output_",
74 "--input", f"tcp://localhost:{self.collector_output_port}",
75 "--output", f"tcp://localhost:{self.final_collector_input_port}"
76 ],
77 }
78 super().setUp()
79

◆ tearDown()

tearDown ( self)
inherited
Custom tearDown function to kill the started programs if still present and remove the temporary folder again.

Definition at line 70 of file test_support.py.

70 def tearDown(self):
71 """
72 Custom tearDown function to kill the started programs if still present
73 and remove the temporary folder again.
74 """
75 for name, process in self.started_programs.items():
76 if self._is_running(name):
77 os.killpg(process.pid, signal.SIGKILL)
78 process.wait()
79 os.chdir(self.previous_dir)
80 shutil.rmtree(self.test_dir)
81
82 atexit._clear()
83

◆ testFullRun()

testFullRun ( self)
test function

Definition at line 80 of file test_hlt.py.

80 def testFullRun(self):
81 """test function"""
82 distributor_monitoring_socket = self.create_socket(self.distributor_monitoring_port)
83 collector_monitoring_socket = self.create_socket(self.collector_monitoring_port)
84 final_collector_monitoring_socket = self.create_socket(self.final_collector_monitoring_port)
85
86 input_socket = self.create_socket(self.distributor_input_port, socket_type=zmq.STREAM, bind=True)
87 input_identity, _ = self.recv(input_socket)
88
89 output_socket = self.create_socket(self.final_collector_output_port, socket_type=zmq.STREAM, bind=True)
90 output_identity, _ = self.recv(output_socket)
91
92 # At the beginning, everything should be at normal state
93 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
94 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
95 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
96 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
97 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
98
99 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
100 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
101 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
102
103 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
104
105 self.assertHasOutputFile("initialize_called", timeout=1)
106 self.assertHasOutputFile("output_initialize_called", timeout=1)
107
108 # Now lets try sending some events
109 for _ in range(20):
110 input_socket.send_multipart([input_identity, self.event_data])
111
112 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
113 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
114 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
115 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
116 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
117 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 20)
118
119 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
120 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 20)
121 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
122 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
123 self.assertMonitoring(collector_monitoring_socket, "output.sent_events", 20)
124
125 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
126 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
127 self.assertMonitoring(final_collector_monitoring_socket, "output.sent_events", 20)
128
129 self.assertHasOutputFile("beginrun_called", timeout=1)
130 self.assertHasOutputFile("output_beginrun_called", timeout=1)
131
132 buffer = b""
133 while output_socket.poll(0):
134 _, msg = self.recv(output_socket)
135 buffer += msg
136 self.assertNothingMore(output_socket)
137
138 # Data Size != raw data, as data is in different format, size taken from test itself
139 self.assertEqual(len(buffer), 122638 * 20)
140
141 # Now we stop this run
142 self.send(distributor_monitoring_socket, "l")
143
144 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
145 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
146 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
147 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
148 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
149
150 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
151 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 20)
152 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
153 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
154 self.assertMonitoring(collector_monitoring_socket, "input.received_stop_messages", 1)
155 self.assertMonitoring(collector_monitoring_socket, "input.all_stop_messages", 1)
156
157 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
158 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
159 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 1)
160 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 1)
161
162 # should go to the workers
163 self.assertHasOutputFile("endrun_called", timeout=1)
164 self.assertHasOutputFile("output_endrun_called", timeout=1)
165
166 # but not to the output
167 self.assertNothingMore(output_socket)
168
169 # a second stop message should only reach the first workers up to the collector
170 self.send(distributor_monitoring_socket, "l")
171
172 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
173 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
174 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
175 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
176 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
177
178 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
179 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 20)
180 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
181 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
182 self.assertMonitoring(collector_monitoring_socket, "input.received_stop_messages", 1)
183 self.assertMonitoring(collector_monitoring_socket, "input.all_stop_messages", 1)
184
185 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
186 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
187 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 1)
188 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 1)
189
190 # should go to the workers (longer timeout as distributor waits a bit)
191 self.assertHasOutputFile("endrun_called", timeout=5)
192 self.assertNotHasOutputFile("output_endrun_called", timeout=1)
193
194 # Now lets restart the run
195 self.send(distributor_monitoring_socket, "n")
196 self.send(collector_monitoring_socket, "n")
197 self.send(final_collector_monitoring_socket, "n")
198
199 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
200 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
201 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
202 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
203 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
204
205 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
206 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 20)
207 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
208 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
209 self.assertMonitoring(collector_monitoring_socket, "input.received_stop_messages", 0)
210 self.assertMonitoring(collector_monitoring_socket, "input.all_stop_messages", 0)
211
212 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
213 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 20)
214 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 0)
215 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 0)
216
217 # And send some more events
218 for _ in range(20):
219 input_socket.send_multipart([input_identity, self.event_data])
220
221 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
222 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
223 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
224 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
225 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 1)
226 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 40)
227
228 self.assertMonitoring(collector_monitoring_socket, "input.registered_workers", 1)
229 self.assertMonitoring(collector_monitoring_socket, "input.received_events", 40)
230 self.assertMonitoring(collector_monitoring_socket, "output.ready_queue_size", 20)
231 self.assertMonitoring(collector_monitoring_socket, "output.registered_workers", 1)
232 self.assertMonitoring(collector_monitoring_socket, "output.sent_events", 40)
233
234 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
235 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 40)
236 self.assertMonitoring(final_collector_monitoring_socket, "output.sent_events", 40)
237
238 self.assertNotHasOutputFile("endrun_called", timeout=1)
239 self.assertNotHasOutputFile("output_endrun_called", timeout=1)
240 self.assertHasOutputFile("beginrun_called", timeout=1)
241 self.assertHasOutputFile("output_beginrun_called", timeout=1)
242
243 buffer = b""
244 while output_socket.poll(0):
245 _, msg = self.recv(output_socket)
246 buffer += msg
247 self.assertNothingMore(output_socket)
248
249 # Data Size != raw data, as data is in different format, size taken from test itself
250 self.assertEqual(len(buffer), 122638 * 20)
251
252 # And finally: terminate everything
253 self.send(distributor_monitoring_socket, "x")
254
255 self.assertIsDown("collector")
256 self.assertIsDown("final_collector")
257 self.assertIsDown("distributor")
258 self.assertIsDown("worker", timeout=1)
259 self.assertIsDown("output_worker")
260
261

Member Data Documentation

◆ collector_input_port

collector_input_port = HLTZMQTestCase.get_free_port()

collector_input_port

Definition at line 33 of file test_hlt.py.

◆ collector_monitoring_port

collector_monitoring_port = HLTZMQTestCase.get_free_port()

collector_monitoring_port

Definition at line 37 of file test_hlt.py.

◆ collector_output_port

collector_output_port = HLTZMQTestCase.get_free_port()

collector_output_port

Definition at line 35 of file test_hlt.py.

◆ ctx

ctx = zmq.Context()
staticinherited

The ZMQ context.

Definition at line 36 of file test_support.py.

◆ distributor_input_port

distributor_input_port = HLTZMQTestCase.get_free_port()

distributor_input_port

Definition at line 26 of file test_hlt.py.

◆ distributor_monitoring_port

distributor_monitoring_port = HLTZMQTestCase.get_free_port()

distributor_monitoring_port

Definition at line 30 of file test_hlt.py.

◆ distributor_output_port

distributor_output_port = HLTZMQTestCase.get_free_port()

distributor_output_port

Definition at line 28 of file test_hlt.py.

◆ event_data

event_data = open(basf2.find_file("daq/hbasf2/tests/out.raw"), "br").read()
static

event_data

Definition at line 21 of file test_hlt.py.

◆ final_collector_input_port

final_collector_input_port = HLTZMQTestCase.get_free_port()

final_collector_input_port

Definition at line 40 of file test_hlt.py.

◆ final_collector_monitoring_port

final_collector_monitoring_port = HLTZMQTestCase.get_free_port()

final_collector_monitoring_port

Definition at line 44 of file test_hlt.py.

◆ final_collector_output_port

final_collector_output_port = HLTZMQTestCase.get_free_port()

final_collector_output_port

Definition at line 42 of file test_hlt.py.

◆ needed_programs

needed_programs = dict()
staticinherited

The dict name -> cmd args of the programs to start, needs to be set in each test.

Definition at line 38 of file test_support.py.

◆ previous_dir

previous_dir = os.getcwd()
inherited

remember current working directory

Definition at line 58 of file test_support.py.

◆ started_programs

started_programs = dict()
inherited

dict for all started programs

Definition at line 62 of file test_support.py.

◆ tearDown

tearDown = subprocess.Popen(command, start_new_session=True)
inherited

Definition at line 68 of file test_support.py.

◆ test_dir

test_dir = tempfile.mkdtemp()
inherited

use a temporary folder for testing

Definition at line 56 of file test_support.py.


The documentation for this class was generated from the following file: