Belle II Software
release-05-01-25
test_worker.py
1
import
atexit
2
import
os
3
from
pathlib
import
Path
4
from
time
import
sleep
5
from
unittest
import
main
6
import
basf2
7
8
from
zmq_daq.test_support
import
HLTZMQTestCase
9
10
11
class
WorkerTestCase
(
HLTZMQTestCase
):
12
"""Test case baseclass to spawn a worker"""
13
14
event_data = open(basf2.find_file(
"daq/hbasf2/tests/out.raw"
),
"br"
).read()
15
16
17
extra_arguments = []
18
19
def
setUp
(self):
20
"""Setup necessary sockets and programs"""
21
22
self.input_socket, input_port = self.
create_router_socket
(
None
)
23
24
self.output_socket, output_port = self.
create_router_socket
(
None
)
25
# and set the list of necessary programs to use these sockets
26
self.
needed_programs
= {
27
"worker"
: [
28
"python3"
, basf2.find_file(
"daq/hbasf2/tests/passthrough.no_run_py"
),
29
"--input"
, f
"tcp://localhost:{input_port}"
,
30
"--output"
, f
"tcp://localhost:{output_port}"
,
31
] + self.
extra_arguments
32
}
33
super().
setUp
()
34
35
def
start
(self):
36
"""start the needed sockets and send some hello messages"""
37
# There should be a hello message
38
39
self.
output_identity
= self.
assertIsMsgType
(self.output_socket,
"h"
, router=
True
)[0].decode()
40
self.
send
(self.output_socket,
"c"
, identity=self.
output_identity
)
41
42
# There are probably many more ready messages, but we are only interested in at least one here
43
44
self.
input_identity
= self.
assertIsMsgType
(self.input_socket,
"r"
, router=
True
, final=
False
)[0].decode()
45
46
# Store some example events
47
48
self.
first_run_event_data
= [self.
event_data
, self.
event_data
]
49
50
self.
second_run_event_data
= [self.
event_data
, self.
event_data
]
51
52
53
class
NormalWorkerTestCase
(
WorkerTestCase
):
54
"""Tests for normal worker behavior"""
55
def
testInitialization
(self):
56
"""test function"""
57
self.
start
()
58
59
# Initialisation should be called
60
self.
assertHasOutputFile
(
"initialize_called"
, timeout=1)
61
62
def
testRunSending
(self):
63
"""test function"""
64
self.
start
()
65
66
# Send first event (should trigger begin run again)
67
self.
send
(self.input_socket,
"u"
, first_data=self.
first_run_event_data
[0], identity=self.
input_identity
)
68
self.
assertHasOutputFile
(
"beginrun_called"
, timeout=0.5)
69
self.
assertIsMsgType
(self.output_socket,
"w"
, router=
True
)
70
self.
send
(self.output_socket,
"c"
, identity=self.
output_identity
)
71
72
# Send second event (should not trigger begin run again)
73
self.
send
(self.input_socket,
"u"
, first_data=self.
first_run_event_data
[1], identity=self.
input_identity
)
74
self.
assertNotHasOutputFile
(
"beginrun_called"
, timeout=0.5)
75
self.
assertIsMsgType
(self.output_socket,
"w"
, router=
True
)
76
self.
send
(self.output_socket,
"c"
, identity=self.
output_identity
)
77
78
def
testEndRun
(self):
79
"""test function"""
80
self.
start
()
81
82
# TODO: do I want to test which run was ended?
83
# end run trigger
84
self.
send
(self.input_socket,
"l"
, identity=self.
input_identity
)
85
self.
assertHasOutputFile
(
"endrun_called"
, timeout=2)
86
self.
assertIsMsgType
(self.output_socket,
"l"
, router=
True
)
87
self.
send
(self.output_socket,
"c"
, identity=self.
output_identity
)
88
89
# Also the second one should give us an end run
90
self.
send
(self.input_socket,
"l"
, identity=self.
input_identity
)
91
self.
assertHasOutputFile
(
"endrun_called"
, timeout=1)
92
self.
assertIsMsgType
(self.output_socket,
"l"
, router=
True
)
93
self.
send
(self.output_socket,
"c"
, identity=self.
output_identity
)
94
95
# Sneak in an event in between -> should give beginRun
96
self.
send
(self.input_socket,
"u"
, first_data=self.
first_run_event_data
[0], identity=self.
input_identity
)
97
self.
assertHasOutputFile
(
"beginrun_called"
, timeout=1)
98
self.
assertIsMsgType
(self.output_socket,
"w"
, router=
True
)
99
self.
send
(self.output_socket,
"c"
, identity=self.
output_identity
)
100
101
# And end the run again
102
self.
send
(self.input_socket,
"l"
, identity=self.
input_identity
)
103
self.
assertHasOutputFile
(
"endrun_called"
, timeout=1)
104
self.
assertIsMsgType
(self.output_socket,
"l"
, router=
True
)
105
self.
send
(self.output_socket,
"c"
, identity=self.
output_identity
)
106
107
# A second time...
108
self.
send
(self.input_socket,
"l"
, identity=self.
input_identity
)
109
self.
assertHasOutputFile
(
"endrun_called"
, timeout=1)
110
self.
assertIsMsgType
(self.output_socket,
"l"
, router=
True
)
111
self.
send
(self.output_socket,
"c"
, identity=self.
output_identity
)
112
113
# Sneak in a second event in between -> should give beginRun (as it is a new run)
114
self.
send
(self.input_socket,
"u"
, first_data=self.
second_run_event_data
[0], identity=self.
input_identity
)
115
self.
assertHasOutputFile
(
"beginrun_called"
, timeout=1)
116
self.
assertIsMsgType
(self.output_socket,
"w"
, router=
True
)
117
self.
send
(self.output_socket,
"c"
, identity=self.
output_identity
)
118
119
# And end the run again
120
self.
send
(self.input_socket,
"l"
, identity=self.
input_identity
)
121
self.
assertHasOutputFile
(
"endrun_called"
, timeout=1)
122
self.
assertIsMsgType
(self.output_socket,
"l"
, router=
True
)
123
self.
send
(self.output_socket,
"c"
, identity=self.
output_identity
)
124
125
# Termination should also work
126
self.
send
(self.input_socket,
"x"
, identity=self.
input_identity
)
127
# Attention: terminate is called in the different order
128
self.
assertIsMsgType
(self.output_socket,
"x"
, router=
True
)
129
self.
send
(self.output_socket,
"c"
, identity=self.
output_identity
)
130
self.
assertHasOutputFile
(
"terminate_called"
, timeout=1)
131
132
# And the termination should cause the process to go down
133
self.
assertIsDown
(
"worker"
, timeout=200)
134
135
136
class
DyingWorkerTestCase
(
WorkerTestCase
):
137
"""Test case for dying workers"""
138
139
140
extra_arguments = [
"--exit"
,
"--prefix"
,
"dying_"
]
141
142
def
testUnregistration
(self):
143
"""test function"""
144
self.
start
()
145
146
# lets send some events
147
for
_
in
range(10):
148
self.
send
(self.input_socket,
"u"
, first_data=self.
event_data
, identity=self.
input_identity
)
149
self.
assertIsMsgType
(self.output_socket,
"w"
, router=
True
)
150
self.
send
(self.output_socket,
"c"
, identity=self.
output_identity
)
151
152
self.
assertHasOutputFile
(
"dying_beginrun_called"
, timeout=1)
153
154
# Now we kill one of the workers
155
Path(
"dying_exit_request"
).touch()
156
157
self.
send
(self.input_socket,
"u"
, first_data=self.
event_data
, identity=self.
input_identity
)
158
self.
assertHasOutputFile
(
"dying_exit_called"
, timeout=1)
159
msg = self.
assertIsMsgType
(self.output_socket,
"d"
, router=
True
)
160
# the message content should be the worker that has died
161
self.assertEqual(msg[2].decode(), self.
output_identity
)
162
self.
send
(self.output_socket,
"c"
, identity=msg[0].decode())
163
164
self.
assertIsDown
(
"worker"
, timeout=10)
165
166
167
if
__name__ ==
'__main__'
:
168
main
()
test_worker.DyingWorkerTestCase.testUnregistration
def testUnregistration(self)
Definition:
test_worker.py:142
zmq_daq.test_support.HLTZMQTestCase
Definition:
test_support.py:17
test_worker.NormalWorkerTestCase.testInitialization
def testInitialization(self)
Definition:
test_worker.py:55
zmq_daq.test_support.HLTZMQTestCase.needed_programs
needed_programs
The dict name -> cmd args of the programs to start, needs to be set in each test.
Definition:
test_support.py:26
zmq_daq.test_support.HLTZMQTestCase.assertIsDown
def assertIsDown(self, name, timeout=5, minimum_delay=0.1)
Definition:
test_support.py:81
test_worker.WorkerTestCase.first_run_event_data
first_run_event_data
some data
Definition:
test_worker.py:48
zmq_daq.test_support.HLTZMQTestCase.assertIsMsgType
def assertIsMsgType(self, socket, message_type, final=True, router=False)
Definition:
test_support.py:203
test_worker.WorkerTestCase.output_identity
output_identity
output_identity
Definition:
test_worker.py:39
zmq_daq.test_support.HLTZMQTestCase.assertNotHasOutputFile
def assertNotHasOutputFile(self, output_file, timeout=0.5)
Definition:
test_support.py:233
test_worker.WorkerTestCase.extra_arguments
list extra_arguments
extra arguments to pass to the worker script
Definition:
test_worker.py:17
test_worker.NormalWorkerTestCase
Definition:
test_worker.py:53
test_worker.WorkerTestCase.event_data
event_data
event_data
Definition:
test_worker.py:14
zmq_daq.test_support.HLTZMQTestCase.assertHasOutputFile
def assertHasOutputFile(self, output_file, unlink=True, timeout=0.5, minimum_delay=0.1)
Definition:
test_support.py:215
test_worker.WorkerTestCase.input_identity
input_identity
input_identity
Definition:
test_worker.py:44
main
int main(int argc, char **argv)
Run all tests.
Definition:
test_main.cc:77
test_worker.WorkerTestCase.start
def start(self)
Definition:
test_worker.py:35
zmq_daq.test_support.HLTZMQTestCase.send
def send(socket, message_type, first_data=b"", second_data=b"", identity="")
Definition:
test_support.py:136
test_worker.NormalWorkerTestCase.testRunSending
def testRunSending(self)
Definition:
test_worker.py:62
zmq_daq.test_support
Definition:
test_support.py:1
test_worker.WorkerTestCase
Definition:
test_worker.py:11
test_worker.NormalWorkerTestCase.testEndRun
def testEndRun(self)
Definition:
test_worker.py:78
test_worker.WorkerTestCase.setUp
def setUp(self)
Definition:
test_worker.py:19
test_worker.DyingWorkerTestCase
Definition:
test_worker.py:136
zmq_daq.test_support.HLTZMQTestCase.create_router_socket
def create_router_socket(port)
Definition:
test_support.py:128
test_worker.WorkerTestCase.second_run_event_data
second_run_event_data
some data
Definition:
test_worker.py:50
daq
hbasf2
tests
test_worker.py
Generated on Fri Nov 5 2021 03:49:17 for Belle II Software by
1.8.17