# instrumentation: add next-share/
# [cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / BitTornado / BT1 / Downloader.py
1 # Written by Bram Cohen and Pawel Garbacki, George Milescu
2 # see LICENSE.txt for license information
3
4 import sys
5 import time
6 from BaseLib.Core.BitTornado.CurrentRateMeasure import Measure
7 from BaseLib.Core.BitTornado.bitfield import Bitfield
8 from random import shuffle
9 from base64 import b64encode
10 from BaseLib.Core.BitTornado.clock import clock
11 from BaseLib.Core.Statistics.Status.Status import get_status_holder
12
#ProxyService_
#
# Import the ProxyService helper interface; when the ProxyService package
# is unavailable, fall back to a minimal stub with the same name so that
# SingleDownload below can still subclass it.
try:
    from BaseLib.Core.ProxyService.Helper import SingleDownloadHelperInterface
except ImportError:
    # No-op stand-in: provides only a trivial __init__.
    class SingleDownloadHelperInterface:
        
        def __init__(self):
            pass
#
#_ProxyService
24
# Compatibility shim for ancient Python versions (pre-2.2.1) that lack the
# True/False builtins: define them as plain ints when missing.
try:
    True
except:
    True = 1
    False = 0
30
# Module-wide debug switches and constants.
DEBUG = False    # general Downloader debug output
DEBUGBF = False  # debug output/timing for bitfield processing
DEBUG_CHUNKS = False # set DEBUG_CHUNKS in PiecePickerStreaming to True
EXPIRE_TIME = 60 * 60  # one hour, in seconds
35
36 # only define the following functions in __debug__. And only import
37 # them in this case. They are to expensive to have, and have no
38 # purpose, outside debug mode.
39 #
40 # Arno, 2009-06-15: Win32 binary versions have __debug__ True apparently, workaround.
41 #
if DEBUG_CHUNKS:
    # Map from peer IP to a stable one-letter identifier used in the chunk
    # printout; the pool is lazily filled with a-z then A-Z.
    _ident_letters = {}
    _ident_letter_pool = None
    def get_ident_letter(download):
        # NOTE(review): the pool holds only 52 letters; pop(0) on an empty
        # pool raises IndexError -- acceptable for debug-only code.
        if not download.ip in _ident_letters:
            global _ident_letter_pool
            if not _ident_letter_pool:
                _ident_letter_pool = [chr(c) for c in range(ord("a"), ord("z")+1)] + [chr(c) for c in range(ord("A"), ord("Z")+1)]
            _ident_letters[download.ip] = _ident_letter_pool.pop(0)
        return _ident_letters[download.ip]

    def print_chunks(downloader, pieces, before=(), after=(), compact=True):
        """
        Print a line summary indicating completed/outstanding/non-requested chunks

        When COMPACT is True one character will represent one piece.
        #   --> downloaded
        -   --> no outstanding requests
        1-9 --> the number of outstanding requests (max 9)

        When COMPACT is False one character will represent one chunk.
        #   --> downloaded
        -   --> no outstanding requests
        a-z --> requested at peer with that character (also capitals, duplicates may occur)
        1-9 --> requested multiple times (at n peers)
        """
        if pieces:
            do_I_have = downloader.storage.do_I_have
            do_I_have_requests = downloader.storage.do_I_have_requests
            inactive_requests = downloader.storage.inactive_requests
            piece_size = downloader.storage.piece_length
            chunk_size = downloader.storage.request_size
            chunks_per_piece = int(piece_size / chunk_size)

            if compact:
                # Count outstanding requests per piece over all downloads.
                request_map = {}
                for download in downloader.downloads:
                    for piece, begin, length in download.active_requests:
                        if not piece in request_map:
                            request_map[piece] = 0
                        request_map[piece] += 1

                def print_chunks_helper(piece_id):
                    if do_I_have(piece_id): return "#"
                    if do_I_have_requests(piece_id): return "-"
                    if piece_id in request_map: return str(min(9, request_map[piece_id]))
                    return "?"

            else:
                # Record, per chunk, who requested it: a peer letter for a
                # single requester, a count (2..n) for multiple requesters.
                request_map = {}
                for download in downloader.downloads:
                    
                    for piece, begin, length in download.active_requests:
                        if not piece in request_map:
                            request_map[piece] = ["-"] * chunks_per_piece
                        index = int(begin/chunk_size)
                        if request_map[piece][index] == "-":
                            request_map[piece][index] = get_ident_letter(download)
                        elif type(request_map[piece][index]) is str:
                            request_map[piece][index] = 2
                        else:
                            request_map[piece][index] += 1
                        # BUGFIX: an unconditional overwrite with
                        # get_ident_letter(download) used to follow here,
                        # dead-coding the duplicate-request counting above;
                        # it has been removed.

                def print_chunks_helper(piece_id):
                    if do_I_have(piece_id): return "#" * chunks_per_piece
#                    if do_I_have_requests(piece_id): return "-" * chunks_per_piece
                    if piece_id in request_map:
                        if piece_id in inactive_requests and type(inactive_requests[piece_id]) is list:
                            for begin, length in inactive_requests[piece_id]:
                                request_map[piece_id][int(begin/chunk_size)] = " "
                        return "".join([str(c) for c in request_map[piece_id]])
                    return "-" * chunks_per_piece

            if before:
                s_before = before[0]
            else:
                s_before = ""

            if after:
                s_after = after[-1]
            else:
                s_after = ""

            print >>sys.stderr, "Outstanding %s:%d:%d:%s [%s|%s|%s]" % (s_before, pieces[0], pieces[-1], s_after, "".join(map(print_chunks_helper, before)), "".join(map(print_chunks_helper, pieces)), "".join(map(print_chunks_helper, after)))

        else:
            print >>sys.stderr, "Outstanding 0:0 []"

else:
    def print_chunks(downloader, pieces, before=(), after=(), compact=True):
        pass
134
135
class PerIPStats:
    """Per-IP bookkeeping of data quality and connection history."""

    def __init__(self, ip):
        # piece index -> count of bad data received for that piece
        self.bad = {}
        # number of pieces from this IP that checked out good
        self.numgood = 0
        # count of connections associated with this IP
        self.numconnections = 0
        # last SingleDownload seen for this IP (None until known)
        self.lastdownload = None
        # peer id associated with this IP (None until known)
        self.peerid = None
