1 # Written by Arno Bakker, Diego Rabioli
2 # see LICENSE.txt for license information
4 # Implements the BackgroundProcess, i.e. SwarmEngine for SwarmPlugin and
5 # SwarmTransport=SwarmPlayer v2. See Plugin/SwarmEngine.py and Transport/SwarmEngine.py
8 # The BackgroundProcess shares a base class BaseApp with the SwarmPlayer v1,
9 # which is a standalone P2P-based video player.
13 # - Implement play while hashcheck?
14 # Not needed when proper shutdown & restart was done.
15 # - load_checkpoint with DLSTATUS_DOWNLOADING for Plugin?
16 # Nah, if we start BG when plugin started we have a video to play soon,
17 # so start others in STOPPED state (rather than switching them all
18 # to off and restart one in VOD mode just after)
25 # 1.0.2 Added STOP message to tell plugin to stop playing the current item
26 # (needed to support new behaviour where control conn is not always
27 # shutdown anymore, to support input.set_p2ptarget).
29 # Added ERROR message to tell plugin NSSA won't be able to serve the
30 # content requested via START (for <video> support).
32 # 1.0.1 Added INFO message to convey NSSA info to plugin for providing
33 # feedback to the user.
35 # NSPlugin JavaScript API 1.0.2
37 # 1.0.2 Added input.set_p2ptarget() method to switch the TorrentDef currently
38 # playing. Released in M24.1
40 # 1.0.1 Added input.p2pstatus read-only property giving the latest status as
41 # reported by the NSSA. Released in M24.
43 # 1.0.0 Copy of VLC's Javascript interface
46 # modify the sys.stderr and sys.stdout for safe output
47 import BaseLib.Debug.console
56 from cStringIO import StringIO
57 from base64 import b64encode, encodestring, decodestring
58 from traceback import print_exc,print_stack
59 from threading import Thread,currentThread,Lock
61 if sys.platform == "win32":
70 wxversion.select('2.8')
75 from BaseLib.Core.API import *
76 from BaseLib.Core.osutils import *
77 from BaseLib.Core.Utilities.utilities import get_collected_torrent_filename
78 from BaseLib.Utilities.LinuxSingleInstanceChecker import *
79 from BaseLib.Utilities.Instance2Instance import InstanceConnectionHandler,InstanceConnection, Instance2InstanceClient
80 from BaseLib.Utilities.TimedTaskQueue import TimedTaskQueue
81 from BaseLib.Player.BaseApp import BaseApp
82 from BaseLib.Player.swarmplayer import get_status_msgs
83 from BaseLib.Plugin.defs import *
84 from BaseLib.Plugin.Search import *
85 from BaseLib.Plugin.AtomFeedParser import *
87 from BaseLib.Video.defs import *
88 from BaseLib.Video.utils import videoextdefaults
89 from BaseLib.Video.VideoServer import VideoHTTPServer,MultiHTTPServer
90 from BaseLib.Video.Ogg import is_ogg,OggMagicLiveStream
92 from BaseLib.Core.Statistics.Status import Status, LivingLabReporter
93 from BaseLib.WebUI.WebUI import WebIFPathMapper
94 from BaseLib.Core.ClosedSwarm.ClosedSwarm import InvalidPOAException
100 ALLOW_MULTIPLE = False
103 IDLE_BEFORE_SELFKILL = 60.0 # Number of seconds
106 class BackgroundApp(BaseApp):
108 def __init__(self, redirectstderrout, appname, appversion, params, single_instance_checker, installdir, i2iport, sport, httpport):
110 # Almost generic HTTP server
111 self.videoHTTPServer = VideoHTTPServer(httpport)
112 self.videoHTTPServer.register(self.videoservthread_error_callback,self.videoservthread_set_status_callback)
114 BaseApp.__init__(self, redirectstderrout, appname, appversion, params, single_instance_checker, installdir, i2iport, sport)
115 self.httpport = httpport
118 # Maps a query ID to the original searchstr, timestamp and all hits (local + remote)
119 self.id2hits = Query2HitsMap()
121 # Maps a URL path received by HTTP server to the requested resource,
122 # reading or generating it dynamically.
124 # For saving .torrents received in hits to P2P searches using
125 # SIMPLE+METADATA queries
126 self.tqueue = TimedTaskQueue(nameprefix="BGTaskQueue")
127 self.searchmapper = SearchPathMapper(self.s,self.id2hits,self.tqueue)
128 self.hits2anypathmapper = Hits2AnyPathMapper(self.s,self.id2hits)
130 self.videoHTTPServer.add_path_mapper(self.searchmapper)
131 self.videoHTTPServer.add_path_mapper(self.hits2anypathmapper)
134 # Maps a URL path received by HTTP server to the requested resource,
135 # reading or generating it dynamically.
136 self.webIFmapper = WebIFPathMapper(self, self.s)
138 self.videoHTTPServer.add_path_mapper(self.webIFmapper)
140 # Generic HTTP server start. Don't add mappers dynamically afterwards!
141 self.videoHTTPServer.background_serve()
143 # Maps Downloads to a using InstanceConnection and streaminfo when it
144 # plays. So it contains the Downloads in VOD mode for which there is
145 # active interest from a plugin.
147 # At the moment each Download is used/owned by a single IC and a new
148 # request for the same torrent will stop playback to the original IC
149 # and resume it to the new user.
152 self.approxplayerstate = MEDIASTATE_STOPPED
154 self.counter = 0 # counter for the stats reported periodically
155 self.interval = 120 # report interval
156 self.iseedeadpeople = False
158 if sys.platform == "win32":
159 # If the BG Process is started by the plug-in notify it with an event
161 startupEvent = win32event.CreateEvent( None, 0, 0, 'startupEvent' )
162 win32event.SetEvent( startupEvent )
163 win32api.CloseHandle( startupEvent ) # TODO : is it possible to avoid importing win32api just to close an handler?
169 # Do common initialization
170 BaseApp.OnInitBase(self)
172 # Arno, 2010-07-15: We try to detect browser presence by looking
173 # at get_speed_info JSON request from Firefox statusbar. However.
174 # these calls are unreliable, i.e., somethings the XmlHTTPRequest
175 # at the client doesn't reach the server, although the server is
176 # capable of replying to the request. Hence, we disable self-destruct
179 print >>sys.stderr,"bg: Kill-on-idle test enabled"
180 self.i2is.add_task(self.i2i_kill_on_browser_gone,IDLE_BEFORE_SELFKILL/2)
182 print >>sys.stderr,"bg: Kill-on-idle test disabled"
184 print >>sys.stderr,"bg: Awaiting commands"
189 self.show_error(str(e))
194 # Arno: SEARCH: disable overlay for now
195 # Also need to ensure that *stats*db SQL scripts are copied along during
def configure_session(self):
    """ Turn off social networking, bartercast and the crawler on
    self.sconfig. No other setting is touched here. """
    # Leave buddycast, etc. enabled for SEARCH
    session_config = self.sconfig
    session_config.set_social_networking(False)
    session_config.set_bartercast(False)
    # Arno: Cleanup million stats dbs first
    session_config.set_crawler(False)
