model.job created devel
authorSandro Knauß <knauss@netzguerilla.net>
Tue, 07 Feb 2012 01:56:59 +0100
branchdevel
changeset 135 f8640c663e3e
parent 134 fae3fdfece65
child 136 ca926774e16c
model.job created
iro/anbieter/content.py
iro/controller/pool.py
iro/controller/task.py
iro/controller/viewinterface.py
iro/error.py
iro/model/job.py
iro/model/message.py
iro/model/offer.py
iro/model/schema.py
tests/dbtestcase.py
tests/job.py
--- a/iro/anbieter/content.py	Mon Feb 06 14:39:33 2012 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,95 +0,0 @@
-# -*- coding: utf-8 -*-
-#Copyright (C) 2009  Sandro Knauß <bugs@sandroknauss.de>
-
-#This program is free software; you can redistribute it and/or modify it under the terms
-#of the GNU General Public License as published by the Free Software Foundation;
-#either version 3 of the License, or any later version.
-#This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
-#without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
-#See the GNU General Public License for more details.
-
-#You should have received a copy of the GNU General Public License
-#along with this program; if not, see <http://www.gnu.org/licenses/>.
-
-from email.mime.text import MIMEText
-from email.header import Header
-
-class content:
-    def __init__(self,content):
-        self.content=content
-
-    def sendto(self,anbieter,recipients):
-        pass
-
-    def getContent(self):
-        return self.content
-
-    def __eq__(self,other):
-        return self.content == other.content
-
-    def __ne__(self,other):
-        return not self.__eq__(other)
-
-class SMS(content):
-    def __init__(self, cont):
-        content.__init__(self,cont)
-
-    def sendto(self,anbieter,recipients):
-        anbieter.sendSMS(self,recipients)
-
-
-class FAX(content):
-    def __init__(self,header,cont,attachments):
-        content.__init__(self,cont)
-        self.header=header
-        self.attachments=attachments
-
-    def sendto(self,anbieter,recipients):
-        anbieter.sendFAX(self,recipients)
-
-    def getAttachment(self,i):
-        return self.attachments[i]
-
-    def __eq__(self,other):
-        if not  content.__eq__(self,other):
-            return False
-
-        if self.header != other.header:
-            return False
-
-        if len(self.attachments) != len(other.attachments):
-            return False
-
-        for i in range(len(self.attachments)):
-            if self.attachments[i] != other.attachments[i]:
-                return False
-
-        return True
-
-
-
-class Mail(content):
-    def __init__(self, subject, body, frm):
-        con=MIMEText(body.encode("utf-8"), _charset='utf-8')
-        sub=Header(subject.encode('utf-8'), 'utf-8')
-        con['Subject']=sub
-        self.frm=frm
-        content.__init__(self, con)
-
-    def sendto(self,anbieter,recipients):
-        anbieter.sendMail(self,recipients)
-
-    def as_string(self):
-        return self.content.as_string()
-
-    def getFrom(self):
-        return self.frm
-    
-    def __eq__(self,other):
-        if self.as_string() != other.as_string():
-            return False
-
-        if self.frm != other.frm:
-            return False
-
-        return True
--- a/iro/controller/pool.py	Mon Feb 06 14:39:33 2012 +0100
+++ b/iro/controller/pool.py	Tue Feb 07 01:56:59 2012 +0100
@@ -16,10 +16,13 @@
         """To run a function in Twisted's thread pool"""
         return threads.deferToThreadPool(self.reactor, self.pool, f, *args, **kwargs)
 
+#task Pool for sending
 taskPool = Pool('task',5)
+
+#db Pool to handle reqests to sqlalchemy
 dbPool = Pool('database',5)
 
