Belle II Software development
HistogramTestCase Class Reference
Inheritance diagram for HistogramTestCase:
HLTZMQTestCase

Public Member Functions

def setUp (self)
 
def testEventPropagation (self)
 
def check_histogram_repeated (self, file_name, expected_factor)
 

Public Attributes

 first_input_port
 first_input_port
 
 first_monitoring_port
 first_monitoring_port
 
 second_input_port
 second_input_port
 
 second_monitoring_port
 second_monitoring_port
 
 final_collector_input_port
 final_collector_input_port
 
 final_collector_monitoring_port
 final_collector_monitoring_port
 
 needed_programs
 needed_programs
 

Static Public Attributes

open histogram_data = open(basf2.find_file("daq/hbasf2/tests/histos.raw"), "br").read()
 histogram_data
 
event_data
 event_data
 

Detailed Description

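Test case for the histogram server chain: two proxy histogram servers (b2hlt_proxyhistoserver) forward their histograms to one final collector (b2hlt_finalhistoserver), which writes the ROOT output file.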

Definition at line 42 of file test_histogram.py.

Member Function Documentation

◆ check_histogram_repeated()

def check_histogram_repeated (   self,
  file_name,
  expected_factor 
)
Call check_histogram_output up to 5 times, stopping as soon as the check succeeds; raise an AssertionError if it never does.

Definition at line 203 of file test_histogram.py.

def check_histogram_repeated(self, file_name, expected_factor):
    """Retry the histogram check on ``file_name`` a limited number of times.

    At most five attempts are made. Each attempt first asserts that the
    output file is present (``assertHasOutputFile`` with ``unlink=False``
    and ``timeout=2``) and then passes the file and ``expected_factor`` to
    ``check_histogram_output``. The first attempt for which that check
    returns a true value ends the retrying.

    Raises:
        AssertionError: if the check did not succeed in any attempt.
    """
    max_attempts = 5

    for _attempt in range(max_attempts):
        # unlink=False: presumably keeps the file in place for the check below
        self.assertHasOutputFile(file_name, unlink=False, timeout=2)

        output_is_correct = check_histogram_output(file_name, expected_factor)
        if output_is_correct:
            break
    else:
        # for/else: only reached when no attempt left the loop via `break`
        raise AssertionError("Even after retry, the output was not correct!")
213
214

◆ setUp()

def setUp (   self)
Set up the port numbers and the programs needed by the test.

Reimplemented from HLTZMQTestCase.

Definition at line 62 of file test_histogram.py.

def setUp(self):
    """Choose the ports and describe the programs this test needs.

    Six free ports are requested: an input and a monitoring port for each
    of the two proxy histogram servers and for the final collector. The
    command lines of the three servers are stored in ``needed_programs``
    under the names "first", "second" and "final_collector"; both proxies
    send their output to the input port of the final collector.

    The base class ``setUp`` is called last, after ``needed_programs`` has
    been filled (presumably it is what starts the programs - confirm in
    HLTZMQTestCase).
    """
    get_port = HLTZMQTestCase.get_free_port

    #: input port of the first proxy histogram server
    self.first_input_port = get_port()
    #: monitoring port of the first proxy histogram server
    self.first_monitoring_port = get_port()

    #: input port of the second proxy histogram server
    self.second_input_port = get_port()
    #: monitoring port of the second proxy histogram server
    self.second_monitoring_port = get_port()

    #: input port of the final collector
    self.final_collector_input_port = get_port()
    #: monitoring port of the final collector
    self.final_collector_monitoring_port = get_port()

    # Both proxies deliver their output to the same final collector.
    collector_address = f"tcp://localhost:{self.final_collector_input_port}"

    first_proxy_command = [
        "b2hlt_proxyhistoserver",
        "--input", f"tcp://*:{self.first_input_port}",
        "--output", collector_address,
        "--timeout", "1",
        "--monitor", f"tcp://*:{self.first_monitoring_port}",
    ]
    second_proxy_command = [
        "b2hlt_proxyhistoserver",
        "--input", f"tcp://*:{self.second_input_port}",
        "--output", collector_address,
        "--timeout", "1",
        "--monitor", f"tcp://*:{self.second_monitoring_port}",
    ]
    final_collector_command = [
        "b2hlt_finalhistoserver",
        "--input", f"tcp://*:{self.final_collector_input_port}",
        "--rootFileName", "outputFile.root",
        "--timeout", "1",
        "--monitor", f"tcp://*:{self.final_collector_monitoring_port}",
    ]

    #: command lines of the programs needed by this test, keyed by name
    self.needed_programs = {
        "first": first_proxy_command,
        "second": second_proxy_command,
        "final_collector": final_collector_command,
    }
    super().setUp()
102

◆ testEventPropagation()

def testEventPropagation (   self)
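Test that histograms sent by the workers pass through both proxy servers, are merged into the output file by the final collector, and that stop, clean-up and terminate messages are handled as expected.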

Definition at line 103 of file test_histogram.py.

def testEventPropagation(self):
    """Drive the full proxy -> final collector chain and check its output.

    Five input sockets are used: identities "1" and "2" talk to the first
    proxy, "3" to "5" to the second one. The test then

    1. registers all inputs and checks the initial monitoring values,
    2. sends ten rounds of histograms from every input and checks the
       merged output file (expected factor 5),
    3. sends stop messages and checks the stop counters and the output,
    4. sends a clean-up message to the three monitoring sockets and checks
       that the output file is gone and the stop counters are reset,
    5. sends five rounds of histograms from the first two inputs only and
       checks the output (expected factor 2),
    6. sends terminate messages, checks the output once more and asserts
       that all three programs are down.
    """
    output_file = "outputFile.root"
    self.assertNotHasOutputFile(output_file)

    first_monitor = self.create_socket(self.first_monitoring_port)
    second_monitor = self.create_socket(self.second_monitoring_port)
    final_monitor = self.create_socket(self.final_collector_monitoring_port)
    # Always visited in this order: first proxy, second proxy, final collector.
    monitors = (first_monitor, second_monitor, final_monitor)

    # Two inputs on the first proxy, three on the second, identities "1".."5".
    ports_of_inputs = 2 * [self.first_input_port] + 3 * [self.second_input_port]
    inputs = [
        self.create_socket(port, identity=str(number))
        for number, port in enumerate(ports_of_inputs, start=1)
    ]

    def send_and_confirm(socket, *message):
        """Send ``message`` on ``socket`` and expect a "c" message back."""
        self.send(socket, *message)
        self.assertIsMsgType(socket, "c")

    for input_socket in inputs:
        send_and_confirm(input_socket, "h")

    # At the beginning, everything should be in its normal state
    for monitor, expected_workers in zip(monitors, (2, 3, 2)):
        self.assertMonitoring(monitor, "input.registered_workers", expected_workers)

    # So far no stop messages should be there
    for monitor in monitors:
        self.assertMonitoring(monitor, "input.received_stop_messages", 0)

    for monitor in monitors:
        self.assertMonitoring(monitor, "input.all_stop_messages", False)

    # Send some events, which should eventually trigger a merge
    for _ in range(10):
        for input_socket in inputs:
            send_and_confirm(input_socket, "v", self.histogram_data, self.event_data)

    for monitor, expected_events in zip(monitors, (20, 30, 2)):
        self.assertMonitoring(monitor, "input.received_events", expected_events)

    # Make sure to get the newest data
    self.check_histogram_repeated(output_file, 5)

    # Send out stop messages, which should also trigger a merge
    for input_socket in inputs:
        send_and_confirm(input_socket, "l")

    for monitor, expected_stops in zip(monitors, (2, 3, 2)):
        self.assertMonitoring(monitor, "input.received_stop_messages", expected_stops)
        self.assertMonitoring(monitor, "input.all_stop_messages", True)
    self.assertHasOutputFile(output_file, unlink=False)
    self.check_histogram_repeated(output_file, 5)

    # Now we clean up ...
    for monitor in monitors:
        self.send(monitor, "n")

    # ... which should give us a clean root file again
    self.assertNotHasOutputFile(output_file, timeout=1)

    for monitor in monitors:
        self.assertMonitoring(monitor, "input.received_stop_messages", 0)
        self.assertMonitoring(monitor, "input.all_stop_messages", False)

    # Send some more events, but only from the first two clients
    for _ in range(5):
        for input_socket in inputs[:2]:
            send_and_confirm(input_socket, "v", self.histogram_data, self.event_data)

    # This time we expect fewer entries
    self.check_histogram_repeated(output_file, 2)

    # Now send a terminate message
    for input_socket in inputs:
        send_and_confirm(input_socket, "x")

    # We expect another merge here
    self.assertHasOutputFile(output_file, unlink=False)
    self.check_histogram_repeated(output_file, 2)

    for program_name in ("first", "second", "final_collector"):
        self.assertIsDown(program_name)
202

Member Data Documentation

◆ event_data

b event_data
static
Initial value:
= b"""{
"_typename" : "Belle2::EventMetaData",
"fUniqueID" : 0,
"fBits" : 33554432,
"m_event" : 1,
"m_run" : 1,
"m_subrun" : 0,
"m_experiment" : 1,
"m_production" : 0,
"m_time" : 0,
"m_parentLfn" : "",
"m_generatedWeight" : 1,
"m_errorFlag" : 0
}"""

event_data

Definition at line 47 of file test_histogram.py.

◆ final_collector_input_port

final_collector_input_port

final_collector_input_port

Definition at line 75 of file test_histogram.py.

◆ final_collector_monitoring_port

final_collector_monitoring_port

final_collector_monitoring_port

Definition at line 77 of file test_histogram.py.

◆ first_input_port

first_input_port

first_input_port

Definition at line 65 of file test_histogram.py.

◆ first_monitoring_port

first_monitoring_port

first_monitoring_port

Definition at line 67 of file test_histogram.py.

◆ histogram_data

open histogram_data = open(basf2.find_file("daq/hbasf2/tests/histos.raw"), "br").read()
static

histogram_data

Definition at line 45 of file test_histogram.py.

◆ needed_programs

needed_programs

needed_programs

Definition at line 79 of file test_histogram.py.

◆ second_input_port

second_input_port

second_input_port

Definition at line 70 of file test_histogram.py.

◆ second_monitoring_port

second_monitoring_port

second_monitoring_port

Definition at line 72 of file test_histogram.py.


The documentation for this class was generated from the following file: