Belle II Software  release-05-01-25
conditions.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 """
5 Script to make sure the conditions database interface is behaving as expected.
6 
7 We do this by creating a local http server which acts as a mock database for
8 getting the payload information and payloads and then we run through different scenarios:
9  - unknown host
10  - connection refused
11  - url not found (404)
12  - retry on (503)
13  - corrupt payload information
14  - incomplete payload information
15  - correct payload information
16  - payload file missing
17  - payload file checksum mismatch
18 """
19 
20 import sys
21 import os
22 import basf2
23 from http.server import HTTPServer, BaseHTTPRequestHandler
24 from urllib.parse import urlparse, parse_qs
25 from b2test_utils import clean_working_directory, safe_process, skip_test, configure_logging_for_tests
26 import multiprocessing
27 import shutil
28 
29 
class SimpleConditionsDB(BaseHTTPRequestHandler):
    """Mock conditions database used to test the conditions interface.

    The handler answers the GET requests needed by this test: the list of
    global tag states, the state of a single global tag, the payload
    information for a given experiment/run and the download of a payload
    file. The experiment number of a request selects which (possibly broken)
    payload information is returned so that the different error conditions
    can be checked.
    """

    #: JSON document returned for ``/v2/globalTagStatus``
    globaltag_states = """[
        { "name": "OPEN" },
        { "name": "TESTING" },
        { "name": "VALIDATED" },
        { "name": "PUBLISHED" },
        { "name": "RUNNING" },
        { "name": "INVALID" }
    ]"""

    #: Template for the information about one payload. ``{checksum}`` and
    #: ``{revision}`` are filled with ``str.format`` when the class is created,
    #: the ``%(baseurl)s``, ``%(exp)s`` and ``%(run)s`` placeholders are filled
    #: for each request in `_reply_payload_info`.
    example_payload = """[{{
        "payload": {{
            "baseUrl": "%(baseurl)s",
            "payloadId":1,
            "checksum":"{checksum}",
            "revision":{revision},
            "payloadUrl":"dbstore_BeamParameters_rev_{revision}.root",
            "basf2Module": {{ "name":"BeamParameters", "basf2Package": {{ "name":"dbstore" }} }}
        }}, "payloadIov": {{
            "expStart": %(exp)s,
            "expEnd": %(exp)s,
            "runStart": %(run)s,
            "runEnd": %(run)s
        }}
    }}]"""

    #: Payload information to return, keyed by the experiment number (as string)
    payloads = {
        # let's start with empty information
        "0": "[]",
        # or one child but the wrong one
        "1": '[{ "foo": { } }]',
        # or let's have some invalid JSON
        "2": '[{ "foo',
        # let's provide one correct payload
        "3": example_payload.format(checksum="2447fbcf76419fbbc7c6d015ef507769", revision="1"),
        # same payload but checksum mismatch
        "4": example_payload.format(checksum="00[wrong checksum]", revision="1"),
        # non existing payload file
        "5": example_payload.format(checksum="00[missing]", revision="2"),
        # duplicate payload, or in this case triple: the slices strip the
        # surrounding "[" / "]" so that the three lists can be joined into one
        "6": example_payload.format(checksum="2447fbcf76419fbbc7c6d015ef507769", revision="2")[:-1] + "," +
        example_payload.format(checksum="2447fbcf76419fbbc7c6d015ef507769", revision="1")[1:-1] + "," +
        example_payload.format(checksum="2447fbcf76419fbbc7c6d015ef507769", revision="3")[1:],
    }

    #: Known global tags and the state reported for each of them. Note that
    #: "NEW" is not one of the states listed in `globaltag_states`.
    globaltags = {
        "localtest": "PUBLISHED",
        "newgt": "NEW",
        "invalidgt": "INVALID",
    }

    def reply(self, xml):
        """Send a 200 response with the given string as body.

        The parameter keeps its historic name ``xml``; everything this class
        passes in is JSON text.
        """
        self.send_response(200)
        self.end_headers()
        self.wfile.write(xml.encode())

    def log_message(self, format, *args):
        """Override default logging to remove timestamp"""
        print("MockConditionsDB:", format % args)

    def log_error(self, *args):
        """Disable error logs"""

    def do_GET(self):
        """Dispatch a GET request to the matching mock endpoint.

        Requests which none of the endpoints can answer get a 404 reply.
        """
        url = urlparse(self.path)
        if url.path == "/v2/globalTagStatus":
            return self.reply(self.globaltag_states)
        if url.path.startswith("/v2/globalTag"):
            # gt info: the last path component is the name of the global tag
            return self._reply_globaltag(url.path.split("/")[-1])

        if url.path.endswith("/iovPayloads/"):
            handled = self._reply_payload_info(parse_qs(url.query))
        else:
            handled = self._reply_payload_file(os.path.basename(url.path))

        if not handled:
            # fall back: just return file not found
            self.send_error(404)

    def _reply_globaltag(self, gtname):
        """Send name and state of the global tag ``gtname``, 404 if unknown."""
        try:
            state = self.globaltags[gtname]
        except KeyError:
            return self.send_error(404)
        return self.reply('{ "name": "%s", "globalTagStatus": { "name": "%s" } }' % (gtname, state))

    def _reply_payload_info(self, params):
        """Send the mock payload information selected by the query parameters.

        Parameters:
            params (dict): parsed query string as returned by ``parse_qs``,
                expected to contain ``expNumber`` and ``runNumber``

        Returns:
            bool: True if a response was sent, False if the caller should
            send the 404 fallback
        """
        try:
            exp = params["expNumber"][0]
            run = params["runNumber"][0]
            exp_number = int(exp)
            run_number = int(run)
        except (KeyError, ValueError):
            # malformed request: answer properly instead of letting the
            # exception escape and the connection drop without any reply
            self.send_error(400)
            return True

        if exp_number > 100:
            # experiment numbers above 100 are used as the http status to send
            self.send_error(exp_number)
            return True

        # payload information is only provided for runs 0 and 1
        if run_number > 1 or exp not in self.payloads:
            return False

        baseurl = "http://%s:%s" % self.server.socket.getsockname()
        self.reply(self.payloads[exp] % dict(exp=exp, run=run, baseurl=baseurl))
        return True

    def _reply_payload_file(self, filename):
        """Serve ``filename`` from the conditions_testpayloads directory.

        Returns:
            bool: True if the file exists and was sent, False otherwise
        """
        # replace rev_3 with rev_1, so requests for revision 3 are answered
        # with the file of revision 1
        filename = filename.replace("rev_3", "rev_1")
        basedir = basf2.find_file("framework/tests/conditions_testpayloads")
        path = os.path.join(basedir, filename)
        if not os.path.isfile(path):
            return False

        # ok, file exists. let's serve it
        self.send_response(200)
        self.end_headers()
        with open(path, "rb") as f:
            shutil.copyfileobj(f, self.wfile)
        return True
