1 # -*- coding: utf-8 -*- |
|
2 |
|
3 import unittest |
|
4 import logging |
|
5 from time import sleep |
|
6 |
|
7 from multiprocessing import Queue |
|
8 from multiprocessing.managers import BaseManager, ListProxy |
|
9 |
|
10 from iro.worker import Worker |
|
11 from iro.job import Job, SMSJob |
|
12 from iro.anbieter.anbieter import anbieter |
|
13 from iro.anbieter.content import SMS |
|
14 from iro.providerlist import Providerlist |
|
15 |
|
16 from logging.handlers import BufferingHandler |
|
17 |
|
class MyHandler(BufferingHandler):
    """Logging handler that stores every record as a plain dict in a list.

    Records are appended to ``self.buffer`` and the handler never flushes
    on its own (``shouldFlush`` always answers False), so the caller can
    inspect afterwards what has been logged.
    """

    def __init__(self, buffer=None):
        """Create the handler.

        BufferingHandler takes a "capacity" argument so as to know when to
        flush. As shouldFlush is overridden anyway, a capacity of zero is
        passed.

        buffer: list-like object (only ``append`` is used here) that
            receives the log records; needed e.g. when the records have to
            be shared with other processes or threads. When omitted, the
            private list created by BufferingHandler is kept.
        """
        BufferingHandler.__init__(self, 0)
        # BufferingHandler.__init__ already set self.buffer to a fresh list;
        # only replace it when the caller supplied a buffer of its own,
        # otherwise emit() would fail on None.
        if buffer is not None:
            self.buffer = buffer

    def shouldFlush(self, record=None):
        """Never request an automatic flush.

        ``record`` is accepted (and ignored) so the method matches the
        signature of BufferingHandler.shouldFlush(record); it stays optional
        so that calls without an argument keep working.
        """
        return False

    def emit(self, record):
        """Append a dict snapshot of ``record`` to the buffer.

        The traceback object is removed from the stored ``exc_info``,
        because it cannot be pickled and appending would fail otherwise.
        The record itself is left untouched, so other handlers that see the
        same record still get the complete ``exc_info``.
        """
        entry = dict(record.__dict__)
        if entry.get("exc_info"):
            # keep (type, value), drop the trailing traceback
            entry["exc_info"] = entry["exc_info"][:-1]
        self.buffer.append(entry)
|
36 |
|
37 |
|
class BadJob(Job):
    """Job whose start() always fails, used to exercise error handling."""

    def start(self, id=None):
        """Run the regular Job.start and then raise Exception("Error")."""
        Job.start(self, id)
        failure = Exception("Error")
        raise failure
|
42 |
|
# Set up a manager that offers the job types, a list and the provider list.
class MyManager(BaseManager):
    """Manager subclass that only serves as a holder for the registrations below."""


MyManager.register('Job', callable=Job)
MyManager.register('SMSJob', callable=SMSJob)
MyManager.register('BadJob', callable=BadJob)
# the list is handed out through the ListProxy proxy type
MyManager.register('list', callable=list, proxytype=ListProxy)
MyManager.register('Providerlist', callable=Providerlist)
|
51 |
|
class TestWorker(unittest.TestCase):
    """Tests that put jobs on the worker's queue and check status and log output."""

    def setUp(self):
        """Start the manager, capture logging, build the provider list and start the worker."""
        # create queue and manager
        self.manager = MyManager()
        self.queue = Queue()

        # start the manager first, so that logging can be adapted
        # (the log buffer is a list served by the manager)
        self.manager.start()
        self.setUpLogging()

        self.providerlist = self.manager.Providerlist()
        self.providerlist.add("test", anbieter(), ["sms", ])

        # start the actual worker process
        self.worker = Worker(self.queue)
        self.worker.start()

    def tearDown(self):
        """Stop worker and queue, undo the logging changes and shut the manager down."""
        # stop worker and queue (the tests already did this; it is repeated
        # here so it also happens when a test failed early)
        self.stop()

        # undo the logging changes
        self.tearDownLogging()
        self.manager.shutdown()

    def stop(self):
        """Close the queue, wait for its feeder thread and terminate the worker."""
        self.queue.close()
        self.queue.join_thread()
        self.worker.terminate()

    def setUpLogging(self):
        """Rearrange logging so that every log message ends up in self.buffer."""
        # we need a thread-safe list
        self.buffer = self.manager.list()

        # create the handler that writes into the buffer
        self.handler = MyHandler(self.buffer)
        self.logger = logging.getLogger()

        # remember the level of the root logger, so tearDownLogging can
        # restore it instead of leaving a changed level behind
        self._previousLevel = self.logger.level

        # adjust the levels
        self.logger.setLevel(logging.DEBUG)
        self.handler.setLevel(logging.DEBUG)
        self.logger.addHandler(self.handler)

    def tearDownLogging(self):
        """Remove the logging changes made by setUpLogging again."""
        self.logger.removeHandler(self.handler)
        self.logger.setLevel(self._previousLevel)
        self.handler.close()

    def _workerLog(self):
        """Return (levelno, msg) for every buffered record logged under "iro.worker"."""
        return [(entry['levelno'], entry['msg'])
                for entry in self.buffer if entry['name'] == "iro.worker"]

    def testJob(self):
        """Process one job."""
        job = self.manager.Job(None, None, "test")
        self.assertEqual(job.getStatus(), ("init", {}))
        self.queue.put(job)
        sleep(.1)  # give the worker time to pick the job up
        self.stop()
        self.assertEqual(job.getStatus(), ("started", {}))
        self.assertEqual(self._workerLog(),
                         [(20, 'Workerprocess läuft nun...'),
                          (20, 'ein neuer Job(1)'),
                          (20, 'Job(1) fertig ;)')])

    def testBadJob(self):
        """Process one job that fails."""
        job = self.manager.BadJob(None, None, "test")
        self.assertEqual(job.getStatus(), ("init", {}))
        self.queue.put(job)
        sleep(.1)  # give the worker time to pick the job up
        self.stop()
        self.assertEqual(job.getStatus(), ("error", {}))
        self.assertEqual(self._workerLog(),
                         [(20, 'Workerprocess läuft nun...'),
                          (20, 'ein neuer Job(1)'),
                          (40, 'Job(1) fehlgeschlagen :(')])
        # the last record must carry the exception raised by BadJob.start
        error = Exception('Error')
        self.assertEqual(self.buffer[-1]['exc_info'][0], type(error))
        self.assertEqual(str(self.buffer[-1]['exc_info'][1]), str(error))

    def testSMSJob(self):
        """Process one SMS job."""
        # 0o12345 has the same value as the legacy octal literal 012345
        # (5349), which is a syntax error on Python 3.
        # NOTE(review): a phone number may have been intended here rather
        # than an octal integer - confirm against SMSJob.
        job = self.manager.SMSJob(self.providerlist, "test", "name", SMS("message"), [0o12345])
        self.assertEqual(job.getStatus(), ("init", {}))
        self.queue.put(job)
        sleep(.1)  # give the worker time to pick the job up
        self.stop()
        self.assertEqual(job.getStatus(), ("sended", {'failed': [], 'good': []}))
        self.assertEqual(self._workerLog(),
                         [(20, 'Workerprocess läuft nun...'),
                          (20, 'ein neuer Job(1)'),
                          (20, 'Job(1) fertig ;)')])
|
144 |
|
145 |
|
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|