1 # Written by Pawel Garbacki, George Milescu
2 # see LICENSE.txt for license information
5 from traceback import print_exc
7 from collections import deque
8 from threading import Lock
10 from BaseLib.Core.BitTornado.bencode import bencode
11 from BaseLib.Core.BitTornado.BT1.MessageID import ASK_FOR_HELP, STOP_HELPING, REQUEST_PIECES, CANCEL_PIECE, JOIN_HELPERS, RESIGN_AS_HELPER, DROPPED_PIECE, PROXY_HAVE, PROXY_UNHAVE
13 from BaseLib.Core.Overlay.OverlayThreadingBridge import OverlayThreadingBridge
14 from BaseLib.Core.CacheDB.CacheDBHandler import PeerDBHandler, TorrentDBHandler
15 from BaseLib.Core.Utilities.utilities import show_permid_short
class SingleDownloadHelperInterface:
    """ This interface should contain all methods that the PiecePicker/Helper
        calls on the SingleDownload class.
    """

    def __init__(self):
        # Flag written by helper_set_freezing() and read by is_frozen_by_helper()
        self.frozen_by_helper = False

    def helper_set_freezing(self, val):
        """ Store the frozen flag

        @param val: the new value of the flag
        """
        self.frozen_by_helper = val

    def is_frozen_by_helper(self):
        """ Return the value last stored by helper_set_freezing() """
        return self.frozen_by_helper

    def is_choked(self):
        """ No-op placeholder; Helper.restart() calls this on the download """
        pass

    def helper_forces_unchoke(self):
        """ No-op placeholder; Helper.restart() calls this on a choked download """
        pass

    def _request_more(self, new_unchoke = False):
        """ No-op placeholder; Helper.restart() calls this last

        @param new_unchoke: unused by the placeholder
        """
        pass
def __init__(self, torrent_hash, num_pieces, coordinator_permid, coordinator = None):
    """ Set up the per-torrent helper state

    @param torrent_hash: hash of the torrent; appended to outgoing overlay messages
    @param num_pieces: number of pieces, sizes the per-piece flag lists
    @param coordinator_permid: permid of the coordinator; None or '' means none is known
    @param coordinator: optional object whose reserved_pieces list is read by is_complete()
    """
    self.torrent_hash = torrent_hash
    self.coordinator = coordinator

    # An empty permid is stored as None
    if coordinator_permid == '':
        self.coordinator_permid = None
    else:
        self.coordinator_permid = coordinator_permid

    # Get coordinator ip and port
    self.coordinator_ip = None # see is_coordinator()
    self.coordinator_port = -1
    if self.coordinator_permid is not None:
        peerdb = PeerDBHandler.getInstance()
        peer = peerdb.getPeer(self.coordinator_permid)
        # Keep the defaults above when the lookup yields nothing
        if peer is not None:
            self.coordinator_ip = peer['ip']
            self.coordinator_port = peer['port']

    self.overlay_bridge = OverlayThreadingBridge.getInstance()

    # Per-piece flags, indexed by piece number
    self.reserved_pieces = [False] * num_pieces
    self.ignored_pieces = [False] * num_pieces
    self.distr_reserved_pieces = [False] * num_pieces

    # Pieces queued by got_request_pieces() and handed out by next_request()
    self.requested_pieces = deque()
    self.requested_pieces_lock = Lock()

    self.completed = False
    self.marker = [True] * num_pieces # what is_complete() compares against
    self.round = 0                    # advanced by is_complete()
    self.encoder = None               # assigned by set_encoder()
    self.continuations = []
    self.outstanding = None
    self.last_req_time = 0

    # The challenge sent by the coordinator
    self.challenge = None
