1 # Written by Jan David Mol, Arno Bakker, George Milescu
2 # see LICENSE.txt for license information
7 from traceback import print_exc,print_stack
9 from BaseLib.Core.BitTornado.BT1.PiecePicker import PiecePicker
12 from BaseLib.Core.BitTornado.BT1.Downloader import print_chunks
14 # percent piece loss to emulate -- we just don't request this percentage of the pieces
15 # only implemented for live streaming
17 TEST_VOD_OVERRIDE = False
20 DEBUG_CHUNKS = False # set DEBUG_CHUNKS in BT1.Downloader to True
23 def rarest_first( has_dict, rarity_list, filter = lambda x: True ):
24 """ Select the rarest of pieces in has_dict, according
25 to the rarities in rarity_list. Breaks ties uniformly
26 at random. Additionally, `filter' is applied to select
27 the pieces we can return. """
30 - `choice' is the choice so far
31 - `n' is the number of pieces we could choose from so far
32 - `rarity' is the rarity of the choice so far
34 Every time we see a rarer piece, we reset our choice.
35 Every time we see a piece of the same rarity we're looking for,
36 we select it (overriding the previous choice) with probability 1/n.
37 This leads to a uniformly selected piece in one pass, be it that
38 we need more random numbers than when doing two passes. """
44 for k in (x for x in has_dict if filter(x)):
47 if rarity is None or r < rarity:
53 if random.uniform(0,n) == 0: # uniform selects from [0,n)
58 class PiecePickerStreaming(PiecePicker):
59 """ Implements piece picking for streaming video. Keeps track of playback
60 point and avoids requesting obsolete pieces. """
62 # order of initialisation and important function calls
63 # PiecePicker.__init__ (by BitTornado.BT1Download.__init__)
64 # PiecePicker.complete (by hash checker, for pieces on disk)
65 # MovieSelector.__init__
66 # PiecePicker.set_download_range (indirectly by MovieSelector.__init__)
67 # MovieOnDemandTransporter.__init__ (by BitTornado.BT1Download.startEngine)
68 # PiecePicker.set_bitrate (by MovieOnDemandTransporter)
69 # PiecePicker.set_transporter (by MovieOnDemandTransporter)
71 # PiecePicker._next (once connections are set up)
73 # PiecePicker.complete (by hash checker, for pieces received)
75 # relative size of mid-priority set
78 def __init__(self, numpieces,
79 rarest_first_cutoff = 1, rarest_first_priority_cutoff = 3,
80 priority_step = 20, helper = None, coordinator = None, rate_predictor = None, piecesize = 0):
81 PiecePicker.__init__( self, numpieces, rarest_first_cutoff, rarest_first_priority_cutoff,
82 priority_step, helper, coordinator, rate_predictor)
84 # maximum existing piece number, to avoid scanning beyond it in next()
89 self.stats["high"] = 0
94 self.transporter = None
96 # self.outstanding_requests contains (piece-id, begin,
97 # length):timestamp pairs for each outstanding request.
98 self.outstanding_requests = {}
100 # The playing_delay and buffering_delay give three values
101 # (min, max, offeset) in seconds.
103 # The min tells how long before the cancel policy is allowed
104 # to kick in. We can not expect to receive a piece instantly,
105 # so we have to wait this time before having a download speed
108 # The max tells how long before we cancel the request. The
109 # request may also be canceled because the chunk will not be
110 # completed given the current download speed.
112 # The offset gives a grace period that is taken into account
113 # when choosing to cancel a request. For instance, when the
114 # peer download speed is too low to receive the chunk within 10
115 # seconds, a grace offset of 15 would ensure that the chunk is
116 # NOT canceled (useful while buffering)
117 self.playing_delay = (5, 20, -0.5)
118 self.buffering_delay = (7.5, 30, 10)
120 # Arno, 2010-04-20: STBSPEED: is_interesting is now a variable.
121 self.is_interesting = self.is_interesting_normal
123 def set_transporter(self, transporter):
124 self.transporter = transporter
127 Arno, 2010-04-20: STBSPEED: Replaced by transporter.complete_from_persistent_state()
128 # update its information -- pieces read from disk
129 if not self.videostatus.live_streaming:
130 for i in xrange(self.videostatus.first_piece,self.videostatus.last_piece+1):
132 self.transporter.complete( i, downloaded=False )
135 def set_videostatus(self,videostatus):
136 """ Download in a wrap-around fashion between pieces [0,numpieces).
137 Look at most delta pieces ahead from download_range[0].
139 self.videostatus = videostatus
141 if self.videostatus.live_streaming:
142 self.is_interesting = self.is_interesting_live
144 self.is_interesting = self.is_interesting_vod
145 videostatus.add_playback_pos_observer( self.change_playback_pos )
147 def is_interesting_live(self,piece):
148 return self.videostatus.in_download_range( piece ) and not self.has[piece]
150 def is_interesting_vod(self,piece):
151 return (self.videostatus.first_piece <= piece <= self.videostatus.last_piece) and not self.has[piece]
153 def is_interesting_normal(self,piece):
154 return not self.has[piece]
157 def change_playback_pos(self, oldpos, newpos):
160 valid = self.is_interesting
162 for d in self.peer_connections.values():
164 has = d["connection"].download.have
166 # Arno, 2009-11-07: STBSPEED: iterator over just valid range, that's
167 # what we'll be interested in.
168 #for i in xrange(self.videostatus.first_piece,self.videostatus.last_piece+1):
169 for i in self.get_valid_range_iterator():
170 if has[i] and valid(i):
173 d["interesting"] = interesting
175 # playback position incremented -- remove timed out piece
176 for d in self.peer_connections.values():
177 d["interesting"].pop(oldpos,0)
179 def got_have(self, piece, connection=None):
181 # print >>sys.stderr,"PiecePickerStreaming: got_have:",piece
182 self.maxhave = max(self.maxhave,piece)
184 # Arno, 2010-04-15: STBSPEED Disabled, does nothing but stats.
185 #if self.transporter:
186 # self.transporter.got_have( piece )
187 PiecePicker.got_have(self,piece,connection)
189 if self.is_interesting(piece):
190 self.peer_connections[connection]["interesting"][piece] = 1
194 self.maxhave = self.numpieces
195 PiecePicker.got_seed( self )
197 def lost_have(self, piece):
198 PiecePicker.lost_have( self, piece )
200 def got_peer(self, connection):
201 PiecePicker.got_peer( self, connection )
203 self.peer_connections[connection]["interesting"] = {}
205 def lost_peer(self, connection):
206 PiecePicker.lost_peer( self, connection )
208 def got_piece(self, *request):
209 if request in self.outstanding_requests:
210 del self.outstanding_requests[request]
212 self.transporter.got_piece(*request)
214 def complete(self, piece):
216 print >>sys.stderr,"PiecePickerStreaming: complete:",piece
218 PiecePicker.complete( self, piece )
220 self.transporter.complete( piece )
222 for request in self.outstanding_requests.keys():
223 if request[0] == piece:
224 del self.outstanding_requests[request]
226 # don't consider this piece anymore
227 for d in self.peer_connections.itervalues():
228 d["interesting"].pop(piece,0)
230 def num_nonempty_neighbours(self):
231 # return #neighbours who have something
232 return len( [c for c in self.peer_connections if c.download.have.numfalse < c.download.have.length] )
234 def pos_is_sustainable(self,fudge=2):
236 Returns whether we have enough data around us to support the current playback position.
237 If not, playback should pause, stall or reinitialised when pieces are lost.
239 vs = self.videostatus
241 # only holds for live streaming for now. theoretically, vod can have the same problem
242 # since data can be seeded in a 'live' fashion
243 if not vs.live_streaming:
245 print >>sys.stderr, "PiecePickerStreaming: pos is sustainable: not streaming live"
248 # We assume the maximum piece number that is available at at least half of the neighbours
249 # to be sustainable. Although we only need a fixed number of neighbours with enough bandwidth,
250 # such neighbours may depart, hence we choose a relative trade-off.
252 # this means that our current playback position is sustainable if any future piece
253 # is owned by at least half of the peers
255 # ignore peers which have nothing
256 numconn = self.num_nonempty_neighbours()
259 # not sustainable, but nothing we can do. Return True to avoid pausing
260 # and getting out of sync.
262 print >>sys.stderr, "PiecePickerStreaming: pos is sustainable: no neighbours with pieces"
265 half = max( 1, numconn/2 )
266 skip = fudge # ignore the first 'fudge' pieces
268 for x in vs.generate_range( vs.download_range() ):
271 elif self.numhaves[x] >= half:
273 print >>sys.stderr, "PiecePickerStreaming: pos is sustainable: piece %s @ %s>%s peers (fudge=%s)" % (x,self.numhaves[x],half,fudge)
279 print >>sys.stderr, "PiecePickerStreaming: pos is NOT sustainable playpos=%s fudge=%s numconn=%s half=%s numpeers=%s %s" % (vs.playback_pos,fudge,numconn,half,len(self.peer_connections),[x.get_ip() for x in self.peer_connections])
281 # too few neighbours own the future pieces. it's wise to pause and let neighbours catch up
286 # next: selects next piece to download. adjusts wantfunc with filter for streaming; calls
287 # _next: selects next piece to download. completes partial downloads first, if needed, otherwise calls
288 # next_new: selects next piece to download. override this with the piece picking policy
290 def next(self, haves, wantfunc, sdownload, complete_first = False, helper_con = False, slowpieces=[], willrequest=True,connection=None,proxyhave=None):
291 def newwantfunc( piece ):
292 #print >>sys.stderr,"S",self.streaming_piece_filter( piece ),"!sP",not (piece in slowpieces),"w",wantfunc( piece )
293 return not (piece in slowpieces) and wantfunc( piece )
295 # fallback: original piece picker
296 p = PiecePicker.next(self, haves, newwantfunc, sdownload, complete_first, helper_con, slowpieces=slowpieces, willrequest=willrequest,connection=connection)
297 if DEBUGPP and self.videostatus.prebuffering:
298 print >>sys.stderr,"PiecePickerStreaming: original PP.next returns",p
299 # Arno, 2010-03-11: Njaal's CS something causes this to return None
300 # when we're not complete: added check
301 if p is None and not self.videostatus.live_streaming and self.am_I_complete() or TEST_VOD_OVERRIDE:
302 # When the file we selected from a multi-file torrent is complete,
303 # we won't request anymore pieces, so the normal way of detecting
304 # we're done is not working and we won't tell the video player
305 # we're playable. Do it here instead.
306 self.transporter.notify_playable()
309 def _next(self, haves, wantfunc, complete_first, helper_con, willrequest=True, connection=None):
310 """ First, complete any partials if needed. Otherwise, select a new piece. """
312 #print >>sys.stderr,"PiecePickerStreaming: complete_first is",complete_first,"started",self.started
314 # cutoff = True: random mode
315 # False: rarest-first mode
316 cutoff = self.numgot < self.rarest_first_cutoff
318 # whether to complete existing partials first -- do so before the
319 # cutoff, or if forced by complete_first, but not for seeds.
320 #complete_first = (complete_first or cutoff) and not haves.complete()
321 complete_first = (complete_first or cutoff)
323 # most interesting piece
326 # interest level of best piece
329 # select piece we started to download with best interest index.
330 for i in self.started:
332 if haves[i] and wantfunc(i) and (self.helper is None or helper_con or not self.helper.is_ignored(i)):
334 if self.level_in_interests[i] < bestnum:
336 bestnum = self.level_in_interests[i]
339 # found a piece -- return it if we are completing partials first
340 # or if there is a cutoff
341 if complete_first or (cutoff and len(self.interests) > self.cutoff):
344 p = self.next_new(haves, wantfunc, complete_first, helper_con,willrequest=willrequest,connection=connection)
346 print >>sys.stderr,"PiecePickerStreaming: next_new returns",p
349 def check_outstanding_requests(self, downloads):
350 if not self.transporter:
355 in_high_range = self.videostatus.in_high_range
356 playing_mode = self.videostatus.playing and not self.videostatus.paused
357 piece_due = self.transporter.piece_due
361 min_delay, max_delay, offset_delay = self.playing_delay
364 min_delay, max_delay, offset_delay = self.buffering_delay
366 for download in downloads:
369 download_rate = download.get_short_term_rate()
370 for piece_id, begin, length in download.active_requests:
371 # select policy for this piece
373 time_request = self.outstanding_requests[(piece_id, begin, length)]
377 # add the length of this chunk to the total of bytes
378 # that needs to be downloaded
379 total_length += length
381 # each request must be allowed at least some
382 # minimal time to be handled
383 if now < time_request + min_delay:
386 # high-priority pieces are eligable for
387 # cancelation. Others are not. They will eventually be
388 # eligable as they become important for playback.
389 if in_high_range(piece_id):
390 if download_rate == 0:
391 # we have not received anything in the last min_delay seconds
392 if DEBUG: print >>sys.stderr, "PiecePickerStreaming: download not started yet for piece", piece_id, "chunk", begin, "on", download.ip
393 cancel_requests.append((piece_id, begin, length))
394 download.bad_performance_counter += 1
398 time_until_deadline = min(piece_due(piece_id), time_request + max_delay - now)
400 time_until_deadline = time_request + max_delay - now
401 time_until_download = total_length / download_rate
403 # we have to cancel when the deadline can not be met
404 if time_until_deadline < time_until_download - offset_delay:
405 if DEBUG: print >>sys.stderr, "PiecePickerStreaming: download speed too slow for piece", piece_id, "chunk", begin, "on", download.ip, "Deadline in", time_until_deadline, "while estimated download in", time_until_download
406 cancel_requests.append((piece_id, begin, length))
408 # Cancel all requests that are too late
411 self.downloader.cancel_requests(cancel_requests)
417 print_chunks(self.downloader, list(self.videostatus.generate_high_range()), compact=False)
419 def requested(self, *request):
420 self.outstanding_requests[request] = time.time()
421 return PiecePicker.requested(self, *request)
423 def next_new(self, haves, wantfunc, complete_first, helper_con, willrequest=True, connection=None):
424 """ Determine which piece to download next from a peer.
426 haves: set of pieces owned by that peer
427 wantfunc: custom piece filter
428 complete_first: whether to complete partial pieces first
430 willrequest: whether the returned piece will actually be requested
434 vs = self.videostatus
436 if vs.live_streaming:
437 # first, make sure we know where to start downloading
438 if vs.live_startpos is None:
439 self.transporter.calc_live_startpos( self.transporter.max_prebuf_packets, False )
440 print >>sys.stderr,"vod: pp: determined startpos of",vs.live_startpos
442 # select any interesting piece, rarest first
444 # Without 'connection', we don't know who we will request from.
446 #print >>sys.stderr,"PiecePickerStreaming: pp",connection.get_ip(),"int",self.peer_connections[connection]["interesting"]
448 return rarest_first( self.peer_connections[connection]["interesting"], self.numhaves, wantfunc )
450 def pick_first( f, t ): # no shuffle
451 for i in vs.generate_range((f,t)):
452 # Is there a piece in the range the peer has?
453 # Is there a piece in the range we don't have?
454 if not haves[i] or self.has[i]:
457 if not wantfunc(i): # Is there a piece in the range we want?
460 if self.helper is None or helper_con or not self.helper.is_ignored(i):
465 def pick_rarest_loop_over_small_range(f,t,shuffle=True):
466 # Arno: pick_rarest is way expensive for the midrange thing,
467 # therefore loop over the list of pieces we want and see
468 # if it's avail, rather than looping over the list of all
469 # pieces to see if one falls in the (f,t) range.
471 xr = vs.generate_range((f,t))
474 # xr is an xrange generator, need real values to shuffle
481 #print >>sys.stderr,"H",
482 if not haves[i] or self.has[i]:
485 #print >>sys.stderr,"W",
489 if self.helper is None or helper_con or not self.helper.is_ignored(i):
494 def pick_rarest_small_range(f,t):
495 #print >>sys.stderr,"choice small",f,t
496 d = vs.dist_range(f,t)
498 for level in xrange(len(self.interests)):
499 piecelist = self.interests[level]
501 if len(piecelist) > d:
502 #if level+1 == len(self.interests):
503 # Arno: Lowest level priorities / long piecelist.
504 # This avoids doing a scan that goes over the entire list
505 # of pieces when we already have the hi and/or mid ranges.
507 # Arno, 2008-05-21: Apparently, the big list is not always
508 # at the lowest level, hacked distance metric to determine
509 # whether to use slow or fast method.
511 #print >>sys.stderr,"choice QUICK"
512 return pick_rarest_loop_over_small_range(f,t)
513 #print >>sys.stderr,"choice Q",diffstr,"l",level,"s",len(piecelist)
515 # Higher priorities / short lists
517 if not vs.in_range( f, t, i ):
520 #print >>sys.stderr,"H",
521 if not haves[i] or self.has[i]:
524 #print >>sys.stderr,"W",
528 if self.helper is None or helper_con or not self.helper.is_ignored(i):
533 def pick_rarest(f,t): #BitTorrent already shuffles the self.interests for us
534 for piecelist in self.interests:
536 if not vs.in_range( f, t, i ):
539 #print >>sys.stderr,"H",
540 if not haves[i] or self.has[i]:
543 #print >>sys.stderr,"W",
547 if self.helper is None or helper_con or not self.helper.is_ignored(i):
552 first, last = vs.download_range()
553 priority_first, priority_last = vs.get_high_range()
554 if priority_first != priority_last:
555 first = priority_first
556 highprob_cutoff = vs.normalize(priority_last + 1)
557 # Arno, 2010-08-10: Errr, mid = MU * high
558 midprob_cutoff = vs.normalize(first + self.MU * vs.get_range_length(first, highprob_cutoff))
560 highprob_cutoff = last
561 midprob_cutoff = vs.normalize(first + self.MU * vs.high_prob_curr_pieces)
562 # h = vs.time_to_pieces( self.HIGH_PROB_SETSIZE )
563 # highprob_cutoff = vs.normalize(first + max(h, self.HIGH_PROB_MIN_PIECES))
564 # midprob_cutoff = vs.normalize(first + max(self.MU * h, self.HIGH_PROB_MIN_PIECES))
566 # print >>sys.stderr, "Prio %s:%s:%s" % (first, highprob_cutoff, midprob_cutoff), highprob_cutoff - first, midprob_cutoff - highprob_cutoff
568 # first,last = vs.download_range()
570 # max_lookahead = vs.wraparound_delta
572 # max_lookahead = vs.last_piece - vs.playback_pos
574 # highprob_cutoff = vs.normalize( first + min( h, max_lookahead ) )
575 # midprob_cutoff = vs.normalize( first + min( h + self.MU * h, max_lookahead ) )
577 if vs.live_streaming:
578 # for live playback consider peers to be bad when they miss the deadline 5 times
579 allow_based_on_performance = connection.download.bad_performance_counter < 5
581 # for VOD playback consider peers to be bad when they miss the deadline 1 time
582 # Diego : patch from Jan
584 allow_based_on_performance = connection.download.bad_performance_counter < 1
586 allow_based_on_performance = True
590 t = vs.normalize( first + self.transporter.max_prebuf_packets )
591 choice = pick_rarest_small_range(f,t)
597 if vs.live_streaming:
598 choice = pick_rarest_small_range( first, highprob_cutoff )
600 choice = pick_first( first, highprob_cutoff )
603 # it is possible that the performance of this peer prohibits
604 # us from selecting this piece...
605 if not allow_based_on_performance:
606 high_priority_choice = choice
610 choice = pick_rarest_small_range( highprob_cutoff, midprob_cutoff )
614 if vs.live_streaming:
615 # Want: loop over what peer has avail, respecting piece priorities
616 # (could ignore those for live).
618 # Attempt 1: loop over range (which is 25% of window (see
619 # VideoStatus), ignoring priorities, no shuffle.
620 #print >>sys.stderr,"vod: choice low RANGE",midprob_cutoff,last
621 #choice = pick_rarest_loop_over_small_range(midprob_cutoff,last,shuffle=False)
624 choice = pick_rarest( midprob_cutoff, last )
627 if choice and willrequest:
628 self.stats[type] += 1
631 print >>sys.stderr,"vod: pp: picked piece %s [type=%s] [%d,%d,%d,%d]" % (`choice`,type,first,highprob_cutoff,midprob_cutoff,last)
633 # 12/05/09, boudewijn: (1) The bad_performance_counter is
634 # incremented whenever a piece download failed and decremented
635 # whenever is succeeds. (2) A peer with a positive
636 # bad_performance_counter is only allowd to pick low-priority
637 # pieces. (Conclusion) When all low-priority pieces are
638 # downloaded the client hangs when one or more high-priority
639 # pieces are required and if all peers have a positive
640 # bad_performance_counter.
641 if choice is None and not allow_based_on_performance:
642 # ensure that there is another known peer with a
643 # non-positive bad_performance_counter that has the piece
644 # that we would pick from the high-priority set for this
647 if high_priority_choice:
649 for download in self.downloader.downloads:
650 if download.have[high_priority_choice] and not download.bad_performance_counter:
654 # no other connection has it... then ignore the
655 # bad_performance_counter advice and attempt to
656 # download it from this connection anyway
657 if DEBUG: print >>sys.stderr, "vod: pp: the bad_performance_counter says this is a bad peer... but we have nothing better... requesting piece", high_priority_choice, "regardless."
658 choice = high_priority_choice
660 if not vs.live_streaming:
661 if choice is None and not self.am_I_complete():
663 # VOD + seeking: we seeked into the future and played till end,
664 # there is a gap between the old playback and the seek point
665 # which we didn't download, and otherwise never will.
667 secondchoice = pick_rarest(vs.first_piece,vs.last_piece)
668 if secondchoice is not None:
670 print >>sys.stderr,"vod: pp: Picking skipped-over piece",secondchoice
675 def is_valid_piece(self,piece):
676 return self.videostatus.in_valid_range(piece)
678 def get_valid_range_iterator(self):
679 if self.videostatus.live_streaming and self.videostatus.get_live_startpos() is None:
680 # Not hooked in, so cannot provide a sensible download range
681 #print >>sys.stderr,"PiecePickerStreaming: Not hooked in, valid range set to total"
682 return PiecePicker.get_valid_range_iterator(self)
684 #print >>sys.stderr,"PiecePickerStreaming: Live hooked in, or VOD, valid range set to subset"
685 first,last = self.videostatus.download_range()
686 return self.videostatus.generate_range((first,last))
688 def get_live_source_have(self):
689 for d in self.peer_connections.values():
690 if d["connection"].is_live_source():
691 return d["connection"].download.have
696 def am_I_complete(self):
697 return self.done and not TEST_VOD_OVERRIDE
700 PiecePickerVOD = PiecePickerStreaming