143
class BadDataGuard:
    """Tracks bad data received from one peer and escalates to kick/ban.

    One guard is attached to each SingleDownload; statistics are shared
    per IP via the downloader's perip table.
    """

    def __init__(self, download):
        self.download = download
        self.ip = download.ip
        self.downloader = download.downloader
        self.stats = self.downloader.perip[self.ip]
        # Last piece index reported good; lets good() count each piece once.
        self.lastindex = None

    def failed(self, index, bump = False):
        """Record bad data for piece `index`; possibly kick or ban the peer.

        When `bump` is set (and no ban was triggered) the piece is bumped
        in the picker so it gets retried sooner.
        """
        stats = self.stats
        if index not in stats.bad:
            stats.bad[index] = 0
        self.downloader.gotbaddata[self.ip] = 1
        stats.bad[index] += 1
        badcount = len(stats.bad)
        # More than one bad piece: try to kick the offending connection.
        if badcount > 1:
            if self.download is not None:
                self.downloader.try_kick(self.download)
            elif stats.numconnections == 1 and stats.lastdownload is not None:
                self.downloader.try_kick(stats.lastdownload)
        # Persistently bad relative to good pieces: ban the IP outright.
        if badcount >= 3 and badcount > int(stats.numgood/30):
            self.downloader.try_ban(self.ip)
        elif bump:
            self.downloader.picker.bump(index)

    def good(self, index):
        # lastindex ensures numgood grows by one per good piece, however
        # many chunks arrived from this IP's connection(s).
        if index != self.lastindex:
            self.lastindex = index
            self.stats.numgood += 1
172
173 # 2fastbt_
174 class SingleDownload(SingleDownloadHelperInterface):
175 # _2fastbt
    def __init__(self, downloader, connection):
        """Download state machine for a single peer connection.

        downloader -- the owning multi-peer Downloader
        connection -- the connection object for this peer
        """
# 2fastbt_
        SingleDownloadHelperInterface.__init__(self)
# _2fastbt
        self.downloader = downloader
        self.connection = connection
        # BitTorrent per-peer state: we start choked and not interested.
        self.choked = True
        self.interested = False
        # Outstanding (piece, begin, length) requests sent to this peer.
        self.active_requests = []
        self.measure = Measure(downloader.max_rate_period)
        self.peermeasure = Measure(downloader.max_rate_period)
        # Pieces this peer claims to have.
        self.have = Bitfield(downloader.numpieces)
        # clock() timestamps: last piece received / last unchoke-or-piece.
        self.last = -1000
        self.last2 = -1000
        self.example_interest = None
        # Max outstanding requests; recomputed in _backlog().
        self.backlog = 2
        self.ip = connection.get_ip()
        # Must come after self.ip/self.downloader: BadDataGuard reads both.
        self.guard = BadDataGuard(self)
# 2fastbt_
        self.helper = downloader.picker.helper
        # ProxyService: extra have-info handed to the piece picker
        # (passed as proxyhave= in _request_more).
        self.proxy_have = Bitfield(downloader.numpieces)
# _2fastbt

        # boudewijn: VOD needs a download measurement that is not
        # averaged over a 'long' period. downloader.max_rate_period is
        # (by default) 20 seconds because this matches the unchoke
        # policy.
        self.short_term_measure = Measure(5)

        # boudewijn: each download maintains a counter for the number
        # of high priority piece requests that did not get any
        # responce within x seconds.
        self.bad_performance_counter = 0
209
210     def _backlog(self, just_unchoked):
211         self.backlog = int(min(
212             2+int(4*self.measure.get_rate()/self.downloader.chunksize),
213             (2*just_unchoked)+self.downloader.queue_limit() ))
214         if self.backlog > 50:
215             self.backlog = int(max(50, self.backlog * 0.075))
216         return self.backlog
217     
218     def disconnected(self):
219         self.downloader.lost_peer(self)
220
221         """ JD: obsoleted -- moved to picker.lost_peer
222
223         if self.have.complete():
224             self.downloader.picker.lost_seed()
225         else:
226             for i in xrange(len(self.have)):
227                 if self.have[i]:
228                     self.downloader.picker.lost_have(i)
229         """
230
231         if self.have.complete() and self.downloader.storage.is_endgame():
232             self.downloader.add_disconnected_seed(self.connection.get_readable_id())
233         self._letgo()
234         self.guard.download = None
235
236     def _letgo(self):
237         if self.downloader.queued_out.has_key(self):
238             del self.downloader.queued_out[self]
239         if not self.active_requests:
240             return
241         if self.downloader.endgamemode:
242             self.active_requests = []
243             return
244         lost = {}
245         for index, begin, length in self.active_requests:
246             self.downloader.storage.request_lost(index, begin, length)
247             lost[index] = 1
248         lost = lost.keys()
249         self.active_requests = []
250         if self.downloader.paused:
251             return
252         ds = [d for d in self.downloader.downloads if not d.choked]
253         shuffle(ds)
254         for d in ds:
255             d._request_more()
256         for d in self.downloader.downloads:
257             if d.choked and not d.interested:
258                 for l in lost:
259                     if d.have[l] and self.downloader.storage.do_I_have_requests(l):
260                         d.send_interested()
261                         break
262
263     def got_choke(self):
264         if not self.choked:
265             self.choked = True
266             self._letgo()
267
268     def got_unchoke(self):
269         if self.choked:
270             self.choked = False
271             if self.interested:
272                 self._request_more(new_unchoke = True)
273             self.last2 = clock()
274
275     def is_choked(self):
276         return self.choked
277
278     def is_interested(self):
279         return self.interested
280
281     def send_interested(self):
282         if not self.interested:
283             self.interested = True
284             self.connection.send_interested()
285
286     def send_not_interested(self):
287         if self.interested:
288             self.interested = False
289             self.connection.send_not_interested()
290
    def got_piece(self, index, begin, hashlist, piece):
        """
        Process one received chunk of data from this peer.

        index/begin -- chunk coordinates; hashlist -- passed through to
        storage.piece_came_in; piece -- the chunk payload.

        Returns True if the piece is complete.
        Note that in this case a -piece- means a chunk!
        """

        # A response arrived, so relax the bad-performance penalty.
        if self.bad_performance_counter:
            self.bad_performance_counter -= 1
            if DEBUG: print >>sys.stderr, "decreased bad_performance_counter to", self.bad_performance_counter

        length = len(piece)
        #if DEBUG:
        #    print >> sys.stderr, 'Downloader: got piece of length %d' % length
        # Chunks we did not request (duplicates, cancel races) are counted
        # as discarded and otherwise ignored.
        try:
            self.active_requests.remove((index, begin, length))
        except ValueError:
            self.downloader.discarded += length
            return False
        if self.downloader.endgamemode:
            self.downloader.all_requests.remove((index, begin, length))
            if DEBUG: print >>sys.stderr, "Downloader: got_piece: removed one request from all_requests", len(self.downloader.all_requests), "remaining"

        # Record arrival time and update all rate measurements.
        self.last = clock()
        self.last2 = clock()
        self.measure.update_rate(length)
        # Update statistic gatherer
        status = get_status_holder("LivingLab")
        s_download = status.get_or_create_status_element("downloaded",0)
        s_download.inc(length)
        
        self.short_term_measure.update_rate(length)
        self.downloader.measurefunc(length)
        # Hand the chunk to storage; a flunked hash check aborts here.
        if not self.downloader.storage.piece_came_in(index, begin, hashlist, piece, self.guard):
            self.downloader.piece_flunked(index)
            return False

        # boudewijn: we need more accurate (if possibly invalid)
        # measurements on current download speed
        self.downloader.picker.got_piece(index, begin, length)

