|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 import unittest |
|
4 import logging |
|
5 from time import sleep |
|
6 |
|
7 from multiprocessing import Queue |
|
8 from multiprocessing.managers import BaseManager |
|
9 import iro |
|
10 from ..worker import Worker |
|
11 from ..job import Job |
|
12 |
|
13 from logging.handlers import BufferingHandler |
|
14 |
|
class MyHandler(BufferingHandler):
    """Logging handler that keeps every record in memory and never auto-flushes.

    Each emitted record is stored in ``self.buffer`` as the record's
    attribute dictionary, so tests can inspect what was logged.
    """

    def __init__(self):
        """Initialise the underlying BufferingHandler with a capacity of zero.

        BufferingHandler takes a "capacity" argument so as to know when to
        flush. As shouldFlush is overridden anyway, a capacity of zero is
        fine. Call flush() manually to clear out the buffer.
        """
        BufferingHandler.__init__(self, 0)

    def shouldFlush(self, record=None):
        """Return False so the buffer is never flushed automatically.

        The base class declares ``shouldFlush(record)``; ``record`` is
        accepted (and ignored) so calls made through the standard handler
        API do not fail. It defaults to None so that existing zero-argument
        calls keep working.
        """
        return False

    def emit(self, record):
        """Append the attribute dictionary of ``record`` to the buffer."""
        self.buffer.append(record.__dict__)
|
29 |
|
class MyManager(BaseManager):
    """BaseManager subclass used as the registry for this module's shared types."""
|
32 |
|
# Register Job under the type id 'Job' so a started MyManager exposes a
# ``Job(...)`` factory method.
MyManager.register('Job', callable=Job)
|
34 |
|
35 |
|
class TestWorker(unittest.TestCase):
    """Tests for a ``Worker`` that is fed jobs through a multiprocessing Queue.

    Every test runs with a ``MyHandler`` attached to the root logger, a
    started ``MyManager``, a fresh ``Queue`` and a started ``Worker`` bound
    to that queue.
    """

    def setUp(self):
        """Attach the capturing log handler and start the manager and worker."""
        self.handler = MyHandler()
        self.logger = logging.getLogger()
        self.logger.addHandler(self.handler)

        self.manager = MyManager()
        self.manager.start()
        self.queue = Queue()
        self.worker = Worker(self.queue)
        self.worker.start()
        # Presumably gives the worker time to come up before the test body
        # starts using the queue.
        sleep(.5)

    def tearDown(self):
        """Release everything acquired in setUp.

        unittest only invokes the hook spelled ``tearDown``; with the former
        spelling ``TearDown`` none of this cleanup ever ran.
        """
        # BaseManager has no stop(); shutdown() is what ends the manager
        # process launched by start().
        self.manager.shutdown()
        self.queue.close()
        self.worker.terminate()
        self.logger.removeHandler(self.handler)
        self.handler.close()

    # Alias kept so explicit calls using the old, misspelled name still work.
    TearDown = tearDown

    def testJob(self):
        """Queue a job for the worker and check its status afterwards."""
        job = Job(None, None, "test")
        self.assertEqual(job.status, "init")
        self.queue.put(job)
        # Wait for the worker to pick the job up.
        sleep(.5)
        logging.debug("test")
        # Parenthesised form prints the same under Python 2 and also parses
        # under Python 3.
        print(self.handler.buffer)
        # NOTE(review): objects put on a multiprocessing Queue are pickled;
        # confirm that a status change made by the worker is visible on this
        # local ``job`` object.
        self.assertEqual(job.status, "started")
|
65 |
|
# Run this module's tests when the file is executed directly.
if __name__ == "__main__":
    unittest.main()