instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / BitTornado / BT1 / GetRightHTTPDownloader.py
1 # Written by John Hoffman
2 # see LICENSE.txt for license information
3
4 # Patched by Diego Andres Rabaioli.
5 # This is the HTTPDownloader class that implements the GetRight
6 # style WebSeeding technique. Compared to the John Hoffman's style it
# doesn't require any web server support. However the biggest gap (see
8 # http://www.bittorrent.org/beps/bep_0019.html) is not taken into
9 # account when requesting pieces.
10
11 import sys
12 from random import randint
13 from urlparse import urlparse
14 from httplib import HTTPConnection
15 import urllib
16 from threading import Thread,currentThread,Lock
17 from traceback import print_exc, print_stack 
18
19 from BaseLib.Core.BitTornado.__init__ import product_name,version_short
20 from BaseLib.Core.BitTornado.CurrentRateMeasure import Measure
21 from BaseLib.Core.Utilities.timeouturlopen import find_proxy
22
23 # ProxyService_
24 #
try:
    from BaseLib.Core.ProxyService.Helper import SingleDownloadHelperInterface
except ImportError:
    class SingleDownloadHelperInterface:
        """Stand-in base class used when the ProxyService package is missing.

        SingleDownload calls is_frozen_by_helper() and reads
        frozen_by_helper on every download cycle, so the stand-in has to
        provide both; without a helper the download is never frozen.
        """

        def __init__(self):
            # No helper is present, so nothing ever freezes this download
            # unless helper_set_freezing() is called explicitly.
            self.frozen_by_helper = False

        def is_frozen_by_helper(self):
            """Return the current value of the frozen_by_helper flag."""
            return self.frozen_by_helper
32 #
33 # _ProxyService
34
# Guards the diagnostic print statements in this module.
DEBUG = False

# One hour, expressed in seconds.
EXPIRE_TIME = 3600

# Sent as the User-Agent header of every request to the HTTP seed.
VERSION = '%s/%s' % (product_name, version_short)
40
class haveComplete:
    """'Have' object that claims every piece.

    An instance is handed to the piece picker in place of a peer's
    bitfield: any index lookup and complete() both answer True.
    """

    def __getitem__(self, x):
        # Whatever piece index is asked for, the answer is "yes".
        return True

    def complete(self):
        """Always report a complete set of pieces."""
        return True


