#! /usr/bin/env python
# -*- coding: utf-8 -*-
###############################################################################
## ##
## Copyright 2011, Neil Wallace <rowinggolfer@googlemail.com> ##
## ##
## This program is free software: you can redistribute it and/or modify ##
## it under the terms of the GNU General Public License as published by ##
## the Free Software Foundation, either version 3 of the License, or ##
## (at your option) any later version. ##
## ##
## This program is distributed in the hope that it will be useful, ##
## but WITHOUT ANY WARRANTY; without even the implied warranty of ##
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ##
## GNU General Public License for more details. ##
## ##
## You should have received a copy of the GNU General Public License ##
## along with this program. If not, see <http://www.gnu.org/licenses/>. ##
## ##
###############################################################################
import functools
import logging
import re
import subprocess
import sys

import psycopg2

from lib_openmolar.server.functions.password_generator import new_password
from lib_openmolar.server.functions.om_server_config import OMServerConfig
def log_exception(func):
    '''
    decorator which stops any exception raised by func from propagating.

    the wrapped function returns whatever func returns. If func raises an
    Exception, the traceback is written to the "openmolar_server" log and
    an empty string is returned instead, so callers should treat a falsy
    result as failure.

    functools.wraps is applied so that the decorated function keeps the name
    and docstring of func.
    '''
    @functools.wraps(func)
    def db_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # same logger name as the rest of this module, and log the
            # traceback - the exception is otherwise lost for good.
            log = logging.getLogger("openmolar_server")
            log.exception("unhandled exception in %s",
                          getattr(func, "__name__", func))
            return ""
    return db_func
