Belle II Software development
database.py
1#!/usr/bin/env python3
2
3# disable doxygen check for this file
4# @cond
5
6
13
14import sqlite3
15import pathlib
16import pandas
17from basf2 import B2DEBUG
18
19
20class SQLiteDB():
21 """
22 Parameters:
23 database_path (pathlib.Path): The path to the database file we want to create/connect to.
24
25 Keyword Arguments:
26 schema (dict): Database table schema for the DB of the form:
27 {"tablename": ["columnname1 text primary key",
28 "columnname2 int"]
29 }
30 read_only (bool): Should the connection be treated as a read-only connection (no update/insert calls)
31 timeout (float): What timeout value should the connection have. How long to wait for other changes to commit.
32 isolation_level (str): How should the connection behave when making transactions?
33 Choices are [None, "DEFERRED", "IMMEDIATE", "EXCLUSIVE"] where None is autocommit behaviour.
34 """
35
36 def __init__(self, database_path, schema=None, read_only=False, timeout=5.0, isolation_level=None):
37 self.database_path = database_path
38 self.schema = schema
39 self.conn = None
40 self.read_only = read_only
41 self.timeout = timeout
42 self.isolation_level = isolation_level
43 try:
44 if not self.database_path.exists() and not self.read_only:
45 if not self.schema:
46 raise ValueError("The requested database did not exist, "
47 "but you didn't provide a schema to create the tables.")
48 else:
49 self.open()
50 self.create_schema()
51 elif not self.database_path.exists() and read_only:
52 raise ValueError("The requested database did not exist, "
53 "but you specified that this was a read_only connection.")
54 else:
55 self.open()
56 except AttributeError as err:
57 if not isinstance(self.database_path, pathlib.Path):
58 raise TypeError("You did not use a pathlib.Path object as the database_path.")
59 else:
60 raise err
61
62 def __enter__(self):
63 return self
64
65 def __exit__(self, type, value, traceback):
66 self.close()
67
68 def close(self):
69 if self.conn:
70 if not self.read_only:
71 B2DEBUG(29, f"Committing changes and closing Connection for database {self.database_path}.")
72 self.conn.commit()
73 self.conn.close()
74
75 def open(self):
76 B2DEBUG(29, f"Opening Connection for database {self.database_path}. Readonly = {self.read_only}")
77 connection_uri = self.get_uri()
78 self.conn = sqlite3.connect(connection_uri, uri=True, timeout=self.timeout, isolation_level=self.isolation_level)
79
80 def commit(self):
81 self.conn.commit()
82
83 def query(self, sql, parameters=tuple()):
84 cursor = self.conn.cursor()
85 B2DEBUG(29, f"execute({sql}, {parameters}).")
86 return cursor.execute(sql, parameters)
87
88 def create_schema(self):
89 for table_name, fields in self.schema.items():
90 columns = ",".join(fields)
91 sql = f"CREATE TABLE {table_name} ({columns})"
92 self.query(sql)
93 self.conn.commit()
94
95 def get_uri(self):
96 uri = f"file:{self.database_path.as_posix()}"
97 if self.read_only:
98 uri += "?mode=ro"
99 return uri
100
101
102class CAFDB(SQLiteDB):
103 """
104 Parameters:
105 database_path (pathlib.Path): The path to the database file we want to create/connect to.
106
107 Keyword Arguments:
108 read_only (bool): Should the connection be treated as a read-only connection (no update/insert calls)
109 timeout (float): What timeout value should the connection have. How long to wait for other changes to commit.
110 """
111
112 default_schema = {"calibrations": ["name text primary key",
113 "state text",
114 "checkpoint text",
115 "iteration int"]}
116
117 def __init__(self, database_path, read_only=False, timeout=30.0, isolation_level=None):
118 super().__init__(database_path, self.default_schema, read_only, timeout, isolation_level)
119
120 def insert_calibration(self, calibration_name, state="init", checkpoint="init", iteration=0):
121 self.query("INSERT INTO calibrations VALUES (?,?,?,?)", (calibration_name, state, checkpoint, iteration))
122
123 def update_calibration_value(self, calibration_name, column_name, new_value, attempts=3):
124 attempt = 1
125 finished = False
126 while not finished:
127 try:
128 self.query(f"UPDATE calibrations SET {column_name}=? WHERE name=?", (new_value, calibration_name))
129 finished = True
130 except sqlite3.OperationalError as e:
131 if attempt < attempts:
132 attempt += 1
133 else:
134 raise e
135
136 def get_calibration_value(self, calibration_name, column_name):
137 return self.query(f"SELECT {column_name} FROM calibrations WHERE name=?", (calibration_name,)).fetchone()[0]
138
139 def output_calibration_table(self):
140 data = {"name": [], "state": [], "checkpoint": [], "iteration": []}
141 for row in self.query("SELECT * FROM calibrations"):
142 data["name"].append(row[0])
143 data["state"].append(row[1])
144 data["checkpoint"].append(row[2])
145 data["iteration"].append(row[3])
146
147 table = pandas.DataFrame(data)
148 table_string = table.to_string()
149
150 line_len = len(table_string.split("\n")[1])
151 title = " Calibrations Table ".center(line_len, " ")
152 border = line_len * "="
153 header = "\n".join((border, title, border))
154 return "\n".join((header, table_string, border))
155
156# @endcond