iro/test.py
changeset 45 4bde195af39f
parent 44 e20909e61588
parent 43 72c472c87460
child 47 a0eac136eb20
equal deleted inserted replaced
44:e20909e61588 45:4bde195af39f
     1 # -*- coding: utf-8 -*-
       
     2 
       
     3 import unittest, ConfigParser
       
     4 import iro
       
     5 from job import SMSJob, FAXJob, MailJob
       
     6 from joblist import Joblist
       
     7 from providerlist import Providerlist
       
     8 import threading, xmlrpclib
       
     9 from multiprocessing import Queue
       
    10 from multiprocessing.managers import BaseManager
       
    11 
       
    12 class StoppableXMLRPCServer(iro.SecureUserDBXMLRPCServer, threading.Thread):
       
    13     running=True
       
    14     def __init__(self, *args, **kwargs):
       
    15         iro.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs)
       
    16         threading.Thread.__init__(self)
       
    17    
       
    18    
       
    19     def run(self):
       
    20         # *serve_forever* muss in einem eigenen Thread laufen, damit man es
       
    21         # unterbrechen kann!
       
    22         while (self.running):
       
    23             self.handle_request()
       
    24    
       
    25    
       
    26     def stop(self):
       
    27         self.running=False
       
    28         self.server_close()
       
    29 
       
    30 
       
    31 def init_server():
       
    32     userlist=[{"name":"test","password":"test",  "class":iro.User},
       
    33               {"name":"test2","password":"test2", "class": iro.User},
       
    34               {"name":"admin","password":"admin", "class": iro.Admin}]
       
    35 
       
    36 
       
    37     
       
    38     class MyManager(BaseManager):
       
    39         pass
       
    40     
       
    41     MyManager.register('SMSJob', SMSJob) 
       
    42     MyManager.register('FAXob', FAXJob) 
       
    43     MyManager.register('MailJob',MailJob) 
       
    44     MyManager.register('Providerlist',Providerlist) 
       
    45     manager = MyManager()
       
    46     manager.start()
       
    47     
       
    48     
       
    49     #anbieter erzeugen und konfigurieren
       
    50     import anbieter
       
    51     sip=anbieter.sipgate()
       
    52     sip.read_basic_config("iro.conf")
       
    53     
       
    54     localhost=iro.MySMTP()
       
    55     localhost.read_basic_config("iro.conf")
       
    56 
       
    57     smstrade=iro.MySmstrade()
       
    58     smstrade.read_basic_config("iro.conf")
       
    59 
       
    60     #Benutzerdatenbank erstellen
       
    61     queue = Queue()
       
    62     provider=Providerlist()
       
    63     provider.add("sipgate", sip, ["sms", "fax", ])
       
    64     provider.add("smstrade", smstrade, ["sms", ])
       
    65     provider.add("geonet", None, ["sms", "fax", ])
       
    66     provider.add("fax.de", None, ["sms", "fax", ])
       
    67     provider.add("localhost", localhost, ["mail", ])
       
    68     provider.setDefault("sms","smstrade")
       
    69     provider.setDefault("fax","sipgate")
       
    70     provider.setDefault("mail","localhost")
       
    71     jobqueue=Joblist(manager,  queue, provider)
       
    72     userdb=iro.MyUserDB(userlist,jobqueue)
       
    73 
       
    74 
       
    75     #Server starten
       
    76     cp = ConfigParser.ConfigParser()
       
    77     cp.read(["iro.conf"])
       
    78     cert=cp.get('server', 'cert')
       
    79     key=cp.get('server', 'key')
       
    80     serv = StoppableXMLRPCServer(addr=("localhost", 8000), 
       
    81                                       userdb=userdb,
       
    82                                       certificate=cert,privatekey=key)
       
    83     serv.relam="xmlrpc"
       
    84     return serv
       
    85  
       
    86     
       
    87 class TestServer(unittest.TestCase):
       
    88     
       
    89     def setUp(self):
       
    90       self.serv = init_server()
       
    91       self.serv.start()
       
    92 
       
    93     def tearDown(self):
       
    94       self.serv.stop()
       
    95       xmlrpclib.Server("https://test:test@localhost:8000").status()			#letzte nachricht abrufen, damit richt geschlossen wird
       
    96 
       
    97     def testLogin(self):
       
    98         self.assertEqual(xmlrpclib.Server("https://test:test@localhost:8000").status(), {})
       
    99         self.assertEqual(xmlrpclib.Server("https://test2:test2@localhost:8000").status(), {})
       
   100         self.assertRaises(xmlrpclib.ProtocolError, xmlrpclib.Server("https://test2:test@localhost:8000").status)
       
   101         self.assertRaises(xmlrpclib.ProtocolError,xmlrpclib.Server ("https://test:test2@localhost:8000").status)
       
   102     
       
   103     def testsendSMS(self):
       
   104         servstr="https://test:test@localhost:8000"
       
   105         client=xmlrpclib.Server(servstr)
       
   106         id=client.startSMS("test",["01234", ] )
       
   107         self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name':  'test'}} )
       
   108         
       
   109     def testTwoUser(self):
       
   110         u1="https://test:test@localhost:8000"
       
   111         u2="https://test2:test2@localhost:8000"
       
   112         admin="https://admin:admin@localhost:8000"
       
   113         client1=xmlrpclib.Server(u1)
       
   114         client2=xmlrpclib.Server(u2)
       
   115         admin=xmlrpclib.Server(admin)
       
   116         id1=client1.startSMS("test",["01234"] )
       
   117         self.assertEqual(client2.status(),{} ) 
       
   118         self.assertEqual(admin.status(id1),{id1: {'status': ['init',{}], 'name':  'test'}} )
       
   119         id2=client2.startSMS("test2",["01234"] )
       
   120         self.assertNotEqual(id1, id2)
       
   121         self.assertEqual(client1.status(),{id1: {'status': ['init',{}], 'name':  'test'}})
       
   122         self.assertEqual(client2.status(),{id2: {'status': ['init',{}], 'name':  'test2'}})
       
   123         self.assertEqual(admin.status(),{id1: {'status': ['init',{}], 'name':   'test'},
       
   124                         id2: {'status': ['init',{}], 'name':   'test2'}} )
       
   125         
       
   126         self.assertEqual(client2.status(id1), {})
       
   127         self.assertEqual(client1.status(id2), {})
       
   128         
       
   129     def testGetProvider(self):
       
   130         servstr="https://test:test@localhost:8000"
       
   131         client=xmlrpclib.Server(servstr)       
       
   132         self.assertEqual(client.getProvider("sms"), ["fax.de","geonet", "sipgate", "smstrade"])
       
   133         self.assertEqual(client.getProvider("fax"), ["fax.de","geonet", "sipgate"])
       
   134         self.assertEqual(client.getProvider("mail"), ["localhost"])
       
   135         
       
   136         self.assertRaises(xmlrpclib.Fault,client.getProvider, "temp")
       
   137     
       
   138     def testGetDefault(self):
       
   139         servstr="https://test:test@localhost:8000"
       
   140         client=xmlrpclib.Server(servstr)       
       
   141         self.assertEqual(client.getDefaultProvider("sms"), "smstrade")
       
   142         self.assertEqual(client.getDefaultProvider("fax"),"sipgate")
       
   143         self.assertEqual(client.getDefaultProvider("mail"), "localhost")       
       
   144         
       
   145         self.assertRaises(xmlrpclib.Fault,client.getDefaultProvider, "temp")        
       
   146 
       
   147     
       
# Run the test cases of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()