--- a/tests/old/testWorker.py Sun Mar 18 14:06:27 2012 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,147 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import unittest
-import logging
-from time import sleep
-
-from multiprocessing import Queue
-from multiprocessing.managers import BaseManager, ListProxy
-
-from iro.worker import Worker
-from iro.job import Job, SMSJob
-from iro.anbieter.anbieter import anbieter
-from iro.anbieter.content import SMS
-from iro.providerlist import Providerlist
-
-from logging.handlers import BufferingHandler
-
-class MyHandler(BufferingHandler):
- def __init__(self,buffer=None):
- '''BufferingHandler takes a "capacity" argument
- so as to know when to flush. As we're overriding
- shouldFlush anyway, we can set a capacity of zero.
- You can call flush() manually to clear out the
- buffer.
- buffer: log messages to this buffer, needed f.ex for processes or threads'''
- BufferingHandler.__init__(self, 0)
- self.buffer=buffer
-
- def shouldFlush(self):
- return False
-
- def emit(self, record):
- if record.exc_info: #sonst geht das append schief, weil nicht picklebar
- record.exc_info=record.exc_info[:-1]
- self.buffer.append(record.__dict__)
-
-
-class BadJob(Job):
- def start(self,id=None):
- Job.start(self,id)
- raise Exception("Error")
-
-#einen Manager anlegen, der Job und eine Liste anbietet
-class MyManager(BaseManager):
- pass
-MyManager.register('Job', Job)
-MyManager.register('SMSJob', SMSJob)
-MyManager.register('BadJob', BadJob)
-MyManager.register('list', list, ListProxy)
-MyManager.register('Providerlist',Providerlist)
-
-class TestWorker(unittest.TestCase):
- def setUp(self):
- #erstelle eine Queue&Manager
- self.manager = MyManager()
- self.queue = Queue()
-
- #manager starten, damit wir Logging anpassen können
- self.manager.start()
- self.setUpLogging()
-
- self.providerlist=self.manager.Providerlist()
- self.providerlist.add("test", anbieter() , ["sms", ])
-
- #eigentlich Workerprocess starten
- self.worker= Worker(self.queue)
- self.worker.start()
-
- def tearDown(self):
- #Thread&Queue stoppen
- self.stop()
-
- #Logging änderungen rückgängig
- self.tearDownLogging()
- self.manager.shutdown()
-
- def stop(self):
- self.queue.close()
- self.queue.join_thread()
- self.worker.terminate()
-
-
- def setUpLogging(self):
- '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind'''
- #wir brauchen eine threadsichere liste
- self.buffer=self.manager.list()
-
- #Handler erstellen, der in den Buffer schreibt
- self.handler = h = MyHandler(self.buffer)
- self.logger =l= logging.getLogger()
-
- #Level anpassen
- l.setLevel(logging.DEBUG)
- h.setLevel(logging.DEBUG)
- l.addHandler(h)
-
- def tearDownLogging(self):
- '''crazy logging hacks wieder entfernen'''
- self.logger.removeHandler(self.handler)
- self.logger.setLevel(logging.NOTSET)
- self.handler.close()
-
-
- def testJob(self):
- '''einen Job verarbeiten'''
- job=self.manager.Job(None,None,"test")
- self.assertEqual(job.getStatus(),("init",{}))
- self.queue.put(job)
- sleep(.1)
- self.stop()
- self.assertEqual(job.getStatus(),("started",{}))
- self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
- [(20,'Workerprocess läuft nun...'),
- (20,'ein neuer Job(1)'),
- (20,'Job(1) fertig ;)')])
-
- def testBadJob(self):
- '''einen Job verarbeiten, der fehlschlägt'''
- job=self.manager.BadJob(None,None,"test")
- self.assertEqual(job.getStatus(),("init",{}))
- self.queue.put(job)
- sleep(.1)
- self.stop()
- self.assertEqual(job.getStatus(),("error",{}))
- self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
- [(20,'Workerprocess läuft nun...'),
- (20,'ein neuer Job(1)'),
- (40,'Job(1) fehlgeschlagen :(')])
- error=Exception('Error')
- self.assertEqual(self.buffer[-1]['exc_info'][0],type(error))
- self.assertEqual(str(self.buffer[-1]['exc_info'][1]),str(error))
-
- def testSMSJob(self):
- job=self.manager.SMSJob(self.providerlist, "test", "name", SMS("message"),[012345])
- self.assertEqual(job.getStatus(),("init",{}))
- self.queue.put(job)
- sleep(.1)
- self.stop()
- self.assertEqual(job.getStatus(),("sended",{'failed': [], 'good': []}))
- self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"],
- [(20,'Workerprocess läuft nun...'),
- (20,'ein neuer Job(1)'),
- (20,'Job(1) fertig ;)')])
-
-
-if __name__ == "__main__":
- unittest.main()