controller.database -> model.utils
cleaning up model.utils
documenting model.utils
--- a/iro/controller/database.py Sun Jan 22 12:26:00 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,56 +0,0 @@
-from sqlalchemy import create_engine, pool
-
-from sqlalchemy.orm import sessionmaker
-
-
-from twisted.internet import threads
-
-from ..model.schema import Base
-
-engine = create_engine('sqlite:///:memory:', echo=True)
-
-def createDatabase():
- Base.metadata.create_all(engine)
-
-Session = sessionmaker(bind=engine)
-
-def toThread(f):
- def wrapper(*args, **kwargs):
- return threads.deferToThread(f, *args, **kwargs)
- return wrapper
-
-
-class WithSession():
- def __init__(self,autocommit=False):
- self.autocommit=autocommit
-
- def __enter__(self):
- self.conn = engine.connect()
- self.session = Session(bind=self.conn)
- return self.session
-
- def __exit__(self,exc_type, exc_value, traceback):
- if exc_type is None:
- if self.autocommit:
- self.session.commit()
- else:
- self.session.rollback()
- self.session.close()
- self.conn.close()
-
-class DBDefer(object):
- def __init__(self, dsn, poolclass = pool.SingletonThreadPool, *args, **kargs):
- self.engine = create_engine(dsn, poolclass=poolclass, *args, **kargs)
-
- def __call__(self, func):
- @toThread
- def wrapper(*args, **kwargs):
- session = sessionmaker(bind=self.engine)()
- try:
- return func(*args, session=session, **kwargs)
- except:
- session.rollback()
- raise
- finally:
- session.close()
- return wrapper
--- a/iro/model/__init__.py Sun Jan 22 12:26:00 2012 +0100
+++ b/iro/model/__init__.py Sun Jan 22 23:29:18 2012 +0100
@@ -1,2 +1,3 @@
import schema
import user
+import utils
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/iro/model/utils.py Sun Jan 22 23:29:18 2012 +0100
@@ -0,0 +1,49 @@
+from sqlalchemy.orm import sessionmaker
+
+from twisted.internet import reactor
+from twisted.internet import threads
+from twisted.python.threadpool import ThreadPool
+
+POOL_SIZE=5 #how many threads should the db connector pool should have
+
+dbpool = ThreadPool(minthreads=1, maxthreads=POOL_SIZE, name='database')
+dbpool.start()
+reactor.addSystemEventTrigger('before', 'shutdown', dbpool.stop)
+
+def run_in_db_thread(f):
+ """Decorator to run DB queries in Twisted's thread pool"""
+ def wrapper(*args, **kwargs):
+ return threads.deferToThreadPool(reactor, dbpool,f, *args, **kwargs)
+ return wrapper
+
+
+class WithSession(object):
+ '''a with statement for a database session connection'''
+ def __init__(self, engine, autocommit=False):
+ self.engine = engine
+ self.autocommit=autocommit
+
+ def __enter__(self):
+ self.session = sessionmaker(bind=self.engine)()
+ return self.session
+
+ def __exit__(self,exc_type, exc_value, traceback):
+ if exc_type is None:
+ if self.autocommit:
+ self.session.commit()
+ else:
+ self.session.rollback()
+ self.session.close()
+
+class DBDefer(object):
+ '''a twisted sqlalchemy connector this Decorator adds a session parameter, with a valid session connection'''
+ def __init__(self, engine, autocommit=False):
+ self.autocommit=autocommit
+ self.engine = engine
+
+ def __call__(self, func):
+ @run_in_db_thread
+ def wrapper(*args, **kwargs):
+ with WithSession(self.engine, self.autocommit) as session:
+ return func(*args, session=session, **kwargs)
+ return wrapper
--- a/tests/xmlrpc.py Sun Jan 22 12:26:00 2012 +0100
+++ b/tests/xmlrpc.py Sun Jan 22 23:29:18 2012 +0100
@@ -1,16 +1,15 @@
from multiprocessing import Process
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, pool
from tempfile import mkdtemp
import shutil
-from iro.controller.database import createDatabase,WithSession, DBDefer
+from iro.model.utils import WithSession, DBDefer, POOL_SIZE as DB_POOL_SIZE
import iro.model.user as imuser
-import iro.controller.database as db
-from iro.model.schema import User
+from iro.model.schema import User, Base
-from ngdatabase.mysql import Server,createConfig,Database
+from ngdatabase.mysql import Server, createConfig, Database
class SampleDatabase(Database):
def createPassword(self):
@@ -19,7 +18,7 @@
from twisted.python import log
import logging
-logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s(%(processName)s)-%(levelname)s: %(message)s')
+logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s(%(processName)s)-%(levelname)s: %(message)s')
#observer = log.PythonLoggingObserver()
#observer.start()
@@ -41,7 +40,7 @@
root=getResource()
root.putChild('debug', XMLRPCDebug())
reactor.listenTCP(7080, server.Site(root))
- reactor.suggestThreadPoolSize(5)
+ logging.info("Server is running now...")
reactor.run()
if __name__ == '__main__':
@@ -54,9 +53,10 @@
s.start()
d=SampleDatabase("test","test",'%s/my.cnf'%tdir)
d.create()
- db.engine = create_engine('mysql://test:test@localhost/test?unix_socket=%s/socket'%tdir)
- dbdefer = DBDefer('mysql://test:test@localhost/test?unix_socket=%s/socket'%tdir, pool_size=5)
-
+ engine = create_engine('mysql://test:test@localhost/test?unix_socket=%s/socket'%tdir,
+ poolclass = pool.SingletonThreadPool, pool_size=DB_POOL_SIZE, )
+ dbdefer = DBDefer(engine)
+ withsession=WithSession(engine, autocommit=True)
@dbdefer
def getuser(userhash, session):
#session.execute("SELECT SLEEP(10)")
@@ -66,10 +66,9 @@
imuser._getuser=imuser.getuser
imuser.getuser=getuser
-
try:
- createDatabase()
- with WithSession(autocommit=True) as session:
+ Base.metadata.create_all(engine)
+ with withsession as session:
session.add(User(name='test',apikey='abcdef123456789'))
p = Process(target=main)
p.start()