--- a/tests/xmlrpc.py Thu Jan 26 01:21:32 2012 +0100
+++ b/tests/xmlrpc.py Thu Jan 26 01:23:04 2012 +0100
@@ -1,71 +1,152 @@
from multiprocessing import Process
from sqlalchemy import create_engine, pool
+import unittest
+
from tempfile import mkdtemp
import shutil
from iro.model.utils import WithSession, POOL_SIZE as DB_POOL_SIZE
-import iro.model.user as imuser
from iro.model.schema import User, Base
from ngdatabase.mysql import Server, createConfig, Database
+from iro.main import runReactor
+
+import iro.error as IroError
+
+import time
+
+from xmlrpclib import Server as xServer, ServerProxy, Fault
+
class SampleDatabase(Database):
def createPassword(self):
self.password="test"
return self.password
+
+#activates all logging we can get.
+
from twisted.python import log
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s(%(processName)s)-%(levelname)s: %(message)s')
-#observer = log.PythonLoggingObserver()
-#observer.start()
+observer = log.PythonLoggingObserver()
+observer.start()
+
+class XMLRPCTest(unittest.TestCase):
+ """tests for the xmlrpc interface"""
+ def setUp(self):
+ self.s = Process(target=startReactor, args=(md.engine,))
+ self.s.start()
+ #the new process needs time to get stated, so this process has to sleep
+ time.sleep(.2)
+
+ def tearDown(self):
+ self.__debug().stop()
+ time.sleep(.2)
+ self.s.join()
+
+ def __debug(self):
+ return xServer('http://localhost:7080/debug')
+
+ def __rpc2(self):
+ return ServerProxy('http://localhost:7080/RPC2')
+
+ def testDebugHello(self):
+ '''simple test for the connection to xmlrpc server'''
+ ret=self.__debug().hello()
+ self.failUnlessEqual(ret,'hello')
+
+ def testListMethods(self):
+ '''list of all offical Methods, that can be executed'''
+ ret=self.__rpc2().listMethods()
+ self.failUnlessEqual(ret, ['listMethods', 'status', 'stop', 'sms', 'fax', 'mail', 'routes', 'defaultRoute', 'statistic'])
-def xxxxx(userid):
- import xmlrpclib
- return xmlrpclib.ServerProxy('http://localhost:7080/RPC2').listMethods()
+ def testStatus(self):
+ ret = self.__rpc2().status('abcdef123456789')
+ self.failUnlessEqual(ret, "<User('test','abcdef123456789')>")
+
+ def testNoSuchUser(self):
+ '''a unknown user should raise a UserNotNound Exception
+ bewcause xmlrpc only has a Fault exception this Exception has to be deliverd through a xmlrpclib.Fault Exception'''
+ with self.assertRaises(Fault) as fault:
+ self.__rpc2().status('abcdef123456788')
+ exc = fault.exception
+ unf=IroError.UserNotFound()
+ self.failUnlessEqual(exc.faultCode, unf.code)
+ self.failUnlessEqual(exc.faultString, unf.msg)
-def main():
+ def testNoSuchMethod(self):
+ '''a unknown mothod should raise a Exception '''
+ with self.assertRaises(Fault) as fault:
+ self.__rpc2().nosuchmethod()
+ exc = fault.exception
+ self.failUnlessEqual(exc.faultCode, 8001)
+ self.failUnlessEqual(exc.faultString, "procedure nosuchmethod not found")
+
+ def testValidation(self):
+ '''a validate Exception should be translated to a xmlrpclib.Fault.'''
+ with self.assertRaises(Fault) as fault:
+ self.__rpc2().status('xxx')
+ exc = fault.exception
+ self.failUnlessEqual(exc.faultCode, 700)
+ self.failUnlessEqual(exc.faultString, "Validation of 'apikey' failed.")
+
+
+
+def startReactor(engine):
+ """starts the Rector with a special debug Clild, so that the reactor can be stopped remotly. """
from twisted.internet import reactor
- from twisted.web import xmlrpc, server
+ from twisted.web import xmlrpc, resource
- from iro.view.xmlrpc import getResource
+ from iro.view.xmlrpc import appendResource
class XMLRPCDebug(xmlrpc.XMLRPC):
def xmlrpc_stop(self):
- reactor.callLater(0.5,reactor.stop)
+ reactor.callLater(0.1,reactor.stop)
return ""
- root=getResource()
+ def xmlrpc_hello(self):
+ return "hello"
+
+ root = resource.Resource()
+ root = appendResource(root)
root.putChild('debug', XMLRPCDebug())
- reactor.listenTCP(7080, server.Site(root))
- logging.info("Server is running now...")
- reactor.run()
+ runReactor(reactor, engine, root)
+
+
+class ModuleData:
+ def __init__(self):
+ self.tdir = mkdtemp(prefix='iro-mysql-')
+ self.server = Server('%s/my.cnf'%self.tdir)
+ self.db = SampleDatabase("test","test",'%s/my.cnf'%self.tdir)
+ self.engine = create_engine('mysql://test:test@localhost/test?unix_socket=%s/socket'%self.tdir,
+ poolclass = pool.SingletonThreadPool, pool_size=DB_POOL_SIZE, )
+ def setUp(self):
+ with open('%s/my.cnf'%self.tdir,'w') as cnf:
+ cnf.write(createConfig(self.tdir))
+ self.server.create()
+ self.server.start()
+ self.db.create()
+ Base.metadata.create_all(self.engine)
+ with WithSession(self.engine, autocommit=True) as session:
+ session.add(User(name='test',apikey='abcdef123456789'))
+
+ def tearDown(self):
+ self.server.stop()
+ shutil.rmtree(self.tdir)
+
+
+md=ModuleData()
+
+def setUpModule():
+ md.setUp()
+
+def tearDownModule():
+ md.tearDown()
+
+
if __name__ == '__main__':
- tdir = mkdtemp(prefix='iro-mysql-')
- try:
- with open('%s/my.cnf'%tdir,'w') as cnf:
- cnf.write(createConfig(tdir))
- s = Server('%s/my.cnf'%tdir)
- s.create()
- s.start()
- d=SampleDatabase("test","test",'%s/my.cnf'%tdir)
- d.create()
- engine = create_engine('mysql://test:test@localhost/test?unix_socket=%s/socket'%tdir,
- poolclass = pool.SingletonThreadPool, pool_size=DB_POOL_SIZE, )
-
- imuser.setEngine(engine)
-
- try:
- Base.metadata.create_all(engine)
- with WithSession(engine, autocommit=True) as session:
- session.add(User(name='test',apikey='abcdef123456789'))
- p = Process(target=main)
- p.start()
- p.join()
- finally:
- s.stop()
- finally:
- shutil.rmtree(tdir)
+ unittest.main()