# HG changeset patch # User Sandro Knauß # Date 1328805391 -3600 # Node ID 75d0eaaa871d8c0f534b68ee5a1dfb6038664b93 # Parent 65117fd284001f8c5324c1999f57092477b9bdb8 start fixing job mangement diff -r 65117fd28400 -r 75d0eaaa871d iro/controller/task.py --- a/iro/controller/task.py Thu Feb 09 17:35:23 2012 +0100 +++ b/iro/controller/task.py Thu Feb 09 17:36:31 2012 +0100 @@ -3,7 +3,7 @@ from ..error import NoRoute, RejectRecipient from ..model.offer import getPossibleOffers -from ..model.job import jobs +from ..model.job import exJobs from .pool import taskPool @@ -31,8 +31,8 @@ raise NoRoute() -def createJob(recipients, msg, offers): - job = jobs.create(recipients, msg, offers) +def createJob(user,recipients, msg, offers, info=None): + job = exJobs.create(user, recipients, msg, offers, info) for r in recipients: task = Task(r,job) job.addTask(task) diff -r 65117fd28400 -r 75d0eaaa871d iro/model/__init__.py --- a/iro/model/__init__.py Thu Feb 09 17:35:23 2012 +0100 +++ b/iro/model/__init__.py Thu Feb 09 17:36:31 2012 +0100 @@ -1,6 +1,7 @@ import schema import user import utils +import job from dbdefer import setEngine from pool import setPool diff -r 65117fd28400 -r 75d0eaaa871d iro/model/job.py --- a/iro/model/job.py Thu Feb 09 17:35:23 2012 +0100 +++ b/iro/model/job.py Thu Feb 09 17:36:31 2012 +0100 @@ -1,13 +1,8 @@ -import random from collections import MutableMapping -from twisted.python import threadable - -from .schema import Job as DBJob +import schema from .dbdefer import dbdefer -from iro.error import JobNotFound - class Status: '''status for one recipient''' todo = 1 @@ -19,16 +14,17 @@ self.offer = None #the offer over that this job was done self.errtext = None #the error text -class Job: +class ExJob: '''One Job is a class that handles one job has multiple tasks''' - def __init__(self, user, recipients, message, offers, info=None): - self.dbjob = None #Connection to mysql job element + def __init__(self, dbjob, recipients, message, offers): + self.dbjob = dbjob #Connection to mysql job element (id) self.message = message self.recipients = recipients self.offers = offers - self.info = info self.tasks={} - self.user = user + + def addTask(self,task): + self.tasks[task.recipient] = task def setStatus(task,status): pass @@ -36,57 +32,16 @@ def setError(task,err): pass +class ExJobs(dict, MutableMapping): + @dbdefer - def registerJob(self, session, id): - self.id = id - u = session.merge(self.user) - self.dbjob=DBJob(hash=self.id, info=self.info, status="started") - u.jobs.append(self.dbjob) + def create(self, session, user, recipients, message, offers, info=None): + user = session.merge(user) + job = schema.Job(info=info, status="started") + user.jobs.append(job) session.commit() - @classmethod - @dbdefer - def fromDB(cls, session, id): - j = session.query(DBJob).filter_by(hash=id).first() - if not j: - raise JobNotFound - job = cls(j.user, [], None, None, j.info) - job.dbjob = j - job.id = j.hash - return job - - -class Jobs(dict, MutableMapping): - - synchronized = ['getNewId'] - - def create(self, user, recipients, message, offers, info=None): - job = Job(user, recipients, message, offers, info) - job.registerJob(id = self.getNewId()) - self[job.id] = job - return job + self[job.id] = ExJob(job.id, recipients, message, offers) + return self[job.id] - @dbdefer - def getNewId(self, session): - while True: - id = ''.join([random.choice('0123456789abcdef') for i in range(40)]) - if id not in self.keys(): - self[id]=None - if not session.query(DBJob.hash).filter_by(hash=id).first(): - return id - - def __getitem__(self, key): - try: - return dict.__getitem__(self, key) - except KeyError as e: - pass - - try: - self[key]=Job.fromDB(key) - return self[key] - except JobNotFound: - raise e - -threadable.synchronize(Jobs) - -jobs = Jobs() +exJobs = ExJobs() diff -r 65117fd28400 -r 75d0eaaa871d iro/model/schema.py --- a/iro/model/schema.py Thu Feb 09 17:35:23 2012 +0100 +++ b/iro/model/schema.py Thu Feb 09 17:36:31 2012 +0100 @@ -10,6 +10,7 @@ from sqlalchemy import and_ import sqlalchemy.sql.functions as func +import job from ..error import JobNotFound Base = declarative_base() @@ -76,6 +77,11 @@ user_id = Column("user", String(100), ForeignKey('apiuser.name')) user = relationship("User", backref=backref('jobs')) + @property + def extend(self): + return job.exJobs[self.id] + + def __repr__(self): return ""%(self.id,self.info, self.status, self.user_id) diff -r 65117fd28400 -r 75d0eaaa871d iro/model/session.py --- a/iro/model/session.py Thu Feb 09 17:35:23 2012 +0100 +++ b/iro/model/session.py Thu Feb 09 17:36:31 2012 +0100 @@ -1,8 +1,8 @@ from sqlalchemy.orm.session import Session -from .schema import Offer +import schema class IroSession(Session): @property def typs(self): - return self.query(Offer.typ).distinct() + return self.query(schema.Offer.typ).distinct() diff -r 65117fd28400 -r 75d0eaaa871d tests/job.py --- a/tests/job.py Thu Feb 09 17:35:23 2012 +0100 +++ b/tests/job.py Thu Feb 09 17:36:31 2012 +0100 @@ -1,42 +1,41 @@ import unittest +from Queue import deque -from iro.controller.task import createJob +from iro.controller.task import createJob, Task +from iro.controller.pool import taskPool -from iro.model.job import jobs, Job +from iro.model.job import exJobs, ExJob from iro.model.pool import data from iro.model.message import SMS, Fax, Mail -from iro.model.schema import Job as DBJob, User +from iro.model.schema import Job, User from iro.telnumber import Telnumber -from iro.error import JobNotFound -from iro.validate import vHash +from iro.validate import vInteger -from .dbtestcase import DBTestCase, setUpModule, tearDownModule +from .dbtestcase import DBTestCase #activates all logging we can get. -from twisted.python import log -import logging -logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s(%(processName)s)-%(levelname)s: %(message)s') -observer = log.PythonLoggingObserver() -observer.start() - class DumpPool(): def run(self, f,*a,**k): return f(*a,**k) -data.pool=DumpPool() - +class exJobsTest(DBTestCase): + '''tests for exJobs''' -class JobsTest(DBTestCase): - '''tests for jobs''' + def setUp(self): + DBTestCase.setUp(self) + self.pool = data.pool + data.pool = DumpPool() def tearDown(self): - jobs.clear() + exJobs.clear() + data.pool = self.pool + self.pool = None DBTestCase.tearDown(self) def testCreate(self): @@ -44,63 +43,87 @@ u = User(name='test',apikey='abcdef123456789') session.add(u) - job = jobs.create(u, [Telnumber('123456789')], SMS('test'), ['test']) - self.assertIsInstance(job, Job) - self.assertTrue(vHash(job.id, None, 40, 40)) + job = exJobs.create(u, [Telnumber('123456789')], SMS('test'), ['test']) + self.assertIsInstance(job, ExJob) + self.assertTrue(vInteger(job.dbjob, None, minv=0 )) + self.assertEqual(job.message, SMS('test')) + self.assertEqual(job.recipients, [Telnumber('123456789')]) + self.assertEqual(job.offers,['test']) + self.assertEqual(job.tasks,{}) with self.session() as session: - j = session.query(DBJob.hash).all() - self.assertEqual(j,[(job.id,)]) + j = session.query(Job.id).all() + self.assertEqual(j,[(job.dbjob,)]) - self.assertEqual(jobs[job.id],job) + self.assertEqual(exJobs[job.dbjob],job) def testGet(self): with self.session() as session: u = User(name='test',apikey='abcdef123456789') session.add(u) - job = Job(u, [Telnumber('123456789')], SMS('test'), ['test']) - jobs['a1'] = job + job = ExJob(None, [Telnumber('123456789')], SMS('test'), ['test']) + exJobs[1] = job - self.assertEqual(len(jobs), 1) - self.assertEqual(job, jobs['a1']) + self.assertEqual(len(exJobs), 1) + self.assertEqual(job, exJobs[1]) def testGetFromDB(self): with self.session() as session: u = User(name='test',apikey='abcdef123456789') - job = DBJob(hash='a1', info="info", status="started") + job = Job( info="info", status="started") u.jobs.append(job) session.add(u) with self.session() as session: job = session.merge(job) u = session.merge(u) - self.assertEqual(repr(jobs['a1'].dbjob),repr(job)) - self.assertEqual(repr(jobs['a1'].user),repr(u)) - self.assertEqual(jobs['a1'].info, 'info') + ejob= ExJob(job.id, [Telnumber('123456789')], SMS('test'), ['test']) + exJobs[job.id]=ejob + self.assertEqual(job.extend, ejob) + self.assertEqual(u.jobs[0].extend,ejob) - def testUnknownJob(self): - with self.assertRaises(JobNotFound): - Job.fromDB('a1234567890') - - with self.assertRaises(KeyError): - jobs['a1234567890'] - - @unittest.skip('test not implemented') - def testSyncroniced(self): - pass + def testUnknownExJob(self): + self.assertRaises(KeyError,exJobs.__getitem__,'a1234567890') class JobTest(DBTestCase): + + def setUp(self): + DBTestCase.setUp(self) + self.pool = data.pool + data.pool = DumpPool() + + def tearDown(self): + exJobs.clear() + data.pool = self.pool + self.pool = None + taskPool.pool.q.queue = deque() + DBTestCase.tearDown(self) + def testCreateSMS(self): - job = createJob([],SMS('sms'),[]) - pass + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + exjob = createJob(u,[Telnumber('0123325456')],SMS('sms'),[]) + self.assertEqual(taskPool.pool.q.qsize(),1) + self.assertEqual(exjob.tasks.keys(),[Telnumber('0123325456')]) + self.assertIsInstance(exjob.tasks[Telnumber('0123325456')], Task) + + def testCreateFax(self): - job = createJob([],Fax('header','fax',[]),[]) - pass + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + job = createJob(u,[Telnumber('0123325456')],Fax('header','fax',[]),[]) + + self.assertEqual(taskPool.pool.q.qsize(),1) def testCreateMail(self): - job = createJob([],Mail('sub','body','t@t.de'),[]) + with self.session() as session: + u = User(name='test',apikey='abcdef123456789') + session.add(u) + job = createJob(u,[],Mail('sub','body','t@t.de'),[]) if __name__ == '__main__':