3 import unittest |
3 import unittest |
4 import logging |
4 import logging |
5 from time import sleep |
5 from time import sleep |
6 |
6 |
7 from multiprocessing import Queue |
7 from multiprocessing import Queue |
8 from multiprocessing.managers import BaseManager |
8 from multiprocessing.managers import BaseManager,ListProxy |
9 import iro |
9 |
10 from ..worker import Worker |
10 from iro.worker import Worker |
11 from ..job import Job |
11 from iro.job import Job |
12 |
12 |
13 from logging.handlers import BufferingHandler |
13 from logging.handlers import BufferingHandler |
14 |
14 |
15 class MyHandler(BufferingHandler): |
15 class MyHandler(BufferingHandler): |
16 def __init__(self): |
16 def __init__(self,buffer=None): |
17 '''BufferingHandler takes a "capacity" argument |
17 '''BufferingHandler takes a "capacity" argument |
18 so as to know when to flush. As we're overriding |
18 so as to know when to flush. As we're overriding |
19 shouldFlush anyway, we can set a capacity of zero. |
19 shouldFlush anyway, we can set a capacity of zero. |
20 You can call flush() manually to clear out the |
20 You can call flush() manually to clear out the |
21 buffer.''' |
21 buffer. |
|
22 buffer: log messages to this buffer, needed f.ex for processes or threads''' |
22 BufferingHandler.__init__(self, 0) |
23 BufferingHandler.__init__(self, 0) |
|
24 self.buffer=buffer |
23 |
25 |
24 def shouldFlush(self): |
26 def shouldFlush(self): |
25 return False |
27 return False |
26 |
28 |
27 def emit(self, record): |
29 def emit(self, record): |
|
30 if record.exc_info: #sonst geht das append schief, weil nicht picklebar |
|
31 print record.exc_info |
|
32 record.exc_info=record.exc_info[:-1] |
28 self.buffer.append(record.__dict__) |
33 self.buffer.append(record.__dict__) |
29 |
34 |
30 class MyManager(BaseManager): |
35 class MyManager(BaseManager): |
31 pass |
36 pass |
32 |
|
33 MyManager.register('Job', Job) |
37 MyManager.register('Job', Job) |
34 |
38 MyManager.register('list', list, ListProxy) |
35 |
39 |
36 class TestWorker(unittest.TestCase): |
40 class TestWorker(unittest.TestCase): |
37 def setUp(self): |
41 def setUp(self): |
38 self.handler = h = MyHandler() |
42 #erstelle eine Queue&Manager |
39 self.logger = l = logging.getLogger() |
43 self.manager = MyManager() |
|
44 self.queue = Queue() |
|
45 |
|
46 #manager starten, damit wir Logging anpassen können |
|
47 self.manager.start() |
|
48 self.setUpLogging() |
|
49 |
|
50 #eigentlich Workerprocess starten |
|
51 self.worker= Worker(self.queue) |
|
52 self.worker.start() |
|
53 |
|
54 def tearDown(self): |
|
55 #Thread&Queue stoppen |
|
56 self.stop() |
|
57 |
|
58 #Logging änderungen rückgängig |
|
59 self.tearDownLogging() |
|
60 self.manager.shutdown() |
|
61 |
|
62 def stop(self): |
|
63 self.queue.close() |
|
64 self.queue.join_thread() |
|
65 self.worker.terminate() |
|
66 |
|
67 |
|
68 def setUpLogging(self): |
|
69 '''Logging so umbasteln, das wir alle logging Meldung in self.buf sind''' |
|
70 #wir brauchen eine threadsichere liste |
|
71 self.buffer=self.manager.list() |
|
72 |
|
73 #Handler erstellen, der in den Buffer schreibt |
|
74 self.handler = h = MyHandler(self.buffer) |
|
75 self.logger =l= logging.getLogger() |
|
76 |
|
77 #Level anpassen |
|
78 l.setLevel(logging.DEBUG) |
|
79 h.setLevel(logging.DEBUG) |
40 l.addHandler(h) |
80 l.addHandler(h) |
41 |
81 |
42 self.manager = MyManager() |
82 def tearDownLogging(self): |
43 self.manager.start() |
83 '''crazy logging hacks wieder entfernen''' |
44 self.queue = Queue() |
84 self.logger.removeHandler(self.handler) |
45 self.worker=Worker(self.queue) |
85 self.logger.setLevel(logging.NOTSET) |
46 self.worker.start() |
86 self.handler.close() |
47 sleep(.5) |
|
48 |
|
49 |
87 |
50 def TearDown(self): |
88 |
51 self.manager.stop() |
|
52 self.queue.close() |
|
53 self.worker.terminate() |
|
54 self.logger.removeHandler(self.handler) |
|
55 self.handler.close() |
|
56 |
|
57 def testJob(self): |
89 def testJob(self): |
58 job=Job(None,None,"test") |
90 '''einen Job verarbeiten''' |
59 self.assertEqual(job.status,"init") |
91 job=self.manager.Job(None,None,"test") |
|
92 self.assertEqual(job.getStatus(),"init") |
60 self.queue.put(job) |
93 self.queue.put(job) |
61 sleep(.5) |
94 sleep(.1) |
62 logging.debug("test") |
95 self.stop() |
63 print self.handler.buffer |
96 self.assertEqual(job.getStatus(),"started") |
64 self.assertEqual(job.status,"started") |
97 self.assertEqual([(l['levelno'],l['msg']) for l in self.buffer if l['name']=="iro.worker"], |
|
98 [(20,'Workerprocess läuft nun...'), |
|
99 (20,'ein neuer Job(1)'), |
|
100 (20,'Job(1) fertig ;)')]) |
|
101 |
65 |
102 |
66 if __name__ == "__main__": |
103 if __name__ == "__main__": |
67 unittest.main() |
104 unittest.main() |