iro/xmlserver_test.py
changeset 0 a3b6e531f0d2
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/iro/xmlserver_test.py	Thu Oct 22 10:00:01 2009 +0200
@@ -0,0 +1,139 @@
+# Server code
+
+from xmlrpc import SecureXMLRPCServer,SecureAuthentificateXMLRPCRequestHandler
+import md5, time, os
+
+class Job:
+    def __init__(self,provider,name):
+        self.provider=provider
+        self.name=name
+        self.status="stopped"
+
+    def start(self):
+        self.status="started"
+
+    def stop(self):
+        self.status="stopped"
+
+class SMSJob(Job):
+    def __init__(self,provider,message,recipients):
+        self.message=message
+        self.recipients=recipients
+        Job.__init__(self,provider,message)
+
+class Jobs:
+    def __init__(self,queue,provider):
+        self.jobs={}
+        self.queue=queue
+        self.provider=provider
+
+    def __getitem__(self,key):
+        return self.jobs[key]
+
+    def newSMS(self,message,recipients):
+        job=SMSJob(self.provider,message,recipients)
+        id = self._createID()
+        self.jobs[id]=job
+        #self.queue.put(job)
+        return id
+
+    def _createID(self):
+        while True:
+            m = md5.new()
+            m.update(str(time.time()))
+            m.update(os.urandom(10))
+            if not self.jobs.has_key(m.hexdigest):
+                self.jobs[m.hexdigest()]=None
+                break
+        return m.hexdigest()
+
+
+class User: 
+    def __init__(self,jobqueue):
+        self.jobqueue=jobqueue
+        self.jobs={}
+
+    def status(self,id=None):
+        if id==None:
+            jobs=self.jobs
+        else:
+            try:
+                jobs={id:self.jobs[id]}
+            except:
+                raise String("No Job with ID: %i" %(id))
+        ret={}
+        for key in jobs:
+            job=jobs[key]
+            ret[key]={"name":job.name,"status":job.status}
+        return ret
+
+    def stop(self,id):
+        try:
+            job=self.jobs[id]
+        except:
+            raise String("No Job with ID: %i" %(id))
+
+        job.stop()
+
+    
+    def startSMS(self,message,recipients):
+        id = self.jobqueue.newSMS(message,recipients)
+        self.jobs[id]=self.jobqueue[id]
+        return id
+
+class UserDB:
+    def __init__(self,userlist,jobqueue):
+        self.salt=os.urandom(20)
+        self.jobqueue=jobqueue
+        self.userlist={}
+        for user in userlist:
+            self.createUser(user)
+
+    def createHash(self,username,password):
+        m=md5.new()
+        m.update(self.salt)
+        m.update(username)
+        m.update(password)
+        return m.hexdigest()
+
+    def createUser(self,user):
+        self.userlist[self.createHash(user.username,user.password)]=User(self.jobqueue)
+
+    def __getitem__(self,key):
+        return self.userlist[key]
+
+    
+class SecureAuthentificateXMLRPCRequestHandler2(SecureAuthentificateXMLRPCRequestHandler):
+
+    def testUser(self,username,password):
+        return self.server.activateUser(username,password)
+
+class SecureUserDBXMLRPCServer(SecureXMLRPCServer):
+    def __init__(self, addr, userdb, 
+                 requestHandler=SecureAuthentificateXMLRPCRequestHandler2,
+                 certificate="server.cert", privatekey="server.pem",
+                 logRequests=1):
+        SecureXMLRPCServer.__init__(self, addr, requestHandler, certificate, privatekey, logRequests)
+        self.userdb=userdb
+
+    def activateUser(self,username,password):
+        try:
+            user = self.userdb[self.userdb.createHash(username,password)]
+            self.register_instance(user)
+            return True
+        except KeyError:
+            return False
+        
+
+
+def test():
+    server = SecureXMLRPCServer(("localhost", 8000),requestHandler=SecureAuthentificateXMLRPCRequestHandler,certificate="./certs/test.cert.pem",privatekey="./certs/test.key.pem")
+    server.register_introspection_functions()
+    server.register_instance(StringFunctions())
+    server.register_function(lambda x: x*x, 'square')
+    server.serve_forever()
+
+if __name__ == '__main__':
+    test()
+
+