#        print "Got piece=", index, "begin=", begin, "len=", length
        if self.downloader.storage.do_I_have(index):
            self.downloader.picker.complete(index)

        # Endgame: cancel this chunk at every other peer that requested it.
        if self.downloader.endgamemode:
            for d in self.downloader.downloads:
                if d is not self:
                    if d.interested:
                        if d.choked:
                            assert not d.active_requests
                            d.fix_download_endgame()
                        else:
                            try:
                                d.active_requests.remove((index, begin, length))
                            except ValueError:
                                continue
                            d.connection.send_cancel(index, begin, length)
                            d.fix_download_endgame()
                    else:
                        assert not d.active_requests
        self._request_more()
        self.downloader.check_complete(index)
        
        # BarterCast counter
        self.connection.total_downloaded += length
    
        return self.downloader.storage.do_I_have(index)
358
359 # 2fastbt_
360     def helper_forces_unchoke(self):
361         self.choked = False
362 # _2fastbt
363
    def _request_more(self, new_unchoke = False, slowpieces = []):
        """Fill this peer's request pipeline up to the backlog limit.

        new_unchoke -- True when called right after an UNCHOKE message.
        slowpieces  -- pieces to avoid on this connection; only read, never
                       mutated, so the mutable default is harmless here.

        Side effect: switches the downloader into endgame mode when storage
        reports endgame (not for live streaming).
        """
# 2fastbt_
        if DEBUG:
            print >>sys.stderr,"Downloader: _request_more()"
        if self.helper is not None and self.is_frozen_by_helper():
            if DEBUG:
                print >>sys.stderr,"Downloader: _request_more: blocked, returning"
            return
# _2fastbt
        if self.choked:
            if DEBUG:
                print >>sys.stderr,"Downloader: _request_more: choked, returning"
            return
# 2fastbt_
        # do not download from coordinator
        if self.connection.connection.is_coordinator_con():
            if DEBUG:
                print >>sys.stderr,"Downloader: _request_more: coordinator conn"
            return
# _2fastbt
        if self.downloader.endgamemode:
            self.fix_download_endgame(new_unchoke)
            if DEBUG:
                print >>sys.stderr,"Downloader: _request_more: endgame mode, returning"
            return
        if self.downloader.paused:
            if DEBUG:
                print >>sys.stderr,"Downloader: _request_more: paused, returning"
            return
        # Pipeline already full: maybe reschedule ourselves, then bail out.
        if len(self.active_requests) >= self._backlog(new_unchoke):
            if DEBUG:
                print >>sys.stderr,"Downloader: more req than unchoke (active req: %d >= backlog: %d)" % (len(self.active_requests), self._backlog(new_unchoke))
            # Jelle: Schedule _request more to be called in some time. Otherwise requesting and receiving packages
            # may stop, if they arrive to quickly
            if self.downloader.download_rate:
                wait_period = self.downloader.chunksize / self.downloader.download_rate / 2.0

                # Boudewijn: when wait_period is 0.0 this will cause
                # the the _request_more method to be scheduled
                # multiple times (recursively), causing severe cpu
                # problems.
                #
                # Therefore, only schedule _request_more to be called
                # if the call will be made in the future. The minimal
                # wait_period should be tweaked.
                if wait_period > 1.0:
                    if DEBUG:
                        print >>sys.stderr,"Downloader: waiting for %f s to call _request_more again" % wait_period
                    self.downloader.scheduler(self._request_more, wait_period)

            if not (self.active_requests or self.backlog):
                self.downloader.queued_out[self] = 1
            return
        
        #if DEBUG:
        #    print >>sys.stderr,"Downloader: _request_more: len act",len(self.active_requests),"back",self.backlog
        
        # Pieces whose last free chunk we claimed; other peers may need to
        # lose interest in them (handled below).
        lost_interests = []
        while len(self.active_requests) < self.backlog:
            #if DEBUG:
            #    print >>sys.stderr,"Downloader: Looking for interesting piece"
            #st = time.time()
            #print "DOWNLOADER self.have=", self.have.toboollist()
            
            # This is the PiecePicker call is the current client is a Coordinator
            interest = self.downloader.picker.next(self.have,
                               self.downloader.storage.do_I_have_requests,
                               self,
                               self.downloader.too_many_partials(),
                               self.connection.connection.is_helper_con(),
                               slowpieces = slowpieces, connection = self.connection, proxyhave = self.proxy_have)
            #et = time.time()
            #diff = et-st
            diff=-1
            if DEBUG:
                print >>sys.stderr,"Downloader: _request_more: next() returned",interest,"took %.5f" % (diff)
            if interest is None:
                break
            
            if self.helper and self.downloader.storage.inactive_requests[interest] is None:
                # The current node is a helper and received a request from a coordinator for a piece it has already downloaded
                # Should send a Have message to the coordinator
                self.connection.send_have(interest)
                break

            if self.helper and self.downloader.storage.inactive_requests[interest] == []:
                # The current node is a helper and received a request from a coordinator for a piece that is downloading
                # (all blocks are requested to the swarm, and have not arrived yet)
                break

            
            self.example_interest = interest
            self.send_interested()
            # Request as many chunks of this piece as the backlog allows.
            loop = True
            while len(self.active_requests) < self.backlog and loop:
                
                begin, length = self.downloader.storage.new_request(interest)
                
                if DEBUG:
                    print >>sys.stderr,"Downloader: new_request",interest,begin,length,"to",self.connection.connection.get_ip(),self.connection.connection.get_port()
                
                self.downloader.picker.requested(interest, begin, length)
                self.active_requests.append((interest, begin, length))
                self.connection.send_request(interest, begin, length)
                self.downloader.chunk_requested(length)
                if not self.downloader.storage.do_I_have_requests(interest):
                    loop = False
                    lost_interests.append(interest)
        if not self.active_requests:
            self.send_not_interested()
        # Re-evaluate interest of idle peers whose candidate pieces we just
        # exhausted.
        if lost_interests:
            for d in self.downloader.downloads:
                if d.active_requests or not d.interested:
                    continue
                if d.example_interest is not None and self.downloader.storage.do_I_have_requests(d.example_interest):
                    continue
                for lost in lost_interests:
                    if d.have[lost]:
                        break
                else:
                    continue
