# HG changeset patch # User Sandro Knauß # Date 1327537384 -3600 # Node ID abdece5f6be6e645b39884ac8936ce1ddc4ab924 # Parent ea437d1e7b6535eca5864d07e5b9e31221fde3f7 building valid unittests diff -r ea437d1e7b65 -r abdece5f6be6 tests/xmlrpc.py --- a/tests/xmlrpc.py Thu Jan 26 01:21:32 2012 +0100 +++ b/tests/xmlrpc.py Thu Jan 26 01:23:04 2012 +0100 @@ -1,71 +1,152 @@ from multiprocessing import Process from sqlalchemy import create_engine, pool +import unittest + from tempfile import mkdtemp import shutil from iro.model.utils import WithSession, POOL_SIZE as DB_POOL_SIZE -import iro.model.user as imuser from iro.model.schema import User, Base from ngdatabase.mysql import Server, createConfig, Database +from iro.main import runReactor + +import iro.error as IroError + +import time + +from xmlrpclib import Server as xServer, ServerProxy, Fault + class SampleDatabase(Database): def createPassword(self): self.password="test" return self.password + +#activates all logging we can get. + from twisted.python import log import logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s(%(processName)s)-%(levelname)s: %(message)s') -#observer = log.PythonLoggingObserver() -#observer.start() +observer = log.PythonLoggingObserver() +observer.start() + +class XMLRPCTest(unittest.TestCase): + """tests for the xmlrpc interface""" + def setUp(self): + self.s = Process(target=startReactor, args=(md.engine,)) + self.s.start() + #the new process needs time to get stated, so this process has to sleep + time.sleep(.2) + + def tearDown(self): + self.__debug().stop() + time.sleep(.2) + self.s.join() + + def __debug(self): + return xServer('http://localhost:7080/debug') + + def __rpc2(self): + return ServerProxy('http://localhost:7080/RPC2') + + def testDebugHello(self): + '''simple test for the connection to xmlrpc server''' + ret=self.__debug().hello() + self.failUnlessEqual(ret,'hello') + + def testListMethods(self): + '''list of all offical Methods, that can be executed''' + ret=self.__rpc2().listMethods() + self.failUnlessEqual(ret, ['listMethods', 'status', 'stop', 'sms', 'fax', 'mail', 'routes', 'defaultRoute', 'statistic']) -def xxxxx(userid): - import xmlrpclib - return xmlrpclib.ServerProxy('http://localhost:7080/RPC2').listMethods() + def testStatus(self): + ret = self.__rpc2().status('abcdef123456789') + self.failUnlessEqual(ret, "") + + def testNoSuchUser(self): + '''a unknown user should raise a UserNotNound Exception + bewcause xmlrpc only has a Fault exception this Exception has to be deliverd through a xmlrpclib.Fault Exception''' + with self.assertRaises(Fault) as fault: + self.__rpc2().status('abcdef123456788') + exc = fault.exception + unf=IroError.UserNotFound() + self.failUnlessEqual(exc.faultCode, unf.code) + self.failUnlessEqual(exc.faultString, unf.msg) -def main(): + def testNoSuchMethod(self): + '''a unknown mothod should raise a Exception ''' + with self.assertRaises(Fault) as fault: + self.__rpc2().nosuchmethod() + exc = fault.exception + self.failUnlessEqual(exc.faultCode, 8001) + self.failUnlessEqual(exc.faultString, "procedure nosuchmethod not found") + + def testValidation(self): + '''a validate Exception should be translated to a xmlrpclib.Fault.''' + with self.assertRaises(Fault) as fault: + self.__rpc2().status('xxx') + exc = fault.exception + self.failUnlessEqual(exc.faultCode, 700) + self.failUnlessEqual(exc.faultString, "Validation of 'apikey' failed.") + + + +def startReactor(engine): + """starts the Rector with a special debug Clild, so that the reactor can be stopped remotly. """ from twisted.internet import reactor - from twisted.web import xmlrpc, server + from twisted.web import xmlrpc, resource - from iro.view.xmlrpc import getResource + from iro.view.xmlrpc import appendResource class XMLRPCDebug(xmlrpc.XMLRPC): def xmlrpc_stop(self): - reactor.callLater(0.5,reactor.stop) + reactor.callLater(0.1,reactor.stop) return "" - root=getResource() + def xmlrpc_hello(self): + return "hello" + + root = resource.Resource() + root = appendResource(root) root.putChild('debug', XMLRPCDebug()) - reactor.listenTCP(7080, server.Site(root)) - logging.info("Server is running now...") - reactor.run() + runReactor(reactor, engine, root) + + +class ModuleData: + def __init__(self): + self.tdir = mkdtemp(prefix='iro-mysql-') + self.server = Server('%s/my.cnf'%self.tdir) + self.db = SampleDatabase("test","test",'%s/my.cnf'%self.tdir) + self.engine = create_engine('mysql://test:test@localhost/test?unix_socket=%s/socket'%self.tdir, + poolclass = pool.SingletonThreadPool, pool_size=DB_POOL_SIZE, ) + def setUp(self): + with open('%s/my.cnf'%self.tdir,'w') as cnf: + cnf.write(createConfig(self.tdir)) + self.server.create() + self.server.start() + self.db.create() + Base.metadata.create_all(self.engine) + with WithSession(self.engine, autocommit=True) as session: + session.add(User(name='test',apikey='abcdef123456789')) + + def tearDown(self): + self.server.stop() + shutil.rmtree(self.tdir) + + +md=ModuleData() + +def setUpModule(): + md.setUp() + +def tearDownModule(): + md.tearDown() + + if __name__ == '__main__': - tdir = mkdtemp(prefix='iro-mysql-') - try: - with open('%s/my.cnf'%tdir,'w') as cnf: - cnf.write(createConfig(tdir)) - s = Server('%s/my.cnf'%tdir) - s.create() - s.start() - d=SampleDatabase("test","test",'%s/my.cnf'%tdir) - d.create() - engine = create_engine('mysql://test:test@localhost/test?unix_socket=%s/socket'%tdir, - poolclass = pool.SingletonThreadPool, pool_size=DB_POOL_SIZE, ) - - imuser.setEngine(engine) - - try: - Base.metadata.create_all(engine) - with WithSession(engine, autocommit=True) as session: - session.add(User(name='test',apikey='abcdef123456789')) - p = Process(target=main) - p.start() - p.join() - finally: - s.stop() - finally: - shutil.rmtree(tdir) + unittest.main()