|
1 # -*- coding: utf-8 -*- |
|
2 |
|
3 import unittest |
|
4 import logging |
|
5 from time import sleep |
|
6 |
|
7 from multiprocessing import Queue |
|
8 from multiprocessing.managers import BaseManager,ListProxy |
|
9 |
|
10 from iro.worker import Worker |
|
11 from iro.job import Job |
|
12 |
|
13 from logging.handlers import BufferingHandler |
|
14 |
|
class MyHandler(BufferingHandler):
    """Logging handler that stores each record as a plain dict in a list.

    Records are appended via ``buffer.append(...)`` only, so any object
    offering ``append`` can serve as the buffer.
    """

    def __init__(self, buffer=None):
        """Create the handler.

        buffer: list-like object that receives the record dicts. Pass a
            shared list when records are emitted from other processes or
            threads. When omitted, the list that BufferingHandler creates
            for itself is used.
        """
        # BufferingHandler requires a capacity; it is never consulted here
        # because emit() is overridden, so zero is as good as any value.
        BufferingHandler.__init__(self, 0)
        # Only replace the base-class list when a buffer was really given;
        # storing None would make the first emit() fail on None.append.
        if buffer is not None:
            self.buffer = buffer

    def shouldFlush(self, record=None):
        """Never request an automatic flush.

        ``record`` is accepted (and ignored) so the method matches the
        signature BufferingHandler uses when calling it.
        """
        return False

    def emit(self, record):
        """Append a dict snapshot of ``record`` to the buffer.

        When the record carries exception info, the last element (the
        traceback) is dropped from the stored copy because it cannot be
        pickled. The record itself is left untouched so other handlers
        still see the complete exc_info.
        """
        entry = dict(record.__dict__)
        if record.exc_info:
            # Echo the exception info to stdout.
            print(record.exc_info)
            entry['exc_info'] = record.exc_info[:-1]
        self.buffer.append(entry)
|
34 |
|
35 |
|
class BadJob(Job):
    """Job whose start() always fails; used to exercise the error path."""

    def start(self):
        """Run the regular Job.start() and then raise Exception("Error")."""
        Job.start(self)
        raise Exception("Error")
|
40 |
|
# Set up a manager that offers Job, BadJob and a list as shared types.
class MyManager(BaseManager):
    """BaseManager subclass; the shared types are registered right below."""


MyManager.register('Job', callable=Job)
MyManager.register('BadJob', callable=BadJob)
MyManager.register('list', callable=list, proxytype=ListProxy)
|
47 |
|
class TestWorker(unittest.TestCase):
    """Feed jobs to a Worker through a queue and check status and log output."""

    def setUp(self):
        """Start the manager, redirect logging and launch the worker."""
        # Create the queue and the manager.
        self.manager = MyManager()
        self.queue = Queue()

        # The manager has to run first: the log buffer is a manager list.
        self.manager.start()
        self.setUpLogging()

        # Start the actual worker process.
        self.worker = Worker(self.queue)
        self.worker.start()

    def tearDown(self):
        """Stop worker and queue, undo the logging changes, stop the manager."""
        # The tests call stop() themselves as well; it is repeated here so
        # the worker is stopped even when a test fails before reaching it.
        self.stop()

        # Undo the logging changes.
        self.tearDownLogging()
        self.manager.shutdown()

    def stop(self):
        """Close the queue and terminate the worker."""
        self.queue.close()
        self.queue.join_thread()
        # NOTE(review): the worker is terminated but never joined; if Worker
        # is a multiprocessing.Process, consider joining it here - confirm.
        self.worker.terminate()

    def setUpLogging(self):
        """Route every log message of the root logger into self.buffer."""
        # A list that can be shared between processes is needed.
        self.buffer = self.manager.list()

        # Handler that writes into the buffer.
        self.handler = MyHandler(self.buffer)
        self.logger = logging.getLogger()

        # Remember the current level so tearDownLogging() can restore it.
        self._previousLevel = self.logger.level

        # Let everything from DEBUG upwards through.
        self.logger.setLevel(logging.DEBUG)
        self.handler.setLevel(logging.DEBUG)
        self.logger.addHandler(self.handler)

    def tearDownLogging(self):
        """Remove the buffer handler and restore the root logger level."""
        self.logger.removeHandler(self.handler)
        # Restore the level found in setUpLogging() instead of forcing
        # NOTSET, which would leave the root logger changed for later tests.
        self.logger.setLevel(self._previousLevel)
        self.handler.close()

    def _workerLog(self):
        """Return (levelno, msg) for each buffered record named "iro.worker"."""
        return [(entry['levelno'], entry['msg'])
                for entry in self.buffer if entry['name'] == "iro.worker"]

    def testJob(self):
        """Process a single job."""
        job = self.manager.Job(None, None, "test")
        self.assertEqual(job.getStatus(), "init")
        self.queue.put(job)
        # Give the worker time to pick the job up before stopping it.
        sleep(.1)
        self.stop()
        self.assertEqual(job.getStatus(), "started")
        self.assertEqual(self._workerLog(),
                         [(20, 'Workerprocess läuft nun...'),
                          (20, 'ein neuer Job(1)'),
                          (20, 'Job(1) fertig ;)')])

    def testBadJob(self):
        """Process a job that fails."""
        job = self.manager.BadJob(None, None, "test")
        self.assertEqual(job.getStatus(), "init")
        self.queue.put(job)
        # Give the worker time to pick the job up before stopping it.
        sleep(.1)
        self.stop()
        self.assertEqual(job.getStatus(), "error")
        print(self.buffer)
        self.assertEqual(self._workerLog(),
                         [(20, 'Workerprocess läuft nun...'),
                          (20, 'ein neuer Job(1)'),
                          (40, 'Job(1) fehlgeschlagen :(')])
        # The last record must carry the exception raised by BadJob.start().
        error = Exception('Error')
        lastRecord = self.buffer[-1]
        self.assertEqual(lastRecord['exc_info'][0], type(error))
        self.assertEqual(str(lastRecord['exc_info'][1]), str(error))
|
126 |
|
127 |
|
128 |
|
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()