--- a/iro/tests/testWorker.py Tue Oct 05 03:24:01 2010 +0200
+++ b/iro/tests/testWorker.py Wed Oct 06 04:28:19 2010 +0200
@@ -5,63 +5,100 @@
from time import sleep
from multiprocessing import Queue
-from multiprocessing.managers import BaseManager
-import iro
-from ..worker import Worker
-from ..job import Job
+from multiprocessing.managers import BaseManager,ListProxy
+
+from iro.worker import Worker
+from iro.job import Job
from logging.handlers import BufferingHandler
class MyHandler(BufferingHandler):
- def __init__(self):
+ def __init__(self,buffer=None):
'''BufferingHandler takes a "capacity" argument
so as to know when to flush. As we're overriding
shouldFlush anyway, we can set a capacity of zero.
You can call flush() manually to clear out the
- buffer.'''
+ buffer.
+ buffer: log messages to this buffer, needed f.ex for processes or threads'''
BufferingHandler.__init__(self, 0)
+ self.buffer=buffer
def shouldFlush(self):
return False
def emit(self, record):
+ if record.exc_info: #sonst geht das append schief, weil nicht picklebar
+ print record.exc_info
+ record.exc_info=record.exc_info[:-1]
self.buffer.append(record.__dict__)
class MyManager(BaseManager):
pass
-
MyManager.register('Job', Job)
-
+MyManager.register('list', list, ListProxy)
class TestWorker(unittest.TestCase):
def setUp(self):
- self.handler = h = MyHandler()
- self.logger = l = logging.getLogger()
+ #erstelle eine Queue&Manager
+ self.manager = MyManager()
+ self.queue = Queue()
+
+ #manager starten, damit wir Logging anpassen können
+ self.manager.start()
+ self.setUpLogging()
+
+ #eigentlich Workerprocess starten
+ self.worker= Worker(self.queue)
+ self.worker.start()
+
+ def tearDown(self):
+ #Thread&Queue stoppen
+ self.stop()
+
+ #Logging änderungen rückgängig
+ self.tearDownLogging()
+ self.manager.shutdown()
+
+ def stop(self):
+ self.queue.close()
+ self.queue.join_thread()
+ self.worker.terminate()
+
+
+ def setUpLogging(self):
+ '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind'''
+ #wir brauchen eine threadsichere liste
+ self.buffer=self.manager.list()
+
+ #Handler erstellen, der in den Buffer schreibt
+ self.handler = h = MyHandler(self.buffer)
+ self.logger =l= logging.getLogger()
+
+ #Level anpassen
+ l.setLevel(logging.DEBUG)
+ h.setLevel(logging.DEBUG)
l.addHandler(h)
- self.manager = MyManager()
- self.manager.start()
- self.queue = Queue()
- self.worker=Worker(self.queue)
- self.worker.start()
- sleep(.5)
-
+ def tearDownLogging(self):
+ '''crazy logging hacks wieder entfernen'''
+ self.logger.removeHandler(self.handler)
+ self.logger.setLevel(logging.NOTSET)
+ self.handler.close()
+
- def TearDown(self):
- self.manager.stop()
- self.queue.close()
- self.worker.terminate()
- self.logger.removeHandler(self.handler)
- self.handler.close()
-
def testJob(self):
- job=Job(None,None,"test")
- self.assertEqual(job.status,"init")
+ '''einen Job verarbeiten'''
+ job=self.manager.Job(None,None,"test")
+ self.assertEqual(job.getStatus(),"init")
self.queue.put(job)
- sleep(.5)
- logging.debug("test")
- print self.handler.buffer
- self.assertEqual(job.status,"started")
+ sleep(.1)
+ self.stop()
+ self.assertEqual(job.getStatus(),"started")
+ self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
+ [(20,'Workerprocess läuft nun...'),
+ (20,'ein neuer Job(1)'),
+ (20,'Job(1) fertig ;)')])
+
if __name__ == "__main__":
unittest.main()