Belle II Software  release-08-01-10
database.py
1 #!/usr/bin/env python3
2 
3 # disable doxygen check for this file
4 # @cond
5 
6 
13 
14 import sqlite3
15 import pathlib
16 import pandas
17 from basf2 import B2DEBUG
18 
19 
20 class SQLiteDB():
21  """
22  Parameters:
23  database_path (pathlib.Path): The path to the database file we want to create/connect to.
24 
25  Keyword Arguments:
26  schema (dict): Database table schema for the DB of the form:
27  {"tablename": ["columnname1 text primary key",
28  "columnname2 int"]
29  }
30  read_only (bool): Should the connection be treated as a read-only connection (no update/insert calls)
31  timeout (float): What timeout value should the connection have. How long to wait for other changes to commit.
32  isolation_level (str): How should the connection behave when making transactions?
33  Choices are [None, "DEFERRED", "IMMEDIATE", "EXCLUSIVE"] where None is autocommit behaviour.
34  """
35 
36  def __init__(self, database_path, schema=None, read_only=False, timeout=5.0, isolation_level=None):
37  self.database_path = database_path
38  self.schema = schema
39  self.conn = None
40  self.read_only = read_only
41  self.timeout = timeout
42  self.isolation_level = isolation_level
43  try:
44  if not self.database_path.exists() and not self.read_only:
45  if not self.schema:
46  raise ValueError("The requested database did not exist, "
47  "but you didn't provide a schema to create the tables.")
48  else:
49  self.open()
50  self.create_schema()
51  elif not self.database_path.exists() and read_only:
52  raise ValueError("The requested database did not exist, "
53  "but you specified that this was a read_only connection.")
54  else:
55  self.open()
56  except AttributeError as err:
57  if not isinstance(self.database_path, pathlib.Path):
58  raise TypeError("You did not use a pathlib.Path object as the database_path.")
59  else:
60  raise err
61 
62  def __enter__(self):
63  return self
64 
65  def __exit__(self, type, value, traceback):
66  self.close()
67 
68  def close(self):
69  if self.conn:
70  if not self.read_only:
71  B2DEBUG(29, f"Committing changes and closing Connection for database {self.database_path}.")
72  self.conn.commit()
73  self.conn.close()
74 
75  def open(self):
76  B2DEBUG(29, f"Opening Connection for database {self.database_path}. Readonly = {self.read_only}")
77  connection_uri = self.get_uri()
78  self.conn = sqlite3.connect(connection_uri, uri=True, timeout=self.timeout, isolation_level=self.isolation_level)
79 
80  def commit(self):
81  self.conn.commit()
82 
83  def query(self, sql, parameters=tuple()):
84  cursor = self.conn.cursor()
85  B2DEBUG(29, f"execute({sql}, {parameters}).")
86  return cursor.execute(sql, parameters)
87 
88  def create_schema(self):
89  for table_name, fields in self.schema.items():
90  columns = ",".join(fields)
91  sql = f"CREATE TABLE {table_name} ({columns})"
92  self.query(sql)
93  self.conn.commit()
94 
95  def get_uri(self):
96  uri = f"file:{self.database_path.as_posix()}"
97  if self.read_only:
98  uri += "?mode=ro"
99  return uri
100 
101 
102 class CAFDB(SQLiteDB):
103  """
104  Parameters:
105  database_path (pathlib.Path): The path to the database file we want to create/connect to.
106 
107  Keyword Arguments:
108  read_only (bool): Should the connection be treated as a read-only connection (no update/insert calls)
109  timeout (float): What timeout value should the connection have. How long to wait for other changes to commit.
110  """
111 
112  default_schema = {"calibrations": ["name text primary key",
113  "state text",
114  "checkpoint text",
115  "iteration int"]}
116 
117  def __init__(self, database_path, read_only=False, timeout=30.0, isolation_level=None):
118  super().__init__(database_path, self.default_schema, read_only, timeout, isolation_level)
119 
120  def insert_calibration(self, calibration_name, state="init", checkpoint="init", iteration=0):
121  self.query("INSERT INTO calibrations VALUES (?,?,?,?)", (calibration_name, state, checkpoint, iteration))
122 
123  def update_calibration_value(self, calibration_name, column_name, new_value, attempts=3):
124  attempt = 1
125  finished = False
126  while not finished:
127  try:
128  self.query("UPDATE calibrations SET {}=? WHERE name=?".format(column_name), (new_value, calibration_name))
129  finished = True
130  except sqlite3.OperationalError as e:
131  if attempt < attempts:
132  attempt += 1
133  else:
134  raise e
135 
136  def get_calibration_value(self, calibration_name, column_name):
137  return self.query("SELECT {} FROM calibrations WHERE name=?".format(column_name), (calibration_name,)).fetchone()[0]
138 
139  def output_calibration_table(self):
140  data = {"name": [], "state": [], "checkpoint": [], "iteration": []}
141  for row in self.query("SELECT * FROM calibrations"):
142  data["name"].append(row[0])
143  data["state"].append(row[1])
144  data["checkpoint"].append(row[2])
145  data["iteration"].append(row[3])
146 
147  table = pandas.DataFrame(data)
148  table_string = table.to_string()
149 
150  line_len = len(table_string.split("\n")[1])
151  title = " Calibrations Table ".center(line_len, " ")
152  border = line_len * "="
153  header = "\n".join((border, title, border))
154  return "\n".join((header, table_string, border))
155 
156 # @endcond