206 # InstanceConnectionHandler interface. Called by Instance2InstanceThread
208 def external_connection_made(self,s):
209 ic = BGInstanceConnection(s,self,self.readlinecallback,self.videoHTTPServer)
210 self.singsock2ic[s] = ic
212 print >>sys.stderr,"bg: Plugin connection_made",len(self.singsock2ic),"++++++++++++++++++++++++++++++++++++++++++++++++"
214 # Arno: Concurrency problems getting SEARCHURL message to work,
215 # JavaScript can't always read it. TODO
216 ##ic.searchurl(self.searchurl)
218 def connection_lost(self,s):
220 print >>sys.stderr,"bg: Plugin: connection_lost ------------------------------------------------"
222 ic = self.singsock2ic[s]
223 InstanceConnectionHandler.connection_lost(self,s)
224 wx.CallAfter(self.gui_connection_lost,ic)
226 def gui_connection_lost(self,ic,switchp2ptarget=False):
227 # Find which download ic was interested in
229 for d,duser in self.dusers.iteritems():
230 if duser['uic'] == ic:
235 # IC may or may not have been shutdown:
236 # Not: sudden browser crashes
237 # Yes: controlled stop via ic.shutdown()
240 ic.cleanup_playback() # idempotent
242 ic.shutdown() # idempotent
246 if d2remove is not None:
247 # For VOD, apply cleanup policy to the Download, but only
248 # after X seconds so if the plugin comes back with a new
249 # request for the same stuff we can give it to him pronto.
250 # This is expected to happen a lot due to page reloads /
251 # history navigation.
253 # Arno, 2010-08-01: Restored old behaviour for live. Zapping
254 # more important than extra robustness.
256 d_delayed_remove_if_lambda = lambda:self.i2ithread_delayed_remove_if_not_complete(d2remove)
257 # h4x0r, abuse Istance2Instance server task queue for the delay
258 self.i2is.add_task(d_delayed_remove_if_lambda,10.0)
260 def i2ithread_delayed_remove_if_not_complete(self,d2remove):
262 print >>sys.stderr,"bg: i2ithread_delayed_remove_if_not_complete"
263 d2remove.set_state_callback(self.sesscb_remove_playing_callback)
265 def remove_playing_download(self,d2remove):
266 """ Called when sesscb_remove_playing_callback has determined that
267 we should remove this Download, because it would take too much
268 bandwidth to download it. However, we must check in another user has not
272 print >>sys.stderr,"bg: remove_playing_download @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
273 if d2remove in self.dusers:
274 duser = self.dusers[d2remove]
275 if duser['uic'] is None:
278 print >>sys.stderr,"bg: remove_playing_download: Yes, no interest"
279 BaseApp.remove_playing_download(self,d2remove)
280 if 'streaminfo' in duser:
281 stream = duser['streaminfo']['stream']
282 stream.close() # Close original stream.
283 del self.dusers[d2remove]
285 print >>sys.stderr,"bg: remove_playing_download: No, someone interested",`duser['uic']`
def i2ithread_readlinecallback(self,ic,cmd):
    """ Called by Instance2Instance thread.

    Hands the command line `cmd` received on connection `ic` to
    gui_readlinecallback by way of wx.CallAfter. """
    gui_handler = self.gui_readlinecallback
    wx.CallAfter(gui_handler, ic, cmd)
def split_params(self, url):
    """
    Returns a tuple (path, {name:value}) where the map can be empty.
    Example: "/path?p1=v1&p2=v2" -> ('/path', {"p1":"v1", "p2":"v2"})

    Everything before the first "?" is the path. The remainder is split on
    "&"; pieces without "=" are skipped, and a value may itself contain "="
    (only the first one separates name from value). Values are returned
    as-is, no URL-decoding is applied. A repeated name keeps its last value.
    """
    path, _sep, query = url.partition("?")
    params = {}
    for param in query.split("&"):
        if "=" not in param:
            continue # Not a parameter
        name, value = param.split("=", 1)
        params[name] = value
    return (path, params)
309 def gui_readlinecallback(self,ic,cmd):
310 """ Receive command from Plugin """
313 print >>sys.stderr,"bg: Got command:",cmd
316 if cmd.startswith( 'START' ):
317 torrenturl = cmd.partition( ' ' )[2]
318 if torrenturl is None:
319 raise ValueError('bg: Unformatted START command')
321 # SWITCHP2PTARGET: See if already downloading/playing something
322 items = self.dusers.items()
323 for d,duser in items:
324 if duser['uic'] == ic:
326 self.gui_connection_lost(ic,switchp2ptarget=True)
328 # Here we need to drag the POA off the torrenturl,
330 (url, params) = self.split_params(torrenturl)
332 poa_serialized = decodestring(params["poa"])
334 poa = ClosedSwarm.POA.deserialize(poa_serialized)
337 print >>sys.stderr,"Bad POA, ignoring"
342 self.get_torrent_start_download(ic,url,poa=poa)
345 elif cmd.startswith('SHUTDOWN'):
346 print >>sys.stderr,"bg: Got SHUTDOWN, sending SHUTDOWN"
348 elif cmd.startswith('SUPPORTS'):
349 # Arno, 2010-06-15: only used by SwarmTransport at the moment
350 # to convey it cannot pause.
351 ic.set_supported_vod_events([VODEVENT_START])
353 raise ValueError('bg: Unknown command: '+cmd)
356 # Arno, 2010-05-27: Don't kill Control connection, for set_p2ptarget
358 ic.cleanup_playback()
360 def get_torrent_start_download(self,ic,url,poa=None):
361 """ Retrieve torrent file from url and start it in VOD mode, if not already """
363 if url.endswith(".html"):
364 # Search mode, in which case URL is apparently the base URL of the search page.
365 # Just to keep exception trace away.
368 tdef = TorrentDef.load_from_url(url)
370 # tdef.input['announce'] = "http://dead.globe.cs.vu.nl:6969/announce"
371 #tdef.metainfo['announce'] = "http://dead.globe.cs.vu.nl:6969/announce"
373 # Select which video to play (if multiple)
375 videofiles = tdef.get_files()
377 videofiles = tdef.get_files(exts=videoextdefaults)
378 if len(videofiles) == 1:
379 dlfile = videofiles[0]
380 elif len(videofiles) == 0:
381 raise ValueError("bg: get_torrent_start_download: No video files found! Giving up")
382 elif len(videofiles) > 1:
383 raise ValueError("bg: get_torrent_start_download: Too many files found! Giving up")
386 print >>sys.stderr,"bg: get_torrent_start_download: Found video file",dlfile
390 if tdef.get_cs_keys():
391 # This is a closed swarm, try to get a POA
392 poa = self._get_poa(tdef)
394 infohash = tdef.get_infohash()
396 for d in self.s.get_downloads():
397 if d.get_def().get_infohash() == infohash:
402 # Start a new Download, or if it already exists, start playback from
403 # beginning. This means that we don't currently support two ICs
404 # playing the same video. That is, two browser windows cannot play the
407 if oldd is None or (oldd not in self.downloads_in_vodmode):
408 # New Download, or Download exists, but not in VOD mode, restart
412 print >>sys.stderr,"bg: get_torrent_start_download: Starting new Download"
414 print >>sys.stderr,"bg: get_torrent_start_download: Restarting old Download in VOD mode"
416 d = self.start_download(tdef,dlfile,poa,ic.get_supported_vod_events())
418 self.dusers[d] = duser
420 # oldd is already running in VOD mode. If it's a VOD torrent we
421 # don't need to restart, we can just seek(0) on the stream.
422 # If it's a live torrent, we should tell EOF to any old IC and
423 # continue playback to the new IC where it left off.
425 duser = self.dusers[d]
426 olduic = duser['uic']
427 if olduic is not None:
428 # Cleanup like a shutdown, but send STOP
429 print >>sys.stderr,"bg: get_torrent_start_download: Telling old player to stop"
430 olduic.cleanup_playback()
433 if 'streaminfo' not in duser:
434 # Hasn't started playing yet, ignore.
437 # Already playing. Tell previous owner IC to quit, let new IC
438 # start either from start (VOD) or where previous left off
440 if not tdef.get_live():
441 duser['streaminfo']['stream'].seek(0)
442 ic.set_streaminfo(duser['streaminfo'])
444 ic.start_playback(infohash)
446 duser['said_start_playback'] = False
447 duser['decodeprogress'] = 0
452 def gui_states_callback(self,dslist,haspeerlist):
453 """ Override BaseApp """
454 #print >>sys.stderr,"bg: gui_states_callback",currentThread().getName()
456 (playing_dslist,totalhelping,totalspeed) = BaseApp.gui_states_callback(self,dslist,haspeerlist)
458 self.report_periodic_vod_stats(playing_dslist)
462 for ds in playing_dslist:
463 d = ds.get_download()
464 duser = self.dusers[d]
467 # Generate info string for all
468 [topmsg,msg,duser['said_start_playback'],duser['decodeprogress']] = get_status_msgs(ds,self.approxplayerstate,self.appname,duser['said_start_playback'],duser['decodeprogress'],totalhelping,totalspeed)
471 # print >>sys.stderr, 'bg: 4INFO: Sending',info
def sesscb_vod_event_callback( self, d, event, params ):
    """ Registered by BaseApp. Called by SessionCallbackThread.

    Passes the Download `d`, the VOD `event` and its `params` on to
    gui_vod_event_callback by way of wx.CallAfter. """
    gui_handler = self.gui_vod_event_callback
    wx.CallAfter(gui_handler, d, event, params)
478 def gui_vod_event_callback( self, d, event, params ):
480 print >>sys.stderr,"bg: gui_vod_event_callback: Event: ", event
481 print >>sys.stderr,"bg: gui_vod_event_callback: Params: ", params
482 if event == VODEVENT_START:
483 if params['filename']:
484 stream = open( params['filename'], "rb" )
486 stream = params['stream']
488 # Ric: small hack for the ogg mimetype (just for windows,
489 # linux thinks it's an audio/ogg file)
490 if params['mimetype'] == 'video/x-ogg':
491 params['mimetype'] = 'application/ogg'
493 # Arno: My Win7 thinks this is 'video/mpeg', so patch for that.
494 selectedfiles = d.get_selected_files()
495 if selectedfiles is not None and len(selectedfiles) > 0:
496 for fn in selectedfiles:
498 params['mimetype'] = 'application/ogg'
500 name = d.get_def().get_name_as_unicode()
502 params['mimetype'] = 'application/ogg'
504 if d.get_def().get_live():
505 # Live Ogg stream. To support this we need to do
507 # 1. Write Ogg headers (stored in .tstream)
508 # 2. Find first Ogg page in stream.
509 stream = OggMagicLiveStream(d.get_def(),stream)
511 if not d.get_def().get_live() and not params['filename']:
512 # Arno, < 2010-08-10: Firefox reads aggressively, we just
513 # give it data at bitrate pace such that we know when we
514 # have to fallback to HTTP servers.
516 # 2010-08-10: not when file complete on disk ;-)
517 stream = AtBitrateStream( stream, params['bitrate'] )
520 blocksize = d.get_def().get_piece_length()
521 #Ric: add svc on streaminfo, added bitrate
522 streaminfo = { 'mimetype': params['mimetype'], 'stream': stream, 'length': params['length'], 'blocksize':blocksize, 'svc': d.get_mode() == DLMODE_SVC, 'bitrate': params['bitrate'] }
524 duser = self.dusers[d]
525 duser['streaminfo'] = streaminfo
526 if duser['uic'] is not None:
527 # Only if playback wasn't canceled since starting
528 duser['uic'].set_streaminfo(duser['streaminfo'])
529 duser['uic'].start_playback(d.get_def().get_infohash())
531 self.approxplayerstate = MEDIASTATE_PLAYING
533 self.approxplayerstate = MEDIASTATE_STOPPED
535 elif event == VODEVENT_PAUSE:
536 duser = self.dusers[d]
537 if duser['uic'] is not None:
539 self.approxplayerstate = MEDIASTATE_PAUSED
541 elif event == VODEVENT_RESUME:
542 duser = self.dusers[d]
543 if duser['uic'] is not None:
544 duser['uic'].resume()
545 self.approxplayerstate = MEDIASTATE_PLAYING
def get_supported_vod_events(self):
    """ Return a fresh list with the VOD events this app handles:
    start, pause and resume. """
    # See BGInstanceConnection.set_supported_vod_events() too.
    supported = [ VODEVENT_START, VODEVENT_PAUSE, VODEVENT_RESUME ]
    return supported
553 # VideoServer status/error reporting
def videoservthread_error_callback(self,e,url):
    """ Called by HTTP serving thread.

    Forwards the error `e` and the `url` it relates to on to
    videoserver_error_guicallback by way of wx.CallAfter. """
    gui_handler = self.videoserver_error_guicallback
    wx.CallAfter(gui_handler, e, url)
def videoserver_error_guicallback(self,e,url):
    """ Log an error reported by the video server on stderr.
    `url` is accepted but not used. """
    report = "%s %s\n" % ("bg: Video server reported error", str(e))
    sys.stderr.write(report)
    # self.show_error(str(e))
    # ARNOTODO: schedule current Download for removal?
def videoservthread_set_status_callback(self,status):
    """ Called by HTTP serving thread.

    Forwards `status` on to videoserver_set_status_guicallback by way of
    wx.CallAfter. """
    gui_handler = self.videoserver_set_status_guicallback
    wx.CallAfter(gui_handler, status)
569 def videoserver_set_status_guicallback(self,status):
570 #print >>sys.stderr,"bg: Video server sets status callback",status
571 # ARNOTODO: Report status to plugin
575 # reports vod stats collected periodically
577 def report_periodic_vod_stats(self,playing_dslist):
578 #print >>sys.stderr, "VOD Stats"
580 if self.counter%self.interval == 0:
581 event_reporter = Status.get_status_holder("LivingLab")
582 if event_reporter is not None:
583 for ds in playing_dslist:
584 dw = ds.get_download()
585 b64_infohash = b64encode(dw.get_def().get_infohash())
586 vod_stats = ds.get_vod_stats()
587 #if vod_stats_has_key("prebuf"): event_reporter.add_event(b64_infohash, "prebufp:%d" % vod_stats['prebuf']) # prebuffering time that was needed
588 if vod_stats.has_key("stall"): event_reporter.create_and_add_event("stall", [b64_infohash, vod_stats['stall']]) # time the player stalled
589 if vod_stats.has_key("late"): event_reporter.create_and_add_event("late", [b64_infohash, vod_stats['late']]) # number of pieces arrived after they were due
590 if vod_stats.has_key("dropped"): event_reporter.create_and_add_event("dropped", [b64_infohash, vod_stats['dropped']]) # number of pieces lost
591 if vod_stats.has_key("pos"): event_reporter.create_and_add_event("pos", [b64_infohash, vod_stats['pos']]) # playback position
593 def gui_webui_remove_download(self,d2remove):
594 """ Called when user has decided to remove a specific DL via webUI """
596 print >>sys.stderr,"bg: gui_webui_remove_download"
597 self.gui_webui_halt_download(d2remove,stop=False)
600 def gui_webui_stop_download(self,d2stop):
601 """ Called when user has decided to stop a specific DL via webUI """
603 print >>sys.stderr,"bg: gui_webui_stop_download"
604 self.gui_webui_halt_download(d2stop,stop=True)
607 def gui_webui_restart_download(self,d2restart):
608 """ Called when user has decided to restart a specific DL via webUI for sharing """
610 self.dusers[d2restart] = duser
614 def gui_webui_halt_download(self,d2halt,stop=False):
615 """ Called when user has decided to stop or remove a specific DL via webUI.
616 For stop the Download is not removed. """
617 if d2halt in self.dusers:
619 duser = self.dusers[d2halt]
620 olduic = duser['uic']
621 if olduic is not None:
622 print >>sys.stderr,"bg: gui_webui_halt_download: Oops, someone interested, removing anyway"
624 if 'streaminfo' in duser:
625 # Download was already playing, clean up.
626 stream = duser['streaminfo']['stream']
627 stream.close() # Close original stream.
629 del self.dusers[d2halt]
631 BaseApp.stop_playing_download(self,d2halt)
633 BaseApp.remove_playing_download(self,d2halt)
636 def gui_webui_remove_all_downloads(self,ds2remove):
637 """ Called when user has decided to remove all DLs via webUI """
639 print >>sys.stderr,"bg: gui_webui_remove_all_downloads"
641 for d2remove in ds2remove:
642 self.gui_webui_halt_download(d2remove,stop=False)
645 def gui_webui_stop_all_downloads(self,ds2stop):
646 """ Called when user has decided to stop all DLs via webUI """
648 print >>sys.stderr,"bg: gui_webui_stop_all_downloads"
650 for d2stop in ds2stop:
651 self.gui_webui_halt_download(d2stop,stop=True)
654 def gui_webui_restart_all_downloads(self,ds2restart):
655 """ Called when user has decided to restart all DLs via webUI """
657 print >>sys.stderr,"bg: gui_webui_restart_all_downloads"
659 for d2restart in ds2restart:
660 self.gui_webui_restart_download(d2restart)
662 def i2i_kill_on_browser_gone(self):
665 lastt = self.webIFmapper.lastreqtime
667 print >>sys.stderr,"bg: Test for self destruct: idle",time.time()-lastt,currentThread().getName()
669 if time.time() - IDLE_BEFORE_SELFKILL > lastt:
670 if self.iseedeadpeople:
671 print >>sys.stderr,"bg: SHOULD HAVE self destructed, hardcore stylie"
675 print >>sys.stderr,"bg: SHOULD HAVE self destructed"
676 self.iseedeadpeople = True
677 # No sign of life from statusbar, self destruct
678 #wx.CallAfter(self.ExitMainLoop)
681 self.i2is.add_task(self.i2i_kill_on_browser_gone,IDLE_BEFORE_SELFKILL/2)
684 class BGInstanceConnection(InstanceConnection):
686 def __init__(self,singsock,connhandler,readlinecallback,videoHTTPServer):
687 InstanceConnection.__init__(self, singsock, connhandler, readlinecallback)
689 self.bgapp = connhandler
690 self.videoHTTPServer = videoHTTPServer
692 self.cstreaminfo = {}
693 self.shutteddown = False
694 self.supportedvodevents = [VODEVENT_START,VODEVENT_PAUSE,VODEVENT_RESUME]
def set_streaminfo(self,streaminfo):
    """ Copy streaminfo contents and replace stream with a ControlledStream """
    # For each IC we create separate stream object and a unique path in the
    # HTTP server. This avoids nasty thread synchronization with the server
    # when a new IC wants to play the same content. The Tribler Core stream
    # does not allow multiple readers. This means we would have to stop
    # the HTTP server from writing the stream to the old IC, before we
    # can allow the new IC to read.
    #
    # We solved this as follows. The original Tribler Core stream is
    # wrapped in a ControlledStream, one for each IC. When a new IC
    # wants to play we tell the old IC's ControlledStream to generate
    # an EOF to the HTTP server, and tell the old IC to SHUTDOWN. We
    # then either rewind the Tribler Core stream (VOD) or leave it (live)
    # and tell the new IC to PLAY. The new ControlledStream will then
    # be read by the HTTP server again.
    own_info = self.cstreaminfo
    own_info.update(streaminfo)
    # Swap the copied stream entry for this IC's private wrapper.
    own_info['stream'] = ControlledStream( streaminfo['stream'] )
720 def start_playback(self,infohash):
721 """ Register cstream with HTTP server and tell IC to start reading """
723 self.urlpath = URLPATH_CONTENT_PREFIX+'/'+infohash2urlpath(infohash)+'/'+str(random.random())
725 self.videoHTTPServer.set_inputstream(self.cstreaminfo,self.urlpath)
728 print >> sys.stderr, "bg: Telling plugin to start playback of",self.urlpath
730 self.write( 'PLAY '+self.get_video_url()+'\r\n' )
732 def cleanup_playback(self):
734 print >>sys.stderr,'bg: ic: cleanup'
735 # Cause HTTP server thread to receive EOF on inputstream
736 if len(self.cstreaminfo) != 0:
737 self.cstreaminfo['stream'].close()
739 # TODO: get rid of del_inputstream lock
740 # Arno, 2009-12-11: Take this out of critical path on MainThread
741 http_del_inputstream_lambda = lambda:self.videoHTTPServer.del_inputstream(self.urlpath)
742 self.bgapp.tqueue.add_task(http_del_inputstream_lambda,0)
def get_video_url(self):
    """ Return the loopback HTTP URL for this connection's content: the
    video server's port followed by self.urlpath. """
    port = str(self.videoHTTPServer.get_port())
    return ''.join(['http://127.0.0.1:', port, self.urlpath])
