tests/xmlrpc.py
author Sandro Knauß <knauss@netzguerilla.net>
Mon, 30 Jan 2012 21:36:12 +0100
branchdevel
changeset 128 1a3ebdd3bdaf
parent 127 79966b937274
child 130 05e599aa83c3
permissions -rw-r--r--
telnumber test in own file

from multiprocessing import Process
from sqlalchemy import create_engine, pool
import unittest

from tempfile import mkdtemp
import shutil

from datetime import datetime

import time

from xmlrpclib import Server as xServer, ServerProxy, Fault

from iro.model.utils import WithSession
from iro.model import POOL_SIZE as DB_POOL_SIZE

from iro.model.schema import User, Base, Offer, Userright, Job, Message
import iro.model.schema as schema

from iro.main import runReactor

import iro.error as IroError

from ngdatabase.mysql import Server, createConfig, Database

class SampleDatabase(Database):
    def createPassword(self):
        self.password="test"
        return self.password


#activates all logging we can get.

from twisted.python import log
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s(%(processName)s)-%(levelname)s: %(message)s')
observer = log.PythonLoggingObserver()
observer.start()

class XMLRPCTest(unittest.TestCase):
    """tests for the xmlrpc interface"""
    def setUp(self):
        self.s = Process(target=startReactor, args=(md.engine,))
        self.s.start()
        #the new process needs time to get stated, so this process has to sleep
        time.sleep(.2)

    def tearDown(self):
        self.__debug().stop()
        time.sleep(.2)
        self.s.join()
        self.__cleanDB()

    def __cleanDB(self):
        with WithSession(md.engine, autocommit=True) as session:
            for table in schema.__tables__:
                session.query(getattr(schema,table)).delete()

    def __debug(self):
        return xServer('http://localhost:7080/debug')

    def __rpc2(self):
        return ServerProxy('http://localhost:7080/RPC2')
    
    def testDebugHello(self):
        '''simple test for the connection to xmlrpc server'''
        ret=self.__debug().hello()
        self.failUnlessEqual(ret,'hello')

    def testListMethods(self):
        '''list of all offical Methods, that can be executed'''
        ret=self.__rpc2().listMethods()
        self.failUnlessEqual(ret, ['listMethods', 'status', 'stop', 'sms', 'fax', 'mail', 'routes', 'defaultRoute', 'bill', 'telnumber','email'])

    def testStatus(self):
        ''' test the status function'''
        with WithSession(md.engine, autocommit=True) as session:
            u = User(name='test',apikey='abcdef123456789')
            session.add(User(name='test',apikey='abcdef123456789'))
        self.failUnlessEqual(self.__rpc2().status('abcdef123456789'), {})
        
        with WithSession(md.engine, autocommit=True) as session:
            u = session.merge(u)
            j = Job(hash="a1", info='info', status="started")
            j.user=u
            session.add(j)
       
        status = {'a1':{"status":"started"}}
        self.failUnlessEqual(self.__rpc2().status('abcdef123456789'), status)
        self.failUnlessEqual(self.__rpc2().status('abcdef123456789',"a1"), status)
        self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 'false'), status)
        self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 0), status)

        #JobNotFound
        with self.assertRaises(Fault) as fault:
            self.__rpc2().status('abcdef123456789',"b")
        exc = fault.exception
        unf = IroError.JobNotFound()
        self.failUnlessEqual(exc.faultCode, unf.code)
        self.failUnlessEqual(exc.faultString, unf.msg)

        #self.failUnlessEqual(self.__rpc2().status('abcdef123456789','abcde', True), ["<User('test','abcdef123456789')>",'abcde', True])
        #self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 'true'), ["<User('test','abcdef123456789')>", '', True])
        #self.failUnlessEqual(self.__rpc2().status('abcdef123456789', '', 1), ["<User('test','abcdef123456789')>", '', True])
    
    def testNoSuchUser(self):
        '''a unknown user should raise a UserNotNound Exception
        bewcause xmlrpc only has a Fault exception this Exception has to be deliverd through a xmlrpclib.Fault Exception'''
        with self.assertRaises(Fault) as fault:
            self.__rpc2().status('abcdef123456789')
        exc = fault.exception
        unf=IroError.UserNotFound()
        self.failUnlessEqual(exc.faultCode, unf.code)
        self.failUnlessEqual(exc.faultString, unf.msg)

    def testNoSuchMethod(self):
        '''a unknown mothod should raise a Exception '''
        with self.assertRaises(Fault) as fault:
            self.__rpc2().nosuchmethod()
        exc = fault.exception
        self.failUnlessEqual(exc.faultCode, 8001)
        self.failUnlessEqual(exc.faultString, "procedure nosuchmethod not found")
    
    def testValidationFault(self):
        '''a validate Exception should be translated to a xmlrpclib.Fault.'''
        with self.assertRaises(Fault) as fault:
            self.__rpc2().status('xxx')
        exc = fault.exception
        self.failUnlessEqual(exc.faultCode, 700)
        self.failUnlessEqual(exc.faultString, "Validation of 'apikey' failed.")

    def testRoutes(self):
        '''test the route function'''
        with WithSession(md.engine, autocommit=True) as session:
            u=User(name='test',apikey='abcdef123456789')
            o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms")
            u.rights.append(Userright(o)) 
            session.add(u)
        self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic'])

        with self.assertRaises(Fault) as fault:
            self.__rpc2().routes('abcdef123456789','fax')
        exc = fault.exception
        self.failUnlessEqual(exc.faultCode, 700)
        self.failUnlessEqual(exc.faultString, "Typ is not valid.")
        
        with WithSession(md.engine, autocommit=True) as session:
            o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms")
            u = session.query(User).filter_by(name="test").first()
            u.rights.append(Userright(o)) 
            o=Offer(name="faxde", provider="faxde", route="", typ="fax")
            session.add(o)
            session.commit()
        self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic','sipgate_plus'])
        self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','fax'),[])

        with WithSession(md.engine, autocommit=True) as session:
            u = session.query(User).filter_by(name="test").first()
            u.rights.append(Userright(o)) 
        
        self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','sms'),['sipgate_basic','sipgate_plus'])
        self.failUnlessEqual(self.__rpc2().routes('abcdef123456789','fax'),['faxde'])

    def testDefaultRoutes(self):
        '''test the defaultRoute function'''
        with WithSession(md.engine, autocommit=True) as session:
            u=User(name='test',apikey='abcdef123456789')
            o=Offer(name="sipgate_basic", provider="sipgate", route="basic", typ="sms")
            u.rights.append(Userright(o,True)) 
            o=Offer(name="sipgate_plus", provider="sipgate", route="plus", typ="sms")
            u.rights.append(Userright(o))
            session.add(u)
        self.failUnlessEqual(self.__rpc2().defaultRoute('abcdef123456789','sms'),['sipgate_basic'])

    def testTelnumbers(self):
        '''test the telefon validator'''
        self.failUnlessEqual(self.__rpc2().telnumber(["0123/456(78)","+4912346785433","00123435456-658"]),True)

        invalid=['xa','+1','1-23',';:+0','0123']

        with self.assertRaises(Fault) as fault:
            self.__rpc2().telnumber(['01234']+invalid)
        exc = fault.exception
        self.failUnlessEqual(exc.faultCode, 701)
        self.failUnlessEqual(exc.faultString, "No valid telnumber: '%s'"%invalid[0])


    def testVaildEmail(self):
        '''test vaild email adresses (got from wikipedia)'''
        validmails=["niceandsimple@example.com",
                "simplewith+symbol@example.com",
                'a.little.unusual@example.com',
                'a.little.more.unusual@dept.example.com',
                '@[10.10.10.10]',
                '@[1.1.1.1]',
                '@[200.100.100.100]',
                'user@[2001:db8:1ff::a0b:dbd0]',
                '"much.more\ unusual"@example.com',
                '"very.unusual.@.unusual.com"@example.com',
                '"very.(),:;<>[]\\".VERY.\\"very@\\\ \\"very\\".unusual"@strange.example.com',
                "!#$%&'*+-/=?^_`{}|~@example.org",
                '"()<>[]:;@,\\\"!#$%&\'*+-/=?^_`{}| ~  ? ^_`{}|~."@example.org',
                '""@example.org']
        
        for num in validmails:
            self.failUnlessEqual(self.__rpc2().email([num]),True)
    
    def testInvaildEmail(self):
        '''test invaild email adresses (got from wikipedia)'''
        invalid=["Abc.example.com",  # (an @ character must separate the local and domain parts)
                "Abc.@example.com",          # (character dot(.) is last in local part)
                "Abc..123@example.com",      # (character dot(.) is double)
                "A@b@c@example.com",         # (only one @ is allowed outside quotation marks)
                'a"b(c)d,e:f;g<h>i[j\k]l@example.com',  # (none of the special characters in this local part is allowed outside quotation marks)
                'just"not"right@example.com',           # (quoted strings must be dot separated, or the only element making up the local-part)
                'thisis."notallowed@example.com',      # (spaces, quotes, and backslashes may only exist when within quoted strings and preceded by a slash)
                'this\\ still\\"not\\allowed@example.com',   # (even if escaped (preceded by a backslash), spaces, quotes, and backslashes must still be contained by quotes)
                ]

        for number in invalid:
            with self.assertRaises(Fault) as fault:
                self.__rpc2().email([number])
            exc = fault.exception
            self.failUnlessEqual(exc.faultCode, 702)
            self.failUnlessEqual(exc.faultString, "No valid email: '%s'"%number)
    
    def testBill(self):
        '''test bill function'''
        apikey='abcdef123456789'
        with WithSession(md.engine, autocommit=True) as session:
            u=User(name='test',apikey=apikey)
            session.add(u)
        self.failUnlessEqual(self.__rpc2().bill(apikey),{'total':{'price':0.0,'anz':0}})
        
        with WithSession(md.engine, autocommit=True) as session:
            u = session.merge(u)
            o = Offer(name='sipgate_basic',provider="sipgate",route="basic",typ="sms")
            j = Job(hash='a1',info='i',status='sended')
            m = Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.30, offer=o, job=j)
            u.rights.append(Userright(o))
            u.jobs.append(j)
            session.add(m)
            
        self.failUnlessEqual(self.__rpc2().bill(apikey),{'total':{'price':0.3,'anz':1},
            'sipgate_basic':{'price':0.3,'anz':1,'info':{'i':{'price':0.3,'anz':1}}}})
    
        with WithSession(md.engine, autocommit=True) as session:
            j = session.merge(j)
            j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now() , price=0.4, offer=o))
 
        self.failUnlessEqual(self.__rpc2().bill(apikey),{'total':{'price':0.7,'anz':2},
            'sipgate_basic':{'price':0.7,'anz':2,'info':{'i':{'price':0.7,'anz':2}}}})
        
        with WithSession(md.engine, autocommit=True) as session:
            m = session.merge(m)
            m.isBilled=True
        
        self.failUnlessEqual(self.__rpc2().bill(apikey),{'total':{'price':0.4,'anz':1},
            'sipgate_basic':{'price':0.4,'anz':1,'info':{'i':{'price':0.4,'anz':1}}}})

        with WithSession(md.engine, autocommit=True) as session:
            u = session.merge(u)
            j = Job(hash='a2',info='a',status='sended')
            j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.4, offer=o))
            u.jobs.append(j) 
        ret=self.__rpc2().bill(apikey) 
        self.failUnlessEqual(ret['total'],{'price':0.8,'anz':2})
        self.failUnlessEqual(ret['sipgate_basic'],
                {'price':0.8,'anz':2,
                    'info':{'i':{'price':0.4,'anz':1},
                        'a':{'price':0.4,'anz':1},
                        }
                    })
        
        with WithSession(md.engine, autocommit=True) as session:
            u = session.merge(u)
            j = Job(hash='a3',info='a',status='sended')
            o = Offer(name='sipgate_gold',provider="sipgate",route="gold",typ="sms")
            j.messages.append(Message(recipient='0123456789', isBilled=False, date=datetime.now(), price=0.5, offer=o))
            u.rights.append(Userright(offer=o))
            u.jobs.append(j) 
        
        ret=self.__rpc2().bill(apikey) 
        self.failUnlessEqual(ret['total'],{'price':1.3,'anz':3})
        self.failUnlessEqual(ret['sipgate_gold'],
                {'price':0.5,'anz':1,
                    'info':{
                        'a':{'price':0.5,'anz':1},
                        }
                    })

