Adds messaging & stats features #4

This commit is contained in:
Fred Z 2018-08-01 16:20:38 +02:00
parent 005a25afc5
commit 3ffbcfa208

71
db.py
View File

@ -6,8 +6,8 @@ Author: freezed <freezed@users.noreply.github.com> 2018-07-26
Version: 0.1 Version: 0.1
Licence: `GNU GPL v3` GNU GPL v3: http://www.gnu.org/licenses/ Licence: `GNU GPL v3` GNU GPL v3: http://www.gnu.org/licenses/
Connect to DB Connects DB, executes SQL statements
""" """
import pymysql.cursors import pymysql.cursors
from config import DB_CONFIG from config import DB_CONFIG
@ -20,7 +20,10 @@ class Db():
:Test: :Test:
>>> test = Db() >>> test = Db()
>>> print(test.message) >>> print(test.message)
DB «localhost» exist, using it DB «loff» contains these tables :['category', 'product']
DB size : 0.1 MB
Table 'product' has «0» row(s)
Table 'category' has «0» row(s)
""" """
def __init__(self): def __init__(self):
@ -30,7 +33,11 @@ class Db():
self.DB_NOT_FOUND = True self.DB_NOT_FOUND = True
self.MSG = { self.MSG = {
"request": "-- REQUEST #{} : --{}", "request": "-- REQUEST #{} : --{}",
"database": "DB «{}» exist, using it"} "database": "DB «{}» contains these tables :",
"tables": "{}\n",
"dashboard": "DB size : {dbsize}\nTable 'product' has «{rowprod}» "
"row(s)\nTable 'category' has «{rowcat}» row(s)"
}
self.message = "" self.message = ""
# Connect to the database # Connect to the database
@ -50,13 +57,52 @@ class Db():
for idx, unused in enumerate(db_list): for idx, unused in enumerate(db_list):
if DB_CONFIG['db'] in db_list[idx].values(): if DB_CONFIG['db'] in db_list[idx].values():
DB_NOT_FOUND = False self.DB_NOT_FOUND = False
cursor.execute("USE {}".format(DB_CONFIG['db'])) cursor.execute("USE {}".format(DB_CONFIG['db']))
cursor.execute("SHOW TABLES")
tbl_list = cursor.fetchall()
# Get size information on DB
# TODO : extract DB & tables names from code
cursor.execute("""
SELECT table_schema "Full database",
CONCAT(
ROUND(
SUM(data_length + index_length)
/ 1024 / 1024, 1),
" MB") "dbsize"
FROM information_schema.tables
WHERE table_schema = "loff"
GROUP BY table_schema
UNION
SELECT "Rows in 'product' table", COUNT(*)
FROM product
UNION
SELECT "Rows in 'category' table", COUNT(*)
FROM category;
""")
dashboard = cursor.fetchall()
# Constructs DB information message
self.message = self.MSG['database'].format( self.message = self.MSG['database'].format(
DB_CONFIG['host']) dashboard[0]['Full database'])
self.message += self.MSG['tables'].format(
[val['Tables_in_loff'] for val in tbl_list])
self.message += self.MSG['dashboard'].format(
dbsize=dashboard[0]['dbsize'].decode("utf-8"),
rowprod=dashboard[1]['dbsize'].decode("utf-8"),
rowcat=dashboard[2]['dbsize'].decode("utf-8"))
# No DB, create it # No DB, create it
if DB_NOT_FOUND: if self.DB_NOT_FOUND:
request_list = self.get_sql_from_file() request_list = self.get_sql_from_file()
if request_list is not False: if request_list is not False:
@ -69,23 +115,23 @@ class Db():
except pymysql.err.OperationalError as except_detail: except pymysql.err.OperationalError as except_detail:
print("DB error: «{}»".format(except_detail)) print("DB error: «{}»".format(except_detail))
@staticmethod def get_sql_from_file(self, filename=DB_CONFIG['file']):
def get_sql_from_file(filename=DB_CONFIG['file']):
""" """
Get the SQL instruction from a file Get the SQL instruction from a file
:return: a list of each SQL query whithout the trailing ";" :return: a list of each SQL query whithout the trailing ";"
:Tests: :Tests:
>>> Db.get_sql_from_file('wronq_file.sql') >>> Db.get_sql_from_file(Db, 'wronq_file.sql')
File load error : wronq_file.sql
False False
>>> Db.message
'File load error : wronq_file.sql'
""" """
from os import path from os import path
# File did not exists # File did not exists
if path.isfile(filename) is False: if path.isfile(filename) is False:
print("File load error : {}".format(filename)) self.message = "File load error : {}".format(filename)
return False return False
else: else:
@ -102,6 +148,5 @@ class Db():
if __name__ == "__main__": if __name__ == "__main__":
""" Running doctests """
import doctest import doctest
doctest.testmod() doctest.testmod()