iro/test.py
changeset 0 a3b6e531f0d2
child 5 af2f45da3192
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/iro/test.py	Thu Oct 22 10:00:01 2009 +0200
@@ -0,0 +1,107 @@
+# -*- coding: utf-8 -*-
+
+import unittest
+import server
+import threading, xmlrpclib
+from multiprocessing import Queue
+from multiprocessing.managers import BaseManager
+
+class StoppableXMLRPCServer(server.SecureUserDBXMLRPCServer, threading.Thread):
+    running=True
+    def __init__(self, *args, **kwargs):
+        server.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs)
+        threading.Thread.__init__(self)
+   
+   
+    def run(self):
+        # *serve_forever* muss in einem eigenen Thread laufen, damit man es
+        # unterbrechen kann!
+        while (self.running):
+            self.handle_request()
+   
+   
+    def stop(self):
+        self.running=False
+        self.server_close()
+
+
+def init_server():
+    userlist=[{"name":"test","password":"test",  "class":server.User},
+              {"name":"test2","password":"test2", "class": server.User},
+	       {"name":"admin","password":"admin", "class": server.Admin}]
+
+
+    
+    class MyManager(BaseManager):
+        pass
+    
+    MyManager.register('MessageJob', server.MessageJob) 
+    manager = MyManager()
+    manager.start()
+    
+    
+    #anbieter erzeugen und konfigurieren
+    import anbieter
+    sip=anbieter.sipgate()
+    sip.read_basic_config("iro.conf")
+    localhost=""
+
+    #Benutzerdatenbank erstellen
+    queue = Queue()
+    provider={"sms":sip, "fax":sip, "mail":localhost}
+    jobqueue=server.Jobs(manager,  queue, provider)
+    userdb=server.MyUserDB(userlist,jobqueue)
+
+
+    #Server starten
+    serv = StoppableXMLRPCServer(addr=("localhost", 8000), 
+                                      userdb=userdb,
+                                      certificate="./certs/test.cert.pem",privatekey="./certs/test.key.pem")
+    serv.relam="xmlrpc"
+    return serv
+ 
+    
+class TestServer(unittest.TestCase):
+    
+    def setUp(self):
+      self.serv = init_server()
+      self.serv.start()
+
+    def tearDown(self):
+      self.serv.stop()
+      xmlrpclib.Server("https://test:test@localhost:8000").status()			#letzte nachricht abrufen, damit richt geschlossen wird
+
+    def testLogin(self):
+        self.assertEqual(xmlrpclib.Server("https://test:test@localhost:8000").status(), {})
+        self.assertEqual(xmlrpclib.Server("https://test2:test2@localhost:8000").status(), {})
+        self.assertRaises(xmlrpclib.ProtocolError, xmlrpclib.Server("https://test2:test@localhost:8000").status)
+        self.assertRaises(xmlrpclib.ProtocolError,xmlrpclib.Server ("https://test:test2@localhost:8000").status)
+    
+    def testsendSMS(self):
+	servstr="https://test:test@localhost:8000"
+	client=xmlrpclib.Server(servstr)
+	id=client.startSMS("test",["01234"] )
+        self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name':  {'content': 'test'}}} )
+        
+    def testTwoUser(self):
+ 	u1="https://test:test@localhost:8000"
+	u2="https://test2:test2@localhost:8000"
+	admin="https://admin:admin@localhost:8000"
+	client1=xmlrpclib.Server(u1)
+	client2=xmlrpclib.Server(u2)
+	admin=xmlrpclib.Server(admin)
+	id1=client1.startSMS("test",["01234"] )
+	self.assertEqual(client2.status(),{} ) 
+	self.assertEqual(admin.status(id1),{id1: {'status': ['init',{}], 'name':  {'content': 'test'}}} )
+	id2=client2.startSMS("test2",["01234"] )
+	self.assertNotEqual(id1, id2)
+	self.assertEqual(client1.status(),{id1: {'status': ['init',{}], 'name':  {'content': 'test'}}})
+	self.assertEqual(client2.status(),{id2: {'status': ['init',{}], 'name':   {'content': 'test2'}}})
+	self.assertEqual(admin.status(),{id1: {'status': ['init',{}], 'name':  {'content': 'test'}},
+					id2: {'status': ['init',{}], 'name':   {'content': 'test2'}}} )
+	
+	self.assertRaises(xmlrpclib.Fault, client2.status, id1)
+        self.assertRaises(xmlrpclib.Fault, client1.status, id2)
+    
+if __name__ == "__main__":
+    unittest.main()