# 2fastbt_
                #st = time.time()
                interest = self.downloader.picker.next(d.have,
                                   self.downloader.storage.do_I_have_requests,
                                   self, # Arno, 2008-05-22; self -> d? Original Pawel code
                                   self.downloader.too_many_partials(),
                                   self.connection.connection.is_helper_con(), willrequest=False,connection=self.connection, proxyhave = self.proxy_have)
                #et = time.time()
                #diff = et-st
                diff=-1
                if DEBUG:
                    print >>sys.stderr,"Downloader: _request_more: next()2 returned",interest,"took %.5f" % (diff)

                if interest is not None:
                    # The helper has at least one piece that the coordinator requested 
                    if self.helper and self.downloader.storage.inactive_requests[interest] is None:
                        # The current node is a helper and received a request from a coordinator for a piece it has already downloaded
                        # Should send a Have message to the coordinator
                        self.connection.send_have(interest)
                        break
                    if self.helper and self.downloader.storage.inactive_requests[interest] == []:
                        # The current node is a helper and received a request from a coordinator for a piece that is downloading
                        # (all blocks are requested to the swarm, and have not arrived yet)
                        break

# _2fastbt
                if interest is None:
                    d.send_not_interested()
                else:
                    d.example_interest = interest
                    
        # Arno: LIVEWRAP: no endgame
        if not self.downloader.endgamemode and \
           self.downloader.storage.is_endgame() and \
           not (self.downloader.picker.videostatus and self.downloader.picker.videostatus.live_streaming):
            self.downloader.start_endgame()
521
522
    def fix_download_endgame(self, new_unchoke = False):
        """Endgame-mode requesting: request any outstanding chunk this peer
        has that we have not already requested from it.

        new_unchoke -- True when called right after an UNCHOKE message.
        """
# 2fastbt_
        # do not download from coordinator
        if self.downloader.paused or self.connection.connection.is_coordinator_con():
            if DEBUG: print >>sys.stderr, "Downloader: fix_download_endgame: paused", self.downloader.paused, "or is_coordinator_con", self.connection.connection.is_coordinator_con()
            return
# _2fastbt

        if len(self.active_requests) >= self._backlog(new_unchoke):
            if not (self.active_requests or self.backlog) and not self.choked:
                self.downloader.queued_out[self] = 1
            if DEBUG: print >>sys.stderr, "Downloader: fix_download_endgame: returned"
            return
# 2fastbt_
        # Candidates: globally outstanding chunks this peer has, not already
        # requested here, and not ignored by the ProxyService helper.
        want = [a for a in self.downloader.all_requests if self.have[a[0]] and a not in self.active_requests and (self.helper is None or self.connection.connection.is_helper_con() or not self.helper.is_ignored(a[0]))]
# _2fastbt
        if not (self.active_requests or want):
            self.send_not_interested()
            if DEBUG: print >>sys.stderr, "Downloader: fix_download_endgame: not interested"
            return
        if want:
            self.send_interested()
        if self.choked:
            if DEBUG: print >>sys.stderr, "Downloader: fix_download_endgame: choked"
            return
        # Randomize and clip to the free backlog slots, then send requests.
        shuffle(want)
        del want[self.backlog - len(self.active_requests):]
        self.active_requests.extend(want)
        for piece, begin, length in want:
# 2fastbt_
            if self.helper is None or self.connection.connection.is_helper_con() or self.helper.reserve_piece(piece,self):
                self.connection.send_request(piece, begin, length)
                self.downloader.chunk_requested(length)
# _2fastbt
557
    def got_have(self, index):
        """Handle a HAVE message: update rate measures, picker knowledge,
        and possibly our interest/requests for piece `index`."""
#        print >>sys.stderr,"Downloader: got_have",index
        if DEBUG:
            print >>sys.stderr,"Downloader: got_have",index
        # The final piece may be shorter than piece_length; account exactly.
        if index == self.downloader.numpieces-1:
            self.downloader.totalmeasure.update_rate(self.downloader.storage.total_length-(self.downloader.numpieces-1)*self.downloader.storage.piece_length)
            self.peermeasure.update_rate(self.downloader.storage.total_length-(self.downloader.numpieces-1)*self.downloader.storage.piece_length)
        else:
            self.downloader.totalmeasure.update_rate(self.downloader.storage.piece_length)
            self.peermeasure.update_rate(self.downloader.storage.piece_length)

        # Arno: LIVEWRAP
        if not self.downloader.picker.is_valid_piece(index):
            if DEBUG:
                print >>sys.stderr,"Downloader: got_have",index,"is invalid piece"
            return # TODO: should we request_more()? 
        
        if self.have[index]:
            return
        
        self.have[index] = True
        self.downloader.picker.got_have(index,self.connection)
        # ProxyService_
        #
        # Aggregate the haves bitfields and send them to the coordinator
        # If I am a coordinator, i will exit shortly
        self.downloader.aggregate_and_send_haves()
        #
        # _ProxyService
        
        if self.have.complete():
            self.downloader.picker.became_seed()
            if self.downloader.picker.am_I_complete():
                # Both sides are complete; no reason to stay connected.
                self.downloader.add_disconnected_seed(self.connection.get_readable_id())
                self.connection.close()
                return
        if self.downloader.endgamemode:
            self.fix_download_endgame()
        elif ( not self.downloader.paused
               and not self.downloader.picker.is_blocked(index)
               and self.downloader.storage.do_I_have_requests(index) ):
            if not self.choked:
                self._request_more()
            else:
                self.send_interested()