-
+# all pools in one list
 pools=[taskPool,dbPool]
 
 def startPool(reactor):
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/iro/controller/task.py	Tue Feb 07 01:56:59 2012 +0100
@@ -0,0 +1,41 @@
+from functools import partial
+
+from ..error import NoRoute, RejectRecipient
+
+from ..model.offer import getPossibleOffers
+from ..model.job import jobs
+
+from .pool  import taskPool
+
+class Task:
+    '''one single part of a job.
+    a task is one message to one recipient
+    '''
+    def __init__(self, recipient, job):
+        self.recipient = recipient
+        self.job = job
+
+    def start(self):
+        self.d = taskPool.run(self._run)
+        self.d.addCallback(partial(self.job.setStatus,self))
+        self.d.addErrback(partial(self.job.setError,self))
+        return self.d
+
+    def _run(self):
+        for offer in getPossibleOffers(self.recipient,self.job.offers):
+            try:
+                return offer.send(self.recipient,self.job.message)
+            except RejectRecipient:
+                continue
+        else:
+            raise NoRoute()
+
+
+def createJob(recipients, msg, offers):
+    job = jobs.create(recipients, msg, offers)
+    for r in recipients:
+        task = Task(r,job)
+        job.addTask(task)
+        task.start()
+
+    return job
--- a/iro/controller/viewinterface.py	Mon Feb 06 14:39:33 2012 +0100
+++ b/iro/controller/viewinterface.py	Tue Feb 07 01:56:59 2012 +0100
@@ -1,7 +1,11 @@
 # -*- coding: utf-8 -*-
 from ..model.decorators import vUser, vRoute, dbdefer, vTyp
+from ..model.message import SMS, Fax, Mail
+
 from ..validate import validate, vBool, vHash, vTel, vEmail
 
+from .task import createJob
+
 class Interface(object):
     '''class for a xmlrpc user
     '''
@@ -66,7 +70,8 @@
         id[hash]: Die ID des Auftrages
 
         '''
-        return ""
+        job = createJob(recipients, SMS(message), info, route)
+        return job.id
    
     @validate(kwd="recipients",func=vTel)
     @vUser
@@ -86,12 +91,13 @@
         id[hash]: Die ID des Auftrages
 
         '''
-        return ""
+        job = createJob(recipients, Fax(subject, fax), info, route)
+        return job.id
 
     @validate(kwd="recipients",func=vEmail)
     @vUser
     @vRoute(typ="mail")
-    def mail(self, user, subject,  body, recipients, frm, route="default", info=""):
+    def mail(self, user, subject,  body, recipients, frm=None, route="default", info=""):
         '''Versendet eine Email.
 
         Keywords:
@@ -107,7 +113,9 @@
         id[hash]: Die ID des Auftrages
 
         '''
-        return ""
+
+        job = createJob(recipients, Mail(frm, subject, body), info, route)
+        return job.id
        
     @validate(kwd="typ", func=vTyp)
     @vUser
--- a/iro/error.py	Mon Feb 06 14:39:33 2012 +0100
+++ b/iro/error.py	Tue Feb 07 01:56:59 2012 +0100
@@ -51,3 +51,10 @@
         self.number = number
         msg = "No valid email: '%s'"%(number)
         ValidateException.__init__(self, 702, field, msg)
