Belle II Software  release-08-01-10
local_metadata.py
1 #!/usr/bin/env python3
2 
3 
10 
11 """
12 Module containing all functionality necessary to mange local metadata and payload
13 information.
14 """
15 
16 import sqlite3
17 from conditions_db import PayloadInformation
18 
19 
21  """
22  Class to handle local sqlite dump of conditions database metadata
23 
24  This class can create and read sqlite dumps of the central database in a format
25  compatible with the local metadata provider in basf2.
26  """
27 
28  APPLICATION_ID = 0xb2cdb
29 
31  SCHEMA_VERSION = 1
32 
33  SCHEMA_SQL = """
34  -- first drop all tables we care about
35  DROP VIEW IF EXISTS iov_payloads;
36  DROP INDEX IF EXISTS iov_index;
37  DROP TABLE IF EXISTS iovs;
38  DROP TABLE IF EXISTS globaltags;
39  DROP VIEW IF EXISTS full_payloads;
40  DROP TABLE IF EXISTS payloads;
41  DROP TABLE IF EXISTS payloadNames;
42  DROP TABLE IF EXISTS baseUrls;
43 
44  -- table to store the base urls for normalization: Mostly we have the
45  -- same base url for all payloads but we only want to store the string
46  -- once so give it an id
47  CREATE TABLE baseUrls (
48  baseUrlId INTEGER PRIMARY KEY,
49  baseUrl text UNIQUE
50  );
51 
52  -- table to store all distinct payload names only once
53  CREATE TABLE payloadNames (
54  payloadNameId INTEGER PRIMARY KEY,
55  payloadName TEXT UNIQUE
56  );
57 
58  -- Payload information for each payload.
59  -- payloadId, revision, checksum, payloadUrl are taken directly without
60  -- change from the central database.
61  -- payloadName and baseUrl are put into local tables defined above,
62  -- these ids are local to the file.
63  -- payloadUrl is taken from the database except if it follows the usual
64  -- `dbstore/{payloadName}/dbstore_{payloadName}_rev_{revision}.root` in
65  -- which case it is set to NULL and created on the fly from the pattern.
66  CREATE TABLE payloads (
67  payloadId INTEGER PRIMARY KEY,
68  payloadNameId INTEGER NOT NULL,
69  revision INTEGER NOT NULL,
70  checksum text NOT NULL,
71  payloadUrl text DEFAULT NULL,
72  baseUrlId INTEGER NOT NULL,
73  CONSTRAINT name_rev UNIQUE (payloadNameId, revision),
74  FOREIGN KEY (payloadNameId) REFERENCES payloadNames (payloadNameId)
75  ON UPDATE CASCADE ON DELETE RESTRICT
76  FOREIGN KEY (baseUrlId) REFERENCES baseUrls (baseUrlId)
77  ON UPDATE CASCADE ON DELETE RESTRICT
78  );
79 
80  -- Payload information with the local ids and the payloadUrl resolved
81  -- Information in this view is identical to the central server
82  CREATE VIEW full_payloads AS
83  SELECT payloadId, payloadName, revision, checksum,
84  ifnull(payloadUrl, 'dbstore/' || payloadName || '/dbstore_' ||
85  payloadName || '_rev_' || revision || '.root')
86  as payloadUrl, baseUrl
87  FROM payloads NATURAL JOIN payloadNames NATURAL JOIN baseUrls;
88 
89  -- table for all globaltags in this file. All values taken directly from
90  -- the central server
91  CREATE TABLE globaltags (
92  globalTagId INTEGER PRIMARY KEY,
93  globalTagName text UNIQUE,
94  globalTagStatus text NOT NULL
95  );
96 
97  -- table for all iovs in all globaltags in this file, all values taken
98  -- directly from the central server
99  CREATE TABLE iovs (
100  globalTagId INTEGER NOT NULL,
101  payloadId INTEGER NOT NULL,
102  firstExp INTEGER NOT NULL,
103  firstRun INTEGER NOT NULL,
104  finalExp INTEGER NOT NULL,
105  finalRun INTEGER NOT NULL,
106  FOREIGN KEY (globalTagId) REFERENCES globaltags (globalTagId)
107  ON UPDATE CASCADE ON DELETE CASCADE,
108  FOREIGN KEY (payloadId) REFERENCES payloads (payloadId)
109  ON UPDATE CASCADE ON DELETE RESTRICT
110  );
111  -- composite index on the iovs to exclude duplicates and allow
112  -- performant lookup
113  CREATE UNIQUE INDEX iov_index on iovs (
114  globalTagId, firstExp, firstRun, finalExp, finalRun, payloadId
115  );
116 
117  -- full view returning the full information on all iovs in all globaltags
118  CREATE VIEW iov_payloads AS
119  SELECT globalTagName, payloadId, payloadName, revision, checksum,
120  firstExp, firstRun, finalExp, finalRun, payloadUrl, baseUrl
121  FROM globaltags NATURAL JOIN iovs NATURAL JOIN full_payloads;
122  """
123 
124  def __init__(self, filename, mode="read"):
125  """
126  Open an sqlite database and make sure that the schema exists in the
127  correct version or create it if ``mode=overwrite``
128 
129  Arguments:
130  filename (str): name of the database file
131  readonly (str): how to open the file. Can be one of ``read`` to open
132  the file readonly, ``append` to append new data to an existing file
133  and ``overwrite`` to recreate all tables and overwrite the contents.
134  """
135 
136 
137  self._cache_cache = {}
138 
139  self._database_database = None
140 
141  # connect to the database file ...
142  if mode == "read":
143  self._database_database = sqlite3.connect(f"file:{filename}?mode=ro", uri=True)
144  elif mode in ["append", "overwrite"]:
145  self._database_database = sqlite3.connect(filename)
146  else:
147  raise RuntimeError("invalid mode: please supply one of 'read', 'append', 'overwrite'")
148 
149  if mode == "overwrite":
150  # drop and recreate all tables
151  self._database_database.executescript(self.SCHEMA_SQLSCHEMA_SQL)
152  # and set the application id/schema version correctly
153  self._database_database.execute(f"PRAGMA application_id = {self.APPLICATION_ID}")
154  self._database_database.execute(f"PRAGMA user_version = {self.SCHEMA_VERSION}")
155  self._database_database.commit()
156  else:
157  # make sure the application id/schema version is the same
158  application_id = self._database_database.execute("PRAGMA application_id").fetchone()[0]
159  if application_id != self.APPLICATION_IDAPPLICATION_ID:
160  raise RuntimeError("Not a b2conditionsdb database file")
161  schema_version = self._database_database.execute("PRAGMA user_version").fetchone()[0]
162  if schema_version != self.SCHEMA_VERSIONSCHEMA_VERSION:
163  raise RuntimeError("Cannot use sqlite file: different schema version, please recreate")
164 
165  def get_payload_count(self):
166  """Get the number of distinct payloads known to this file"""
167  cursor = self._database_database.execute("SELECT count(*) from full_payloads")
168  return cursor.fetchone()[0]
169 
170  def _resolve_id(self, name, entity):
171  """
172  Resolve the id for a named entity in the database file.
173 
174  Create new entities on demand and cache all known entities
175 
176  Parameters:
177  name (str): name to lookup
178  entity (str): type of the entity, currently ``baseUrl`` or ``payloadName``
179  """
180  # fill existing entries on first access
181  if entity not in self._cache_cache:
182  self._cache_cache[entity] = {row[0]: row[1] for row in self._database_database.execute(f"SELECT {entity}, {entity}Id from {entity}s")}
183  # and then check if we have this entry, otherwise make a new one
184  cache = self._cache_cache[entity]
185  if name not in cache:
186  cache[name] = len(cache) + 1
187  self._database_database.execute(f"INSERT INTO {entity}s ({entity}Id, {entity}) VALUES (?,?)", (cache[name], name))
188  return cache[name]
189 
190  def add_globaltag(self, tag_id, name, state, iovs):
191  """
192  Add a globaltag to the database file. If the globaltag already exists in
193  the file its contents will be replaced.
194 
195  Parameters: tag_id (str): id of the globaltag in the central database
196  name (str): name of the globaltag in the central database state (str):
197  state of the globaltag in the central database iovs
198  (list(PayloadInformation)): all iovs valid for this globaltag
199  """
200  self._database_database.execute("INSERT OR REPLACE INTO globaltags VALUES (?,?,?)", (tag_id, name, state))
201  # remove existing iovs ... we want to append globaltags but we want all globaltags to be correct
202  self._database_database.execute("DELETE from iovs WHERE globalTagId=?", (tag_id,))
203 
204  all_payloads = {}
205  all_iovs = []
206  for p in iovs:
207  if p.payload_id not in all_payloads:
208  base_url = self._resolve_id_resolve_id(p.base_url, 'baseUrl')
209  name = self._resolve_id_resolve_id(p.name, 'payloadName')
210  url = None
211  if p.payload_url.lstrip('/') != f"dbstore/{p.name}/dbstore_{p.name}_rev_{p.revision}.root":
212  url = p.payload_url
213  all_payloads[p.payload_id] = (p.payload_id, name, p.revision, p.checksum, url, base_url)
214  all_iovs.append((tag_id, p.payload_id) + p.iov)
215 
216  self._database_database.executemany("INSERT OR REPLACE INTO payloads VALUES (?,?,?,?,?,?)", all_payloads.values())
217  self._database_database.executemany("INSERT INTO iovs VALUES (?,?,?,?,?,?)", all_iovs)
218  # make sure everything is committed
219  self._database_database.commit()
220  self._database_database.execute("VACUUM")
221 
222  def get_globaltags(self):
223  """Return the list of globaltags stored in the file
224 
225  Returns:
226  a list of (id, name, state) tuples for all globaltags
227  """
228  return [row for row in self._database_database.execute("SELECT globalTagId, globalTagName, globalTagStatus FROM globaltags "
229  "ORDER by globalTagName")]
230 
231  def get_payloads(self):
232  """Get all payloads existing in this file
233 
234  Returns:
235  a sorted list of `PayloadInformation` objects for all payloads defined in this file with the iov set to None
236  """
237  payloads = sorted([PayloadInformation(*row) for row in
238  self._database_database.execute("SELECT payloadId, payloadName, revision, checksum, "
239  "payloadUrl, baseUrl from full_payloads")])
240  return payloads
241 
242  def get_all_iovs(self, globalTag, exp=None, run=None, message=None):
243  """Get all iovs for a given globaltag
244 
245  Parameters:
246  globalTag (str): name of the globaltag
247  exp (int): experiment number to check (or None to return all iovs)
248  run (int): run number to check (or None to return all iovs)
249  message (str): ignored, just for compatibility with `ConditionsDB.get_all_iovs`
250 
251  Returns:
252  a sorted list of `PayloadInformation` objects
253  """
254  params = {"globalTag": globalTag}
255  query = """\
256  SELECT
257  payloadId, payloadName, revision, checksum, payloadUrl, baseUrl,
258  firstExp, firstRun, finalExp, finalRun
259  FROM iov_payloads
260  WHERE globalTagName=:globalTag"""
261  if exp is not None:
262  params.update({"exp": exp, "run": run})
263  query += """ AND\
264  ((firstExp==:exp AND firstRun<=:run) OR firstExp<:exp) AND
265  (finalExp<0 OR (finalRun<0 AND finalExp>=:exp) OR (finalExp>:exp) OR (finalExp==:exp AND finalRun>=:run))"""
266  iovs = sorted([PayloadInformation(*row[:6], iov_id=None, iov=row[6:]) for row in
267  self._database_database.execute(query, params)])
268  return iovs
string SCHEMA_SQL
SQL script to create all necessary tables and views.
int APPLICATION_ID
Application ID to be stored int the sqlite file.
_cache
Cache name->id mappings from the database.
def add_globaltag(self, tag_id, name, state, iovs)
def get_all_iovs(self, globalTag, exp=None, run=None, message=None)
int SCHEMA_VERSION
Schema version, to be increased when the table definitions change so that we can check for safe appen...