diff -r 000000000000 -r a3b6e531f0d2 iro/test.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/iro/test.py Thu Oct 22 10:00:01 2009 +0200 @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- + +import unittest +import server +import threading, xmlrpclib +from multiprocessing import Queue +from multiprocessing.managers import BaseManager + +class StoppableXMLRPCServer(server.SecureUserDBXMLRPCServer, threading.Thread): + running=True + def __init__(self, *args, **kwargs): + server.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs) + threading.Thread.__init__(self) + + + def run(self): + # *serve_forever* muss in einem eigenen Thread laufen, damit man es + # unterbrechen kann! + while (self.running): + self.handle_request() + + + def stop(self): + self.running=False + self.server_close() + + +def init_server(): + userlist=[{"name":"test","password":"test", "class":server.User}, + {"name":"test2","password":"test2", "class": server.User}, + {"name":"admin","password":"admin", "class": server.Admin}] + + + + class MyManager(BaseManager): + pass + + MyManager.register('MessageJob', server.MessageJob) + manager = MyManager() + manager.start() + + + #anbieter erzeugen und konfigurieren + import anbieter + sip=anbieter.sipgate() + sip.read_basic_config("iro.conf") + localhost="" + + #Benutzerdatenbank erstellen + queue = Queue() + provider={"sms":sip, "fax":sip, "mail":localhost} + jobqueue=server.Jobs(manager, queue, provider) + userdb=server.MyUserDB(userlist,jobqueue) + + + #Server starten + serv = StoppableXMLRPCServer(addr=("localhost", 8000), + userdb=userdb, + certificate="./certs/test.cert.pem",privatekey="./certs/test.key.pem") + serv.relam="xmlrpc" + return serv + + +class TestServer(unittest.TestCase): + + def setUp(self): + self.serv = init_server() + self.serv.start() + + def tearDown(self): + self.serv.stop() + xmlrpclib.Server("https://test:test@localhost:8000").status() #letzte nachricht abrufen, damit richt geschlossen wird + + def testLogin(self): + self.assertEqual(xmlrpclib.Server("https://test:test@localhost:8000").status(), {}) + self.assertEqual(xmlrpclib.Server("https://test2:test2@localhost:8000").status(), {}) + self.assertRaises(xmlrpclib.ProtocolError, xmlrpclib.Server("https://test2:test@localhost:8000").status) + self.assertRaises(xmlrpclib.ProtocolError,xmlrpclib.Server ("https://test:test2@localhost:8000").status) + + def testsendSMS(self): + servstr="https://test:test@localhost:8000" + client=xmlrpclib.Server(servstr) + id=client.startSMS("test",["01234"] ) + self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': {'content': 'test'}}} ) + + def testTwoUser(self): + u1="https://test:test@localhost:8000" + u2="https://test2:test2@localhost:8000" + admin="https://admin:admin@localhost:8000" + client1=xmlrpclib.Server(u1) + client2=xmlrpclib.Server(u2) + admin=xmlrpclib.Server(admin) + id1=client1.startSMS("test",["01234"] ) + self.assertEqual(client2.status(),{} ) + self.assertEqual(admin.status(id1),{id1: {'status': ['init',{}], 'name': {'content': 'test'}}} ) + id2=client2.startSMS("test2",["01234"] ) + self.assertNotEqual(id1, id2) + self.assertEqual(client1.status(),{id1: {'status': ['init',{}], 'name': {'content': 'test'}}}) + self.assertEqual(client2.status(),{id2: {'status': ['init',{}], 'name': {'content': 'test2'}}}) + self.assertEqual(admin.status(),{id1: {'status': ['init',{}], 'name': {'content': 'test'}}, + id2: {'status': ['init',{}], 'name': {'content': 'test2'}}} ) + + self.assertRaises(xmlrpclib.Fault, client2.status, id1) + self.assertRaises(xmlrpclib.Fault, client1.status, id2) + +if __name__ == "__main__": + unittest.main()