tests/old/testWorker.py
branchdevel
changeset 97 7556364b8104
parent 81 fea4c6760ca5
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/old/testWorker.py	Tue Jan 10 06:10:38 2012 +0100
@@ -0,0 +1,147 @@
+# -*- coding: utf-8 -*-
+
+import unittest
+import logging
+from time import sleep
+
+from multiprocessing import Queue
+from multiprocessing.managers import BaseManager, ListProxy
+
+from iro.worker import Worker
+from iro.job import Job, SMSJob
+from iro.anbieter.anbieter import anbieter
+from iro.anbieter.content import SMS
+from iro.providerlist import Providerlist
+
+from logging.handlers import BufferingHandler
+
+class MyHandler(BufferingHandler):
+    def __init__(self,buffer=None):
+        '''BufferingHandler takes a "capacity" argument
+        so as to know when to flush. As we're overriding
+        shouldFlush anyway, we can set a capacity of zero.
+        You can call flush() manually to clear out the
+        buffer.
+            buffer: log messages to this buffer, needed f.ex for processes or threads'''
+        BufferingHandler.__init__(self, 0)
+        self.buffer=buffer
+
+    def shouldFlush(self):
+        return False
+
+    def emit(self, record):
+        if record.exc_info:                                                      #sonst geht das append schief, weil nicht picklebar
+            record.exc_info=record.exc_info[:-1]                                              
+        self.buffer.append(record.__dict__)
+
+
+class BadJob(Job):
+    def start(self,id=None):
+        Job.start(self,id)
+        raise Exception("Error")
+
+#einen Manager anlegen, der Job und eine Liste anbietet
+class MyManager(BaseManager):
+    pass
+MyManager.register('Job', Job) 
+MyManager.register('SMSJob', SMSJob) 
+MyManager.register('BadJob', BadJob) 
+MyManager.register('list', list, ListProxy)
+MyManager.register('Providerlist',Providerlist) 
+
+class TestWorker(unittest.TestCase):
+    def setUp(self):
+        #erstelle eine Queue&Manager
+        self.manager = MyManager()
+        self.queue = Queue()
+        
+        #manager starten, damit wir Logging anpassen können
+        self.manager.start()
+        self.setUpLogging()
+
+        self.providerlist=self.manager.Providerlist()
+        self.providerlist.add("test", anbieter() , ["sms",  ])
+        
+        #eigentlich Workerprocess starten
+        self.worker= Worker(self.queue)
+        self.worker.start()
+
+    def tearDown(self):
+        #Thread&Queue stoppen
+        self.stop()
+        
+        #Logging änderungen rückgängig
+        self.tearDownLogging()
+        self.manager.shutdown()
+    
+    def stop(self):
+        self.queue.close()
+        self.queue.join_thread()
+        self.worker.terminate()
+
+
+    def setUpLogging(self):
+        '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind'''
+        #wir brauchen eine threadsichere liste
+        self.buffer=self.manager.list()
+        
+        #Handler erstellen, der in den Buffer schreibt
+        self.handler = h = MyHandler(self.buffer)
+        self.logger =l= logging.getLogger()
+        
+        #Level anpassen 
+        l.setLevel(logging.DEBUG)
+        h.setLevel(logging.DEBUG)
+        l.addHandler(h)
+
+    def tearDownLogging(self):
+        '''crazy logging hacks wieder entfernen'''
+        self.logger.removeHandler(self.handler)
+        self.logger.setLevel(logging.NOTSET)
+        self.handler.close()
+
+
+    def testJob(self):
+        '''einen Job verarbeiten'''
+        job=self.manager.Job(None,None,"test")
+        self.assertEqual(job.getStatus(),("init",{}))
+        self.queue.put(job)
+        sleep(.1)
+        self.stop()
+        self.assertEqual(job.getStatus(),("started",{}))
+        self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
+                [(20,'Workerprocess läuft nun...'),
+                 (20,'ein neuer Job(1)'),
+                 (20,'Job(1) fertig ;)')])
+
+    def testBadJob(self):
+        '''einen Job verarbeiten, der fehlschlägt'''
+        job=self.manager.BadJob(None,None,"test")
+        self.assertEqual(job.getStatus(),("init",{}))
+        self.queue.put(job)
+        sleep(.1)
+        self.stop()
+        self.assertEqual(job.getStatus(),("error",{}))
+        self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
+                [(20,'Workerprocess läuft nun...'),
+                 (20,'ein neuer Job(1)'),
+                 (40,'Job(1) fehlgeschlagen :(')])
+        error=Exception('Error')
+        self.assertEqual(self.buffer[-1]['exc_info'][0],type(error))
+        self.assertEqual(str(self.buffer[-1]['exc_info'][1]),str(error))
+
+    def testSMSJob(self):
+        job=self.manager.SMSJob(self.providerlist, "test", "name", SMS("message"),[012345])
+        self.assertEqual(job.getStatus(),("init",{}))
+        self.queue.put(job)
+        sleep(.1)
+        self.stop()
+        self.assertEqual(job.getStatus(),("sended",{'failed': [], 'good': []}))
+        self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
+                [(20,'Workerprocess läuft nun...'),
+                 (20,'ein neuer Job(1)'),
+                 (20,'Job(1) fertig ;)')])
+
+
+if __name__ == "__main__":
+    unittest.main()