Belle II Software  release-06-02-00
database.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 
11 
12 import sqlite3
13 import pathlib
14 import pandas
15 from basf2 import B2DEBUG
16 
17 
class SQLiteDB():
    """
    Thin wrapper around a single `sqlite3` connection to one database file.

    The connection is opened during construction. When the file does not exist yet and the
    connection is not read-only, the file is created and the tables described by ``schema``
    are created in it. Instances can be used as context managers; leaving the ``with``
    block calls `close`.

    Parameters:
        database_path (pathlib.Path): The path to the database file we want to create/connect to.

    Keyword Arguments:
        schema (dict): Database table schema for the DB of the form:
            {"tablename": ["columnname1 text primary key",
                           "columnname2 int"]
            }
        read_only (bool): Should the connection be treated as a read-only connection (no update/insert calls)
        timeout (float): What timeout value should the connection have. How long to wait for other changes to commit.
        isolation_level (str): How should the connection behave when making transactions?
            Choices are [None, "DEFERRED", "IMMEDIATE", "EXCLUSIVE"] where None is autocommit behaviour.

    Raises:
        ValueError: If the database file does not exist and either no schema was given or
            the connection was requested as read-only.
        TypeError: If ``database_path`` is not a `pathlib.Path` and using it as one failed.
    """

    def __init__(self, database_path, schema=None, read_only=False, timeout=5.0, isolation_level=None):
        """Store the connection settings and open (creating if necessary) the database."""
        self.database_path = database_path
        self.schema = schema
        # The live sqlite3.Connection, or None while no connection is open.
        self.conn = None
        self.read_only = read_only
        self.timeout = timeout
        self.isolation_level = isolation_level
        try:
            if self.database_path.exists():
                self.open()
            elif self.read_only:
                raise ValueError("The requested database did not exist, "
                                 "but you specified that this was a read_only connection.")
            elif not self.schema:
                raise ValueError("The requested database did not exist, "
                                 "but you didn't provide a schema to create the tables.")
            else:
                # Connecting creates the (empty) file; the tables are then added to it.
                self.open()
                self.create_schema()
        except AttributeError as err:
            # A non-Path argument (e.g. a plain str) fails on the Path methods used above;
            # report that as the more helpful TypeError while keeping the original cause.
            if not isinstance(self.database_path, pathlib.Path):
                raise TypeError("You did not use a pathlib.Path object as the database_path.") from err
            raise

    def __enter__(self):
        """Return this object so it can be bound by ``with ... as db``."""
        return self

    def __exit__(self, type, value, traceback):
        """Close the connection when the ``with`` block is left; exceptions are not suppressed."""
        self.close()

    def _require_connection(self):
        """Return the open connection, or raise `sqlite3.ProgrammingError` if it was closed."""
        if self.conn is None:
            raise sqlite3.ProgrammingError("Cannot operate on a closed database.")
        return self.conn

    def close(self):
        """
        Commit pending changes (unless read-only) and close the connection.

        Calling this more than once, or when no connection is open, is a no-op.
        """
        if self.conn is None:
            return
        try:
            if not self.read_only:
                B2DEBUG(29, f"Committing changes and closing Connection for database {self.database_path}.")
                self.conn.commit()
        finally:
            # Always release the handle, even if the commit failed, and forget it so that
            # a repeated close() does not try to commit on a closed connection.
            self.conn.close()
            self.conn = None

    def open(self):
        """Open the connection described by `get_uri` using the stored timeout and isolation level."""
        B2DEBUG(29, f"Opening Connection for database {self.database_path}. Readonly = {self.read_only}")
        connection_uri = self.get_uri()
        self.conn = sqlite3.connect(connection_uri,
                                    uri=True,
                                    timeout=self.timeout,
                                    isolation_level=self.isolation_level)

    def commit(self):
        """Commit the current transaction on the open connection."""
        self._require_connection().commit()

    def query(self, sql, parameters=tuple()):
        """
        Execute one SQL statement on a fresh cursor.

        Parameters:
            sql (str): The statement, using ``?`` placeholders for values.
            parameters (tuple): Values bound to the placeholders.

        Returns:
            sqlite3.Cursor: The cursor the statement was executed on.
        """
        cursor = self._require_connection().cursor()
        B2DEBUG(29, f"execute({sql}, {parameters}).")
        return cursor.execute(sql, parameters)

    def create_schema(self):
        """Create one table per entry of ``self.schema`` and commit."""
        for table_name, fields in self.schema.items():
            columns = ",".join(fields)
            sql = f"CREATE TABLE {table_name} ({columns})"
            self.query(sql)
        self._require_connection().commit()

    def get_uri(self):
        """Return the ``file:`` URI for the database, with ``?mode=ro`` appended when read-only."""
        uri = f"file:{self.database_path.as_posix()}"
        if self.read_only:
            uri += "?mode=ro"
        return uri
