17from basf2
import B2DEBUG
23 database_path (pathlib.Path): The path to the database file we want to create/connect to.
26 schema (dict): Database table schema for the DB of the form:
27 {
"tablename": [
"columnname1 text primary key",
30 read_only (bool): Should the connection be treated
as a read-only connection (no update/insert calls)
31 timeout (float): What timeout value should the connection have. How long to wait
for other changes to commit.
32 isolation_level (str): How should the connection behave when making transactions?
33 Choices are [
None,
"DEFERRED",
"IMMEDIATE",
"EXCLUSIVE"] where
None is autocommit behaviour.
36 def __init__(self, database_path, schema=None, read_only=False, timeout=5.0, isolation_level=None):
37 self.database_path = database_path
40 self.read_only = read_only
41 self.timeout = timeout
42 self.isolation_level = isolation_level
44 if not self.database_path.exists()
and not self.read_only:
46 raise ValueError(
"The requested database did not exist, "
47 "but you didn't provide a schema to create the tables.")
51 elif not self.database_path.exists()
and read_only:
52 raise ValueError(
"The requested database did not exist, "
53 "but you specified that this was a read_only connection.")
56 except AttributeError
as err:
57 if not isinstance(self.database_path, pathlib.Path):
58 raise TypeError(
"You did not use a pathlib.Path object as the database_path.")
65 def __exit__(self, type, value, traceback):
70 if not self.read_only:
71 B2DEBUG(29, f
"Committing changes and closing Connection for database {self.database_path}.")
76 B2DEBUG(29, f
"Opening Connection for database {self.database_path}. Readonly = {self.read_only}")
77 connection_uri = self.get_uri()
78 self.conn = sqlite3.connect(connection_uri, uri=
True, timeout=self.timeout, isolation_level=self.isolation_level)
83 def query(self, sql, parameters=tuple()):
84 cursor = self.conn.cursor()
85 B2DEBUG(29, f
"execute({sql}, {parameters}).")
86 return cursor.execute(sql, parameters)
88 def create_schema(self):
89 for table_name, fields
in self.schema.items():
90 columns =
",".join(fields)
91 sql = f
"CREATE TABLE {table_name} ({columns})"
96 uri = f
"file:{self.database_path.as_posix()}"
102class CAFDB(SQLiteDB):
105 database_path (pathlib.Path): The path to the database file we want to create/connect to.
108 read_only (bool): Should the connection be treated as a read-only connection (no update/insert calls)
109 timeout (float): What timeout value should the connection have. How long to wait
for other changes to commit.
112 default_schema = {"calibrations": [
"name text primary key",
117 def __init__(self, database_path, read_only=False, timeout=30.0, isolation_level=None):
118 super().__init__(database_path, self.default_schema, read_only, timeout, isolation_level)
120 def insert_calibration(self, calibration_name, state="init", checkpoint="init", iteration=0):
121 self.query(
"INSERT INTO calibrations VALUES (?,?,?,?)", (calibration_name, state, checkpoint, iteration))
123 def update_calibration_value(self, calibration_name, column_name, new_value, attempts=3):
128 self.query(f
"UPDATE calibrations SET {column_name}=? WHERE name=?", (new_value, calibration_name))
130 except sqlite3.OperationalError
as e:
131 if attempt < attempts:
136 def get_calibration_value(self, calibration_name, column_name):
137 return self.query(f
"SELECT {column_name} FROM calibrations WHERE name=?", (calibration_name,)).fetchone()[0]
139 def output_calibration_table(self):
140 data = {
"name": [],
"state": [],
"checkpoint": [],
"iteration": []}
141 for row
in self.query(
"SELECT * FROM calibrations"):
142 data[
"name"].append(row[0])
143 data[
"state"].append(row[1])
144 data[
"checkpoint"].append(row[2])
145 data[
"iteration"].append(row[3])
147 table = pandas.DataFrame(data)
148 table_string = table.to_string()
150 line_len = len(table_string.split(
"\n")[1])
151 title =
" Calibrations Table ".center(line_len,
" ")
152 border = line_len *
"="
153 header =
"\n".join((border, title, border))
154 return "\n".join((header, table_string, border))