start fixing job mangement devel
authorSandro Knauß <knauss@netzguerilla.net>
Thu, 09 Feb 2012 17:36:31 +0100
branchdevel
changeset 140 75d0eaaa871d
parent 139 65117fd28400
child 141 90c95fdd1e33
start fixing job mangement
iro/controller/task.py
iro/model/__init__.py
iro/model/job.py
iro/model/schema.py
iro/model/session.py
tests/job.py
--- a/iro/controller/task.py	Thu Feb 09 17:35:23 2012 +0100
+++ b/iro/controller/task.py	Thu Feb 09 17:36:31 2012 +0100
@@ -3,7 +3,7 @@
 from ..error import NoRoute, RejectRecipient
 
 from ..model.offer import getPossibleOffers
-from ..model.job import jobs
+from ..model.job import exJobs
 
 from .pool  import taskPool
 
@@ -31,8 +31,8 @@
             raise NoRoute()
 
 
-def createJob(recipients, msg, offers):
-    job = jobs.create(recipients, msg, offers)
+def createJob(user,recipients, msg, offers, info=None):
+    job = exJobs.create(user, recipients, msg, offers, info)
     for r in recipients:
         task = Task(r,job)
         job.addTask(task)
--- a/iro/model/__init__.py	Thu Feb 09 17:35:23 2012 +0100
+++ b/iro/model/__init__.py	Thu Feb 09 17:36:31 2012 +0100
@@ -1,6 +1,7 @@
 import schema
 import user
 import utils
+import job
 
 from dbdefer import setEngine
 from pool import setPool
--- a/iro/model/job.py	Thu Feb 09 17:35:23 2012 +0100
+++ b/iro/model/job.py	Thu Feb 09 17:36:31 2012 +0100
@@ -1,13 +1,8 @@
-import random
 from collections import MutableMapping
 
-from twisted.python import threadable
-
-from .schema import Job as DBJob
+import schema
 from .dbdefer import dbdefer
 
-from iro.error import JobNotFound
-
 class Status:
     '''status for one recipient'''
     todo = 1
@@ -19,16 +14,17 @@
         self.offer = None           #the offer over that this job was done
         self.errtext = None         #the error text
 
-class Job:
+class ExJob:
     '''One Job is a class that handles one job has multiple tasks'''
-    def __init__(self, user, recipients, message, offers, info=None):
-        self.dbjob = None       #Connection to mysql job element
+    def __init__(self, dbjob, recipients, message, offers):
+        self.dbjob = dbjob       #Connection to mysql job element (id)
         self.message = message
         self.recipients = recipients
         self.offers = offers
-        self.info = info
         self.tasks={}
-        self.user = user
+
+    def addTask(self,task):
+        self.tasks[task.recipient] = task
 
     def setStatus(task,status):
         pass
@@ -36,57 +32,16 @@
     def setError(task,err):
         pass
 
+class ExJobs(dict, MutableMapping):
+
     @dbdefer
-    def registerJob(self, session, id):
-        self.id = id
-        u = session.merge(self.user)
-        self.dbjob=DBJob(hash=self.id, info=self.info, status="started")
-        u.jobs.append(self.dbjob)
+    def create(self, session, user, recipients, message, offers, info=None):
+        user = session.merge(user)
+        job = schema.Job(info=info, status="started")
+        user.jobs.append(job)
         session.commit()
 
-    @classmethod
-    @dbdefer
-    def fromDB(cls, session, id):
-        j = session.query(DBJob).filter_by(hash=id).first()
-        if not j:
-            raise JobNotFound
-        job = cls(j.user, [], None, None, j.info)
-        job.dbjob = j
-        job.id = j.hash
-        return job
-
-
-class Jobs(dict, MutableMapping):
-
-    synchronized = ['getNewId']
-
-    def create(self, user, recipients, message, offers, info=None):
-        job = Job(user, recipients, message, offers, info)
-        job.registerJob(id = self.getNewId())
-        self[job.id] = job
-        return job
+        self[job.id] = ExJob(job.id, recipients, message, offers)
+        return self[job.id]
 