98 
99 
101  """
102  Parameters:
103  database_path (pathlib.Path): The path to the database file we want to create/connect to.
104 
105  Keyword Arguments:
106  read_only (bool): Should the connection be treated as a read-only connection (no update/insert calls)
107  timeout (float): What timeout value should the connection have. How long to wait for other changes to commit.
108  """
109 
# Table layout handed to the parent initialiser by __init__: a single "calibrations"
# table keyed by calibration name. The column order here is the order that
# insert_calibration and output_calibration_table rely on.
default_schema = {
    "calibrations": [
        "name text primary key",
        "state text",
        "checkpoint text",
        "iteration int",
    ],
}
114 
def __init__(self, database_path, read_only=False, timeout=30.0, isolation_level=None):
    """
    Forward the connection settings to the parent initialiser, always using ``default_schema``.

    Parameters:
        database_path (pathlib.Path): The path to the database file we want to create/connect to.

    Keyword Arguments:
        read_only (bool): Should the connection be treated as a read-only connection (no update/insert calls)
        timeout (float): What timeout value should the connection have. How long to wait for other changes to commit.
        isolation_level (str): Passed through unchanged to the parent initialiser.
    """
    super().__init__(database_path, self.default_schema, read_only, timeout, isolation_level)
117 
def insert_calibration(self, calibration_name, state="init", checkpoint="init", iteration=0):
    """
    Insert a new row into the ``calibrations`` table.

    The values are bound positionally, so they must follow the table's column order
    (name, state, checkpoint, iteration).

    Parameters:
        calibration_name (str): Value for the ``name`` column.

    Keyword Arguments:
        state (str): Value for the ``state`` column.
        checkpoint (str): Value for the ``checkpoint`` column.
        iteration (int): Value for the ``iteration`` column.
    """
    self.query(
        "INSERT INTO calibrations VALUES (?,?,?,?)",
        (calibration_name, state, checkpoint, iteration),
    )
120 
def update_calibration_value(self, calibration_name, column_name, new_value, attempts=3):
    """
    Set one column of one calibration's row, retrying on `sqlite3.OperationalError`.

    Parameters:
        calibration_name (str): The ``name`` of the row to update.
        column_name (str): The column to set. This is formatted directly into the SQL text
            (identifiers cannot be bound as parameters), so it must come from trusted code.
        new_value: The value to store; bound as a query parameter.

    Keyword Arguments:
        attempts (int): Maximum number of tries. At least one try is always made.

    Raises:
        sqlite3.OperationalError: If the final attempt still fails.
    """
    # The statement does not change between attempts, so build it once.
    sql = "UPDATE calibrations SET {}=? WHERE name=?".format(column_name)
    attempt = 1
    while True:
        try:
            self.query(sql, (new_value, calibration_name))
            return
        except sqlite3.OperationalError:
            if attempt >= attempts:
                # Out of retries: re-raise with the original traceback intact.
                raise
            attempt += 1
133 
def get_calibration_value(self, calibration_name, column_name):
    """
    Return the value of one column for the calibration with the given name.

    Parameters:
        calibration_name (str): The ``name`` of the row to read.
        column_name (str): The column to read. This is formatted directly into the SQL text,
            so it must come from trusted code.

    Returns:
        The stored value of ``column_name`` for that row.

    Raises:
        TypeError: If no row has that name (``fetchone()`` returns None, which cannot be indexed).
    """
    sql = "SELECT {} FROM calibrations WHERE name=?".format(column_name)
    row = self.query(sql, (calibration_name,)).fetchone()
    return row[0]
136 
def output_calibration_table(self):
    """
    Render the whole ``calibrations`` table as a bordered, titled text block.

    Returns:
        str: A ``=`` border, a centred " Calibrations Table " title, another border, the
        table as formatted by `pandas.DataFrame.to_string`, and a closing border.
    """
    data = {"name": [], "state": [], "checkpoint": [], "iteration": []}
    # Rows come back in table column order: name, state, checkpoint, iteration.
    for name, state, checkpoint, iteration in self.query("SELECT * FROM calibrations"):
        data["name"].append(name)
        data["state"].append(state)
        data["checkpoint"].append(checkpoint)
        data["iteration"].append(iteration)

    table_string = pandas.DataFrame(data).to_string()

    # Size the border to the widest rendered line instead of assuming a particular line exists.
    line_len = max(len(line) for line in table_string.split("\n"))
    border = line_len * "="
    title = " Calibrations Table ".center(line_len, " ")
    return "\n".join((border, title, border, table_string, border))
dictionary default_schema
Definition: database.py:110
def get_uri(self)
Definition: database.py:93
def create_schema(self)
Definition: database.py:86
def open(self)
Definition: database.py:73
def close(self)
Definition: database.py:66
def query(self, sql, parameters=tuple())
Definition: database.py:81