1 # Written by Arno Bakker
2 # see LICENSE.txt for license information
7 from traceback import print_exc,print_stack
8 from threading import RLock,Condition,Event,Thread,currentThread
10 from BaseLib.Core.DownloadState import DownloadState
11 from BaseLib.Core.DownloadConfig import DownloadStartupConfig
12 from BaseLib.Core.simpledefs import *
13 from BaseLib.Core.exceptions import *
14 from BaseLib.Core.osutils import *
15 from BaseLib.Core.APIImplementation.SingleDownload import SingleDownload
16 import BaseLib.Core.APIImplementation.maketorrent as maketorrent
22 def __init__(self,session,tdef):
24 # just enough so error saving and get_state() works
27 # To be able to return the progress of a stopped torrent, how far it got.
28 self.progressbeforestop = 0.0
29 self.filepieceranges = []
30 self.pstate_for_restart = None # h4x0r to remember resumedata
32 # Copy tdef, so we get an infohash
33 self.session = session
34 self.tdef = tdef.copy()
35 self.tdef.readonly = True
40 def setup(self,dcfg=None,pstate=None,initialdlstatus=None,lmcreatedcallback=None,lmvodeventcallback=None):
42 Create a Download object. Used internally by Session.
43 @param dcfg DownloadStartupConfig or None (in which case
44 a new DownloadConfig() is created and the result
45 becomes the runtime config of this Download.
47 # Called by any thread
49 self.dllock.acquire() # not really needed, no other threads know of this object
51 torrentdef = self.get_def()
52 metainfo = torrentdef.get_metainfo()
53 # H4xor this so the 'name' field is safe
54 self.correctedinfoname = fix_filebasename(torrentdef.get_name_as_unicode())
57 print >>sys.stderr,"Download: setup: piece size",metainfo['info']['piece length']
59 # See if internal tracker used
60 itrackerurl = self.session.get_internal_tracker_url()
61 #infohash = self.tdef.get_infohash()
62 metainfo = self.tdef.get_metainfo()
66 print >>sys.stderr,"Download: setup: internal tracker?",metainfo['announce'],itrackerurl,"#"
68 if itrackerurl.endswith('/'):
69 slashless = itrackerurl[:-1]
71 slashless = itrackerurl
72 if metainfo['announce'] == itrackerurl or metainfo['announce'] == slashless:
74 elif 'announce-list' in metainfo:
75 for tier in metainfo['announce-list']:
76 if itrackerurl in tier or slashless in tier:
82 print >>sys.stderr,"Download: setup: Using internal tracker"
83 # Copy .torrent to state_dir/itracker so the tracker thread
84 # finds it and accepts peer registrations for it.
86 self.session.add_to_internal_tracker(self.tdef)
88 print >>sys.stderr,"Download: setup: Not using internal tracker"
90 # Copy dlconfig, from default if not specified
92 cdcfg = DownloadStartupConfig()
95 self.dlconfig = copy.copy(cdcfg.dlconfig)
98 # Copy sessconfig into dlconfig, such that BitTornado.BT1.Connecter, etc.
99 # knows whether overlay is on, etc.
101 for (k,v) in self.session.get_current_startup_config_copy().sessconfig.iteritems():
102 self.dlconfig.setdefault(k,v)
103 self.set_filepieceranges(metainfo)
105 # Things that only exist at runtime
106 self.dlruntimeconfig= {}
107 self.dlruntimeconfig['max_desired_upload_rate'] = 0
108 self.dlruntimeconfig['max_desired_download_rate'] = 0
111 print >>sys.stderr,"DownloadImpl: setup: initialdlstatus",`self.tdef.get_name_as_unicode()`,initialdlstatus
113 # Closed swarms config
114 self.dlconfig['cs_keys'] = self.tdef.get_cs_keys_as_ders()
115 self.dlconfig['permid'] = self.session.get_permid()
116 if self.dlconfig['cs_keys']:
117 print >> sys.stderr,"DownloadImpl: setup: This is a closed swarm"
119 # self.dlconfig['poa'] = dcfg.get_poa()
121 # print >> sys.stderr,"POA not available - seeding?"
124 if pstate is not None and pstate.has_key('dlstate'):
125 self.progressbeforestop = pstate['dlstate'].get('progress', 0.0)
127 # Note: initialdlstatus now only works for STOPPED
128 if initialdlstatus != DLSTATUS_STOPPED:
129 if pstate is None or pstate['dlstate']['status'] != DLSTATUS_STOPPED:
130 # Also restart on STOPPED_ON_ERROR, may have been transient
131 self.create_engine_wrapper(lmcreatedcallback,pstate,lmvodeventcallback,initialdlstatus) # RePEX: propagate initialdlstatus
133 self.pstate_for_restart = pstate
135 self.dllock.release()
139 self.dllock.release()
141 def create_engine_wrapper(self,lmcreatedcallback,pstate,lmvodeventcallback,initialdlstatus=None):
142 """ Called by any thread, assume dllock already acquired """
144 print >>sys.stderr,"Download: create_engine_wrapper()"
147 infohash = self.get_def().get_infohash()
148 metainfo = copy.deepcopy(self.get_def().get_metainfo())
150 # H4xor this so the 'name' field is safe
151 metainfo['info']['name'] = self.correctedinfoname
152 if 'name.utf-8' in metainfo['info']:
153 metainfo['info']['name.utf-8'] = self.correctedinfoname
155 multihandler = self.session.lm.multihandler
156 listenport = self.session.get_listen_port()
157 vapath = self.session.get_video_analyser_path()
159 # Note: BT1Download is started with copy of d.dlconfig, not direct access
160 kvconfig = copy.copy(self.dlconfig)
162 # RePEX: extend kvconfig with initialdlstatus
163 kvconfig['initialdlstatus'] = initialdlstatus
165 # Define which file to DL in VOD mode
166 live = self.get_def().get_live()
176 # --- streaming settings
177 if self.dlconfig['mode'] == DLMODE_VOD or self.dlconfig['video_source']:
178 # video file present which is played or produced
180 if 'files' in metainfo['info']:
184 if multi and len(self.dlconfig['selected_files']) == 0:
185 # Multi-file torrent, but no file selected
186 raise VODNoFileSelectedInMultifileTorrentException()
189 # single-file torrent
190 file = self.get_def().get_name()
192 bitrate = self.get_def().get_bitrate()
195 file = self.dlconfig['selected_files'][0]
196 idx = self.get_def().get_index_of_file_in_files(file)
197 bitrate = self.get_def().get_bitrate(file)
199 # Determine MIME type
200 mimetype = self.get_mimetype(file)
201 # Arno: don't encode mimetype in lambda, allow for dynamic
202 # determination by videoanalyser
203 vod_usercallback_wrapper = lambda event,params:self.session.uch.perform_vod_usercallback(self,self.dlconfig['vod_usercallback'],event,params)
205 vodfileindex['index'] = idx
206 vodfileindex['inpath'] = file
207 vodfileindex['bitrate'] = bitrate
208 vodfileindex['mimetype'] = mimetype
209 vodfileindex['usercallback'] = vod_usercallback_wrapper
210 vodfileindex['userevents'] = self.dlconfig['vod_userevents'][:]
212 # live torrents must be streamed or produced, but not just downloaded
213 raise LiveTorrentRequiresUsercallbackException()
214 # Ric: added svc case TODO
215 elif self.dlconfig['mode'] == DLMODE_SVC:
216 # video file present which is played or produced
218 if 'files' in metainfo['info']:
222 if multi and len(self.dlconfig['selected_files']) == 0:
223 # Multi-file torrent, but no file selected
224 raise VODNoFileSelectedInMultifileTorrentException()
227 # Ric: the selected files are already ordered
228 files = self.dlconfig['selected_files']
232 idx.append( self.get_def().get_index_of_file_in_files(file) )
234 bitrate = self.get_def().get_bitrate(files[0])
236 # Determine MIME type
237 mimetype = self.get_mimetype(file)
238 # Arno: don't encode mimetype in lambda, allow for dynamic
239 # determination by videoanalyser
240 vod_usercallback_wrapper = lambda event,params:self.session.uch.perform_vod_usercallback(self,self.dlconfig['vod_usercallback'],event,params)
242 vodfileindex['index'] = idx
243 vodfileindex['inpath'] = files
244 vodfileindex['bitrate'] = bitrate
245 vodfileindex['mimetype'] = mimetype
246 vodfileindex['usercallback'] = vod_usercallback_wrapper
247 vodfileindex['userevents'] = self.dlconfig['vod_userevents'][:]
250 vodfileindex['mimetype'] = 'application/octet-stream'
253 print >>sys.stderr,"Download: create_engine_wrapper: vodfileindex",`vodfileindex`
255 # Delegate creation of engine wrapper to network thread
256 network_create_engine_wrapper_lambda = lambda:self.network_create_engine_wrapper(infohash,metainfo,kvconfig,multihandler,listenport,vapath,vodfileindex,lmcreatedcallback,pstate,lmvodeventcallback)
257 self.session.lm.rawserver.add_task(network_create_engine_wrapper_lambda,0)
260 def network_create_engine_wrapper(self,infohash,metainfo,kvconfig,multihandler,listenport,vapath,vodfileindex,lmcallback,pstate,lmvodeventcallback):
261 """ Called by network thread """
262 self.dllock.acquire()
264 self.sd = SingleDownload(infohash,metainfo,kvconfig,multihandler,self.session.lm.get_ext_ip,listenport,vapath,vodfileindex,self.set_error,pstate,lmvodeventcallback,self.session.lm.hashcheck_done)
267 if lmcallback is not None:
268 lmcallback(self,sd,exc,pstate)
270 self.dllock.release()
276 # No lock because attrib immutable and return value protected
280 # Retrieving DownloadState
282 def set_state_callback(self,usercallback,getpeerlist=False):
283 """ Called by any thread """
284 self.dllock.acquire()
286 network_get_state_lambda = lambda:self.network_get_state(usercallback,getpeerlist)
287 # First time on general rawserver
288 self.session.lm.rawserver.add_task(network_get_state_lambda,0.0)
290 self.dllock.release()
293 def network_get_state(self,usercallback,getpeerlist,sessioncalling=False):
294 """ Called by network thread """
295 self.dllock.acquire()
297 # RePEX: get last stored SwarmCache, if any:
299 if self.pstate_for_restart is not None and self.pstate_for_restart.has_key('dlstate'):
300 swarmcache = self.pstate_for_restart['dlstate'].get('swarmcache',None)
304 print >>sys.stderr,"DownloadImpl: network_get_state: Download not running"
305 ds = DownloadState(self,DLSTATUS_STOPPED,self.error,self.progressbeforestop,swarmcache=swarmcache)
307 # RePEX: try getting the swarmcache from SingleDownload or use our last known swarmcache:
308 swarmcache = self.sd.get_swarmcache() or swarmcache
310 (status,stats,logmsgs,coopdl_helpers,coopdl_coordinator) = self.sd.get_stats(getpeerlist)
311 ds = DownloadState(self,status,self.error,0.0,stats=stats,filepieceranges=self.filepieceranges,logmsgs=logmsgs,coopdl_helpers=coopdl_helpers,coopdl_coordinator=coopdl_coordinator,swarmcache=swarmcache)
312 self.progressbeforestop = ds.get_progress()
317 # Invoke the usercallback function via a new thread.
318 # After the callback is invoked, the return values will be passed to
319 # the returncallback for post-callback processing.
320 self.session.uch.perform_getstate_usercallback(usercallback,ds,self.sesscb_get_state_returncallback)
322 self.dllock.release()
325 def sesscb_get_state_returncallback(self,usercallback,when,newgetpeerlist):
326 """ Called by SessionCallbackThread """
327 self.dllock.acquire()
330 # Schedule next invocation, either on general or DL specific
331 # TODO: ensure this continues when dl is stopped. Should be OK.
332 network_get_state_lambda = lambda:self.network_get_state(usercallback,newgetpeerlist)
334 self.session.lm.rawserver.add_task(network_get_state_lambda,when)
336 self.sd.dlrawserver.add_task(network_get_state_lambda,when)
338 self.dllock.release()
341 # Download stop/resume
344 """ Called by any thread """
345 self.stop_remove(removestate=False,removecontent=False)
347 def stop_remove(self,removestate=False,removecontent=False):
348 """ Called by any thread """
350 print >>sys.stderr,"DownloadImpl: stop_remove:",`self.tdef.get_name_as_unicode()`,"state",removestate,"content",removecontent
351 self.dllock.acquire()
353 network_stop_lambda = lambda:self.network_stop(removestate,removecontent)
354 self.session.lm.rawserver.add_task(network_stop_lambda,0.0)
356 self.dllock.release()
358 def network_stop(self,removestate,removecontent):
359 """ Called by network thread """
361 print >>sys.stderr,"DownloadImpl: network_stop",`self.tdef.get_name_as_unicode()`
362 self.dllock.acquire()
364 infohash = self.tdef.get_infohash()
365 pstate = self.network_get_persistent_state()
366 if self.sd is not None:
367 pstate['engineresumedata'] = self.sd.shutdown()
369 self.pstate_for_restart = pstate
371 # This method is also called at Session shutdown, where one may
372 # choose to checkpoint its Download. If the Download was
373 # stopped before, pstate_for_restart contains its resumedata.
374 # and that should be written into the checkpoint.
376 if self.pstate_for_restart is not None:
378 print >>sys.stderr,"DownloadImpl: network_stop: Reusing previously saved engineresume data for checkpoint"
379 # Don't copy full pstate_for_restart, as the torrent
380 # may have gone from e.g. HASHCHECK at startup to STOPPED
381 # now, at shutdown. In other words, it was never active
382 # in this session and the pstate_for_restart still says
384 pstate['engineresumedata'] = self.pstate_for_restart['engineresumedata']
386 # Offload the removal of the content and other disk cleanup to another thread
388 contentdest = self.get_content_dest()
389 self.session.uch.perform_removestate_callback(infohash,contentdest,removecontent)
391 return (infohash,pstate)
393 self.dllock.release()
396 def restart(self, initialdlstatus=None):
397 """ Restart the Download. Technically this action does not need to be
398 delegated to the network thread, but does so removes some concurrency
399 problems. By scheduling both stops and restarts via the network task
400 queue we ensure that they are executed in the order they were called.
402 Note that when a Download is downloading or seeding, calling restart
403 is a no-op. If a Download is performing some other task, it is left
404 up to the internal running SingleDownload to determine what a restart
405 means. Often it means SingleDownload will abort its current task and
406 switch to downloading/seeding.
408 Called by any thread """
409 # RePEX: added initialdlstatus parameter
410 # RePEX: TODO: Should we mention the initialdlstatus behaviour in the docstring?
412 print >>sys.stderr,"DownloadImpl: restart:",`self.tdef.get_name_as_unicode()`
413 self.dllock.acquire()
415 network_restart_lambda = lambda:self.network_restart(initialdlstatus)
416 self.session.lm.rawserver.add_task(network_restart_lambda,0.0)
418 self.dllock.release()
420 def network_restart(self,initialdlstatus=None):
421 """ Called by network thread """
422 # Must schedule the hash check via lm. In some cases we have batch stops
423 # and restarts, e.g. we have stop all-but-one & restart-all for VOD)
425 # RePEX: added initialdlstatus parameter
427 print >>sys.stderr,"DownloadImpl: network_restart",`self.tdef.get_name_as_unicode()`
428 self.dllock.acquire()
431 self.error = None # assume fatal error is reproducible
432 # h4xor: restart using earlier loaded resumedata
433 # RePEX: propagate initialdlstatus
434 self.create_engine_wrapper(self.session.lm.network_engine_wrapper_created_callback,pstate=self.pstate_for_restart,lmvodeventcallback=self.session.lm.network_vod_event_callback,initialdlstatus=initialdlstatus)
437 print >>sys.stderr,"DownloadImpl: network_restart: SingleDownload already running",`self`
438 # RePEX: leave decision what to do to SingleDownload
439 self.sd.restart(initialdlstatus)
441 # No exception if already started, for convenience
443 self.dllock.release()
447 # Config parameters that only exists at runtime
449 def set_max_desired_speed(self,direct,speed):
451 print >>sys.stderr,"Download: set_max_desired_speed",direct,speed
455 self.dllock.acquire()
457 self.dlruntimeconfig['max_desired_upload_rate'] = speed
459 self.dlruntimeconfig['max_desired_download_rate'] = speed
460 self.dllock.release()
462 def get_max_desired_speed(self,direct):
463 self.dllock.acquire()
466 return self.dlruntimeconfig['max_desired_upload_rate']
468 return self.dlruntimeconfig['max_desired_download_rate']
470 self.dllock.release()
472 def get_dest_files(self, exts=None):
473 """ We could get this from BT1Download.files (see BT1Download.saveAs()),
474 but that object is the domain of the network thread.
475 You can give a list of extensions to return. If None: return all dest_files
478 def get_ext(filename):
479 (prefix,ext) = os.path.splitext(filename)
480 if ext != '' and ext[0] == '.':
484 self.dllock.acquire()
487 metainfo = self.tdef.get_metainfo()
488 if 'files' not in metainfo['info']:
489 # single-file torrent
490 diskfn = self.get_content_dest()
491 f2dtuple = (None, diskfn)
492 ext = get_ext(diskfn)
493 if exts is None or ext in exts:
494 f2dlist.append(f2dtuple)
497 if len(self.dlconfig['selected_files']) > 0:
498 fnlist = self.dlconfig['selected_files']
500 fnlist = self.tdef.get_files(exts=exts)
502 for filename in fnlist:
503 filerec = maketorrent.get_torrentfilerec_from_metainfo(filename,metainfo)
504 savepath = maketorrent.torrentfilerec2savefilename(filerec)
505 diskfn = maketorrent.savefilenames2finaldest(self.get_content_dest(),savepath)
506 ext = get_ext(diskfn)
507 if exts is None or ext in exts:
508 f2dtuple = (filename,diskfn)
509 f2dlist.append(f2dtuple)
513 self.dllock.release()
522 def network_checkpoint(self):
523 """ Called by network thread """
524 self.dllock.acquire()
526 pstate = self.network_get_persistent_state()
530 resdata = self.sd.checkpoint()
531 pstate['engineresumedata'] = resdata
532 return (self.tdef.get_infohash(),pstate)
534 self.dllock.release()
537 def network_get_persistent_state(self):
538 """ Assume dllock already held """
540 pstate['version'] = PERSISTENTSTATE_CURRENTVERSION
541 pstate['metainfo'] = self.tdef.get_metainfo() # assumed immutable
542 dlconfig = copy.copy(self.dlconfig)
543 # Reset unpicklable params
544 dlconfig['vod_usercallback'] = None
545 dlconfig['mode'] = DLMODE_NORMAL # no callback, no VOD
546 pstate['dlconfig'] = dlconfig
548 pstate['dlstate'] = {}
549 #ds = self.network_get_state(None,False,sessioncalling=True)
550 ds = self.network_get_state(None,True,sessioncalling=True) # RePEX: get peerlist in case of running Download
551 pstate['dlstate']['status'] = ds.get_status()
552 pstate['dlstate']['progress'] = ds.get_progress()
553 pstate['dlstate']['swarmcache'] = ds.get_swarmcache() # RePEX: store SwarmCache
556 print >>sys.stderr,"Download: netw_get_pers_state: status",dlstatus_strings[ds.get_status()],"progress",ds.get_progress()
558 pstate['engineresumedata'] = None
564 def get_coopdl_role_object(self,role):
565 """ Called by network thread """
567 self.dllock.acquire()
569 if self.sd is not None:
570 role_object = self.sd.get_coopdl_role_object(role)
572 self.dllock.release()
580 def set_error(self,e):
581 self.dllock.acquire()
583 self.dllock.release()
586 def set_filepieceranges(self,metainfo):
587 """ Determine which file maps to which piece ranges for progress info """
590 print >>sys.stderr,"Download: set_filepieceranges:",self.dlconfig['selected_files']
591 (length,self.filepieceranges) = maketorrent.get_length_filepieceranges_from_metainfo(metainfo,self.dlconfig['selected_files'])
593 def get_content_dest(self):
594 """ Returns the file (single-file torrent) or dir (multi-file torrent)
595 to which the downloaded content is saved. """
596 return os.path.join(self.dlconfig['saveas'],self.correctedinfoname)
598 # ARNOCOMMENT: better if we removed this from Core, user knows which
599 # file he selected to play, let him figure out MIME type
600 def get_mimetype(self,file):
601 (prefix,ext) = os.path.splitext(file)
604 if sys.platform == 'win32':
605 # TODO: Use Python's mailcap facility on Linux to find player
607 from BaseLib.Video.utils import win32_retrieve_video_play_command
609 [mimetype,playcmd] = win32_retrieve_video_play_command(ext,file)
611 print >>sys.stderr,"DownloadImpl: Win32 reg said MIME type is",mimetype
619 # homedir = os.path.expandvars('${HOME}')
620 homedir = get_home_dir()
621 homemapfile = os.path.join(homedir,'.mimetypes')
622 mapfiles = [homemapfile] + mimetypes.knownfiles
623 mimetypes.init(mapfiles)
624 (mimetype,encoding) = mimetypes.guess_type(file)
627 print >>sys.stderr,"DownloadImpl: /etc/mimetypes+ said MIME type is",mimetype,file
631 # if auto detect fails
634 # Arno, 2010-01-08: Hmmm... video/avi is not official registered at IANA
635 mimetype = 'video/avi'
636 elif ext == '.mpegts' or ext == '.ts':
637 mimetype = 'video/mp2t'
639 mimetype = 'video/x-matroska'
640 elif ext in ('.ogg', '.ogv'):
641 mimetype = 'video/ogg'
642 elif ext in ('.oga'):
643 mimetype = 'audio/ogg'
645 mimetype = 'video/webm'
647 mimetype = 'video/mpeg'