150 
151 
def run_mockdb(pipe):
    """Run the mock conditions database and report its port to the parent.

    The port number is sent through ``pipe`` once the server socket is bound;
    ``None`` is sent instead if the socket could not be opened. On success
    this function serves requests forever and does not return.
    """
    # Binding to port 0 would give us any free port. But since the new code
    # prints the full url including the port when there is a problem we use
    # a fixed port to keep the output reproducible and hope that it's free ...
    address = ("127.0.0.1", 12701)
    try:
        server = HTTPServer(address, SimpleConditionsDB)
    except OSError:
        # tell the parent that there is no server before giving up
        pipe.send(None)
        skip_test("Socket 12701 is in use, cannot continue")
        return

    # report the port the socket is actually bound to back to the parent
    bound_port = server.socket.getsockname()[1]
    pipe.send(bound_port)
    # and handle requests from now on
    server.serve_forever()
170 
171 
def run_redirect(pipe, redir_port):
    """Run a server which redirects every GET request to another local port.

    All requests are answered with a 308 reply pointing to the same path on
    ``http://127.0.0.1:<redir_port>``. The port this server listens on is
    sent through ``pipe``, or ``None`` if the socket could not be opened.
    """

    class SimpleRedirectServer(BaseHTTPRequestHandler):
        """Request handler which forwards everything using 308 replies"""

        def do_GET(self):
            """Reply with a permanent redirect to the same path on the target port"""
            target = f"http://127.0.0.1:{redir_port}{self.path}"
            self.send_response(308)
            self.send_header("Location", target)
            self.end_headers()

        def log_message(self, format, *args):
            """Print log messages without the default timestamp"""
            print("Redirect Server:", format % args)

        def log_error(self, *args):
            """Swallow all error logs"""

    try:
        server = HTTPServer(("127.0.0.1", 12702), SimpleRedirectServer)
        pipe.send(12702)
    except OSError:
        # let the parent know that we could not start
        pipe.send(None)
        skip_test("Socket 12702 is in use, cannot continue")
        return

    # handle requests from now on
    server.serve_forever()
