# -*- coding: utf-8 -*-
import unittest
import logging
from time import sleep
from multiprocessing import Queue
from multiprocessing.managers import BaseManager, ListProxy
from iro.worker import Worker
from iro.job import Job, SMSJob
from iro.anbieter.anbieter import anbieter
from iro.anbieter.content import SMS
from iro.providerlist import Providerlist
from logging.handlers import BufferingHandler
class MyHandler(BufferingHandler):
    """Logging handler that stores each record's attribute dict in a buffer.

    The handler never flushes on its own; call flush() manually to clear
    out the buffer.
    """

    def __init__(self, buffer=None):
        """Create the handler.

        BufferingHandler takes a "capacity" argument so as to know when to
        flush. As shouldFlush() is overridden anyway, a capacity of zero is
        passed.

        buffer: list-like object (anything with ``append``) that receives
            the log entries; needed e.g. when logging from other processes
            or threads. When omitted, the private list created by
            BufferingHandler is used.
        """
        BufferingHandler.__init__(self, 0)
        # BufferingHandler.__init__ already set self.buffer to a fresh list.
        # Only replace it when the caller really supplied one; compare with
        # None so that an (initially empty) shared list is still honoured.
        if buffer is not None:
            self.buffer = buffer

    def shouldFlush(self, record=None):
        """Never request an automatic flush.

        ``record`` is accepted (and ignored) so the signature is compatible
        with BufferingHandler.shouldFlush(record).
        """
        return False

    def emit(self, record):
        """Append a copy of the record's ``__dict__`` to the buffer.

        If the record carries ``exc_info``, its last element is dropped from
        the stored copy, because otherwise the append fails when the entry
        has to be pickled. The record itself is left untouched so that other
        handlers still see the complete ``exc_info``.
        """
        entry = dict(record.__dict__)
        exc_info = entry.get('exc_info')
        if exc_info:
            entry['exc_info'] = exc_info[:-1]
        self.buffer.append(entry)
class BadJob(Job):
    """Job whose start() always ends with an exception."""

    def start(self, id=None):
        """Run the regular Job.start() and then raise Exception("Error")."""
        Job.start(self, id)
        raise Exception("Error")
# Set up a manager that offers the job types and a list.
class MyManager(BaseManager):
    """BaseManager subclass on which the shared types below are registered."""


# Each typeid becomes a factory method on manager instances.
MyManager.register('Job', callable=Job)
MyManager.register('SMSJob', callable=SMSJob)
MyManager.register('BadJob', callable=BadJob)
# The list is registered with an explicit proxy type.
MyManager.register('list', callable=list, proxytype=ListProxy)
MyManager.register('Providerlist', callable=Providerlist)
class TestWorker(unittest.TestCase):
    """Tests that hand jobs to a Worker through a queue and check the
    resulting job status and the log messages of the "iro.worker" logger."""

    def setUp(self):
        """Start the manager, redirect logging and start the worker."""
        # Create the queue and the manager.
        self.manager = MyManager()
        self.queue = Queue()
        # Start the manager first: the log buffer is a list served by it.
        self.manager.start()
        self.setUpLogging()
        self.providerlist = self.manager.Providerlist()
        self.providerlist.add("test", anbieter(), ["sms", ])
        # Start the actual worker process.
        self.worker = Worker(self.queue)
        self.worker.start()

    def tearDown(self):
        """Stop worker and queue, undo the logging changes, stop the manager.

        Every step runs even if an earlier one raises, so that neither the
        log handler nor the manager process is left behind.
        """
        try:
            # Stop worker and queue.
            self.stop()
        finally:
            try:
                # Undo the logging changes.
                self.tearDownLogging()
            finally:
                self.manager.shutdown()

    def stop(self):
        """Close the queue and terminate the worker.

        Called by each test before its assertions and again from tearDown().
        """
        self.queue.close()
        self.queue.join_thread()
        self.worker.terminate()

    def setUpLogging(self):
        """Rework logging so that all log messages end up in self.buffer."""
        # The buffer is a list served by the manager so it can be shared.
        self.buffer = self.manager.list()
        # Create the handler that writes into the buffer.
        self.handler = MyHandler(self.buffer)
        self.logger = logging.getLogger()
        # Remember the current level so tearDownLogging() can restore it.
        self._previousLevel = self.logger.level
        # Adjust the levels so nothing is filtered out.
        self.logger.setLevel(logging.DEBUG)
        self.handler.setLevel(logging.DEBUG)
        self.logger.addHandler(self.handler)

    def tearDownLogging(self):
        """Remove the logging hacks installed by setUpLogging() again."""
        self.logger.removeHandler(self.handler)
        # Restore the level that was active before the test instead of
        # forcing NOTSET onto the root logger.
        self.logger.setLevel(self._previousLevel)
        self.handler.close()

    def _workerMessages(self):
        """Return (levelno, msg) for every buffered "iro.worker" entry."""
        return [(entry['levelno'], entry['msg'])
                for entry in self.buffer
                if entry['name'] == "iro.worker"]

    def testJob(self):
        """Process a job."""
        job = self.manager.Job(None, None, "test")
        self.assertEqual(job.getStatus(), ("init", {}))
        self.queue.put(job)
        # Give the worker some time to pick up the job.
        sleep(.1)
        self.stop()
        self.assertEqual(job.getStatus(), ("started", {}))
        self.assertEqual(self._workerMessages(),
                         [(20, 'Workerprocess läuft nun...'),
                          (20, 'ein neuer Job(1)'),
                          (20, 'Job(1) fertig ;)')])

    def testBadJob(self):
        """Process a job that fails."""
        job = self.manager.BadJob(None, None, "test")
        self.assertEqual(job.getStatus(), ("init", {}))
        self.queue.put(job)
        # Give the worker some time to pick up the job.
        sleep(.1)
        self.stop()
        self.assertEqual(job.getStatus(), ("error", {}))
        self.assertEqual(self._workerMessages(),
                         [(20, 'Workerprocess läuft nun...'),
                          (20, 'ein neuer Job(1)'),
                          (40, 'Job(1) fehlgeschlagen :(')])
        # The last buffered entry must carry the exception type and value.
        error = Exception('Error')
        lastExcInfo = self.buffer[-1]['exc_info']
        self.assertEqual(lastExcInfo[0], type(error))
        self.assertEqual(str(lastExcInfo[1]), str(error))

    def testSMSJob(self):
        """Process an SMS job."""
        # 0o12345 == 5349, the same value the legacy literal 012345 had on
        # Python 2; the explicit octal spelling also parses on Python 3.
        # NOTE(review): possibly meant to be a phone number - confirm.
        job = self.manager.SMSJob(self.providerlist, "test", "name",
                                  SMS("message"), [0o12345])
        self.assertEqual(job.getStatus(), ("init", {}))
        self.queue.put(job)
        # Give the worker some time to pick up the job.
        sleep(.1)
        self.stop()
        self.assertEqual(job.getStatus(),
                         ("sended", {'failed': [], 'good': []}))
        self.assertEqual(self._workerMessages(),
                         [(20, 'Workerprocess läuft nun...'),
                          (20, 'ein neuer Job(1)'),
                          (20, 'Job(1) fertig ;)')])
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()