--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/iro/tests/testWorker.py Tue Oct 05 03:24:01 2010 +0200
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+
+import unittest
+import logging
+from time import sleep
+
+from multiprocessing import Queue
+from multiprocessing.managers import BaseManager
+import iro
+from ..worker import Worker
+from ..job import Job
+
+from logging.handlers import BufferingHandler
+
+class MyHandler(BufferingHandler):
+ def __init__(self):
+ '''BufferingHandler takes a "capacity" argument
+ so as to know when to flush. As we're overriding
+ shouldFlush anyway, we can set a capacity of zero.
+ You can call flush() manually to clear out the
+ buffer.'''
+ BufferingHandler.__init__(self, 0)
+
+ def shouldFlush(self):
+ return False
+
+ def emit(self, record):
+ self.buffer.append(record.__dict__)
+
+class MyManager(BaseManager):
+ pass
+
+MyManager.register('Job', Job)
+
+
+class TestWorker(unittest.TestCase):
+ def setUp(self):
+ self.handler = h = MyHandler()
+ self.logger = l = logging.getLogger()
+ l.addHandler(h)
+
+ self.manager = MyManager()
+ self.manager.start()
+ self.queue = Queue()
+ self.worker=Worker(self.queue)
+ self.worker.start()
+ sleep(.5)
+
+
+ def TearDown(self):
+ self.manager.stop()
+ self.queue.close()
+ self.worker.terminate()
+ self.logger.removeHandler(self.handler)
+ self.handler.close()
+
+ def testJob(self):
+ job=Job(None,None,"test")
+ self.assertEqual(job.status,"init")
+ self.queue.put(job)
+ sleep(.5)
+ logging.debug("test")
+ print self.handler.buffer
+ self.assertEqual(job.status,"started")
+
+if __name__ == "__main__":
+ unittest.main()