tests/xmlrpc.py
author Sandro Knauß <knauss@netzguerilla.net>
Thu, 26 Jan 2012 01:15:33 +0100
branch devel
changeset 109 935b5fcaf152
parent 108 cadc01b2bdc0
child 113 abdece5f6be6
permissions -rw-r--r--
little typos

from multiprocessing import Process
from sqlalchemy import create_engine, pool

from tempfile import mkdtemp
import shutil

from iro.model.utils import WithSession, POOL_SIZE as DB_POOL_SIZE
import iro.model.user as imuser

from iro.model.schema import User, Base

from ngdatabase.mysql import Server, createConfig, Database

class SampleDatabase(Database):
    """Database flavour for this test script that uses a fixed password.

    The constant ``"test"`` matches the credentials embedded in the
    SQLAlchemy URL built in the ``__main__`` section of this file.
    """

    def createPassword(self):
        """Set ``self.password`` to ``"test"`` and return the stored value.

        Presumably replaces a password generator of the base class --
        TODO confirm against ``ngdatabase.mysql.Database``.
        """
        self.password = "test"
        return self.password

from twisted.python import log
import logging
# Log everything from DEBUG upwards.  Each record carries the name of the
# emitting process, so output of the parent and of the server child process
# can be told apart.
_LOG_FORMAT = '%(asctime)s %(name)s(%(processName)s)-%(levelname)s: %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.DEBUG)

# Disabled: presumably forwards Twisted's own log events to the stdlib
# ``logging`` module -- verify before re-enabling.
#observer = log.PythonLoggingObserver()
#observer.start()

def xxxxx(userid):
    """Call ``listMethods`` on the local XML-RPC endpoint and return the result.

    The endpoint is ``http://localhost:7080/RPC2``, i.e. the port that
    ``main()`` listens on.

    :param userid: currently unused; kept so existing callers keep working.
    :return: whatever the remote ``listMethods`` call returns.
    """
    from xmlrpclib import ServerProxy

    proxy = ServerProxy('http://localhost:7080/RPC2')
    return proxy.listMethods()

def main():
    """Serve the iro XML-RPC resource on TCP port 7080 until the reactor stops.

    The resource tree comes from ``iro.view.xmlrpc.getResource()``; an extra
    ``debug`` child is attached whose ``stop`` method shuts the reactor down.
    This function blocks in ``reactor.run()``.
    """
    # Imported here rather than at module level, so these modules are only
    # loaded in the process that actually runs the server.
    from twisted.internet import reactor
    from twisted.web import xmlrpc, server

    from iro.view.xmlrpc import getResource

    class XMLRPCDebug(xmlrpc.XMLRPC):
        # Debug-only XML-RPC resource used to stop the server remotely.

        def xmlrpc_stop(self):
            # Schedule the shutdown instead of stopping right away and return
            # an empty string to the caller; the 0.5 s delay presumably lets
            # the response go out first.
            reactor.callLater(0.5, reactor.stop)
            return ""

    resource_tree = getResource()
    debug_resource = XMLRPCDebug()
    resource_tree.putChild('debug', debug_resource)

    site = server.Site(resource_tree)
    reactor.listenTCP(7080, site)

    logging.info("Server is running now...")
    reactor.run()

if __name__ == '__main__':
    # Script entry point: start a throw-away MySQL server in a temporary
    # directory, create the schema plus one test user, then run the XML-RPC
    # server (``main``) in a child process until it exits.
    tdir = mkdtemp(prefix='iro-mysql-')
    try:
        # The same config file is used by the server and the database helper.
        cnf_path = '%s/my.cnf' % tdir
        with open(cnf_path, 'w') as cnf:
            cnf.write(createConfig(tdir))

        s = Server(cnf_path)
        s.create()
        s.start()
        # The server is running from here on, so every later step sits
        # inside the try/finally that stops it.  Previously a failure while
        # creating the database or the engine left the server running while
        # its directory was being removed.
        try:
            # NOTE(review): both "test" arguments presumably name the
            # database and the user -- they match the engine URL below.
            d = SampleDatabase("test", "test", cnf_path)
            d.create()

            engine = create_engine(
                'mysql://test:test@localhost/test?unix_socket=%s/socket' % tdir,
                poolclass=pool.SingletonThreadPool,
                pool_size=DB_POOL_SIZE,
            )
            imuser.setEngine(engine)

            Base.metadata.create_all(engine)
            with WithSession(engine, autocommit=True) as session:
                session.add(User(name='test', apikey='abcdef123456789'))

            # Run the XML-RPC server in its own process and wait for it.
            p = Process(target=main)
            p.start()
            p.join()
        finally:
            s.stop()
    finally:
        # Always remove the temporary directory, even on failure.
        shutil.rmtree(tdir)