building valid unittests devel
authorSandro Knauß <knauss@netzguerilla.net>
Thu, 26 Jan 2012 01:23:04 +0100
branchdevel
changeset 113 abdece5f6be6
parent 112 ea437d1e7b65
child 114 1ed072cc6793
building valid unittests
tests/xmlrpc.py
--- a/tests/xmlrpc.py	Thu Jan 26 01:21:32 2012 +0100
+++ b/tests/xmlrpc.py	Thu Jan 26 01:23:04 2012 +0100
@@ -1,71 +1,152 @@
 from multiprocessing import Process
 from sqlalchemy import create_engine, pool
+import unittest
+
 
 from tempfile import mkdtemp
 import shutil
 
 from iro.model.utils import WithSession, POOL_SIZE as DB_POOL_SIZE
-import iro.model.user as imuser
 
 from iro.model.schema import User, Base
 
 from ngdatabase.mysql import Server, createConfig, Database
 
+from iro.main import runReactor
+
+import iro.error as IroError
+
+import time
+
+from xmlrpclib import Server as xServer, ServerProxy, Fault
+
 class SampleDatabase(Database):
     def createPassword(self):
         self.password="test"
         return self.password
 
+
+#activates all logging we can get.
+
 from twisted.python import log
 import logging
 logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s(%(processName)s)-%(levelname)s: %(message)s')
-#observer = log.PythonLoggingObserver()
-#observer.start()
+observer = log.PythonLoggingObserver()
+observer.start()
+
+class XMLRPCTest(unittest.TestCase):
+    """tests for the xmlrpc interface"""
+    def setUp(self):
+        self.s = Process(target=startReactor, args=(md.engine,))
+        self.s.start()
+        #the new process needs time to get stated, so this process has to sleep
+        time.sleep(.2)
+
+    def tearDown(self):
+        self.__debug().stop()
+        time.sleep(.2)
+        self.s.join()
+
+    def __debug(self):
+        return xServer('http://localhost:7080/debug')
+
+    def __rpc2(self):
+        return ServerProxy('http://localhost:7080/RPC2')
+
+    def testDebugHello(self):
+        '''simple test for the connection to xmlrpc server'''
+        ret=self.__debug().hello()
+        self.failUnlessEqual(ret,'hello')
+
+    def testListMethods(self):
+        '''list of all offical Methods, that can be executed'''
+        ret=self.__rpc2().listMethods()
+        self.failUnlessEqual(ret, ['listMethods', 'status', 'stop', 'sms', 'fax', 'mail', 'routes', 'defaultRoute', 'statistic'])
 
-def xxxxx(userid):
-    import xmlrpclib
-    return xmlrpclib.ServerProxy('http://localhost:7080/RPC2').listMethods()
+    def testStatus(self):
+        ret = self.__rpc2().status('abcdef123456789')
+        self.failUnlessEqual(ret, "<User('test','abcdef123456789')>")
+
+    def testNoSuchUser(self):
+        '''a unknown user should raise a UserNotNound Exception
+        bewcause xmlrpc only has a Fault exception this Exception has to be deliverd through a xmlrpclib.Fault Exception'''
+        with self.assertRaises(Fault) as fault:
+            self.__rpc2().status('abcdef123456788')
+        exc = fault.exception
+        unf=IroError.UserNotFound()
+        self.failUnlessEqual(exc.faultCode, unf.code)
+        self.failUnlessEqual(exc.faultString, unf.msg)
 
-def main():
+    def testNoSuchMethod(self):
+        '''a unknown mothod should raise a Exception '''
+        with self.assertRaises(Fault) as fault:
+            self.__rpc2().nosuchmethod()
+        exc = fault.exception
+        self.failUnlessEqual(exc.faultCode, 8001)
+        self.failUnlessEqual(exc.faultString, "procedure nosuchmethod not found")
+    
+    def testValidation(self):
+        '''a validate Exception should be translated to a xmlrpclib.Fault.'''
+        with self.assertRaises(Fault) as fault:
+            self.__rpc2().status('xxx')
+        exc = fault.exception
+        self.failUnlessEqual(exc.faultCode, 700)
+        self.failUnlessEqual(exc.faultString, "Validation of 'apikey' failed.")
+
+
+
+def startReactor(engine):
+    """starts the Rector with a special debug Clild, so that the reactor can be stopped remotly. """
     from twisted.internet import reactor
-    from twisted.web import xmlrpc, server
+    from twisted.web import xmlrpc, resource
     
-    from iro.view.xmlrpc import getResource
+    from iro.view.xmlrpc import appendResource
     
     class XMLRPCDebug(xmlrpc.XMLRPC): 
         def xmlrpc_stop(self):
-            reactor.callLater(0.5,reactor.stop)
+            reactor.callLater(0.1,reactor.stop)
             return ""
 
-    root=getResource()
+        def xmlrpc_hello(self):
+            return "hello"
+
+    root = resource.Resource()
+    root = appendResource(root)
     root.putChild('debug', XMLRPCDebug())
-    reactor.listenTCP(7080, server.Site(root))
-    logging.info("Server is running now...")
-    reactor.run()
+    runReactor(reactor, engine, root)
+
+
+class ModuleData:
+    def __init__(self):
+        self.tdir = mkdtemp(prefix='iro-mysql-')
+        self.server = Server('%s/my.cnf'%self.tdir)
+        self.db = SampleDatabase("test","test",'%s/my.cnf'%self.tdir)
+        self.engine = create_engine('mysql://test:test@localhost/test?unix_socket=%s/socket'%self.tdir,
+                poolclass = pool.SingletonThreadPool,  pool_size=DB_POOL_SIZE, )
 
+    def setUp(self):
+        with open('%s/my.cnf'%self.tdir,'w') as cnf:
+            cnf.write(createConfig(self.tdir))
+        self.server.create()
+        self.server.start()
+        self.db.create()
+        Base.metadata.create_all(self.engine)
+        with WithSession(self.engine, autocommit=True) as session:
+            session.add(User(name='test',apikey='abcdef123456789'))
+    
+    def tearDown(self):
+        self.server.stop()
+        shutil.rmtree(self.tdir)
+ 
+
+md=ModuleData()
+
+def setUpModule():
+    md.setUp()
+
+def tearDownModule():
+    md.tearDown()
+
+    
 if __name__ == '__main__':
-    tdir = mkdtemp(prefix='iro-mysql-')
-    try:
-        with open('%s/my.cnf'%tdir,'w') as cnf:
-            cnf.write(createConfig(tdir))
-        s = Server('%s/my.cnf'%tdir)
-        s.create()
-        s.start()
-        d=SampleDatabase("test","test",'%s/my.cnf'%tdir)
-        d.create()
-        engine = create_engine('mysql://test:test@localhost/test?unix_socket=%s/socket'%tdir,
-               poolclass = pool.SingletonThreadPool,  pool_size=DB_POOL_SIZE, )
-
-        imuser.setEngine(engine)
-
-        try:
-            Base.metadata.create_all(engine)
-            with WithSession(engine, autocommit=True) as session:
-                session.add(User(name='test',apikey='abcdef123456789'))
-            p = Process(target=main)
-            p.start()
-            p.join()
-        finally:
-            s.stop()
-    finally:
-        shutil.rmtree(tdir)
+        unittest.main()