instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / BitTornado / BT1 / Connecter.py
1 # Written by Bram Cohen, Pawel Garbacki, Arno Bakker and Njaal Borch, George Milescu
2 # see LICENSE.txt for license information
3
4 import time
5 import sys
6 from types import DictType,IntType,LongType,ListType,StringType
7 from random import shuffle
8 from traceback import print_exc,print_stack
9 from math import ceil
10 import socket
11 import urlparse
12
13 from threading import Event # Wait for CS to complete
14
15 from BaseLib.Core.BitTornado.bitfield import Bitfield
16 from BaseLib.Core.BitTornado.clock import clock
17 from BaseLib.Core.BitTornado.bencode import bencode,bdecode
18 from BaseLib.Core.BitTornado.__init__ import version_short,decodePeerID,TRIBLER_PEERID_LETTER
19 from BaseLib.Core.BitTornado.BT1.convert import tobinary,toint
20
21 from BaseLib.Core.BitTornado.BT1.MessageID import *
22 from BaseLib.Core.DecentralizedTracking.MagnetLink.__init__ import *
23
24 from BaseLib.Core.DecentralizedTracking.ut_pex import *
25 from BaseLib.Core.BitTornado.BT1.track import compact_ip,decompact_ip
26
27 from BaseLib.Core.BitTornado.CurrentRateMeasure import Measure
28 from BaseLib.Core.ClosedSwarm import ClosedSwarm
29 from BaseLib.Core.Statistics.Status import Status
30
# When True, got_extend_handshake() raises ValueError for peers that do not
# advertise an up-to-date Tr_LIVE_v<N> extension (the LIVEHACK check).
KICK_OLD_CLIENTS=False

# Debug switches; each one enables a group of diagnostic prints to stderr.
DEBUG = False
DEBUG_NORMAL_MSGS = False
DEBUG_UT_PEX = False
DEBUG_MESSAGE_HANDLING = False
DEBUG_CS = False # Debug closed swarms

# Used to seed Connection.looked_for_permid (set to this value minus 3).
# Presumably a period in seconds -- TODO confirm against the code that uses it.
UNAUTH_PERMID_PERIOD = 3600

# allow FACTOR times the metadata to be uploaded each PERIOD.
# Example:
# FACTOR = 2 and PERIOD = 60 will allow all the metadata to be
# uploaded 2 times every 60 seconds.
UT_METADATA_FLOOD_FACTOR = 1            
UT_METADATA_FLOOD_PERIOD = 5 * 60 * 60  
47
48 """
49 Arno: 2007-02-16:
50 uTorrent and Bram's BitTorrent now support an extension to the protocol,
51 documented on http://www.bittorrent.org/beps/bep_0010.html (previously
52 http://www.rasterbar.com/products/libtorrent/extension_protocol.html)
53
54 The problem is that the bit they use in the options field of the BT handshake
55 is the same as we use to indicate a peer supports the overlay-swarm connection.
56 The new clients will send an EXTEND message with ID 20 after the handshake to
57 inform the otherside what new messages it supports.
58
59 As a result, Tribler <= 3.5.0 clients won't be confused, but can't talk to these 
60 new clients either or vice versa. The new client will think we understand the 
61 message, send it. But because we don't know that message ID, we will close 
62 the connection. Our attempts to establish a new overlay connection with the new
63 client will gracefully fail, as the new client will not know of infohash=00000...
64 and close the connection.
65
66 We solve this conflict by adding support for the EXTEND message. We are now 
67 able to receive it, and send our own. Our message will contain one method name, 
68 i.e. Tr_OVERLAYSWARM=253. Processing is now as follows:
69
70 * If bit 43 is set and the peerID is from an old Tribler (<=3.5.0)
71   peer, we initiate an overlay-swarm connection.
72 * If bit 43 is set and the peer's EXTEND hs message contains method Tr_OVERLAYSWARM,
73   it's a new Tribler peer, and we initiate an overlay-swarm connection.
74 * If bit 43 is set, and the EXTEND hs message does not contain Tr_OVERLAYSWARM
75   it's not a Tribler client and we do not initiate an overlay-swarm
76   connection.
77
78 N.B. The EXTEND message is poorly designed, it lacks protocol versioning
79 support which is present in the Azureus Extended Messaging Protocol
80 and our overlay-swarm protocol.
81
82 """
# Extended message ID that marks an EXTEND message as the handshake itself
# (see send_extend_handshake()).
EXTEND_MSG_HANDSHAKE_ID = chr(0)
# Extension names looked up in the 'm' dict of a peer's EXTEND handshake.
EXTEND_MSG_OVERLAYSWARM = 'Tr_OVERLAYSWARM'
EXTEND_MSG_G2G_V1       = 'Tr_G2G'
EXTEND_MSG_G2G_V2       = 'Tr_G2G_v2'
EXTEND_MSG_HASHPIECE    = 'Tr_hashpiece'
EXTEND_MSG_CS           = 'NS_CS'

# Live streaming: peers advertise EXTEND_MSG_LIVE_PREFIX followed by their
# version number; got_extend_handshake() compares it to CURRENT_LIVE_VERSION.
CURRENT_LIVE_VERSION=1
EXTEND_MSG_LIVE_PREFIX  = 'Tr_LIVE_v'
# NOTE(review): not used in the visible part of this file; presumably the
# message ID advertised for the live extension -- confirm at the use site.
LIVE_FAKE_MESSAGE_ID    = chr(254)



# NOTE(review): presumably the interval between the periodic G2G callbacks
# (cf. dequeue_g2g_piece_xfer()); unit not visible here -- TODO confirm.
G2G_CALLBACK_INTERVAL = 4
97
def show(s):
    """ Return the list of ordinal values of the characters in s.

    Debugging aid for rendering a binary message as numbers.
    """
    # Iterate over the characters directly rather than indexing through
    # xrange(len(s)).
    return [ord(ch) for ch in s]
