--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/iro/test.py Thu Oct 22 10:00:01 2009 +0200
@@ -0,0 +1,107 @@
+# -*- coding: utf-8 -*-
+
+import unittest
+import server
+import threading, xmlrpclib
+from multiprocessing import Queue
+from multiprocessing.managers import BaseManager
+
+class StoppableXMLRPCServer(server.SecureUserDBXMLRPCServer, threading.Thread):
+ running=True
+ def __init__(self, *args, **kwargs):
+ server.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs)
+ threading.Thread.__init__(self)
+
+
+ def run(self):
+ # *serve_forever* muss in einem eigenen Thread laufen, damit man es
+ # unterbrechen kann!
+ while (self.running):
+ self.handle_request()
+
+
+ def stop(self):
+ self.running=False
+ self.server_close()
+
+
+def init_server():
+ userlist=[{"name":"test","password":"test", "class":server.User},
+ {"name":"test2","password":"test2", "class": server.User},
+ {"name":"admin","password":"admin", "class": server.Admin}]
+
+
+
+ class MyManager(BaseManager):
+ pass
+
+ MyManager.register('MessageJob', server.MessageJob)
+ manager = MyManager()
+ manager.start()
+
+
+ #anbieter erzeugen und konfigurieren
+ import anbieter
+ sip=anbieter.sipgate()
+ sip.read_basic_config("iro.conf")
+ localhost=""
+
+ #Benutzerdatenbank erstellen
+ queue = Queue()
+ provider={"sms":sip, "fax":sip, "mail":localhost}
+ jobqueue=server.Jobs(manager, queue, provider)
+ userdb=server.MyUserDB(userlist,jobqueue)
+
+
+ #Server starten
+ serv = StoppableXMLRPCServer(addr=("localhost", 8000),
+ userdb=userdb,
+ certificate="./certs/test.cert.pem",privatekey="./certs/test.key.pem")
+ serv.relam="xmlrpc"
+ return serv
+
+
+class TestServer(unittest.TestCase):
+
+ def setUp(self):
+ self.serv = init_server()
+ self.serv.start()
+
+ def tearDown(self):
+ self.serv.stop()
+ xmlrpclib.Server("https://test:test@localhost:8000").status() #letzte nachricht abrufen, damit richt geschlossen wird
+
+ def testLogin(self):
+ self.assertEqual(xmlrpclib.Server("https://test:test@localhost:8000").status(), {})
+ self.assertEqual(xmlrpclib.Server("https://test2:test2@localhost:8000").status(), {})
+ self.assertRaises(xmlrpclib.ProtocolError, xmlrpclib.Server("https://test2:test@localhost:8000").status)
+ self.assertRaises(xmlrpclib.ProtocolError,xmlrpclib.Server ("https://test:test2@localhost:8000").status)
+
+ def testsendSMS(self):
+ servstr="https://test:test@localhost:8000"
+ client=xmlrpclib.Server(servstr)
+ id=client.startSMS("test",["01234"] )
+ self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': {'content': 'test'}}} )
+
+ def testTwoUser(self):
+ u1="https://test:test@localhost:8000"
+ u2="https://test2:test2@localhost:8000"
+ admin="https://admin:admin@localhost:8000"
+ client1=xmlrpclib.Server(u1)
+ client2=xmlrpclib.Server(u2)
+ admin=xmlrpclib.Server(admin)
+ id1=client1.startSMS("test",["01234"] )
+ self.assertEqual(client2.status(),{} )
+ self.assertEqual(admin.status(id1),{id1: {'status': ['init',{}], 'name': {'content': 'test'}}} )
+ id2=client2.startSMS("test2",["01234"] )
+ self.assertNotEqual(id1, id2)
+ self.assertEqual(client1.status(),{id1: {'status': ['init',{}], 'name': {'content': 'test'}}})
+ self.assertEqual(client2.status(),{id2: {'status': ['init',{}], 'name': {'content': 'test2'}}})
+ self.assertEqual(admin.status(),{id1: {'status': ['init',{}], 'name': {'content': 'test'}},
+ id2: {'status': ['init',{}], 'name': {'content': 'test2'}}} )
+
+ self.assertRaises(xmlrpclib.Fault, client2.status, id1)
+ self.assertRaises(xmlrpclib.Fault, client1.status, id2)
+
+if __name__ == "__main__":
+ unittest.main()