-    @dbdefer
-    def getNewId(self, session):
-        while True:
-            id = ''.join([random.choice('0123456789abcdef') for i in range(40)])
-            if id not in self.keys():
-                self[id]=None
-                if not session.query(DBJob.hash).filter_by(hash=id).first():
-                    return id
-
-    def __getitem__(self, key):
-        try:
-            return dict.__getitem__(self, key)
-        except KeyError as e:
-            pass
-       
-        try:
-            self[key]=Job.fromDB(key)
-            return self[key]
-        except JobNotFound:
-            raise e
-
-threadable.synchronize(Jobs)
-
-jobs = Jobs()
+exJobs = ExJobs()
--- a/iro/model/schema.py	Thu Feb 09 17:35:23 2012 +0100
+++ b/iro/model/schema.py	Thu Feb 09 17:36:31 2012 +0100
@@ -10,6 +10,7 @@
 from sqlalchemy import and_
 import sqlalchemy.sql.functions as func
 
+import job
 from ..error import JobNotFound
 
 Base = declarative_base()
@@ -76,6 +77,11 @@
     user_id = Column("user", String(100), ForeignKey('apiuser.name'))
     user = relationship("User", backref=backref('jobs'))
 
+    @property
+    def extend(self):
+        return job.exJobs[self.id]
+
+
     def __repr__(self):
         return "<Job('%s','%s','%s','%s')>"%(self.id,self.info, self.status, self.user_id)
 
--- a/iro/model/session.py	Thu Feb 09 17:35:23 2012 +0100
+++ b/iro/model/session.py	Thu Feb 09 17:36:31 2012 +0100
@@ -1,8 +1,8 @@
 from sqlalchemy.orm.session import Session
 
-from .schema import Offer
+import schema
 
 class IroSession(Session):
     @property
     def typs(self):
-        return self.query(Offer.typ).distinct()
+        return self.query(schema.Offer.typ).distinct()
--- a/tests/job.py	Thu Feb 09 17:35:23 2012 +0100
+++ b/tests/job.py	Thu Feb 09 17:36:31 2012 +0100
@@ -1,42 +1,41 @@
 import unittest
+from Queue import deque
 
-from iro.controller.task import createJob
+from iro.controller.task import createJob, Task
+from iro.controller.pool  import taskPool
 
-from iro.model.job import jobs, Job
+from iro.model.job import exJobs, ExJob
 
 from iro.model.pool import data
 
 from iro.model.message import SMS, Fax, Mail
-from iro.model.schema import Job as DBJob, User
+from iro.model.schema import Job, User
 from iro.telnumber import Telnumber
 
-from iro.error import JobNotFound
-from iro.validate import vHash
+from iro.validate import vInteger
 
 
-from .dbtestcase import DBTestCase, setUpModule, tearDownModule
+from .dbtestcase import DBTestCase
 
 #activates all logging we can get.
 
-from twisted.python import log
-import logging
-logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s(%(processName)s)-%(levelname)s: %(message)s')
-observer = log.PythonLoggingObserver()
-observer.start()
-
 class DumpPool():
     def run(self, f,*a,**k):
         return f(*a,**k)
 
 
-data.pool=DumpPool()
-
+class exJobsTest(DBTestCase):
+    '''tests for exJobs'''
 
-class JobsTest(DBTestCase):
-    '''tests for jobs'''
+    def setUp(self):
+        DBTestCase.setUp(self)
+        self.pool = data.pool
+        data.pool = DumpPool()
 
     def tearDown(self):
-        jobs.clear()
+        exJobs.clear()
+        data.pool = self.pool
+        self.pool = None
         DBTestCase.tearDown(self)
 
     def testCreate(self):
@@ -44,63 +43,87 @@
             u = User(name='test',apikey='abcdef123456789')
             session.add(u)
 
-        job = jobs.create(u, [Telnumber('123456789')], SMS('test'), ['test'])
-        self.assertIsInstance(job, Job)
-        self.assertTrue(vHash(job.id, None, 40, 40))
+        job = exJobs.create(u, [Telnumber('123456789')], SMS('test'), ['test'])
+        self.assertIsInstance(job, ExJob)
+        self.assertTrue(vInteger(job.dbjob, None, minv=0 ))
+        self.assertEqual(job.message, SMS('test'))
+        self.assertEqual(job.recipients,  [Telnumber('123456789')])
+        self.assertEqual(job.offers,['test'])
+        self.assertEqual(job.tasks,{})
 
         with self.session() as session:
