Refactors DB access

This commit is contained in:
Fred Z 2018-08-02 20:00:30 +02:00
parent 564cf0a1e6
commit 192e18d8e0
2 changed files with 69 additions and 65 deletions

View File

@ -13,9 +13,9 @@ This file is part of [ocp5](https://github.com/freezed/ocp5) project
DB_CONFIG = {
'host': 'localhost',
'user': 'loff',
'pass': 'loff',
'password': 'loff',
'db': 'loff',
'char': 'utf8',
'charset': 'utf8',
'file': 'create-db-loff.sql'
}

130
db.py
View File

@ -21,99 +21,103 @@ class Db():
>>> test = Db()
>>> print(test.message)
DB «loff» contains these tables :['category', 'product']
DB size : 0.1 MB
Table 'product' has «0» row(s)
Table 'category' has «0» row(s)
DB size : n.n MB
Table 'product' has «n» row(s)
Table 'category' has «n» row(s)
"""
def __init__(self):
"""
Class initialiser
"""
self.DB_NOT_FOUND = True
self.MSG = {
MSG_TEMPLATE = {
"request": "-- REQUEST #{} : --{}",
"database": "DB «{}» contains these tables :",
"tables": "{}\n",
"dashboard": "DB size : {dbsize}\nTable 'product' has «{rowprod}» "
"row(s)\nTable 'category' has «{rowcat}» row(s)"
}
db_not_found = True
self.message = ""
self.result = list()
# Connect to the database
self.connection = pymysql.connect(
host=DB_CONFIG['host'],
user=DB_CONFIG['user'],
password=DB_CONFIG['pass'],
charset=DB_CONFIG['char'],
cursorclass=pymysql.cursors.DictCursor)
# MYSQL server connexion, without DB selection
self.cnx()
try:
with self.connection.cursor() as cursor:
# Show DB
cursor.execute("SHOW DATABASES")
db_list = cursor.fetchall()
self.cursor.execute("SHOW DATABASES")
db_list = self.cursor.fetchall()
for idx, unused in enumerate(db_list):
for idx, unused in enumerate(db_list):
if DB_CONFIG['db'] in db_list[idx].values():
self.DB_NOT_FOUND = False
if DB_CONFIG['db'] in db_list[idx].values():
db_not_found = False
cursor.execute("USE {}".format(DB_CONFIG['db']))
self.cursor.execute("USE {}".format(DB_CONFIG['db']))
cursor.execute("SHOW TABLES")
tbl_list = cursor.fetchall()
self.cursor.execute("SHOW TABLES")
tbl_list = self.cursor.fetchall()
# Get size information on DB
# TODO : extract DB & tables names from code
cursor.execute("""
SELECT table_schema "Full database",
CONCAT(
ROUND(
SUM(data_length + index_length)
/ 1024 / 1024, 1),
" MB") "dbsize"
FROM information_schema.tables
WHERE table_schema = "loff"
GROUP BY table_schema
# Get size information on DB
self.cursor.execute("""
SELECT table_schema "Full database",
CONCAT(
ROUND(
SUM(data_length + index_length)
/ 1024 / 1024, 1),
" MB") "dbsize"
FROM information_schema.tables
WHERE table_schema = "loff"
GROUP BY table_schema
UNION
UNION
SELECT "Rows in 'product' table", COUNT(*)
FROM product
SELECT "Rows in 'product' table", COUNT(*)
FROM product
UNION
UNION
SELECT "Rows in 'category' table", COUNT(*)
FROM category;
""")
dashboard = cursor.fetchall()
SELECT "Rows in 'category' table", COUNT(*)
FROM category;
""")
dashboard = self.cursor.fetchall()
# Constructs DB information message
self.message = self.MSG['database'].format(
dashboard[0]['Full database'])
# Constructs DB information message
self.message = MSG_TEMPLATE['database'].format(
dashboard[0]['Full database'])
self.message += self.MSG['tables'].format(
[val['Tables_in_loff'] for val in tbl_list])
self.message += MSG_TEMPLATE['tables'].format(
[val['Tables_in_loff'] for val in tbl_list])
self.message += self.MSG['dashboard'].format(
dbsize=dashboard[0]['dbsize'].decode("utf-8"),
rowprod=dashboard[1]['dbsize'].decode("utf-8"),
rowcat=dashboard[2]['dbsize'].decode("utf-8"))
self.message += MSG_TEMPLATE['dashboard'].format(
dbsize=dashboard[0]['dbsize'].decode("utf-8"),
rowprod=dashboard[1]['dbsize'].decode("utf-8"),
rowcat=dashboard[2]['dbsize'].decode("utf-8"))
# No DB, create it
if self.DB_NOT_FOUND:
request_list = self.get_sql_from_file()
# No DB, create it
if db_not_found:
request_list = self.get_sql_from_file()
if request_list is not False:
if request_list is not False:
for idx, sql_request in enumerate(request_list):
self.message = self.MSG['request'].format(
idx, sql_request)
cursor.execute(sql_request + ';')
for idx, sql_request in enumerate(request_list):
self.message = MSG_TEMPLATE['request'].format(
idx, sql_request)
self.cursor.execute(sql_request + ';')
except pymysql.err.OperationalError as except_detail:
print("DB error: «{}»".format(except_detail))
def cnx(self, with_db=False):
""" Connect to DB """
conf = {
'host': DB_CONFIG['host'],
'user': DB_CONFIG['user'],
'password': DB_CONFIG['password'],
'charset': DB_CONFIG['charset'],
'cursorclass': pymysql.cursors.DictCursor
}
if with_db:
conf['db'] = DB_CONFIG['db']
self.cursor = pymysql.connect(**conf).cursor()
def get_sql_from_file(self, filename=DB_CONFIG['file']):
"""
@ -144,7 +148,7 @@ class Db():
def __del__(self):
""" Object destruction"""
self.connection.close()
self.cursor.close()
if __name__ == "__main__":