iro/test.py
author Sandro Knauß <knauss@netzguerilla.net>
Thu, 22 Oct 2009 10:00:01 +0200
changeset 0 a3b6e531f0d2
child 5 af2f45da3192
permissions -rw-r--r--
[svn r93] creating iro package branch

# -*- coding: utf-8 -*-

import unittest
import server
import threading, xmlrpclib
from multiprocessing import Queue
from multiprocessing.managers import BaseManager

class StoppableXMLRPCServer(server.SecureUserDBXMLRPCServer, threading.Thread):
    """XML-RPC server that handles requests in its own thread until stopped.

    Combines ``server.SecureUserDBXMLRPCServer`` with ``threading.Thread``:
    ``start()`` launches ``run()`` in a new thread, ``stop()`` ends the loop.
    """

    # Loop flag polled by run(); stop() sets it to False on the instance.
    running = True

    def __init__(self, *args, **kwargs):
        """Initialise both base classes.

        All positional and keyword arguments are forwarded to the XML-RPC
        server base class; the thread base class is initialised without any.
        """
        server.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs)
        threading.Thread.__init__(self)

    def run(self):
        """Handle one request after another while ``running`` is true."""
        # The serving loop has to run in a thread of its own so that it can
        # be interrupted from the outside.
        while True:
            if not self.running:
                break
            self.handle_request()

    def stop(self):
        """Clear the ``running`` flag and call ``server_close()``."""
        self.running = False
        self.server_close()


def init_server():
    """Assemble a ready-to-start ``StoppableXMLRPCServer`` for the tests.

    Sets up three accounts (``test``, ``test2``, ``admin``), starts a
    multiprocessing manager exposing ``server.MessageJob``, configures the
    sipgate provider from ``iro.conf`` and wires everything into a user
    database.  The server is bound to ``localhost:8000`` using the test
    certificate and key below ``./certs``.

    Returns:
        The server instance; it has not been started yet.
    """
    accounts = [
        {"name": "test", "password": "test", "class": server.User},
        {"name": "test2", "password": "test2", "class": server.User},
        {"name": "admin", "password": "admin", "class": server.Admin},
    ]

    class MyManager(BaseManager):
        pass

    MyManager.register('MessageJob', server.MessageJob)
    manager = MyManager()
    manager.start()

    # Create and configure the provider.
    import anbieter
    sipgate = anbieter.sipgate()
    sipgate.read_basic_config("iro.conf")

    # Build the user database.  SMS and fax both go through sipgate; the
    # mail provider is an empty string (named "localhost" originally).
    job_channel = Queue()
    providers = {
        "sms": sipgate,
        "fax": sipgate,
        "mail": "",
    }
    jobs = server.Jobs(manager, job_channel, providers)
    user_db = server.MyUserDB(accounts, jobs)

    # Create the server (it is started by the caller).
    xmlrpc_server = StoppableXMLRPCServer(
        addr=("localhost", 8000),
        userdb=user_db,
        certificate="./certs/test.cert.pem",
        privatekey="./certs/test.key.pem",
    )
    # NOTE(review): the attribute is spelled "relam" here; confirm whether
    # the server class reads "relam" or "realm".
    xmlrpc_server.relam = "xmlrpc"
    return xmlrpc_server
 
    
class TestServer(unittest.TestCase):
    """Integration tests that talk to a freshly started server over HTTPS.

    Every test gets its own server from ``init_server()``, listening on
    ``localhost:8000``.  Credentials are passed inside the URL.
    """

    def setUp(self):
        # Build the server and launch its serving thread.
        self.serv = init_server()
        self.serv.start()

    def tearDown(self):
        self.serv.stop()
        # Fetch one last message so that the server gets closed properly.
        # Presumably this wakes the serving thread, which only re-checks its
        # run flag between requests -- confirm against the server class.
        xmlrpclib.Server("https://test:test@localhost:8000").status()

    def testLogin(self):
        # Valid credentials yield an empty status; a wrong password is
        # rejected at the protocol level.
        self.assertEqual(
            xmlrpclib.Server("https://test:test@localhost:8000").status(), {})
        self.assertEqual(
            xmlrpclib.Server("https://test2:test2@localhost:8000").status(), {})
        self.assertRaises(
            xmlrpclib.ProtocolError,
            xmlrpclib.Server("https://test2:test@localhost:8000").status)
        self.assertRaises(
            xmlrpclib.ProtocolError,
            xmlrpclib.Server("https://test:test2@localhost:8000").status)

    def testsendSMS(self):
        # A newly started SMS job is reported in state 'init'.
        client = xmlrpclib.Server("https://test:test@localhost:8000")
        job_id = client.startSMS("test", ["01234"])
        self.assertEqual(
            client.status(job_id),
            {job_id: {'status': ['init', {}], 'name': {'content': 'test'}}})

    def testTwoUser(self):
        # Users see only their own jobs; the admin sees the jobs of both.
        client1 = xmlrpclib.Server("https://test:test@localhost:8000")
        client2 = xmlrpclib.Server("https://test2:test2@localhost:8000")
        admin = xmlrpclib.Server("https://admin:admin@localhost:8000")

        job1 = {'status': ['init', {}], 'name': {'content': 'test'}}
        job2 = {'status': ['init', {}], 'name': {'content': 'test2'}}

        id1 = client1.startSMS("test", ["01234"])
        self.assertEqual(client2.status(), {})
        self.assertEqual(admin.status(id1), {id1: job1})

        id2 = client2.startSMS("test2", ["01234"])
        self.assertNotEqual(id1, id2)
        self.assertEqual(client1.status(), {id1: job1})
        self.assertEqual(client2.status(), {id2: job2})
        self.assertEqual(admin.status(), {id1: job1, id2: job2})

        # Asking for another user's job raises a fault.
        self.assertRaises(xmlrpclib.Fault, client2.status, id1)
        self.assertRaises(xmlrpclib.Fault, client1.status, id2)
    
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()