# Shared instance passed to picker.next() by SingleDownload.
haveall = haveComplete()
47
class SingleDownload(SingleDownloadHelperInterface):
    """Fetches pieces from one HTTP seed, GetRight style (plain Range requests).

    A download cycle is: _download() picks a piece and performs one
    blocking HTTP request, request_finished() hands the result to the
    storage and schedules the next cycle through resched().
    request_lock is held from the start of a cycle until that cycle ends,
    so a new request is never issued before the previous response has been
    processed.
    """

    def __init__(self, downloader, url):
        """Parse `url`, open the HTTP connection and set up the transfer state.

        downloader -- the owning GetRightHTTPDownloader
        url        -- address of the HTTP seed; only the http scheme is accepted

        Problems are reported through downloader.errorfunc and construction
        stops at that point.
        NOTE(review): after such an early return the attributes assigned
        further down do not exist on the instance, although make_download()
        still registers it -- confirm that callers cope with that.
        """
        SingleDownloadHelperInterface.__init__(self)
        self.downloader = downloader
        self.baseurl = url

        try:
            (self.scheme, self.netloc, path,
             _pars, _query, _fragment) = urlparse(url)
        except Exception:
            self.downloader.errorfunc('cannot parse http seed address: '+url)
            return
        if self.scheme != 'http':
            self.downloader.errorfunc('http seed url not http: '+url)
            return

        # Arno, 2010-03-08: Make proxy aware
        self.proxyhost = find_proxy(url)
        try:
            self.connection = self._open_connection()
        except Exception:
            self.downloader.errorfunc('cannot connect to http seed: '+url)
            return

        # Only the path component of the url is requested from the seed.
        self.seedurl = path
        self.measure = Measure(downloader.max_rate_period)
        self.index = None               # piece currently being fetched
        self.piece_size = self.downloader.storage._piecelen( 0 )
        self.total_len = self.downloader.storage.total_length
        self.url = ''
        self.requests = []              # (begin, length) blocks of self.index
        self.request_size = 0           # sum of the lengths in self.requests
        self.endflag = False
        self.error = None
        self.retry_period = 30          # default delay between cycles
        self._retry_period = None       # one-shot delay for the next cycle
        self.errorcount = 0
        self.goodseed = False           # True once a piece was stored from here
        self.active = False             # True while a request is in flight
        self.cancelled = False
        # HTTP Video Support
        self.request_lock = Lock()
        self.video_support_policy     = True  # TODO : get from constructor parameters
        self.video_support_enabled    = False # Don't start immediately with support
        self.video_support_speed      = 0.0   # Start with the faster rescheduling speed
        self.video_support_slow_start = False # If enabled delay the first request (give chance to peers to give bandwidth)
        # Arno, 2010-04-07: Wait 1 second before using HTTP seed. TODO good policy
        # If the Video Support policy is not enabled then use the HTTP seed normally
        if not self.video_support_policy:
            self.resched(1)

    def _open_connection(self):
        """Return a new HTTPConnection to the proxy if one is set, else to the seed."""
        if self.proxyhost is None:
            return HTTPConnection(self.netloc)
        return HTTPConnection(self.proxyhost)

    def resched(self, len = None):
        """Schedule the next download cycle.

        len -- delay in seconds; None selects self.retry_period.  (The
               name shadows the builtin but is kept for existing callers.)

        Nothing is scheduled while the video support policy is on and
        support is either disabled or still in its slow-start phase.
        """
        if self.video_support_policy:
            if ( not self.video_support_enabled ) or self.video_support_slow_start:
                return
        delay = len
        if delay is None:
            delay = self.retry_period
        if self.errorcount > 3:
            # After repeated errors: at most one second, multiplied by
            # (errorcount - 2).
            delay = min(1.0, delay) * (self.errorcount - 2)

        # Arno, 2010-04-07: If immediately, don't go via queue. Actual work is
        # done by other thread, so no worries of hogging NetworkThread.
        if delay > 0:
            self.downloader.rawserver.add_task(self.download, delay)
        else:
            self.download()

    def _want(self, index):
        """Picker callback: tell whether piece `index` should be fetched."""
        if self.endflag:
            return self.downloader.storage.do_I_have_requests(index)
        else:
            return self.downloader.storage.is_unstarted(index)

    def download(self):
        """Hand _download() to the session's user-callback handler."""
        from BaseLib.Core.Session import Session
        session = Session.get_instance()
        session.uch.perform_usercallback(self._download)

    def _download(self):
        """Run one download cycle: pick a piece and request it over HTTP.

        request_lock is taken on entry.  When a request is made the lock
        is released later by request_finished(); on every path that ends
        without a request it is released here, otherwise the next cycle
        would block forever on acquire().
        """
# 2fastbt_
        self.request_lock.acquire()
        if DEBUG:
            print("http-sdownload: download()")
        if self.is_frozen_by_helper():
            if DEBUG:
                print("http-sdownload: blocked, rescheduling")
            self.request_lock.release()
            self.resched(1)
            return
# _2fastbt
        self.cancelled = False
        if self.downloader.picker.am_I_complete():
            self.request_lock.release()
            self.downloader.downloads.remove(self)
            return
        self.index = self.downloader.picker.next(haveall, self._want, self)
# 2fastbt_
        if self.index is None and self.frozen_by_helper:
            self.request_lock.release()
            self.resched(0.01)
            return