def startReactor(engine):
    """starts the Rector with a special debug Clild, so that the reactor can be stopped remotly. """
    from twisted.internet import reactor
    from twisted.web import xmlrpc, resource
    
    from iro.view.xmlrpc import appendResource
    
    class XMLRPCDebug(xmlrpc.XMLRPC): 
        def xmlrpc_stop(self):
            reactor.callLater(0.1,reactor.stop)
            return ""

        def xmlrpc_hello(self):
            return "hello"

    root = resource.Resource()
    root = appendResource(root)
    root.putChild('debug', XMLRPCDebug())
    runReactor(reactor, engine, root)


class ModuleData:
    def __init__(self):
        self.tdir = mkdtemp(prefix='iro-mysql-')
        self.server = Server('%s/my.cnf'%self.tdir)
        self.db = SampleDatabase("test","test",'%s/my.cnf'%self.tdir)
        self.engine = create_engine('mysql://test:test@localhost/test?unix_socket=%s/socket'%self.tdir,
                poolclass = pool.SingletonThreadPool,  pool_size=DB_POOL_SIZE, )#echo=True)

    def setUp(self):
        with open('%s/my.cnf'%self.tdir,'w') as cnf:
            cnf.write(createConfig(self.tdir))
        self.server.create()
        self.server.start()
        self.db.create()
        Base.metadata.create_all(self.engine)
    
    def tearDown(self):
        self.server.stop()
        shutil.rmtree(self.tdir)
 

md=ModuleData()

def setUpModule():
    md.setUp()

def tearDownModule():
    md.tearDown()

    
if __name__ == '__main__':
        unittest.main()