-            j = session.query(DBJob.hash).all()
-            self.assertEqual(j,[(job.id,)])
+            j = session.query(Job.id).all()
+            self.assertEqual(j,[(job.dbjob,)])
 
-        self.assertEqual(jobs[job.id],job)
+        self.assertEqual(exJobs[job.dbjob],job)
 
     def testGet(self):
         with self.session() as session:
             u = User(name='test',apikey='abcdef123456789')
             session.add(u)
 
-        job = Job(u, [Telnumber('123456789')], SMS('test'), ['test'])
-        jobs['a1'] = job
+        job = ExJob(None, [Telnumber('123456789')], SMS('test'), ['test'])
+        exJobs[1] = job
 
-        self.assertEqual(len(jobs), 1)
-        self.assertEqual(job, jobs['a1'])
+        self.assertEqual(len(exJobs), 1)
+        self.assertEqual(job, exJobs[1])
 
     def testGetFromDB(self):
         with self.session() as session:
             u = User(name='test',apikey='abcdef123456789')
-            job = DBJob(hash='a1', info="info", status="started")
+            job = Job( info="info", status="started")
             u.jobs.append(job)
             session.add(u)
 
         with self.session() as session:
             job = session.merge(job)
             u = session.merge(u)
-            self.assertEqual(repr(jobs['a1'].dbjob),repr(job))
-            self.assertEqual(repr(jobs['a1'].user),repr(u))
-            self.assertEqual(jobs['a1'].info, 'info')
+            ejob= ExJob(job.id, [Telnumber('123456789')], SMS('test'), ['test'])
+            exJobs[job.id]=ejob
+            self.assertEqual(job.extend, ejob)
+            self.assertEqual(u.jobs[0].extend,ejob)
 
-    def testUnknownJob(self):
-        with self.assertRaises(JobNotFound):
-            Job.fromDB('a1234567890')
-
-        with self.assertRaises(KeyError):
-            jobs['a1234567890']
-
-    @unittest.skip('test not implemented')
-    def testSyncroniced(self):
-        pass
+    def testUnknownExJob(self):
+        self.assertRaises(KeyError,exJobs.__getitem__,'a1234567890')
 
 class JobTest(DBTestCase):
+   
+    def setUp(self):
+        DBTestCase.setUp(self)
+        self.pool = data.pool
+        data.pool = DumpPool()
+
+    def tearDown(self):
+        exJobs.clear()
+        data.pool = self.pool
+        self.pool = None
+        taskPool.pool.q.queue = deque()
+        DBTestCase.tearDown(self)
+    
     def testCreateSMS(self):
-        job = createJob([],SMS('sms'),[])
-        pass
+        with self.session() as session:
+            u = User(name='test',apikey='abcdef123456789')
+            session.add(u)
+        exjob = createJob(u,[Telnumber('0123325456')],SMS('sms'),[])
+        self.assertEqual(taskPool.pool.q.qsize(),1)
+        self.assertEqual(exjob.tasks.keys(),[Telnumber('0123325456')])
+        self.assertIsInstance(exjob.tasks[Telnumber('0123325456')], Task)
+
+
 
     def testCreateFax(self):
-        job = createJob([],Fax('header','fax',[]),[])
-        pass
+        with self.session() as session:
+            u = User(name='test',apikey='abcdef123456789')
+            session.add(u)
+        job = createJob(u,[Telnumber('0123325456')],Fax('header','fax',[]),[])
+
+        self.assertEqual(taskPool.pool.q.qsize(),1)
 
     def testCreateMail(self):
-        job = createJob([],Mail('sub','body','t@t.de'),[])
+        with self.session() as session:
+            u = User(name='test',apikey='abcdef123456789')
+            session.add(u)
+        job = createJob(u,[],Mail('sub','body','t@t.de'),[])
     
 
 if __name__ == '__main__':