103
104     
105 class Connection:
106     def __init__(self, connection, connecter):
107         self.connection = connection    
108         self.connecter = connecter
109         self.got_anything = False
110         self.next_upload = None
111         self.outqueue = []
112         self.partial_message = None
113         self.download = None
114         self.upload = None
115         self.send_choke_queued = False
116         self.just_unchoked = None
117         self.unauth_permid = None
118         self.looked_for_permid = UNAUTH_PERMID_PERIOD-3
119         self.closed = False
120         self.extend_hs_dict = {}        # what extended messages does this peer support
121         self.initiated_overlay = False
122
123         # G2G
124         self.use_g2g = False # set to true if both sides use G2G, indicated by self.connector.use_g2g
125         self.g2g_version = None 
126         self.perc_sent = {}
127         # batch G2G_XFER information and periodically send it out.
128         self.last_perc_sent = {}
129
130         config = self.connecter.config
131         self.forward_speeds = [Measure(config['max_rate_period'], config['upload_rate_fudge']),
132                                Measure(config['max_rate_period'], config['upload_rate_fudge'])]
133         
134         # BarterCast counters
135         self.total_downloaded = 0
136         self.total_uploaded = 0
137         
138         self.ut_pex_first_flag = True # first time we sent a ut_pex to this peer?
139         self.na_candidate_ext_ip = None
140         
141         self.na_candidate_ext_ip = None
142         
143         # RePEX counters and repexer instance field
144         self.pex_received = 0 # number of PEX messages received
145
146         # Closed swarm stuff
147         # Closed swarms
148         self.is_closed_swarm = False
149         self.cs_complete = False # Arno, 2010-08-24; no need for thread safety
150         self.remote_is_authenticated = False
151         self.remote_supports_cs = False
152         status = Status.get_status_holder("LivingLab")
153         self.cs_status = status.create_event("CS_protocol")
154         # This is a total for all
155         self.cs_status_unauth_requests = status.get_or_create_status_element("unauthorized_requests", 0)
156         self.cs_status_supported = status.get_or_create_status_element("nodes_supporting_cs", 0)
157         self.cs_status_not_supported = status.get_or_create_status_element("nodes_not_supporting_cs", 0)
158
159         if not self.connecter.is_closed_swarm:
160             self.cs_complete = True # Don't block anything if we're not a CS
161
162         if self.connecter.is_closed_swarm:
163             if DEBUG_CS:
164                 print >>sys.stderr,"connecter: conn: CS: This is a closed swarm"
165             self.is_closed_swarm = True
166             if 'poa' in self.connecter.config:
167                 try:
168                     from base64 import decodestring
169                     #poa = self.connecter.config.get_poa()
170                     poa = ClosedSwarm.POA.deserialize(decodestring(self.connecter.config['poa']))
171                     #poa = self.connecter.config['poa']
172                 except Exception,e:
173                     print_exc()
174                     poa = None
175             else:
176                 print >>sys.stderr,"connecter: conn: CS: Missing POA"
177                 poa = None
178         
179             # Need to also get the rest of the info, like my keys
180             # and my POA
181             my_keypair = ClosedSwarm.read_cs_keypair(self.connecter.config['eckeypairfilename'])
182             self.closed_swarm_protocol = ClosedSwarm.ClosedSwarm(my_keypair,
183                                                                  self.connecter.infohash,
184                                                                  self.connecter.config['cs_keys'],
185                                                                  poa)
186             if DEBUG_CS:                                                                 
187                 print >>sys.stderr,"connecter: conn: CS: Closed swarm ready to start handshake"
188
189
190     def get_myip(self, real=False):
191         return self.connection.get_myip(real)
192     
193     def get_myport(self, real=False):
194         return self.connection.get_myport(real)
195         
196     def get_ip(self, real=False):
197         return self.connection.get_ip(real)
198
199     def get_port(self, real=False):
200         return self.connection.get_port(real)
201
202     def get_id(self):
203         return self.connection.get_id()
204
205     def get_readable_id(self):
206         return self.connection.get_readable_id()
207
208     def can_send_to(self):
209         if self.is_closed_swarm and not self.remote_is_authenticated:
210             return False
211         return True
212
213     def close(self):
214         if DEBUG:
215             if self.get_ip() == self.connecter.tracker_ip:
216                 print >>sys.stderr,"connecter: close: live: WAAH closing SOURCE"
217
218         self.connection.close()
219         self.closed = True
220
221         
222     def is_closed(self):
223         return self.closed
224
225     def is_locally_initiated(self):
226         return self.connection.is_locally_initiated()
227
228     def send_interested(self):
229         self._send_message(INTERESTED)
230
231     def send_not_interested(self):
232         self._send_message(NOT_INTERESTED)
233
234     def send_choke(self):
235         if self.partial_message:
236             self.send_choke_queued = True
237         else:
238             self._send_message(CHOKE)
239             self.upload.choke_sent()
240             self.just_unchoked = 0
241
242     def send_unchoke(self):
243         if not self.cs_complete:
244             if DEBUG_CS:
245                 print >> sys.stderr, 'Connection: send_unchoke: Not sending UNCHOKE, closed swarm handshanke not done'
246             return False
247
248         if self.send_choke_queued:
249             self.send_choke_queued = False
250             if DEBUG_NORMAL_MSGS:
251                 print >>sys.stderr,'Connection: send_unchoke: CHOKE SUPPRESSED'
252         else:
253             self._send_message(UNCHOKE)
254             if (self.partial_message or self.just_unchoked is None
255                 or not self.upload.interested or self.download.active_requests):
256                 self.just_unchoked = 0
257             else:
258                 self.just_unchoked = clock()
259         return True
260
261     def send_request(self, index, begin, length):
262         self._send_message(REQUEST + tobinary(index) + 
263             tobinary(begin) + tobinary(length))
264         if DEBUG_NORMAL_MSGS:
265             print >>sys.stderr,"sending REQUEST to",self.get_ip()
266             print >>sys.stderr,'sent request: '+str(index)+': '+str(begin)+'-'+str(begin+length)
267
268     def send_cancel(self, index, begin, length):
269         self._send_message(CANCEL + tobinary(index) + 
270             tobinary(begin) + tobinary(length))
271         if DEBUG_NORMAL_MSGS:
272             print >>sys.stderr,'sent cancel: '+str(index)+': '+str(begin)+'-'+str(begin+length)
273
274     def send_bitfield(self, bitfield):
275         if not self.cs_complete:
276             print >> sys.stderr, "Connection: send_bitfield: Not sending bitfield - CS handshake not done"
277             return 
278
279         if self.can_send_to():
280             self._send_message(BITFIELD + bitfield)
281         else:
282             self.cs_status_unauth_requests.inc()
283             print >>sys.stderr,"Connection: send_bitfield: Sending empty bitfield to unauth node"
284             self._send_message(BITFIELD + Bitfield(self.connecter.numpieces).tostring())
285
286
287     def send_have(self, index):
288         if self.can_send_to():
289             self._send_message(HAVE + tobinary(index))
290         #elif DEBUG_CS:
291         #    print >>sys.stderr,"Supressing HAVE messages"
292
293     def send_keepalive(self):
294         self._send_message('')
295
296     def _send_message(self, s):
297         s = tobinary(len(s))+s
298         if self.partial_message:
299             self.outqueue.append(s)
300         else:
301             self.connection.send_message_raw(s)
302
303     def send_partial(self, bytes):
304         if self.connection.closed:
305             return 0
306         if not self.can_send_to():
307             return 0
308         if self.partial_message is None:
309             s = self.upload.get_upload_chunk()
310             if s is None:
311                 return 0
312             # Merkle: send hashlist along with piece in HASHPIECE message
313             index, begin, hashlist, piece = s
314
315             if self.use_g2g:
316                 # ----- G2G: record who we send this to
317                 self.g2g_sent_piece_part( self, index, begin, hashlist, piece )
318
319                 # ---- G2G: we are uploading len(piece) data of piece #index
320                 for c in self.connecter.connections.itervalues():
321                     if not c.use_g2g:
322                         continue
323
324                     # include sending to self, because it should not be excluded from the statistics
325
326                     c.queue_g2g_piece_xfer( index, begin, piece )
327
328             if self.connecter.merkle_torrent:
329                 hashpiece_msg_id = self.his_extend_msg_name_to_id(EXTEND_MSG_HASHPIECE)
330                 bhashlist = bencode(hashlist)
331                 if hashpiece_msg_id is None:
332                     # old Tribler <= 4.5.2 style
333                     self.partial_message = ''.join((
334                                     tobinary(1+4+4+4+len(bhashlist)+len(piece)), HASHPIECE,
335                                     tobinary(index), tobinary(begin), tobinary(len(bhashlist)), bhashlist, piece.tostring() ))
336                 else:
337                     # Merkle BEP
338                     self.partial_message = ''.join((
339                                     tobinary(2+4+4+4+len(bhashlist)+len(piece)), EXTEND, hashpiece_msg_id,
340                                     tobinary(index), tobinary(begin), tobinary(len(bhashlist)), bhashlist, piece.tostring() ))
341                     
342             else:
343                 self.partial_message = ''.join((
344                             tobinary(len(piece) + 9), PIECE, 
345                             tobinary(index), tobinary(begin), piece.tostring()))
346             if DEBUG_NORMAL_MSGS:
347                 print >>sys.stderr,'sending chunk: '+str(index)+': '+str(begin)+'-'+str(begin+len(piece))
348
349         if bytes < len(self.partial_message):
350             self.connection.send_message_raw(self.partial_message[:bytes])
351             self.partial_message = self.partial_message[bytes:]
352             return bytes
353
354         q = [self.partial_message]
355         self.partial_message = None
356         if self.send_choke_queued:
357             self.send_choke_queued = False
358             self.outqueue.append(tobinary(1)+CHOKE)
359             self.upload.choke_sent()
360             self.just_unchoked = 0
361         q.extend(self.outqueue)
362         self.outqueue = []
363         q = ''.join(q)
364         self.connection.send_message_raw(q)
365         return len(q)
366
367     def get_upload(self):
368         return self.upload
369
370     def get_download(self):
371         return self.download
372
373     def set_download(self, download):
374         self.download = download
375
376     def backlogged(self):
377         return not self.connection.is_flushed()
378
379     def got_request(self, i, p, l):
380         self.upload.got_request(i, p, l)
381         if self.just_unchoked:
382             self.connecter.ratelimiter.ping(clock() - self.just_unchoked)
383             self.just_unchoked = 0
384
385     #
386     # Extension protocol support
387     #
388     def supports_extend_msg(self,msg_name):
389         if 'm' in self.extend_hs_dict:
390             return msg_name in self.extend_hs_dict['m']
391         else:
392             return False
393     
394     def got_extend_handshake(self,d):
395         if DEBUG:
396             print >>sys.stderr,"connecter: Got EXTEND handshake:",d
397         if 'm' in d:
398             if type(d['m']) != DictType:
399                 raise ValueError('Key m does not map to a dict')
400             m = d['m']
401             newm = {}
402             for key,val in m.iteritems():
403                 if type(val) != IntType:
404                     # Fix for BitTorrent 4.27.2e
405                     if type(val) == StringType:
406                         newm[key]= ord(val)
407                         continue
408                     else:
409                         raise ValueError('Message ID in m-dict not int')
410                 newm[key]= val
411
412             if not 'm' in self.extend_hs_dict:
413                 self.extend_hs_dict['m'] = {}
414             # Note: we store the dict without converting the msg IDs to bytes.
415             self.extend_hs_dict['m'].update(newm)
416             if self.connecter.overlay_enabled and EXTEND_MSG_OVERLAYSWARM in self.extend_hs_dict['m']:
417                 # This peer understands our overlay swarm extension
418                 if self.connection.locally_initiated:
419                     if DEBUG:
420                         print >>sys.stderr,"connecter: Peer supports Tr_OVERLAYSWARM, attempt connection"
421                     self.connect_overlay()
422                     
423             if EXTEND_MSG_CS in self.extend_hs_dict['m']:
424                 self.remote_supports_cs = True
425                 self.cs_status_supported.inc()
426                 if DEBUG_CS:
427                     print >>sys.stderr,"connecter: Peer supports Closed swarms"
428
429                 if self.is_closed_swarm and self.connection.locally_initiated:
430                     if DEBUG_CS:
431                         print >>sys.stderr,"connecter: Initiating Closed swarm handshake"
432                     self.start_cs_handshake()
433             else:
434                 self.remote_supports_cs = False
435                 self.cs_status_not_supported.inc()
436                 if DEBUG_CS:
437                     print >>sys.stderr,"connecter: conn: Remote node does not support CS, flagging CS as done"
438                 self.connecter.cs_handshake_completed()
439                 status = Status.get_status_holder("LivingLab")
440                 status.add_event(self.cs_status)
441                 self.cs_status = status.create_event("CS_protocol")
442                 
443                 
444             if self.connecter.use_g2g and (EXTEND_MSG_G2G_V1 in self.extend_hs_dict['m'] or EXTEND_MSG_G2G_V2 in self.extend_hs_dict['m']):
445                 # Both us and the peer want to use G2G
446                 if self.connection.locally_initiated:
447                     if DEBUG:
448                         print >>sys.stderr,"connecter: Peer supports Tr_G2G"
449
450                 self.use_g2g = True
451                 if EXTEND_MSG_G2G_V2 in self.extend_hs_dict['m']:
452                     self.g2g_version = EXTEND_MSG_G2G_V2
453                 else:
454                     self.g2g_version = EXTEND_MSG_G2G_V1
455             
456             # LIVEHACK
457             if KICK_OLD_CLIENTS:
458                 peerhaslivekey = False
459                 for key in self.extend_hs_dict['m']:
460                     if key.startswith(EXTEND_MSG_LIVE_PREFIX):
461                         peerhaslivekey = True
462                         livever = int(key[len(EXTEND_MSG_LIVE_PREFIX):])
463                         if livever < CURRENT_LIVE_VERSION:
464                             raise ValueError("Too old LIVE VERSION "+livever)
465                         else:
466                             print >>sys.stderr,"Connecter: live: Keeping connection to up-to-date peer v",livever,self.get_ip()
467                         
468                 if not peerhaslivekey:
469                     if self.get_ip() == self.connecter.tracker_ip:
470                         # Keep connection to tracker / source
471                         print >>sys.stderr,"Connecter: live: Keeping connection to SOURCE",self.connecter.tracker_ip 
472                     else:
473                         raise ValueError("Kicking old LIVE peer "+self.get_ip())
474
475         # 'p' is peer's listen port, 'v' is peer's version, all optional
476         # 'e' is used by uTorrent to show it prefers encryption (whatever that means)
477         # See http://www.bittorrent.org/beps/bep_0010.html
478         for key in ['p','e', 'yourip','ipv4','ipv6','reqq']:
479             if key in d:
480                 self.extend_hs_dict[key] = d[key]
481         
482         #print >>sys.stderr,"connecter: got_extend_hs: keys",d.keys()
483
484         # If he tells us our IP, record this and see if we get a majority vote on it
485         if 'yourip' in d:
486             try:
487                 yourip = decompact_ip(d['yourip'])
488
489                 try:
490                     from BaseLib.Core.NATFirewall.DialbackMsgHandler import DialbackMsgHandler
491                     dmh = DialbackMsgHandler.getInstance()
492                     dmh.network_btengine_extend_yourip(yourip)
493                 except:
494                     if DEBUG:
495                         print_exc()
496                     pass
497                 
498                 if 'same_nat_try_internal' in self.connecter.config and self.connecter.config['same_nat_try_internal']:
499                     if 'ipv4' in d:
500                         self.na_check_for_same_nat(yourip)
501             except:
502                 print_exc()
503         
504         # RePEX: Tell repexer we have received an extended handshake
505         repexer = self.connecter.repexer
506         if repexer:
507             try:
508                 version = d.get('v',None)
509                 repexer.got_extend_handshake(self, version)
510             except:
511                 print_exc()
512
513     def his_extend_msg_name_to_id(self,ext_name):
514         """ returns the message id (byte) for the given message name or None """
515         val = self.extend_hs_dict['m'].get(ext_name)
516         if val is None:
517             return val
518         else:
519             return chr(val)
520
521     def get_extend_encryption(self):
522         return self.extend_hs_dict.get('e',0)
523     
524     def get_extend_listenport(self):
525         return self.extend_hs_dict.get('p')
526
527     def is_tribler_peer(self):
528         client, version = decodePeerID(self.connection.id)
529         return client == TRIBLER_PEERID_LETTER
530
531     def send_extend_handshake(self):
532
533         # NETWORK AWARE
534         hisip = self.connection.get_ip(real=True)
535         ipv4 = None
536         if self.connecter.config.get('same_nat_try_internal',0):
537             is_tribler_peer = self.is_tribler_peer()
538             print >>sys.stderr,"connecter: send_extend_hs: Peer is Tribler client",is_tribler_peer
539             if is_tribler_peer:
540                 # If we're connecting to a Tribler peer, show our internal IP address
541                 # as 'ipv4'.
542                 ipv4 = self.get_ip(real=True)
543         
544         # See: http://www.bittorrent.org/beps/bep_0010.html
545         d = {}
546         d['m'] = self.connecter.EXTEND_HANDSHAKE_M_DICT
547         d['p'] = self.connecter.mylistenport
548         ver = version_short.replace('-',' ',1)
549         d['v'] = ver
550         d['e'] = 0  # Apparently this means we don't like uTorrent encryption
551         d['yourip'] = compact_ip(hisip)
552         if ipv4 is not None:
553             # Only send IPv4 when necessary, we prefer this peer to use this addr.
554             d['ipv4'] = compact_ip(ipv4) 
555         if self.connecter.ut_metadata_enabled:
556             # todo: set correct size if known
557             d['metadata_size'] = self.connecter.ut_metadata_size
558             
559         self._send_message(EXTEND + EXTEND_MSG_HANDSHAKE_ID + bencode(d))
560         if DEBUG:
561             print >>sys.stderr,'connecter: sent extend: id=0+',d,"yourip",hisip,"ipv4",ipv4
562
563     #
564     # ut_pex support
565     #
566     def got_ut_pex(self,d):
567         if DEBUG_UT_PEX:
568             print >>sys.stderr,"connecter: Got uTorrent PEX:",d
569         (same_added_peers,added_peers,dropped_peers) = check_ut_pex(d)
570         
571         # RePEX: increase counter
572         self.pex_received += 1
573         
574         # RePEX: for now, we pass the whole PEX dict to the repexer and
575         # let it decode it. The reason is that check_ut_pex's interface
576         # has recently changed, currently returning a triple to prefer
577         # Tribler peers. The repexer, however, might have different 
578         # interests (e.g., storinng all flags). To cater to both interests,
579         # check_ut_pex needs to be rewritten. 
580         repexer = self.connecter.repexer
581         if repexer:
582             try:
583                 repexer.got_ut_pex(self, d)
584             except:
585                 print_exc()
586             return
587         
588         # DoS protection: we're accepting IP addresses from 
589         # an untrusted source, so be a bit careful
590         mx = self.connecter.ut_pex_max_addrs_from_peer
591         if DEBUG_UT_PEX:
592             print >>sys.stderr,"connecter: Got",len(added_peers),"peers via uTorrent PEX, using max",mx
593             
594         # for now we have a strong bias towards Tribler peers
595         if self.is_tribler_peer():
596             shuffle(same_added_peers)
597             shuffle(added_peers)
598             sample_peers = same_added_peers
599             sample_peers.extend(added_peers)
600         else:
601             sample_peers = same_added_peers
602             sample_peers.extend(added_peers)
603             shuffle(sample_peers)
604         
605         # Take random sample of mx peers
606         sample_added_peers_with_id = []
607
608         # Put the sample in the format desired by Encoder.start_connections()
609         for dns in sample_peers[:mx]:
610             peer_with_id = (dns, 0)
611             sample_added_peers_with_id.append(peer_with_id)
612         if len(sample_added_peers_with_id) > 0:
613             if DEBUG_UT_PEX:
614                 print >>sys.stderr,"connecter: Starting ut_pex conns to",len(sample_added_peers_with_id)
615             self.connection.Encoder.start_connections(sample_added_peers_with_id)
616
617     def send_extend_ut_pex(self,payload):
618         msg = EXTEND+self.his_extend_msg_name_to_id(EXTEND_MSG_UTORRENT_PEX)+payload
619         self._send_message(msg)
620             
621     def first_ut_pex(self):
622         if self.ut_pex_first_flag:
623             self.ut_pex_first_flag = False
624             return True
625         else:
626             return False
627
628     def _send_cs_message(self, cs_list):
629         blist = bencode(cs_list)
630         self._send_message(EXTEND + self.his_extend_msg_name_to_id(EXTEND_MSG_CS) + blist)
631         
632     def got_cs_message(self, cs_list):
633         if not self.is_closed_swarm:
634             raise Exception("Got ClosedSwarm message, but this swarm is not closed")
635
636         # Process incoming closed swarm messages
637         t = cs_list[0]
638         if t == CS_CHALLENGE_A:
639             if DEBUG_CS:
640                 print >>sys.stderr,"connecter: conn: CS: Got initial challenge"
641             # Got a challenge to authenticate to participate in a closed swarm
642             try:
643                 response = self.closed_swarm_protocol.b_create_challenge(cs_list)
644                 self._send_cs_message(response)
645             except Exception,e:
646                 self.cs_status.add_value("CS_bad_initial_challenge")
647                 if DEBUG_CS:
648                     print >>sys.stderr,"connecter: conn: CS: Bad initial challenge:",e
649         elif t == CS_CHALLENGE_B:
650             if DEBUG_CS:
651                 print >>sys.stderr,"connecter: conn: CS: Got return challenge"
652             try:
653                 response = self.closed_swarm_protocol.a_provide_poa_message(cs_list)
654                 if DEBUG_CS and not response:
655                     print >> sys.stderr, "connecter: I'm not intererested in data"
656                 self._send_cs_message(response)
657             except Exception,e:
658                 self.cs_status.add_value("CS_bad_return_challenge")
659                 if DEBUG_CS:
660                     print >>sys.stderr,"connecter: conn: CS: Bad return challenge",e
661                 print_exc()
662                 
663         elif t == CS_POA_EXCHANGE_A:
664             if DEBUG_CS:
665                print >>sys.stderr,"connecter: conn: CS:Got POA from A"
666             try:
667                 response = self.closed_swarm_protocol.b_provide_poa_message(cs_list)
668                 self.remote_is_authenticated = self.closed_swarm_protocol.is_remote_node_authorized()
669                 if DEBUG_CS:
670                     print >>sys.stderr,"connecter: conn: CS: Remote node authorized:",self.remote_is_authenticated
671                 if response:
672                     self._send_cs_message(response)
673             except Exception,e:
674                 self.cs_status.add_value("CS_bad_POA_EXCHANGE_A")
675                 if DEBUG_CS:
676                    print >>sys.stderr,"connecter: conn: CS: Bad POA from A:",e
677                 
678         elif t == CS_POA_EXCHANGE_B:
679             try:
680                 self.closed_swarm_protocol.a_check_poa_message(cs_list)
681                 self.remote_is_authenticated = self.closed_swarm_protocol.is_remote_node_authorized()
682                 if DEBUG_CS:
683                    print >>sys.stderr,"connecter: conn: CS: Remote node authorized:",self.remote_is_authenticated
684             except Exception,e:
685                 self.cs_status.add_value("CS_bad_POA_EXCHANGE_B")
686                 if DEBUG_CS:
687                    print >>sys.stderr,"connecter: conn: CS: Bad POA from B:",e
688
689         if not self.closed_swarm_protocol.is_incomplete():
690             self.connecter.cs_handshake_completed()
691             status = Status.get_status_holder("LivingLab")
692             self.cs_complete = True  # Flag CS as completed
693             # Don't need to add successful CS event
694
695     #
696     # Give-2-Get
697     #
698     def g2g_sent_piece_part( self, c, index, begin, hashlist, piece ):
699         """ Keeps a record of the fact that we sent piece index[begin:begin+chunk]. """
700
701         wegaveperc = float(len(piece))/float(self.connecter.piece_size)
702         if index in self.perc_sent:
703             self.perc_sent[index] = self.perc_sent[index] + wegaveperc 
704         else:
705             self.perc_sent[index] = wegaveperc
706     
707     
708     def queue_g2g_piece_xfer(self,index,begin,piece):
709         """ Queue the fact that we sent piece index[begin:begin+chunk] for
710         tranmission to peers 
711         """
712         if self.g2g_version == EXTEND_MSG_G2G_V1:
713             self.send_g2g_piece_xfer_v1(index,begin,piece)
714             return
715         
716         perc = float(len(piece))/float(self.connecter.piece_size)
717         if index in self.last_perc_sent:
718             self.last_perc_sent[index] = self.last_perc_sent[index] + perc 
719         else:
720             self.last_perc_sent[index] = perc
721
722     def dequeue_g2g_piece_xfer(self):
723         """ Send queued information about pieces we sent to peers. Called
724         periodically.
725         """ 
726         psf = float(self.connecter.piece_size)
727         ppdict = {}
728         
729         #print >>sys.stderr,"connecter: g2g dq: orig",self.last_perc_sent
730         
731         for index,perc in self.last_perc_sent.iteritems():
732             # due to rerequests due to slow pieces the sum can be above 1.0
733             capperc = min(1.0,perc) 
734             percb = chr(int((100.0 * capperc)))
735             # bencode can't deal with int keys
736             ppdict[str(index)] = percb
737         self.last_perc_sent = {}
738         
739         #print >>sys.stderr,"connecter: g2g dq: dest",ppdict
740         
741         if len(ppdict) > 0:
742             self.send_g2g_piece_xfer_v2(ppdict)
743
744     def send_g2g_piece_xfer_v1(self,index,begin,piece):
745         """ Send fact that we sent piece index[begin:begin+chunk] to a peer
746         to all peers (G2G V1).
747         """
748         self._send_message(self.his_extend_msg_name_to_id(EXTEND_MSG_G2G_V1) + tobinary(index) + tobinary(begin) + tobinary(len(piece)))
749
750     def send_g2g_piece_xfer_v2(self,ppdict):
751         """ Send list of facts that we sent pieces to all peers (G2G V2). """
752         blist = bencode(ppdict)
753         self._send_message(EXTEND + self.his_extend_msg_name_to_id(EXTEND_MSG_G2G_V2) + blist)
754
755     def got_g2g_piece_xfer_v1(self,index,begin,length):
756         """ Got a G2G_PIECE_XFER message in V1 format. """
757         hegaveperc = float(length)/float(self.connecter.piece_size)
758         self.g2g_peer_forwarded_piece_part(index,hegaveperc)
759
760     def got_g2g_piece_xfer_v2(self,ppdict):
761         """ Got a G2G_PIECE_XFER message in V2 format. """
762         for indexstr,hegavepercb in ppdict.iteritems():
763             index = int(indexstr)
764             hegaveperc = float(ord(hegavepercb))/100.0
765             self.g2g_peer_forwarded_piece_part(index,hegaveperc)
766
767     def g2g_peer_forwarded_piece_part(self,index,hegaveperc):
768         """ Processes this peer forwarding piece i[begin:end] to a grandchild. """
769         # Reward for forwarding data in general
770         length = ceil(hegaveperc * float(self.connecter.piece_size))
771         self.forward_speeds[1].update_rate(length)
772
773         if index not in self.perc_sent:
774             # piece came from disk
775             return
776
777         # Extra reward if its data we sent
778         wegaveperc = self.perc_sent[index]
779         overlapperc = wegaveperc * hegaveperc
780         overlap = ceil(overlapperc * float(self.connecter.piece_size))
781         if overlap > 0:
782             self.forward_speeds[0].update_rate( overlap )
783
784     def g2g_score( self ):
785         return [x.get_rate() for x in self.forward_speeds]
786
787
788     #
789     # SecureOverlay support
790     #
791     def connect_overlay(self):
792         if DEBUG:
793             print >>sys.stderr,"connecter: Initiating overlay connection"
794         if not self.initiated_overlay:
795             from BaseLib.Core.Overlay.SecureOverlay import SecureOverlay
796             
797             self.initiated_overlay = True
798             so = SecureOverlay.getInstance()
799             so.connect_dns(self.connection.dns,self.network_connect_dns_callback)
800
801     def network_connect_dns_callback(self,exc,dns,permid,selversion):
802         # WARNING: WILL BE CALLED BY NetworkThread
803         if exc is not None:
804             print >>sys.stderr,"connecter: peer",dns,"said he supported overlay swarm, but we can't connect to him",exc
805
806     def start_cs_handshake(self):
807         try:
808             if DEBUG_CS:
809                 print >>sys.stderr,"connecter: conn: CS: Initiating Closed Swarm Handshake"
810             challenge = self.closed_swarm_protocol.a_create_challenge()
811             self._send_cs_message(challenge)
812         except Exception,e:
813             print >>sys.stderr,"connecter: conn: CS: Bad initial challenge:",e
814         
815
816     #
817     # NETWORK AWARE
818     #
819     def na_check_for_same_nat(self,yourip):
820         """ See if peer is local, e.g. behind same NAT, same AS or something.
821         If so, try to optimize:
822         - Same NAT -> reconnect to use internal network 
823         """
824         hisip = self.connection.get_ip(real=True)
825         if hisip == yourip:
826             # Do we share the same NAT?
827             myextip = self.connecter.get_extip_func(unknowniflocal=True)
828             myintip = self.get_ip(real=True)
829
830             if DEBUG:
831                 print >>sys.stderr,"connecter: na_check_for_same_nat: his",hisip,"myext",myextip,"myint",myintip
832             
833             if hisip != myintip or hisip == '127.0.0.1': # to allow testing
834                 # He can't fake his source addr, so we're not running on the
835                 # same machine,
836
837                 # He may be quicker to determine we should have a local
838                 # conn, so prepare for his connection in advance.
839                 #
840                 if myextip is None:
841                     # I don't known my external IP and he's not on the same
842                     # machine as me. yourip could be our real external IP, test.
843                     if DEBUG:
844                         print >>sys.stderr,"connecter: na_check_same_nat: Don't know my ext ip, try to loopback to",yourip,"to see if that's me"
845                     self.na_start_loopback_connection(yourip)
846                 elif hisip == myextip:
847                     # Same NAT. He can't fake his source addr.
848                     # Attempt local network connection
849                     if DEBUG:
850                         print >>sys.stderr,"connecter: na_check_same_nat: Yes, trying to connect via internal"
851                     self.na_start_internal_connection()
852                 else: 
853                     # hisip != myextip
854                     # He claims we share the same IP, but I think my ext IP
855                     # is something different. Either he is lying or I'm
856                     # mistaken, test
857                     if DEBUG:
858                         print >>sys.stderr,"connecter: na_check_same_nat: Maybe, me thinks not, try to loopback to",yourip
859                     self.na_start_loopback_connection(yourip)
860                 
861                 
862     def na_start_loopback_connection(self,yourip):
863         """ Peer claims my external IP is "yourip". Try to connect back to myself """
864         if DEBUG:
865             print >>sys.stderr,"connecter: na_start_loopback: Checking if my ext ip is",yourip
866         self.na_candidate_ext_ip = yourip
867         
868         dns = (yourip,self.connecter.mylistenport)
869         self.connection.Encoder.start_connection(dns,0,forcenew=True)
870
871     def na_got_loopback(self,econnection):
872         """ Got a connection with my peer ID. Check that this is indeed me looping
873         back to myself. No man-in-the-middle attacks protection. This is complex
874         if we're also connecting to ourselves because of a stale tracker 
875         registration. Window of opportunity is small. 
876         """
877         himismeip = econnection.get_ip(real=True)
878         if DEBUG:
879             print >>sys.stderr,"connecter: conn: na_got_loopback:",himismeip,self.na_candidate_ext_ip
880         if self.na_candidate_ext_ip == himismeip:
881             self.na_start_internal_connection()
882                     
883
884     def na_start_internal_connection(self):
885         """ Reconnect to peer using internal network """
886         if DEBUG:
887             print >>sys.stderr,"connecter: na_start_internal_connection"
888         
889         # Doesn't really matter who initiates. Letting other side do it makes
890         # testing easier.
891         if not self.is_locally_initiated():
892             
893             hisip = decompact_ip(self.extend_hs_dict['ipv4'])
894             hisport = self.extend_hs_dict['p']
895             
896             # For testing, see Tribler/Test/test_na_extend_hs.py
897             if hisip == '224.4.8.1' and hisport == 4810:
898                 hisip = '127.0.0.1'
899                 hisport = 4811
900                 
901             self.connection.na_want_internal_conn_from = hisip        
902             
903             hisdns = (hisip,hisport)
904             if DEBUG:
905                 print >>sys.stderr,"connecter: na_start_internal_connection to",hisdns
906             self.connection.Encoder.start_connection(hisdns,0)
907
908     def na_get_address_distance(self):
909         return self.connection.na_get_address_distance()
910
911     def is_live_source(self):
912         if self.connecter.live_streaming:
913             if self.get_ip() == self.connecter.tracker_ip:
914                 return True
915         return False
916             
917
918 class Connecter:
919 # 2fastbt_
920     def __init__(self, metadata, make_upload, downloader, choker, numpieces, piece_size,
921             totalup, config, ratelimiter, merkle_torrent, sched = None, 
922             coordinator = None, helper = None, get_extip_func = lambda: None, mylistenport = None, use_g2g = False, infohash=None, tracker=None, live_streaming = False):
923
924         self.downloader = downloader
925         self.make_upload = make_upload
926         self.choker = choker
927         self.numpieces = numpieces
928         self.piece_size = piece_size
929         self.config = config
930         self.ratelimiter = ratelimiter
931         self.rate_capped = False
932         self.sched = sched
933         self.totalup = totalup
934         self.rate_capped = False
935         self.connections = {}
936         self.external_connection_made = 0
937         self.merkle_torrent = merkle_torrent
938         self.use_g2g = use_g2g
939         # 2fastbt_
940         self.coordinator = coordinator
941         self.helper = helper
942         self.round = 0
943         self.get_extip_func = get_extip_func
944         self.mylistenport = mylistenport
945         self.infohash = infohash
946         self.live_streaming = live_streaming
947         self.tracker = tracker
948         self.tracker_ip = None
949         if self.live_streaming:
950             try:
951                 (scheme, netloc, path, pars, query, _fragment) = urlparse.urlparse(self.tracker)
952                 host = netloc.split(':')[0] 
953                 self.tracker_ip = socket.getaddrinfo(host,None)[0][4][0]
954             except:
955                 print_exc()
956                 self.tracker_ip = None
957             #print >>sys.stderr,"Connecter: live: source/tracker is",self.tracker_ip
958         self.overlay_enabled = 0
959         if self.config['overlay']:
960             self.overlay_enabled = True
961
962         if DEBUG:
963             if self.overlay_enabled:
964                 print >>sys.stderr,"connecter: Enabling overlay"
965             else:
966                 print >>sys.stderr,"connecter: Disabling overlay"
967             
968         self.ut_pex_enabled = 0
969         if 'ut_pex_max_addrs_from_peer' in self.config:
970             self.ut_pex_max_addrs_from_peer = self.config['ut_pex_max_addrs_from_peer']
971             self.ut_pex_enabled = self.ut_pex_max_addrs_from_peer > 0
972         self.ut_pex_previous_conns = [] # last value of 'added' field for all peers
973
974         self.ut_metadata_enabled = self.config["magnetlink"]
975         if self.ut_metadata_enabled:
976             # metadata (or self.responce as its called in download_bt1) is
977             # a dic containing the metadata.  Ut_metadata shares the
978             # bencoded 'info' part of this metadata in 16kb pieces.
979             infodata = bencode(metadata["info"])
980             self.ut_metadata_size = len(infodata)
981             self.ut_metadata_list = [infodata[index:index+16*1024] for index in xrange(0, len(infodata), 16*1024)]
982             # history is a list containing previous request served (to
983             # limit our bandwidth usage)
984             self.ut_metadata_history = []
985             if DEBUG: print >> sys.stderr,"connecter.__init__: Enable ut_metadata"
986         
987         if DEBUG_UT_PEX:
988             if self.ut_pex_enabled:
989                 print >>sys.stderr,"connecter: Enabling uTorrent PEX",self.ut_pex_max_addrs_from_peer
990             else:
991                 print >>sys.stderr,"connecter: Disabling uTorrent PEX"
992
993         # The set of messages we support. Note that the msg ID is an int not a byte in 
994         # this dict.
995         self.EXTEND_HANDSHAKE_M_DICT = {}
996         
997         # Say in the EXTEND handshake that we support Closed swarms
998         if DEBUG:
999             print >>sys.stderr,"connecter: I support Closed Swarms"
1000         d = {EXTEND_MSG_CS:ord(CS_CHALLENGE_A)}
1001         self.EXTEND_HANDSHAKE_M_DICT.update(d)
1002
1003         if self.overlay_enabled:
1004             # Say in the EXTEND handshake we support the overlay-swarm ext.
1005             d = {EXTEND_MSG_OVERLAYSWARM:ord(CHALLENGE)}
1006             self.EXTEND_HANDSHAKE_M_DICT.update(d)
1007         if self.ut_pex_enabled:
1008             # Say in the EXTEND handshake we support uTorrent's peer exchange ext.
1009             d = {EXTEND_MSG_UTORRENT_PEX:ord(EXTEND_MSG_UTORRENT_PEX_ID)}
1010             self.EXTEND_HANDSHAKE_M_DICT.update(d)
1011             self.sched(self.ut_pex_callback,6)
1012         if self.use_g2g:
1013             # Say in the EXTEND handshake we want to do G2G.
1014             d = {EXTEND_MSG_G2G_V2:ord(G2G_PIECE_XFER)}
1015             self.EXTEND_HANDSHAKE_M_DICT.update(d)
1016             self.sched(self.g2g_callback,G2G_CALLBACK_INTERVAL)
1017         if self.merkle_torrent:
1018             d = {EXTEND_MSG_HASHPIECE:ord(HASHPIECE)}
1019             self.EXTEND_HANDSHAKE_M_DICT.update(d)
1020         if self.ut_metadata_enabled:
1021             d = {EXTEND_MSG_METADATA:ord(EXTEND_MSG_METADATA_ID)}
1022             self.EXTEND_HANDSHAKE_M_DICT.update(d)
1023             
1024             
1025         # LIVEHACK
1026         livekey = EXTEND_MSG_LIVE_PREFIX+str(CURRENT_LIVE_VERSION)
1027         d = {livekey:ord(LIVE_FAKE_MESSAGE_ID)}
1028         self.EXTEND_HANDSHAKE_M_DICT.update(d)
1029
1030         if DEBUG:
1031             print >>sys.stderr,"Connecter: EXTEND: my dict",self.EXTEND_HANDSHAKE_M_DICT
1032
1033         # BarterCast
1034         if config['overlay']:
1035             from BaseLib.Core.Overlay.OverlayThreadingBridge import OverlayThreadingBridge
1036             
1037             self.overlay_bridge = OverlayThreadingBridge.getInstance()
1038         else:
1039             self.overlay_bridge = None
1040             
1041         # RePEX
1042         self.repexer = None # Should this be called observer instead?
1043             
1044         # Closed Swarm stuff
1045         self.is_closed_swarm = False
1046         self.cs_post_func = None
1047         if 'cs_keys' in self.config:
1048             if self.config['cs_keys'] != None:
1049                 if len(self.config['cs_keys']) == 0:
1050                     if DEBUG_CS:
1051                         print >>sys.stderr, "connecter: cs_keys is empty"
1052                 else:
1053                     if DEBUG_CS:
1054                        print >>sys.stderr, "connecter: This is a closed swarm  - has cs_keys"
1055                     self.is_closed_swarm = True
1056
1057
1058     def how_many_connections(self):
1059         return len(self.connections)
1060
1061     def connection_made(self, connection):
1062
1063         assert connection
1064         c = Connection(connection, self)
1065         self.connections[connection] = c
1066         
1067         # RePEX: Inform repexer connection is made
1068         repexer = self.repexer
1069         if repexer:
1070             try:
1071                 repexer.connection_made(c,connection.supports_extend_messages())
1072                 if c.closed:
1073                     # The repexer can close the connection in certain cases.
1074                     # If so, we abort further execution of this function.
1075                     return c
1076             except:
1077                 print_exc()
1078         
1079         if connection.supports_extend_messages():
1080             # The peer either supports our overlay-swarm extension or 
1081             # the utorrent extended protocol.
1082             
1083             [client,version] = decodePeerID(connection.id)
1084             
1085             if DEBUG:
1086                 print >>sys.stderr,"connecter: Peer is client",client,"version",version,c.get_ip(),c.get_port()
1087             
1088             if self.overlay_enabled and client == TRIBLER_PEERID_LETTER and version <= '3.5.0' and connection.locally_initiated:
1089                 # Old Tribler, establish overlay connection<
1090                 if DEBUG:
1091                     print >>sys.stderr,"connecter: Peer is previous Tribler version, attempt overlay connection"
1092                 c.connect_overlay()
1093             elif self.ut_pex_enabled:
1094                 # EXTEND handshake must be sent just after BT handshake, 
1095                 # before BITFIELD even
1096                 c.send_extend_handshake()
1097                 
1098         #TODO: overlay swarm also needs upload and download to control transferring rate
1099         # If this is a closed swarm, don't do this now - will be done on completion of the CS protocol!
1100         c.upload = self.make_upload(c, self.ratelimiter, self.totalup)
1101         c.download = self.downloader.make_download(c)
1102         if not self.is_closed_swarm:
1103             if DEBUG_CS:
1104                print >>sys.stderr,"connecter: connection_made: Freeing choker!"
1105             self.choker.connection_made(c)
1106         else:
1107             if DEBUG_CS:
1108                 print >>sys.stderr,"connecter: connection_made: Will free choker later"
1109             self.choker.add_connection(c)
1110             #self.cs_post_func = lambda:self.choker.connection_made(c)
1111             #self.cs_post_func = lambda:self.choker.start_connection(c)
1112             self.cs_post_func = lambda:self._cs_completed(c)
1113
1114         return c
1115
1116     def connection_lost(self, connection):
1117         c = self.connections[connection]
1118
1119         # RePEX: inform repexer of closed connection
1120         repexer = self.repexer
1121         if repexer:
1122             try:
1123                 repexer.connection_closed(c)
1124             except:
1125                 print_exc()
1126
1127         ######################################
1128         # BarterCast
1129         if self.overlay_bridge is not None:
1130             ip = c.get_ip(False)       
1131             port = c.get_port(False)   
1132             down_kb = int(c.total_downloaded / 1024)
1133             up_kb = int(c.total_uploaded / 1024)
1134             
1135             if DEBUG:
1136                 print >> sys.stderr, "bartercast: attempting database update, adding olthread"
1137             
1138             olthread_bartercast_conn_lost_lambda = lambda:olthread_bartercast_conn_lost(ip,port,down_kb,up_kb)
1139             self.overlay_bridge.add_task(olthread_bartercast_conn_lost_lambda,0)
1140         else:
1141             if DEBUG:
1142                 print >> sys.stderr, "bartercast: no overlay bridge found"
1143             
1144         #########################
1145         
1146         if DEBUG:
1147             if c.get_ip() == self.tracker_ip:
1148                 print >>sys.stderr,"connecter: connection_lost: live: WAAH2 closing SOURCE"
1149             
1150         del self.connections[connection]
1151         if c.download:
1152             c.download.disconnected()
1153         self.choker.connection_lost(c)
1154
1155     def connection_flushed(self, connection):
1156         conn = self.connections[connection]
1157         if conn.next_upload is None and (conn.partial_message is not None
1158                or conn.upload.buffer):
1159             self.ratelimiter.queue(conn)
1160
1161     def got_piece(self, i):
1162         for co in self.connections.values():
1163             co.send_have(i)
1164
1165     def our_extend_msg_id_to_name(self,ext_id):
1166         """ find the name for the given message id (byte) """
1167         for key,val in self.EXTEND_HANDSHAKE_M_DICT.iteritems():
1168             if val == ord(ext_id):
1169                 return key
1170         return None
1171
1172     def get_ut_pex_conns(self):
1173         conns = []
1174         for conn in self.connections.values():
1175             if conn.get_extend_listenport() is not None:
1176                 conns.append(conn)
1177         return conns
1178             
1179     def get_ut_pex_previous_conns(self):
1180         return self.ut_pex_previous_conns
1181
1182     def set_ut_pex_previous_conns(self,conns):
1183         self.ut_pex_previous_conns = conns
1184
1185     def ut_pex_callback(self):
1186         """ Periocially send info about the peers you know to the other peers """
1187         if DEBUG_UT_PEX:
1188             print >>sys.stderr,"connecter: Periodic ut_pex update"
1189             
1190         currconns = self.get_ut_pex_conns()
1191         (addedconns,droppedconns) = ut_pex_get_conns_diff(currconns,self.get_ut_pex_previous_conns())
1192         self.set_ut_pex_previous_conns(currconns)
1193         if DEBUG_UT_PEX:
1194             for conn in addedconns:
1195                 print >>sys.stderr,"connecter: ut_pex: Added",conn.get_ip(),conn.get_extend_listenport()
1196             for conn in droppedconns:
1197                 print >>sys.stderr,"connecter: ut_pex: Dropped",conn.get_ip(),conn.get_extend_listenport()
1198             
1199         for c in currconns:
1200             if c.supports_extend_msg(EXTEND_MSG_UTORRENT_PEX):
1201                 try:
1202                     if DEBUG_UT_PEX:
1203                         print >>sys.stderr,"connecter: ut_pex: Creating msg for",c.get_ip(),c.get_extend_listenport()
1204                     if c.first_ut_pex():
1205                         aconns = currconns
1206                         dconns = []
1207                     else:
1208                         aconns = addedconns
1209                         dconns = droppedconns
1210                     payload = create_ut_pex(aconns,dconns,c)    
1211                     c.send_extend_ut_pex(payload)
1212                 except:
1213                     print_exc()
1214         self.sched(self.ut_pex_callback,60)
1215
1216     def g2g_callback(self):
1217         try:
1218             self.sched(self.g2g_callback,G2G_CALLBACK_INTERVAL)
1219             for c in self.connections.itervalues():
1220                 if not c.use_g2g:
1221                     continue
1222     
1223                 c.dequeue_g2g_piece_xfer()
1224         except:
1225             print_exc()
1226
1227     def got_ut_metadata(self, connection, dic, message):
1228         """
1229         CONNECTION: The connection instance where we received this message
1230         DIC: The bdecoded dictionary
1231         MESSAGE: The entire message: <EXTEND-ID><METADATA-ID><BENCODED-DIC><OPTIONAL-DATA>
1232         """
1233         if DEBUG: print >> sys.stderr, "connecter.got_ut_metadata:", dic
1234
1235         msg_type = dic.get("msg_type", None)
1236         if not type(msg_type) in (int, long):
1237             raise ValueError("Invalid ut_metadata.msg_type")
1238         piece = dic.get("piece", None)
1239         if not type(piece) in (int, long):
1240             raise ValueError("Invalid ut_metadata.piece type")
1241         if not 0 <= piece < len(self.ut_metadata_list):
1242             raise ValueError("Invalid ut_metadata.piece value")
1243
1244         if msg_type == 0: # request
1245             if DEBUG: print >> sys.stderr, "connecter.got_ut_metadata: Received request for piece", piece
1246
1247             # our flood protection policy is to upload all metadata
1248             # once every n minutes.
1249             now = time.time()
1250             deadline = now - UT_METADATA_FLOOD_PERIOD
1251             # remove old history
1252             self.ut_metadata_history = [timestamp for timestamp in self.ut_metadata_history if timestamp > deadline]
1253
1254             if len(self.ut_metadata_history) > UT_METADATA_FLOOD_FACTOR * len(self.ut_metadata_list):
1255                 # refuse to upload at this time
1256                 reply = bencode({"msg_type":2, "piece":piece})
1257             else:
1258                 reply = bencode({"msg_type":1, "piece":piece, "data":self.ut_metadata_list[piece]})
1259                 self.ut_metadata_history.append(now)
1260             connection._send_message(EXTEND + connection.his_extend_msg_name_to_id(EXTEND_MSG_METADATA) + reply)
1261         
1262         elif msg_type == 1: # data
1263             # at this point in the code we must assume that the
1264             # metadata is already there, everything is designed in
1265             # such a way that metadata is required.  data replies can
1266             # therefore never occur.
1267             raise ValueError("Invalid ut_metadata: we did not request data")
1268
1269         elif msg_type == 2: # reject
1270             # at this point in the code we must assume that the
1271             # metadata is already there, everything is designed in
1272             # such a way that metadata is required.  rejects can
1273             # therefore never occur.
1274             raise ValueError("Invalid ut_metadata: we did not request data that can be rejected")
1275
1276         else:
1277             raise ValueError("Invalid ut_metadata.msg_type value")
1278
1279     def got_hashpiece(self, connection, message):
1280         """ Process Merkle hashpiece message. Note: EXTEND byte has been 
1281         stripped, it starts with peer's Tr_hashpiece id for historic reasons ;-)
1282         """
1283         try:
1284             c = self.connections[connection]
1285             
1286             if len(message) <= 13:
1287                 if DEBUG:
1288                     print >>sys.stderr,"Close on bad HASHPIECE: msg len"
1289                 connection.close()
1290                 return
1291             i = toint(message[1:5])
1292             if i >= self.numpieces:
1293                 if DEBUG:
1294                     print >>sys.stderr,"Close on bad HASHPIECE: index out of range"
1295                 connection.close()
1296                 return
1297             begin = toint(message[5:9])
1298             len_hashlist = toint(message[9:13])
1299             bhashlist = message[13:13+len_hashlist]
1300             hashlist = bdecode(bhashlist)
1301             if not isinstance(hashlist, list):
1302                 raise AssertionError, "hashlist not list"
1303             for oh in hashlist:
1304                 if not isinstance(oh,list) or \
1305                 not (len(oh) == 2) or \
1306                 not isinstance(oh[0],int) or \
1307                 not isinstance(oh[1],str) or \
1308                 not ((len(oh[1])==20)): \
1309                     raise AssertionError, "hashlist entry invalid"
1310             piece = message[13+len_hashlist:]
1311
1312             if DEBUG_NORMAL_MSGS:
1313                 print >>sys.stderr,"connecter: Got HASHPIECE",i,begin
1314
1315             if c.download.got_piece(i, begin, hashlist, piece):
1316                 self.got_piece(i)
1317         except Exception,e:
1318             if DEBUG:
1319                 print >>sys.stderr,"Close on bad HASHPIECE: exception",str(e)
1320                 print_exc()
1321             connection.close()
1322
1323     # NETWORK AWARE
1324     def na_got_loopback(self,econnection):
1325         if DEBUG:
1326             print >>sys.stderr,"connecter: na_got_loopback: Got connection from",econnection.get_ip(),econnection.get_port()
1327         for c in self.connections.itervalues():
1328             ret = c.na_got_loopback(econnection)
1329             if ret is not None:
1330                 return ret
1331         return False
1332
1333     def na_got_internal_connection(self,origconn,newconn):
1334         """ This is called only at the initiator side of the internal conn.
1335         Doesn't matter, only one is enough to close the original connection.
1336         """
1337         if DEBUG:
1338             print >>sys.stderr,"connecter: na_got_internal: From",newconn.get_ip(),newconn.get_port()
1339         
1340         origconn.close()
1341
1342
1343     def got_message(self, connection, message):
1344         # connection: Encrypter.Connection; c: Connecter.Connection
1345         c = self.connections[connection]    
1346         t = message[0]
1347         # EXTEND handshake will be sent just after BT handshake, 
1348         # before BITFIELD even
1349
1350         if DEBUG_MESSAGE_HANDLING:
1351             st = time.time()
1352
1353         if DEBUG_NORMAL_MSGS:
1354             print >>sys.stderr,"connecter: Got",getMessageName(t),connection.get_ip()
1355         
1356         if t == EXTEND:
1357             self.got_extend_message(connection,c,message,self.ut_pex_enabled)
1358             return
1359
1360         # If this is a closed swarm and we have not authenticated the 
1361         # remote node, we must NOT GIVE IT ANYTHING!
1362         #if self.is_closed_swarm and c.closed_swarm_protocol.is_incomplete():
1363             #print >>sys.stderr, "connecter: Remote node not authorized, ignoring it"
1364             #return
1365
1366         if self.is_closed_swarm and c.can_send_to():
1367             c.got_anything = False # Is this correct or does it break something?
1368             
1369         if t == BITFIELD and c.got_anything:
1370             if DEBUG:
1371                 print >>sys.stderr,"Close on BITFIELD"
1372             connection.close()
1373             return
1374         c.got_anything = True
1375         if (t in [CHOKE, UNCHOKE, INTERESTED, NOT_INTERESTED] and 
1376                 len(message) != 1):
1377             if DEBUG:
1378                 print >>sys.stderr,"Close on bad (UN)CHOKE/(NOT_)INTERESTED",t
1379             connection.close()
1380             return
1381         if t == CHOKE:
1382             if DEBUG_NORMAL_MSGS:
1383                 print >>sys.stderr,"connecter: Got CHOKE from",connection.get_ip()
1384             c.download.got_choke()
1385         elif t == UNCHOKE:
1386             if DEBUG_NORMAL_MSGS:
1387                 print >>sys.stderr,"connecter: Got UNCHOKE from",connection.get_ip()
1388             c.download.got_unchoke()
1389         elif t == INTERESTED:
1390             if DEBUG_NORMAL_MSGS:
1391                 print >>sys.stderr,"connecter: Got INTERESTED from",connection.get_ip()
1392             if c.upload is not None:
1393                 c.upload.got_interested()
1394         elif t == NOT_INTERESTED:
1395             c.upload.got_not_interested()
1396         elif t == HAVE:
1397             if len(message) != 5:
1398                 if DEBUG:
1399                     print >>sys.stderr,"Close on bad HAVE: msg len"
1400                 connection.close()
1401                 return
1402             i = toint(message[1:])
1403             if i >= self.numpieces:
1404                 if DEBUG:
1405                     print >>sys.stderr,"Close on bad HAVE: index out of range"
1406                 connection.close()
1407                 return
1408             if DEBUG_NORMAL_MSGS:
1409                 print >>sys.stderr,"connecter: Got HAVE(",i,") from",connection.get_ip()
1410             c.download.got_have(i)
1411         elif t == BITFIELD:
1412             if DEBUG_NORMAL_MSGS:
1413                 print >>sys.stderr,"connecter: Got BITFIELD from",connection.get_ip()
1414             try:
1415                 b = Bitfield(self.numpieces, message[1:],calcactiveranges=self.live_streaming)
1416             except ValueError:
1417                 if DEBUG:
1418                     print >>sys.stderr,"Close on bad BITFIELD"
1419                 connection.close()
1420                 return
1421             if c.download is not None:
1422                 c.download.got_have_bitfield(b)
1423         elif t == REQUEST:
1424             if not c.can_send_to():
1425                 c.cs_status_unauth_requests.inc()
1426                 print >> sys.stderr,"Got REQUEST but remote node is not authenticated"
1427                 return # TODO: Do this better
1428
1429             if len(message) != 13:
1430                 if DEBUG:
1431                     print >>sys.stderr,"Close on bad REQUEST: msg len"
1432                 connection.close()
1433                 return
1434             i = toint(message[1:5])
1435             if i >= self.numpieces:
1436                 if DEBUG:
1437                     print >>sys.stderr,"Close on bad REQUEST: index out of range"
1438                 connection.close()
1439                 return
1440             if DEBUG_NORMAL_MSGS:
1441                 print >>sys.stderr,"connecter: Got REQUEST(",i,") from",connection.get_ip()
1442             c.got_request(i, toint(message[5:9]), toint(message[9:]))
1443         elif t == CANCEL:
1444             if len(message) != 13:
1445                 if DEBUG:
1446                     print >>sys.stderr,"Close on bad CANCEL: msg len"
1447                 connection.close()
1448                 return
1449             i = toint(message[1:5])
1450             if i >= self.numpieces:
1451                 if DEBUG:
1452                     print >>sys.stderr,"Close on bad CANCEL: index out of range"
1453                 connection.close()
1454                 return
1455             c.upload.got_cancel(i, toint(message[5:9]), 
1456                 toint(message[9:]))
1457         elif t == PIECE:
1458             if len(message) <= 9:
1459                 if DEBUG:
1460                     print >>sys.stderr,"Close on bad PIECE: msg len"
1461                 connection.close()
1462                 return
1463             i = toint(message[1:5])
1464             if i >= self.numpieces:
1465                 if DEBUG:
1466                     print >>sys.stderr,"Close on bad PIECE: msg len"
1467                 connection.close()
1468                 return
1469             if DEBUG_NORMAL_MSGS: # or connection.get_ip().startswith("192"):
1470                 print >>sys.stderr,"connecter: Got PIECE(",i,") from",connection.get_ip()
1471             #if connection.get_ip().startswith("192"):
1472             #    print >>sys.stderr,"@",
1473             try:
1474                 if c.download.got_piece(i, toint(message[5:9]), [], message[9:]):
1475                     self.got_piece(i)
1476             except Exception,e:
1477                 if DEBUG:
1478                     print >>sys.stderr,"Close on bad PIECE: exception",str(e)
1479                     print_exc()
1480                 connection.close()
1481                 return
1482             
1483         elif t == HASHPIECE:
1484             # Merkle: Handle pieces with hashes, old Tribler<= 4.5.2 style
1485             self.got_hashpiece(connection,message)
1486             
1487         elif t == G2G_PIECE_XFER: 
1488             # EXTEND_MSG_G2G_V1 only, V2 is proper EXTEND msg 
1489             if len(message) <= 12:
1490                 if DEBUG:
1491                     print >>sys.stderr,"Close on bad G2G_PIECE_XFER: msg len"
1492                 connection.close()
1493                 return
1494             if not c.use_g2g:
1495                 if DEBUG:
1496                     print >>sys.stderr,"Close on receiving G2G_PIECE_XFER over non-g2g connection"
1497                 connection.close()
1498                 return
1499
1500             index = toint(message[1:5])
1501             begin = toint(message[5:9])
1502             length = toint(message[9:13])
1503             c.got_g2g_piece_xfer_v1(index,begin,length)
1504
1505         else:
1506             connection.close()
1507
1508         if DEBUG_MESSAGE_HANDLING:
1509             et = time.time()
1510             diff = et - st
1511             if diff > 0.1:
1512                 print >>sys.stderr,"connecter: $$$$$$$$$$$$",getMessageName(t),"took",diff
1513
1514
1515     def got_extend_message(self,connection,c,message,ut_pex_enabled):
1516         # connection: Encrypter.Connection; c: Connecter.Connection
1517         if DEBUG:
1518             print >>sys.stderr,"connecter: Got EXTEND message, len",len(message)
1519             print >>sys.stderr,"connecter: his handshake",c.extend_hs_dict,c.get_ip()
1520             
1521         try:
1522             if len(message) < 4:
1523                 if DEBUG:
1524                     print >>sys.stderr,"Close on bad EXTEND: msg len"
1525                 connection.close()
1526                 return
1527             ext_id = message[1]
1528             if DEBUG:
1529                 print >>sys.stderr,"connecter: Got EXTEND message, id",ord(ext_id)
1530             if ext_id == EXTEND_MSG_HANDSHAKE_ID: 
1531                 # Message is Handshake
1532                 d = bdecode(message[2:])
1533                 if type(d) == DictType:
1534                     c.got_extend_handshake(d)
1535                 else:
1536                     if DEBUG:
1537                         print >>sys.stderr,"Close on bad EXTEND: payload of handshake is not a bencoded dict"
1538                     connection.close()
1539                     return
1540             else:
1541                 # Message is regular message e.g ut_pex
1542                 ext_msg_name = self.our_extend_msg_id_to_name(ext_id)
1543                 if ext_msg_name is None:
1544                     if DEBUG:
1545                         print >>sys.stderr,"Close on bad EXTEND: peer sent ID we didn't define in handshake"
1546                     connection.close()
1547                     return
1548                 elif ext_msg_name == EXTEND_MSG_OVERLAYSWARM:
1549                     if DEBUG:
1550                         print >>sys.stderr,"Not closing EXTEND+CHALLENGE: peer didn't read our spec right, be liberal"
1551                 elif ext_msg_name == EXTEND_MSG_UTORRENT_PEX and ut_pex_enabled:
1552                     d = bdecode(message[2:])
1553                     if type(d) == DictType:
1554                         c.got_ut_pex(d)
1555                     else:
1556                         if DEBUG:
1557                             print >>sys.stderr,"Close on bad EXTEND: payload of ut_pex is not a bencoded dict"
1558                         connection.close()
1559                         return
1560                 elif ext_msg_name == EXTEND_MSG_METADATA:
1561                     if DEBUG:
1562                         print >> sys.stderr, "Connecter.got_extend_message() ut_metadata"
1563                     # bdecode sloppy will make bdecode ignore the data
1564                     # in message that is placed -after- the bencoded
1565                     # data (this is the case for a data message)
1566                     d = bdecode(message[2:], sloppy=1)
1567                     if type(d) == DictType:
1568                         self.got_ut_metadata(c, d, message)
1569                     else:
1570                         if DEBUG:
1571                             print >> sys.stderr, "Connecter.got_extend_message() close on bad ut_metadata message"
1572                         connection.close()
1573                         return
1574                 elif ext_msg_name == EXTEND_MSG_G2G_V2 and self.use_g2g:
1575                     ppdict = bdecode(message[2:])
1576                     if type(ppdict) != DictType:
1577                         if DEBUG:
1578                             print >>sys.stderr,"Close on bad EXTEND+G2G: payload not dict"
1579                         connection.close()
1580                         return
1581                     for k,v in ppdict.iteritems():
1582                         if type(k) != StringType or type(v) != StringType:
1583                             if DEBUG:
1584                                 print >>sys.stderr,"Close on bad EXTEND+G2G: key,value not of type int,char"
1585                             connection.close()
1586                             return
1587                         try:
1588                             int(k)
1589                         except:
1590                             if DEBUG:
1591                                 print >>sys.stderr,"Close on bad EXTEND+G2G: key not int"
1592                             connection.close()
1593                             return
1594                         if ord(v) > 100:
1595                             if DEBUG:
1596                                 print >>sys.stderr,"Close on bad EXTEND+G2G: value too big",ppdict,v,ord(v)
1597                             connection.close()
1598                             return
1599                             
1600                     c.got_g2g_piece_xfer_v2(ppdict)
1601                     
1602                 elif ext_msg_name == EXTEND_MSG_HASHPIECE and self.merkle_torrent:
1603                     # Merkle: Handle pieces with hashes, Merkle BEP
1604                     oldmsg = message[1:]
1605                     self.got_hashpiece(connection,oldmsg)
1606                     
1607                 elif ext_msg_name == EXTEND_MSG_CS:
1608                     cs_list = bdecode(message[2:])
1609                     c.got_cs_message(cs_list)
1610                     
1611                 else:
1612                     if DEBUG:
1613                         print >>sys.stderr,"Close on bad EXTEND: peer sent ID that maps to name we don't support",ext_msg_name,`ext_id`,ord(ext_id)
1614                     connection.close()
1615                     return
1616             return
1617         except Exception,e:
1618             if not DEBUG:
1619                 print >>sys.stderr,"Close on bad EXTEND: exception:",str(e),`message[2:]`
1620                 print_exc()
1621             connection.close()
1622             return
1623
1624     def _cs_completed(self, connection):
1625         """
1626         When completed, this is a callback function to reset the connection
1627         """
1628         connection.cs_complete = True # Flag CS as completed
1629
1630         try:
1631             # Can't send bitfield here, must loop and send a bunch of HAVEs
1632             # Get the bitfield from the uploader
1633             have_list = connection.upload.storage.get_have_list()
1634             bitfield = Bitfield(self.numpieces, have_list)
1635             connection.send_bitfield(bitfield.tostring())
1636             connection.got_anything = False
1637             self.choker.start_connection(connection)
1638         except Exception,e:
1639             print >> sys.stderr,"connecter: CS: Error restarting after CS handshake:",e
1640         
def cs_handshake_completed(self):
    """
    Run the post-handshake function (self.cs_post_func), if one is set,
    now that the closed swarm handshake has completed.
    """
    if DEBUG_CS:
        print >>sys.stderr,"connecter: Closed swarm handshake completed!"

    # Nothing to call: only mention it when debugging closed swarms.
    if not self.cs_post_func:
        if DEBUG_CS:
            print >>sys.stderr,"connecter: CS: Woops, don't have post function"
        return

    self.cs_post_func()