# _2fastbt
        if ( self.index is None and not self.endflag
                     and not self.downloader.peerdownloader.has_downloaders() ):
            # Nothing unstarted is left and no peer is downloading: switch
            # to end mode, where _want() accepts pieces that still have
            # open requests.
            self.endflag = True
            self.index = self.downloader.picker.next(haveall, self._want, self)
        if self.index is None:
            self.endflag = True
            self.request_lock.release()
            self.resched()
            return

        self.url = self.seedurl
        start = self.piece_size * self.index
        end   = start + self.downloader.storage._piecelen( self.index ) - 1
        self.request_range = '%d-%d' % ( start, end )
        self._get_requests()
        # Mark the request as in flight *before* the blocking call so that
        # cancel_piece_download() can see it, and so that the flag is not
        # set again after request_finished() has already cleared it.
        self.active = True
        # Just overwrite other blocks and don't ask for ranges.
        # Diego : 2010-05-19 : the request runs on this (_download) thread;
        # request_lock keeps a new request from starting before the
        # previous response is read.
        self._request()

    def _request(self):
        """Perform the blocking HTTP GET for self.request_range.

        The result is left in self.received_data / self.connection_status,
        or in self.error on failure, and request_finished() is queued on
        the rawserver in either case.
        """
        # NOTE(review): presumably imported so these codecs are loaded
        # before the request is made -- confirm.
        import encodings.ascii
        import encodings.punycode
        import encodings.idna

        self.error = None
        self.received_data = None
        try:
            #print >>sys.stderr, 'HTTP piece ', self.index
            if self.proxyhost is None:
                realurl = self.url
            else:
                # A proxy needs the absolute URL on the request line.
                realurl = self.scheme+'://'+self.netloc+self.url

            self.connection.request( 'GET', realurl, None,
                                {'Host': self.netloc, 'User-Agent': VERSION, 'Range' : 'bytes=%s' % self.request_range } )

            r = self.connection.getresponse()
            self.connection_status = r.status
            self.received_data = r.read()

        except Exception:
            # sys.exc_info() instead of "except ..., e" so that the syntax
            # is accepted by every Python 2 and Python 3 release.
            err = sys.exc_info()[1]
            print_exc()

            self.error = 'error accessing http seed: '+str(err)
            try:
                self.connection.close()
            except Exception:
                pass
            try:
                # Reconnect to the same endpoint as before; going straight
                # to the seed here would bypass a configured proxy.
                self.connection = self._open_connection()
            except Exception:
                self.connection = None  # will cause an exception and retry next cycle
        self.downloader.rawserver.add_task(self.request_finished)

    def request_finished(self):
        """Process the outcome of _request(), release the lock and reschedule."""
        self.active = False
        if self.error is not None:
            if self.goodseed:
                self.downloader.errorfunc(self.error)
            self.errorcount += 1
        if self.received_data:
            self.errorcount = 0
            if not self._got_data():
                self.received_data = None
        if not self.received_data:
            # Nothing usable arrived: give the blocks back to the storage.
            self._release_requests()
            self.downloader.peerdownloader.piece_flunked(self.index)
        self.request_lock.release()
        if self._retry_period is not None:
            self.resched(self._retry_period)
            self._retry_period = None
            return
        self.resched()

    def _got_data(self):
        """Validate the HTTP response and store it; return True on success."""
        if self.connection_status == 503:   # seed is busy
            # The body is tried as a number of seconds to wait (minimum 5);
            # a body that is not a number leaves retry_period unchanged.
            try:
                self.retry_period = max(int(self.received_data), 5)
            except (ValueError, TypeError):
                pass
            return False

        if self.connection_status not in (200, 206): # 206 = partial download OK
            self.errorcount += 1
            return False
        # Arno,  2010-04-07: retry_period set to 0 for faster DL speeds
        # Diego, 2010-04-16: retry_period set depending on the level of support asked by the MovieOnDemandTransporter
        self._retry_period = self.video_support_speed

        # NOTE(review): the Range header always covers the whole piece while
        # request_size only sums the blocks handed out by the storage; if a
        # piece can have blocks requested elsewhere this check would reject
        # a correct response -- confirm.
        if len(self.received_data) != self.request_size:
            if self.goodseed:
                self.downloader.errorfunc('corrupt data from http seed - redownloading')
            return False
        self.measure.update_rate(len(self.received_data))
        self.downloader.measurefunc(len(self.received_data))
        if self.cancelled:
            return False
        if not self._fulfill_requests():
            return False
        if not self.goodseed:
            self.goodseed = True
            self.downloader.seedsfound += 1
        if self.downloader.storage.do_I_have(self.index):
            self.downloader.picker.complete(self.index)
            self.downloader.peerdownloader.check_complete(self.index)
            self.downloader.gotpiecefunc(self.index)
        return True

    def _get_requests(self):
        """Take every open request for self.index from the storage."""
        self.requests = []
        self.request_size = 0
        while self.downloader.storage.do_I_have_requests(self.index):
            r = self.downloader.storage.new_request(self.index)
            self.requests.append(r)
            self.request_size += r[1]
        self.requests.sort()

    def _fulfill_requests(self):
        """Feed the received data to the storage block by block.

        Returns False as soon as the storage rejects a block, True when
        every block in self.requests was accepted.
        """
        start = 0
        success = True
        while self.requests:
            begin, length = self.requests.pop(0)
# 2fastbt_
            if not self.downloader.storage.piece_came_in(self.index, begin, [],
                            self.received_data[start:start+length], length):
# _2fastbt
                success = False
                break
            start += length
        return success

    def _release_requests(self):
        """Return all blocks still held in self.requests to the storage."""
        for begin, length in self.requests:
            self.downloader.storage.request_lost(self.index, begin, length)
        self.requests = []

    def _request_ranges(self):
        """Return self.requests as a 'a-b,c-d' string, merging adjacent blocks.

        Not called anywhere in this class; needs a non-empty self.requests.
        """
        ranges = []
        begin, length = self.requests[0]
        for begin1, length1 in self.requests[1:]:
            if begin + length == begin1:
                # Directly follows the current range: extend it.
                length += length1
                continue
            ranges.append(str(begin)+'-'+str(begin+length-1))
            begin, length = begin1, length1
        ranges.append(str(begin)+'-'+str(begin+length-1))
        return ','.join(ranges)