603
604     def _check_interests(self):
605         if self.interested or self.downloader.paused:
606             return
607         for i in xrange(len(self.have)):
608             if ( self.have[i] and not self.downloader.picker.is_blocked(i)
609                  and ( self.downloader.endgamemode
610                        or self.downloader.storage.do_I_have_requests(i) ) ):
611                 self.send_interested()
612                 return
613
    def got_have_bitfield(self, have):
        """Handle the peer's initial BITFIELD message.

        Filters the received bitfield down to pieces valid for us (live
        streaming / VOD windows), feeds HAVE knowledge to the piece picker,
        and updates our interest accordingly.
        """
        # Both of us complete: politely disconnect.
        if self.downloader.picker.am_I_complete() and have.complete():
            # Arno: If we're both seeds
            if self.downloader.super_seeding:
                self.connection.send_bitfield(have.tostring()) # be nice, show you're a seed too
            self.connection.close()
            self.downloader.add_disconnected_seed(self.connection.get_readable_id())
            return

        if DEBUGBF:
            st = time.time()

        if have.complete():
            # Arno: He is seed
            self.downloader.picker.got_seed()
        else:
            # Arno: pass on HAVE knowledge to PiecePicker and if LIVEWRAP: 
            # filter out valid pieces
            
            # STBSPEED: if we haven't hooked in yet, don't iterate over whole range
            # just over the active ranges in the received Bitfield
            activerangeiterators = []
            if self.downloader.picker.videostatus and self.downloader.picker.videostatus.live_streaming and self.downloader.picker.videostatus.get_live_startpos() is None:
                # Not hooked in
                activeranges = have.get_active_ranges()
                
                if len(activeranges) == 0:
                    # Bug, fallback to whole range
                    activerangeiterators = [self.downloader.picker.get_valid_range_iterator()]
                else:
                    # Create iterators for the active ranges
                    for (s,e) in activeranges:
                        activerangeiterators.append(xrange(s,e+1))
            else:
                # Hooked in, use own valid range as active range

                # Arno, 2010-04-20: Not correct for VOD with seeking, then we
                # should store the HAVE info for things before playback too.
                
                activerangeiterators = [self.downloader.picker.get_valid_range_iterator()]

            if DEBUGBF:
                print >>sys.stderr,"Downloader: got_have_field: live: Filtering bitfield",activerangeiterators 

            if not self.downloader.picker.videostatus or self.downloader.picker.videostatus.live_streaming:
                if DEBUGBF:
                    print >>sys.stderr,"Downloader: got_have_field: live or normal filter"
                # Transfer HAVE knowledge to PiecePicker and filter pieces if live
                validhave = Bitfield(self.downloader.numpieces)
                for iterator in activerangeiterators:
                    for i in iterator:
                        if have[i]:
                            validhave[i] = True
                            self.downloader.picker.got_have(i,self.connection)
            else: # VOD
                if DEBUGBF:
                    print >>sys.stderr,"Downloader: got_have_field: VOD filter" 
                validhave = Bitfield(self.downloader.numpieces)
                (first,last) = self.downloader.picker.videostatus.download_range()
                for i in xrange(first,last):
                    if have[i]:
                        validhave[i] = True
                        self.downloader.picker.got_have(i,self.connection)
            # ProxyService_
            #
            # Aggregate the haves bitfields and send them to the coordinator
            # ARNOPS: Shouldn't this be done after have = validhave?
            self.downloader.aggregate_and_send_haves()
            #
            # _ProxyService

            """
            # SANITY CHECK
            checkhave = Bitfield(self.downloader.numpieces)
            for i in self.downloader.picker.get_valid_range_iterator():
                if have[i]:
                    checkhave[i] = True

            assert validhave.tostring() == checkhave.tostring()
            """
                    
            # Store filtered bitfield instead of received one
            have = validhave

        if DEBUGBF:
            et = time.time()
            diff = et - st
            print >>sys.stderr,"Download: got_have_field: took",diff

                
        self.have = have
        
        #print >>sys.stderr,"Downloader: got_have_bitfield: valid",`have.toboollist()`
                    
        # Endgame: interested as soon as the peer has any outstanding chunk.
        if self.downloader.endgamemode and not self.downloader.paused:
            for piece, begin, length in self.downloader.all_requests:
                if self.have[piece]:
                    self.send_interested()
                    break
            return
        self._check_interests()
715
716     def get_rate(self):
717         return self.measure.get_rate()
718
719     def get_short_term_rate(self):
720         return self.short_term_measure.get_rate()
721
    def is_snubbed(self):
        """Return True when this peer has been silent longer than the
        configured snub interval.

        Side effect: if the peer is unchoked yet idle past the snub
        interval (judged on self.last2), every outstanding request is
        cancelled and the connection is treated as if it had choked us,
        so the chunks can be re-requested elsewhere.
        """
# 2fastbt_
        # Proxy (helper/coordinator) connections are exempt from the
        # cancel-on-idle treatment below.
        if not self.choked and clock() - self.last2 > self.downloader.snub_time and \
            not self.connection.connection.is_helper_con() and \
            not self.connection.connection.is_coordinator_con():
# _2fastbt
            for index, begin, length in self.active_requests:
                self.connection.send_cancel(index, begin, length)
            self.got_choke()    # treat it just like a choke
        # NOTE(review): snub status itself is judged on self.last while the
        # cancel logic above uses self.last2 -- presumably two different
        # timestamps (set elsewhere in this class); confirm against setters.
        return clock() - self.last > self.downloader.snub_time
