diff -r 254c00f3cbe0 -r 31a2b1cd4981 iro/test.py --- a/iro/test.py Mon Oct 04 04:28:28 2010 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,166 +0,0 @@ -# -*- coding: utf-8 -*- - -import unittest, ConfigParser -import iro -from job import SMSJob, FAXJob, MailJob -from joblist import Joblist -from providerlist import Providerlist -import threading, xmlrpclib -from multiprocessing import Queue -from multiprocessing.managers import BaseManager - -class StoppableXMLRPCServer(iro.SecureUserDBXMLRPCServer, threading.Thread): - running=False - def __init__(self, *args, **kwargs): - iro.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs) - threading.Thread.__init__(self) - self.timeout=.5 - - def start(self): - self.running=True - threading.Thread.start(self) - - - def run(self): - # *serve_forever* muss in einem eigenen Thread laufen, damit man es - # unterbrochen werden kann! - while (self.running): - self.handle_request() - - def stop(self): - if (self.running): - self.running=False - self.server_close() - self.join() - - def __del__(self): - self.stop() - - def __enter__(self): - self.start() - return self - - def __exit__(self,type,value,traceback): - self.stop() - - - -def init_server(): - userlist=[{"name":"test","password":"test", "class":iro.User}, - {"name":"test2","password":"test2", "class": iro.User}, - {"name":"admin","password":"admin", "class": iro.Admin}] - - - - class MyManager(BaseManager): - pass - - MyManager.register('SMSJob', SMSJob) - MyManager.register('FAXob', FAXJob) - MyManager.register('MailJob',MailJob) - MyManager.register('Providerlist',Providerlist) - manager = MyManager() - manager.start() - - - #anbieter erzeugen und konfigurieren - import anbieter - sip=anbieter.sipgate() - sip.read_basic_config("iro.conf") - - localhost=iro.MySMTP() - localhost.read_basic_config("iro.conf") - - smstrade=iro.MySmstrade() - smstrade.read_basic_config("iro.conf") - - #Benutzerdatenbank erstellen - queue = Queue() - provider=Providerlist() - provider.add("sipgate", sip, ["sms", "fax", ]) - provider.add("smstrade", smstrade, ["sms", ]) - provider.add("geonet", None, ["sms", "fax", ]) - provider.add("fax.de", None, ["sms", "fax", ]) - provider.add("localhost", localhost, ["mail", ]) - provider.setDefault("sms","smstrade") - provider.setDefault("fax","sipgate") - provider.setDefault("mail","localhost") - jobqueue=Joblist(manager, queue, provider) - userdb=iro.MyUserDB(userlist,jobqueue) - - - #Server starten - cp = ConfigParser.ConfigParser() - cp.read(["iro.conf"]) - cert=cp.get('server', 'cert') - key=cp.get('server', 'key') - serv = StoppableXMLRPCServer(addr=("localhost", 8000), - userdb=userdb, - certificate=cert,privatekey=key, - logRequests=False) - serv.relam="xmlrpc" - return serv - - -class TestServer(unittest.TestCase): - - def setUp(self): - self.serv = init_server() - self.serv.start() - - def tearDown(self): - self.serv.stop() - - def testLogin(self): - self.assertEqual(xmlrpclib.Server("https://test:test@localhost:8000").status(), {}) - self.assertEqual(xmlrpclib.Server("https://test2:test2@localhost:8000").status(), {}) - self.assertRaises(xmlrpclib.ProtocolError, xmlrpclib.Server("https://test2:test@localhost:8000").status) - self.assertRaises(xmlrpclib.ProtocolError,xmlrpclib.Server ("https://test:test2@localhost:8000").status) - - def testsendSMS(self): - servstr="https://test:test@localhost:8000" - client=xmlrpclib.Server(servstr) - id=client.startSMS("test",["01234", ] ) - self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} ) - - def testTwoUser(self): - u1="https://test:test@localhost:8000" - u2="https://test2:test2@localhost:8000" - admin="https://admin:admin@localhost:8000" - client1=xmlrpclib.Server(u1) - client2=xmlrpclib.Server(u2) - admin=xmlrpclib.Server(admin) - id1=client1.startSMS("test",["01234"] ) - self.assertEqual(client2.status(),{} ) - self.assertEqual(admin.status(id1),{id1: {'status': ['init',{}], 'name': 'test'}} ) - id2=client2.startSMS("test2",["01234"] ) - self.assertNotEqual(id1, id2) - self.assertEqual(client1.status(),{id1: {'status': ['init',{}], 'name': 'test'}}) - self.assertEqual(client2.status(),{id2: {'status': ['init',{}], 'name': 'test2'}}) - self.assertEqual(admin.status(),{id1: {'status': ['init',{}], 'name': 'test'}, - id2: {'status': ['init',{}], 'name': 'test2'}} ) - - self.assertEqual(client2.status(id1), {}) - self.assertEqual(client1.status(id2), {}) - - def testGetProvider(self): - servstr="https://test:test@localhost:8000" - client=xmlrpclib.Server(servstr) - self.assertEqual(client.getProvider("sms"), ["fax.de","geonet", "sipgate", "smstrade"]) - self.assertEqual(client.getProvider("fax"), ["fax.de","geonet", "sipgate"]) - self.assertEqual(client.getProvider("mail"), ["localhost"]) - - self.assertRaises(xmlrpclib.ProtocolError,client.getProvider, "temp") - - def testGetDefault(self): - servstr="https://test:test@localhost:8000" - client=xmlrpclib.Server(servstr) - self.assertEqual(client.getDefaultProvider("sms"), "smstrade") - self.assertEqual(client.getDefaultProvider("fax"),"sipgate") - self.assertEqual(client.getDefaultProvider("mail"), "localhost") - - self.assertRaises(xmlrpclib.ProtocolError,client.getDefaultProvider, "temp") - - -if __name__ == "__main__": - unittest.main()