200 
201 
def dbprocess(host, path, lastChangeCallback=lambda: None, *, globaltag="localtest"):
    """Process a given path in a child process so that FATAL will not abort this
    script but just the child and configure to use a central database at the given host

    Parameters:
        host (str): url to use as the only metadata provider. If empty the
            metadata providers are left as they are after the reset.
        path (basf2.Path): the path handed to ``safe_process``
        lastChangeCallback (callable): called without arguments after the
            conditions have been configured and right before processing, can
            be used to apply further settings
        globaltag (str): name of the globaltag to override with, keyword only
    """
    # reset the database so that there is no chain
    basf2.reset_database()
    # now run the path in a child process inside of a clean working directory
    with clean_working_directory():
        # make logging more reproducible by replacing some strings
        configure_logging_for_tests()
        basf2.logging.log_level = basf2.LogLevel.DEBUG
        basf2.logging.debug_level = 30
        # start from fresh conditions settings for every call
        basf2.conditions.reset()
        # relative path, so the cache lives inside the clean working directory
        basf2.conditions.expert_settings(download_cache_location="db-cache")
        basf2.conditions.override_globaltags([globaltag])
        if host:
            basf2.conditions.metadata_providers = [host]
        # no payload locations configured here
        basf2.conditions.payload_locations = []
        lastChangeCallback()
        safe_process(path)
221 
222 
# use short timeouts and few retries, this is only a test
basf2.conditions.expert_settings(
    backoff_factor=1,
    connection_timeout=5,
    stalled_timeout=5,
    max_retries=3,
)

# fix the random seed
basf2.set_random_seed("something important")
# one-directional pipe: the children report the port they listen on to us
conn = multiprocessing.Pipe(False)
port_receiver, port_sender = conn

# start the mock conditions database as daemon so it gets killed at the end
# of the script
mock_conditionsdb = multiprocessing.Process(target=run_mockdb, args=(port_sender,), daemon=True)
mock_conditionsdb.start()
# the mock db is up once we receive the port number from the child, so wait for that
mock_port = port_receiver.recv()
# None instead of a port means the server didn't start ... so bail
if mock_port is None:
    sys.exit(1)

# start the redirect server following the same procedure
redir_server = multiprocessing.Process(target=run_redirect, args=(port_sender, mock_port), daemon=True)
redir_server.start()
redir_port = port_receiver.recv()
if redir_port is None:
    sys.exit(1)

# and remember the urls to use for database access
mock_host = f"http://localhost:{mock_port}/"
redir_host = f"http://localhost:{redir_port}/"
253 
# build a simple processing path: just the event info setter and a module which
# prints the beam parameters from the database
main = basf2.Path()
evtinfo = main.add_module("EventInfoSetter")
main.add_module("PrintBeamParameters")

# run through a set of experiments: one for each mock payload information plus
# one experiment the mock database does not know. Each time we process more
# than one run to make sure that this works correctly as well
for exp in range(len(SimpleConditionsDB.payloads) + 1):
    payload_info = SimpleConditionsDB.payloads.get(str(exp))
    if payload_info is None:
        basf2.B2INFO(f">>> check exp {exp}")
    else:
        basf2.B2INFO(f">>> check exp {exp}: {payload_info[0:20]}...)")
    evtinfo.param({"expList": [exp] * 3, "runList": [0, 1, 2], "evtNumList": [1] * 3})
    # first talk to the mock database directly ...
    dbprocess(mock_host, main)
    # ... and then again using redirection
    dbprocess(redir_host, main)