732
733     def peer_is_complete(self):
734         return self.have.complete()
735
736 class Downloader:
    def __init__(self, infohash, storage, picker, backlog, max_rate_period,
                 numpieces, chunksize, measurefunc, snub_time,
                 kickbans_ok, kickfunc, banfunc, scheduler = None):
        """Coordinate all incoming (peer -> us) traffic for one torrent.

        infohash        -- raw torrent infohash
        storage         -- storage object requests are booked against
        picker          -- PiecePicker deciding which pieces to fetch
        backlog         -- stored for use by SingleDownload connections
        max_rate_period -- measurement window (seconds) for rate measures
        numpieces       -- number of pieces in the torrent
        chunksize       -- request (chunk) size in bytes
        measurefunc     -- rate-measurement callback (used by SingleDownload)
        snub_time       -- seconds of silence before a peer counts as snubbed
        kickbans_ok     -- whether kicking/banning misbehaving peers is allowed
        kickfunc        -- callback used to kick a connection
        banfunc         -- callback used to ban an IP
        scheduler       -- function(callable, delay) used for periodic tasks
        """
        self.infohash = infohash
        # cached base64 form, used as the identifier in status events
        self.b64_infohash = b64encode(infohash)
        self.storage = storage
        self.picker = picker
        self.backlog = backlog
        self.max_rate_period = max_rate_period
        self.measurefunc = measurefunc
        # overall rate measure, window scaled by requests-per-piece
        self.totalmeasure = Measure(max_rate_period*storage.piece_length/storage.request_size)
        self.numpieces = numpieces
        self.chunksize = chunksize
        self.snub_time = snub_time
        self.kickfunc = kickfunc
        self.banfunc = banfunc
        self.disconnectedseeds = {}  # seed id -> last-seen time (expired in num_disconnected_seeds)
        self.downloads = []          # active SingleDownload instances
        self.perip = {}              # ip -> PerIPStats
        self.gotbaddata = {}         # peers that sent data failing checks
        self.kicked = {}             # ip -> readable peer id of kicked peers
        self.banned = {}             # ip -> readable peer id of banned peers
        self.kickbans_ok = kickbans_ok
        self.kickbans_halted = False
        self.super_seeding = False
        # endgame state: when True, remaining requests live in all_requests
        # and are fetched redundantly from several peers
        self.endgamemode = False
        self.endgame_queued_pieces = []
        self.all_requests = []
        self.discarded = 0L          # presumably bytes received but discarded -- confirm
        # global download cap in bytes/sec; 0 means unlimited (see queue_limit)
        self.download_rate = 0
#        self.download_rate = 25000  # 25K/s test rate
        self.bytes_requested = 0     # token-bucket counter for the rate cap
        self.last_time = clock()
        self.queued_out = {}         # peers held back by the rate cap
        self.requeueing = False      # re-entrancy guard for queue_limit
        self.paused = False
        self.scheduler = scheduler

        # hack: we should not import this since it is not part of the
        # core nor should we import here, but otherwise we will get
        # import errors
        #
        # _event_reporter stores events that are logged somewhere...
        # from BaseLib.Core.Statistics.StatusReporter import get_reporter_instance
        # self._event_reporter = get_reporter_instance()
        self._event_reporter = get_status_holder("LivingLab")

        # check periodicaly: start the 1-second maintenance loop
        self.scheduler(self.dlr_periodic_check, 1)
786
787     def dlr_periodic_check(self):
788         self.picker.check_outstanding_requests(self.downloads)
789
790         ds = [d for d in self.downloads if not d.choked]
791         shuffle(ds)
792         for d in ds:
793             d._request_more()
794
795         self.scheduler(self.dlr_periodic_check, 1)
796
797     def set_download_rate(self, rate):
798         self.download_rate = rate * 1000
799         self.bytes_requested = 0
800         
801     def queue_limit(self):
802         if not self.download_rate:
803             return 10e10    # that's a big queue!
804         t = clock()
805         self.bytes_requested -= (t - self.last_time) * self.download_rate
806         self.last_time = t
807         if not self.requeueing and self.queued_out and self.bytes_requested < 0:
808             self.requeueing = True
809             q = self.queued_out.keys()
810             shuffle(q)
811             self.queued_out = {}
812             for d in q:
813                 d._request_more()
814             self.requeueing = False
815         if -self.bytes_requested > 5*self.download_rate:
816             self.bytes_requested = -5*self.download_rate
817         ql = max(int(-self.bytes_requested/self.chunksize), 0)
818         # if DEBUG:
819         #     print >> sys.stderr, 'Downloader: download_rate: %s, bytes_requested: %s, chunk: %s -> queue limit: %d' % \
820         #         (self.download_rate, self.bytes_requested, self.chunksize, ql)
821         return ql
822
823     def chunk_requested(self, size):
824         self.bytes_requested += size
825
826     external_data_received = chunk_requested
827
828     def make_download(self, connection):
829         ip = connection.get_ip()
830         if self.perip.has_key(ip):
831             perip = self.perip[ip]
832         else:
833             perip = self.perip.setdefault(ip, PerIPStats(ip))
834         perip.peerid = connection.get_readable_id()
835         perip.numconnections += 1
836         d = SingleDownload(self, connection)
837         perip.lastdownload = d
838         self.downloads.append(d)
839         self._event_reporter.create_and_add_event("connection-established", [self.b64_infohash, str(ip)])
840         return d
841
842     def piece_flunked(self, index):
843         if self.paused:
844             return
845         if self.endgamemode:
846             if self.downloads:
847                 while self.storage.do_I_have_requests(index):
848                     nb, nl = self.storage.new_request(index)
849                     self.all_requests.append((index, nb, nl))
850                 for d in self.downloads:
851                     d.fix_download_endgame()
852                 return
853             self._reset_endgame()
854             return
855         ds = [d for d in self.downloads if not d.choked]
856         shuffle(ds)
857         for d in ds:
858             d._request_more()
859         ds = [d for d in self.downloads if not d.interested and d.have[index]]
860         for d in ds:
861             d.example_interest = index
862             d.send_interested()
863
864     def has_downloaders(self):
865         return len(self.downloads)
866
867     def lost_peer(self, download):
868         ip = download.ip
869         self.perip[ip].numconnections -= 1
870         if self.perip[ip].lastdownload == download:
871             self.perip[ip].lastdownload = None
872         self.downloads.remove(download)
873         if self.endgamemode and not self.downloads: # all peers gone
874             self._reset_endgame()
875
876         self._event_reporter.create_and_add_event("connection-upload", [self.b64_infohash, ip, download.connection.total_uploaded])
877         self._event_reporter.create_and_add_event("connection-download", [self.b64_infohash, ip, download.connection.total_downloaded])
878         self._event_reporter.create_and_add_event("connection-lost", [self.b64_infohash, ip])
879         
880     def _reset_endgame(self):            
881         if DEBUG: print >>sys.stderr, "Downloader: _reset_endgame"
882         self.storage.reset_endgame(self.all_requests)
883         self.endgamemode = False
884         self.all_requests = []
885         self.endgame_queued_pieces = []
886
    def add_disconnected_seed(self, id):
        """Remember that seed `id` was seen, timestamped so it can be
        expired later by num_disconnected_seeds()."""
