1 # Written by Arno Bakker, George Milescu
2 # see LICENSE.txt for license information
4 # This class bridges between the OverlayApps class and the SecureOverlay
5 # and ensures that all upcalls made by the NetworkThread via the SecureOverlay
6 # are handed over to a different thread, the OverlayThread that propagates the
7 # upcall to the OverlayApps.
11 from threading import currentThread
12 from traceback import print_exc
14 from BaseLib.Core.Overlay.SecureOverlay import CloseException
15 from BaseLib.Core.BitTornado.BT1.MessageID import getMessageName
16 from BaseLib.Core.Utilities.utilities import show_permid_short
17 from BaseLib.Utilities.TimedTaskQueue import TimedTaskQueue
# Bridge between the OverlayApps and the SecureOverlay: upcalls that arrive on
# the NetworkThread are put on a task queue (self.tqueue) and delivered to the
# registered handlers from there.
# NOTE(review): this listing skips source lines (the embedded numbers jump
# 22 -> 25 -> 28). The declaration of `__single` and the `def` header of the
# method the statements below belong to are presumably among the missing
# lines; confirm against the full file before changing this block.
22 class OverlayThreadingBridge:
# Class-level lock; getInstance() holds it while it creates the singleton.
25 lock = threading.Lock()
# Refuse to build a second bridge while one is already recorded.
28 if OverlayThreadingBridge.__single:
29 raise RuntimeError, "OverlayThreadingBridge is Singleton"
30 OverlayThreadingBridge.__single = self
# Handlers start unset; register_recv_callback() and register_conns_callback()
# fill them in.
34 self.olappsmsghandler = None
35 self.olappsconnhandler = None
37 # Current impl of wrapper: single thread
# The queue that add_task(), handleConnection() and handleMessage() feed.
38 self.tqueue = TimedTaskQueue(nameprefix="Overlay")
def getInstance(*args, **kw):
    """ Return the single OverlayThreadingBridge, creating it on first use.

    Any arguments are handed to the constructor, but only by the call that
    actually creates the instance; once an instance is recorded they are
    ignored.
    """
    # Singleton pattern with double-checking: the test outside the lock keeps
    # the common path cheap, the test inside the lock stops two threads that
    # both saw None from each constructing a bridge.
    if OverlayThreadingBridge.__single is None:
        OverlayThreadingBridge.lock.acquire()
        try:
            if OverlayThreadingBridge.__single is None:
                # Constructing the object is what records it in __single.
                OverlayThreadingBridge(*args, **kw)
        finally:
            # Release even when the constructor raises; otherwise the next
            # caller that reaches acquire() would block forever.
            OverlayThreadingBridge.lock.release()
    return OverlayThreadingBridge.__single
getInstance = staticmethod(getInstance)
def resetSingleton(self):
    """ Testing aid: forget the recorded singleton so a new bridge can be built. """
    OverlayThreadingBridge.__single = None
def register_bridge(self, secover, olapps):
    """ Called by MainThread.

    Remember the SecureOverlay and subscribe the bridge's own handleMessage
    and handleConnection to it, so its upcalls arrive at the bridge.
    """
    # NOTE(review): `olapps` is not used by the statements visible in this
    # listing; confirm against the full file.
    self.secover = secover

    on_message = self.handleMessage
    secover.register_recv_callback(on_message)

    on_connection = self.handleConnection
    secover.register_conns_callback(on_connection)
65 # SecOverlay interface
def register(self, launchmanycore, max_len):
    """ Called by MainThread.

    Forward the registration to the SecureOverlay, then make its iplport2oc
    attribute reachable through the bridge as well.
    """
    overlay = self.secover
    overlay.register(launchmanycore, max_len)
    # Same object under a second name, not a copy.
    self.iplport2oc = overlay.iplport2oc
# NOTE(review): only the signature is present in this listing; the body is
# not shown, so what this method returns cannot be documented from here.
# Confirm against the full file.
74 def get_handler(self):
def start_listening(self):
    """ Called by MainThread. Pass-through to the SecureOverlay's start_listening(). """
    overlay = self.secover
    overlay.start_listening()
def register_recv_callback(self, callback):
    """ Called by MainThread.

    Install the handler that received messages are forwarded to; it is
    invoked as callback(permid, selversion, message).
    """
    self.olappsmsghandler = callback
def register_conns_callback(self, callback):
    """ Called by MainThread.

    Install the handler that connection events are forwarded to; it is
    invoked as callback(exc, permid, selversion, locally_initiated).
    """
    self.olappsconnhandler = callback
def handleConnection(self, exc, permid, selversion, locally_initiated, hisdns):
    """ Called by NetworkThread.

    Upcall made by SecureOverlay.got_auth_connection() or
    cleanup_admin_and_callbacks(). Nothing is handled on the calling thread:
    the work is wrapped in a closure and put on self.tqueue.
    """
    print >>sys.stderr,"olbridge: handleConnection",exc,show_permid_short(permid),selversion,locally_initiated,hisdns,currentThread().getName()

    def olbridge_handle_conn_func():
        # Called by OverlayThread
        print >>sys.stderr,"olbridge: handle_conn_func",exc,show_permid_short(permid),selversion,locally_initiated,hisdns,currentThread().getName()

        # NOTE(review): this listing skips source lines around here (and
        # print_exc is imported but unused in the visible code); confirm
        # against the full file that no guard or error handling is missing.
        self.secover.add_peer_to_db(permid, hisdns, selversion)

        # self.olappsconnhandler = OverlayApps.handleConnection
        handler = self.olappsconnhandler
        if handler is not None:
            handler(exc, permid, selversion, locally_initiated)

        # A CloseException also carries whether authentication had finished.
        if isinstance(exc, CloseException):
            self.secover.update_peer_status(permid, exc.was_auth_done())

    self.tqueue.add_task(olbridge_handle_conn_func, 0)
