diff -r 254c00f3cbe0 -r 31a2b1cd4981 iro/tests/testXMLRPCServer.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/tests/testXMLRPCServer.py Tue Oct 05 01:45:32 2010 +0200 @@ -0,0 +1,170 @@ +# -*- coding: utf-8 -*- + +import unittest, ConfigParser + +import threading, xmlrpclib +from multiprocessing import Queue +from multiprocessing.managers import BaseManager + +from iro import xmlrpc,anbieter +from iro.iro import MySMTP,MySmstrade,MyUserDB +from iro.job import SMSJob, FAXJob, MailJob +from iro.joblist import Joblist +from iro.providerlist import Providerlist +from iro.user import User, Admin + + +class StoppableXMLRPCServer(xmlrpc.SecureUserDBXMLRPCServer, threading.Thread): + running=False + def __init__(self, *args, **kwargs): + xmlrpc.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs) + threading.Thread.__init__(self) + self.timeout=.5 + + def start(self): + self.running=True + threading.Thread.start(self) + + + def run(self): + # *serve_forever* muss in einem eigenen Thread laufen, damit man es + # unterbrochen werden kann! + while (self.running): + self.handle_request() + + def stop(self): + if (self.running): + self.running=False + self.server_close() + self.join() + + def __del__(self): + self.stop() + + def __enter__(self): + self.start() + return self + + def __exit__(self,type,value,traceback): + self.stop() + + + +def init_server(): + userlist=[{"name":"test","password":"test", "class":User}, + {"name":"test2","password":"test2", "class": User}, + {"name":"admin","password":"admin", "class": Admin}] + + + + class MyManager(BaseManager): + pass + + MyManager.register('SMSJob', SMSJob) + MyManager.register('FAXob', FAXJob) + MyManager.register('MailJob',MailJob) + MyManager.register('Providerlist',Providerlist) + manager = MyManager() + manager.start() + + + #anbieter erzeugen und konfigurieren + sip=anbieter.sipgate() + sip.read_basic_config("iro.conf") + + localhost=MySMTP() + localhost.read_basic_config("iro.conf") + + smstrade=MySmstrade() + smstrade.read_basic_config("iro.conf") + + #Benutzerdatenbank erstellen + queue = Queue() + provider=Providerlist() + provider.add("sipgate", sip, ["sms", "fax", ]) + provider.add("smstrade", smstrade, ["sms", ]) + provider.add("geonet", None, ["sms", "fax", ]) + provider.add("fax.de", None, ["sms", "fax", ]) + provider.add("localhost", localhost, ["mail", ]) + provider.setDefault("sms","smstrade") + provider.setDefault("fax","sipgate") + provider.setDefault("mail","localhost") + jobqueue=Joblist(manager, queue, provider) + userdb=MyUserDB(userlist,jobqueue) + + + #Server starten + cp = ConfigParser.ConfigParser() + cp.read(["iro.conf"]) + cert=cp.get('server', 'cert') + key=cp.get('server', 'key') + serv = StoppableXMLRPCServer(addr=("localhost", 8000), + userdb=userdb, + certificate=cert,privatekey=key, + logRequests=False) + serv.relam="xmlrpc" + return serv + + +class TestServer(unittest.TestCase): + + def setUp(self): + self.serv = init_server() + self.serv.start() + + def tearDown(self): + self.serv.stop() + + def testLogin(self): + self.assertEqual(xmlrpclib.Server("https://test:test@localhost:8000").status(), {}) + self.assertEqual(xmlrpclib.Server("https://test2:test2@localhost:8000").status(), {}) + self.assertRaises(xmlrpclib.ProtocolError, xmlrpclib.Server("https://test2:test@localhost:8000").status) + self.assertRaises(xmlrpclib.ProtocolError,xmlrpclib.Server ("https://test:test2@localhost:8000").status) + + def testsendSMS(self): + servstr="https://test:test@localhost:8000" + client=xmlrpclib.Server(servstr) + id=client.startSMS("test",["01234", ] ) + self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} ) + + def testTwoUser(self): + u1="https://test:test@localhost:8000" + u2="https://test2:test2@localhost:8000" + admin="https://admin:admin@localhost:8000" + client1=xmlrpclib.Server(u1) + client2=xmlrpclib.Server(u2) + admin=xmlrpclib.Server(admin) + id1=client1.startSMS("test",["01234"] ) + self.assertEqual(client2.status(),{} ) + self.assertEqual(admin.status(id1),{id1: {'status': ['init',{}], 'name': 'test'}} ) + id2=client2.startSMS("test2",["01234"] ) + self.assertNotEqual(id1, id2) + self.assertEqual(client1.status(),{id1: {'status': ['init',{}], 'name': 'test'}}) + self.assertEqual(client2.status(),{id2: {'status': ['init',{}], 'name': 'test2'}}) + self.assertEqual(admin.status(),{id1: {'status': ['init',{}], 'name': 'test'}, + id2: {'status': ['init',{}], 'name': 'test2'}} ) + + self.assertEqual(client2.status(id1), {}) + self.assertEqual(client1.status(id2), {}) + + def testGetProvider(self): + servstr="https://test:test@localhost:8000" + client=xmlrpclib.Server(servstr) + self.assertEqual(client.getProvider("sms"), ["fax.de","geonet", "sipgate", "smstrade"]) + self.assertEqual(client.getProvider("fax"), ["fax.de","geonet", "sipgate"]) + self.assertEqual(client.getProvider("mail"), ["localhost"]) + + self.assertRaises(xmlrpclib.ProtocolError,client.getProvider, "temp") + + def testGetDefault(self): + servstr="https://test:test@localhost:8000" + client=xmlrpclib.Server(servstr) + self.assertEqual(client.getDefaultProvider("sms"), "smstrade") + self.assertEqual(client.getDefaultProvider("fax"),"sipgate") + self.assertEqual(client.getDefaultProvider("mail"), "localhost") + + self.assertRaises(xmlrpclib.ProtocolError,client.getDefaultProvider, "temp") + + +if __name__ == "__main__": + unittest.main()