#        if not self.disconnectedseeds.has_key(id):
#            self.picker.seed_seen_recently()
        self.disconnectedseeds[id]=clock()
891
892 #   def expire_disconnected_seeds(self):
893
894     def num_disconnected_seeds(self):
895         # first expire old ones
896         expired = []
897         for id, t in self.disconnectedseeds.items():
898             if clock() - t > EXPIRE_TIME:     #Expire old seeds after so long
899                 expired.append(id)
900         for id in expired:
901 #            self.picker.seed_disappeared()
902             del self.disconnectedseeds[id]
903         return len(self.disconnectedseeds)
904         # if this isn't called by a stats-gathering function
905         # it should be scheduled to run every minute or two.
906
907     def _check_kicks_ok(self):
908         if len(self.gotbaddata) > 10:
909             self.kickbans_ok = False
910             self.kickbans_halted = True
911         return self.kickbans_ok and len(self.downloads) > 2
912
913     def try_kick(self, download):
914         if self._check_kicks_ok():
915             download.guard.download = None
916             ip = download.ip
917             id = download.connection.get_readable_id()
918             self.kicked[ip] = id
919             self.perip[ip].peerid = id
920             self.kickfunc(download.connection)
921         
922     def try_ban(self, ip):
923         if self._check_kicks_ok():
924             self.banfunc(ip)
925             self.banned[ip] = self.perip[ip].peerid
926             if self.kicked.has_key(ip):
927                 del self.kicked[ip]
928
929     def set_super_seed(self):
930         self.super_seeding = True
931
932     def check_complete(self, index):
933         if self.endgamemode and not self.all_requests:
934             self.endgamemode = False
935         if self.endgame_queued_pieces and not self.endgamemode:
936             self.requeue_piece_download()
937         if self.picker.am_I_complete():
938             assert not self.all_requests
939             assert not self.endgamemode
940
941             for download in self.downloads:
942                 if download.have.complete():
943                     download.connection.send_have(index)   # be nice, tell the other seed you completed
944                     self.add_disconnected_seed(download.connection.get_readable_id())
945                     download.connection.close()
946
947                     self._event_reporter.create_and_add_event("connection-seed", [self.b64_infohash, download.ip, download.connection.total_uploaded])
948                 else:
949                     self._event_reporter.create_and_add_event("connection-upload", [self.b64_infohash, download.ip, download.connection.total_uploaded])
950                     self._event_reporter.create_and_add_event("connection-download", [self.b64_infohash, download.ip, download.connection.total_downloaded])
951
952             self._event_reporter.create_and_add_event("complete", [self.b64_infohash])
953             # self._event_reporter.flush()
954                     
955             return True
956         return False
957
958     def too_many_partials(self):
959         return len(self.storage.dirty) > (len(self.downloads)/2)
960
961     def cancel_requests(self, requests, allowrerequest=True):
962
963         # todo: remove duplicates
964         slowpieces = [piece_id for piece_id, _, _ in requests]
965
966         if self.endgamemode:
967             if self.endgame_queued_pieces:
968                 for piece_id, _, _ in requests:
969                     if not self.storage.do_I_have(piece_id):
970                         try:
971                             self.endgame_queued_pieces.remove(piece_id)
972                         except:
973                             pass
974
975             # remove the items in requests from self.all_requests
976             if not allowrerequest:
977                 self.all_requests = [request for request in self.all_requests if not request in requests]
978                 if DEBUG: print >>sys.stderr, "Downloader: cancel_requests: all_requests", len(self.all_requests), "remaining"
979
980         for download in self.downloads:
981             hit = False
982             for request in download.active_requests:
983                 if request in requests:
984                     hit = True
985                     if DEBUG: print >>sys.stderr, "Downloader:cancel_requests: canceling", request, "on", download.ip
986                     download.connection.send_cancel(*request)
987                     if not self.endgamemode:
988                         self.storage.request_lost(*request)
989             if hit:
990                 download.active_requests = [request for request in download.active_requests if not request in requests]
991                 # Arno: VOD: all these peers were slow for their individually 
992                 # assigned pieces. These pieces have high priority, so don't
993                 # retrieve any of theses pieces from these slow peers, just
994                 # give them something further in the future.
995                 if allowrerequest:
996                     download._request_more()
997                 else:
998                     # Arno: ALT is to just kick peer. Good option if we have lots (See Encryper.to_connect() queue
999                     #print >>sys.stderr,"Downloader: Kicking slow peer",d.ip
1000                     #d.connection.close() # bye bye, zwaai zwaai
1001                     download._request_more(slowpieces=slowpieces)
1002
1003             if not self.endgamemode and download.choked:
1004                 download._check_interests()
1005
1006     def cancel_piece_download(self, pieces, allowrerequest=True):
1007         if self.endgamemode:
1008             if self.endgame_queued_pieces:
1009                 for piece in pieces:
1010                     try:
1011                         self.endgame_queued_pieces.remove(piece)
1012                     except:
1013                         pass
1014
1015             if allowrerequest:
1016                 for index, nb, nl in self.all_requests:
1017                     if index in pieces:
1018                         self.storage.request_lost(index, nb, nl)
1019
1020             else:
1021                 new_all_requests = []
1022                 for index, nb, nl in self.all_requests:
1023                     if index in pieces:
1024                         self.storage.request_lost(index, nb, nl)
1025                     else:
1026                         new_all_requests.append((index, nb, nl))
1027                 self.all_requests = new_all_requests
1028                 if DEBUG: print >>sys.stderr, "Downloader: cancel_piece_download: all_requests", len(self.all_requests), "remaining"
1029
1030         for d in self.downloads:
1031             hit = False
1032             for index, nb, nl in d.active_requests:
1033                 if index in pieces:
1034                     hit = True
1035                     d.connection.send_cancel(index, nb, nl)
1036                     if not self.endgamemode:
1037                         self.storage.request_lost(index, nb, nl)
1038             if hit:
1039                 d.active_requests = [ r for r in d.active_requests
1040                                       if r[0] not in pieces ]
1041                 # Arno: VOD: all these peers were slow for their individually 
1042                 # assigned pieces. These pieces have high priority, so don't
1043                 # retrieve any of theses pieces from these slow peers, just
1044                 # give them something further in the future.
1045                 if not allowrerequest:
1046                     # Arno: ALT is to just kick peer. Good option if we have lots (See Encryper.to_connect() queue
1047                     #print >>sys.stderr,"Downloader: Kicking slow peer",d.ip
1048                     #d.connection.close() # bye bye, zwaai zwaai
1049                     d._request_more(slowpieces=pieces)
1050                 else:
1051                     d._request_more()
1052             if not self.endgamemode and d.choked:
1053                 d._check_interests()
1054
1055     def requeue_piece_download(self, pieces = []):
1056         if self.endgame_queued_pieces:
1057             for piece in pieces:
1058                 if not piece in self.endgame_queued_pieces:
1059                     self.endgame_queued_pieces.append(piece)
1060             pieces = self.endgame_queued_pieces
1061         if self.endgamemode:
1062             if self.all_requests:
1063                 self.endgame_queued_pieces = pieces
1064                 return
1065             self.endgamemode = False
1066             self.endgame_queued_pieces = None
1067            
1068         ds = [d for d in self.downloads]
1069         shuffle(ds)
1070         for d in ds:
1071             if d.choked:
1072                 d._check_interests()
1073             else:
1074                 d._request_more()
1075
1076     def start_endgame(self):
1077         assert not self.endgamemode
1078         self.endgamemode = True
1079         assert not self.all_requests
1080         for d in self.downloads:
1081             if d.active_requests:
1082                 assert d.interested and not d.choked
1083             for request in d.active_requests:
1084                 assert not request in self.all_requests
1085                 self.all_requests.append(request)
1086         for d in self.downloads:
1087             d.fix_download_endgame()
1088         if DEBUG: print >>sys.stderr, "Downloader: start_endgame: we have", len(self.all_requests), "requests remaining"
1089
1090     def pause(self, flag):
1091         self.paused = flag
1092         if flag:
1093             for d in self.downloads:
1094                 for index, begin, length in d.active_requests:
1095                     d.connection.send_cancel(index, begin, length)
1096                 d._letgo()
1097                 d.send_not_interested()
1098             if self.endgamemode:
1099                 self._reset_endgame()
1100         else:
1101             shuffle(self.downloads)
1102             for d in self.downloads:
1103                 d._check_interests()
1104                 if d.interested and not d.choked:
1105                     d._request_more()
1106
1107     def live_invalidate(self,piece,mevirgin=False): # Arno: LIVEWRAP
1108         #print >>sys.stderr,"Downloader: live_invalidate",piece
1109         for d in self.downloads:
1110             d.have[piece] = False
1111         # STBSPEED: If I have no pieces yet, no need to loop to invalidate them.
1112         if not mevirgin:
1113             self.storage.live_invalidate(piece)
1114         
    def live_invalidate_ranges(self,toinvalidateranges,toinvalidateset):
        """ STBSPEED: Faster version of live_invalidate that copies have arrays
        rather than iterate over them for clearing

        toinvalidateranges -- list of one or two inclusive (start, end)
                              piece ranges to clear
        toinvalidateset    -- set form of the same pieces; only referenced
                              by the commented-out sanity checks below
        """
        if len(toinvalidateranges) == 1:
            (s,e) = toinvalidateranges[0]
            emptyrange = [False for piece in xrange(s,e+1)]
            assert len(emptyrange) == e+1-s
            
            # rebuild each peer's bitfield with the range zeroed in one splice
            for d in self.downloads:
                newhave = d.have[0:s] + emptyrange + d.have[e+1:]

                #oldhave = d.have
                d.have = Bitfield(length=len(newhave),fromarray=newhave)
                #assert oldhave.tostring() == d.have.tostring()
                """
                for piece in toinvalidateset:
                    d.have[piece] = False
                print >>sys.stderr,"d len",len(d.have)
                print >>sys.stderr,"new len",len(newhave)
                    
                for i in xrange(0,len(newhave)):
                    if d.have[i] != newhave[i]:
                        print >>sys.stderr,"newhave diff",i
                        assert False
                """
                
        else:
            # NOTE(review): this branch assumes exactly two ranges that
            # together with the kept middle span cover the whole bitfield
            # (live wrap-around), i.e. s1 == 0 and e2 == last piece --
            # confirm against the caller before reusing elsewhere.
            (s1,e1) = toinvalidateranges[0]
            (s2,e2) = toinvalidateranges[1]
            emptyrange1 = [False for piece in xrange(s1,e1+1)]
            emptyrange2 = [False for piece in xrange(s2,e2+1)]
            
            assert len(emptyrange1) == e1+1-s1
            assert len(emptyrange2) == e2+1-s2
            
            for d in self.downloads:
                newhave = emptyrange1 + d.have[e1+1:s2] + emptyrange2
                
                #oldhave = d.have
                d.have = Bitfield(length=len(newhave),fromarray=newhave)
                #assert oldhave.tostring() == d.have.tostring()
                """
                for piece in toinvalidateset:
                    d.have[piece] = False
                print >>sys.stderr,"d len",len(d.have)
                print >>sys.stderr,"new len",len(newhave)
                for i in xrange(0,len(newhave)):
                    if d.have[i] != newhave[i]:
                        print >>sys.stderr,"newhave diff",i
                        assert False
                """
