instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / Overlay / OverlayThreadingBridge.py
1 # Written by Arno Bakker, George Milescu
2 # see LICENSE.txt for license information
3 #
4 # This class bridges between the OverlayApps class and the SecureOverlay
5 # and ensures that all upcalls made by the NetworkThread via the SecureOverlay
6 # are handed over to a different thread, the OverlayThread that propagates the
7 # upcall to the OverlayApps.
8
9
10 import sys
11 from threading import currentThread
12 from traceback import print_exc
13
14 from BaseLib.Core.Overlay.SecureOverlay import CloseException
15 from BaseLib.Core.BitTornado.BT1.MessageID import getMessageName
16 from BaseLib.Core.Utilities.utilities import show_permid_short
17 from BaseLib.Utilities.TimedTaskQueue import TimedTaskQueue
18 import threading
19
20 DEBUG = False
21
class OverlayThreadingBridge:
    """Singleton bridge between OverlayApps and the SecureOverlay.

    Upcalls that the SecureOverlay makes (per the original author's notes, on
    the NetworkThread) are not executed in place. Each one is wrapped in a
    closure and queued on ``self.tqueue``, a ``TimedTaskQueue`` created with
    ``nameprefix="Overlay"``, so the registered OverlayApps handlers run on
    that queue's thread instead.

    Downcalls (``connect``, ``connect_dns``, ``send``, ``close`` ...) are
    forwarded to the SecureOverlay; their completion callbacks are likewise
    re-queued on ``self.tqueue``.
    """

    # The one and only instance, or None when none has been created yet.
    __single = None
    # Serialises instance creation in getInstance().
    lock = threading.Lock()

    def __init__(self):
        """Create the bridge and its task queue.

        Raises:
            RuntimeError: if an instance has already been published.
        """
        if OverlayThreadingBridge.__single:
            raise RuntimeError("OverlayThreadingBridge is Singleton")

        self.secover = None
        self.olapps = None
        self.olappsmsghandler = None
        self.olappsconnhandler = None

        # Current impl of wrapper: single thread
        self.tqueue = TimedTaskQueue(nameprefix="Overlay")

        # Publish the singleton only once it is fully built. getInstance()
        # reads __single without holding the lock, so publishing earlier
        # could hand out an instance that has no tqueue yet, and a failing
        # TimedTaskQueue constructor would leave a broken singleton behind.
        OverlayThreadingBridge.__single = self

    @staticmethod
    def getInstance(*args, **kw):
        """Return the singleton, creating it on first use.

        The instance is checked once without the lock and once more while
        holding it, so at most one caller runs the constructor. Any
        arguments are passed through to the constructor.
        """
        if OverlayThreadingBridge.__single is None:
            OverlayThreadingBridge.lock.acquire()
            try:
                if OverlayThreadingBridge.__single is None:
                    OverlayThreadingBridge(*args, **kw)
            finally:
                OverlayThreadingBridge.lock.release()
        return OverlayThreadingBridge.__single

    def resetSingleton(self):
        """ For testing purposes: forget the current singleton instance. """
        OverlayThreadingBridge.__single = None

    def register_bridge(self, secover, olapps):
        """ Called by MainThread.

        Stores the SecureOverlay and OverlayApps objects and installs this
        bridge's handleMessage/handleConnection as the SecureOverlay's
        receive and connection callbacks.
        """
        self.secover = secover
        self.olapps = olapps

        secover.register_recv_callback(self.handleMessage)
        secover.register_conns_callback(self.handleConnection)

    #
    # SecOverlay interface
    #
    def register(self, launchmanycore, max_len):
        """ Called by MainThread. Forwards to SecureOverlay.register(). """
        self.secover.register(launchmanycore, max_len)

        # FOR TESTING ONLY
        self.iplport2oc = self.secover.iplport2oc

    def get_handler(self):
        """ Return the SecureOverlay object given to register_bridge(). """
        return self.secover

    def start_listening(self):
        """ Called by MainThread. Forwards to SecureOverlay.start_listening(). """
        self.secover.start_listening()

    def register_recv_callback(self, callback):
        """ Called by MainThread.

        Sets the handler invoked as callback(permid, selversion, message)
        for every incoming message.
        """
        self.olappsmsghandler = callback

    def register_conns_callback(self, callback):
        """ Called by MainThread.

        Sets the handler invoked as
        callback(exc, permid, selversion, locally_initiated)
        for every connection event.
        """
        self.olappsconnhandler = callback

    def handleConnection(self, exc, permid, selversion, locally_initiated, hisdns):
        """ Called by NetworkThread.

        Queues the connection event on the task queue. Per the original
        note, the caller is SecureOverlay.got_auth_connection() or
        cleanup_admin_and_callbacks().
        """
        if DEBUG:
            print >>sys.stderr,"olbridge: handleConnection",exc,show_permid_short(permid),selversion,locally_initiated,hisdns,currentThread().getName()

        def olbridge_handle_conn_func():
            # Called by OverlayThread

            if DEBUG:
                print >>sys.stderr,"olbridge: handle_conn_func",exc,show_permid_short(permid),selversion,locally_initiated,hisdns,currentThread().getName()

            try:
                if hisdns:
                    self.secover.add_peer_to_db(permid, hisdns, selversion)

                # Original author's note: the handler is
                # OverlayApps.handleConnection.
                if self.olappsconnhandler is not None:
                    self.olappsconnhandler(exc, permid, selversion, locally_initiated)
            except:
                # A failing handler is reported but must not prevent the
                # peer status update below.
                print_exc()

            if isinstance(exc, CloseException):
                self.secover.update_peer_status(permid, exc.was_auth_done())

        self.tqueue.add_task(olbridge_handle_conn_func, 0)

    def handleMessage(self, permid, selversion, message):
        """ Called by NetworkThread.

        Queues the message for the registered receive handler and returns
        True right away. If the handler later returns False or raises, the
        connection to permid is closed via close().
        """
        if DEBUG:
            print >>sys.stderr,"olbridge: handleMessage",show_permid_short(permid),selversion,getMessageName(message[0]),currentThread().getName()

        def olbridge_handle_msg_func():
            # Called by OverlayThread

            if DEBUG:
                print >>sys.stderr,"olbridge: handle_msg_func",show_permid_short(permid),selversion,getMessageName(message[0]),currentThread().getName()

            try:
                if self.olappsmsghandler is None:
                    # Nobody registered: treat the message as accepted.
                    ret = True
                else:
                    ret = self.olappsmsghandler(permid, selversion, message)
            except:
                print_exc()
                ret = False
            # Deliberately `== False`: a handler returning None does not
            # cause the connection to be closed.
            if ret == False:
                if DEBUG:
                    print >>sys.stderr,"olbridge: olbridge_handle_msg_func closing!",show_permid_short(permid),selversion,getMessageName(message[0]),currentThread().getName()
                self.close(permid)

        self.tqueue.add_task(olbridge_handle_msg_func, 0)
        return True

    def connect_dns(self, dns, callback):
        """ Called by OverlayThread/NetworkThread.

        Forwards to SecureOverlay.connect_dns(). When the SecureOverlay
        reports the outcome, callback(exc, dns, permid, selversion) is
        queued on the task queue rather than run in place.
        """
        if DEBUG:
            print >>sys.stderr,"olbridge: connect_dns",dns

        def olbridge_connect_dns_callback(cexc, cdns, cpermid, cselver):
            # Called by network thread

            if DEBUG:
                print >>sys.stderr,"olbridge: connect_dns_callback",cexc,cdns,show_permid_short(cpermid),cselver

            olbridge_connect_dns_callback_lambda = lambda:callback(cexc, cdns, cpermid, cselver)
            self.add_task(olbridge_connect_dns_callback_lambda, 0)

        self.secover.connect_dns(dns, olbridge_connect_dns_callback)

    def connect(self, permid, callback):
        """ Called by OverlayThread.

        Forwards to SecureOverlay.connect(). When the SecureOverlay reports
        the outcome, callback(exc, dns, permid, selversion) is queued on
        the task queue rather than run in place.
        """
        if DEBUG:
            print >>sys.stderr,"olbridge: connect",show_permid_short(permid), currentThread().getName()

        def olbridge_connect_callback(cexc, cdns, cpermid, cselver):
            # Called by network thread

            if DEBUG:
                print >>sys.stderr,"olbridge: connect_callback",cexc,cdns,show_permid_short(cpermid),cselver, callback, currentThread().getName()

            olbridge_connect_callback_lambda = lambda:callback(cexc, cdns, cpermid, cselver)
            # Jie: postpone to call this callback to schedule it after the peer has been added to buddycast connection list
            # Arno, 2008-09-15: No-no-no
            # (i.e. the callback is queued with zero delay.)
            self.add_task(olbridge_connect_callback_lambda, 0)

        self.secover.connect(permid, olbridge_connect_callback)

    def send(self, permid, msg, callback):
        """ Called by OverlayThread.

        Forwards to SecureOverlay.send(). When the SecureOverlay reports the
        outcome, callback(exc, permid) is queued on the task queue rather
        than run in place.
        """
        if DEBUG:
            print >>sys.stderr,"olbridge: send",show_permid_short(permid),len(msg)

        def olbridge_send_callback(cexc, cpermid):
            # Called by network thread

            if DEBUG:
                print >>sys.stderr,"olbridge: send_callback",cexc,show_permid_short(cpermid)

            olbridge_send_callback_lambda = lambda:callback(cexc, cpermid)
            self.add_task(olbridge_send_callback_lambda, 0)

        self.secover.send(permid, msg, olbridge_send_callback)

    def close(self, permid):
        """ Called by OverlayThread. Forwards to SecureOverlay.close(). """
        self.secover.close(permid)

    def add_task(self, task, t=0, ident=None):
        """ Called by OverlayThread.

        Queues task on the task queue; t and ident are passed through
        unchanged to TimedTaskQueue.add_task().
        """
        self.tqueue.add_task(task, t, ident)

214         
215 #===============================================================================
216 #    # Jie: according to Arno's suggestion, commit on demand instead of periodically
217 #    def periodic_commit(self):
218 #        period = 5*60    # commit every 5 min
219 #        try:
220 #            db = SQLiteCacheDB.getInstance()
221 #            db.commit()
222 #        except:
223 #            period = period*2
224 #        self.add_task(self.periodic_commit, period)
225 #        
226 #===============================================================================
227         
228