instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / SocialNetwork / FriendshipMsgHandler.py
1 # Written by Ali Abbas, Arno Bakker
2 # see LICENSE.txt for license information
3
4 # TODO: either maintain connections to friends always or supplement the
5 # list of friends with a number of on-line taste buddies.
6 #
7 # TODO: at least add fifo order to msgs, otherwise clicking 
8 # "make friend", "delete friend", "make friend" could arive in wrong order
9 # due to forwarding.
10 #
11
12 import threading
13 import sys
14 import os
15 import random
16 import cPickle
17 from time import time
18 from types import DictType
19 from traceback import print_exc
20 from sets import Set
21
22 from BaseLib.Core.simpledefs import *
23 from BaseLib.Core.BitTornado.bencode import bencode, bdecode
24
25 from BaseLib.Core.BitTornado.BT1.MessageID import *
26 from BaseLib.Core.CacheDB.CacheDBHandler import PeerDBHandler, FriendDBHandler
27 from BaseLib.Core.CacheDB.SqliteFriendshipStatsCacheDB import FriendshipStatisticsDBHandler
28 from BaseLib.Core.CacheDB.sqlitecachedb import bin2str
29 from BaseLib.Core.Utilities.utilities import *
30
31 from BaseLib.Core.Overlay.SecureOverlay import OLPROTO_VER_SEVENTH
32
33 DEBUG = False
34
35 """
36 State diagram:
37
38 NOFRIEND -> I_INVITED or HE_INVITED
39 I_INVITED -> APPROVED or HE_DENIED
40 HE_INVITED -> APPROVED
41 HE_INVITED -> I_DENIED
42
43 In theory it could happen that he sends an response=1 RESP, in which case
44 he approved us. I consider that an HE_INIVITE
45 """
46
47 RESCHEDULE_INTERVAL = 60
48 RESEND_INTERVAL = 5*60
49
50
51 class FriendshipMsgHandler:
52     __singleton = None
53     __lock = threading.Lock()
54
55     @classmethod
56     def getInstance(cls, *args, **kargs):
57         if not cls.__singleton:
58             cls.__lock.acquire()
59             try:
60                 if not cls.__singleton:
61                     cls.__singleton = cls(*args, **kargs)
62             finally:
63                 cls.__lock.release()
64         return cls.__singleton
65     
66     def __init__(self):
67         if FriendshipMsgHandler.__singleton:
68             raise RuntimeError, "FriendshipMsgHandler is singleton"
69         self.overlay_bridge = None
70         self.currmsgs = {}
71         self.online_fsext_peers = Set() # online peers that speak FRIENDSHIP ext
72         self.peerdb = PeerDBHandler.getInstance()
73         self.frienddb = FriendDBHandler.getInstance()
74         self.friendshipStatistics_db = FriendshipStatisticsDBHandler.getInstance()
75         self.list_no_of_conn_attempts_per_target= {}
76         self.usercallback = None
77     
78     def register(self, overlay_bridge, session):
79         if DEBUG:
80             print >> sys.stderr, "friendship: register"
81         self.overlay_bridge = overlay_bridge
82         self.session = session
83         try:
84             self.load_checkpoint()
85         except:
86             print_exc()
87         self.overlay_bridge.add_task(self.reschedule_connects,RESCHEDULE_INTERVAL)
88      
89      
90     def shutdown(self):
91         """
92         Delegate all outstanding messages to others
93         """
94         # Called by OverlayThread
95         self.delegate_friendship_making()
96         self.checkpoint()
97         
98
99     def register_usercallback(self,usercallback):
100         self.usercallback = usercallback
101     
102     def anythread_send_friendship_msg(self,permid,type,params):
103         """ Called when user adds someone from the person found, or by 
104         explicity adding someone with her credentials
105         It establishes overlay connection with the target peer """
106         # Called by any thread
107         
108         olthread_func = lambda:self.send_friendship_msg(permid,type,params,submit=True)
109         self.overlay_bridge.add_task(olthread_func,0)
110         
111         
112     def send_friendship_msg(self,permid,type,params,submit=False):
113         # Called by overlay thread 
114         
115         if submit:
116             if DEBUG:
117                 print >>sys.stderr,"friendship: send_friendship_msg: Saving msg",show_permid_short(permid)
118             self.save_msg(permid,type,params)
119             
120             if type == F_REQUEST_MSG:
121                 # Make him my friend, pending his approval
122                 self.frienddb.setFriendState(permid, commit=True,state=FS_I_INVITED)
123             elif type == F_RESPONSE_MSG:
124                 # Mark response in DB
125                 if params['response']:
126                     state = FS_MUTUAL
127                 else:
128                     state = FS_I_DENIED
129                 self.frienddb.setFriendState(permid, commit=True,state=state)
130
131         func = lambda exc,dns,permid,selversion:self.fmsg_connect_callback(exc, dns, permid, selversion, type)
132         self.overlay_bridge.connect(permid,self.fmsg_connect_callback)
133         
134         
135     def fmsg_connect_callback(self,exc,dns,permid,selversion, type = None):
136         """ Callback function for the overlay connect function """
137         # Called by OverlayThread
138
139         if exc is None:
140             if selversion < OLPROTO_VER_SEVENTH:
141                 self.remove_msgs_for_ltv7_peer(permid)
142                 return
143             
144             # Reached him
145             sendlist = self.get_msgs_as_sendlist(targetpermid=permid)
146             if DEBUG:
147                 print >> sys.stderr, 'friendship: fmsg_connect_callback: sendlist len',len(sendlist)
148                 #print_stack()
149             
150             for i in range(0,len(sendlist)):
151                 tuple = sendlist[i]
152                 
153                 permid,msgid,msg = tuple
154                 send_callback = lambda exc,permid:self.fmsg_send_callback(exc,permid,msgid)
155                 
156                 if DEBUG:
157                     print >>sys.stderr,"friendship: fmsg_connect_callback: Sending",`msg`,msgid
158                 
159                 mypermid = self.session.get_permid()
160                 
161                 commit = (i == len(sendlist)-1)
162                 isForwarder = 0
163                 no_of_helpers = 0
164 #                if type == F_REQUEST_MSG:
165 #                    print
166 #                elif type == F_RESPONSE_MSG:
167 #                    print
168                 #Set forwarder to True and also no of helpers to 10
169                 if type == F_FORWARD_MSG:
170                     isForwarder = 1
171                     no_of_helpers = 10
172                     
173                   
174                 no_of_attempts = 0
175                 if permid in self.currmsgs:
176                     msgid2rec = self.currmsgs[permid]
177                     if msgid in msgid2rec:
178                         msgrec = msgid2rec[msgid]
179                         no_of_attempts = msgrec['attempt']
180                 
181 #                insertFriendshipStatistics(self, my_permid, target_permid, current_time, isForwarder = 0, no_of_attempts = 0, no_of_helpers = 0, commit = True):
182                 
183                 self.friendshipStatistics_db.insertOrUpdateFriendshipStatistics( bin2str(mypermid), 
184                                                                          bin2str(permid), 
185                                                                          int(time()),
186                                                                          isForwarder, 
187                                                                          no_of_attempts ,
188                                                                          no_of_helpers, 
189                                                                          commit=commit)
190                 
191                 self.overlay_bridge.send(permid, FRIENDSHIP + bencode(msg), send_callback)
192                 
193                 
194         else:
195             if DEBUG:
196                 peer = self.peerdb.getPeer(permid)
197                 if peer is None:
198                     print >>sys.stderr, 'friendship: Could not connect to peer', show_permid_short(permid),peer
199                 else:
200                     print >>sys.stderr, 'friendship: Could not connect to peer', show_permid_short(permid),peer['name']
201                 print >>sys.stderr,exc
202             
203             mypermid = self.session.get_permid()
204             
205             isForwarder = 0
206             no_of_helpers = 0
207             if type == F_FORWARD_MSG:
208                 isForwarder = 1
209                 no_of_helpers = 10
210                     
211                  
212             no_of_attempts = 0
213             if permid in self.currmsgs:
214                 msgid2rec = self.currmsgs[permid]
215                 for msgid in msgid2rec:
216                     msgrec = msgid2rec[msgid]
217                     no_of_attempts = msgrec['attempt']
218                 
219                 
220             self.friendshipStatistics_db.insertOrUpdateFriendshipStatistics( bin2str(mypermid), 
221                                                                          bin2str(permid), 
222                                                                          int(time()),
223                                                                          isForwarder, 
224                                                                          no_of_attempts ,
225                                                                          no_of_helpers)
226         
227         
228
229
230     def fmsg_send_callback(self,exc,permid,msgid):
231         
232         # If an exception arises
233         if exc is None:
234             self.delete_msg(permid,msgid)
235         else:
236             if DEBUG:
237                 print >> sys.stderr, 'friendship: Could not send to ',show_permid_short(permid)  
238                 print_exc()
239             
240         mypermid = self.session.get_permid()
241         
242         no_of_attempts = 0
243         no_of_helpers = 10
244         isForwarder = False
245         if permid in self.currmsgs:
246             msgid2rec = self.currmsgs[permid]
247             for msgid in msgid2rec:
248                 msgrec = msgid2rec[msgid]
249                 no_of_attempts = msgrec['attempt']
250                 if msgrec['forwarded'] == True:
251                     isForwarder = 1
252             
253             
254         self.friendshipStatistics_db.insertOrUpdateFriendshipStatistics( bin2str(mypermid), 
255                                                                          bin2str(permid), 
256                                                                          int(time()),
257                                                                          isForwarder, 
258                                                                          no_of_attempts ,
259                                                                          no_of_helpers)
260                     
261
262     def remove_msgs_for_ltv7_peer(self,permid):
263         """ Remove messages destined for a peer that does not speak >= v7 of
264         the overlay protocol
265         """
266         sendlist = self.get_msgs_as_sendlist(targetpermid=permid)
267         if DEBUG:
268             print >> sys.stderr, 'friendship: remove_msgs_for_ltv7_peer: sendlist len',len(sendlist)
269         
270         for i in range(0,len(sendlist)):
271             tuple = sendlist[i]
272             
273             permid,msgid,msg = tuple
274             self.delete_msg(permid,msgid)
275
276
277     #
278     # Incoming connections
279     #
280     def handleConnection(self, exc, permid, selversion, locally_initiated):
281
282         if selversion < OLPROTO_VER_SEVENTH:
283             return True
284
285         if exc is None:
286             self.online_fsext_peers.add(permid)
287
288             # if we meet peer otherwise, dequeue messages
289             if DEBUG:
290                 print >> sys.stderr,"friendship: Met peer, attempting to deliver msgs",show_permid_short(permid)
291             
292             # If we're initiating the connection from this handler, the 
293             # fmsg_connect_callback will get called twice:
294             # 1. here
295             # 2. just a bit later when the callback for a successful connect()
296             #    is called.
297             # Solution: we delay this call, which should give 2. the time to
298             # run and remove msgs from the queue.
299             #
300             # Better: remove msgs from queue when sent and reinsert if send fails
301             #
302             friendship_delay_func = lambda:self.fmsg_connect_callback(None,None,permid,selversion)
303             self.overlay_bridge.add_task(friendship_delay_func,4)
304         else:
305             try:
306                 self.online_fsext_peers.remove(permid)
307             except:
308                 pass
309             
310         return True        
311
312
313     #
314     # Incoming messages
315     # 
316     def handleMessage(self, permid, selversion, message):
317         """ Handle incoming Friend Request, and their response"""
318
319         if selversion < OLPROTO_VER_SEVENTH:
320             if DEBUG:
321                 print >> sys.stderr,"friendship: Got FRIENDSHIP msg from peer with old protocol",show_permid_short(permid)
322             return False
323         
324         try:
325             d = bdecode(message[1:])
326         except:
327             print_exc()
328             return False
329         
330         return self.process_message(permid,selversion,d)
331     
332     
333     def process_message(self,permid,selversion,d):
334         
335         if self.isValidFriendMsg(d):
336
337             if DEBUG:
338                 print >> sys.stderr,"friendship: Got FRIENDSHIP msg",d['msg type']
339         
340             # If the message is to become a friend, i.e., a friendship request
341             if d['msg type'] == F_REQUEST_MSG:
342                 self.process_request(permid,d)                  
343                         
344             # If the message is to have a response on friend request    
345             elif d['msg type'] == F_RESPONSE_MSG: 
346                 self.process_response(permid,d)
347                     
348             # If the receiving message is to delegate the Friendship request to the target peer
349             elif d['msg type'] == F_FORWARD_MSG:
350                 return self.process_forward(permid,selversion,d)
351             else:
352                 if DEBUG:
353                     print >>sys.stderr,"friendship: Got unknown msg type",d['msg type']
354                 return False
355             
356             return True
357         else:
358             if DEBUG:
359                 print >>sys.stderr,"friendship: Got bad FRIENDSHIP message"
360             return False
361                 
362     def process_request(self,permid,d):
363         # to see that the following peer is already a friend, or not
364         fs = self.frienddb.getFriendState(permid) 
365
366         if DEBUG:
367             print >>sys.stderr,"friendship: process_request: Got request, fs",show_permid_short(permid),fs
368
369         
370         if fs == FS_NOFRIEND or fs == FS_HE_DENIED:
371             # not on HE_INVITED, to filter out duplicates
372             
373             # And if that peer is not already added as a friend, either approved, or unapproved
374             # call friend dialog
375             self.frienddb.setFriendState(permid, commit=True, state = FS_HE_INVITED)
376             
377             # FUTURE: always do callback, such that we also know about failed
378             # attempts
379             if self.usercallback is not None:
380                 friendship_usercallback = lambda:self.usercallback(permid,[])
381                 self.session.uch.perform_usercallback(friendship_usercallback)
382         elif fs == FS_I_INVITED: 
383             # In case, requestee is already added as friend, just make this 
384             # requestee as an approved friend
385
386             if DEBUG:
387                 print >>sys.stderr,"friendship: process_request: Got request but I already invited him"
388             
389             self.frienddb.setFriendState(permid, commit=True, state = FS_MUTUAL)
390
391             if DEBUG:
392                 print >>sys.stderr,"friendship: process_request: Got request but I already invited him: sending reply"
393
394             self.send_friendship_msg(permid,F_RESPONSE_MSG,{'response':1},submit=True)
395         elif fs == FS_MUTUAL:
396             if DEBUG:
397                 print >>sys.stderr,"friendship: process_request: Got request but already approved"
398         elif fs == FS_I_DENIED:
399             if DEBUG:
400                 print >>sys.stderr,"friendship: process_request: Got request but I already denied"
401         elif DEBUG:
402             print >>sys.stderr,"friendship: process_request: Got request, but fs is",fs
403
404     def process_response(self,permid,d):
405
406         mypermid = self.session.get_permid()
407                      
408                 
409         self.friendshipStatistics_db.updateFriendshipResponseTime( bin2str(mypermid), 
410                                                                          bin2str(permid), 
411                                                                          int(time()))
412
413         
414         fs = self.frienddb.getFriendState(permid)
415          
416         # If the request to add has been approved
417         if d['response'] == 1:
418             if fs == FS_I_INVITED:
419                 self.frienddb.setFriendState(permid, commit=True, state = FS_MUTUAL)
420             elif fs != FS_MUTUAL:
421                 # Unsollicited response, consider this an invite, if not already friend
422                 self.frienddb.setFriendState(permid, commit=True, state = FS_HE_INVITED)
423         else:
424             # He denied our friendship
425             self.frienddb.setFriendState(permid, commit=True, state = FS_HE_DENIED)
426
427                 
428     def process_forward(self,permid,selversion,d):
429         
430         mypermid = self.session.get_permid()
431         if d['dest']['permid'] == mypermid:
432             # This is a forward containing a message meant for me
433             
434             # First add original sender to DB so we can connect back to it
435             self.addPeerToDB(d['source'])
436             
437             self.process_message(d['source']['permid'],selversion,d['msg'])
438             
439             return True
440         
441             
442         else:
443             # Queue and forward
444             if DEBUG:
445                 print >>sys.stderr,"friendship: process_fwd: Forwarding immediately to",show_permid_short(d['dest']['permid'])
446
447             if permid != d['source']['permid']:
448                 if DEBUG:
449                     print >>sys.stderr,"friendship: process_fwd: Forwarding: Illegal, source is not sender, and dest is not me"
450                 return False
451             # First add dest to DB so we can connect to it
452             
453             # FUTURE: don't let just any peer overwrite the IP+port of a peer
454             # if self.peer_db.hasPeer(d['dest']['permid']):
455             self.addPeerToDB(d['dest'])
456             
457             self.send_friendship_msg(d['dest']['permid'],d['msg type'],d,submit=True)
458             return True
459
460     def addPeerToDB(self,mpeer):
461         peer = {}
462         peer['permid'] = mpeer['permid']
463         peer['ip'] = mpeer['ip']
464         peer['port'] = mpeer['port']
465         peer['last_seen'] = 0
466         self.peerdb.addPeer(mpeer['permid'],peer,update_dns=True,commit=True)
467         
468
469     def create_friendship_msg(self,type,params):
470         
471         if DEBUG:
472             print >>sys.stderr,"friendship: create_fs_msg:",type,`params`
473         
474         mypermid = self.session.get_permid()
475         myip = self.session.get_external_ip()
476         myport = self.session.get_listen_port()
477         
478         d ={'msg type':type}     
479         if type == F_RESPONSE_MSG:
480             d['response'] = params['response']
481         elif type == F_FORWARD_MSG:
482             
483             if DEBUG:
484                 print >>sys.stderr,"friendship: create: fwd: params",`params`
485             peer = self.peerdb.getPeer(params['destpermid']) # ,keys=['ip', 'port']) 
486             if peer is None:
487                 if DEBUG:
488                     print >> sys.stderr, "friendship: create msg: Don't know IP + port of peer", show_permid_short(params['destpermid'])
489                 return
490             #if DEBUG:
491             #    print >> sys.stderr, "friendship: create msg: Peer at",peer
492             
493             # FUTURE: add signatures on ip+port
494             src = {'permid':mypermid,'ip':myip,'port':myport}
495             dst = {'permid':params['destpermid'],'ip':str(peer['ip']),'port':peer['port']}
496             d.update({'source':src,'dest':dst,'msg':params['msg']})
497         return d
498
499
500
501     def isValidFriendMsg(self,d):
502
503         if DEBUG:
504             print >>sys.stderr,"friendship: msg: payload is",`d`    
505
506         
507         if type(d) != DictType:
508             if DEBUG:
509                 print >>sys.stderr,"friendship: msg: payload is not bencoded dict"    
510             return False
511         if not 'msg type' in d:
512             if DEBUG:
513                 print >>sys.stderr,"friendship: msg: dict misses key",'msg type'    
514             return False
515         
516         if d['msg type'] == F_REQUEST_MSG:
517             keys = d.keys()[:]
518             if len(keys)-1 != 0:
519                 if DEBUG:
520                     print >>sys.stderr,"friendship: msg: REQ: contains superfluous keys",keys    
521                 return False
522             return True
523             
524         if d['msg type'] == F_RESPONSE_MSG:
525             if (d.has_key('response') and (d['response'] == 1 or d['response'] == 0)):
526                 return True
527             else:
528                 if DEBUG:
529                     print >>sys.stderr,"friendship: msg: RESP: something wrong",`d`    
530                 return False
531             
532         if d['msg type'] == F_FORWARD_MSG:
533             if not self.isValidPeer(d['source']):
534                 if DEBUG:
535                     print >>sys.stderr,"friendship: msg: FWD: source bad",`d`    
536                 return False
537             if not self.isValidPeer(d['dest']):
538                 if DEBUG:
539                     print >>sys.stderr,"friendship: msg: FWD: dest bad",`d`    
540                 return False
541             if not 'msg' in d:
542                 if DEBUG:
543                     print >>sys.stderr,"friendship: msg: FWD: no msg",`d`    
544                 return False
545             if not self.isValidFriendMsg(d['msg']):
546                 if DEBUG:
547                     print >>sys.stderr,"friendship: msg: FWD: bad msg",`d`    
548                 return False
549             if d['msg']['msg type'] == F_FORWARD_MSG:
550                 if DEBUG:
551                     print >>sys.stderr,"friendship: msg: FWD: cannot contain fwd",`d`    
552                 return False
553             return True
554         
555         return False
556
557
558     def isValidPeer(self,d):
559         if (d.has_key('ip') and d.has_key('port') and d.has_key('permid') 
560             and validPermid(d['permid'])
561             and validIP(d['ip'])and validPort(d['port'])):
562             return True
563         else:
564             return False                 
565
566
567     def save_msg(self,permid,type,params):
568         
569         if not permid in self.currmsgs:
570             self.currmsgs[permid] = {}
571         
572         mypermid = self.session.get_permid()    
573         now = time()
574         attempt = 1
575         
576         base = mypermid+permid+str(now)+str(random.random())
577         msgid = sha(base).hexdigest()
578         msgrec = {'permid':permid,'type':type,'params':params,'attempt':attempt,'t':now,'forwarded':False}
579         
580         msgid2rec = self.currmsgs[permid]
581         msgid2rec[msgid] = msgrec
582         
583     def delete_msg(self,permid,msgid):
584         try: 
585             if DEBUG:
586                 print >>sys.stderr,"friendship: Deleting msg",show_permid_short(permid),msgid
587             msgid2rec = self.currmsgs[permid]
588             del msgid2rec[msgid]
589         except:
590             #print_exc()
591             pass
592
593     def set_msg_forwarded(self,permid,msgid):
594         try: 
595             msgid2rec = self.currmsgs[permid]
596             msgid2rec[msgid]['forwarded'] = True
597         except:
598             print_exc()
599
600     def reschedule_connects(self):
601         """ This function is run periodically and reconnects to peers when
602         messages meant for it are due to be retried
603         """
604         now = time()
605         delmsgids = []
606         reconnectpermids = Set()
607         for permid in self.currmsgs:
608             msgid2rec = self.currmsgs[permid]
609             for msgid in msgid2rec:
610                 msgrec = msgid2rec[msgid]
611
612                 eta = self.calc_eta(msgrec)
613                 
614                 if DEBUG:
615                     diff = None
616                     if eta is not None:
617                         diff = eta - now
618                         
619                     if DEBUG:
620                         peer = self.peerdb.getPeer(permid)
621                         if peer is None:
622                             print >>sys.stderr,"friendship: reschedule: ETA: wtf, peer not in DB!",show_permid_short(permid)
623                         else:
624                             print >>sys.stderr,"friendship: reschedule: ETA",show_permid_short(permid),peer['name'],diff
625                 
626                 if eta is None: 
627                     delmsgids.append((permid,msgid))
628                 elif now > eta-1.0: # -1 for round off
629                     # reconnect
630                     reconnectpermids.add(permid)
631                     msgrec['attempt'] = msgrec['attempt'] + 1
632                     
633                     # Delegate 
634                     if msgrec['type'] == F_REQUEST_MSG and msgrec['attempt'] == 2:
635                         self.delegate_friendship_making(targetpermid=permid,targetmsgid=msgid)
636         
637         # Remove timed out messages
638         for permid,msgid in delmsgids:
639             if DEBUG:
640                 print >>sys.stderr,"friendship: reschedule: Deleting",show_permid_short(permid),msgid
641             self.delete_msg(permid,msgid)
642         
643         # Initiate connections to peers for which we have due messages    
644         for permid in reconnectpermids:
645             if DEBUG:
646                 print >>sys.stderr,"friendship: reschedule: Reconnect to",show_permid_short(permid)
647
648             self.overlay_bridge.connect(permid,self.fmsg_connect_callback)
649         
650         # Reschedule this periodic task
651         self.overlay_bridge.add_task(self.reschedule_connects,RESCHEDULE_INTERVAL)
652         
653         
654     def calc_eta(self,msgrec):
655         if msgrec['type'] == F_FORWARD_MSG:
656             if msgrec['attempt'] >= 10:
657                 # Stop trying to forward after a given period 
658                 return None
659             # exponential backoff, on 10th attempt we would wait 24hrs
660             eta = msgrec['t'] + pow(3.116,msgrec['attempt'])
661         else:
662             if msgrec['attempt'] >= int(7*24*3600/RESEND_INTERVAL):
663                 # Stop trying to forward after a given period = 1 week 
664                 return None
665
666             eta = msgrec['t'] + msgrec['attempt']*RESEND_INTERVAL 
667         return eta
668         
669
670     def get_msgs_as_sendlist(self,targetpermid=None):
671
672         sendlist = []
673         if targetpermid is None:
674             permids = self.currmsgs.keys()
675         else:
676             permids = [targetpermid] 
677         
678         for permid in permids:
679             msgid2rec = self.currmsgs.get(permid,{})
680             for msgid in msgid2rec:
681                 msgrec = msgid2rec[msgid]
682                 
683                 if DEBUG:
684                     print >>sys.stderr,"friendship: get_msgs: Creating",msgrec['type'],`msgrec['params']`,msgid
685                 if msgrec['type'] == F_FORWARD_MSG:
686                     msg = msgrec['params']
687                 else:
688                     msg = self.create_friendship_msg(msgrec['type'],msgrec['params'])
689                 tuple = (permid,msgid,msg)
690                 sendlist.append(tuple)
691         return sendlist
692
693
694     def get_msgs_as_fwd_sendlist(self,targetpermid=None,targetmsgid=None):
695
696         sendlist = []
697         if targetpermid is None:
698             permids = self.currmsgs.keys()
699         else:
700             permids = [targetpermid] 
701         
702         for permid in permids:
703             msgid2rec = self.currmsgs.get(permid,{})
704             for msgid in msgid2rec:
705                 if targetmsgid is None or msgid == targetmsgid:
706                     msgrec = msgid2rec[msgid]
707                     if msgrec['type'] != F_FORWARD_MSG and msgrec['forwarded'] == False:
708                         # Don't forward forwards, or messages already forwarded
709                     
710                         # Create forward message for original
711                         params = {}
712                         params['destpermid'] = permid
713                         params['msg'] = self.create_friendship_msg(msgrec['type'],msgrec['params'])
714                     
715                         msg = self.create_friendship_msg(F_FORWARD_MSG,params)
716                         tuple = (permid,msgid,msg)
717                         sendlist.append(tuple)
718         return sendlist
719         
720
721                 
722     def delegate_friendship_making(self,targetpermid=None,targetmsgid=None):
723         if DEBUG:
724             print >>sys.stderr,"friendship: delegate:",show_permid_short(targetpermid),targetmsgid
725
726         # 1. See if there are undelivered msgs
727         sendlist = self.get_msgs_as_fwd_sendlist(targetpermid=targetpermid,targetmsgid=targetmsgid)
728         if DEBUG:
729             print >>sys.stderr,"friendship: delegate: Number of messages queued",len(sendlist)
730         
731         if len(sendlist) == 0:
732             return
733   
734         # 2. Get friends, not necess. online
735         friend_permids = self.frienddb.getFriends()
736         
737         if DEBUG:
738             l = len(friend_permids)
739             print >>sys.stderr,"friendship: delegate: friend helpers",l
740             for permid in friend_permids:
741                 print >>sys.stderr,"friendship: delegate: friend helper",show_permid_short(permid)
742         
743         # 3. Sort online peers on similarity, highly similar should be tastebuddies
744         if DEBUG:
745             print >>sys.stderr,"friendship: delegate: Number of online v7 peers",len(self.online_fsext_peers)
746         tastebuddies = self.peerdb.getPeers(list(self.online_fsext_peers),['similarity','name']) 
747         tastebuddies.sort(sim_desc_cmp)
748
749         if DEBUG:
750             print >>sys.stderr,"friendship: delegate: Sorted tastebuddies",`tastebuddies`
751
752         tastebuddies_permids = []
753         size = min(10,len(tastebuddies))
754         for i in xrange(0,size):
755             peer = tastebuddies[i]
756             if DEBUG:
757                 print >>sys.stderr,"friendship: delegate: buddy helper",show_permid_short(peer['permid'])
758             tastebuddies_permids.append(peer['permid'])
759
760         # 4. Create list of helpers:
761         #
762         # Policy: Helpers are a mix of friends and online tastebuddies
763         # with 70% friends (if avail) and 30% tastebuddies
764         #
765         # I chose this policy because friends are not guaranteed to be online
766         # and waiting to see if we can connect to them before switching to
767         # the online taste buddies is complex code-wise and time-consuming.
768         # We don't have a lot of time when this thing is called by Session.shutdown()
769         #
770         nwant = 10
771         nfriends = int(nwant * .7)
772         nbuddies = int(nwant * .3)
773         
774         part1 = sampleorlist(friend_permids,nfriends)
775         fill = nfriends-len(part1) # if no friends, use tastebuddies
776         part2 = sampleorlist(tastebuddies_permids,nbuddies+fill)
777         helpers = part1 + part2
778
779         if DEBUG:
780             l = len(helpers)
781             print >>sys.stderr,"friendship: delegate: end helpers",l
782             for permid in helpers:
783                 print >>sys.stderr,"friendship: delegate: end helper",show_permid_short(permid),self.frienddb.getFriendState(permid),self.peerdb.getPeers([permid],['similarity','name'])
784
785
786         for tuple in sendlist:
787             destpermid,msgid,msg = tuple
788             for helperpermid in helpers:
789                 if destpermid != helperpermid:
790                     connect_callback = lambda exc,dns,permid,selversion:self.forward_connect_callback(exc,dns,permid,selversion,destpermid,msgid,msg)
791                     
792                     if DEBUG:
793                         print >>sys.stderr,"friendship: delegate: Connecting to",show_permid_short(helperpermid)
794                      
795                     self.overlay_bridge.connect(helperpermid, connect_callback)
796
797
798     def forward_connect_callback(self,exc,dns,permid,selversion,destpermid,msgid,msg):
799         if exc is None:
800             
801             if selversion < OLPROTO_VER_SEVENTH:
802                 return
803             
804             send_callback = lambda exc,permid:self.forward_send_callback(exc,permid,destpermid,msgid)
805             if DEBUG:
806                 print >>sys.stderr,"friendship: forward_connect_callback: Sending",`msg`
807             self.overlay_bridge.send(permid, FRIENDSHIP + bencode(msg), send_callback)
808         elif DEBUG:
809             print >>sys.stderr,"friendship: forward: Could not connect to helper",show_permid_short(permid)
810
811
812     def forward_send_callback(self,exc,permid,destpermid,msgid):
813         if DEBUG:
814             if exc is None:
815                 if DEBUG:
816                     print >>sys.stderr,"friendship: forward: Success forwarding to helper",show_permid_short(permid)
817                 self.set_msg_forwarded(destpermid,msgid)
818             else:
819                 if DEBUG:
820                     print >>sys.stderr,"friendship: forward: Failed to forward to helper",show_permid_short(permid)
821         
822     def checkpoint(self):
823         statedir = self.session.get_state_dir()
824         newfilename = os.path.join(statedir,'new-friendship-msgs.pickle')
825         finalfilename = os.path.join(statedir,'friendship-msgs.pickle')
826         try:
827             f = open(newfilename,"wb")
828             cPickle.dump(self.currmsgs,f)
829             f.close()
830             try:
831                 os.remove(finalfilename)
832             except:
833                 # If first time, it doesn't exist
834                 print_exc()
835             os.rename(newfilename,finalfilename)
836         except:
837             print_exc()
838         
839     def load_checkpoint(self):
840         statedir = self.session.get_state_dir()
841         finalfilename = os.path.join(statedir,'friendship-msgs.pickle')
842         try:
843             f = open(finalfilename,"rb")
844             self.currmsgs = cPickle.load(f)
845         except:
846             print >>sys.stderr, "friendship: could not read previous messages from", finalfilename
847
848         # Increase # attempts till current time
849         now = time()
850         for permid in self.currmsgs:
851             msgid2rec = self.currmsgs[permid]
852             for msgid in msgid2rec:
853                 msgrec = msgid2rec[msgid]
854                 diff = now - msgrec['t']
855                 a = int(diff/RESEND_INTERVAL)
856                 a += 1
857                 if DEBUG:
858                     print >>sys.stderr,"friendship: load_checkp: Changing #attempts from",msgrec['attempt'],a
859                 msgrec['attempt'] = a
860
861         
862 def sim_desc_cmp(peera,peerb):
863     if peera['similarity'] < peerb['similarity']:
864         return 1
865     elif peera['similarity'] > peerb['similarity']:
866         return -1
867     else:
868         return 0
869     
870 def sampleorlist(z,k):
871     if len(z) < k:
872         return z
873     else:
874         return random.sample(k)