1 # Written by Bram Cohen, Pawel Garbacki, Arno Bakker and Njaal Borch, George Milescu
2 # see LICENSE.txt for license information
6 from types import DictType,IntType,LongType,ListType,StringType
7 from random import shuffle
8 from traceback import print_exc,print_stack
13 from threading import Event # Wait for CS to complete
15 from BaseLib.Core.BitTornado.bitfield import Bitfield
16 from BaseLib.Core.BitTornado.clock import clock
17 from BaseLib.Core.BitTornado.bencode import bencode,bdecode
18 from BaseLib.Core.BitTornado.__init__ import version_short,decodePeerID,TRIBLER_PEERID_LETTER
19 from BaseLib.Core.BitTornado.BT1.convert import tobinary,toint
21 from BaseLib.Core.BitTornado.BT1.MessageID import *
22 from BaseLib.Core.DecentralizedTracking.MagnetLink.__init__ import *
24 from BaseLib.Core.DecentralizedTracking.ut_pex import *
25 from BaseLib.Core.BitTornado.BT1.track import compact_ip,decompact_ip
27 from BaseLib.Core.BitTornado.CurrentRateMeasure import Measure
28 from BaseLib.Core.ClosedSwarm import ClosedSwarm
29 from BaseLib.Core.Statistics.Status import Status
31 KICK_OLD_CLIENTS=False
34 DEBUG_NORMAL_MSGS = False
36 DEBUG_MESSAGE_HANDLING = False
37 DEBUG_CS = False # Debug closed swarms
39 UNAUTH_PERMID_PERIOD = 3600
41 # allow FACTOR times the metadata to be uploaded each PERIOD.
43 # FACTOR = 2 and PERIOD = 60 will allow all the metadata to be
44 # uploaded 2 times every 60 seconds.
45 UT_METADATA_FLOOD_FACTOR = 1
46 UT_METADATA_FLOOD_PERIOD = 5 * 60 * 60
50 uTorrent and Bram's BitTorrent now support an extension to the protocol,
51 documented on http://www.bittorrent.org/beps/bep_0010.html (previously
52 http://www.rasterbar.com/products/libtorrent/extension_protocol.html)
54 The problem is that the bit they use in the options field of the BT handshake
55 is the same as we use to indicate a peer supports the overlay-swarm connection.
56 The new clients will send an EXTEND message with ID 20 after the handshake to
57 inform the other side what new messages they support.
59 As a result, Tribler <= 3.5.0 clients won't be confused, but can't talk to these
60 new clients either or vice versa. The new client will think we understand the
61 message, send it. But because we don't know that message ID, we will close
62 the connection. Our attempts to establish a new overlay connection with the new
63 client will gracefully fail, as the new client will not know of infohash=00000...
64 and close the connection.
66 We solve this conflict by adding support for the EXTEND message. We are now
67 able to receive it, and send our own. Our message will contain one method name,
68 i.e. Tr_OVERLAYSWARM=253. Processing is now as follows:
70 * If bit 43 is set and the peerID is from an old Tribler (<=3.5.0)
71 peer, we initiate an overlay-swarm connection.
72 * If bit 43 is set and the peer's EXTEND hs message contains method Tr_OVERLAYSWARM,
73 it's a new Tribler peer, and we initiate an overlay-swarm connection.
74 * If bit 43 is set, and the EXTEND hs message does not contain Tr_OVERLAYSWARM
75 it's not a Tribler client and we do not initiate an overlay-swarm connection.
78 N.B. The EXTEND message is poorly designed, it lacks protocol versioning
79 support which is present in the Azureus Extended Messaging Protocol
80 and our overlay-swarm protocol.
83 EXTEND_MSG_HANDSHAKE_ID = chr(0)
84 EXTEND_MSG_OVERLAYSWARM = 'Tr_OVERLAYSWARM'
85 EXTEND_MSG_G2G_V1 = 'Tr_G2G'
86 EXTEND_MSG_G2G_V2 = 'Tr_G2G_v2'
87 EXTEND_MSG_HASHPIECE = 'Tr_hashpiece'
88 EXTEND_MSG_CS = 'NS_CS'
90 CURRENT_LIVE_VERSION=1
91 EXTEND_MSG_LIVE_PREFIX = 'Tr_LIVE_v'
92 LIVE_FAKE_MESSAGE_ID = chr(254)
96 G2G_CALLBACK_INTERVAL = 4
100 for i in xrange(len(s)):
101 text.append(ord(s[i]))
106 def __init__(self, connection, connecter):
107 self.connection = connection
108 self.connecter = connecter
109 self.got_anything = False
110 self.next_upload = None
112 self.partial_message = None
115 self.send_choke_queued = False
116 self.just_unchoked = None
117 self.unauth_permid = None
118 self.looked_for_permid = UNAUTH_PERMID_PERIOD-3
120 self.extend_hs_dict = {} # what extended messages does this peer support
121 self.initiated_overlay = False
124 self.use_g2g = False # set to true if both sides use G2G, indicated by self.connector.use_g2g
125 self.g2g_version = None
127 # batch G2G_XFER information and periodically send it out.
128 self.last_perc_sent = {}
130 config = self.connecter.config
131 self.forward_speeds = [Measure(config['max_rate_period'], config['upload_rate_fudge']),
132 Measure(config['max_rate_period'], config['upload_rate_fudge'])]
134 # BarterCast counters
135 self.total_downloaded = 0
136 self.total_uploaded = 0
138 self.ut_pex_first_flag = True # first time we sent a ut_pex to this peer?
139 self.na_candidate_ext_ip = None
141 self.na_candidate_ext_ip = None
143 # RePEX counters and repexer instance field
144 self.pex_received = 0 # number of PEX messages received
148 self.is_closed_swarm = False
149 self.cs_complete = False # Arno, 2010-08-24; no need for thread safety
150 self.remote_is_authenticated = False
151 self.remote_supports_cs = False
152 status = Status.get_status_holder("LivingLab")
153 self.cs_status = status.create_event("CS_protocol")
154 # This is a total for all
155 self.cs_status_unauth_requests = status.get_or_create_status_element("unauthorized_requests", 0)
156 self.cs_status_supported = status.get_or_create_status_element("nodes_supporting_cs", 0)
157 self.cs_status_not_supported = status.get_or_create_status_element("nodes_not_supporting_cs", 0)
159 if not self.connecter.is_closed_swarm:
160 self.cs_complete = True # Don't block anything if we're not a CS
162 if self.connecter.is_closed_swarm:
164 print >>sys.stderr,"connecter: conn: CS: This is a closed swarm"
165 self.is_closed_swarm = True
166 if 'poa' in self.connecter.config:
168 from base64 import decodestring
169 #poa = self.connecter.config.get_poa()
170 poa = ClosedSwarm.POA.deserialize(decodestring(self.connecter.config['poa']))
171 #poa = self.connecter.config['poa']
176 print >>sys.stderr,"connecter: conn: CS: Missing POA"
179 # Need to also get the rest of the info, like my keys
181 my_keypair = ClosedSwarm.read_cs_keypair(self.connecter.config['eckeypairfilename'])
182 self.closed_swarm_protocol = ClosedSwarm.ClosedSwarm(my_keypair,
183 self.connecter.infohash,
184 self.connecter.config['cs_keys'],
187 print >>sys.stderr,"connecter: conn: CS: Closed swarm ready to start handshake"
def get_myip(self, real=False):
    """ Return our own IP address as reported by the wrapped connection.

    The `real` flag is passed through untouched to connection.get_myip().
    """
    conn = self.connection
    return conn.get_myip(real)
def get_myport(self, real=False):
    """ Return our own port as reported by the wrapped connection.

    The `real` flag is passed through untouched to connection.get_myport().
    """
    conn = self.connection
    return conn.get_myport(real)
def get_ip(self, real=False):
    """ Return the peer's IP address as reported by the wrapped connection.

    The `real` flag is passed through untouched to connection.get_ip().
    """
    conn = self.connection
    return conn.get_ip(real)
def get_port(self, real=False):
    """ Return the peer's port as reported by the wrapped connection.

    The `real` flag is passed through untouched to connection.get_port().
    """
    conn = self.connection
    return conn.get_port(real)
203 return self.connection.get_id()
def get_readable_id(self):
    """ Return whatever connection.get_readable_id() yields for this peer. """
    conn = self.connection
    return conn.get_readable_id()
208 def can_send_to(self):
209 if self.is_closed_swarm and not self.remote_is_authenticated:
215 if self.get_ip() == self.connecter.tracker_ip:
216 print >>sys.stderr,"connecter: close: live: WAAH closing SOURCE"
218 self.connection.close()
def is_locally_initiated(self):
    """ Return the wrapped connection's is_locally_initiated() answer. """
    conn = self.connection
    return conn.is_locally_initiated()
def send_interested(self):
    """ Send an INTERESTED message to the peer via _send_message(). """
    message = INTERESTED
    self._send_message(message)
def send_not_interested(self):
    """ Send a NOT_INTERESTED message to the peer via _send_message(). """
    message = NOT_INTERESTED
    self._send_message(message)
def send_choke(self):
    """ Choke the peer, deferring the CHOKE while a piece is half-sent.

    If a partial message is pending, only the send_choke_queued flag is
    raised. Otherwise CHOKE is sent right away, the upload object is told
    via choke_sent(), and just_unchoked is reset to 0.
    """
    if self.partial_message:
        # A PIECE is partially on the wire; remember the choke for later.
        self.send_choke_queued = True
        return
    self._send_message(CHOKE)
    self.upload.choke_sent()
    self.just_unchoked = 0
242 def send_unchoke(self):
243 if not self.cs_complete:
245 print >> sys.stderr, 'Connection: send_unchoke: Not sending UNCHOKE, closed swarm handshanke not done'
248 if self.send_choke_queued:
249 self.send_choke_queued = False
250 if DEBUG_NORMAL_MSGS:
251 print >>sys.stderr,'Connection: send_unchoke: CHOKE SUPPRESSED'
253 self._send_message(UNCHOKE)
254 if (self.partial_message or self.just_unchoked is None
255 or not self.upload.interested or self.download.active_requests):
256 self.just_unchoked = 0
258 self.just_unchoked = clock()
def send_request(self, index, begin, length):
    """ Send a REQUEST for `length` bytes at offset `begin` of piece `index`.

    The three integers are encoded with tobinary() and appended to the
    REQUEST message id. Debug output is printed when DEBUG_NORMAL_MSGS is set.
    """
    payload = REQUEST + tobinary(index) + tobinary(begin) + tobinary(length)
    self._send_message(payload)
    if not DEBUG_NORMAL_MSGS:
        return
    print >>sys.stderr,"sending REQUEST to",self.get_ip()
    print >>sys.stderr,'sent request: '+str(index)+': '+str(begin)+'-'+str(begin+length)
def send_cancel(self, index, begin, length):
    """ Send a CANCEL for `length` bytes at offset `begin` of piece `index`.

    The three integers are encoded with tobinary() and appended to the
    CANCEL message id. Debug output is printed when DEBUG_NORMAL_MSGS is set.
    """
    payload = CANCEL + tobinary(index) + tobinary(begin) + tobinary(length)
    self._send_message(payload)
    if not DEBUG_NORMAL_MSGS:
        return
    print >>sys.stderr,'sent cancel: '+str(index)+': '+str(begin)+'-'+str(begin+length)
274 def send_bitfield(self, bitfield):
275 if not self.cs_complete:
276 print >> sys.stderr, "Connection: send_bitfield: Not sending bitfield - CS handshake not done"
279 if self.can_send_to():
280 self._send_message(BITFIELD + bitfield)
282 self.cs_status_unauth_requests.inc()
283 print >>sys.stderr,"Connection: send_bitfield: Sending empty bitfield to unauth node"
284 self._send_message(BITFIELD + Bitfield(self.connecter.numpieces).tostring())
def send_have(self, index):
    """ Announce piece `index` with a HAVE message, if we may send to this peer.

    Nothing is sent (and nothing is logged) when can_send_to() is false.
    """
    if not self.can_send_to():
        # HAVE messages are silently suppressed for this peer.
        return
    self._send_message(HAVE + tobinary(index))
def send_keepalive(self):
    """ Send a keep-alive: a message with an empty payload. """
    empty_payload = ''
    self._send_message(empty_payload)
def _send_message(self, s):
    """ Length-prefix `s` and hand it to the connection, or queue it.

    The prefix is tobinary(len(s)). While a partial message is pending the
    framed message is appended to self.outqueue; otherwise it goes straight
    to connection.send_message_raw().
    """
    framed = tobinary(len(s)) + s
    if not self.partial_message:
        self.connection.send_message_raw(framed)
        return
    # Must not interleave with the half-sent message; park it in the queue.
    self.outqueue.append(framed)
303 def send_partial(self, bytes):
304 if self.connection.closed:
306 if not self.can_send_to():
308 if self.partial_message is None:
309 s = self.upload.get_upload_chunk()
312 # Merkle: send hashlist along with piece in HASHPIECE message
313 index, begin, hashlist, piece = s
316 # ----- G2G: record who we send this to
317 self.g2g_sent_piece_part( self, index, begin, hashlist, piece )
319 # ---- G2G: we are uploading len(piece) data of piece #index
320 for c in self.connecter.connections.itervalues():
324 # include sending to self, because it should not be excluded from the statistics
326 c.queue_g2g_piece_xfer( index, begin, piece )
328 if self.connecter.merkle_torrent:
329 hashpiece_msg_id = self.his_extend_msg_name_to_id(EXTEND_MSG_HASHPIECE)
330 bhashlist = bencode(hashlist)
331 if hashpiece_msg_id is None:
332 # old Tribler <= 4.5.2 style
333 self.partial_message = ''.join((
334 tobinary(1+4+4+4+len(bhashlist)+len(piece)), HASHPIECE,
335 tobinary(index), tobinary(begin), tobinary(len(bhashlist)), bhashlist, piece.tostring() ))
338 self.partial_message = ''.join((
339 tobinary(2+4+4+4+len(bhashlist)+len(piece)), EXTEND, hashpiece_msg_id,
340 tobinary(index), tobinary(begin), tobinary(len(bhashlist)), bhashlist, piece.tostring() ))
343 self.partial_message = ''.join((
344 tobinary(len(piece) + 9), PIECE,
345 tobinary(index), tobinary(begin), piece.tostring()))
346 if DEBUG_NORMAL_MSGS:
347 print >>sys.stderr,'sending chunk: '+str(index)+': '+str(begin)+'-'+str(begin+len(piece))
349 if bytes < len(self.partial_message):
350 self.connection.send_message_raw(self.partial_message[:bytes])
351 self.partial_message = self.partial_message[bytes:]
354 q = [self.partial_message]
355 self.partial_message = None
356 if self.send_choke_queued:
357 self.send_choke_queued = False
358 self.outqueue.append(tobinary(1)+CHOKE)
359 self.upload.choke_sent()
360 self.just_unchoked = 0
361 q.extend(self.outqueue)
364 self.connection.send_message_raw(q)
367 def get_upload(self):
370 def get_download(self):
def set_download(self, download):
    """ Store `download` as this connection's download attribute. """
    setattr(self, 'download', download)
def backlogged(self):
    """ Return True when the wrapped connection reports it is not flushed. """
    flushed = self.connection.is_flushed()
    return not flushed
def got_request(self, i, p, l):
    """ Forward an incoming request to the upload object.

    If just_unchoked holds a (truthy) timestamp, the time elapsed since then
    is reported to the ratelimiter via ping() and just_unchoked is reset to 0.
    """
    self.upload.got_request(i, p, l)
    if not self.just_unchoked:
        return
    elapsed = clock() - self.just_unchoked
    self.connecter.ratelimiter.ping(elapsed)
    self.just_unchoked = 0
386 # Extension protocol support
388 def supports_extend_msg(self,msg_name):
389 if 'm' in self.extend_hs_dict:
390 return msg_name in self.extend_hs_dict['m']
394 def got_extend_handshake(self,d):
396 print >>sys.stderr,"connecter: Got EXTEND handshake:",d
398 if type(d['m']) != DictType:
399 raise ValueError('Key m does not map to a dict')
402 for key,val in m.iteritems():
403 if type(val) != IntType:
404 # Fix for BitTorrent 4.27.2e
405 if type(val) == StringType:
409 raise ValueError('Message ID in m-dict not int')
412 if not 'm' in self.extend_hs_dict:
413 self.extend_hs_dict['m'] = {}
414 # Note: we store the dict without converting the msg IDs to bytes.
415 self.extend_hs_dict['m'].update(newm)
416 if self.connecter.overlay_enabled and EXTEND_MSG_OVERLAYSWARM in self.extend_hs_dict['m']:
417 # This peer understands our overlay swarm extension
418 if self.connection.locally_initiated:
420 print >>sys.stderr,"connecter: Peer supports Tr_OVERLAYSWARM, attempt connection"
421 self.connect_overlay()
423 if EXTEND_MSG_CS in self.extend_hs_dict['m']:
424 self.remote_supports_cs = True
425 self.cs_status_supported.inc()
427 print >>sys.stderr,"connecter: Peer supports Closed swarms"
429 if self.is_closed_swarm and self.connection.locally_initiated:
431 print >>sys.stderr,"connecter: Initiating Closed swarm handshake"
432 self.start_cs_handshake()
434 self.remote_supports_cs = False
435 self.cs_status_not_supported.inc()
437 print >>sys.stderr,"connecter: conn: Remote node does not support CS, flagging CS as done"
438 self.connecter.cs_handshake_completed()
439 status = Status.get_status_holder("LivingLab")
440 status.add_event(self.cs_status)
441 self.cs_status = status.create_event("CS_protocol")
444 if self.connecter.use_g2g and (EXTEND_MSG_G2G_V1 in self.extend_hs_dict['m'] or EXTEND_MSG_G2G_V2 in self.extend_hs_dict['m']):
445 # Both us and the peer want to use G2G
446 if self.connection.locally_initiated:
448 print >>sys.stderr,"connecter: Peer supports Tr_G2G"
451 if EXTEND_MSG_G2G_V2 in self.extend_hs_dict['m']:
452 self.g2g_version = EXTEND_MSG_G2G_V2
454 self.g2g_version = EXTEND_MSG_G2G_V1
458 peerhaslivekey = False
459 for key in self.extend_hs_dict['m']:
460 if key.startswith(EXTEND_MSG_LIVE_PREFIX):
461 peerhaslivekey = True
462 livever = int(key[len(EXTEND_MSG_LIVE_PREFIX):])
463 if livever < CURRENT_LIVE_VERSION:
464 raise ValueError("Too old LIVE VERSION "+livever)
466 print >>sys.stderr,"Connecter: live: Keeping connection to up-to-date peer v",livever,self.get_ip()
468 if not peerhaslivekey:
469 if self.get_ip() == self.connecter.tracker_ip:
470 # Keep connection to tracker / source
471 print >>sys.stderr,"Connecter: live: Keeping connection to SOURCE",self.connecter.tracker_ip
473 raise ValueError("Kicking old LIVE peer "+self.get_ip())
475 # 'p' is peer's listen port, 'v' is peer's version, all optional
476 # 'e' is used by uTorrent to show it prefers encryption (whatever that means)
477 # See http://www.bittorrent.org/beps/bep_0010.html
478 for key in ['p','e', 'yourip','ipv4','ipv6','reqq']:
480 self.extend_hs_dict[key] = d[key]
482 #print >>sys.stderr,"connecter: got_extend_hs: keys",d.keys()
484 # If he tells us our IP, record this and see if we get a majority vote on it
487 yourip = decompact_ip(d['yourip'])
490 from BaseLib.Core.NATFirewall.DialbackMsgHandler import DialbackMsgHandler
491 dmh = DialbackMsgHandler.getInstance()
492 dmh.network_btengine_extend_yourip(yourip)
498 if 'same_nat_try_internal' in self.connecter.config and self.connecter.config['same_nat_try_internal']:
500 self.na_check_for_same_nat(yourip)
504 # RePEX: Tell repexer we have received an extended handshake
505 repexer = self.connecter.repexer
508 version = d.get('v',None)
509 repexer.got_extend_handshake(self, version)
513 def his_extend_msg_name_to_id(self,ext_name):
514 """ returns the message id (byte) for the given message name or None """
515 val = self.extend_hs_dict['m'].get(ext_name)
def get_extend_encryption(self):
    """ Return the 'e' value stored from the peer's EXTEND handshake, or 0
    when the peer did not supply one. """
    handshake = self.extend_hs_dict
    return handshake.get('e', 0)
def get_extend_listenport(self):
    """ Return the 'p' value stored from the peer's EXTEND handshake, or
    None when the peer did not supply one. """
    handshake = self.extend_hs_dict
    return handshake.get('p')
def is_tribler_peer(self):
    """ Return True when the client letter decoded from the peer ID equals
    TRIBLER_PEERID_LETTER. The decoded version is not looked at. """
    decoded = decodePeerID(self.connection.id)
    client, _version = decoded
    return client == TRIBLER_PEERID_LETTER
531 def send_extend_handshake(self):
534 hisip = self.connection.get_ip(real=True)
536 if self.connecter.config.get('same_nat_try_internal',0):
537 is_tribler_peer = self.is_tribler_peer()
538 print >>sys.stderr,"connecter: send_extend_hs: Peer is Tribler client",is_tribler_peer
540 # If we're connecting to a Tribler peer, show our internal IP address
542 ipv4 = self.get_ip(real=True)
544 # See: http://www.bittorrent.org/beps/bep_0010.html
546 d['m'] = self.connecter.EXTEND_HANDSHAKE_M_DICT
547 d['p'] = self.connecter.mylistenport
548 ver = version_short.replace('-',' ',1)
550 d['e'] = 0 # Apparently this means we don't like uTorrent encryption
551 d['yourip'] = compact_ip(hisip)
553 # Only send IPv4 when necessary, we prefer this peer to use this addr.
554 d['ipv4'] = compact_ip(ipv4)
555 if self.connecter.ut_metadata_enabled:
556 # todo: set correct size if known
557 d['metadata_size'] = self.connecter.ut_metadata_size
559 self._send_message(EXTEND + EXTEND_MSG_HANDSHAKE_ID + bencode(d))
561 print >>sys.stderr,'connecter: sent extend: id=0+',d,"yourip",hisip,"ipv4",ipv4
566 def got_ut_pex(self,d):
568 print >>sys.stderr,"connecter: Got uTorrent PEX:",d
569 (same_added_peers,added_peers,dropped_peers) = check_ut_pex(d)
571 # RePEX: increase counter
572 self.pex_received += 1
574 # RePEX: for now, we pass the whole PEX dict to the repexer and
575 # let it decode it. The reason is that check_ut_pex's interface
576 # has recently changed, currently returning a triple to prefer
577 # Tribler peers. The repexer, however, might have different
578 # interests (e.g., storing all flags). To cater to both interests,
579 # check_ut_pex needs to be rewritten.
580 repexer = self.connecter.repexer
583 repexer.got_ut_pex(self, d)
588 # DoS protection: we're accepting IP addresses from
589 # an untrusted source, so be a bit careful
590 mx = self.connecter.ut_pex_max_addrs_from_peer
592 print >>sys.stderr,"connecter: Got",len(added_peers),"peers via uTorrent PEX, using max",mx
594 # for now we have a strong bias towards Tribler peers
595 if self.is_tribler_peer():
596 shuffle(same_added_peers)
598 sample_peers = same_added_peers
599 sample_peers.extend(added_peers)
601 sample_peers = same_added_peers
602 sample_peers.extend(added_peers)
603 shuffle(sample_peers)
605 # Take random sample of mx peers
606 sample_added_peers_with_id = []
608 # Put the sample in the format desired by Encoder.start_connections()
609 for dns in sample_peers[:mx]:
610 peer_with_id = (dns, 0)
611 sample_added_peers_with_id.append(peer_with_id)
612 if len(sample_added_peers_with_id) > 0:
614 print >>sys.stderr,"connecter: Starting ut_pex conns to",len(sample_added_peers_with_id)
615 self.connection.Encoder.start_connections(sample_added_peers_with_id)
def send_extend_ut_pex(self, payload):
    """ Send `payload` as an EXTEND message, using the message id the peer
    assigned to EXTEND_MSG_UTORRENT_PEX. """
    pex_msg_id = self.his_extend_msg_name_to_id(EXTEND_MSG_UTORRENT_PEX)
    self._send_message(EXTEND + pex_msg_id + payload)
621 def first_ut_pex(self):
622 if self.ut_pex_first_flag:
623 self.ut_pex_first_flag = False
def _send_cs_message(self, cs_list):
    """ Bencode `cs_list` and send it as an EXTEND message, using the
    message id the peer assigned to EXTEND_MSG_CS. """
    encoded = bencode(cs_list)
    cs_msg_id = self.his_extend_msg_name_to_id(EXTEND_MSG_CS)
    self._send_message(EXTEND + cs_msg_id + encoded)
632 def got_cs_message(self, cs_list):
633 if not self.is_closed_swarm:
634 raise Exception("Got ClosedSwarm message, but this swarm is not closed")
636 # Process incoming closed swarm messages
638 if t == CS_CHALLENGE_A:
640 print >>sys.stderr,"connecter: conn: CS: Got initial challenge"
641 # Got a challenge to authenticate to participate in a closed swarm
643 response = self.closed_swarm_protocol.b_create_challenge(cs_list)
644 self._send_cs_message(response)
646 self.cs_status.add_value("CS_bad_initial_challenge")
648 print >>sys.stderr,"connecter: conn: CS: Bad initial challenge:",e
649 elif t == CS_CHALLENGE_B:
651 print >>sys.stderr,"connecter: conn: CS: Got return challenge"
653 response = self.closed_swarm_protocol.a_provide_poa_message(cs_list)
654 if DEBUG_CS and not response:
655 print >> sys.stderr, "connecter: I'm not intererested in data"
656 self._send_cs_message(response)
658 self.cs_status.add_value("CS_bad_return_challenge")
660 print >>sys.stderr,"connecter: conn: CS: Bad return challenge",e
663 elif t == CS_POA_EXCHANGE_A:
665 print >>sys.stderr,"connecter: conn: CS:Got POA from A"
667 response = self.closed_swarm_protocol.b_provide_poa_message(cs_list)
668 self.remote_is_authenticated = self.closed_swarm_protocol.is_remote_node_authorized()
670 print >>sys.stderr,"connecter: conn: CS: Remote node authorized:",self.remote_is_authenticated
672 self._send_cs_message(response)
674 self.cs_status.add_value("CS_bad_POA_EXCHANGE_A")
676 print >>sys.stderr,"connecter: conn: CS: Bad POA from A:",e
678 elif t == CS_POA_EXCHANGE_B:
680 self.closed_swarm_protocol.a_check_poa_message(cs_list)
681 self.remote_is_authenticated = self.closed_swarm_protocol.is_remote_node_authorized()
683 print >>sys.stderr,"connecter: conn: CS: Remote node authorized:",self.remote_is_authenticated
685 self.cs_status.add_value("CS_bad_POA_EXCHANGE_B")
687 print >>sys.stderr,"connecter: conn: CS: Bad POA from B:",e
689 if not self.closed_swarm_protocol.is_incomplete():
690 self.connecter.cs_handshake_completed()
691 status = Status.get_status_holder("LivingLab")
692 self.cs_complete = True # Flag CS as completed
693 # Don't need to add successful CS event
def g2g_sent_piece_part(self, c, index, begin, hashlist, piece):
    """ Record that we sent a part of piece `index`.

    The part's size, as a fraction of connecter.piece_size, is added to the
    running total kept per piece index in self.perc_sent. The `c`, `begin`
    and `hashlist` arguments are not used here.
    """
    fraction = float(len(piece)) / float(self.connecter.piece_size)
    sent = self.perc_sent
    if index not in sent:
        sent[index] = fraction
    else:
        sent[index] = sent[index] + fraction
708 def queue_g2g_piece_xfer(self,index,begin,piece):
709 """ Queue the fact that we sent piece index[begin:begin+chunk] for
712 if self.g2g_version == EXTEND_MSG_G2G_V1:
713 self.send_g2g_piece_xfer_v1(index,begin,piece)
716 perc = float(len(piece))/float(self.connecter.piece_size)
717 if index in self.last_perc_sent:
718 self.last_perc_sent[index] = self.last_perc_sent[index] + perc
720 self.last_perc_sent[index] = perc
722 def dequeue_g2g_piece_xfer(self):
723 """ Send queued information about pieces we sent to peers. Called
726 psf = float(self.connecter.piece_size)
729 #print >>sys.stderr,"connecter: g2g dq: orig",self.last_perc_sent
731 for index,perc in self.last_perc_sent.iteritems():
732 # due to rerequests due to slow pieces the sum can be above 1.0
733 capperc = min(1.0,perc)
734 percb = chr(int((100.0 * capperc)))
735 # bencode can't deal with int keys
736 ppdict[str(index)] = percb
737 self.last_perc_sent = {}
739 #print >>sys.stderr,"connecter: g2g dq: dest",ppdict
742 self.send_g2g_piece_xfer_v2(ppdict)
def send_g2g_piece_xfer_v1(self, index, begin, piece):
    """ Tell this peer (G2G V1) that we sent piece index[begin:begin+chunk]
    to some peer.

    The message is the peer's id for EXTEND_MSG_G2G_V1 followed by the
    tobinary() encodings of index, begin and len(piece).
    """
    # NOTE(review): unlike the V2 sender, no EXTEND prefix is added here --
    # confirm this is the intended V1 wire format.
    g2g_msg_id = self.his_extend_msg_name_to_id(EXTEND_MSG_G2G_V1)
    self._send_message(g2g_msg_id + tobinary(index) + tobinary(begin) + tobinary(len(piece)))
def send_g2g_piece_xfer_v2(self, ppdict):
    """ Send the batched piece-transfer facts in `ppdict` to this peer
    (G2G V2), bencoded inside an EXTEND message. """
    encoded = bencode(ppdict)
    g2g_msg_id = self.his_extend_msg_name_to_id(EXTEND_MSG_G2G_V2)
    self._send_message(EXTEND + g2g_msg_id + encoded)
def got_g2g_piece_xfer_v1(self, index, begin, length):
    """ Handle a G2G_PIECE_XFER message in V1 format.

    `length` is converted to a fraction of connecter.piece_size and passed,
    with `index`, to g2g_peer_forwarded_piece_part(). `begin` is not used.
    """
    piece_size = float(self.connecter.piece_size)
    fraction = float(length) / piece_size
    self.g2g_peer_forwarded_piece_part(index, fraction)
def got_g2g_piece_xfer_v2(self, ppdict):
    """ Got a G2G_PIECE_XFER message in V2 format.

    `ppdict` maps a piece index (as a decimal string) to a single character
    whose ordinal is the percentage of that piece the peer says it
    forwarded. Each entry is passed to g2g_peer_forwarded_piece_part() as a
    fraction.

    The value comes from the remote peer and is therefore untrusted: our own
    sender never emits more than 100, but a byte can carry up to 255. The
    fraction is clamped to 1.0 so a peer cannot inflate its forwarding credit.

    Raises ValueError / TypeError if a key is not an integer string or a
    value is not a single character, as before.
    """
    for indexstr, hegavepercb in ppdict.items():
        index = int(indexstr)
        # Cap at 100%: anything higher can only come from a misbehaving peer.
        hegaveperc = min(1.0, float(ord(hegavepercb)) / 100.0)
        self.g2g_peer_forwarded_piece_part(index, hegaveperc)
767 def g2g_peer_forwarded_piece_part(self,index,hegaveperc):
768 """ Processes this peer forwarding piece i[begin:end] to a grandchild. """
769 # Reward for forwarding data in general
770 length = ceil(hegaveperc * float(self.connecter.piece_size))
771 self.forward_speeds[1].update_rate(length)
773 if index not in self.perc_sent:
774 # piece came from disk
777 # Extra reward if its data we sent
778 wegaveperc = self.perc_sent[index]
779 overlapperc = wegaveperc * hegaveperc
780 overlap = ceil(overlapperc * float(self.connecter.piece_size))
782 self.forward_speeds[0].update_rate( overlap )
def g2g_score(self):
    """ Return a list with the current get_rate() of every Measure in
    self.forward_speeds, in order. """
    rates = []
    for meter in self.forward_speeds:
        rates.append(meter.get_rate())
    return rates
789 # SecureOverlay support
791 def connect_overlay(self):
793 print >>sys.stderr,"connecter: Initiating overlay connection"
794 if not self.initiated_overlay:
795 from BaseLib.Core.Overlay.SecureOverlay import SecureOverlay
797 self.initiated_overlay = True
798 so = SecureOverlay.getInstance()
799 so.connect_dns(self.connection.dns,self.network_connect_dns_callback)
801 def network_connect_dns_callback(self,exc,dns,permid,selversion):
802 # WARNING: WILL BE CALLED BY NetworkThread
804 print >>sys.stderr,"connecter: peer",dns,"said he supported overlay swarm, but we can't connect to him",exc
806 def start_cs_handshake(self):
809 print >>sys.stderr,"connecter: conn: CS: Initiating Closed Swarm Handshake"
810 challenge = self.closed_swarm_protocol.a_create_challenge()
811 self._send_cs_message(challenge)
813 print >>sys.stderr,"connecter: conn: CS: Bad initial challenge:",e
819 def na_check_for_same_nat(self,yourip):
820 """ See if peer is local, e.g. behind same NAT, same AS or something.
821 If so, try to optimize:
822 - Same NAT -> reconnect to use internal network
824 hisip = self.connection.get_ip(real=True)
826 # Do we share the same NAT?
827 myextip = self.connecter.get_extip_func(unknowniflocal=True)
828 myintip = self.get_ip(real=True)
831 print >>sys.stderr,"connecter: na_check_for_same_nat: his",hisip,"myext",myextip,"myint",myintip
833 if hisip != myintip or hisip == '127.0.0.1': # to allow testing
834 # He can't fake his source addr, so we're not running on the
837 # He may be quicker to determine we should have a local
838 # conn, so prepare for his connection in advance.
841 # I don't known my external IP and he's not on the same
842 # machine as me. yourip could be our real external IP, test.
844 print >>sys.stderr,"connecter: na_check_same_nat: Don't know my ext ip, try to loopback to",yourip,"to see if that's me"
845 self.na_start_loopback_connection(yourip)
846 elif hisip == myextip:
847 # Same NAT. He can't fake his source addr.
848 # Attempt local network connection
850 print >>sys.stderr,"connecter: na_check_same_nat: Yes, trying to connect via internal"
851 self.na_start_internal_connection()
854 # He claims we share the same IP, but I think my ext IP
855 # is something different. Either he is lying or I'm
858 print >>sys.stderr,"connecter: na_check_same_nat: Maybe, me thinks not, try to loopback to",yourip
859 self.na_start_loopback_connection(yourip)
862 def na_start_loopback_connection(self,yourip):
863 """ Peer claims my external IP is "yourip". Try to connect back to myself """
865 print >>sys.stderr,"connecter: na_start_loopback: Checking if my ext ip is",yourip
866 self.na_candidate_ext_ip = yourip
868 dns = (yourip,self.connecter.mylistenport)
869 self.connection.Encoder.start_connection(dns,0,forcenew=True)
871 def na_got_loopback(self,econnection):
872 """ Got a connection with my peer ID. Check that this is indeed me looping
873 back to myself. No man-in-the-middle attacks protection. This is complex
874 if we're also connecting to ourselves because of a stale tracker
875 registration. Window of opportunity is small.
877 himismeip = econnection.get_ip(real=True)
879 print >>sys.stderr,"connecter: conn: na_got_loopback:",himismeip,self.na_candidate_ext_ip
880 if self.na_candidate_ext_ip == himismeip:
881 self.na_start_internal_connection()
884 def na_start_internal_connection(self):
885 """ Reconnect to peer using internal network """
887 print >>sys.stderr,"connecter: na_start_internal_connection"
889 # Doesn't really matter who initiates. Letting other side do it makes
891 if not self.is_locally_initiated():
893 hisip = decompact_ip(self.extend_hs_dict['ipv4'])
894 hisport = self.extend_hs_dict['p']
896 # For testing, see Tribler/Test/test_na_extend_hs.py
897 if hisip == '224.4.8.1' and hisport == 4810:
901 self.connection.na_want_internal_conn_from = hisip
903 hisdns = (hisip,hisport)
905 print >>sys.stderr,"connecter: na_start_internal_connection to",hisdns
906 self.connection.Encoder.start_connection(hisdns,0)
def na_get_address_distance(self):
    """ Return whatever connection.na_get_address_distance() yields. """
    conn = self.connection
    return conn.na_get_address_distance()
911 def is_live_source(self):
912 if self.connecter.live_streaming:
913 if self.get_ip() == self.connecter.tracker_ip:
920 def __init__(self, metadata, make_upload, downloader, choker, numpieces, piece_size,
921 totalup, config, ratelimiter, merkle_torrent, sched = None,
922 coordinator = None, helper = None, get_extip_func = lambda: None, mylistenport = None, use_g2g = False, infohash=None, tracker=None, live_streaming = False):
924 self.downloader = downloader
925 self.make_upload = make_upload
927 self.numpieces = numpieces
928 self.piece_size = piece_size
930 self.ratelimiter = ratelimiter
931 self.rate_capped = False
933 self.totalup = totalup
934 self.rate_capped = False
935 self.connections = {}
936 self.external_connection_made = 0
937 self.merkle_torrent = merkle_torrent
938 self.use_g2g = use_g2g
940 self.coordinator = coordinator
943 self.get_extip_func = get_extip_func
944 self.mylistenport = mylistenport
945 self.infohash = infohash
946 self.live_streaming = live_streaming
947 self.tracker = tracker
948 self.tracker_ip = None
949 if self.live_streaming:
951 (scheme, netloc, path, pars, query, _fragment) = urlparse.urlparse(self.tracker)
952 host = netloc.split(':')[0]
953 self.tracker_ip = socket.getaddrinfo(host,None)[0][4][0]
956 self.tracker_ip = None
957 #print >>sys.stderr,"Connecter: live: source/tracker is",self.tracker_ip
958 self.overlay_enabled = 0
959 if self.config['overlay']:
960 self.overlay_enabled = True
963 if self.overlay_enabled:
964 print >>sys.stderr,"connecter: Enabling overlay"
966 print >>sys.stderr,"connecter: Disabling overlay"
968 self.ut_pex_enabled = 0
969 if 'ut_pex_max_addrs_from_peer' in self.config:
970 self.ut_pex_max_addrs_from_peer = self.config['ut_pex_max_addrs_from_peer']
971 self.ut_pex_enabled = self.ut_pex_max_addrs_from_peer > 0
972 self.ut_pex_previous_conns = [] # last value of 'added' field for all peers
974 self.ut_metadata_enabled = self.config["magnetlink"]
975 if self.ut_metadata_enabled:
976 # metadata (or self.responce as its called in download_bt1) is
977 # a dic containing the metadata. Ut_metadata shares the
978 # bencoded 'info' part of this metadata in 16kb pieces.
979 infodata = bencode(metadata["info"])
980 self.ut_metadata_size = len(infodata)
981 self.ut_metadata_list = [infodata[index:index+16*1024] for index in xrange(0, len(infodata), 16*1024)]
982 # history is a list containing previous request served (to
983 # limit our bandwidth usage)
984 self.ut_metadata_history = []
985 if DEBUG: print >> sys.stderr,"connecter.__init__: Enable ut_metadata"
988 if self.ut_pex_enabled:
989 print >>sys.stderr,"connecter: Enabling uTorrent PEX",self.ut_pex_max_addrs_from_peer
991 print >>sys.stderr,"connecter: Disabling uTorrent PEX"
993 # The set of messages we support. Note that the msg ID is an int not a byte in
995 self.EXTEND_HANDSHAKE_M_DICT = {}
997 # Say in the EXTEND handshake that we support Closed swarms
999 print >>sys.stderr,"connecter: I support Closed Swarms"
1000 d = {EXTEND_MSG_CS:ord(CS_CHALLENGE_A)}
1001 self.EXTEND_HANDSHAKE_M_DICT.update(d)
1003 if self.overlay_enabled:
1004 # Say in the EXTEND handshake we support the overlay-swarm ext.
1005 d = {EXTEND_MSG_OVERLAYSWARM:ord(CHALLENGE)}
1006 self.EXTEND_HANDSHAKE_M_DICT.update(d)
1007 if self.ut_pex_enabled:
1008 # Say in the EXTEND handshake we support uTorrent's peer exchange ext.
1009 d = {EXTEND_MSG_UTORRENT_PEX:ord(EXTEND_MSG_UTORRENT_PEX_ID)}
1010 self.EXTEND_HANDSHAKE_M_DICT.update(d)
1011 self.sched(self.ut_pex_callback,6)
1013 # Say in the EXTEND handshake we want to do G2G.
1014 d = {EXTEND_MSG_G2G_V2:ord(G2G_PIECE_XFER)}
1015 self.EXTEND_HANDSHAKE_M_DICT.update(d)
1016 self.sched(self.g2g_callback,G2G_CALLBACK_INTERVAL)
1017 if self.merkle_torrent:
1018 d = {EXTEND_MSG_HASHPIECE:ord(HASHPIECE)}
1019 self.EXTEND_HANDSHAKE_M_DICT.update(d)
1020 if self.ut_metadata_enabled:
1021 d = {EXTEND_MSG_METADATA:ord(EXTEND_MSG_METADATA_ID)}
1022 self.EXTEND_HANDSHAKE_M_DICT.update(d)
1026 livekey = EXTEND_MSG_LIVE_PREFIX+str(CURRENT_LIVE_VERSION)
1027 d = {livekey:ord(LIVE_FAKE_MESSAGE_ID)}
1028 self.EXTEND_HANDSHAKE_M_DICT.update(d)
1031 print >>sys.stderr,"Connecter: EXTEND: my dict",self.EXTEND_HANDSHAKE_M_DICT
1034 if config['overlay']:
1035 from BaseLib.Core.Overlay.OverlayThreadingBridge import OverlayThreadingBridge
1037 self.overlay_bridge = OverlayThreadingBridge.getInstance()
1039 self.overlay_bridge = None
1042 self.repexer = None # Should this be called observer instead?
1044 # Closed Swarm stuff
1045 self.is_closed_swarm = False
1046 self.cs_post_func = None
1047 if 'cs_keys' in self.config:
1048 if self.config['cs_keys'] != None:
1049 if len(self.config['cs_keys']) == 0:
1051 print >>sys.stderr, "connecter: cs_keys is empty"
1054 print >>sys.stderr, "connecter: This is a closed swarm - has cs_keys"
1055 self.is_closed_swarm = True
def how_many_connections(self):
    """ Return the number of entries currently held in self.connections. """
    tracked = self.connections
    return len(tracked)
# Handle a newly established peer connection: wrap `connection` in a
# Connection object (`c`), register it in self.connections, inform the
# repexer, possibly send the EXTEND handshake, create the upload/download
# handlers and hand `c` to the choker.
# NOTE(review): the embedded line numbers in this listing skip values (e.g.
# 1061 -> 1064, 1068 -> 1071), so guard/else/try/return lines may be absent
# from this view -- confirm the control flow against the complete file.
1061 def connection_made(self, connection):
1064 c = Connection(connection, self)
# Keyed by the underlying connection object; the other handlers look `c` up
# through this dict.
1065 self.connections[connection] = c
1067 # RePEX: Inform repexer connection is made
1068 repexer = self.repexer
1071 repexer.connection_made(c,connection.supports_extend_messages())
1073 # The repexer can close the connection in certain cases.
1074 # If so, we abort further execution of this function.
1079 if connection.supports_extend_messages():
1080 # The peer either supports our overlay-swarm extension or
1081 # the utorrent extended protocol.
1083 [client,version] = decodePeerID(connection.id)
1086 print >>sys.stderr,"connecter: Peer is client",client,"version",version,c.get_ip(),c.get_port()
# NOTE(review): `version` is compared against the string '3.5.0'; if it is a
# string this ordering is lexicographic rather than numeric -- confirm what
# decodePeerID returns.
1088 if self.overlay_enabled and client == TRIBLER_PEERID_LETTER and version <= '3.5.0' and connection.locally_initiated:
1089 # Old Tribler, establish overlay connection<
1091 print >>sys.stderr,"connecter: Peer is previous Tribler version, attempt overlay connection"
1093 elif self.ut_pex_enabled:
1094 # EXTEND handshake must be sent just after BT handshake,
1095 # before BITFIELD even
1096 c.send_extend_handshake()
1098 #TODO: overlay swarm also needs upload and download to control transferring rate
1099 # If this is a closed swarm, don't do this now - will be done on completion of the CS protocol!
1100 c.upload = self.make_upload(c, self.ratelimiter, self.totalup)
1101 c.download = self.downloader.make_download(c)
1102 if not self.is_closed_swarm:
1104 print >>sys.stderr,"connecter: connection_made: Freeing choker!"
1105 self.choker.connection_made(c)
1108 print >>sys.stderr,"connecter: connection_made: Will free choker later"
1109 self.choker.add_connection(c)
1110 #self.cs_post_func = lambda:self.choker.connection_made(c)
1111 #self.cs_post_func = lambda:self.choker.start_connection(c)
# NOTE(review): the callback is stored on self (shared by all connections),
# not on `c`, so a later call to connection_made replaces it -- confirm this
# is intended when several closed-swarm handshakes are in flight.
1112 self.cs_post_func = lambda:self._cs_completed(c)
# Tear down the bookkeeping for a closed connection: notify the repexer,
# queue a bartercast database update on the overlay bridge (when one is
# configured), remove the entry from self.connections and notify the
# download handler and the choker.
# NOTE(review): the embedded line numbers skip values in this listing, so
# guard/else lines may be absent from this view.
1116 def connection_lost(self, connection):
1117 c = self.connections[connection]
1119 # RePEX: inform repexer of closed connection
1120 repexer = self.repexer
1123 repexer.connection_closed(c)
1127 ######################################
1129 if self.overlay_bridge is not None:
1130 ip = c.get_ip(False)
1131 port = c.get_port(False)
# Totals are divided by 1024 and truncated to int before being handed over.
1132 down_kb = int(c.total_downloaded / 1024)
1133 up_kb = int(c.total_uploaded / 1024)
1136 print >> sys.stderr, "bartercast: attempting database update, adding olthread"
# The values are captured in a lambda and scheduled via add_task with delay
# argument 0; the database work itself lives in olthread_bartercast_conn_lost.
1138 olthread_bartercast_conn_lost_lambda = lambda:olthread_bartercast_conn_lost(ip,port,down_kb,up_kb)
1139 self.overlay_bridge.add_task(olthread_bartercast_conn_lost_lambda,0)
1142 print >> sys.stderr, "bartercast: no overlay bridge found"
1144 #########################
1147 if c.get_ip() == self.tracker_ip:
1148 print >>sys.stderr,"connecter: connection_lost: live: WAAH2 closing SOURCE"
1150 del self.connections[connection]
# NOTE(review): c.download is dereferenced without a None check in the
# visible lines, while got_message tests `c.download is not None` before
# got_have_bitfield -- confirm whether a guard exists in the full file.
1152 c.download.disconnected()
1153 self.choker.connection_lost(c)
def connection_flushed(self, connection):
    """ Queue the connection at the ratelimiter when no upload is already
    scheduled for it and it still has a partial message or buffered upload
    data. """
    c = self.connections[connection]
    if c.next_upload is not None:
        # An upload is already scheduled for this connection.
        return
    has_pending = c.partial_message is not None or c.upload.buffer
    if has_pending:
        self.ratelimiter.queue(c)
# Called with piece index `i`; walks every Connection in self.connections.
# NOTE(review): the loop body is not present in this listing (embedded
# numbering jumps from 1162 to 1165) -- see the complete file for what is
# done per connection.
1161 def got_piece(self, i):
1162 for co in self.connections.values():
# Reverse lookup in self.EXTEND_HANDSHAKE_M_DICT (name -> integer id):
# `ext_id` is converted with ord() and compared against each stored value.
# NOTE(review): the return statements are not present in this listing;
# got_extend_message treats a None result as "ID not defined in our
# handshake".
1165 def our_extend_msg_id_to_name(self,ext_id):
1166 """ find the name for the given message id (byte) """
1167 for key,val in self.EXTEND_HANDSHAKE_M_DICT.iteritems():
1168 if val == ord(ext_id):
# Scans self.connections for connections whose get_extend_listenport() is
# not None.
# NOTE(review): the accumulator and return lines are absent from this listing
# (embedded numbering skips 1173 and 1176-1178); ut_pex_callback uses the
# result as the current set of connections.
1172 def get_ut_pex_conns(self):
1174 for conn in self.connections.values():
1175 if conn.get_extend_listenport() is not None:
def get_ut_pex_previous_conns(self):
    """ Return the connection list remembered from the previous ut_pex round. """
    previous = self.ut_pex_previous_conns
    return previous
def set_ut_pex_previous_conns(self,conns):
    """ Remember `conns` as the connection list of the latest ut_pex round. """
    self.ut_pex_previous_conns = conns
# Periodic ut_pex task: computes which connections were added/dropped since
# the previous round, sends a ut_pex payload to peers that support
# EXTEND_MSG_UTORRENT_PEX, and reschedules itself.
# NOTE(review): the embedded line numbers skip values in this listing (e.g.
# the loop header that binds `c` and the lines that set `aconns`), so parts
# of the control flow are not visible here.
1185 def ut_pex_callback(self):
1186 """ Periocially send info about the peers you know to the other peers """
1188 print >>sys.stderr,"connecter: Periodic ut_pex update"
1190 currconns = self.get_ut_pex_conns()
# Diff against the list stored by the previous round, then store the current
# list for the next one.
1191 (addedconns,droppedconns) = ut_pex_get_conns_diff(currconns,self.get_ut_pex_previous_conns())
1192 self.set_ut_pex_previous_conns(currconns)
1194 for conn in addedconns:
1195 print >>sys.stderr,"connecter: ut_pex: Added",conn.get_ip(),conn.get_extend_listenport()
1196 for conn in droppedconns:
1197 print >>sys.stderr,"connecter: ut_pex: Dropped",conn.get_ip(),conn.get_extend_listenport()
1200 if c.supports_extend_msg(EXTEND_MSG_UTORRENT_PEX):
1203 print >>sys.stderr,"connecter: ut_pex: Creating msg for",c.get_ip(),c.get_extend_listenport()
# c.first_ut_pex() selects between two ways of choosing `aconns`; the
# assignments themselves are not in this listing.
1204 if c.first_ut_pex():
1209 dconns = droppedconns
1210 payload = create_ut_pex(aconns,dconns,c)
1211 c.send_extend_ut_pex(payload)
# Re-arm the timer: next round in 60 (units as interpreted by self.sched).
1214 self.sched(self.ut_pex_callback,60)
# Periodic G2G task: reschedules itself every G2G_CALLBACK_INTERVAL and calls
# dequeue_g2g_piece_xfer() on connections.
# NOTE(review): lines between the loop header and the dequeue call are absent
# from this listing (embedded numbering skips 1220-1222), so any per-connection
# condition is not visible here.
1216 def g2g_callback(self):
1218 self.sched(self.g2g_callback,G2G_CALLBACK_INTERVAL)
1219 for c in self.connections.itervalues():
1223 c.dequeue_g2g_piece_xfer()
# Handle a ut_metadata EXTEND message. Validates dic["msg_type"] and
# dic["piece"], answers requests (msg_type 0) with either the requested
# metadata piece or a reject, and raises ValueError for data (1), reject (2)
# and unknown message types.
# NOTE(review): the embedded line numbers skip values in this listing (the
# docstring delimiters, the assignment of `now`, and the `else:` lines are
# not shown) -- confirm the exact control flow against the complete file.
1227 def got_ut_metadata(self, connection, dic, message):
1229 CONNECTION: The connection instance where we received this message
1230 DIC: The bdecoded dictionary
1231 MESSAGE: The entire message: <EXTEND-ID><METADATA-ID><BENCODED-DIC><OPTIONAL-DATA>
1233 if DEBUG: print >> sys.stderr, "connecter.got_ut_metadata:", dic
# Both fields are validated before dispatching on msg_type, so a message of
# any type with a missing or out-of-range "piece" raises ValueError here.
1235 msg_type = dic.get("msg_type", None)
1236 if not type(msg_type) in (int, long):
1237 raise ValueError("Invalid ut_metadata.msg_type")
1238 piece = dic.get("piece", None)
1239 if not type(piece) in (int, long):
1240 raise ValueError("Invalid ut_metadata.piece type")
1241 if not 0 <= piece < len(self.ut_metadata_list):
1242 raise ValueError("Invalid ut_metadata.piece value")
1244 if msg_type == 0: # request
1245 if DEBUG: print >> sys.stderr, "connecter.got_ut_metadata: Received request for piece", piece
1247 # our flood protection policy is to upload all metadata
1248 # once every n minutes.
# `now` is assigned on a line that is not part of this listing.
1250 deadline = now - UT_METADATA_FLOOD_PERIOD
1251 # remove old history
1252 self.ut_metadata_history = [timestamp for timestamp in self.ut_metadata_history if timestamp > deadline]
# Refuse once the number of served requests inside the window exceeds
# UT_METADATA_FLOOD_FACTOR times the number of metadata pieces.
1254 if len(self.ut_metadata_history) > UT_METADATA_FLOOD_FACTOR * len(self.ut_metadata_list):
1255 # refuse to upload at this time
1256 reply = bencode({"msg_type":2, "piece":piece})
1258 reply = bencode({"msg_type":1, "piece":piece, "data":self.ut_metadata_list[piece]})
1259 self.ut_metadata_history.append(now)
# The reply is framed as EXTEND + the peer's id for the metadata extension +
# the bencoded dictionary.
1260 connection._send_message(EXTEND + connection.his_extend_msg_name_to_id(EXTEND_MSG_METADATA) + reply)
1262 elif msg_type == 1: # data
1263 # at this point in the code we must assume that the
1264 # metadata is already there, everything is designed in
1265 # such a way that metadata is required. data replies can
1266 # therefore never occur.
1267 raise ValueError("Invalid ut_metadata: we did not request data")
1269 elif msg_type == 2: # reject
1270 # at this point in the code we must assume that the
1271 # metadata is already there, everything is designed in
1272 # such a way that metadata is required. rejects can
1273 # therefore never occur.
1274 raise ValueError("Invalid ut_metadata: we did not request data that can be rejected")
1277 raise ValueError("Invalid ut_metadata.msg_type value")
# Parse and deliver a Merkle HASHPIECE message. As sliced below, the layout
# is: message[1:5] piece index, message[5:9] begin offset, message[9:13]
# length of the bencoded hash list, then the hash list, then the piece data.
# NOTE(review): the embedded line numbers skip values in this listing (the
# try/except lines, the connection-closing/return lines and the loop header
# binding `oh` are not shown) -- confirm control flow against the full file.
1279 def got_hashpiece(self, connection, message):
1280 """ Process Merkle hashpiece message. Note: EXTEND byte has been
1281 stripped, it starts with peer's Tr_hashpiece id for historic reasons ;-)
1284 c = self.connections[connection]
# 13 bytes cover the id byte plus the three 4-byte fields; anything that
# short carries no hash list or data.
1286 if len(message) <= 13:
1288 print >>sys.stderr,"Close on bad HASHPIECE: msg len"
1291 i = toint(message[1:5])
1292 if i >= self.numpieces:
1294 print >>sys.stderr,"Close on bad HASHPIECE: index out of range"
1297 begin = toint(message[5:9])
1298 len_hashlist = toint(message[9:13])
1299 bhashlist = message[13:13+len_hashlist]
1300 hashlist = bdecode(bhashlist)
1301 if not isinstance(hashlist, list):
1302 raise AssertionError, "hashlist not list"
# Each entry must be a 2-element list [int, 20-character str].
1304 if not isinstance(oh,list) or \
1305 not (len(oh) == 2) or \
1306 not isinstance(oh[0],int) or \
1307 not isinstance(oh[1],str) or \
1308 not ((len(oh[1])==20)): \
1309 raise AssertionError, "hashlist entry invalid"
1310 piece = message[13+len_hashlist:]
1312 if DEBUG_NORMAL_MSGS:
1313 print >>sys.stderr,"connecter: Got HASHPIECE",i,begin
# The download handler's return value decides what happens next; the line
# executed on a true result is not part of this listing.
1315 if c.download.got_piece(i, begin, hashlist, piece):
1319 print >>sys.stderr,"Close on bad HASHPIECE: exception",str(e)
# Offers `econnection` to each Connection in self.connections via its own
# na_got_loopback() and captures the result in `ret`.
# NOTE(review): what is done with `ret` and what this method returns is not
# visible in this listing (embedded numbering skips 1325 and 1329-1332).
1324 def na_got_loopback(self,econnection):
1326 print >>sys.stderr,"connecter: na_got_loopback: Got connection from",econnection.get_ip(),econnection.get_port()
1327 for c in self.connections.itervalues():
1328 ret = c.na_got_loopback(econnection)
# Notification about an internal connection `newconn` related to `origconn`.
# NOTE(review): only the docstring text and a log line are present in this
# listing (embedded numbering skips 1336-1337 and 1339-1342); the action
# taken on `origconn` is not visible here.
1333 def na_got_internal_connection(self,origconn,newconn):
1334 """ This is called only at the initiator side of the internal conn.
1335 Doesn't matter, only one is enough to close the original connection.
1338 print >>sys.stderr,"connecter: na_got_internal: From",newconn.get_ip(),newconn.get_port()
# Dispatch one received peer-wire message. Looks up the Connecter-level
# Connection `c` for `connection`, then branches on the message type `t`
# (EXTEND, CHOKE/UNCHOKE/INTERESTED/NOT_INTERESTED, HAVE, BITFIELD, REQUEST,
# CANCEL, PIECE, HASHPIECE, G2G_PIECE_XFER), validating length and piece
# index before handing the payload to c.upload / c.download.
# NOTE(review): the embedded line numbers skip values throughout this
# listing: the assignment of `t`, several `if`/`elif`/`else`/`try` headers
# and the connection-closing/return lines after each "Close on ..." print
# are not shown. The comments below describe only what the visible lines
# establish.
1343 def got_message(self, connection, message):
1344 # connection: Encrypter.Connection; c: Connecter.Connection
1345 c = self.connections[connection]
1347 # EXTEND handshake will be sent just after BT handshake,
1348 # before BITFIELD even
1350 if DEBUG_MESSAGE_HANDLING:
1353 if DEBUG_NORMAL_MSGS:
1354 print >>sys.stderr,"connecter: Got",getMessageName(t),connection.get_ip()
# EXTEND messages are delegated, passing the current ut_pex setting along.
1357 self.got_extend_message(connection,c,message,self.ut_pex_enabled)
1360 # If this is a closed swarm and we have not authenticated the
1361 # remote node, we must NOT GIVE IT ANYTHING!
1362 #if self.is_closed_swarm and c.closed_swarm_protocol.is_incomplete():
1363 #print >>sys.stderr, "connecter: Remote node not authorized, ignoring it"
# Resetting got_anything here means the "BITFIELD after other messages" check
# below does not fire for closed-swarm peers we may send to.
1366 if self.is_closed_swarm and c.can_send_to():
1367 c.got_anything = False # Is this correct or does it break something?
1369 if t == BITFIELD and c.got_anything:
1371 print >>sys.stderr,"Close on BITFIELD"
1374 c.got_anything = True
1375 if (t in [CHOKE, UNCHOKE, INTERESTED, NOT_INTERESTED] and
1378 print >>sys.stderr,"Close on bad (UN)CHOKE/(NOT_)INTERESTED",t
1382 if DEBUG_NORMAL_MSGS:
1383 print >>sys.stderr,"connecter: Got CHOKE from",connection.get_ip()
1384 c.download.got_choke()
1386 if DEBUG_NORMAL_MSGS:
1387 print >>sys.stderr,"connecter: Got UNCHOKE from",connection.get_ip()
1388 c.download.got_unchoke()
1389 elif t == INTERESTED:
1390 if DEBUG_NORMAL_MSGS:
1391 print >>sys.stderr,"connecter: Got INTERESTED from",connection.get_ip()
1392 if c.upload is not None:
1393 c.upload.got_interested()
# NOTE(review): unlike the INTERESTED branch above, no `c.upload is not None`
# guard is visible before this call -- confirm against the full file.
1394 elif t == NOT_INTERESTED:
1395 c.upload.got_not_interested()
# HAVE: exactly 5 bytes, i.e. the id byte plus a 4-byte piece index.
1397 if len(message) != 5:
1399 print >>sys.stderr,"Close on bad HAVE: msg len"
1402 i = toint(message[1:])
1403 if i >= self.numpieces:
1405 print >>sys.stderr,"Close on bad HAVE: index out of range"
1408 if DEBUG_NORMAL_MSGS:
1409 print >>sys.stderr,"connecter: Got HAVE(",i,") from",connection.get_ip()
1410 c.download.got_have(i)
1412 if DEBUG_NORMAL_MSGS:
1413 print >>sys.stderr,"connecter: Got BITFIELD from",connection.get_ip()
1415 b = Bitfield(self.numpieces, message[1:],calcactiveranges=self.live_streaming)
1418 print >>sys.stderr,"Close on bad BITFIELD"
1421 if c.download is not None:
1422 c.download.got_have_bitfield(b)
# REQUEST from a peer we may not send to: count it and drop the message.
1424 if not c.can_send_to():
1425 c.cs_status_unauth_requests.inc()
1426 print >> sys.stderr,"Got REQUEST but remote node is not authenticated"
1427 return # TODO: Do this better
# REQUEST and CANCEL: exactly 13 bytes, i.e. the id byte plus three 4-byte
# fields (index, begin, length as sliced below).
1429 if len(message) != 13:
1431 print >>sys.stderr,"Close on bad REQUEST: msg len"
1434 i = toint(message[1:5])
1435 if i >= self.numpieces:
1437 print >>sys.stderr,"Close on bad REQUEST: index out of range"
1440 if DEBUG_NORMAL_MSGS:
1441 print >>sys.stderr,"connecter: Got REQUEST(",i,") from",connection.get_ip()
1442 c.got_request(i, toint(message[5:9]), toint(message[9:]))
1444 if len(message) != 13:
1446 print >>sys.stderr,"Close on bad CANCEL: msg len"
1449 i = toint(message[1:5])
1450 if i >= self.numpieces:
1452 print >>sys.stderr,"Close on bad CANCEL: index out of range"
1455 c.upload.got_cancel(i, toint(message[5:9]),
# PIECE: must be longer than 9 bytes (id byte + index + begin) so that it
# carries data.
1458 if len(message) <= 9:
1460 print >>sys.stderr,"Close on bad PIECE: msg len"
1463 i = toint(message[1:5])
1464 if i >= self.numpieces:
# NOTE(review): this text says "msg len" although the preceding visible check
# is the piece-index range check; the other handlers print "index out of
# range" at this point.
1466 print >>sys.stderr,"Close on bad PIECE: msg len"
1469 if DEBUG_NORMAL_MSGS: # or connection.get_ip().startswith("192"):
1470 print >>sys.stderr,"connecter: Got PIECE(",i,") from",connection.get_ip()
1471 #if connection.get_ip().startswith("192"):
1472 # print >>sys.stderr,"@",
# Plain PIECE messages pass an empty hash list to the download handler.
1474 if c.download.got_piece(i, toint(message[5:9]), [], message[9:]):
1478 print >>sys.stderr,"Close on bad PIECE: exception",str(e)
1483 elif t == HASHPIECE:
1484 # Merkle: Handle pieces with hashes, old Tribler<= 4.5.2 style
1485 self.got_hashpiece(connection,message)
1487 elif t == G2G_PIECE_XFER:
1488 # EXTEND_MSG_G2G_V1 only, V2 is proper EXTEND msg
1489 if len(message) <= 12:
1491 print >>sys.stderr,"Close on bad G2G_PIECE_XFER: msg len"
1496 print >>sys.stderr,"Close on receiving G2G_PIECE_XFER over non-g2g connection"
1500 index = toint(message[1:5])
1501 begin = toint(message[5:9])
1502 length = toint(message[9:13])
1503 c.got_g2g_piece_xfer_v1(index,begin,length)
1508 if DEBUG_MESSAGE_HANDLING:
1512 print >>sys.stderr,"connecter: $$$$$$$$$$$$",getMessageName(t),"took",diff
# Handle an EXTEND message. The handshake (EXTEND_MSG_HANDSHAKE_ID) is
# bdecoded and passed to c.got_extend_handshake(); any other id is mapped to
# an extension name through our own handshake dict and dispatched per
# extension (overlay-swarm, ut_pex, ut_metadata, G2G v2, hashpiece, closed
# swarm).
# NOTE(review): the embedded line numbers skip values throughout this
# listing: the assignment of `ext_id`, the try/except/else lines and the
# connection-closing/return lines after each "Close on ..." print are not
# shown -- confirm control flow against the complete file.
1515 def got_extend_message(self,connection,c,message,ut_pex_enabled):
1516 # connection: Encrypter.Connection; c: Connecter.Connection
1518 print >>sys.stderr,"connecter: Got EXTEND message, len",len(message)
1519 print >>sys.stderr,"connecter: his handshake",c.extend_hs_dict,c.get_ip()
1522 if len(message) < 4:
1524 print >>sys.stderr,"Close on bad EXTEND: msg len"
1529 print >>sys.stderr,"connecter: Got EXTEND message, id",ord(ext_id)
1530 if ext_id == EXTEND_MSG_HANDSHAKE_ID:
1531 # Message is Handshake
# Payload starts at offset 2 in every branch below.
1532 d = bdecode(message[2:])
1533 if type(d) == DictType:
1534 c.got_extend_handshake(d)
1537 print >>sys.stderr,"Close on bad EXTEND: payload of handshake is not a bencoded dict"
1541 # Message is regular message e.g ut_pex
1542 ext_msg_name = self.our_extend_msg_id_to_name(ext_id)
1543 if ext_msg_name is None:
1545 print >>sys.stderr,"Close on bad EXTEND: peer sent ID we didn't define in handshake"
1548 elif ext_msg_name == EXTEND_MSG_OVERLAYSWARM:
1550 print >>sys.stderr,"Not closing EXTEND+CHALLENGE: peer didn't read our spec right, be liberal"
1551 elif ext_msg_name == EXTEND_MSG_UTORRENT_PEX and ut_pex_enabled:
1552 d = bdecode(message[2:])
1553 if type(d) == DictType:
1557 print >>sys.stderr,"Close on bad EXTEND: payload of ut_pex is not a bencoded dict"
1560 elif ext_msg_name == EXTEND_MSG_METADATA:
1562 print >> sys.stderr, "Connecter.got_extend_message() ut_metadata"
1563 # bdecode sloppy will make bdecode ignore the data
1564 # in message that is placed -after- the bencoded
1565 # data (this is the case for a data message)
1566 d = bdecode(message[2:], sloppy=1)
1567 if type(d) == DictType:
# Note that the Connecter-level Connection `c`, not `connection`, is passed
# as the first argument here.
1568 self.got_ut_metadata(c, d, message)
1571 print >> sys.stderr, "Connecter.got_extend_message() close on bad ut_metadata message"
1574 elif ext_msg_name == EXTEND_MSG_G2G_V2 and self.use_g2g:
1575 ppdict = bdecode(message[2:])
1576 if type(ppdict) != DictType:
1578 print >>sys.stderr,"Close on bad EXTEND+G2G: payload not dict"
# Every key and value of the G2G dict must be a plain string.
1581 for k,v in ppdict.iteritems():
1582 if type(k) != StringType or type(v) != StringType:
1584 print >>sys.stderr,"Close on bad EXTEND+G2G: key,value not of type int,char"
1591 print >>sys.stderr,"Close on bad EXTEND+G2G: key not int"
1596 print >>sys.stderr,"Close on bad EXTEND+G2G: value too big",ppdict,v,ord(v)
1600 c.got_g2g_piece_xfer_v2(ppdict)
1602 elif ext_msg_name == EXTEND_MSG_HASHPIECE and self.merkle_torrent:
1603 # Merkle: Handle pieces with hashes, Merkle BEP
# Dropping the first byte yields the layout got_hashpiece expects (message
# without the EXTEND byte).
1604 oldmsg = message[1:]
1605 self.got_hashpiece(connection,oldmsg)
1607 elif ext_msg_name == EXTEND_MSG_CS:
1608 cs_list = bdecode(message[2:])
1609 c.got_cs_message(cs_list)
1613 print >>sys.stderr,"Close on bad EXTEND: peer sent ID that maps to name we don't support",ext_msg_name,`ext_id`,ord(ext_id)
1619 print >>sys.stderr,"Close on bad EXTEND: exception:",str(e),`message[2:]`
# Callback run once the closed-swarm handshake has completed for
# `connection` (a Connecter-level Connection; connection_made wraps this in
# self.cs_post_func). Marks the connection as CS-complete, sends our
# bitfield, resets got_anything and lets the choker start the connection.
# NOTE(review): the embedded line numbers skip values in this listing (the
# docstring delimiters and the try/except lines around the body are not
# shown).
1624 def _cs_completed(self, connection):
1626 When completed, this is a callback function to reset the connection
1628 connection.cs_complete = True # Flag CS as completed
1631 # Can't send bitfield here, must loop and send a bunch of HAVEs
1632 # Get the bitfield from the uploader
1633 have_list = connection.upload.storage.get_have_list()
1634 bitfield = Bitfield(self.numpieces, have_list)
# NOTE(review): the comment at 1631 says a bitfield cannot be sent here, yet
# a bitfield is sent on the next line -- confirm which is current.
1635 connection.send_bitfield(bitfield.tostring())
1636 connection.got_anything = False
1637 self.choker.start_connection(connection)
1639 print >> sys.stderr,"connecter: CS: Error restarting after CS handshake:",e
# Entry point signalling that a closed-swarm handshake finished; checks
# whether a post-handshake callback (self.cs_post_func, set in
# connection_made) is available.
# NOTE(review): the line that invokes the callback and the `else:` line are
# not present in this listing (embedded numbering skips 1645-1646).
1641 def cs_handshake_completed(self):
1643 print >>sys.stderr,"connecter: Closed swarm handshake completed!"
1644 if self.cs_post_func:
1647 print >>sys.stderr,"connecter: CS: Woops, don't have post function"
# Module-level helper scheduled by connection_lost via the overlay bridge.
# Looks up the peer's permid by IP and adds the transferred KB counts to the
# BarterCast database, keyed by (my_permid, permid), or by
# (my_permid, 'non-tribler') for peers without a known permid.
# NOTE(review): the embedded line numbers skip values in this listing (guard,
# else and DEBUG lines are not shown) -- confirm the conditions around each
# incrementItem call against the complete file.
1650 def olthread_bartercast_conn_lost(ip,port,down_kb,up_kb):
1651 """ Called by OverlayThread to store information about the peer to
1652 whom the connection was just closed in the (slow) databases. """
# Imported inside the function rather than at module level.
1654 from BaseLib.Core.CacheDB.CacheDBHandler import PeerDBHandler, BarterCastDBHandler
1656 peerdb = PeerDBHandler.getInstance()
1657 bartercastdb = BarterCastDBHandler.getInstance()
1661 permid = peerdb.getPermIDByIP(ip)
1662 my_permid = bartercastdb.my_permid
1665 print >> sys.stderr, "bartercast: (Connecter): Up %d down %d peer %s:%s (PermID = %s)" % (up_kb, down_kb, ip, port, `permid`)
1667 # Save exchanged KBs in BarterCastDB
1669 if permid is not None:
1670 #name = bartercastdb.getName(permid)
# Each increment is issued with commit=False; a single commit() follows at
# the end.
1673 new_value = bartercastdb.incrementItem((my_permid, permid), 'downloaded', down_kb, commit=False)
1677 new_value = bartercastdb.incrementItem((my_permid, permid), 'uploaded', up_kb, commit=False)
1680 # For the record: save KBs exchanged with non-tribler peers
1683 new_value = bartercastdb.incrementItem((my_permid, 'non-tribler'), 'downloaded', down_kb, commit=False)
1687 new_value = bartercastdb.incrementItem((my_permid, 'non-tribler'), 'uploaded', up_kb, commit=False)
1691 bartercastdb.commit()
1695 print >> sys.stderr, "BARTERCAST: No bartercastdb instance"