89 result = self.reserve_piece(10,None)
90 print >> sys.stderr,"reserve piece returned: " + str(result)
91 print >> sys.stderr,"Test passed"
def notify(self):
    """ Called by HelperMessageHandler to "wake up" the download that's
        waiting for its coordinator to reserve it a piece
    """
    # NOTE(review): method header and branch layout were rebuilt from the
    # "helper: notify:" log prefix and the statement order -- confirm.
    if self.outstanding is None:
        if DEBUG:
            print >> sys.stderr,"helper: notify: No continuation waiting?"
        return

    if DEBUG:
        print >> sys.stderr,"helper: notify: Waking downloader"
    waiting = self.outstanding
    self.outstanding = None # must be reset before calling self.restart!
    self.restart(waiting)

    #self.send_reservation()
    # Work on a copy so the restarts cannot disturb the iteration
    pending = self.continuations[:]
    self.continuations = []
    for sdownload in pending:
        self.restart(sdownload)
def restart(self,sdownload):
    """ Unfreeze a download and let it ask for more data

    @param sdownload: the download to restart; is_choked(), helper_forces_unchoke(),
    helper_set_freezing() and _request_more() are called on it
    """
    # A choke may have arrived while we waited for the coordinator's reply.
    # We were called from _request_more(), so we were not choked just before;
    # act as if that choke message has not been seen yet.
    choked = sdownload.is_choked()
    if choked:
        sdownload.helper_forces_unchoke()

    sdownload.helper_set_freezing(False)
    sdownload._request_more()
def send_join_helpers(self, permid):
    """ Send a confirmation to the coordinator that the current node will provide proxy services

        Called by self.got_ask_for_help()

        @param permid: The permid of the node that will become coordinator
        (only used for the debug message; the connection is made to self.coordinator_permid)
    """
    if DEBUG:
        # Debug output goes to stderr, like the other messages of this class
        print >> sys.stderr,"helper: send_join_helpers: sending a join_helpers message to", show_permid_short(permid)

    # Hand the actual sending over to the overlay bridge, without delay
    self.overlay_bridge.add_task(self.olthread_send_join_helpers,0)
def olthread_send_join_helpers(self):
    """ Creates a bridge connection for the join helpers message to be sent

        Called by the overlay thread.
    """
    # TODO: ??? We need to create the message according to protocol version, so need to pass all info.
    on_connect = self.olthread_join_helpers_connect_callback
    self.overlay_bridge.connect(self.coordinator_permid, on_connect)
def olthread_join_helpers_connect_callback(self,exc,dns,permid,selversion):
    """ Sends the join helpers message on the connection with the coordinator

        Called by the overlay thread.

        @param exc: Peer reachable/unreachable information. None = peer reachable
        @param dns: not used by this callback
        @param permid: the permid of the coordinator
        @param selversion: not used by this callback
    """
    if exc is not None:
        # The coordinator is unreachable
        if DEBUG:
            print >> sys.stderr,"helper: olthread_join_helpers_connect_callback: error connecting to",show_permid_short(permid),exc
        return

    # Create message according to protocol version
    message = JOIN_HELPERS + self.torrent_hash

    if DEBUG:
        print >> sys.stderr,"helper: olthread_join_helpers_connect_callback: Sending JOIN_HELPERS to",show_permid_short(permid)

    self.overlay_bridge.send(permid, message, self.olthread_join_helpers_send_callback)
def olthread_join_helpers_send_callback(self, exc, permid):
    """ Callback function for error checking in network communication

        Called by the overlay thread.

        @param exc: Peer reachable/unreachable information. None = peer reachable
        @param permid: the permid of the peer that is contacted for helping
    """
    # A failed send is only reported in debug mode
    if exc is not None and DEBUG:
        print >> sys.stderr,"helper: olthread_join_helpers_send_callback: error sending message to",show_permid_short(permid),exc