1648
1649
def olthread_bartercast_conn_lost(ip,port,down_kb,up_kb):
    """ Called by OverlayThread to store information about the peer to
    whom the connection was just closed in the (slow) databases.

    Adds down_kb / up_kb (when greater than zero) to the 'downloaded' /
    'uploaded' BarterCast items kept for the pair (our PermID, peer).
    The peer is looked up by IP; if no PermID is found for it the amounts
    are recorded under 'non-tribler' instead. The increments are made with
    commit=False and committed together at the end, only if something
    changed. port is used for the debug output only. """

    from BaseLib.Core.CacheDB.CacheDBHandler import PeerDBHandler, BarterCastDBHandler

    peerdb = PeerDBHandler.getInstance()
    bartercastdb = BarterCastDBHandler.getInstance()

    # Without a BarterCast database there is nothing to record.
    if not bartercastdb:
        if DEBUG:
            print >> sys.stderr, "BARTERCAST: No bartercastdb instance"
        return

    permid = peerdb.getPermIDByIP(ip)
    my_permid = bartercastdb.my_permid

    if DEBUG:
        print >> sys.stderr, "bartercast: (Connecter): Up %d down %d peer %s:%s (PermID = %s)" % (up_kb, down_kb, ip, port, repr(permid))

    # For the record: KBs exchanged with peers we have no PermID for are
    # saved under the 'non-tribler' key.
    if permid is None:
        counterparty = 'non-tribler'
    else:
        counterparty = permid

    # Save exchanged KBs in BarterCastDB, download counter first.
    changed = False
    for item_name, kbytes in (('downloaded', down_kb), ('uploaded', up_kb)):
        if kbytes > 0:
            bartercastdb.incrementItem((my_permid, counterparty), item_name, kbytes, commit=False)
            changed = True

    if changed:
        bartercastdb.commit()
1696             
1697
1698