--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/old/testloglock.py Tue Jan 10 06:10:38 2012 +0100
@@ -0,0 +1,98 @@
+
+import unittest
+import os
+import tempfile
+import signal
+import threading
+import time
+
+class testLogLock(unittest.TestCase):
+ def test_ThreadingAndLocks(self):
+ #create a thread, have that thread grab a lock, and sleep.
+ fd, file = tempfile.mkstemp('.nonlog')
+ _lock = threading.RLock()
+ os.close(fd)
+ def locker():
+ os.write(fd, 'Thread acquiring lock\n')
+ _lock.acquire()
+ os.write(fd, 'Thread acquired lock\n')
+ time.sleep(0.4)
+ os.write(fd, 'Thread releasing lock\n')
+ _lock.release()
+ os.write(fd, 'Thread released lock\n')
+
+ #Then in the main thread, throw a signal to self
+ def handleSignal(sigNum, frame):
+ os.write(fd, 'Main Thread acquiring lock\n')
+ lock = False
+ endtime = time.time() + 1.0
+ while not lock:
+ lock = _lock.acquire(blocking=0)
+ time.sleep(0.01)
+ if time.time() > endtime:
+ break
+ if not lock:
+ os.write(fd, 'Main Thread could not acquire lock\n')
+ return
+ os.write(fd, 'Main Thread acquired lock\n')
+ os.write(fd, 'Main Thread releasing lock\n')
+ _lock.release()
+ os.write(fd, 'Main Thread released lock\n')
+
+ sighndlr = signal.signal(signal.SIGUSR1, handleSignal)
+ try:
+ fd = os.open(file, os.O_SYNC | os.O_WRONLY | os.O_CREAT)
+ thread = threading.Thread(target=locker)
+ thread.start()
+ time.sleep(0.1)
+ os.kill(os.getpid(), signal.SIGUSR1)
+ thread.join()
+
+ #check the results
+ os.close(fd)
+ fileconts = open(file, 'r').read()
+ self.assertTrue('Main Thread released lock' in fileconts)
+ self.assertEqual(fileconts,
+ '''Thread acquiring lock
+Thread acquired lock
+Main Thread acquiring lock
+Thread releasing lock
+Thread released lock
+Main Thread acquired lock
+Main Thread releasing lock
+Main Thread released lock
+''')
+
+ #Now try after acquiring the lock from the main thread
+ fd = os.open(file, os.O_SYNC | os.O_WRONLY | os.O_CREAT)
+ _lock.acquire()
+ thread = threading.Thread(target=locker)
+ thread.start()
+ time.sleep(0.1)
+ os.kill(os.getpid(), signal.SIGUSR1)
+ _lock.release()
+ thread.join()
+ os.close(fd)
+ fileconts = open(file, 'r').read()
+ self.assertEqual(fileconts,
+ '''Thread acquiring lock
+Main Thread acquiring lock
+Main Thread acquired lock
+Main Thread releasing lock
+Main Thread released lock
+Thread acquired lock
+Thread releasing lock
+Thread released lock
+''')
+
+ finally:
+ signal.signal(signal.SIGUSR1, sighndlr)
+ try:
+ os.close(fd)
+ except OSError:
+ pass
+ try:
+ os.unlink(file)
+ except OSError:
+ pass
+