1167                 
1168     # ProxyService_
1169     #
1170     def aggregate_and_send_haves(self):
1171         """ Aggregates the information from the haves bitfields for all the active connections,
1172         then calls the helper class to send the aggregated information as a PROXY_HAVE message 
1173         """
1174         if self.picker.helper:
1175             # The current node is a coordinator
1176             if DEBUG:
1177                 print >> sys.stderr,"Downloader: aggregate_and_send_haves: helper None or helper conn"
1178             
1179             # haves_vector is a matrix, having on each line a Bitfield
1180             haves_vector = [None] * len(self.downloads)
1181             for i in range(0, len(self.downloads)):
1182                 haves_vector[i] = self.downloads[i].have
1183             
1184             #Calculate the aggregated haves
1185             aggregated_haves = Bitfield(self.numpieces)
1186             for piece in range (0, self.numpieces):
1187                 aggregated_value = False
1188                 # For every column in the haves_vector matrix
1189                 for d in range(0, len(self.downloads)):
1190                     # For every active connection
1191                     aggregated_value = aggregated_value or haves_vector[d][piece] # Logical OR operation 
1192                 aggregated_haves[piece] = aggregated_value
1193             
1194             self.picker.helper.send_proxy_have(aggregated_haves)
1195     #
1196     # _ProxyService