tests/xmlrpc.py
branchdevel
changeset 131 c51c3e8c3ec0
parent 130 05e599aa83c3
child 132 80a334e2aae7
--- a/tests/xmlrpc.py	Mon Jan 30 22:15:21 2012 +0100
+++ b/tests/xmlrpc.py	Mon Jan 30 23:28:14 2012 +0100
@@ -11,11 +11,9 @@
 
 from xmlrpclib import Server as xServer, ServerProxy, Fault
 
-from iro.model.utils import WithSession
 from iro.model import POOL_SIZE as DB_POOL_SIZE
 
 from iro.model.schema import User, Base, Offer, Userright, Job, Message
-import iro.model.schema as schema
 
 from iro.main import runReactor
 
@@ -23,6 +21,8 @@
 
 from ngdatabase.mysql import Server, createConfig, Database
 
+from .dbtestcase import DBTestCase
+
 class SampleDatabase(Database):
     def createPassword(self):
         self.password="test"
@@ -37,9 +37,11 @@
 observer = log.PythonLoggingObserver()
 observer.start()
 
-class XMLRPCTest(unittest.TestCase):
+class XMLRPCTest(DBTestCase):
     """tests for the xmlrpc interface"""
     def setUp(self):
+        if not self.engine:
+            self.engine = md.engine
         self.s = Process(target=startReactor, args=(md.engine,))
         self.s.start()
         #the new process needs time to get stated, so this process has to sleep
@@ -49,12 +51,7 @@
         self.__debug().stop()
         time.sleep(.2)
         self.s.join()
-        self.__cleanDB()
-
-    def __cleanDB(self):
-        with WithSession(md.engine, autocommit=True) as session:
-            for table in schema.__tables__:
-                session.query(getattr(schema,table)).delete()
+        DBTestCase.tearDown(self)
 
     def __debug(self):
         return xServer('http://localhost:7080/debug')
@@ -74,12 +71,12 @@
 
     def testStatus(self):
         ''' test the status function'''
-        with WithSession(md.engine, autocommit=True) as session:
+        with self.session() as session:
             u = User(name='test',apikey='abcdef123456789')
             session.add(User(name='test',apikey='abcdef123456789'))
         self.failUnlessEqual(self.__rpc2().status('abcdef123456789'), {})
         
-        with WithSession(md.engine, autocommit=True) as session:
+        with self.session() as session:
             u = session.merge(u)
             j = Job(hash="a1", info='info', status="started")
             j.user=u
@@ -131,7 +128,7 @@
 
     def testRoutes(self):
         '''test the route function'''
-        with WithSession(md.engine, autocommit=True) as session:
+        with self.session() as session:
             u=User(name='test',apikey='abcdef123456789')
             o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms")
             u.rights.append(Userright(o)) 
@@ -144,7 +141,7 @@
         self.failUnlessEqual(exc.faultCode, 700)
         self.failUnlessEqual(exc.faultString, "Typ is not valid.")
         
-        with WithSession(md.engine, autocommit=True) as session:
+        with self.session() as session:
             o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms")
             u = session.query(User).filter_by(name="test").first()
             u.rights.append(Userright(o)) 
@@ -154,7 +151,7 @@
         self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic','sipgate_plus'])
         self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','fax'),[])
 
-        with WithSession(md.engine, autocommit=True) as session:
+        with self.session() as session:
             u = session.query(User).filter_by(name="test").first()
             u.rights.append(Userright(o)) 
         
@@ -163,7 +160,7 @@
 
     def testDefaultRoutes(self):
         '''test the defaultRoute function'''
-        with WithSession(md.engine, autocommit=True) as session:
+        with self.session() as session:
             u=User(name='test',apikey='abcdef123456789')
             o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms")
             u.rights.append(Userright(o,True)) 
@@ -202,42 +199,24 @@
     def testBill(self):
         '''test bill function'''
         apikey='abcdef123456789'
-        with WithSession(md.engine, autocommit=True) as session:
+        with self.session() as session:
             u=User(name='test',apikey=apikey)
             session.add(u)
+      
         self.failUnlessEqual(self.__rpc2().bill(apikey),{'total':{'price':0.0,'anz':0}})
         
-        with WithSession(md.engine, autocommit=True) as session:
+        with self.session() as session:
             u = session.merge(u)
             o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms")
+            u.rights.append(Userright(o))
             j = Job(hash='a1',info='i',status='sended')
-            m = Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.30, offer=o, job=j)
-            u.rights.append(Userright(o))
+            j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o))
             u.jobs.append(j)
-            session.add(m)
-            
-        self.failUnlessEqual(self.__rpc2().bill(apikey),{'total':{'price':0.3,'anz':1},
-            'sipgate_basic':{'price':0.3,'anz':1,'info':{'i':{'price':0.3,'anz':1}}}})
-    
-        with WithSession(md.engine, autocommit=True) as session:
-            j = session.merge(j)
-            j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o))
  
-        self.failUnlessEqual(self.__rpc2().bill(apikey),{'total':{'price':0.7,'anz':2},
-            'sipgate_basic':{'price':0.7,'anz':2,'info':{'i':{'price':0.7,'anz':2}}}})
-        
-        with WithSession(md.engine, autocommit=True) as session:
-            m = session.merge(m)
-            m.isBilled=True
-        
-        self.failUnlessEqual(self.__rpc2().bill(apikey),{'total':{'price':0.4,'anz':1},
-            'sipgate_basic':{'price':0.4,'anz':1,'info':{'i':{'price':0.4,'anz':1}}}})
-
-        with WithSession(md.engine, autocommit=True) as session:
-            u = session.merge(u)
             j = Job(hash='a2',info='a',status='sended')
             j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.4, offer=o))
             u.jobs.append(j) 
+        
         ret=self.__rpc2().bill(apikey) 
         self.failUnlessEqual(ret['total'],{'price':0.8,'anz':2})
         self.failUnlessEqual(ret['sipgate_basic'],
@@ -247,23 +226,7 @@
                         }
                     })
         
-        with WithSession(md.engine, autocommit=True) as session:
-            u = session.merge(u)
-            j = Job(hash='a3',info='a',status='sended')
-            o = Offer(name='sipgate_gold',provider="sipgate",route="gold",typ="sms")
-            j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.5, offer=o))
-            u.rights.append(Userright(offer=o))
-            u.jobs.append(j) 
-        
-        ret=self.__rpc2().bill(apikey) 
-        self.failUnlessEqual(ret['total'],{'price':1.3,'anz':3})
-        self.failUnlessEqual(ret['sipgate_gold'],
-                {'price':0.5,'anz':1,
-                    'info':{
-                        'a':{'price':0.5,'anz':1},
-                        }
-                    })
-
+       
 def startReactor(engine):
     """starts the Rector with a special debug Clild, so that the reactor can be stopped remotly. """
     from twisted.internet import reactor