1 # Written by Bram Cohen and Pawel Garbacki, George Milescu
2 # see LICENSE.txt for license information
6 from BaseLib.Core.BitTornado.CurrentRateMeasure import Measure
7 from BaseLib.Core.BitTornado.bitfield import Bitfield
8 from random import shuffle
9 from base64 import b64encode
10 from BaseLib.Core.BitTornado.clock import clock
11 from BaseLib.Core.Statistics.Status.Status import get_status_holder
16 from BaseLib.Core.ProxyService.Helper import SingleDownloadHelperInterface
class SingleDownloadHelperInterface:
    """Fallback stand-in for the ProxyService helper interface.

    NOTE(review): the import above also provides this name, so this is
    presumably defined inside a try/except ImportError fallback whose
    scaffolding (and the class body) is elided from this view -- confirm
    against the full file.
    """
# Module-level debug switch for the chunk-printing helpers below.
DEBUG_CHUNKS = False # set DEBUG_CHUNKS in PiecePickerStreaming to True

# only define the following functions in __debug__. And only import
# them in this case. They are too expensive to have, and have no
# purpose, outside debug mode.

# Arno, 2009-06-15: Win32 binary versions have __debug__ True apparently, workaround.
# Lazily-initialised pool of one-letter peer identifiers used by the
# chunk-debug printout (a-z then A-Z, handed out first-come first-served).
_ident_letter_pool = None
def get_ident_letter(download):
    # Map DOWNLOAD's IP to a stable single-letter identifier.
    # NOTE(review): '_ident_letters' is defined on an elided line --
    # presumably a module-level dict; verify before reuse.
    if not download.ip in _ident_letters:
        global _ident_letter_pool
        if not _ident_letter_pool:
            _ident_letter_pool = [chr(c) for c in range(ord("a"), ord("z")+1)] + [chr(c) for c in range(ord("A"), ord("Z")+1)]
        # pool exhaustion (>52 peers) would raise IndexError here
        _ident_letters[download.ip] = _ident_letter_pool.pop(0)
    return _ident_letters[download.ip]
def print_chunks(downloader, pieces, before=(), after=(), compact=True):
    """
    Print a line summary indicating completed/outstanding/non-requested chunks

    When COMPACT is True one character will represent one piece.
     -  --> no outstanding requests
    1-9 --> the number of outstanding requests (max 9)

    When COMPACT is False one character will represent one chunk.
     -  --> no outstanding requests
    a-z --> requested at peer with that character (also capitals, duplicates may occur)
    1-9 --> requested multiple times (at n peers)
    """
    # NOTE(review): this debug helper is heavily elided in this view -- the
    # 'if compact:'/'else:' scaffolding, the 'request_map = {}' initialisers,
    # some returns and the s_before/s_after construction are missing, so the
    # two loop/helper pairs below (per-piece and per-chunk variants) appear
    # flattened. Confirm against the full file before editing.
    do_I_have = downloader.storage.do_I_have
    do_I_have_requests = downloader.storage.do_I_have_requests
    inactive_requests = downloader.storage.inactive_requests
    piece_size = downloader.storage.piece_length
    chunk_size = downloader.storage.request_size
    chunks_per_piece = int(piece_size / chunk_size)

    # compact variant: count outstanding requests per piece
    for download in downloader.downloads:
        for piece, begin, length in download.active_requests:
            if not piece in request_map:
                request_map[piece] = 0
            request_map[piece] += 1

    def print_chunks_helper(piece_id):
        # one character per piece
        if do_I_have(piece_id): return "#"
        if do_I_have_requests(piece_id): return "-"
        if piece_id in request_map: return str(min(9, request_map[piece_id]))

    # per-chunk variant: one cell per chunk, tagged with the requesting
    # peer's ident letter (or a count when requested more than once)
    for download in downloader.downloads:
        for piece, begin, length in download.active_requests:
            if not piece in request_map:
                request_map[piece] = ["-"] * chunks_per_piece
            index = int(begin/chunk_size)
            if request_map[piece][index] == "-":
                request_map[piece][index] = get_ident_letter(download)
            elif type(request_map[piece][index]) is str:
                request_map[piece][index] = 2
            request_map[piece][index] += 1
            request_map[piece][int(begin/chunk_size)] = get_ident_letter(download)

    def print_chunks_helper(piece_id):
        # chunks_per_piece characters per piece
        if do_I_have(piece_id): return "#" * chunks_per_piece
        # if do_I_have_requests(piece_id): return "-" * chunks_per_piece
        if piece_id in request_map:
            if piece_id in inactive_requests and type(inactive_requests[piece_id]) is list:
                # blank out chunks that are back in the inactive pool
                for begin, length in inactive_requests[piece_id]:
                    request_map[piece_id][int(begin/chunk_size)] = " "
            return "".join([str(c) for c in request_map[piece_id]])
        return "-" * chunks_per_piece

    # NOTE(review): s_before/s_after are built on elided lines
    print >>sys.stderr, "Outstanding %s:%d:%d:%s [%s|%s|%s]" % (s_before, pieces[0], pieces[-1], s_after, "".join(map(print_chunks_helper, before)), "".join(map(print_chunks_helper, pieces)), "".join(map(print_chunks_helper, after)))
    print >>sys.stderr, "Outstanding 0:0 []"
