7 from basf2
import B2DEBUG
13 database_path (pathlib.Path): The path to the database file we want to create/connect to.
16 schema (dict): Database table schema for the DB of the form:
17 {"tablename": ["columnname1 text primary key",
20 read_only (bool): Whether the connection should be treated as a read-only connection (no update/insert calls).
21 timeout (float): The timeout value of the connection, i.e. how long to wait for other changes to commit.
22 isolation_level (str): How should the connection behave when making transactions?
23 Choices are [None, "DEFERRED", "IMMEDIATE", "EXCLUSIVE"] where None is autocommit behaviour.
26 def __init__(self, database_path, schema=None, read_only=False, timeout=5.0, isolation_level=None):
36 raise ValueError(
"The requested database did not exist, "
37 "but you didn't provide a schema to create the tables.")
42 raise ValueError(
"The requested database did not exist, "
43 "but you specified that this was a read_only connection.")
46 except AttributeError
as err:
48 raise TypeError(
"You did not use a pathlib.Path object as the database_path.")
55 def __exit__(self, type, value, traceback):
61 B2DEBUG(29, f
"Committing changes and closing Connection for database {self.database_path}.")
66 B2DEBUG(29, f
"Opening Connection for database {self.database_path}. Readonly = {self.read_only}")
def query(self, sql, parameters=tuple()):
    """
    Run one SQL statement on a newly created cursor of this connection.

    Parameters:
        sql (str): The SQL statement to execute; it may contain placeholders.
        parameters (tuple): The values bound to the placeholders in ``sql``.

    Returns:
        The object returned by the cursor's ``execute`` call.
    """
    db_cursor = self.conn.cursor()
    # Record the statement and its bound values before it is executed.
    B2DEBUG(29, f"execute({sql}, {parameters}).")
    result = db_cursor.execute(sql, parameters)
    return result
78 def create_schema(self):
79 for table_name, fields
in self.
schema.items():
80 columns =
",".join(fields)
81 sql = f
"CREATE TABLE {table_name} ({columns})"
86 uri = f
"file:{self.database_path.as_posix()}"
95 database_path (pathlib.Path): The path to the database file we want to create/connect to.
98 read_only (bool): Whether the connection should be treated as a read-only connection (no update/insert calls).
99 timeout (float): The timeout value of the connection, i.e. how long to wait for other changes to commit.
102 default_schema = {
"calibrations": [
"name text primary key",
def __init__(self, database_path, read_only=False, timeout=30.0, isolation_level=None):
    """
    Initialise the parent class with this class's ``default_schema``.

    Parameters:
        database_path (pathlib.Path): The path to the database file we want to create/connect to.
        read_only (bool): Forwarded unchanged to the parent constructor.
        timeout (float): Forwarded unchanged to the parent constructor (defaults to 30.0 here).
        isolation_level (str): Forwarded unchanged to the parent constructor.
    """
    # The schema is fixed by the class, so callers never pass one themselves.
    table_schema = self.default_schema
    super().__init__(database_path, table_schema, read_only, timeout, isolation_level)
def insert_calibration(self, calibration_name, state="init", checkpoint="init", iteration=0):
    """
    Insert one new row into the ``calibrations`` table.

    The four values are bound positionally, in the order
    (calibration_name, state, checkpoint, iteration).

    Parameters:
        calibration_name: The value stored in the first column of the new row.
        state: The value stored in the second column (defaults to "init").
        checkpoint: The value stored in the third column (defaults to "init").
        iteration: The value stored in the fourth column (defaults to 0).
    """
    row_values = (calibration_name, state, checkpoint, iteration)
    self.query("INSERT INTO calibrations VALUES (?,?,?,?)", row_values)
113 def update_calibration_value(self, calibration_name, column_name, new_value, attempts=3):
118 self.
query(
"UPDATE calibrations SET {}=? WHERE name=?".format(column_name), (new_value, calibration_name))
120 except sqlite3.OperationalError
as e:
121 if attempt < attempts:
def get_calibration_value(self, calibration_name, column_name):
    """
    Look up a single column value for one named calibration.

    Parameters:
        calibration_name: The value matched against the ``name`` column.
        column_name (str): The column to read. It is formatted directly into the
            SQL text (it is not a bound parameter), so it must be a trusted identifier.

    Returns:
        The first element of the first row returned by the query.
    """
    statement = "SELECT {} FROM calibrations WHERE name=?".format(column_name)
    first_row = self.query(statement, (calibration_name,)).fetchone()
    # Only one column was selected, so the value sits at index 0.
    return first_row[0]
def output_calibration_table(self):
    """
    Render every row of the ``calibrations`` table as a titled, bordered text block.

    Returns:
        str: A border line of "=" characters, a centred " Calibrations Table " title,
        another border, the pandas text rendering of the table, and a closing border,
        joined by newlines.
    """
    column_names = ("name", "state", "checkpoint", "iteration")
    rows = list(self.query("SELECT * FROM calibrations"))
    # The i-th element of each row belongs to the i-th named column.
    data = {
        column: [row[position] for row in rows]
        for position, column in enumerate(column_names)
    }

    table_string = pandas.DataFrame(data).to_string()
    # The second line of the rendered table determines the border and title width.
    line_len = len(table_string.split("\n")[1])
    border = "=" * line_len
    title = " Calibrations Table ".center(line_len, " ")
    return "\n".join((border, title, border, table_string, border))