# 2fastbt_
    def helper_forces_unchoke(self):
        pass

    def helper_set_freezing(self,val):
        self.frozen_by_helper = val
# _2fastbt

    def slow_start_wake_up( self ):
        """End the slow-start phase and start downloading right away."""
        self.video_support_slow_start = False
        self.resched(0)

    def is_slow_start( self ):
        """Return True while the first request is still being delayed."""
        return self.video_support_slow_start

    def start_video_support( self, level = 0.0, sleep_time = None ):
        '''
        Level indicates how fast a new request is scheduled and therefore the level of support required.
        0 = maximum support. (immediate rescheduling)
        1 ~= 0.01 seconds between each request
        2 ~= 0.1 seconds between each request
        and so on... at the moment just level 0 is asked. To be noted that level is a float!

        If sleep_time is given, the first request is delayed by that many
        seconds (slow start); otherwise a cycle is scheduled immediately.
        Calling this while support is already enabled only updates the speed.
        '''

        if DEBUG:
            sys.stderr.write("GetRightHTTPDownloader: START\n")
        self.video_support_speed = 0.001 * ( ( 10 ** level ) - 1 )
        if not self.video_support_enabled:
            self.video_support_enabled = True
            if sleep_time:
                if not self.video_support_slow_start:
                    self.video_support_slow_start = True
                    self.downloader.rawserver.add_task( self.slow_start_wake_up, sleep_time )
            else:
                self.resched( self.video_support_speed )

    def stop_video_support( self ):
        """Disable video support; resched() then stops scheduling new cycles."""
        if DEBUG:
            sys.stderr.write("GetRightHTTPDownloader: STOP\n")
        if not self.video_support_enabled:
            return
        self.video_support_enabled = False

    def is_video_support_enabled( self ):
        """Return True if video support is currently switched on."""
        return self.video_support_enabled
362
363     
class GetRightHTTPDownloader:
    """Keeps the list of SingleDownload objects, one per HTTP seed url.

    The collaborators passed to the constructor are stored as attributes
    and used by the individual SingleDownload instances; the video
    support calls are fanned out to every registered download.
    """

    def __init__(self, storage, picker, rawserver,
                 finflag, errorfunc, peerdownloader,
                 max_rate_period, infohash, measurefunc, gotpiecefunc):
        """Store the collaborators and start with no downloads registered."""
        # Collaborating objects.
        self.storage = storage
        self.picker = picker
        self.rawserver = rawserver
        self.peerdownloader = peerdownloader
        self.finflag = finflag
        # Callbacks.
        self.errorfunc = errorfunc
        self.measurefunc = measurefunc
        self.gotpiecefunc = gotpiecefunc
        # Plain settings.
        self.infohash = infohash
        self.max_rate_period = max_rate_period
        # Own state.
        self.downloads = []
        self.seedsfound = 0
        self.video_support_enabled = False

    def make_download(self, url):
        """Create a SingleDownload for `url`, register it and return it."""
        seed = SingleDownload(self, url)
        self.downloads.append(seed)
        return seed

    def get_downloads(self):
        """Return the registered downloads, or [] once finflag is set."""
        if not self.finflag.isSet():
            return self.downloads
        return []

    def cancel_piece_download(self, pieces):
        """Flag every active download whose current piece is in `pieces`."""
        for seed in self.downloads:
            if not seed.active:
                continue
            if seed.index in pieces:
                seed.cancelled = True

    # Diego : wrap each single http download
    def start_video_support( self, level = 0.0, sleep_time = None ):
        """Start video support on every download and remember it is on."""
        for seed in self.downloads:
            seed.start_video_support( level, sleep_time )
        self.video_support_enabled = True

    def stop_video_support( self ):
        """Stop video support on every download and remember it is off."""
        for seed in self.downloads:
            seed.stop_video_support()
        self.video_support_enabled = False

    def is_video_support_enabled( self ):
        """Return the flag set by start/stop_video_support."""
        return self.video_support_enabled

    def is_slow_start( self ):
        """Return True if at least one download is in its slow-start phase."""
        slow = False
        for seed in self.downloads:
            if seed.is_slow_start():
                slow = True
                break
        return slow
415