# Upcall for a received overlay message. The message is not handled on the
# calling thread: the work is wrapped in a closure and put on self.tqueue.
115 def handleMessage(self,permid,selversion,message):
116 """ Called by NetworkThread """
120 #print "### olbridge: handleMessage", show_permid_short(permid), selversion, getMessageName(message[0]), currentThread().getName()
125 print >>sys.stderr,"olbridge: handleMessage",show_permid_short(permid),selversion,getMessageName(message[0]),currentThread().getName()
# Closure queued below; it captures permid, selversion and message.
127 def olbridge_handle_msg_func():
128 # Called by OverlayThread
131 print >>sys.stderr,"olbridge: handle_msg_func",show_permid_short(permid),selversion,getMessageName(message[0]),currentThread().getName()
# NOTE(review): the embedded line numbers jump 134 -> 137 -> 143, so the
# statements between these lines are not part of this listing. Presumably
# they hold the branch taken when no handler is registered and the code that
# acts on `ret` before the "closing!" trace; confirm against the full file
# before editing this function.
134 if self.olappsmsghandler is None:
# The handler installed through register_recv_callback() gets the message.
137 ret = self.olappsmsghandler(permid,selversion,message)
143 print >>sys.stderr,"olbridge: olbridge_handle_msg_func closing!",show_permid_short(permid),selversion,getMessageName(message[0]),currentThread().getName()
# Second argument is add_task()'s `t`; 0 is presumably "no delay"
# (TimedTaskQueue is not shown here).
146 self.tqueue.add_task(olbridge_handle_msg_func,0)
def connect_dns(self, dns, callback):
    """ Called by OverlayThread/NetworkThread.

    Ask the SecureOverlay to connect to `dns`. The outcome is not passed to
    `callback` directly: the SecureOverlay reports to an inner function,
    which queues callback(exc, dns, permid, selversion) through add_task().
    """
    print >>sys.stderr,"olbridge: connect_dns",dns

    def olbridge_connect_dns_callback(cexc, cdns, cpermid, cselver):
        # Called by network thread
        print >>sys.stderr,"olbridge: connect_dns_callback",cexc,cdns,show_permid_short(cpermid),cselver

        def deliver():
            # Runs from the task queue with the values captured above.
            return callback(cexc, cdns, cpermid, cselver)

        self.add_task(deliver, 0)

    self.secover.connect_dns(dns, olbridge_connect_dns_callback)
def connect(self, permid, callback):
    """ Called by OverlayThread.

    Ask the SecureOverlay to connect to the peer `permid`. The outcome is
    not passed to `callback` directly: the SecureOverlay reports to an inner
    function, which queues callback(exc, dns, permid, selversion) through
    add_task().
    """
    print >>sys.stderr,"olbridge: connect",show_permid_short(permid), currentThread().getName()

    def olbridge_connect_callback(cexc, cdns, cpermid, cselver):
        # Called by network thread
        print >>sys.stderr,"olbridge: connect_callback",cexc,cdns,show_permid_short(cpermid),cselver, callback, currentThread().getName()

        def deliver():
            # Runs from the task queue with the values captured above.
            return callback(cexc, cdns, cpermid, cselver)

        # Queued with t=0. History: Jie proposed postponing this callback
        # until the peer has been added to the buddycast connection list;
        # Arno (2008-09-15) rejected that.
        self.add_task(deliver, 0)

    self.secover.connect(permid, olbridge_connect_callback)
def send(self, permid, msg, callback):
    """ Called by OverlayThread.

    Hand `msg` to the SecureOverlay for delivery to `permid`. The outcome is
    not passed to `callback` directly: the SecureOverlay reports to an inner
    function, which queues callback(exc, permid) through add_task().
    """
    print >>sys.stderr,"olbridge: send",show_permid_short(permid),len(msg)

    def olbridge_send_callback(cexc, cpermid):
        # Called by network thread
        print >>sys.stderr,"olbridge: send_callback",cexc,show_permid_short(cpermid)

        def deliver():
            # Runs from the task queue with the values captured above.
            return callback(cexc, cpermid)

        self.add_task(deliver, 0)

    self.secover.send(permid, msg, olbridge_send_callback)
def close(self, permid):
    """ Called by OverlayThread. Pass-through to the SecureOverlay's close() for `permid`. """
    overlay = self.secover
    overlay.close(permid)
def add_task(self, task, t=0, ident=None):
    """ Called by OverlayThread.

    Put `task` on the bridge's task queue; `t` and `ident` are handed to
    the queue's add_task() unchanged.
    """
    queue = self.tqueue
    queue.add_task(task, t, ident)
215 #===============================================================================
216 # # Jie: according to Arno's suggestion, commit on demand instead of periodically
217 # def periodic_commit(self):
218 # period = 5*60 # commit every 5 min
220 # db = SQLiteCacheDB.getInstance()
224 # self.add_task(self.periodic_commit, period)
226 #===============================================================================