def send_proxy_have(self, aggregated_haves):
    """ Send a list of aggregated have and bitfield information

        Called by Downloader.aggregate_and_send_haves

        @param aggregated_haves: A Bitfield object, containing an aggregated list of stored haves
    """
    if DEBUG:
        # Debug output goes to stderr, like the other messages of this class
        print >> sys.stderr,"helper: send_proxy_have: sending a proxy_have message to", show_permid_short(self.coordinator_permid)

    # Serialise now, so the queued task does not touch the Bitfield object later
    aggregated_string = aggregated_haves.tostring()

    def olthread_send_proxy_have_task():
        self.olthread_send_proxy_have(aggregated_string)

    self.overlay_bridge.add_task(olthread_send_proxy_have_task,0)
def olthread_send_proxy_have(self, aggregated_string):
    """ Creates a bridge connection for the proxy_have message to be sent

        Called by the overlay thread.

        @param aggregated_string: a bitstring of available pieces
    """
    # TODO: ??? We need to create the message according to protocol version, so need to pass all info.
    def on_connect(exc, dns, permid, selversion):
        # Carry the bitstring along to the connect callback
        self.olthread_proxy_have_connect_callback(exc, dns, permid, selversion, aggregated_string)

    self.overlay_bridge.connect(self.coordinator_permid, on_connect)
def olthread_proxy_have_connect_callback(self,exc,dns,permid,selversion, aggregated_string):
    """ Sends the proxy_have message on the connection with the coordinator

        Called by the overlay thread.

        @param exc: Peer reachable/unreachable information. None = peer reachable
        @param dns: not used by this callback
        @param permid: the permid of the coordinator
        @param selversion: selected (buddycast?) version
        @param aggregated_string: a bitstring of available pieces
    """
    if exc is not None:
        # The coordinator is unreachable
        if DEBUG:
            print >> sys.stderr,"helper: olthread_proxy_have_connect_callback: error connecting to",show_permid_short(permid),exc
        return

    # Create message according to protocol version
    payload = bencode(aggregated_string)
    message = PROXY_HAVE + self.torrent_hash + payload

    if DEBUG:
        print >> sys.stderr,"helper: olthread_proxy_have_connect_callback: Sending PROXY_HAVE to",show_permid_short(permid)

    self.overlay_bridge.send(permid, message, self.olthread_proxy_have_send_callback)
def olthread_proxy_have_send_callback(self, exc, permid):
    """ Callback function for error checking in network communication

        Called by the overlay thread.

        @param exc: Peer reachable/unreachable information. None = peer reachable
        @param permid: the permid of the peer that is contacted for helping
    """
    # A failed send is only reported in debug mode
    if exc is not None and DEBUG:
        print >> sys.stderr,"helper: olthread_proxy_have_send_callback: error sending message to",show_permid_short(permid),exc
def send_resign_as_helper(self, permid):
    """ Send a message to the coordinator that the current node will stop providing proxy services

        @param permid: The permid of the coordinator
        (only used for the debug message; the connection is made to self.coordinator_permid)
    """
    if DEBUG:
        # Debug output goes to stderr with a shortened permid, like the
        # other messages of this class
        print >> sys.stderr,"helper: send_resign_as_helper: sending a resign_as_helper message to", show_permid_short(permid)

    # Hand the actual sending over to the overlay bridge, without delay
    self.overlay_bridge.add_task(self.olthread_send_resign_as_helper,0)
def olthread_send_resign_as_helper(self):
    """ Creates a bridge connection for the resign_as_helper message to be sent

        Called by the overlay thread.
    """
    on_connect = self.olthread_resign_as_helper_connect_callback
    self.overlay_bridge.connect(self.coordinator_permid, on_connect)
