removing old tests devel
authorSandro Knauß <knauss@netzguerilla.net>
Sun, 18 Mar 2012 14:07:20 +0100
branchdevel
changeset 232 3f0d2a060e5d
parent 231 3929338fd17f
child 233 34435357dc8a
removing old tests
tests/old/__init__.py
tests/old/dump_test_log.py
tests/old/stopableServer.py
tests/old/testJob.py
tests/old/testWorker.py
tests/old/testXMLRPCServer.py
tests/old/testloglock.py
--- a/tests/old/dump_test_log.py	Sun Mar 18 14:06:27 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,42 +0,0 @@
-import time, os, signal
-LOG_FILE = 'test.log'
-
-log_file = open(LOG_FILE, 'a')
-
-def log(msg):
-    log_file.write(msg + '\n')
-    log_file.flush()
-
-def SigUSR1Handler(signum, frame):
-    print "Reacting on USR1 signal (signal 10)"
-    global log_file
-    log_file.close()
-    log_file = open(LOG_FILE, 'a')
-    return
-
-def init():
-    if os.path.isfile('/var/usr/dump_test_log.pid'):
-        print 'Have to stop server first'
-        os.exit(1)
-    else:
-        print 'Starting server...'
-        #write process id file
-        f = open('/var/run/dump_test_log.pid', 'w')
-        f.write(str(os.getpid()))
-        f.flush()
-        f.close()
-        print 'Process start with pid ', os.getpid()
-
-    signal.signal(signal.SIGUSR1, SigUSR1Handler)
-
-def main():
-    init()
-    count = 1
-    while True:
-      log('log line #%d, pid: %d' % (count, os.getpid()))
-    count = count + 1
-    time.sleep(1)
-
-if __name__ == '__main__':
-    main()
-
--- a/tests/old/stopableServer.py	Sun Mar 18 14:06:27 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,118 +0,0 @@
-import ConfigParser
-
-import threading
-
-from multiprocessing import Queue
-from multiprocessing.managers import BaseManager
-
-from iro import xmlrpc,anbieter
-from iro.user import User, Admin
-from iro.iro import MySMTP,MySmstrade,MyUserDB
-from iro.job import SMSJob, FAXJob, MailJob
-from iro.joblist import Joblist
-from iro.providerlist import Providerlist
-
-class StoppableXMLRPCServer(xmlrpc.SecureUserDBXMLRPCServer, threading.Thread):
-    running=False
-    def __init__(self, *args, **kwargs):
-        xmlrpc.SecureUserDBXMLRPCServer.__init__(self, *args, **kwargs)
-        threading.Thread.__init__(self)
-        self.timeout=.5
-   
-    def start(self):
-        self.running=True
-        threading.Thread.start(self)
- 
-   
-    def run(self):
-        # *serve_forever* muss in einem eigenen Thread laufen, damit man es
-        # unterbrochen werden kann!
-        while (self.running):
-            try:
-                self.handle_request()
-            except :
-                break
-   
-    def stop(self):
-        if (self.running):
-            self.running=False
-            self.server_close()
-            self.join()
-
-    def __del__(self):
-        self.stop()
- 
-    def __enter__(self):
-        self.start()
-        return self
-
-    def __exit__(self,type,value,traceback):
-        self.stop()
-
-
-class Internal:
-    pass
-
-def init_server():
-    userlist=[{"name":"test","password":"test",  "class":User},
-              {"name":"test2","password":"test2", "class": User},
-              {"name":"admin","password":"admin", "class": Admin}]
-
-
-    
-    class MyManager(BaseManager):
-        pass
-
-    internal=Internal()
-    
-    MyManager.register('SMSJob', SMSJob) 
-    MyManager.register('FaxJob', FAXJob) 
-    MyManager.register('MailJob',MailJob) 
-    MyManager.register('Providerlist',Providerlist) 
-    manager = MyManager()
-    manager.start()
-    
-    internal.manager=manager
-    
-    #anbieter erzeugen und konfigurieren
-    sip=anbieter.sipgate()
-    sip.read_basic_config("iro.conf")
-    
-    localhost=MySMTP()
-    localhost.read_basic_config("iro.conf")
-
-    smstrade=MySmstrade()
-    smstrade.read_basic_config("iro.conf")
-
-    #Benutzerdatenbank erstellen
-    queue = Queue()
-    internal.queue=queue
-    provider=Providerlist()
-    internal.provider=provider
-    provider.add("sipgate", sip, ["sms", "fax", ])
-    provider.add("smstrade", smstrade, ["sms", ])
-    provider.add("geonet", None, ["sms", "fax", ])
-    provider.add("fax.de", None, ["sms", "fax", ])
-    provider.add("localhost", localhost, ["mail", ])
-    provider.setDefault("sms","smstrade")
-    provider.setDefault("fax","sipgate")
-    provider.setDefault("mail","localhost")
-    jobqueue=Joblist(manager,  queue, provider)
-    internal.jobqueue=jobqueue
-    userdb=MyUserDB(userlist,jobqueue)
-    internal.userdb=userdb
-
-
-    #Server starten
-    cp = ConfigParser.ConfigParser()
-    cp.read(["iro.conf"])
-    cert=cp.get('server', 'cert')
-    key=cp.get('server', 'key')
-    serv = StoppableXMLRPCServer(addr=("localhost", 8000), 
-                                      userdb=userdb,
-                                      certificate=cert,privatekey=key,
-                                      logRequests=False)
-    serv.relam="xmlrpc"
-    internal.serv=serv
-    return internal
-
--- a/tests/old/testJob.py	Sun Mar 18 14:06:27 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,82 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import unittest
-
-import xmlrpclib
-from stopableServer import init_server
-from iro.anbieter.content import SMS,FAX,Mail
-
-class TestServer(unittest.TestCase):
-    
-    def setUp(self):
-        self.i = init_server()
-        self.serv=self.i.serv
-        self.serv.start()
-
-    def tearDown(self):
-        self.serv.stop()
-
-   
-    def SendSMS(self,msg):
-        servstr="https://test:test@localhost:8000"
-        client=xmlrpclib.Server(servstr)
-        id=client.startSMS(msg,["01234", ] )
-        self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name':  unicode(msg)}} )
-        ele=self.i.queue.get(.1)
-        self.assertEqual(ele.getRecipients(),["01234", ] )
-        self.assertNotEqual(ele.getMessage(),SMS('') )
-        self.assertEqual(ele.getMessage(),SMS(msg) )
-    
-    def testSimpleSMS(self):
-        self.SendSMS("test")
-    
-    def testSpecialCharacters(self):
-        self.SendSMS(u"!\"§$%&/()=?\'")
-        self.SendSMS(u"@ł€ł€¶ŧł¼¼½¬¬↓ŧ←ĸ↓→øđŋħ“”µ·…–|")
-
-    def testSendFAX(self):
-        servstr="https://test:test@localhost:8000"
-        client=xmlrpclib.Server(servstr)
-        msg="2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ"
-        id=client.startFAX("test",xmlrpclib.Binary(msg),["01234", ] )
-        self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name':  'test'}} )
-        ele=self.i.queue.get(.1)
-        self.assertEqual(ele.getRecipients(),["01234", ] )
-        self.assertEqual(ele.getMessage(),FAX('test','',[msg]))
-
-    def testDoubleFAX(self):
-        servstr="https://test:test@localhost:8000"
-        client=xmlrpclib.Server(servstr)
-        msg="2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ"
-        pdf=open('tests/test.pdf').read()
-        id=client.startFAX("test",[xmlrpclib.Binary(msg),xmlrpclib.Binary(pdf)],["01234", ] )
-        self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name':  'test'}} )
-        ele=self.i.queue.get(.1)
-        self.assertEqual(ele.getRecipients(),["01234", ] )
-        self.assertEqual(ele.getMessage(),FAX('test','',[msg, pdf]))
-
-    def testSendMail(self):
-        servstr="https://test:test@localhost:8000"
-        client=xmlrpclib.Server(servstr)
-        msg=u"2134wergsdfg4w56q34134æſðđæðſđŋ³@¼ŧæðđŋł€¶ŧ€¶ŧ"
-        id=client.startMail("test",msg,["test@test.de", ],'absender@test.de' )
-        self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name':  'test'}} )
-        ele=self.i.queue.get(.1)
-        self.assertEqual(ele.getRecipients(),["test@test.de", ] )
-        self.assertEqual(ele.getMessage(),Mail('test',msg,'absender@test.de'))
-        self.assertEqual(ele.getMessage().as_string(),"""Content-Type: text/plain; charset="utf-8"
-MIME-Version: 1.0
-Content-Transfer-Encoding: base64
-Subject: =?utf-8?q?test?=
-
-MjEzNHdlcmdzZGZnNHc1NnEzNDEzNMOmxb/DsMSRw6bDsMW/xJHFi8KzQMK8xafDpsOwxJHFi8WC
-4oKswrbFp+KCrMK2xac=
-""")
-        sub=u"³¼½ſðđŋſ€¼½ÖÄÜß"
-        id=client.startMail(sub,msg,["test@test.de", ],'absender@test.de' )
-        self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name': sub}})
-        ele=self.i.queue.get(.1)
-        self.assertEqual(ele.getMessage(),Mail(sub, msg, 'absender@test.de'))
-
-if __name__ == "__main__":
-    unittest.main()  
--- a/tests/old/testWorker.py	Sun Mar 18 14:06:27 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,147 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import unittest
-import logging
-from time import sleep
-
-from multiprocessing import Queue
-from multiprocessing.managers import BaseManager, ListProxy
-
-from iro.worker import Worker
-from iro.job import Job, SMSJob
-from iro.anbieter.anbieter import anbieter
-from iro.anbieter.content import SMS
-from iro.providerlist import Providerlist
-
-from logging.handlers import BufferingHandler
-
-class MyHandler(BufferingHandler):
-    def __init__(self,buffer=None):
-        '''BufferingHandler takes a "capacity" argument
-        so as to know when to flush. As we're overriding
-        shouldFlush anyway, we can set a capacity of zero.
-        You can call flush() manually to clear out the
-        buffer.
-            buffer: log messages to this buffer, needed f.ex for processes or threads'''
-        BufferingHandler.__init__(self, 0)
-        self.buffer=buffer
-
-    def shouldFlush(self):
-        return False
-
-    def emit(self, record):
-        if record.exc_info:                                                      #sonst geht das append schief, weil nicht picklebar
-            record.exc_info=record.exc_info[:-1]                                              
-        self.buffer.append(record.__dict__)
-
-
-class BadJob(Job):
-    def start(self,id=None):
-        Job.start(self,id)
-        raise Exception("Error")
-
-#einen Manager anlegen, der Job und eine Liste anbietet
-class MyManager(BaseManager):
-    pass
-MyManager.register('Job', Job) 
-MyManager.register('SMSJob', SMSJob) 
-MyManager.register('BadJob', BadJob) 
-MyManager.register('list', list, ListProxy)
-MyManager.register('Providerlist',Providerlist) 
-
-class TestWorker(unittest.TestCase):
-    def setUp(self):
-        #erstelle eine Queue&Manager
-        self.manager = MyManager()
-        self.queue = Queue()
-        
-        #manager starten, damit wir Logging anpassen können
-        self.manager.start()
-        self.setUpLogging()
-
-        self.providerlist=self.manager.Providerlist()
-        self.providerlist.add("test", anbieter() , ["sms",  ])
-        
-        #eigentlich Workerprocess starten
-        self.worker= Worker(self.queue)
-        self.worker.start()
-
-    def tearDown(self):
-        #Thread&Queue stoppen
-        self.stop()
-        
-        #Logging änderungen rückgängig
-        self.tearDownLogging()
-        self.manager.shutdown()
-    
-    def stop(self):
-        self.queue.close()
-        self.queue.join_thread()
-        self.worker.terminate()
-
-
-    def setUpLogging(self):
-        '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind'''
-        #wir brauchen eine threadsichere liste
-        self.buffer=self.manager.list()
-        
-        #Handler erstellen, der in den Buffer schreibt
-        self.handler = h = MyHandler(self.buffer)
-        self.logger =l= logging.getLogger()
-        
-        #Level anpassen 
-        l.setLevel(logging.DEBUG)
-        h.setLevel(logging.DEBUG)
-        l.addHandler(h)
-
-    def tearDownLogging(self):
-        '''crazy logging hacks wieder entfernen'''
-        self.logger.removeHandler(self.handler)
-        self.logger.setLevel(logging.NOTSET)
-        self.handler.close()
-
-
-    def testJob(self):
-        '''einen Job verarbeiten'''
-        job=self.manager.Job(None,None,"test")
-        self.assertEqual(job.getStatus(),("init",{}))
-        self.queue.put(job)
-        sleep(.1)
-        self.stop()
-        self.assertEqual(job.getStatus(),("started",{}))
-        self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
-                [(20,'Workerprocess läuft nun...'),
-                 (20,'ein neuer Job(1)'),
-                 (20,'Job(1) fertig ;)')])
-
-    def testBadJob(self):
-        '''einen Job verarbeiten, der fehlschlägt'''
-        job=self.manager.BadJob(None,None,"test")
-        self.assertEqual(job.getStatus(),("init",{}))
-        self.queue.put(job)
-        sleep(.1)
-        self.stop()
-        self.assertEqual(job.getStatus(),("error",{}))
-        self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
-                [(20,'Workerprocess läuft nun...'),
-                 (20,'ein neuer Job(1)'),
-                 (40,'Job(1) fehlgeschlagen :(')])
-        error=Exception('Error')
-        self.assertEqual(self.buffer[-1]['exc_info'][0],type(error))
-        self.assertEqual(str(self.buffer[-1]['exc_info'][1]),str(error))
-
-    def testSMSJob(self):
-        job=self.manager.SMSJob(self.providerlist, "test", "name", SMS("message"),[012345])
-        self.assertEqual(job.getStatus(),("init",{}))
-        self.queue.put(job)
-        sleep(.1)
-        self.stop()
-        self.assertEqual(job.getStatus(),("sended",{'failed': [], 'good': []}))
-        self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
-                [(20,'Workerprocess läuft nun...'),
-                 (20,'ein neuer Job(1)'),
-                 (20,'Job(1) fertig ;)')])
-
-
-if __name__ == "__main__":
-    unittest.main()  
--- a/tests/old/testXMLRPCServer.py	Sun Mar 18 14:06:27 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,71 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import unittest
-
-import xmlrpclib
-from stopableServer import init_server
-
-class TestServer(unittest.TestCase):
-    
-    def setUp(self):
-        self.i = init_server()
-        self.serv=self.i.serv
-        
-        self.serv.start()
-
-    def tearDown(self):
-        self.serv.stop()
-
-    def testLogin(self):
-        self.assertEqual(xmlrpclib.Server("https://test:test@localhost:8000").status(), {})
-        self.assertEqual(xmlrpclib.Server("https://test2:test2@localhost:8000").status(), {})
-        self.assertRaises(xmlrpclib.ProtocolError, xmlrpclib.Server("https://test2:test@localhost:8000").status)
-        self.assertRaises(xmlrpclib.ProtocolError,xmlrpclib.Server ("https://test:test2@localhost:8000").status)
-    
-    def testsendSMS(self):
-        servstr="https://test:test@localhost:8000"
-        client=xmlrpclib.Server(servstr)
-        id=client.startSMS("test",["01234", ] )
-        self.assertEqual(client.status(id),{id: {'status': ['init',{}], 'name':  'test'}} )
-        
-    def testTwoUser(self):
-        u1="https://test:test@localhost:8000"
-        u2="https://test2:test2@localhost:8000"
-        admin="https://admin:admin@localhost:8000"
-        client1=xmlrpclib.Server(u1)
-        client2=xmlrpclib.Server(u2)
-        admin=xmlrpclib.Server(admin)
-        id1=client1.startSMS("test",["01234"] )
-        self.assertEqual(client2.status(),{} ) 
-        self.assertEqual(admin.status(id1),{id1: {'status': ['init', {}], 'name':  'test'}} )
-        id2=client2.startSMS("test2",["01234"] )
-        self.assertNotEqual(id1, id2)
-        self.assertEqual(client1.status(),{id1: {'status': ['init', {}], 'name':  'test'}})
-        self.assertEqual(client2.status(),{id2: {'status': ['init', {}], 'name':  'test2'}})
-        self.assertEqual(admin.status(),{id1: {'status': ['init', {}], 'name':   'test'},
-                        id2: {'status': ['init', {}], 'name':   'test2'}} )
-        
-        self.assertEqual(client2.status(id1), {})
-        self.assertEqual(client1.status(id2), {})
-        
-    def testGetProvider(self):
-        servstr="https://test:test@localhost:8000"
-        client=xmlrpclib.Server(servstr)       
-        self.assertEqual(client.getProvider("sms"), ["fax.de","geonet", "sipgate", "smstrade"])
-        self.assertEqual(client.getProvider("fax"), ["fax.de","geonet", "sipgate"])
-        self.assertEqual(client.getProvider("mail"), ["localhost"])
-        
-        self.assertRaises(xmlrpclib.ProtocolError,client.getProvider, "temp")
-    
-    def testGetDefault(self):
-        servstr="https://test:test@localhost:8000"
-        client=xmlrpclib.Server(servstr)       
-        self.assertEqual(client.getDefaultProvider("sms"), "smstrade")
-        self.assertEqual(client.getDefaultProvider("fax"),"sipgate")
-        self.assertEqual(client.getDefaultProvider("mail"), "localhost")       
-        
-        self.assertRaises(xmlrpclib.ProtocolError,client.getDefaultProvider, "temp")        
-
-    
-if __name__ == "__main__":
-    unittest.main()  
--- a/tests/old/testloglock.py	Sun Mar 18 14:06:27 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,98 +0,0 @@
-
-import unittest
-import os
-import tempfile
-import signal
-import threading
-import time
-
-class testLogLock(unittest.TestCase):
-    def test_ThreadingAndLocks(self):
-        #create a thread, have that thread grab a lock, and sleep.
-        fd, file = tempfile.mkstemp('.nonlog')
-        _lock = threading.RLock()
-        os.close(fd)
-        def locker():
-            os.write(fd, 'Thread acquiring lock\n')
-            _lock.acquire()
-            os.write(fd, 'Thread acquired lock\n')
-            time.sleep(0.4)
-            os.write(fd, 'Thread releasing lock\n')
-            _lock.release()
-            os.write(fd, 'Thread released lock\n')
-
-        #Then in the main thread, throw a signal to self
-        def handleSignal(sigNum, frame):
-            os.write(fd, 'Main Thread acquiring lock\n')
-            lock = False
-            endtime = time.time() + 1.0
-            while not lock:
-                lock = _lock.acquire(blocking=0)
-                time.sleep(0.01)
-                if time.time() > endtime:
-                    break
-            if not lock:
-                os.write(fd, 'Main Thread could not acquire lock\n')
-                return
-            os.write(fd, 'Main Thread acquired lock\n')
-            os.write(fd, 'Main Thread releasing lock\n')
-            _lock.release()
-            os.write(fd, 'Main Thread released lock\n')
-
-        sighndlr = signal.signal(signal.SIGUSR1, handleSignal)
-        try:
-            fd = os.open(file, os.O_SYNC | os.O_WRONLY | os.O_CREAT)
-            thread = threading.Thread(target=locker)
-            thread.start()
-            time.sleep(0.1)
-            os.kill(os.getpid(), signal.SIGUSR1)
-            thread.join()
-
-            #check the results
-            os.close(fd)
-            fileconts = open(file, 'r').read()
-            self.assertTrue('Main Thread released lock' in fileconts)
-            self.assertEqual(fileconts,
-                '''Thread acquiring lock
-Thread acquired lock
-Main Thread acquiring lock
-Thread releasing lock
-Thread released lock
-Main Thread acquired lock
-Main Thread releasing lock
-Main Thread released lock
-''')
-
-            #Now try after acquiring the lock from the main thread
-            fd = os.open(file, os.O_SYNC | os.O_WRONLY | os.O_CREAT)
-            _lock.acquire()
-            thread = threading.Thread(target=locker)
-            thread.start()
-            time.sleep(0.1)
-            os.kill(os.getpid(), signal.SIGUSR1)
-            _lock.release()
-            thread.join()
-            os.close(fd)
-            fileconts = open(file, 'r').read()
-            self.assertEqual(fileconts,
-                '''Thread acquiring lock
-Main Thread acquiring lock
-Main Thread acquired lock
-Main Thread releasing lock
-Main Thread released lock
-Thread acquired lock
-Thread releasing lock
-Thread released lock
-''')
-
-        finally:
-            signal.signal(signal.SIGUSR1, sighndlr)
-            try:
-                os.close(fd)
-            except OSError:
-                pass
-            try:
-                os.unlink(file)
-            except OSError:
-                pass
-