def print_chunks(downloader, pieces, before=(), after=(), compact=True):
    """No-op stand-in used when chunk debugging is disabled; the original
    body (presumably just 'pass') is elided from this view."""
    def __init__(self, ip):
        # Per-IP bookkeeping shared by all connections from one address.
        # NOTE(review): further counters (e.g. numgood, the bad-piece map,
        # peerid) are initialised on lines elided from this view.
        self.numconnections = 0    # currently open connections from this IP
        self.lastdownload = None   # most recent SingleDownload for this IP
    def __init__(self, download):
        # Guard that tracks bad data received via DOWNLOAD's connection so
        # the peer can still be kicked/banned after the connection is gone.
        self.download = download
        self.ip = download.ip
        self.downloader = download.downloader
        self.stats = self.downloader.perip[self.ip]  # shared PerIPStats for this IP
        self.lastindex = None  # last piece index credited as good (dedup counter)
    def failed(self, index, bump = False):
        """Record a bad chunk for piece INDEX; kick/ban the peer when it
        misbehaves repeatedly.

        NOTE(review): the original guarded the final picker.bump() call
        (an 'elif bump:' line is elided here), so BUMP appears unused and
        the bump looks unconditional in this view -- confirm before relying
        on it.
        """
        self.stats.bad.setdefault(index, 0)
        self.downloader.gotbaddata[self.ip] = 1
        self.stats.bad[index] += 1
        if len(self.stats.bad) > 1:
            if self.download is not None:
                # still connected: kick the live download
                self.downloader.try_kick(self.download)
            elif self.stats.numconnections == 1 and self.stats.lastdownload is not None:
                # already disconnected: kick its last known download
                self.downloader.try_kick(self.stats.lastdownload)
        if len(self.stats.bad) >= 3 and len(self.stats.bad) > int(self.stats.numgood/30):
            # too many bad pieces relative to good ones: ban the IP
            self.downloader.try_ban(self.ip)
        self.downloader.picker.bump(index)
166 def good(self, index):
167 # lastindex is a hack to only increase numgood by one for each good
168 # piece, however many chunks come from the connection(s) from this IP
169 if index != self.lastindex:
170 self.stats.numgood += 1
171 self.lastindex = index
class SingleDownload(SingleDownloadHelperInterface):
    """One download leg: per-peer state for fetching pieces over a single
    connection on behalf of the owning Downloader."""

    def __init__(self, downloader, connection):
        SingleDownloadHelperInterface.__init__(self)
        self.downloader = downloader
        self.connection = connection
        # NOTE(review): a few attribute initialisations (e.g. the choke flag
        # and the snub timestamps) are on lines elided from this view.
        self.interested = False
        self.active_requests = []  # outstanding (index, begin, length) at this peer
        self.measure = Measure(downloader.max_rate_period)
        self.peermeasure = Measure(downloader.max_rate_period)
        self.have = Bitfield(downloader.numpieces)  # peer's advertised pieces
        self.example_interest = None
        self.ip = connection.get_ip()
        self.guard = BadDataGuard(self)
        # ProxyService: helper is None unless proxying is active
        self.helper = downloader.picker.helper
        self.proxy_have = Bitfield(downloader.numpieces)

        # boudewijn: VOD needs a download measurement that is not
        # averaged over a 'long' period. downloader.max_rate_period is
        # (by default) 20 seconds because this matches the unchoke
        self.short_term_measure = Measure(5)

        # boudewijn: each download maintains a counter for the number
        # of high priority piece requests that did not get any
        # response within x seconds.
        self.bad_performance_counter = 0