271 
basf2.B2INFO(">>> check that a invalid global tag or a misspelled global tag actually throw errors")
evtinfo.param(dict(expList=[3], runList=[0], evtNumList=[1]))
# global tags in an unusable state, unknown to the mock database or just garbage
broken_globaltags = (
    "newgt",
    "invalidgt",
    "horriblymisspelled",
    "h͌̉e̳̞̞͆ͨ̏͋̕ ͍͚̱̰̀͡c͟o͛҉̟̰̫͔̟̪̠m̴̀ͯ̿͌ͨ̃͆e̡̦̦͖̳͉̗ͨͬ̑͌̃ͅt̰̝͈͚͍̳͇͌h̭̜̙̦̣̓̌̃̓̀̉͜!̱̞̻̈̿̒̀͢!̋̽̍̈͐ͫ͏̠̹̺̜̬͍ͅ",
)
for gt in broken_globaltags:
    dbprocess(mock_host, main, globaltag=gt)
277 
basf2.B2INFO(">>> check retry on 503 errors")
# the mock database replies to experiment numbers above 100 with an error
# using the experiment number as http status, so experiment 503 always fails
# with status 503
evtinfo.param({"expList": [503], "runList": [0], "evtNumList": [1]})
dbprocess(mock_host, main)
basf2.B2INFO(">>> check again without retries")
# NOTE(review): nothing in this script sets max_retries back afterwards, so
# presumably all following checks run without retries as well -- confirm
basf2.conditions.expert_settings(max_retries=0)
dbprocess(mock_host, main)
284 
# all of the following fail anyway, so one run is enough
evtinfo.param(dict(expList=[0], runList=[0], evtNumList=[1]))

# message to print and the url to try for each of the failure scenarios
failing_urls = (
    (">>> try to open localhost on port 0, this should always be refused",
     "http://localhost:0"),
    (">>> and once more with a non existing host name to check for lookup errors",
     "http://nosuchurl/"),
    (">>> and once more with a non existing protocol",
     "nosuchproto://nosuchurl/"),
    (">>> and once more with a totally bogus url",
     "h͌̉e̳̞̞͆ͨ̏͋̕ ͍͚̱̰̀͡c͟o͛҉̟̰̫͔̟̪̠m̴̀ͯ̿͌ͨ̃͆e̡̦̦͖̳͉̗ͨͬ̑͌̃ͅt̰̝͈͚͍̳͇͌h̭̜̙̦̣̓̌̃̓̀̉͜!̱̞̻̈̿̒̀͢!̋̽̍̈͐ͫ͏̠̹̺̜̬͍ͅ"),
)
for message, url in failing_urls:
    basf2.B2INFO(message)
    dbprocess(url, main)
299 
# The two messages below need to be f-strings, otherwise the placeholder is
# printed literally as "{mock_host}". The port of the mock database is fixed
# so the output stays reproducible, but a reference log recorded with the
# literal placeholder needs to be updated.
basf2.B2INFO(f""">>> try to have a list of servers from environment variable
    We expect that it fails over to the third server, {mock_host}, but then succeeds
""")
evtinfo.param({"expList": [3], "runList": [0], "evtNumList": [1]})
# two servers which cannot work followed by the mock database
serverlist = [
    "http://localhost:0",
    "http://h͌̉e̳̞̞͆ͨ̏͋̕c͟o͛҉̟̰̫͔̟̪̠m̴̀ͯ̿͌ͨ̃͆e̡̦̦͖̳͉̗ͨͬ̑͌̃ͅt̰̝͈͚͍̳͇͌h̭̜̙̦̣̓̌̃̓̀̉͜!̱̞̻̈̿̒̀͢",
    mock_host
]
os.environ["BELLE2_CONDB_SERVERLIST"] = " ".join(serverlist)
# an empty host means dbprocess() does not set the metadata providers itself
dbprocess("", main)

# ok, try again with the steering file settings instead of environment variable
# NOTE(review): BELLE2_CONDB_SERVERLIST is still set at this point, so this
# check may not exercise the steering file setting on its own -- confirm
basf2.B2INFO(f""">>> try to have a list of servers from steering file
    We expect that it fails over to the third server, {mock_host}, but then succeeds
""")
dbprocess("", main, lastChangeCallback=lambda: basf2.set_central_serverlist(serverlist))
317 
# SSL connectivity test ... for now we just want to accept anything. It only
# runs if "ssl" is given as argument since we don't want to depend on
# badssl.com being available by default
if "ssl" in sys.argv:
    badssl_subdomains = ("expired", "wrong.host", "self-signed", "untrusted-root")
    for hostname in badssl_subdomains:
        url = f"https://{hostname}.badssl.com/"
        dbprocess(url, main)
conditions.SimpleConditionsDB
Definition: conditions.py:30