# -*- coding: utf-8 -*-
import unittest
import logging
from time import sleep
from multiprocessing import Queue
from multiprocessing.managers import BaseManager,ListProxy
from iro.worker import Worker
from iro.job import Job
from logging.handlers import BufferingHandler
class MyHandler(BufferingHandler):
def __init__(self,buffer=None):
'''BufferingHandler takes a "capacity" argument
so as to know when to flush. As we're overriding
shouldFlush anyway, we can set a capacity of zero.
You can call flush() manually to clear out the
buffer.
buffer: log messages to this buffer, needed f.ex for processes or threads'''
BufferingHandler.__init__(self, 0)
self.buffer=buffer
def shouldFlush(self):
return False
def emit(self, record):
if record.exc_info: #sonst geht das append schief, weil nicht picklebar
print record.exc_info
record.exc_info=record.exc_info[:-1]
self.buffer.append(record.__dict__)
class MyManager(BaseManager):
pass
MyManager.register('Job', Job)
MyManager.register('list', list, ListProxy)
class TestWorker(unittest.TestCase):
def setUp(self):
#erstelle eine Queue&Manager
self.manager = MyManager()
self.queue = Queue()
#manager starten, damit wir Logging anpassen können
self.manager.start()
self.setUpLogging()
#eigentlich Workerprocess starten
self.worker= Worker(self.queue)
self.worker.start()
def tearDown(self):
#Thread&Queue stoppen
self.stop()
#Logging änderungen rückgängig
self.tearDownLogging()
self.manager.shutdown()
def stop(self):
self.queue.close()
self.queue.join_thread()
self.worker.terminate()
def setUpLogging(self):
'''Logging so umbasteln, das wir alle logging Meldung in self.buf sind'''
#wir brauchen eine threadsichere liste
self.buffer=self.manager.list()
#Handler erstellen, der in den Buffer schreibt
self.handler = h = MyHandler(self.buffer)
self.logger =l= logging.getLogger()
#Level anpassen
l.setLevel(logging.DEBUG)
h.setLevel(logging.DEBUG)
l.addHandler(h)
def tearDownLogging(self):
'''crazy logging hacks wieder entfernen'''
self.logger.removeHandler(self.handler)
self.logger.setLevel(logging.NOTSET)
self.handler.close()
def testJob(self):
'''einen Job verarbeiten'''
job=self.manager.Job(None,None,"test")
self.assertEqual(job.getStatus(),"init")
self.queue.put(job)
sleep(.1)
self.stop()
self.assertEqual(job.getStatus(),"started")
self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
[(20,'Workerprocess läuft nun...'),
(20,'ein neuer Job(1)'),
(20,'Job(1) fertig ;)')])
if __name__ == "__main__":
unittest.main()