210 def _backlog(self, just_unchoked):
211 self.backlog = int(min(
212 2+int(4*self.measure.get_rate()/self.downloader.chunksize),
213 (2*just_unchoked)+self.downloader.queue_limit() ))
214 if self.backlog > 50:
215 self.backlog = int(max(50, self.backlog * 0.075))
    def disconnected(self):
        # Connection gone: unregister from the Downloader and release state.
        # NOTE(review): heavily elided -- the closing quotes of the obsoleted
        # code string below and the header of the request-releasing helper
        # (originally a separate 'def _letgo(self):') are on elided lines,
        # so the statements from 'queued_out' downward belong to that helper.
        self.downloader.lost_peer(self)

        """ JD: obsoleted -- moved to picker.lost_peer

        if self.have.complete():
            self.downloader.picker.lost_seed()
            for i in xrange(len(self.have)):
                self.downloader.picker.lost_have(i)

        if self.have.complete() and self.downloader.storage.is_endgame():
            self.downloader.add_disconnected_seed(self.connection.get_readable_id())
        self.guard.download = None
        if self.downloader.queued_out.has_key(self):
            del self.downloader.queued_out[self]
        if not self.active_requests:
        if self.downloader.endgamemode:
            self.active_requests = []
        for index, begin, length in self.active_requests:
            self.downloader.storage.request_lost(index, begin, length)
        self.active_requests = []
        if self.downloader.paused:
        ds = [d for d in self.downloader.downloads if not d.choked]
        for d in self.downloader.downloads:
            if d.choked and not d.interested:
                if d.have[l] and self.downloader.storage.do_I_have_requests(l):
    def got_unchoke(self):
        # Peer unchoked us: try to fill the request pipe.
        # NOTE(review): the 'self.choked = False' / 'if self.interested:'
        # bookkeeping lines are elided from this view.
        self._request_more(new_unchoke = True)
278 def is_interested(self):
279 return self.interested
281 def send_interested(self):
282 if not self.interested:
283 self.interested = True
284 self.connection.send_interested()
286 def send_not_interested(self):
288 self.interested = False
289 self.connection.send_not_interested()
    def got_piece(self, index, begin, hashlist, piece):
        """Handle an incoming chunk of piece INDEX at offset BEGIN.

        Returns True if the piece is complete.
        Note that in this case a -piece- means a chunk!

        NOTE(review): several lines are elided from this view: the binding of
        'length' (presumably len(piece)), the early returns after a flunked
        piece, and the if/else scaffolding of the endgame branch -- the loop
        body below mixes lines from different branches. Confirm against the
        full file before editing.
        """
        if self.bad_performance_counter:
            # chunk arrived: relax the VOD bad-performance penalty
            self.bad_performance_counter -= 1
            if DEBUG: print >>sys.stderr, "decreased bad_performance_counter to", self.bad_performance_counter
        # print >> sys.stderr, 'Downloader: got piece of length %d' % length
        self.active_requests.remove((index, begin, length))
        self.downloader.discarded += length
        if self.downloader.endgamemode:
            self.downloader.all_requests.remove((index, begin, length))
            if DEBUG: print >>sys.stderr, "Downloader: got_piece: removed one request from all_requests", len(self.downloader.all_requests), "remaining"
        self.measure.update_rate(length)
        # Update statistic gatherer
        status = get_status_holder("LivingLab")
        s_download = status.get_or_create_status_element("downloaded",0)
        s_download.inc(length)
        self.short_term_measure.update_rate(length)
        self.downloader.measurefunc(length)
        if not self.downloader.storage.piece_came_in(index, begin, hashlist, piece, self.guard):
            # hash check failed or data rejected
            self.downloader.piece_flunked(index)

        # boudewijn: we need more accurate (if possibly invalid)
        # measurements on current download speed
        self.downloader.picker.got_piece(index, begin, length)

        # print "Got piece=", index, "begin=", begin, "len=", length
        if self.downloader.storage.do_I_have(index):
            self.downloader.picker.complete(index)

        if self.downloader.endgamemode:
            # cancel duplicate requests for this chunk at every other peer
            for d in self.downloader.downloads:
                assert not d.active_requests
                d.fix_download_endgame()
                d.active_requests.remove((index, begin, length))
                d.connection.send_cancel(index, begin, length)
                d.fix_download_endgame()
                assert not d.active_requests
        self.downloader.check_complete(index)
        self.connection.total_downloaded += length
        return self.downloader.storage.do_I_have(index)
    def helper_forces_unchoke(self):
        """ProxyService hook; body elided in this view -- presumably clears
        the choke state when a helper forces an unchoke (TODO confirm)."""
    def _request_more(self, new_unchoke = False, slowpieces = []):
        # Ask the piece picker for more chunks to request from this peer
        # until the backlog is filled; falls through to endgame handling.
        # NOTE(review): mutable default argument 'slowpieces=[]' -- harmless
        # only while callers never mutate it; flagged for follow-up.
        # NOTE(review): heavily elided -- many 'if DEBUG:', 'return', 'else:'
        # and loop-scaffolding lines (including the 'loop' flag, the
        # 'lost_interests' initialiser and timing code for 'diff') are
        # missing, so control flow below is reconstructed and not runnable
        # as shown. Confirm against the full file before editing.
        print >>sys.stderr,"Downloader: _request_more()"
        if self.helper is not None and self.is_frozen_by_helper():
            print >>sys.stderr,"Downloader: _request_more: blocked, returning"
            print >>sys.stderr,"Downloader: _request_more: choked, returning"
        # do not download from coordinator
        if self.connection.connection.is_coordinator_con():
            print >>sys.stderr,"Downloader: _request_more: coordinator conn"
        if self.downloader.endgamemode:
            self.fix_download_endgame(new_unchoke)
            print >>sys.stderr,"Downloader: _request_more: endgame mode, returning"
        if self.downloader.paused:
            print >>sys.stderr,"Downloader: _request_more: paused, returning"
        if len(self.active_requests) >= self._backlog(new_unchoke):
            print >>sys.stderr,"Downloader: more req than unchoke (active req: %d >= backlog: %d)" % (len(self.active_requests), self._backlog(new_unchoke))
            # Jelle: Schedule _request more to be called in some time. Otherwise requesting and receiving packages
            # may stop, if they arrive to quickly
            if self.downloader.download_rate:
                wait_period = self.downloader.chunksize / self.downloader.download_rate / 2.0
                # Boudewijn: when wait_period is 0.0 this will cause
                # the the _request_more method to be scheduled
                # multiple times (recursively), causing severe cpu
                # Therefore, only schedule _request_more to be called
                # if the call will be made in the future. The minimal
                # wait_period should be tweaked.
                if wait_period > 1.0:
                    print >>sys.stderr,"Downloader: waiting for %f s to call _request_more again" % wait_period
                    self.downloader.scheduler(self._request_more, wait_period)
            if not (self.active_requests or self.backlog):
                self.downloader.queued_out[self] = 1
        # print >>sys.stderr,"Downloader: _request_more: len act",len(self.active_requests),"back",self.backlog
        while len(self.active_requests) < self.backlog:
            # print >>sys.stderr,"Downloader: Looking for interesting piece"
            #print "DOWNLOADER self.have=", self.have.toboollist()
            # This is the PiecePicker call is the current client is a Coordinator
            interest = self.downloader.picker.next(self.have,
                               self.downloader.storage.do_I_have_requests,
                               self.downloader.too_many_partials(),
                               self.connection.connection.is_helper_con(),
                               slowpieces = slowpieces, connection = self.connection, proxyhave = self.proxy_have)
            print >>sys.stderr,"Downloader: _request_more: next() returned",interest,"took %.5f" % (diff)
            if self.helper and self.downloader.storage.inactive_requests[interest] is None:
                # The current node is a helper and received a request from a coordinator for a piece it has already downloaded
                # Should send a Have message to the coordinator
                self.connection.send_have(interest)
            if self.helper and self.downloader.storage.inactive_requests[interest] == []:
                # The current node is a helper and received a request from a coordinator for a piece that is downloading
                # (all blocks are requested to the swarm, and have not arrived yet)
            self.example_interest = interest
            self.send_interested()
            while len(self.active_requests) < self.backlog and loop:
                begin, length = self.downloader.storage.new_request(interest)
                print >>sys.stderr,"Downloader: new_request",interest,begin,length,"to",self.connection.connection.get_ip(),self.connection.connection.get_port()
                self.downloader.picker.requested(interest, begin, length)
                self.active_requests.append((interest, begin, length))
                self.connection.send_request(interest, begin, length)
                self.downloader.chunk_requested(length)
                if not self.downloader.storage.do_I_have_requests(interest):
                    lost_interests.append(interest)
        if not self.active_requests:
            self.send_not_interested()
        # re-check other peers whose interests may have been lost
        for d in self.downloader.downloads:
            if d.active_requests or not d.interested:
                if d.example_interest is not None and self.downloader.storage.do_I_have_requests(d.example_interest):
            for lost in lost_interests:
                interest = self.downloader.picker.next(d.have,
                                   self.downloader.storage.do_I_have_requests,
                                   self, # Arno, 2008-05-22; self -> d? Original Pawel code
                                   self.downloader.too_many_partials(),
                                   self.connection.connection.is_helper_con(), willrequest=False,connection=self.connection, proxyhave = self.proxy_have)
                print >>sys.stderr,"Downloader: _request_more: next()2 returned",interest,"took %.5f" % (diff)
                if interest is not None:
                    # The helper has at least one piece that the coordinator requested
                    if self.helper and self.downloader.storage.inactive_requests[interest] is None:
                        # The current node is a helper and received a request from a coordinator for a piece it has already downloaded
                        # Should send a Have message to the coordinator
                        self.connection.send_have(interest)
                    if self.helper and self.downloader.storage.inactive_requests[interest] == []:
                        # The current node is a helper and received a request from a coordinator for a piece that is downloading
                        # (all blocks are requested to the swarm, and have not arrived yet)
                d.send_not_interested()
            d.example_interest = interest
        # Arno: LIVEWRAP: no endgame
        if not self.downloader.endgamemode and \
           self.downloader.storage.is_endgame() and \
           not (self.downloader.picker.videostatus and self.downloader.picker.videostatus.live_streaming):
            self.downloader.start_endgame()
    def fix_download_endgame(self, new_unchoke = False):
        """Endgame-mode counterpart of _request_more(): request every still
        missing chunk (downloader.all_requests) that this peer advertises.

        NOTE(review): several guard/return lines are elided from this view
        (e.g. the returns after the paused/backlog checks and the choke
        check before firing requests), so the flow below is not complete.
        """
        # do not download from coordinator
        if self.downloader.paused or self.connection.connection.is_coordinator_con():
            if DEBUG: print >>sys.stderr, "Downloader: fix_download_endgame: paused", self.downloader.paused, "or is_coordinator_con", self.connection.connection.is_coordinator_con()

        if len(self.active_requests) >= self._backlog(new_unchoke):
            if not (self.active_requests or self.backlog) and not self.choked:
                self.downloader.queued_out[self] = 1
            if DEBUG: print >>sys.stderr, "Downloader: fix_download_endgame: returned"
        # chunks this peer has, that we still need and are not already asking it for
        want = [a for a in self.downloader.all_requests if self.have[a[0]] and a not in self.active_requests and (self.helper is None or self.connection.connection.is_helper_con() or not self.helper.is_ignored(a[0]))]
        if not (self.active_requests or want):
            self.send_not_interested()
            if DEBUG: print >>sys.stderr, "Downloader: fix_download_endgame: not interested"
        self.send_interested()
        if DEBUG: print >>sys.stderr, "Downloader: fix_download_endgame: choked"
        # trim to the available backlog and fire the requests
        del want[self.backlog - len(self.active_requests):]
        self.active_requests.extend(want)
        for piece, begin, length in want:
            if self.helper is None or self.connection.connection.is_helper_con() or self.helper.reserve_piece(piece,self):
                self.connection.send_request(piece, begin, length)
                self.downloader.chunk_requested(length)
    def got_have(self, index):
        """Process a HAVE message for piece INDEX: update rate estimates,
        the peer bitfield, ProxyService aggregation, and interest/endgame.

        NOTE(review): several lines are elided from this view, including the
        'if DEBUG:' guards around the prints, the 'else:' between the
        last-piece and normal-piece rate updates, and early returns; the
        indentation below is reconstructed.
        """
        # print >>sys.stderr,"Downloader: got_have",index
            print >>sys.stderr,"Downloader: got_have",index
        if index == self.downloader.numpieces-1:
            # final piece is usually shorter: account for its true length
            self.downloader.totalmeasure.update_rate(self.downloader.storage.total_length-(self.downloader.numpieces-1)*self.downloader.storage.piece_length)
            self.peermeasure.update_rate(self.downloader.storage.total_length-(self.downloader.numpieces-1)*self.downloader.storage.piece_length)
            self.downloader.totalmeasure.update_rate(self.downloader.storage.piece_length)
            self.peermeasure.update_rate(self.downloader.storage.piece_length)

        # Arno: LIVEWRAP: drop pieces outside the live window
        if not self.downloader.picker.is_valid_piece(index):
                print >>sys.stderr,"Downloader: got_have",index,"is invalid piece"
            return # TODO: should we request_more()?

        self.have[index] = True
        self.downloader.picker.got_have(index,self.connection)
        # ProxyService:
        # Aggregate the haves bitfields and send them to the coordinator
        # If I am a coordinator, i will exit shortly
        self.downloader.aggregate_and_send_haves()

        if self.have.complete():
            self.downloader.picker.became_seed()
            if self.downloader.picker.am_I_complete():
                # both sides are seeds: no point staying connected
                self.downloader.add_disconnected_seed(self.connection.get_readable_id())
                self.connection.close()
        if self.downloader.endgamemode:
            self.fix_download_endgame()
        elif ( not self.downloader.paused
               and not self.downloader.picker.is_blocked(index)
               and self.downloader.storage.do_I_have_requests(index) ):
            self.send_interested()
604 def _check_interests(self):
605 if self.interested or self.downloader.paused:
607 for i in xrange(len(self.have)):
608 if ( self.have[i] and not self.downloader.picker.is_blocked(i)
609 and ( self.downloader.endgamemode
610 or self.downloader.storage.do_I_have_requests(i) ) ):
611 self.send_interested()
    def got_have_bitfield(self, have):
        """Process the peer's initial BITFIELD: handle seed-meets-seed, feed
        HAVE knowledge to the picker (filtering invalid pieces for live
        streaming), update ProxyService aggregation and interest/endgame.

        NOTE(review): heavily elided -- else-branches, 'if DEBUG:' guards,
        timing code ('diff'), loop bodies that set 'validhave[i]', and the
        final 'have = validhave' style assignment are missing from this
        view; indentation below is reconstructed.
        """
        if self.downloader.picker.am_I_complete() and have.complete():
            # Arno: If we're both seeds
            if self.downloader.super_seeding:
                self.connection.send_bitfield(have.tostring()) # be nice, show you're a seed too
            self.connection.close()
            self.downloader.add_disconnected_seed(self.connection.get_readable_id())

        self.downloader.picker.got_seed()

        # Arno: pass on HAVE knowledge to PiecePicker and if LIVEWRAP:
        # filter out valid pieces

        # STBSPEED: if we haven't hooked in yet, don't iterate over whole range
        # just over the active ranges in the received Bitfield
        activerangeiterators = []
        if self.downloader.picker.videostatus and self.downloader.picker.videostatus.live_streaming and self.downloader.picker.videostatus.get_live_startpos() is None:
            activeranges = have.get_active_ranges()
            if len(activeranges) == 0:
                # Bug, fallback to whole range
                activerangeiterators = [self.downloader.picker.get_valid_range_iterator()]
                # Create iterators for the active ranges
                for (s,e) in activeranges:
                    activerangeiterators.append(xrange(s,e+1))
            # Hooked in, use own valid range as active range

            # Arno, 2010-04-20: Not correct for VOD with seeking, then we
            # should store the HAVE info for things before playback too.

            activerangeiterators = [self.downloader.picker.get_valid_range_iterator()]

            print >>sys.stderr,"Downloader: got_have_field: live: Filtering bitfield",activerangeiterators

        if not self.downloader.picker.videostatus or self.downloader.picker.videostatus.live_streaming:
            print >>sys.stderr,"Downloader: got_have_field: live or normal filter"
            # Transfer HAVE knowledge to PiecePicker and filter pieces if live
            validhave = Bitfield(self.downloader.numpieces)
            for iterator in activerangeiterators:
                    self.downloader.picker.got_have(i,self.connection)

            print >>sys.stderr,"Downloader: got_have_field: VOD filter"
            validhave = Bitfield(self.downloader.numpieces)
            (first,last) = self.downloader.picker.videostatus.download_range()
            for i in xrange(first,last):
                    self.downloader.picker.got_have(i,self.connection)

        # ProxyService:
        # Aggregate the haves bitfields and send them to the coordinator
        # ARNOPS: Shouldn't this be done after have = validhave?
        self.downloader.aggregate_and_send_haves()

        # debug cross-check of the filtered bitfield (scaffolding elided)
            checkhave = Bitfield(self.downloader.numpieces)
            for i in self.downloader.picker.get_valid_range_iterator():
            assert validhave.tostring() == checkhave.tostring()

        # Store filtered bitfield instead of received one

            print >>sys.stderr,"Download: got_have_field: took",diff

        #print >>sys.stderr,"Downloader: got_have_bitfield: valid",`have.toboollist()`

        if self.downloader.endgamemode and not self.downloader.paused:
            for piece, begin, length in self.downloader.all_requests:
                    self.send_interested()
        self._check_interests()

        # NOTE(review): the following return belongs to an elided
        # 'def get_rate(self):' whose header is missing from this view.
        return self.measure.get_rate()
719 def get_short_term_rate(self):
720 return self.short_term_measure.get_rate()
    def is_snubbed(self):
        """True when the peer has not delivered data within snub_time.

        Helper/coordinator connections are never treated this way; as a side
        effect, a peer that stalled past snub_time (last2) gets its pending
        requests cancelled and is handled like a choke.

        NOTE(review): one or two guard/comment lines of this method are
        elided from this view.
        """
        if not self.choked and clock() - self.last2 > self.downloader.snub_time and \
           not self.connection.connection.is_helper_con() and \
           not self.connection.connection.is_coordinator_con():
            for index, begin, length in self.active_requests:
                self.connection.send_cancel(index, begin, length)
            self.got_choke() # treat it just like a choke
        return clock() - self.last > self.downloader.snub_time
733 def peer_is_complete(self):
734 return self.have.complete()
    def __init__(self, infohash, storage, picker, backlog, max_rate_period,
                 numpieces, chunksize, measurefunc, snub_time,
                 kickbans_ok, kickfunc, banfunc, scheduler = None):
        """Coordinator of all SingleDownload legs for one torrent.

        NOTE(review): several attribute initialisations are on lines elided
        from this view (e.g. self.picker, the perip/gotbaddata/kicked/banned
        maps and the downloads list) -- confirm against the full file.
        """
        self.infohash = infohash
        self.b64_infohash = b64encode(infohash)  # used in LivingLab status events
        self.storage = storage
        self.backlog = backlog                   # global per-peer request cap
        self.max_rate_period = max_rate_period
        self.measurefunc = measurefunc
        self.totalmeasure = Measure(max_rate_period*storage.piece_length/storage.request_size)
        self.numpieces = numpieces
        self.chunksize = chunksize
        self.snub_time = snub_time
        self.kickfunc = kickfunc
        self.banfunc = banfunc
        self.disconnectedseeds = {}
        self.kickbans_ok = kickbans_ok
        self.kickbans_halted = False
        self.super_seeding = False
        self.endgamemode = False
        self.endgame_queued_pieces = []
        self.all_requests = []
        self.download_rate = 0
#        self.download_rate = 25000 # 25K/s test rate
        self.bytes_requested = 0
        self.last_time = clock()
        self.requeueing = False
        self.scheduler = scheduler

        # hack: we should not import this since it is not part of the
        # core nor should we import here, but otherwise we will get
        # _event_reporter stores events that are logged somewhere...
        # from BaseLib.Core.Statistics.StatusReporter import get_reporter_instance
        # self._event_reporter = get_reporter_instance()
        self._event_reporter = get_status_holder("LivingLab")

        # kick off the periodic outstanding-request check (every second)
        self.scheduler(self.dlr_periodic_check, 1)
    def dlr_periodic_check(self):
        # Periodic (1 s) housekeeping: let the picker re-examine outstanding
        # requests, then poke unchoked peers for more.
        # NOTE(review): the lines that shuffle/iterate 'ds' and call
        # _request_more() on each peer are elided from this view.
        self.picker.check_outstanding_requests(self.downloads)

        ds = [d for d in self.downloads if not d.choked]

        self.scheduler(self.dlr_periodic_check, 1)
797 def set_download_rate(self, rate):
798 self.download_rate = rate * 1000
799 self.bytes_requested = 0
    def queue_limit(self):
        """Current allowed request-queue size, derived from the configured
        download rate and bytes already requested.

        NOTE(review): elided lines include the clock() sampling that binds
        't' (and updates last_time), the body of the requeueing loop over
        'q', and the final 'return ql'.
        """
        if not self.download_rate:
            return 10e10 # that's a big queue!
        # drain the budget by elapsed-time * rate
        self.bytes_requested -= (t - self.last_time) * self.download_rate
        if not self.requeueing and self.queued_out and self.bytes_requested < 0:
            # budget available again: wake peers that were queued out
            self.requeueing = True
            q = self.queued_out.keys()
            self.requeueing = False
        if -self.bytes_requested > 5*self.download_rate:
            # cap the accumulated credit at 5 seconds worth of data
            self.bytes_requested = -5*self.download_rate
        ql = max(int(-self.bytes_requested/self.chunksize), 0)
        # print >> sys.stderr, 'Downloader: download_rate: %s, bytes_requested: %s, chunk: %s -> queue limit: %d' % \
        #     (self.download_rate, self.bytes_requested, self.chunksize, ql)
823 def chunk_requested(self, size):
824 self.bytes_requested += size
826 external_data_received = chunk_requested
    def make_download(self, connection):
        """Create a SingleDownload leg for CONNECTION and register it in the
        per-IP statistics.

        NOTE(review): the 'else:' of the perip lookup and the final
        'return d' are elided from this view (as shown, the setdefault line
        overwrites the plain lookup).
        """
        ip = connection.get_ip()
        if self.perip.has_key(ip):
            perip = self.perip[ip]
            perip = self.perip.setdefault(ip, PerIPStats(ip))
        perip.peerid = connection.get_readable_id()
        perip.numconnections += 1
        d = SingleDownload(self, connection)
        perip.lastdownload = d
        self.downloads.append(d)
        self._event_reporter.create_and_add_event("connection-established", [self.b64_infohash, str(ip)])
    def piece_flunked(self, index):
        """React to piece INDEX failing its hash check.

        NOTE(review): the guards (paused check, endgame vs. normal-mode
        branches) around the statements below are elided from this view.
        """
        # endgame: return the piece's chunks to the shared request pool
        while self.storage.do_I_have_requests(index):
            nb, nl = self.storage.new_request(index)
            self.all_requests.append((index, nb, nl))
        for d in self.downloads:
            d.fix_download_endgame()

        self._reset_endgame()

        # normal mode: nudge unchoked peers, and mark peers that have this
        # piece as potentially interesting again
        ds = [d for d in self.downloads if not d.choked]

        ds = [d for d in self.downloads if not d.interested and d.have[index]]

        d.example_interest = index
864 def has_downloaders(self):
865 return len(self.downloads)
    def lost_peer(self, download):
        """Unregister DOWNLOAD after its connection went away and report the
        per-connection statistics.

        NOTE(review): the line binding 'ip' (presumably 'ip = download.ip')
        is elided from this view.
        """
        self.perip[ip].numconnections -= 1
        if self.perip[ip].lastdownload == download:
            self.perip[ip].lastdownload = None
        self.downloads.remove(download)
        if self.endgamemode and not self.downloads: # all peers gone
            self._reset_endgame()

        self._event_reporter.create_and_add_event("connection-upload", [self.b64_infohash, ip, download.connection.total_uploaded])
        self._event_reporter.create_and_add_event("connection-download", [self.b64_infohash, ip, download.connection.total_downloaded])
        self._event_reporter.create_and_add_event("connection-lost", [self.b64_infohash, ip])
880 def _reset_endgame(self):
881 if DEBUG: print >>sys.stderr, "Downloader: _reset_endgame"
882 self.storage.reset_endgame(self.all_requests)
883 self.endgamemode = False
884 self.all_requests = []
885 self.endgame_queued_pieces = []
    def add_disconnected_seed(self, id):
        """Remember (with a timestamp for later expiry) that the peer with
        readable ID was a seed when it disconnected."""
        # if not self.disconnectedseeds.has_key(id):
        #     self.picker.seed_seen_recently()
        self.disconnectedseeds[id]=clock()
    # def expire_disconnected_seeds(self):

    def num_disconnected_seeds(self):
        """Count recently-disconnected seeds, expiring stale entries first.

        NOTE(review): EXPIRE_TIME is defined on an elided line, and one or
        two loop-scaffolding lines around the delete are also elided.
        (py2: items() returns a list, so deleting inside the loop is safe.)
        """
        # first expire old ones
        for id, t in self.disconnectedseeds.items():
            if clock() - t > EXPIRE_TIME: #Expire old seeds after so long
                del self.disconnectedseeds[id]
        return len(self.disconnectedseeds)
        # if this isn't called by a stats-gathering function
        # it should be scheduled to run every minute or two.
    def _check_kicks_ok(self):
        # Disable kick/ban permanently once too many IPs have sent bad data
        # (protects against poisoned swarms), and require more than two
        # connected peers so we never kick our only sources.
        if len(self.gotbaddata) > 10:
            self.kickbans_ok = False
            self.kickbans_halted = True
        return self.kickbans_ok and len(self.downloads) > 2
    def try_kick(self, download):
        """Kick DOWNLOAD's connection, if kicking is currently allowed.

        NOTE(review): the lines binding 'ip' and recording the kick in
        self.kicked are elided from this view.
        """
        if self._check_kicks_ok():
            # detach the guard so later bad data is still attributed by IP
            download.guard.download = None
            id = download.connection.get_readable_id()
            self.perip[ip].peerid = id
            self.kickfunc(download.connection)
    def try_ban(self, ip):
        """Ban IP outright, if kick/ban is currently allowed.

        NOTE(review): the banfunc call and the body of the kicked-map
        cleanup are on lines elided from this view.
        """
        if self._check_kicks_ok():
            self.banned[ip] = self.perip[ip].peerid
            if self.kicked.has_key(ip):
929 def set_super_seed(self):
930 self.super_seeding = True
    def check_complete(self, index):
        """Called when piece INDEX finished: manage endgame exit and, when
        the whole download completes, close seed connections and emit
        LivingLab events.

        NOTE(review): elided lines include the 'else:' branches inside the
        per-download loop and any return value -- the event calls below came
        from different branches of that loop.
        """
        if self.endgamemode and not self.all_requests:
            self.endgamemode = False
        if self.endgame_queued_pieces and not self.endgamemode:
            self.requeue_piece_download()
        if self.picker.am_I_complete():
            assert not self.all_requests
            assert not self.endgamemode

            for download in self.downloads:
                if download.have.complete():
                    download.connection.send_have(index) # be nice, tell the other seed you completed
                    self.add_disconnected_seed(download.connection.get_readable_id())
                    download.connection.close()

                    self._event_reporter.create_and_add_event("connection-seed", [self.b64_infohash, download.ip, download.connection.total_uploaded])

                    self._event_reporter.create_and_add_event("connection-upload", [self.b64_infohash, download.ip, download.connection.total_uploaded])
                    self._event_reporter.create_and_add_event("connection-download", [self.b64_infohash, download.ip, download.connection.total_downloaded])

            self._event_reporter.create_and_add_event("complete", [self.b64_infohash])
            # self._event_reporter.flush()
958 def too_many_partials(self):
959 return len(self.storage.dirty) > (len(self.downloads)/2)
    def cancel_requests(self, requests, allowrerequest=True):
        """Cancel the given (piece, begin, length) REQUESTS on every peer.

        When ALLOWREREQUEST is False the pieces involved are treated as slow
        and only re-requested via the slowpieces hint.

        NOTE(review): branch scaffolding is elided from this view -- the two
        _request_more() calls below originally lived in the allowrerequest
        if/else branches.
        """
        # todo: remove duplicates
        slowpieces = [piece_id for piece_id, _, _ in requests]

        if self.endgame_queued_pieces:
            for piece_id, _, _ in requests:
                if not self.storage.do_I_have(piece_id):
                    self.endgame_queued_pieces.remove(piece_id)

        # remove the items in requests from self.all_requests
        if not allowrerequest:
            self.all_requests = [request for request in self.all_requests if not request in requests]
            if DEBUG: print >>sys.stderr, "Downloader: cancel_requests: all_requests", len(self.all_requests), "remaining"

        for download in self.downloads:
            for request in download.active_requests:
                if request in requests:
                    if DEBUG: print >>sys.stderr, "Downloader:cancel_requests: canceling", request, "on", download.ip
                    download.connection.send_cancel(*request)
                    if not self.endgamemode:
                        self.storage.request_lost(*request)
            download.active_requests = [request for request in download.active_requests if not request in requests]
            # Arno: VOD: all these peers were slow for their individually
            # assigned pieces. These pieces have high priority, so don't
            # retrieve any of theses pieces from these slow peers, just
            # give them something further in the future.
            download._request_more()
            # Arno: ALT is to just kick peer. Good option if we have lots (See Encryper.to_connect() queue
            #print >>sys.stderr,"Downloader: Kicking slow peer",d.ip
            #d.connection.close() # bye bye, zwaai zwaai
            download._request_more(slowpieces=slowpieces)

            if not self.endgamemode and download.choked:
                download._check_interests()
    def cancel_piece_download(self, pieces, allowrerequest=True):
        """Cancel all outstanding requests for the piece ids in PIECES.

        NOTE(review): branch scaffolding is elided from this view (endgame
        early-return, the 'if index in pieces:' membership tests, and the
        else-branch of allowrerequest), so several statements below appear
        with reconstructed indentation.
        """
        if self.endgamemode:
            if self.endgame_queued_pieces:
                for piece in pieces:
                        self.endgame_queued_pieces.remove(piece)

            for index, nb, nl in self.all_requests:
                    self.storage.request_lost(index, nb, nl)

            new_all_requests = []
            for index, nb, nl in self.all_requests:
                    self.storage.request_lost(index, nb, nl)

                    new_all_requests.append((index, nb, nl))
            self.all_requests = new_all_requests
            if DEBUG: print >>sys.stderr, "Downloader: cancel_piece_download: all_requests", len(self.all_requests), "remaining"

        for d in self.downloads:
            for index, nb, nl in d.active_requests:
                    d.connection.send_cancel(index, nb, nl)
                    if not self.endgamemode:
                        self.storage.request_lost(index, nb, nl)
            d.active_requests = [ r for r in d.active_requests
                                  if r[0] not in pieces ]
            # Arno: VOD: all these peers were slow for their individually
            # assigned pieces. These pieces have high priority, so don't
            # retrieve any of theses pieces from these slow peers, just
            # give them something further in the future.
            if not allowrerequest:
                # Arno: ALT is to just kick peer. Good option if we have lots (See Encryper.to_connect() queue
                #print >>sys.stderr,"Downloader: Kicking slow peer",d.ip
                #d.connection.close() # bye bye, zwaai zwaai
                d._request_more(slowpieces=pieces)

            if not self.endgamemode and d.choked:
                d._check_interests()
    def requeue_piece_download(self, pieces = []):
        """Re-queue downloads for PIECES (merged with any endgame-queued
        pieces), leaving endgame mode if necessary.

        NOTE(review): mutable default argument 'pieces=[]'; also the
        shuffle/iteration tail of this method (the loop that owns the final
        _check_interests() call) is elided from this view.
        """
        if self.endgame_queued_pieces:
            for piece in pieces:
                if not piece in self.endgame_queued_pieces:
                    self.endgame_queued_pieces.append(piece)
            pieces = self.endgame_queued_pieces
        if self.endgamemode:
            if self.all_requests:
                # still mid-endgame: remember and retry later
                self.endgame_queued_pieces = pieces
            self.endgamemode = False
            self.endgame_queued_pieces = None

        ds = [d for d in self.downloads]

        d._check_interests()
1076 def start_endgame(self):
1077 assert not self.endgamemode
1078 self.endgamemode = True
1079 assert not self.all_requests
1080 for d in self.downloads:
1081 if d.active_requests:
1082 assert d.interested and not d.choked
1083 for request in d.active_requests:
1084 assert not request in self.all_requests
1085 self.all_requests.append(request)
1086 for d in self.downloads:
1087 d.fix_download_endgame()
1088 if DEBUG: print >>sys.stderr, "Downloader: start_endgame: we have", len(self.all_requests), "requests remaining"
    def pause(self, flag):
        """Pause or resume downloading from all peers.

        flag: True pauses (cancel every active request, drop interest);
        False resumes (re-evaluate interest per peer).
        NOTE(review): this listing appears to elide the lines that branch
        on flag, plus the body of the final if (presumably
        d._request_more()) — verify against the upstream source.
        """
        for d in self.downloads:
            # Withdraw every outstanding request at this peer.
            for index, begin, length in d.active_requests:
                d.connection.send_cancel(index, begin, length)
            d.send_not_interested()
        if self.endgamemode:
            self._reset_endgame()
        # Resume path: randomize peer order so no single peer is favoured.
        shuffle(self.downloads)
        for d in self.downloads:
            d._check_interests()
            if d.interested and not d.choked:
                # NOTE(review): branch body elided in this listing.
    def live_invalidate(self,piece,mevirgin=False): # Arno: LIVEWRAP
        """Mark a piece that fell out of the live window as not-haved.

        piece: piece index to invalidate in every peer's have-map and in
        local storage.
        mevirgin: STBSPEED hint — True when we hold no pieces at all yet.
        NOTE(review): upstream presumably skips the storage call when
        mevirgin is True; that guard appears elided in this listing.
        """
        #print >>sys.stderr,"Downloader: live_invalidate",piece
        for d in self.downloads:
            # Clear the piece in this peer's have-map.
            d.have[piece] = False
        # STBSPEED: If I have no pieces yet, no need to loop to invalidate them.
        self.storage.live_invalidate(piece)
    def live_invalidate_ranges(self,toinvalidateranges,toinvalidateset):
        """ STBPEED: Faster version of live_invalidate that copies have arrays
        rather than iterate over them for clearing.

        toinvalidateranges: list of one or two (start, end) inclusive
        piece-index ranges that dropped out of the live window.
        toinvalidateset: individual piece indexes to clear; also used by
        the verification prints below.

        NOTE(review): this listing appears to elide several lines (the
        else-branch header before the two-range case and the __debug__
        guards around the verification loops) — verify against upstream.
        """
        if len(toinvalidateranges) == 1:
            # Single contiguous range: splice an all-False run into a copy
            # of each peer's have-array instead of clearing bit by bit.
            (s,e) = toinvalidateranges[0]
            emptyrange = [False for piece in xrange(s,e+1)]
            assert len(emptyrange) == e+1-s
            for d in self.downloads:
                newhave = d.have[0:s] + emptyrange + d.have[e+1:]
                d.have = Bitfield(length=len(newhave),fromarray=newhave)
                #assert oldhave.tostring() == d.have.tostring()
                # Clear the individually-listed pieces as well, then report
                # any divergence between the rebuilt bitfield and newhave.
                for piece in toinvalidateset:
                    d.have[piece] = False
                print >>sys.stderr,"d len",len(d.have)
                print >>sys.stderr,"new len",len(newhave)
                for i in xrange(0,len(newhave)):
                    if d.have[i] != newhave[i]:
                        print >>sys.stderr,"newhave diff",i
        # Two ranges (live-wrap case): keep only the span between the two
        # invalidated ranges.  NOTE(review): likely under an elided else:.
        (s1,e1) = toinvalidateranges[0]
        (s2,e2) = toinvalidateranges[1]
        emptyrange1 = [False for piece in xrange(s1,e1+1)]
        emptyrange2 = [False for piece in xrange(s2,e2+1)]
        assert len(emptyrange1) == e1+1-s1
        assert len(emptyrange2) == e2+1-s2
        for d in self.downloads:
            newhave = emptyrange1 + d.have[e1+1:s2] + emptyrange2
            d.have = Bitfield(length=len(newhave),fromarray=newhave)
            #assert oldhave.tostring() == d.have.tostring()
            for piece in toinvalidateset:
                d.have[piece] = False
            print >>sys.stderr,"d len",len(d.have)
            print >>sys.stderr,"new len",len(newhave)
            for i in xrange(0,len(newhave)):
                if d.have[i] != newhave[i]:
                    print >>sys.stderr,"newhave diff",i
    def aggregate_and_send_haves(self):
        """ Aggregates the information from the haves bitfields for all the active connections,
        then calls the helper class to send the aggregated information as a PROXY_HAVE message.

        Only acts when a ProxyService helper is attached
        (self.picker.helper is set), i.e. this node is a coordinator.
        NOTE(review): an "if DEBUG:" guard before the stderr print appears
        to be elided in this listing — verify against upstream.
        """
        if self.picker.helper:
            # The current node is a coordinator
            print >> sys.stderr,"Downloader: aggregate_and_send_haves: helper None or helper conn"
            # haves_vector is a matrix, having on each line a Bitfield
            haves_vector = [None] * len(self.downloads)
            for i in range(0, len(self.downloads)):
                haves_vector[i] = self.downloads[i].have
            #Calculate the aggregated haves
            aggregated_haves = Bitfield(self.numpieces)
            for piece in range (0, self.numpieces):
                # OR this piece's bit across all active connections.
                aggregated_value = False
                # For every column in the haves_vector matrix
                for d in range(0, len(self.downloads)):
                    # For every active connection
                    aggregated_value = aggregated_value or haves_vector[d][piece] # Logical OR operation
                aggregated_haves[piece] = aggregated_value
            self.picker.helper.send_proxy_have(aggregated_haves)