def olthread_resign_as_helper_connect_callback(self,exc,dns,permid,selversion):
    """ Sends the resign_as_helper message on the connection with the coordinator

        Called by the overlay thread.

        @param exc: Peer reachable/unreachable information. None = peer reachable
        @param dns: not used by this callback
        @param permid: the permid of the coordinator
        @param selversion: not used by this callback
    """
    if exc is not None:
        # The coordinator is unreachable
        if DEBUG:
            print >> sys.stderr,"helper: olthread_resign_as_helper_connect_callback: error connecting to",show_permid_short(permid),exc
        return

    # Create message according to protocol version
    message = RESIGN_AS_HELPER + self.torrent_hash

    if DEBUG:
        print >> sys.stderr,"helper: olthread_resign_as_helper_connect_callback: Sending RESIGN_AS_HELPER to",show_permid_short(permid)

    self.overlay_bridge.send(permid, message, self.olthread_resign_as_helper_send_callback)
def olthread_resign_as_helper_send_callback(self,exc,permid):
    """ Callback function for error checking in network communication

        Called by the overlay thread.

        @param exc: Peer reachable/unreachable information. None = peer reachable
        @param permid: the permid of the peer that is contacted for helping
    """
    # A failed send is only reported in debug mode
    if exc is not None and DEBUG:
        print >> sys.stderr,"helper: olthread_resign_as_helper_send_callback: error sending message to",show_permid_short(permid),exc
341 # Got (received) messages
def got_ask_for_help(self, permid, infohash, challenge):
    """ Start helping a coordinator or reply with a resign_as_helper message

        @param permid: The permid of the node sending the help request message
        @param infohash: the infohash of the torrent for which help is requested
        @param challenge: The challenge sent by the coordinator
        @return: presumably True when help is offered, False when it is refused
        -- NOTE(review): the return statements were rebuilt, confirm against callers
    """
    if DEBUG:
        print >>sys.stderr,"helper: got_ask_for_help: will answer to the help request from", show_permid_short(permid)

    if not self.can_help(infohash):
        # Send RESIGN_AS_HELPER
        if DEBUG:
            print >>sys.stderr,"helper: got_ask_for_help: received a help request, going to send resign_as_helper"
        self.send_resign_as_helper(permid)
        return False

    # Send JOIN_HELPERS
    if DEBUG:
        print >>sys.stderr,"helper: got_ask_for_help: received a help request, going to send join_helpers"
    self.send_join_helpers(permid)
    # Kept for start_data_connection(), which passes it on to the encoder
    self.challenge = challenge
    return True
def can_help(self, infohash):
    """ Decide if the current node can help a coordinator for the current torrent

        @param infohash: the infohash of the torrent for which help is requested
        @return: the decision; no real test is implemented yet
    """
    #TODO: test if I can help the coordinator to download this file
    #Future support: make the decision based on my preference
    # NOTE(review): this return statement was rebuilt -- confirm that
    # "always help" is the intended placeholder answer.
    return True
def got_stop_helping(self, permid, infohash):
    """ Stop helping a coordinator

        @param permid: The permid of the node sending the message
        @param infohash: the infohash of the torrent for which help is released
    """
    #TODO: decide what to do here
    # NOTE(review): this return statement was rebuilt -- confirm the value
    # that callers expect.
    return True
def got_request_pieces(self, permid, piece):
    """ Start downloading the pieces that the coordinator requests

        @param permid: The permid of the node requesting the pieces
        @param piece: a piece number, that is going to be downloaded
    """
    if DEBUG:
        # Debug output goes to stderr, like the other messages of this class
        print >> sys.stderr,"helper: got_request_pieces: received request_pieces for piece", piece

    # Mark the piece as requested in the local data structures.
    # Duplicate requests are not filtered: the piece is queued again below.
    self.reserved_pieces[piece] = True
    self.distr_reserved_pieces[piece] = True
    self.ignored_pieces[piece] = False

    # Queue the piece for next_request(); the lock is given back even if
    # the append fails
    self.requested_pieces_lock.acquire()
    try:
        self.requested_pieces.append(piece)
    finally:
        self.requested_pieces_lock.release()

    # Start data connection
    self.start_data_connection()
