import functools

from sqlalchemy.orm import sessionmaker
from twisted.internet import threads
from twisted.python.threadpool import ThreadPool
POOL_SIZE=5 # Maximum number of threads in the database connector pool (used as maxthreads).
class Data:
    """Holder for the module's shared database thread pool and reactor."""

    def __init__(self):
        # The pool is only constructed here; it is not started yet.
        self.pool = ThreadPool(
            name='database',
            minthreads=1,
            maxthreads=POOL_SIZE,
        )
        # No reactor is known until one is stored on this object.
        self.reactor = None
# Module-level instance shared by startPool() and run_in_db_thread().
d = Data()
def startPool(reactor):
    """Start the shared database thread pool and remember *reactor*.

    The pool's ``stop`` method is registered on the reactor as a
    'before' / 'shutdown' system event trigger.
    """
    pool = d.pool
    pool.start()
    d.reactor = reactor
    reactor.addSystemEventTrigger('before', 'shutdown', pool.stop)
def run_in_db_thread(f):
    """Decorator to run DB queries in Twisted's thread pool.

    Calling the decorated function passes ``f`` and its arguments to
    ``threads.deferToThreadPool`` together with the module's shared reactor
    and pool, and returns that call's result (a Deferred, per Twisted's API)
    instead of the value of ``f`` itself.

    The wrapper keeps ``f``'s ``__name__``, ``__doc__`` and other metadata.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # d.reactor and d.pool are read at call time, so the decorator may be
        # applied before the reactor has been stored on d.
        return threads.deferToThreadPool(d.reactor, d.pool, f, *args, **kwargs)
    return wrapper
class WithSession(object):
    """Context manager that opens a database session on an engine.

    Entering creates a new session bound to ``engine`` and returns it.
    On exit the session is rolled back if the body raised, committed if the
    body finished cleanly and ``autocommit`` is true, and closed in every
    case. Exceptions from the body are not suppressed.
    """

    def __init__(self, engine, autocommit=False):
        """Store the engine to bind to and whether to commit on clean exit."""
        self.engine = engine
        self.autocommit = autocommit

    def __enter__(self):
        """Create a session bound to the stored engine and return it."""
        self.session = sessionmaker(bind=self.engine)()
        return self.session

    def __exit__(self, exc_type, exc_value, traceback):
        """Roll back or commit as appropriate, then always close the session."""
        try:
            if exc_type is not None:
                self.session.rollback()
            elif self.autocommit:
                self.session.commit()
        finally:
            # close() must run even when commit()/rollback() raises,
            # otherwise the session is never released.
            self.session.close()
class DBDefer(object):
    """Twisted/SQLAlchemy connector, used as a decorator.

    The decorated function is called through ``run_in_db_thread`` with an
    extra ``session`` keyword argument: a session opened with ``WithSession``
    on this object's engine, which stays open only for that call.
    """

    def __init__(self, engine, autocommit=False):
        """Store the engine and the autocommit flag passed to WithSession."""
        self.autocommit = autocommit
        self.engine = engine

    def __call__(self, func):
        """Return a version of ``func`` that runs via run_in_db_thread with a session.

        The returned callable keeps ``func``'s ``__name__``, ``__doc__`` and
        other metadata.
        """
        def call_with_session(*args, **kwargs):
            with WithSession(self.engine, self.autocommit) as session:
                return func(*args, session=session, **kwargs)

        # Copy the metadata onto the outermost callable, so it survives
        # whether or not run_in_db_thread preserves it.
        return functools.update_wrapper(run_in_db_thread(call_with_session), func)