+
+
+class NoRoute(Exception):
+    pass
+
+class RejectRecipient(Exception):
+    pass
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/iro/model/job.py	Tue Feb 07 01:56:59 2012 +0100
@@ -0,0 +1,92 @@
+import random
+from collections import MutableMapping
+
+from twisted.python import threadable
+
+from .schema import Job as DBJob
+from .dbdefer import dbdefer
+
+from iro.error import JobNotFound
+
+class Status:
+    '''status for one recipient'''
+    todo = 1
+    good = 2
+    error = 3
+    def __init__(self, job):
+        self.status = Status.todo
+        self.job = job
+        self.offer = None           #the offer over that this job was done
+        self.errtext = None         #the error text
+
+class Job:
+    '''One Job is a class that handles one job has multiple tasks'''
+    def __init__(self, user, recipients, message, offers, info=None):
+        self.dbjob = None       #Connection to mysql job element
+        self.message = message
+        self.recipients = recipients
+        self.offers = offers
+        self.info = info
+        self.tasks={}
+        self.user = user
+
+    def setStatus(task,status):
+        pass
+
+    def setError(task,err):
+        pass
+
+    @dbdefer
+    def registerJob(self, session, id):
+        self.id = id
+        u = session.merge(self.user)
+        self.dbjob=DBJob(hash=self.id, info=self.info, status="started")
+        u.jobs.append(self.dbjob)
+        session.commit()
+
+    @classmethod
+    @dbdefer
+    def fromDB(cls, session, id):
+        j = session.query(DBJob).filter_by(hash=id).first()
+        if not j:
+            raise JobNotFound
+        job = cls(j.user, [], None, None, j.info)
+        job.dbjob = j
+        job.id = j.hash
+        return job
+
+
+class Jobs(dict, MutableMapping):
+
+    synchronized = ['getNewId']
+
+    def create(self, user, recipients, message, offers, info=None):
+        job = Job(user, recipients, message, offers, info)
+        job.registerJob(id = self.getNewId())
+        self[job.id] = job
+        return job
+
+    @dbdefer
+    def getNewId(self, session):
+        while True:
+            id = ''.join([random.choice('0123456789abcdef') for i in range(40)])
+            if id not in self.keys():
+                self[id]=None
+                if not session.query(DBJob.hash).filter_by(hash=id).first():
+                    return id
+
+    def __getitem__(self, key):
+        try:
+            return dict.__getitem__(self, key)
+        except KeyError as e:
+            pass
+       
+        try:
+            self[key]=Job.fromDB(key)
+            return self[key]
+        except JobNotFound:
+            raise e
+
+threadable.synchronize(Jobs)
+
+jobs = Jobs()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/iro/model/message.py	Tue Feb 07 01:56:59 2012 +0100
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+
+from email.mime.text import MIMEText
+from email.header import Header
+
+class Message:
+    def __init__(self,content):
+        self.content=content
+
+    def getContent(self):
+        return self.content
+
+    def __eq__(self,other):
+        return self.content == other.content
+
+    def __ne__(self,other):
+        return not self.__eq__(other)
+
+class SMS(Message):
+    def __init__(self, cont):
+        Message.__init__(self,cont)
+
+    def sendto(self,anbieter,recipients):
+        anbieter.sendSMS(self,recipients)
+
+
+class Fax(Message):
+    def __init__(self,header,cont,attachments):
+        Message.__init__(self,cont)
+        self.header=header
+        self.attachments=attachments
+
+    def getAttachment(self,i):
+        return self.attachments[i]
+
+    def __eq__(self,other):
+        if not  Message.__eq__(self,other):
+            return False
+
+        if self.header != other.header:
+            return False
+
+        if len(self.attachments) != len(other.attachments):
+            return False
+
+        for i in range(len(self.attachments)):
+            if self.attachments[i] != other.attachments[i]:
+                return False
+        return True
+
+
+
+class Mail(Message):
+    def __init__(self, subject, body, frm):
+        con=MIMEText(body.encode("utf-8"), _charset='utf-8')
+        sub=Header(subject.encode('utf-8'), 'utf-8')
+        con['Subject']=sub
+        self.frm=frm
+        Message.__init__(self, con)
+
+    def as_string(self):
+        return self.Message.as_string()
+
+    def getFrom(self):
+        return self.frm
+    
+    def __eq__(self,other):
+        if self.as_string() != other.as_string():
+            return False
+
+        if self.frm != other.frm:
+            return False
+
+        return True
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/iro/model/offer.py	Tue Feb 07 01:56:59 2012 +0100
@@ -0,0 +1,2 @@
+def getPossibleOffers(recipient, offers):
+	pass
--- a/iro/model/schema.py	Mon Feb 06 14:39:33 2012 +0100
+++ b/iro/model/schema.py	Tue Feb 07 01:56:59 2012 +0100
@@ -76,6 +76,9 @@
     user_id = Column("user", String(100), ForeignKey('apiuser.name'))
     user = relationship("User", backref=backref('jobs'))
 
+    def __repr__(self):
+        return "<Job('%s','%s','%s','%s')>"%(self.hash,self.info, self.status, self.user_id)
+
 class User(Base):
     """Die Benutzerdatenbank von Iro. <em>ng_kunde</em> ist der verknüpfte netzguerilla.net Benutzer, der die Rechnung zahlt."""
     __tablename__ = "apiuser"
--- a/tests/dbtestcase.py	Mon Feb 06 14:39:33 2012 +0100
+++ b/tests/dbtestcase.py	Tue Feb 07 01:56:59 2012 +0100
@@ -6,7 +6,7 @@
 
 from ngdatabase.mysql import Server, createConfig, Database
 