class DBFunctions(object):
    '''
    A class whose functions will be inherited by the server

    every public method is wrapped by log_exception, so none of them raise;
    a falsy return value (False or "") signals failure.

    methods and attributes whose names start with an underscore are helpers
    for use by this class only.
    '''

    # newDB_sql reads these files, save_schema writes the first one.
    _SCHEMA_FILE = "/usr/share/openmolar/blank_schema.sql"
    _PERMISSIONS_FILE = "/usr/share/openmolar/permissions.sql"

    # what postgres accepts as an unquoted identifier: a letter or underscore
    # followed by letters, digits, underscores or dollar signs.
    _IDENTIFIER_RE = re.compile(r"^[^\W\d][\w$]*\Z", re.UNICODE)

    def __init__(self):
        self.config = OMServerConfig()

    @property
    def default_conn_atts(self):
        '''
        the connection string for the "openmolar_master" database.
        NOTE(review): this string contains the postgres password although the
        attribute is public - confirm it can never be served to a client.
        '''
        return self.__conn_atts()

    def __conn_atts(self, dbname="openmolar_master"):
        '''
        has to be a private function because of the password!
        a well set up server will restrict user "openmolar"
        to a unix socket connection (ie local machine only)
        or at the very least a TCP/IP connection to only localhost.
        (function doesn't get picked up by register_instance)

        returns a libpq connection string for database dbname. Backslashes
        and single quotes in any value are escaped so that a value can not
        break out of its quotes and inject further connection parameters.
        '''
        def quoted(value):
            text = "%s" % (value,)
            return "'%s'" % text.replace("\\", "\\\\").replace("'", "\\'")

        return "host=%s user=%s port=%s password=%s dbname=%s" % (
            quoted(self.config.postgres_host),
            quoted(self.config.postgres_user),
            quoted(self.config.postgres_port),
            quoted(self.config.postgres_pass),
            quoted(dbname))

    def _check_identifier(self, name):
        '''
        returns name unchanged if it is usable as an unquoted sql identifier,
        otherwise raises ValueError.
        names are interpolated into sql statements (identifiers can not be
        passed as query parameters) so anything else is refused.
        '''
        if not self._IDENTIFIER_RE.match(name or ""):
            raise ValueError("invalid sql identifier %r" % (name,))
        return name

    @staticmethod
    def _quoted_identifier(name):
        '''
        returns name as a double-quoted sql identifier.
        '''
        return '"%s"' % name.replace('"', '""')

    @log_exception
    def _execute(self, statement, dbname="openmolar_master", params=None):
        '''
        execute an sql statement with default connection rights.

        statement is run in autocommit mode on database dbname. params, if
        given, are passed to psycopg2 as the query parameters of statement.

        returns True on success (or if only a psycopg2.Warning was raised).
        a psycopg2.Error is logged and re-raised, which log_exception turns
        into a return value of "" - callers must check the result.
        '''
        log = logging.getLogger("openmolar_server")
        conn = None
        try:
            conn = psycopg2.connect(self.__conn_atts(dbname))
            try:
                # functions such as create and drop do not support transactions
                conn.autocommit = True
            except AttributeError:
                log.warning(
                    "no autocommit attribute in pyscopg2 - old version?")
                conn.set_isolation_level(0)
            cursor = conn.cursor()
            cursor.execute(statement, params)
            return True
        except psycopg2.Warning as warn:
            log.warning(warn)
            return True
        except psycopg2.Error:
            log.exception("error executing statement")
            # params are deliberately not logged (they may hold a password)
            log.error(statement)
            raise
        finally:
            # conn is still None if the connection attempt itself failed
            if conn is not None:
                conn.close()

    @log_exception
    def available_databases(self):
        '''
        get a list of databases (owned by "openmolar")
        the query I use for this is based on the following.
        SELECT datname, usename, datdba
        FROM pg_database JOIN pg_user
        ON pg_database.datdba = pg_user.usesysid and usename='openmolar';
        pg_database and pg_user are tables which do not require a superuser
        to poll for this information.

        returns a list of database names ("openmolar_master" is excluded),
        or the string "NONE" if the query could not be run.
        '''
        log = logging.getLogger("openmolar_server")
        log.debug("polling for available databases")
        query = (
            "SELECT datname FROM pg_database JOIN pg_user "
            "ON pg_database.datdba = pg_user.usesysid "
            "where usename='openmolar' and datname != 'openmolar_master' "
            "order by datname")
        conn = None
        try:
            conn = psycopg2.connect(self.default_conn_atts)
            cursor = conn.cursor()
            cursor.execute(query)
            return [row[0] for row in cursor.fetchall()]
        except Exception:
            log.exception("Serious Error")
            return "NONE"
        finally:
            if conn is not None:
                conn.close()

    @log_exception
    def refresh_saved_schema(self):
        '''
        gets the schema from the admin app.
        only works if the admin app is installed on the server machine.
        note - this can also be done via the admin gui on a remote machine

        returns True if the schema was fetched and saved, False otherwise.
        '''
        log = logging.getLogger("openmolar_server")
        log.info("polling admin application for latest schema")
        try:
            from lib_openmolar.admin.connect import DemoAdminConnection
        except ImportError:
            log.warning("admin app not installed on this machine")
            return False
        # use the class which is actually imported above
        sql = DemoAdminConnection().virgin_sql
        # save_schema returns "" (via log_exception) if the write failed
        return bool(self.save_schema(sql))

    @log_exception
    def save_schema(self, sql):
        '''
        the admin app is responsible for the schema in use.
        here, it has passed the schema in text form to the server, so that the
        server can lay out new databases without the admin app.

        the text is written to the file which newDB_sql reads.
        returns True once the file has been written.
        '''
        filename = self._SCHEMA_FILE
        log = logging.getLogger("openmolar_server")
        log.info("saving schema to %s", filename)
        with open(filename, "w") as f:
            f.write(sql)
        return True

    @log_exception
    def install_fuzzymatch(self, dbname):
        '''
        installs fuzzymatch functions into database with the name given

        runs the "openmolar-install-fuzzymatch" executable, copying its
        standard output to the log.
        returns True if the executable ran and exited with status 0.
        '''
        log = logging.getLogger("openmolar_server")
        log.info("Installing fuzzymatch functions into database '%s'", dbname)
        try:
            # dbname becomes a command line argument, so refuse odd values
            self._check_identifier(dbname)
            p = subprocess.Popen(["openmolar-install-fuzzymatch", dbname],
                                 stdout=subprocess.PIPE)
            while True:
                line = p.stdout.readline()
                if not line:
                    break
                log.info(line.rstrip())
            p.stdout.close()
            # reap the child, and find out whether it succeeded
            returncode = p.wait()
        except Exception:
            log.exception("unable to install fuzzymatch into '%s'", dbname)
            return False
        if returncode != 0:
            log.error("openmolar-install-fuzzymatch exited with status %s",
                      returncode)
            return False
        return True

    @log_exception
    def newDB_sql(self, dbname):
        '''
        returns the sql to layout the users and tables in a database.

        the sql (re)creates the roles om_admin_group_<dbname> and
        om_client_group_<dbname>, followed by the saved schema, followed by
        the permissions file with ADMIN_GROUP and CLIENT_GROUP replaced by
        those role names.
        if either file can not be read the error is logged and the sql
        gathered so far is returned.
        '''
        self._check_identifier(dbname)
        sql_file = self._SCHEMA_FILE
        perms_file = self._PERMISSIONS_FILE
        log = logging.getLogger("openmolar_server")
        log.info("reading sql from %s", sql_file)
        log.info("reading sql from %s", perms_file)

        groups = {}
        parts = []
        for group in ('admin', 'client'):
            groupname = "om_%s_group_%s" % (group, dbname)
            parts.append("drop user if exists %s;\n" % groupname)
            parts.append("create user %s;\n" % groupname)
            groups[group] = groupname

        perms = ""
        try:
            with open(sql_file, "r") as f:
                parts.append(f.read())
            with open(perms_file, "r") as f:
                perms = f.read()
        except IOError:
            log.exception("error reading sql files.")

        permissions = perms.replace("ADMIN_GROUP", groups["admin"]).replace(
            "CLIENT_GROUP", groups["client"])
        return "".join(parts) + permissions

    @log_exception
    def create_demodb(self):
        '''
        creates a demo database (loose permission to do this)
        '''
        return self.create_db("openmolar_demo")

    @log_exception
    def create_db(self, dbname):
        '''
        creates a database with the name given

        the database is owned by "openmolar" and is then given the blank
        schema. The schema is not laid out if the database could not be
        created.
        returns True on success, a falsy value otherwise.
        '''
        log = logging.getLogger("openmolar_server")
        try:
            self._check_identifier(dbname)
            log.info("creating new database %s [with owner openmolar]", dbname)
            # _execute never raises (log_exception), so test its result
            if not self._execute(
                    "create database %s with owner openmolar" % dbname):
                return False
            return self._layout_schema(dbname)
        except Exception:
            log.exception("exception creating database '%s'", dbname)
            return False

    @log_exception
    def _layout_schema(self, dbname):
        '''
        creates a blank openmolar table set in the database with the name given

        returns True if the sql from newDB_sql was executed successfully.
        '''
        log = logging.getLogger("openmolar_server")
        try:
            sql = self.newDB_sql(dbname)
            if not sql:
                # newDB_sql failed (it returns "" via log_exception)
                return False
            log.info("laying out schema for database '%s'", dbname)
            return bool(self._execute(sql, dbname))
        except Exception:
            log.exception("exception laying out schema for '%s'", dbname)
            return False

    @log_exception
    def create_demo_user(self):
        '''
        create our demo user

        creates the login "om_demo" (password "password") and adds it to the
        admin and client groups of database "openmolar_demo".
        returns the result of grant_user_permissions.
        '''
        log = logging.getLogger("openmolar_server")
        log.info("creating a demo user")
        if self.create_user("om_demo", "password"):
            log.info("user om_demo created")
        else:
            log.error("unable to create user om_demo. perhaps exists already?")
        return self.grant_user_permissions("om_demo", "openmolar_demo",
                                           True, True)

    @log_exception
    def drop_demodb(self):
        '''
        remove the openmolar_demo database
        '''
        return self.drop_db("openmolar_demo")

    @log_exception
    def drop_db(self, dbname):
        '''
        remove the database with this name
        also attempts to remove the standard user groups (this will fail if
        other roles in these groups haven't been removed first)

        returns True if the database and both group roles were dropped (or
        did not exist), a falsy value otherwise.
        '''
        log = logging.getLogger("openmolar_server")
        self._check_identifier(dbname)
        # _user is assigned from outside this class, so may be missing
        log.warning("user '%s' is deleting database %s",
                    getattr(self, "_user", None), dbname)
        log.warning("removing database (if exists) %s", dbname)
        if not self._execute('drop database if exists %s;' % dbname):
            return False
        log.info("database '%s' removed", dbname)

        for group in ("admin", "client"):
            # must match the role names created by newDB_sql
            user_group = 'om_%s_group_%s' % (group, dbname)
            log.warning("removing role (if exists) '%s'", user_group)
            if not self._execute('drop user if exists %s;' % user_group):
                return False
            log.info("role '%s' removed", user_group)
        return True

    @log_exception
    def create_user(self, username, password=None):
        '''
        create a user (remote user)

        if no password is given, one is obtained from new_password().
        NOTE(review): a generated password is not returned to the caller -
        confirm how it is meant to reach the user.
        returns True if the login role was created, False otherwise.
        '''
        log = logging.getLogger("openmolar_server")
        log.info("add a login user with name '%s' and password", username)
        if password is None:
            password = new_password()
        try:
            self._check_identifier(username)
            # the password goes in as a query parameter so that it is quoted
            # by psycopg2, and is kept out of the log if the statement fails.
            created = self._execute(
                "create user %s with login encrypted password %%s" % username,
                params=(password,))
        except Exception:
            log.exception("Serious Error")
            return False
        return bool(created)

    @log_exception
    def grant_user_permissions(self, user, dbname, admin=True, client=True):
        '''
        grant permissions for a user to database dbname

        user is added to om_admin_group_<dbname> if admin is true, and to
        om_client_group_<dbname> if client is true.
        returns True on success (or if there was nothing to grant),
        False otherwise.
        '''
        log = logging.getLogger("openmolar_server")
        log.info("adding %s to priv groups on database %s", user, dbname)
        try:
            self._check_identifier(user)
            self._check_identifier(dbname)
            statements = []
            if admin:
                statements.append(
                    "GRANT om_admin_group_%s to %s;\n" % (dbname, user))
            if client:
                statements.append(
                    "GRANT om_client_group_%s to %s;\n" % (dbname, user))
            if not statements:
                # an empty query is an error as far as psycopg2 is concerned
                return True
            return bool(self._execute("".join(statements), dbname))
        except Exception:
            log.exception("Serious Error")
            return False

    @log_exception
    def drop_demo_user(self):
        '''
        drops the demo user
        '''
        return self.drop_user("om_demo")

    @log_exception
    def drop_user(self, username):
        '''
        drops a user

        returns True if the role was dropped, a falsy value otherwise.
        '''
        log = logging.getLogger("openmolar_server")
        self._check_identifier(username)
        log.warning("removing user %s", username)
        if self._execute('drop user %s;' % username):
            log.info("user '%s' removed", username)
            return True
        return False

    def _fetch_first_column(self, dbname, query):
        '''
        runs query on database dbname and returns the first column of every
        row as a list. The connection is always closed before returning.
        '''
        conn = psycopg2.connect(self.__conn_atts(dbname))
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            return [row[0] for row in cursor.fetchall()]
        finally:
            conn.close()

    def _tables(self, dbname):
        '''
        yields the name of each table in schema "public" of database dbname
        '''
        for tablename in self._fetch_first_column(
                dbname,
                "SELECT tablename FROM pg_tables WHERE schemaname='public'"):
            yield tablename

    def _sequences(self, dbname):
        '''
        yields the name of each sequence listed in
        information_schema.sequences of database dbname
        '''
        for sequence_name in self._fetch_first_column(
                dbname,
                "select sequence_name from information_schema.sequences"):
            yield sequence_name

    @log_exception
    def truncate_all_tables(self, dbname):
        '''
        truncates all tables except 'procedure_codes'
        resets every sequence whose name does not start with
        'procedure_codes', so that the next value it issues is 1.

        returns True if every statement succeeded, False at the first failure.
        '''
        log = logging.getLogger("openmolar_server")
        log.warning("removing all data from %s", dbname)
        for tablename in self._tables(dbname):
            if tablename == 'procedure_codes':
                continue
            log.info("... truncating '%s'", tablename)
            # names come straight from the catalogue, so quote them to keep
            # mixed case or otherwise unusual names intact.
            statement = "TRUNCATE %s CASCADE" % self._quoted_identifier(
                tablename)
            if not self._execute(statement, dbname):
                return False
        for sequence in self._sequences(dbname):
            if sequence.startswith('procedure_codes'):
                continue
            log.info("... reseting sequence '%s'", sequence)
            if not self._execute("select setval(%s, 1, false)", dbname,
                                 params=(sequence,)):
                return False
        return True
def _test():
    '''
    exercise the DBFunctions class against the configured postgres server.

    WARNING - this drops and recreates the "openmolar_demo" database and
    the "om_demo" user, then truncates the new database.
    '''
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger("openmolar_server")
    demo_db = "openmolar_demo"

    functions = DBFunctions()
    # drop_db reports which user requested the deletion
    functions._user = "test_user"
    logger.debug(functions.available_databases())

    # start from a clean slate, rebuild, then empty the new tables
    functions.drop_db(demo_db)
    functions.drop_demo_user()
    functions.create_db(demo_db)
    functions.create_demo_user()
    functions.truncate_all_tables(demo_db)
if __name__ == "__main__":
    # run the (destructive) smoke test only when executed as a script
    _test()