iro/tests/testWorker.py
changeset 302 3f4bdea2abbf
parent 90 eb04ac3a8327
parent 301 d5ebbcccc41b
child 303 9708742ff89c
--- a/iro/tests/testWorker.py	Wed Dec 21 22:07:48 2011 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,147 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import unittest
-import logging
-from time import sleep
-
-from multiprocessing import Queue
-from multiprocessing.managers import BaseManager, ListProxy
-
-from iro.worker import Worker
-from iro.job import Job, SMSJob
-from iro.anbieter.anbieter import anbieter
-from iro.anbieter.content import SMS
-from iro.providerlist import Providerlist
-
-from logging.handlers import BufferingHandler
-
-class MyHandler(BufferingHandler):
-    def __init__(self,buffer=None):
-        '''BufferingHandler takes a "capacity" argument
-        so as to know when to flush. As we're overriding
-        shouldFlush anyway, we can set a capacity of zero.
-        You can call flush() manually to clear out the
-        buffer.
-            buffer: log messages to this buffer, needed f.ex for processes or threads'''
-        BufferingHandler.__init__(self, 0)
-        self.buffer=buffer
-
-    def shouldFlush(self):
-        return False
-
-    def emit(self, record):
-        if record.exc_info:                                                      #sonst geht das append schief, weil nicht picklebar
-            record.exc_info=record.exc_info[:-1]                                              
-        self.buffer.append(record.__dict__)
-
-
-class BadJob(Job):
-    def start(self,id=None):
-        Job.start(self,id)
-        raise Exception("Error")
-
-#einen Manager anlegen, der Job und eine Liste anbietet
-class MyManager(BaseManager):
-    pass
-MyManager.register('Job', Job) 
-MyManager.register('SMSJob', SMSJob) 
-MyManager.register('BadJob', BadJob) 
-MyManager.register('list', list, ListProxy)
-MyManager.register('Providerlist',Providerlist) 
-
-class TestWorker(unittest.TestCase):
-    def setUp(self):
-        #erstelle eine Queue&Manager
-        self.manager = MyManager()
-        self.queue = Queue()
-        
-        #manager starten, damit wir Logging anpassen können
-        self.manager.start()
-        self.setUpLogging()
-
-        self.providerlist=self.manager.Providerlist()
-        self.providerlist.add("test", anbieter() , ["sms",  ])
-        
-        #eigentlich Workerprocess starten
-        self.worker= Worker(self.queue)
-        self.worker.start()
-
-    def tearDown(self):
-        #Thread&Queue stoppen
-        self.stop()
-        
-        #Logging änderungen rückgängig
-        self.tearDownLogging()
-        self.manager.shutdown()
-    
-    def stop(self):
-        self.queue.close()
-        self.queue.join_thread()
-        self.worker.terminate()
-
-
-    def setUpLogging(self):
-        '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind'''
-        #wir brauchen eine threadsichere liste
-        self.buffer=self.manager.list()
-        
-        #Handler erstellen, der in den Buffer schreibt
-        self.handler = h = MyHandler(self.buffer)
-        self.logger =l= logging.getLogger()
-        
-        #Level anpassen 
-        l.setLevel(logging.DEBUG)
-        h.setLevel(logging.DEBUG)
-        l.addHandler(h)
-
-    def tearDownLogging(self):
-        '''crazy logging hacks wieder entfernen'''
-        self.logger.removeHandler(self.handler)
-        self.logger.setLevel(logging.NOTSET)
-        self.handler.close()
-
-
-    def testJob(self):
-        '''einen Job verarbeiten'''
-        job=self.manager.Job(None,None,"test")
-        self.assertEqual(job.getStatus(),("init",{}))
-        self.queue.put(job)
-        sleep(.1)
-        self.stop()
-        self.assertEqual(job.getStatus(),("started",{}))
-        self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
-                [(20,'Workerprocess läuft nun...'),
-                 (20,'ein neuer Job(1)'),
-                 (20,'Job(1) fertig ;)')])
-
-    def testBadJob(self):
-        '''einen Job verarbeiten, der fehlschlägt'''
-        job=self.manager.BadJob(None,None,"test")
-        self.assertEqual(job.getStatus(),("init",{}))
-        self.queue.put(job)
-        sleep(.1)
-        self.stop()
-        self.assertEqual(job.getStatus(),("error",{}))
-        self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
-                [(20,'Workerprocess läuft nun...'),
-                 (20,'ein neuer Job(1)'),
-                 (40,'Job(1) fehlgeschlagen :(')])
-        error=Exception('Error')
-        self.assertEqual(self.buffer[-1]['exc_info'][0],type(error))
-        self.assertEqual(str(self.buffer[-1]['exc_info'][1]),str(error))
-
-    def testSMSJob(self):
-        job=self.manager.SMSJob(self.providerlist, "test", "name", SMS("message"),[012345])
-        self.assertEqual(job.getStatus(),("init",{}))
-        self.queue.put(job)
-        sleep(.1)
-        self.stop()
-        self.assertEqual(job.getStatus(),("sended",{'failed': [], 'good': []}))
-        self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
-                [(20,'Workerprocess läuft nun...'),
-                 (20,'ein neuer Job(1)'),
-                 (20,'Job(1) fertig ;)')])
-
-
-if __name__ == "__main__":
-    unittest.main()