-from iro.model import schema
+from iro.model import schema, setEngine
 from iro.model.utils import WithSession
 from iro.model.schema import Base
 
@@ -42,7 +42,7 @@
         self.tdir = mkdtemp(prefix='iro-mysql-')
         self.server = Server('%s/my.cnf'%self.tdir)
         self.db = SampleDatabase("test","test",'%s/my.cnf'%self.tdir)
-        self.engine = create_engine('mysql://test:test@localhost/test?unix_socket=%s/socket'%self.tdir,
+        self.engine = create_engine('mysql://test:test@localhost/test?unix_socket=%s/socket'%self.tdir, 
                 poolclass = pool.SingletonThreadPool,  pool_size=dbPool.maxthreads, )#echo=True)
 
     def setUp(self):
@@ -52,6 +52,7 @@
         self.server.start()
         self.db.create()
         Base.metadata.create_all(self.engine)
+        setEngine(self.engine)
     
     def tearDown(self):
         self.server.stop()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/job.py	Tue Feb 07 01:56:59 2012 +0100
@@ -0,0 +1,107 @@
+import unittest
+
+from iro.controller.task import createJob
+
+from iro.model.job import jobs, Job
+
+from iro.model.pool import data
+
+from iro.model.message import SMS, Fax, Mail
+from iro.model.schema import Job as DBJob, User
+from iro.telnumber import Telnumber
+
+from iro.error import JobNotFound
+from iro.validate import vHash
+
+
+from .dbtestcase import DBTestCase, setUpModule, tearDownModule
+
+#activates all logging we can get.
+
+from twisted.python import log
+import logging
+logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s(%(processName)s)-%(levelname)s: %(message)s')
+observer = log.PythonLoggingObserver()
+observer.start()
+
+class DumpPool():
+    def run(self, f,*a,**k):
+        return f(*a,**k)
+
+
+data.pool=DumpPool()
+
+
+class JobsTest(DBTestCase):
+    '''tests for jobs'''
+
+    def tearDown(self):
+        jobs.clear()
+        DBTestCase.tearDown(self)
+
+    def testCreate(self):
+        with self.session() as session:
+            u = User(name='test',apikey='abcdef123456789')
+            session.add(u)
+
+        job = jobs.create(u, [Telnumber('123456789')], SMS('test'), ['test'])
+        self.assertIsInstance(job, Job)
+        self.assertTrue(vHash(job.id, None, 40, 40))
+
+        with self.session() as session:
+            j = session.query(DBJob.hash).all()
+            self.assertEqual(j,[(job.id,)])
+
+        self.assertEqual(jobs[job.id],job)
+
+    def testGet(self):
+        with self.session() as session:
+            u = User(name='test',apikey='abcdef123456789')
+            session.add(u)
+
+        job = Job(u, [Telnumber('123456789')], SMS('test'), ['test'])
+        jobs['a1'] = job
+
+        self.assertEqual(len(jobs), 1)
+        self.assertEqual(job, jobs['a1'])
+
+    def testGetFromDB(self):
+        with self.session() as session:
+            u = User(name='test',apikey='abcdef123456789')
+            job = DBJob(hash='a1', info="info", status="started")
+            u.jobs.append(job)
+            session.add(u)
+
+        with self.session() as session:
+            job = session.merge(job)
+            u = session.merge(u)
+            self.assertEqual(repr(jobs['a1'].dbjob),repr(job))
+            self.assertEqual(repr(jobs['a1'].user),repr(u))
+            self.assertEqual(jobs['a1'].info, 'info')
+
+    def testUnknownJob(self):
+        with self.assertRaises(JobNotFound):
+            Job.fromDB('a1234567890')
+
+        with self.assertRaises(KeyError):
+            jobs['a1234567890']
+
+    @unittest.skip('test not implemented')
+    def testSyncroniced(self):
+        pass
+
+class JobTest(DBTestCase):
+    def testCreateSMS(self):
+        job = createJob([],SMS('sms'),[])
+        pass
+
+    def testCreateFax(self):
+        job = createJob([],Fax('header','fax',[]),[])
+        pass
+
+    def testCreateMail(self):
+        job = createJob([],Mail('sub','body','t@t.de'),[])
+    
+
+if __name__ == '__main__':
+        unittest.main()