751 self.write( 'PAUSE\r\n' )
754 self.write( 'RESUME\r\n' )
def info(self,infostr):
    """ Write an INFO message carrying `infostr`, terminated by CRLF. """
    message = 'INFO ' + infostr + '\r\n'
    self.write( message )
759 # Arno, 2010-05-28: Convey the BGprocess won't be able to serve the content
def error(self,infostr):
    """ Write an ERROR message carrying `infostr`, terminated by CRLF. """
    message = 'ERROR ' + infostr + '\r\n'
    self.write( message )
763 # Arno, 2010-05-27: Stop playback
766 self.write( 'STOP\r\n' )
771 print >>sys.stderr,'bg: ic: shutdown'
772 if not self.shutteddown:
773 self.shutteddown = True
774 self.cleanup_playback()
776 self.write( 'SHUTDOWN\r\n' )
777 # Will cause BaseApp.connection_lost() to be called, where we'll
778 # handle what to do about the Download that was started for this
def get_supported_vod_events(self):
    """ Return the VOD event list currently stored on this connection
    (the stored object itself, not a copy). """
    events = self.supportedvodevents
    return events
def set_supported_vod_events(self,eventlist):
    """ Replace the stored VOD event list with `eventlist`; the list is
    kept by reference, not copied. """
    setattr(self, 'supportedvodevents', eventlist)
