--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/iro/tests/testXMLRPCServer.py Tue Oct 05 01:45:32 2010 +0200
@@ -0,0 +1,170 @@
+# -*- coding: utf-8 -*-
+
+import unittest, ConfigParser
+
+import threading, xmlrpclib
+from multiprocessing import Queue
+from multiprocessing.managers import BaseManager
+
+from iro import xmlrpc,anbieter
+from iro.iro import MySMTP,MySmstrade,MyUserDB
+from iro.job import SMSJob, FAXJob, MailJob
+from iro.joblist import Joblist
+from iro.providerlist import Providerlist
+from iro.user import User, Admin
+
+
+class StoppableXMLRPCServer(xmlrpc.SecureUserDBXMLRPCServer, threading.Thread):
+ running=False
+ def __init__(self, *args, **kwargs):
+ xmlrpc.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs)
+ threading.Thread.__init__(self)
+ self.timeout=.5
+
+ def start(self):
+ self.running=True
+ threading.Thread.start(self)
+
+
+ def run(self):
+ # *serve_forever* muss in einem eigenen Thread laufen, damit man es
+ # unterbrochen werden kann!
+ while (self.running):
+ self.handle_request()
+
+ def stop(self):
+ if (self.running):
+ self.running=False
+ self.server_close()
+ self.join()
+
+ def __del__(self):
+ self.stop()
+
+ def __enter__(self):
+ self.start()
+ return self
+
+ def __exit__(self,type,value,traceback):
+ self.stop()
+
+
+
+def init_server():
+ userlist=[{"name":"test","password":"test", "class":User},
+ {"name":"test2","password":"test2", "class": User},
+ {"name":"admin","password":"admin", "class": Admin}]
+
+
+
+ class MyManager(BaseManager):
+ pass
+
+ MyManager.register('SMSJob', SMSJob)
+ MyManager.register('FAXob', FAXJob)
+ MyManager.register('MailJob',MailJob)
+ MyManager.register('Providerlist',Providerlist)
+ manager = MyManager()
+ manager.start()
+
+
+ #anbieter erzeugen und konfigurieren
+ sip=anbieter.sipgate()
+ sip.read_basic_config("iro.conf")
+
+ localhost=MySMTP()
+ localhost.read_basic_config("iro.conf")
+
+ smstrade=MySmstrade()
+ smstrade.read_basic_config("iro.conf")
+
+ #Benutzerdatenbank erstellen
+ queue = Queue()
+ provider=Providerlist()
+ provider.add("sipgate", sip, ["sms", "fax", ])
+ provider.add("smstrade", smstrade, ["sms", ])
+ provider.add("geonet", None, ["sms", "fax", ])
+ provider.add("fax.de", None, ["sms", "fax", ])
+ provider.add("localhost", localhost, ["mail", ])
+ provider.setDefault("sms","smstrade")
+ provider.setDefault("fax","sipgate")
+ provider.setDefault("mail","localhost")
+ jobqueue=Joblist(manager, queue, provider)
+ userdb=MyUserDB(userlist,jobqueue)
+
+
+ #Server starten
+ cp = ConfigParser.ConfigParser()
+ cp.read(["iro.conf"])
+ cert=cp.get('server', 'cert')
+ key=cp.get('server', 'key')
+ serv = StoppableXMLRPCServer(addr=("localhost", 8000),
+ userdb=userdb,
+ certificate=cert,privatekey=key,
+ logRequests=False)
+ serv.relam="xmlrpc"
+ return serv
+
+
+class TestServer(unittest.TestCase):
+
+ def setUp(self):
+ self.serv = init_server()
+ self.serv.start()
+
+ def tearDown(self):
+ self.serv.stop()
+
+ def testLogin(self):
+ self.assertEqual(xmlrpclib.Server("https://test:test@localhost:8000").status(), {})
+ self.assertEqual(xmlrpclib.Server("https://test2:test2@localhost:8000").status(), {})
+ self.assertRaises(xmlrpclib.ProtocolError, xmlrpclib.Server("https://test2:test@localhost:8000").status)
+ self.assertRaises(xmlrpclib.ProtocolError,xmlrpclib.Server ("https://test:test2@localhost:8000").status)
+
+ def testsendSMS(self):
+ servstr="https://test:test@localhost:8000"
+ client=xmlrpclib.Server(servstr)
+ id=client.startSMS("test",["01234", ] )
+ self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': 'test'}} )
+
+ def testTwoUser(self):
+ u1="https://test:test@localhost:8000"
+ u2="https://test2:test2@localhost:8000"
+ admin="https://admin:admin@localhost:8000"
+ client1=xmlrpclib.Server(u1)
+ client2=xmlrpclib.Server(u2)
+ admin=xmlrpclib.Server(admin)
+ id1=client1.startSMS("test",["01234"] )
+ self.assertEqual(client2.status(),{} )
+ self.assertEqual(admin.status(id1),{id1: {'status': ['init',{}], 'name': 'test'}} )
+ id2=client2.startSMS("test2",["01234"] )
+ self.assertNotEqual(id1, id2)
+ self.assertEqual(client1.status(),{id1: {'status': ['init',{}], 'name': 'test'}})
+ self.assertEqual(client2.status(),{id2: {'status': ['init',{}], 'name': 'test2'}})
+ self.assertEqual(admin.status(),{id1: {'status': ['init',{}], 'name': 'test'},
+ id2: {'status': ['init',{}], 'name': 'test2'}} )
+
+ self.assertEqual(client2.status(id1), {})
+ self.assertEqual(client1.status(id2), {})
+
+ def testGetProvider(self):
+ servstr="https://test:test@localhost:8000"
+ client=xmlrpclib.Server(servstr)
+ self.assertEqual(client.getProvider("sms"), ["fax.de","geonet", "sipgate", "smstrade"])
+ self.assertEqual(client.getProvider("fax"), ["fax.de","geonet", "sipgate"])
+ self.assertEqual(client.getProvider("mail"), ["localhost"])
+
+ self.assertRaises(xmlrpclib.ProtocolError,client.getProvider, "temp")
+
+ def testGetDefault(self):
+ servstr="https://test:test@localhost:8000"
+ client=xmlrpclib.Server(servstr)
+ self.assertEqual(client.getDefaultProvider("sms"), "smstrade")
+ self.assertEqual(client.getDefaultProvider("fax"),"sipgate")
+ self.assertEqual(client.getDefaultProvider("mail"), "localhost")
+
+ self.assertRaises(xmlrpclib.ProtocolError,client.getDefaultProvider, "temp")
+
+
+if __name__ == "__main__":
+ unittest.main()