def start_data_connection(self):
    """ Start a data connection with the coordinator

        The address is taken from self.coordinator_ip and self.coordinator_port,
        the connection is opened through self.encoder.
    """
    # Do this always, will return quickly when connection already exists
    coordinator_address = (self.coordinator_ip, self.coordinator_port)

    if DEBUG:
        print >> sys.stderr,"helper: start_data_connection: Starting data connection to coordinator at", coordinator_address

    self.encoder.start_connection(coordinator_address, id = None, coord_con = True, challenge = self.challenge)
def is_coordinator(self, permid):
    """ Check if the permid is the current coordinator

        @param permid: The permid to be checked if it is the coordinator
        @return: True, if the permid is the current coordinator; False, if the permid is not the current coordinator
    """
    # If we could not get coordinator_ip, don't help
    if self.coordinator_ip is None:
        return False

    if self.coordinator_permid != permid:
        return False
    return True
def next_request(self):
    """ Returns the next piece in the list of coordinator-requested pieces

        Called by the PiecePicker

        @return: a piece number, if there is a requested piece pending download; None, if there is no pending piece
    """
    # Single acquire/release pair; the lock is given back on every path
    self.requested_pieces_lock.acquire()
    try:
        if self.requested_pieces:
            next_piece = self.requested_pieces.popleft()
        else:
            next_piece = None
    finally:
        self.requested_pieces_lock.release()

    # Debug output happens outside the lock
    if DEBUG:
        if next_piece is None:
            print >>sys.stderr,"helper: next_request: no requested pieces yet. Returning None"
        else:
            print >>sys.stderr,"helper: next_request: Returning", next_piece
    return next_piece
def set_encoder(self, encoder):
    """ Sets the current encoder.

        Called from download_bt1.py

        @param encoder: the new encoder that will be set
    """
    self.encoder = encoder
    encoder.set_coordinator_ip(self.coordinator_ip)

    # To support a helping user stopping and restarting a torrent
    if self.coordinator_permid is None:
        return
    self.start_data_connection()
def get_coordinator_permid(self):
    """ Returns the coordinator permid

        Called from SingleDownload.py

        @return: Coordinator permid (None when no coordinator is set)
    """
    permid = self.coordinator_permid
    return permid
def is_reserved(self, piece):
    """ Check if a piece is reserved (requested) by a coordinator

        Called by the network thread (Interface for PiecePicker and Downloader)

        @param piece: the piece whose status is to be checked
        @return: True, if the piece is requested by a coordinator; False, otherwise.
    """
    reserved = self.reserved_pieces[piece]
    if reserved:
        return True
    # With a coordinator object attached, every piece counts as reserved
    # once is_complete() reports True
    if self.coordinator is not None and self.is_complete():
        return True
    return reserved
def is_ignored(self, piece):
    """ Check if a piece is ignored by a coordinator

        Called by the network thread (Interface for PiecePicker and Downloader)

        @param piece: the piece whose status is to be checked
        @return: True, if the piece is ignored by a coordinator; False, otherwise.
    """
    ignored = self.ignored_pieces[piece]
    if not ignored:
        return False
    # With a coordinator object attached, no piece counts as ignored once
    # is_complete() reports True
    if self.coordinator is not None and self.is_complete():
        return False
    return ignored
def is_complete(self):
    """ Check torrent is completely downloaded

        Called by the network thread (Interface for PiecePicker and Downloader)

        The check compares a reserved-pieces list with self.marker (all True),
        i.e. it tests that every piece is marked as reserved.

        @return: True, all the pieces are downloaded; False, otherwise.
    """
    # NOTE(review): the two early returns below were rebuilt around the
    # round counter -- confirm them against the original file.
    if self.completed:
        return True

    self.round = (self.round + 1) % MAX_ROUNDS
    if self.round != 0:
        # Only every MAX_ROUNDS-th call performs the list comparison
        return False

    if self.coordinator is None:
        reserved = self.distr_reserved_pieces
    else:
        reserved = self.coordinator.reserved_pieces
    self.completed = (reserved == self.marker)
    return self.completed