class ControlledStream:
    """ A file-like object that throws EOF when closed, without actually closing
    the underlying inputstream. See BGInstanceConnection.set_streaminfo() for
    an explanation on how this is used.
    """
    def __init__(self,stream):
        # The wrapped stream is only borrowed: it is read from and seeked on,
        # never closed from here.
        self.stream = stream
        self.done = False # Event()

    def read(self,nbytes=None):
        """ Return up to `nbytes` from the wrapped stream, or everything that
        is left when `nbytes` is None. Returns '' (EOF) once close() has been
        called. """
        if self.done:
            return '' # EOF
        if nbytes is None:
            # Python 2 file objects raise TypeError on read(None), so the
            # default must not be forwarded as an explicit argument.
            return self.stream.read()
        return self.stream.read(nbytes)

    def seek(self,pos,whence=os.SEEK_SET):
        """ Reposition the wrapped stream. """
        self.stream.seek(pos,whence)

    def close(self):
        """ Make every later read() report EOF. """
        self.done = True
        # DO NOT close original stream
814 class AtBitrateStream:
815 """ Give from playback position plus a safe margin at video bitrate speed.
816 On seeking resync the playback position and the safe margin.
819 # Give at bitrate speed policy: give from playback position + SAFE_MARGIN_TIME
820 # at bitrate speed during STREAM_STATE_PLAYING, give at full speed during
821 # STREAM_STATE_PREBUFFER. STREAM_STATE_TRANSITION indicates that the playback has
822 # to start or that the user just seeked.
824 # Safe buffer size in seconds
825 SAFE_MARGIN_TIME = 10.0 # same as VideoOnDemand.py
827 # Increment the bitrate by percentage (give more bandwidth to the player).
828 BITRATE_SPEED_INCREMENT = 1.05 # +5%
831 STREAM_STATE_TRANSITION = 0
832 STREAM_STATE_PREBUFFER = 1
833 STREAM_STATE_PLAYING = 2
835 def __init__( self, stream, bitrate ):
837 self.done = False # Event()
838 self.bitrate = bitrate
839 self.safe_bytes = self.SAFE_MARGIN_TIME * bitrate
840 self.stream_state = self.STREAM_STATE_TRANSITION
843 self.given_bytes_till = 0
845 def has_to_sleep( self, nbytes ):
846 curr_time = time.time()
847 if self.stream_state is self.STREAM_STATE_TRANSITION:
848 self.last_time = curr_time
850 self.stream_state = self.STREAM_STATE_PREBUFFER
852 elapsed_time = curr_time - self.last_time
853 self.last_time = curr_time
855 self.playback += elapsed_time * self.BITRATE_SPEED_INCREMENT
856 if self.stream_state is self.STREAM_STATE_PREBUFFER:
857 played_bytes = self.playback * self.bitrate
858 if played_bytes + self.safe_bytes <= self.given_bytes_till:
859 self.stream_state = self.STREAM_STATE_PLAYING
860 self.given_bytes_till += nbytes
863 delta_time = ( self.given_bytes_till / float( self.bitrate ) ) - ( self.playback + self.SAFE_MARGIN_TIME )
864 if delta_time <= 0.0:
865 self.stream_state = self.STREAM_STATE_PREBUFFER
866 self.given_bytes_till += nbytes
867 return max( 0.0, delta_time )
869 def read(self,nbytes=None):
871 to_give = self.stream.read( nbytes )
872 sleep_time = self.has_to_sleep( nbytes )
873 #print >>sys.stderr,"DIEGO DEBUG : SLEEP_time", sleep_time
875 time.sleep( sleep_time )
880 def seek(self,pos,whence=os.SEEK_SET):
881 self.stream.seek(pos,whence)
882 self.stream_state = self.STREAM_STATE_TRANSITION
883 self.given_bytes_till = pos
884 self.playback = pos / float( self.bitrate )
888 # DO NOT close original stream
891 ##############################################################
893 # Main Program Start Here
895 ##############################################################
896 def run_bgapp(appname,appversion,i2iport,sessport,httpport, params = None,killonidle=False):
897 """ Set sys.argv[1] to "--nopause" to inform the Core that the player
898 doesn't support VODEVENT_PAUSE, e.g. the SwarmTransport.
903 if len(sys.argv) > 1:
904 params = sys.argv[1:]
907 KILLONIDLE = killonidle
910 # Create single instance semaphore
911 # Arno: On Linux and wxPython-2.8.1.1 the SingleInstanceChecker appears
912 # to mess up stderr, i.e., I get IOErrors when writing to it via print_exc()
914 if sys.platform != 'linux2':
915 single_instance_checker = wx.SingleInstanceChecker(appname+"-"+ wx.GetUserId())
917 single_instance_checker = LinuxSingleInstanceChecker(appname)
919 # Arno, 2010-03-05: This is a vital print that must not be removed, otherwise
920 # the program will just say "15:29:02: Deleted stale lock file '/home/arno/SwarmPlugin-arno'"
921 # and exit after a restart of the instance :-(
923 print >>sys.stderr,"bg: Test if already running"
924 single_instance_checker = wx.SingleInstanceChecker(appname+"-"+ wx.GetUserId())
925 if single_instance_checker.IsAnotherRunning():
926 print >>sys.stderr,"bg: Already running, exit"
929 arg0 = sys.argv[0].lower()
930 if arg0.endswith('.exe'):
931 installdir = os.path.abspath(os.path.dirname(sys.argv[0]))
933 installdir = os.getcwd()
935 # Launch first single instance
936 app = BackgroundApp(0, appname, appversion, params, single_instance_checker, installdir, i2iport, sessport, httpport)
939 # Enable P2P-Next ULANC logging.
941 status = Status.get_status_holder("LivingLab")
942 id = encodestring(s.get_permid()).replace("\n","")
943 reporter = LivingLabReporter.LivingLabPeriodicReporter("Living lab CS reporter", 300, id) # Report every 5 minutes
944 status.add_reporter(reporter)
951 print >>sys.stderr,"Sleeping seconds to let other threads finish"
954 if not ALLOW_MULTIPLE:
955 del single_instance_checker
957 # Ultimate catchall for hanging popen2's and what not