from multiprocessing import Process
from sqlalchemy import create_engine, pool
import unittest
from tempfile import mkdtemp
import shutil
from datetime import datetime
import time
from xmlrpclib import Server as xServer, ServerProxy, Fault
from iro.controller.pool import dbPool
from iro.model.schema import User, Base, Offer, Userright, Job, Message
from iro.main import runReactor
import iro.error as IroError
from ngdatabase.mysql import Server, createConfig, Database
from .dbtestcase import DBTestCase
class SampleDatabase(Database):
def createPassword(self):
self.password="test"
return self.password
#activates all logging we can get.
from twisted.python import log
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s(%(processName)s)-%(levelname)s: %(message)s')
observer = log.PythonLoggingObserver()
observer.start()
class XMLRPCTest(DBTestCase):
"""tests for the xmlrpc interface"""
def setUp(self):
if not self.engine:
self.engine = md.engine
self.s = Process(target=startReactor, args=(md.engine,))
self.s.start()
#the new process needs time to get stated, so this process has to sleep
time.sleep(.2)
def tearDown(self):
self.__debug().stop()
time.sleep(.2)
self.s.join()
DBTestCase.tearDown(self)
def __debug(self):
return xServer('http://localhost:7080/debug')
def __rpc2(self):
return ServerProxy('http://localhost:7080/RPC2')
def testDebugHello(self):
'''simple test for the connection to xmlrpc server'''
ret=self.__debug().hello()
self.failUnlessEqual(ret,'hello')
def testListMethods(self):
'''list of all offical Methods, that can be executed'''
ret=self.__rpc2().listMethods()
self.failUnlessEqual(ret, ['listMethods', 'status', 'stop', 'sms', 'fax', 'mail', 'routes', 'defaultRoute', 'bill', 'telnumber','email'])
def testStatus(self):
''' test the status function'''
with self.session() as session:
u = User(name='test',apikey='abcdef123456789')
session.add(User(name='test',apikey='abcdef123456789'))
self.failUnlessEqual(self.__rpc2().status('abcdef123456789'), {})
with self.session() as session:
u = session.merge(u)
j = Job(hash="a1", info='info', status="started")
j.user=u
session.add(j)
status = {'a1':{"status":"started"}}
self.failUnlessEqual(self.__rpc2().status('abcdef123456789'), status)
self.failUnlessEqual(self.__rpc2().status('abcdef123456789',"a1"), status)
self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 'false'), status)
self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 0), status)
#JobNotFound
with self.assertRaises(Fault) as fault:
self.__rpc2().status('abcdef123456789',"b")
exc = fault.exception
unf = IroError.JobNotFound()
self.failUnlessEqual(exc.faultCode, unf.code)
self.failUnlessEqual(exc.faultString, unf.msg)
#self.failUnlessEqual(self.__rpc2().status('abcdef123456789','abcde', True), ["<User('test','abcdef123456789')>",'abcde', True])
#self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 'true'), ["<User('test','abcdef123456789')>", '', True])
#self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 1), ["<User('test','abcdef123456789')>", '', True])
def testNoSuchUser(self):
'''a unknown user should raise a UserNotNound Exception
bewcause xmlrpc only has a Fault exception this Exception has to be deliverd through a xmlrpclib.Fault Exception'''
with self.assertRaises(Fault) as fault:
self.__rpc2().status('abcdef123456789')
exc = fault.exception
unf=IroError.UserNotFound()
self.failUnlessEqual(exc.faultCode, unf.code)
self.failUnlessEqual(exc.faultString, unf.msg)
def testNoSuchMethod(self):
'''a unknown mothod should raise a Exception '''
with self.assertRaises(Fault) as fault:
self.__rpc2().nosuchmethod()
exc = fault.exception
self.failUnlessEqual(exc.faultCode, 8001)
self.failUnlessEqual(exc.faultString, "procedure nosuchmethod not found")
def testValidationFault(self):
'''a validate Exception should be translated to a xmlrpclib.Fault.'''
with self.assertRaises(Fault) as fault:
self.__rpc2().status('xxx')
exc = fault.exception
self.failUnlessEqual(exc.faultCode, 700)
self.failUnlessEqual(exc.faultString, "Validation of 'apikey' failed.")
def testRoutes(self):
'''test the route function'''
with self.session() as session:
u=User(name='test',apikey='abcdef123456789')
o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms")
u.rights.append(Userright(o))
session.add(u)
self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic'])
with self.assertRaises(Fault) as fault:
self.__rpc2().routes('abcdef123456789','fax')
exc = fault.exception
self.failUnlessEqual(exc.faultCode, 700)
self.failUnlessEqual(exc.faultString, "Typ is not valid.")
with self.session() as session:
o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms")
u = session.query(User).filter_by(name="test").first()
u.rights.append(Userright(o))
o=Offer(name="faxde", provider="faxde", route="", typ="fax")
session.add(o)
session.commit()
self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic','sipgate_plus'])
self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','fax'),[])
with self.session() as session:
u = session.query(User).filter_by(name="test").first()
u.rights.append(Userright(o))
self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic','sipgate_plus'])
self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','fax'),['faxde'])
def testDefaultRoutes(self):
'''test the defaultRoute function'''
with self.session() as session:
u=User(name='test',apikey='abcdef123456789')
o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms")
u.rights.append(Userright(o,True))
o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms")
u.rights.append(Userright(o))
session.add(u)
self.failUnlessEqual(self.__rpc2().defaultRoute('abcdef123456789','sms'),['sipgate_basic'])
def testTelnumbers(self):
'''test the telefon validator'''
self.failUnlessEqual(self.__rpc2().telnumber(["0123/456(78)","+4912346785433","00123435456-658"]),True)
invalid=['xa','+1','1-23',';:+0','0123']
with self.assertRaises(Fault) as fault:
self.__rpc2().telnumber(['01234']+invalid)
exc = fault.exception
self.failUnlessEqual(exc.faultCode, 701)
self.failUnlessEqual(exc.faultString, "No valid telnumber: '%s'" % invalid[0])
def testVaildEmail(self):
'''test vaild email adresses (got from wikipedia)'''
validmails=["niceandsimple@example.com"]
self.failUnlessEqual(self.__rpc2().email(validmails),True)
def testInvaildEmail(self):
'''test invaild email adresses (got from wikipedia)'''
invalid=["Abc.example.com",]
with self.assertRaises(Fault) as fault:
self.__rpc2().email(invalid)
exc = fault.exception
self.failUnlessEqual(exc.faultCode, 702)
self.failUnlessEqual(exc.faultString, "No valid email: '%s'" % invalid[0])
def testBill(self):
'''test bill function'''
apikey='abcdef123456789'
with self.session() as session:
u=User(name='test',apikey=apikey)
session.add(u)
self.failUnlessEqual(self.__rpc2().bill(apikey),{'total':{'price':0.0,'anz':0}})
with self.session() as session:
u = session.merge(u)
o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms")
u.rights.append(Userright(o))
j = Job(hash='a1',info='i',status='sended')
j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o))
u.jobs.append(j)
j = Job(hash='a2',info='a',status='sended')
j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.4, offer=o))
u.jobs.append(j)
ret=self.__rpc2().bill(apikey)
self.failUnlessEqual(ret['total'],{'price':0.8,'anz':2})
self.failUnlessEqual(ret['sipgate_basic'],
{'price':0.8,'anz':2,
'info':{'i':{'price':0.4,'anz':1},
'a':{'price':0.4,'anz':1},
}
})
def startReactor(engine):
"""starts the Rector with a special debug Clild, so that the reactor can be stopped remotly. """
from twisted.internet import reactor
from twisted.web import xmlrpc, resource
from iro.view.xmlrpc import appendResource
class XMLRPCDebug(xmlrpc.XMLRPC):
def xmlrpc_stop(self):
reactor.callLater(0.1,reactor.stop)
return ""
def xmlrpc_hello(self):
return "hello"
root = resource.Resource()
root = appendResource(root)
root.putChild('debug', XMLRPCDebug())
runReactor(reactor, engine, root)
class ModuleData:
def __init__(self):
self.tdir = mkdtemp(prefix='iro-mysql-')
self.server = Server('%s/my.cnf'%self.tdir)
self.db = SampleDatabase("test","test",'%s/my.cnf'%self.tdir)
self.engine = create_engine('mysql://test:test@localhost/test?unix_socket=%s/socket'%self.tdir,
poolclass = pool.SingletonThreadPool, pool_size=dbPool.maxthreads, )#echo=True)
def setUp(self):
with open('%s/my.cnf'%self.tdir,'w') as cnf:
cnf.write(createConfig(self.tdir))
self.server.create()
self.server.start()
self.db.create()
Base.metadata.create_all(self.engine)
def tearDown(self):
self.server.stop()
shutil.rmtree(self.tdir)
md=ModuleData()
def setUpModule():
md.setUp()
def tearDownModule():
md.tearDown()
if __name__ == '__main__':
unittest.main()