instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Utilities / TimedTaskQueue.py
1 # Written by Arno Bakker
2 # see LICENSE.txt for license information
3 #
4 # TimedTaskQueue is a server that executes tasks on behalf of e.g. the GUI that
5 # are too time consuming to be run by the actual GUI Thread (MainThread). Note 
6 # that you still need to delegate the actual updating of the GUI to the 
7 # MainThread via the wx.CallAfter mechanism.
8 #
9 import sys
10
11 from threading import Thread,Condition
12 from traceback import print_exc,print_stack
13 from time import time
14
# Set to True to trace queue activity on stderr.
DEBUG = False


class TimedTaskQueue:
    """ Server thread that executes queued callables once they are due.

        Tasks are added with add_task() from any thread and are called, one
        at a time, on the thread started by the constructor. Tasks that are
        due at the same moment run in the order in which they were added.
    """

    __single = None

    def __init__(self,nameprefix="TimedTaskQueue",isDaemon=True):
        """ Create the queue and immediately start its server thread.

            nameprefix is prepended to the default name of the server thread.
            isDaemon is passed to Thread.setDaemon() for the server thread.
        """
        self.cond = Condition()  # guards self.queue and self.count
        self.queue = []  # entries are (when, count, task, id) tuples
        self.count = 0.0 # serves to keep tasks that were scheduled at the same time in FIFO order
        self.thread = Thread(target = self.run)
        self.thread.setDaemon(isDaemon)
        self.thread.setName( nameprefix+self.thread.getName() )
        self.thread.start()

    def _debug(self,*args):
        """ Write the str() of each argument, space separated, as one line to stderr. """
        sys.stderr.write(" ".join([str(arg) for arg in args])+"\n")

    def add_task(self,task,t=0,id=None):
        """ Schedule task to be called by the server thread t seconds from now.

            task is a callable taking no arguments (the strings 'stop' and
            'quit' are special, see run()).
            If id is given, all the existing tasks with the same id will be
            removed before inserting this task.
        """
        if task is None:
            # Most likely a caller bug: show who queued it. The entry is still
            # queued; the failure is reported again when run() tries to call it.
            print_stack()

        self.cond.acquire()
        try:
            when = time()+t
            if DEBUG:
                self._debug("ttqueue: ADD EVENT",t,task)

            if id is not None:  # remove all redundant tasks
                # The id is field 3 of a (when, count, task, id) entry. A list
                # comprehension keeps self.queue a real list on Python 3 too.
                self.queue = [item for item in self.queue if item[3] != id]
            self.queue.append((when,self.count,task,id))
            self.count += 1.0
            self.cond.notify()
        finally:
            # Always release, otherwise one failure would block every other user.
            self.cond.release()

    def _next_due_task(self):
        """ Block until the earliest task is due, remove it and return it.

            Must be called with self.cond held; the lock is released only
            while waiting.
        """
        while True:
            if not self.queue:
                # Wait until something is queued
                self.cond.wait()
                continue

            # A new event was added or an event is due. count is unique per
            # entry, so the tuple comparison never reaches the task objects.
            self.queue.sort()
            head = self.queue[0]
            when = head[0]
            task = head[2]
            if DEBUG:
                self._debug("ttqueue: EVENT IN QUEUE",when,task)
            now = time()
            if now < when:
                # Event not due, wait till it is or till something new is added
                if DEBUG:
                    self._debug("ttqueue: EVENT NOT TILL",when-now)
                self.cond.wait(when-now)
                continue

            # Event due, hand it out for execution
            if DEBUG:
                self._debug("ttqueue: EVENT DUE")
            self.queue.pop(0)
            return task

    def _quit_delay(self):
        """ Return the delay (seconds) that puts a task just after the latest
            pending one, or None when nothing is pending.
        """
        self.cond.acquire()
        try:
            if not self.queue:
                return None
            latest = max([item[0] for item in self.queue])
        finally:
            self.cond.release()
        return latest-time()+0.001

    def run(self):
        """ Run by server thread """
        while True:
            self.cond.acquire()
            try:
                task = self._next_due_task()
            finally:
                self.cond.release()

            # Execute task outside lock
            try:
                # 'stop' and 'quit' are only used for unit test
                if task == 'stop':
                    # Exit right away, pending tasks are dropped
                    break
                elif task == 'quit':
                    # Exit once the queue has drained: while tasks are still
                    # pending, try again just after the latest one.
                    delay = self._quit_delay()
                    if delay is None:
                        break
                    self.add_task('quit',delay)
                else:
                    task()
            except:
                # Deliberately broad: a failing task must not kill the server thread.
                print_exc()
104         
105