instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Plugin / BackgroundProcess.py
1 # Written by Arno Bakker, Diego Rabioli
2 # see LICENSE.txt for license information
3 #
4 # Implements the BackgroundProcess, i.e. SwarmEngine for SwarmPlugin and 
5 # SwarmTransport=SwarmPlayer v2. See Plugin/SwarmEngine.py and Transport/SwarmEngine.py
6 # for main startup.
7 #
8 # The BackgroundProcess shares a base class BaseApp with the SwarmPlayer v1,
9 # which is a standalone P2P-based video player.
10 #
11 #
12 # Notes: 
13 # - Implement play while hashcheck?
14 #        Not needed when proper shutdown & restart was done.
15 # - load_checkpoint with DLSTATUS_DOWNLOADING for Plugin? 
16 #        Nah, if we start BG when plugin started we have a video to play soon,
17 #        so start others in STOPPED state (rather than switching them all
18 #        to off and restart one in VOD mode just after)
19 #
20
21 # History:
22 #
23 # NSSA API 1.0.2
24 #
25 #  1.0.2    Added STOP message to tell plugin to stop playing the current item
26 #           (needed to support new behaviour where control conn is not always
27 #           shutdown anymore to support input.set_p2ptarget.
28 #
29 #           Added ERROR message to tell plugin NSSA won't be able to serve the
30 #           content requested via START (for <video> support).    
31 #
32 #  1.0.1    Added INFO message to convey NSSA info to plugin for providing 
33 #           feedback to the user.
34 #
35 # NSPlugin JavaScript API 1.0.2
36 #
37 #  1.0.2    Added input.set_p2ptarget() method to switch the TorrentDef currently
38 #           playing. Released in M24.1
39 #
40 #  1.0.1    Added input.p2pstatus read-only property giving the latest status as
41 #           reported by the NSSA. Released in M24.
42 #
43 #  1.0.0    Copy of VLC's Javascript interface
44 #
45
46 # modify the sys.stderr and sys.stdout for safe output
47 import BaseLib.Debug.console
48
49 import os
50 import sys
51 import time
52 import random
53 import binascii
54 import tempfile
55 import urllib
56 from cStringIO import StringIO
57 from base64 import b64encode, encodestring, decodestring
58 from traceback import print_exc,print_stack
59 from threading import Thread,currentThread,Lock
60
61 if sys.platform == "win32":
62     try:
63         import win32event
64         import win32api
65     except:
66         pass
67
68 try:
69     import wxversion
70     wxversion.select('2.8')
71 except:
72     pass
73 import wx
74
75 from BaseLib.Core.API import *
76 from BaseLib.Core.osutils import *
77 from BaseLib.Core.Utilities.utilities import get_collected_torrent_filename
78 from BaseLib.Utilities.LinuxSingleInstanceChecker import *
79 from BaseLib.Utilities.Instance2Instance import InstanceConnectionHandler,InstanceConnection, Instance2InstanceClient
80 from BaseLib.Utilities.TimedTaskQueue import TimedTaskQueue
81 from BaseLib.Player.BaseApp import BaseApp
82 from BaseLib.Player.swarmplayer import get_status_msgs
83 from BaseLib.Plugin.defs import *
84 from BaseLib.Plugin.Search import *
85 from BaseLib.Plugin.AtomFeedParser import *
86
87 from BaseLib.Video.defs import *
88 from BaseLib.Video.utils import videoextdefaults
89 from BaseLib.Video.VideoServer import VideoHTTPServer,MultiHTTPServer
90 from BaseLib.Video.Ogg import is_ogg,OggMagicLiveStream
91
92 from BaseLib.Core.Statistics.Status import Status, LivingLabReporter
93 from BaseLib.WebUI.WebUI import WebIFPathMapper
94 from BaseLib.Core.ClosedSwarm.ClosedSwarm import InvalidPOAException
95
96
97 DEBUG = True
98 PHONEHOME = True
99
100 ALLOW_MULTIPLE = False
101
102 KILLONIDLE = False
103 IDLE_BEFORE_SELFKILL = 60.0 # Number of seconds 
104
105
106 class BackgroundApp(BaseApp):
107
108     def __init__(self, redirectstderrout, appname, appversion, params, single_instance_checker, installdir, i2iport, sport, httpport):
109
110         # Almost generic HTTP server
111         self.videoHTTPServer = VideoHTTPServer(httpport)
112         self.videoHTTPServer.register(self.videoservthread_error_callback,self.videoservthread_set_status_callback)
113
114         BaseApp.__init__(self, redirectstderrout, appname, appversion, params, single_instance_checker, installdir, i2iport, sport)
115         self.httpport = httpport
116         
117         # SEARCH:P2P
118         # Maps a query ID to the original searchstr, timestamp and all hits (local + remote)
119         self.id2hits = Query2HitsMap()
120         
121         # Maps a URL path received by HTTP server to the requested resource,
122         # reading or generating it dynamically.
123         #
124         # For saving .torrents received in hits to P2P searches using
125         # SIMPLE+METADATA queries
126         self.tqueue = TimedTaskQueue(nameprefix="BGTaskQueue")
127         self.searchmapper = SearchPathMapper(self.s,self.id2hits,self.tqueue)
128         self.hits2anypathmapper = Hits2AnyPathMapper(self.s,self.id2hits)
129         
130         self.videoHTTPServer.add_path_mapper(self.searchmapper)
131         self.videoHTTPServer.add_path_mapper(self.hits2anypathmapper)
132
133         # WEB Interface        
134         # Maps a URL path received by HTTP server to the requested resource,
135         # reading or generating it dynamically.
136         self.webIFmapper = WebIFPathMapper(self, self.s)
137         
138         self.videoHTTPServer.add_path_mapper(self.webIFmapper)
139
140         # Generic HTTP server start. Don't add mappers dynamically afterwards!
141         self.videoHTTPServer.background_serve()
142
143         # Maps Downloads to a using InstanceConnection and streaminfo when it 
144         # plays. So it contains the Downloads in VOD mode for which there is
145         # active interest from a plugin.
146         #
147         # At the moment each Download is used/owned by a single IC and a new
148         # request for the same torrent will stop playback to the original IC
149         # and resume it to the new user.
150         #
151         self.dusers = {}   
152         self.approxplayerstate = MEDIASTATE_STOPPED
153
154         self.counter = 0 # counter for the stats reported periodically
155         self.interval = 120 # report interval
156         self.iseedeadpeople = False
157         
158         if sys.platform == "win32":
159             # If the BG Process is started by the plug-in notify it with an event
160             try:
161                 startupEvent = win32event.CreateEvent( None, 0, 0, 'startupEvent' )
162                 win32event.SetEvent( startupEvent )
163                 win32api.CloseHandle( startupEvent ) # TODO : is it possible to avoid importing win32api just to close an handler?
164             except:
165                 pass
166
167     def OnInit(self):
168         try:
169             # Do common initialization
170             BaseApp.OnInitBase(self)
171             
172             # Arno, 2010-07-15: We try to detect browser presence by looking
173             # at get_speed_info JSON request from Firefox statusbar. However.
174             # these calls are unreliable, i.e., somethings the XmlHTTPRequest
175             # at the client doesn't reach the server, although the server is
176             # capable of replying to the request. Hence, we disable self-destruct
177             # for now.
178             if KILLONIDLE:
179                 print >>sys.stderr,"bg: Kill-on-idle test enabled"
180                 self.i2is.add_task(self.i2i_kill_on_browser_gone,IDLE_BEFORE_SELFKILL/2)
181             else:
182                 print >>sys.stderr,"bg: Kill-on-idle test disabled"
183             
184             print >>sys.stderr,"bg: Awaiting commands"
185             return True
186
187         except Exception,e:
188             print_exc()
189             self.show_error(str(e))
190             self.OnExit()
191             return False
192
193
194     # Arno: SEARCH: disable overlay for now
195     # Also need to ensure that *stats*db SQL scripts are copied along during
196     # build and crap.
197     """
198     def configure_session(self):
199         # Leave buddycast, etc. enabled for SEARCH
200         self.sconfig.set_social_networking(False)
201         self.sconfig.set_bartercast(False)
202         self.sconfig.set_crawler(False) # Arno: Cleanup million stats dbs first
203     """
204
205     #
206     # InstanceConnectionHandler interface. Called by Instance2InstanceThread
207     #
208     def external_connection_made(self,s):
209         ic = BGInstanceConnection(s,self,self.readlinecallback,self.videoHTTPServer)
210         self.singsock2ic[s] = ic
211         if DEBUG:
212             print >>sys.stderr,"bg: Plugin connection_made",len(self.singsock2ic),"++++++++++++++++++++++++++++++++++++++++++++++++"
213           
214         # Arno: Concurrency problems getting SEARCHURL message to work, 
215         # JavaScript can't always read it. TODO  
216         ##ic.searchurl(self.searchurl)
217
218     def connection_lost(self,s):
219         if DEBUG:
220             print >>sys.stderr,"bg: Plugin: connection_lost ------------------------------------------------" 
221
222         ic = self.singsock2ic[s]
223         InstanceConnectionHandler.connection_lost(self,s)
224         wx.CallAfter(self.gui_connection_lost,ic)
225         
226     def gui_connection_lost(self,ic,switchp2ptarget=False):
227         # Find which download ic was interested in
228         d2remove = None
229         for d,duser in self.dusers.iteritems():
230             if duser['uic'] == ic:
231                 duser['uic'] = None
232                 d2remove = d
233                 break
234         
235         # IC may or may not have been shutdown:
236         # Not: sudden browser crashes
237         # Yes: controlled stop via ic.shutdown()
238         try:
239             if switchp2ptarget:
240                 ic.cleanup_playback() # idempotent
241             else:
242                 ic.shutdown() # idempotent
243         except:
244             print_exc()
245         
246         if d2remove is not None:
247             # For VOD, apply cleanup policy to the Download, but only 
248             # after X seconds so if the plugin comes back with a new 
249             # request for the same stuff we can give it to him pronto. 
250             # This is expected to happen a lot due to page reloads / 
251             # history navigation.
252             #
253             # Arno, 2010-08-01: Restored old behaviour for live. Zapping
254             # more important than extra robustness.
255             #
256             d_delayed_remove_if_lambda = lambda:self.i2ithread_delayed_remove_if_not_complete(d2remove)
257             # h4x0r, abuse Istance2Instance server task queue for the delay
258             self.i2is.add_task(d_delayed_remove_if_lambda,10.0)
259         
260     def i2ithread_delayed_remove_if_not_complete(self,d2remove):
261         if DEBUG:
262             print >>sys.stderr,"bg: i2ithread_delayed_remove_if_not_complete"
263         d2remove.set_state_callback(self.sesscb_remove_playing_callback)
264         
265     def remove_playing_download(self,d2remove):
266         """ Called when sesscb_remove_playing_callback has determined that
267         we should remove this Download, because it would take too much
268         bandwidth to download it. However, we must check in another user has not
269         become interested. 
270         """
271         if DEBUG:
272             print >>sys.stderr,"bg: remove_playing_download @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
273         if d2remove in self.dusers:
274             duser = self.dusers[d2remove]
275             if duser['uic'] is None:
276                 # No interest
277                 if DEBUG:
278                     print >>sys.stderr,"bg: remove_playing_download: Yes, no interest"
279                 BaseApp.remove_playing_download(self,d2remove)
280                 if 'streaminfo' in duser:
281                     stream = duser['streaminfo']['stream']
282                     stream.close() # Close original stream. 
283                 del self.dusers[d2remove]
284             elif DEBUG:
285                 print >>sys.stderr,"bg: remove_playing_download: No, someone interested",`duser['uic']`
286
287         
288     def i2ithread_readlinecallback(self,ic,cmd):
289         """ Called by Instance2Instance thread """
290         wx.CallAfter(self.gui_readlinecallback,ic,cmd)
291
292     def split_params(self, url):
293         """
294         Returns a touple (path, {name:value}) where the map can be empty.
295         Example: "/path?p1=v1&p2=v2" -> ('/path', {"p1":"v1", "p2":"v2"})
296         """
297         params = {}
298         idx = url.find("?")
299         if idx > -1:
300             _params = url[idx+1:].split("&")
301             url = url[:idx]
302             for param in _params:
303                 if param.find("=") == -1:
304                     continue # Not a parameter
305                 (name, value) = param.split("=", 1)
306                 params[name] = value
307         return (url, params)
308         
309     def gui_readlinecallback(self,ic,cmd):
310         """ Receive command from Plugin """
311         
312         if DEBUG:
313             print >>sys.stderr,"bg: Got command:",cmd
314         try:
315             # START command
316             if cmd.startswith( 'START' ):
317                 torrenturl = cmd.partition( ' ' )[2]
318                 if torrenturl is None:
319                     raise ValueError('bg: Unformatted START command')
320                 else:
321                     # SWITCHP2PTARGET: See if already downloading/playing something
322                     items = self.dusers.items() 
323                     for d,duser in items:
324                         if duser['uic'] == ic:
325                             # Stop current
326                             self.gui_connection_lost(ic,switchp2ptarget=True)
327                     
328                     # Here we need to drag the POA off the torrenturl,
329                     # if one is given
330                     (url, params) = self.split_params(torrenturl)
331                     if "poa" in params:
332                         poa_serialized = decodestring(params["poa"])
333                         try:
334                             poa = ClosedSwarm.POA.deserialize(poa_serialized)
335                             poa.verify()
336                         except:
337                             print >>sys.stderr,"Bad POA, ignoring"
338                             poa = None
339                     else:
340                         poa = None
341                         url = torrenturl
342                     self.get_torrent_start_download(ic,url,poa=poa)
343         
344             # SHUTDOWN command
345             elif cmd.startswith('SHUTDOWN'):
346                 print >>sys.stderr,"bg: Got SHUTDOWN, sending SHUTDOWN"
347                 ic.shutdown()
348             elif cmd.startswith('SUPPORTS'):
349                 # Arno, 2010-06-15: only used by SwarmTransport at the moment
350                 # to convey it cannot pause.
351                 ic.set_supported_vod_events([VODEVENT_START])
352             else:
353                 raise ValueError('bg: Unknown command: '+cmd)
354         except Exception,e:
355             print_exc()
356             # Arno, 2010-05-27: Don't kill Control connection, for set_p2ptarget
357             ic.error(str(e))
358             ic.cleanup_playback()
359     
360     def get_torrent_start_download(self,ic,url,poa=None):
361         """ Retrieve torrent file from url and start it in VOD mode, if not already """
362         
363         if url.endswith(".html"):
364             # Search mode, in which case URL is apparently the base URL of the search page.
365             # Just to keep exception trace away.
366             return
367             
368         tdef  = TorrentDef.load_from_url(url)
369         
370         # tdef.input['announce'] = "http://dead.globe.cs.vu.nl:6969/announce"
371         #tdef.metainfo['announce'] = "http://dead.globe.cs.vu.nl:6969/announce"
372         
373         # Select which video to play (if multiple)
374         if tdef.get_live():
375             videofiles = tdef.get_files()
376         else:
377             videofiles = tdef.get_files(exts=videoextdefaults)
378         if len(videofiles) == 1:
379             dlfile = videofiles[0]
380         elif len(videofiles) == 0:
381             raise ValueError("bg: get_torrent_start_download: No video files found! Giving up")
382         elif len(videofiles) > 1:
383             raise ValueError("bg: get_torrent_start_download: Too many files found! Giving up")
384
385         if DEBUG:
386             print >>sys.stderr,"bg: get_torrent_start_download: Found video file",dlfile
387
388         # Closed swarms?
389         if not poa:
390             if tdef.get_cs_keys():
391                 # This is a closed swarm, try to get a POA
392                 poa = self._get_poa(tdef)
393
394         infohash = tdef.get_infohash()
395         oldd = None
396         for d in self.s.get_downloads():
397             if d.get_def().get_infohash() == infohash:
398                 oldd = d
399                 break
400         
401         #
402         # Start a new Download, or if it already exists, start playback from
403         # beginning. This means that we don't currently support two ICs
404         # playing the same video. That is, two browser windows cannot play the
405         # same video.
406         #
407         if oldd is None or (oldd not in self.downloads_in_vodmode):
408             # New Download, or Download exists, but not in VOD mode, restart
409           
410             if DEBUG:
411                 if oldd is None:
412                     print >>sys.stderr,"bg: get_torrent_start_download: Starting new Download"
413                 else:
414                     print >>sys.stderr,"bg: get_torrent_start_download: Restarting old Download in VOD mode"
415             
416             d = self.start_download(tdef,dlfile,poa,ic.get_supported_vod_events())
417             duser = {'uic':ic}
418             self.dusers[d] = duser
419         else:
420             # oldd is already running in VOD mode. If it's a VOD torrent we
421             # don't need to restart, we can just seek(0) on the stream.
422             # If it's a live torrent, we should tell EOF to any old IC and
423             # continue playback to the new IC where it left off.
424             #
425             duser = self.dusers[d]
426             olduic = duser['uic']
427             if olduic is not None:
428                 # Cleanup like a shutdown, but send STOP
429                 print >>sys.stderr,"bg: get_torrent_start_download: Telling old player to stop"
430                 olduic.cleanup_playback()
431                 olduic.stop()
432             duser['uic'] = ic
433             if 'streaminfo' not in duser:
434                 # Hasn't started playing yet, ignore.
435                 pass
436             else:
437                 # Already playing. Tell previous owner IC to quit, let new IC 
438                 # start either from start (VOD) or where previous left off 
439                 # (live).
440                 if not tdef.get_live():
441                     duser['streaminfo']['stream'].seek(0)
442                 ic.set_streaminfo(duser['streaminfo'])
443                 
444                 ic.start_playback(infohash)
445                 
446         duser['said_start_playback'] = False
447         duser['decodeprogress'] = 0
448         
449     #
450     # DownloadStates
451     #
452     def gui_states_callback(self,dslist,haspeerlist):
453         """ Override BaseApp """
454         #print >>sys.stderr,"bg: gui_states_callback",currentThread().getName()
455
456         (playing_dslist,totalhelping,totalspeed) = BaseApp.gui_states_callback(self,dslist,haspeerlist)
457         try:
458             self.report_periodic_vod_stats(playing_dslist)
459         except:
460             print_exc()
461        
462         for ds in playing_dslist:
463             d = ds.get_download()
464             duser = self.dusers[d]
465             uic = duser['uic']
466             if uic is not None:
467                 # Generate info string for all
468                 [topmsg,msg,duser['said_start_playback'],duser['decodeprogress']] = get_status_msgs(ds,self.approxplayerstate,self.appname,duser['said_start_playback'],duser['decodeprogress'],totalhelping,totalspeed)
469                 info = msg
470                 #if DEBUG:
471                 #    print >>sys.stderr, 'bg: 4INFO: Sending',info
472                 uic.info(info)
473             
474     def sesscb_vod_event_callback( self, d, event, params ):
475         """ Registered by BaseApp. Called by SessionCallbackThread """
476         wx.CallAfter(self.gui_vod_event_callback,d,event,params)
477         
478     def gui_vod_event_callback( self, d, event, params ):
479         if DEBUG:
480             print >>sys.stderr,"bg: gui_vod_event_callback: Event: ", event
481             print >>sys.stderr,"bg: gui_vod_event_callback: Params: ", params
482         if event == VODEVENT_START:
483             if params['filename']:
484                 stream = open( params['filename'], "rb" )
485             else:
486                 stream = params['stream']
487     
488             # Ric: small hack for the ogg mimetype (just for windows, 
489             # linux thinks it's an audio/ogg file)
490             if params['mimetype'] == 'video/x-ogg':
491                 params['mimetype'] = 'application/ogg'
492                 
493             # Arno: My Win7 thinks this is 'video/mpeg', so patch for that.  
494             selectedfiles = d.get_selected_files()
495             if selectedfiles is not None and len(selectedfiles) > 0:
496                 for fn in selectedfiles:
497                     if is_ogg(fn):
498                         params['mimetype'] = 'application/ogg'
499             else:
500                 name = d.get_def().get_name_as_unicode()
501                 if is_ogg(name):
502                     params['mimetype'] = 'application/ogg'
503                     
504                     if d.get_def().get_live():
505                         # Live Ogg stream. To support this we need to do
506                         # two things:
507                         # 1. Write Ogg headers (stored in .tstream)
508                         # 2. Find first Ogg page in stream.
509                         stream = OggMagicLiveStream(d.get_def(),stream)
510
511             if not d.get_def().get_live() and not params['filename']:
512                 # Arno, < 2010-08-10: Firefox reads aggressively, we just
513                 # give it data at bitrate pace such that we know when we
514                 # have to fallback to HTTP servers.
515                 #
516                 # 2010-08-10: not when file complete on disk ;-)
517                 stream = AtBitrateStream( stream, params['bitrate'] )
518
519             
520             blocksize = d.get_def().get_piece_length()
521             #Ric: add svc on streaminfo, added bitrate
522             streaminfo = { 'mimetype': params['mimetype'], 'stream': stream, 'length': params['length'], 'blocksize':blocksize, 'svc': d.get_mode() == DLMODE_SVC, 'bitrate': params['bitrate'] }
523
524             duser = self.dusers[d]
525             duser['streaminfo'] = streaminfo
526             if duser['uic'] is not None:
527                 # Only if playback wasn't canceled since starting
528                 duser['uic'].set_streaminfo(duser['streaminfo'])
529                 duser['uic'].start_playback(d.get_def().get_infohash())
530             
531                 self.approxplayerstate = MEDIASTATE_PLAYING
532             else:
533                 self.approxplayerstate = MEDIASTATE_STOPPED
534             
535         elif event == VODEVENT_PAUSE:
536             duser = self.dusers[d]
537             if duser['uic'] is not None:
538                 duser['uic'].pause()
539             self.approxplayerstate = MEDIASTATE_PAUSED
540             
541         elif event == VODEVENT_RESUME:
542             duser = self.dusers[d]
543             if duser['uic'] is not None:
544                 duser['uic'].resume()
545             self.approxplayerstate = MEDIASTATE_PLAYING
546
547
548     def get_supported_vod_events(self):
549         # See BGInstanceConnection.set_supported_vod_events() too.
550         return [ VODEVENT_START, VODEVENT_PAUSE, VODEVENT_RESUME ]
551
552     #
553     # VideoServer status/error reporting
554     #
555     def videoservthread_error_callback(self,e,url):
556         """ Called by HTTP serving thread """
557         wx.CallAfter(self.videoserver_error_guicallback,e,url)
558         
559     def videoserver_error_guicallback(self,e,url):
560         print >>sys.stderr,"bg: Video server reported error",str(e)
561         #    self.show_error(str(e))
562         pass
563         # ARNOTODO: schedule current Download for removal?
564
565     def videoservthread_set_status_callback(self,status):
566         """ Called by HTTP serving thread """
567         wx.CallAfter(self.videoserver_set_status_guicallback,status)
568
569     def videoserver_set_status_guicallback(self,status):
570         #print >>sys.stderr,"bg: Video server sets status callback",status
571         # ARNOTODO: Report status to plugin
572         pass
573
574     #
575     # reports vod stats collected periodically
576     #
577     def report_periodic_vod_stats(self,playing_dslist):
578         #print >>sys.stderr, "VOD Stats"
579         self.counter += 1
580         if self.counter%self.interval == 0:
581             event_reporter = Status.get_status_holder("LivingLab")
582             if event_reporter is not None:
583                 for ds in playing_dslist:
584                     dw = ds.get_download()
585                     b64_infohash = b64encode(dw.get_def().get_infohash())
586                     vod_stats = ds.get_vod_stats()
587                     #if vod_stats_has_key("prebuf"): event_reporter.add_event(b64_infohash, "prebufp:%d" % vod_stats['prebuf']) # prebuffering time that was needed
588                     if vod_stats.has_key("stall"): event_reporter.create_and_add_event("stall", [b64_infohash, vod_stats['stall']]) # time the player stalled
589                     if vod_stats.has_key("late"): event_reporter.create_and_add_event("late", [b64_infohash, vod_stats['late']]) # number of pieces arrived after they were due
590                     if vod_stats.has_key("dropped"): event_reporter.create_and_add_event("dropped", [b64_infohash, vod_stats['dropped']]) # number of pieces lost
591                     if vod_stats.has_key("pos"): event_reporter.create_and_add_event("pos", [b64_infohash, vod_stats['pos']]) # playback position
592
593     def gui_webui_remove_download(self,d2remove):
594         """ Called when user has decided to remove a specific DL via webUI """
595         if DEBUG:
596             print >>sys.stderr,"bg: gui_webui_remove_download"
597         self.gui_webui_halt_download(d2remove,stop=False)
598
599
600     def gui_webui_stop_download(self,d2stop):
601         """ Called when user has decided to stop a specific DL via webUI """
602         if DEBUG:
603             print >>sys.stderr,"bg: gui_webui_stop_download"
604         self.gui_webui_halt_download(d2stop,stop=True)
605         
606         
607     def gui_webui_restart_download(self,d2restart):
608         """ Called when user has decided to restart a specific DL via webUI for sharing """
609         duser = {'uic':None}
610         self.dusers[d2restart] = duser
611         d2restart.restart()
612
613
614     def gui_webui_halt_download(self,d2halt,stop=False):
615         """ Called when user has decided to stop or remove a specific DL via webUI.
616         For stop the Download is not removed. """
617         if d2halt in self.dusers:
618             try:
619                 duser = self.dusers[d2halt]
620                 olduic = duser['uic'] 
621                 if olduic is not None:
622                     print >>sys.stderr,"bg: gui_webui_halt_download: Oops, someone interested, removing anyway"
623                     olduic.shutdown()
624                 if 'streaminfo' in duser:
625                     # Download was already playing, clean up.
626                     stream = duser['streaminfo']['stream']
627                     stream.close() # Close original stream.
628             finally: 
629                 del self.dusers[d2halt]
630         if stop:
631             BaseApp.stop_playing_download(self,d2halt)
632         else:
633             BaseApp.remove_playing_download(self,d2halt)
634
635
636     def gui_webui_remove_all_downloads(self,ds2remove):
637         """ Called when user has decided to remove all DLs via webUI """
638         if DEBUG:
639             print >>sys.stderr,"bg: gui_webui_remove_all_downloads"
640             
641         for d2remove in ds2remove:
642             self.gui_webui_halt_download(d2remove,stop=False)
643             
644             
645     def gui_webui_stop_all_downloads(self,ds2stop):
646         """ Called when user has decided to stop all DLs via webUI """
647         if DEBUG:
648             print >>sys.stderr,"bg: gui_webui_stop_all_downloads"
649             
650         for d2stop in ds2stop:
651             self.gui_webui_halt_download(d2stop,stop=True)
652
653
654     def gui_webui_restart_all_downloads(self,ds2restart):
655         """ Called when user has decided to restart all DLs via webUI """
656         if DEBUG:
657             print >>sys.stderr,"bg: gui_webui_restart_all_downloads"
658             
659         for d2restart in ds2restart:
660             self.gui_webui_restart_download(d2restart)
661
662     def i2i_kill_on_browser_gone(self):
663         resched = True
664         try:
665             lastt = self.webIFmapper.lastreqtime
666             
667             print >>sys.stderr,"bg: Test for self destruct: idle",time.time()-lastt,currentThread().getName()
668             
669             if time.time() - IDLE_BEFORE_SELFKILL > lastt:
670                 if self.iseedeadpeople:
671                     print >>sys.stderr,"bg: SHOULD HAVE self destructed, hardcore stylie"
672                     resched = False
673                     #os._exit(0)
674                 else:
675                     print >>sys.stderr,"bg: SHOULD HAVE self destructed"
676                     self.iseedeadpeople = True
677                     # No sign of life from statusbar, self destruct 
678                     #wx.CallAfter(self.ExitMainLoop)            
679         finally:
680             if resched:
681                 self.i2is.add_task(self.i2i_kill_on_browser_gone,IDLE_BEFORE_SELFKILL/2)
682
683
684 class BGInstanceConnection(InstanceConnection):
685     
686     def __init__(self,singsock,connhandler,readlinecallback,videoHTTPServer):
687         InstanceConnection.__init__(self, singsock, connhandler, readlinecallback)
688         
689         self.bgapp = connhandler
690         self.videoHTTPServer = videoHTTPServer
691         self.urlpath = None
692         self.cstreaminfo = {}
693         self.shutteddown = False
694         self.supportedvodevents = [VODEVENT_START,VODEVENT_PAUSE,VODEVENT_RESUME]
695
696
697     def set_streaminfo(self,streaminfo):
698         """ Copy streaminfo contents and replace stream with a ControlledStream """
699         """
700         For each IC we create separate stream object and a unique path in the 
701         HTTP server. This avoids nasty thread synchronization with the server
702         when a new IC wants to play the same content. The Tribler Core stream
703         does not allow multiple readers. This means we would have to stop
704         the HTTP server from writing the stream to the old IC, before we
705         can allow the new IC to read.
706         
707         We solved this as follows. The original Tribler Core stream is
708         wrapped in a ControlledStream, one for each IC. When a new IC 
709         wants to play we tell the old IC's ControlledStream to generate
710         an EOF to the HTTP server, and tell the old IC to SHUTDOWN. We
711         then either rewind the Tribler Core stream (VOD) or leave it (live)
712         and tell the new IC to PLAY. The new ControlledStream will then
713         be read by the HTTP server again.
714         """
715         self.cstreaminfo.update(streaminfo)
716         stream = streaminfo['stream']
717         cstream = ControlledStream( stream )
718         self.cstreaminfo['stream'] = cstream
719
720     def start_playback(self,infohash):
721         """ Register cstream with HTTP server and tell IC to start reading """
722         
723         self.urlpath = URLPATH_CONTENT_PREFIX+'/'+infohash2urlpath(infohash)+'/'+str(random.random())
724
725         self.videoHTTPServer.set_inputstream(self.cstreaminfo,self.urlpath)
726         
727         if DEBUG:
728             print >> sys.stderr, "bg: Telling plugin to start playback of",self.urlpath
729         
730         self.write( 'PLAY '+self.get_video_url()+'\r\n' )
731
732     def cleanup_playback(self):
733         if DEBUG:
734             print >>sys.stderr,'bg: ic: cleanup'
735         # Cause HTTP server thread to receive EOF on inputstream
736         if len(self.cstreaminfo) != 0:
737             self.cstreaminfo['stream'].close()
738             try:
739                 # TODO: get rid of del_inputstream lock
740                 # Arno, 2009-12-11: Take this out of critical path on MainThread
741                 http_del_inputstream_lambda = lambda:self.videoHTTPServer.del_inputstream(self.urlpath)
742                 self.bgapp.tqueue.add_task(http_del_inputstream_lambda,0) 
743             except:
744                 print_exc()
745         
746
747     def get_video_url(self):
748         return 'http://127.0.0.1:'+str(self.videoHTTPServer.get_port())+self.urlpath
749
750     def pause(self):
751         self.write( 'PAUSE\r\n' )
752         
753     def resume(self):
754         self.write( 'RESUME\r\n' )
755
756     def info(self,infostr):
757         self.write( 'INFO '+infostr+'\r\n' )        
758
759     # Arno, 2010-05-28: Convey the BGprocess won't be able to serve the content
760     def error(self,infostr):
761         self.write( 'ERROR '+infostr+'\r\n' )        
762
763     # Arno, 2010-05-27: Stop playback
764     def stop(self):
765         # Stop playback
766         self.write( 'STOP\r\n' )
767
768     def shutdown(self):
769         # SHUTDOWN Service
770         if DEBUG:
771             print >>sys.stderr,'bg: ic: shutdown'
772         if not self.shutteddown:
773             self.shutteddown = True
774             self.cleanup_playback()
775             
776             self.write( 'SHUTDOWN\r\n' )
777             # Will cause BaseApp.connection_lost() to be called, where we'll
778             # handle what to do about the Download that was started for this
779             # IC.
780             try:
781                 self.close()
782             except:
783                 print_exc()
784
785     def get_supported_vod_events(self):
786         return self.supportedvodevents
787     
788     def set_supported_vod_events(self,eventlist):
789         self.supportedvodevents = eventlist
790
791
792 class ControlledStream:
793     """ A file-like object that throws EOF when closed, without actually closing
794     the underlying inputstream. See BGInstanceConnection.set_streaminfo() for
795     an explanation on how this is used. 
796     """
797     def __init__(self,stream):
798         self.stream = stream
799         self.done = False # Event()
800         
801     def read(self,nbytes=None):
802         if not self.done:
803             return self.stream.read(nbytes)
804         else:
805             return '' # EOF
806
807     def seek(self,pos,whence=os.SEEK_SET):
808         self.stream.seek(pos,whence)
809         
810     def close(self):
811         self.done = True
812         # DO NOT close original stream
813
814 class AtBitrateStream:
815     """ Give from playback position plus a safe margin at video bitrate speed.
816         On seeking resync the playback position and the safe margin.
817     """
818
819     # Give at bitrate speed policy: give from playback position + SAFE_MARGIN_TIME
820     # at bitrate speed during STREAM_STATE_PLAYING, give at full speed during
821     # STREAM_STATE_PREBUFFER. STREAM_STATE_TRANSITION indicates that the playback has
822     # to start or that the user just seeked.
823
824     # Safe buffer size in seconds
825     SAFE_MARGIN_TIME = 10.0  # same as VideoOnDemand.py
826
827     # Increment the bitrate by percentage (give more bandwidth to the player).
828     BITRATE_SPEED_INCREMENT = 1.05 # +5%
829
830     # Streaming status
831     STREAM_STATE_TRANSITION = 0
832     STREAM_STATE_PREBUFFER  = 1
833     STREAM_STATE_PLAYING    = 2
834
835     def __init__( self, stream, bitrate ):
836         self.stream = stream
837         self.done = False # Event()
838         self.bitrate = bitrate
839         self.safe_bytes = self.SAFE_MARGIN_TIME * bitrate
840         self.stream_state = self.STREAM_STATE_TRANSITION
841         self.last_time = 0.0
842         self.playback = 0.0
843         self.given_bytes_till = 0
844
845     def has_to_sleep( self, nbytes ):
846         curr_time = time.time()
847         if self.stream_state is self.STREAM_STATE_TRANSITION:
848             self.last_time = curr_time
849             elapsed_time = 0.0
850             self.stream_state = self.STREAM_STATE_PREBUFFER
851         else:
852             elapsed_time = curr_time - self.last_time
853             self.last_time = curr_time
854
855         self.playback += elapsed_time * self.BITRATE_SPEED_INCREMENT
856         if self.stream_state is self.STREAM_STATE_PREBUFFER:
857             played_bytes = self.playback * self.bitrate
858             if played_bytes + self.safe_bytes <= self.given_bytes_till:
859                 self.stream_state = self.STREAM_STATE_PLAYING
860             self.given_bytes_till += nbytes
861             return 0.0
862         else:
863             delta_time = ( self.given_bytes_till / float( self.bitrate ) ) - ( self.playback + self.SAFE_MARGIN_TIME )
864             if delta_time <= 0.0:
865                 self.stream_state = self.STREAM_STATE_PREBUFFER
866             self.given_bytes_till += nbytes
867             return max( 0.0, delta_time )
868
869     def read(self,nbytes=None):
870         if not self.done:
871             to_give = self.stream.read( nbytes )
872             sleep_time = self.has_to_sleep( nbytes )
873             #print >>sys.stderr,"DIEGO DEBUG : SLEEP_time", sleep_time
874             if sleep_time > 0.0:
875                 time.sleep( sleep_time )
876             return to_give
877         else:
878             return '' # EOF
879
880     def seek(self,pos,whence=os.SEEK_SET):
881         self.stream.seek(pos,whence)
882         self.stream_state = self.STREAM_STATE_TRANSITION
883         self.given_bytes_till = pos
884         self.playback = pos / float( self.bitrate )
885         
886     def close(self):
887         self.done = True
888         # DO NOT close original stream
889
890
891 ##############################################################
892 #
893 # Main Program Start Here
894 #
895 ##############################################################
896 def run_bgapp(appname,appversion,i2iport,sessport,httpport, params = None,killonidle=False):
897     """ Set sys.argv[1] to "--nopause" to inform the Core that the player
898     doesn't support VODEVENT_PAUSE, e.g. the SwarmTransport.
899     """ 
900     if params is None:
901         params = [""]
902     
903     if len(sys.argv) > 1:
904         params = sys.argv[1:]
905
906     global KILLONIDLE
907     KILLONIDLE = killonidle
908
909     """
910     # Create single instance semaphore
911     # Arno: On Linux and wxPython-2.8.1.1 the SingleInstanceChecker appears
912     # to mess up stderr, i.e., I get IOErrors when writing to it via print_exc()
913     #
914     if sys.platform != 'linux2':
915         single_instance_checker = wx.SingleInstanceChecker(appname+"-"+ wx.GetUserId())
916     else:
917         single_instance_checker = LinuxSingleInstanceChecker(appname)
918     """
919     # Arno, 2010-03-05: This is a vital print that must not be removed, otherwise
920     # the program will just say "15:29:02: Deleted stale lock file '/home/arno/SwarmPlugin-arno'"
921     # and exit after a restart of the instance :-(
922     #
923     print >>sys.stderr,"bg: Test if already running"
924     single_instance_checker = wx.SingleInstanceChecker(appname+"-"+ wx.GetUserId())
925     if single_instance_checker.IsAnotherRunning():
926         print >>sys.stderr,"bg: Already running, exit"
927         os._exit(0)
928
929     arg0 = sys.argv[0].lower()
930     if arg0.endswith('.exe'):
931         installdir = os.path.abspath(os.path.dirname(sys.argv[0]))
932     else:
933         installdir = os.getcwd()  
934
935     # Launch first single instance
936     app = BackgroundApp(0, appname, appversion, params, single_instance_checker, installdir, i2iport, sessport, httpport)
937     s = app.s
938
939     # Enable P2P-Next ULANC logging.
940     if PHONEHOME: 
941         status = Status.get_status_holder("LivingLab")
942         id = encodestring(s.get_permid()).replace("\n","")
943         reporter = LivingLabReporter.LivingLabPeriodicReporter("Living lab CS reporter", 300, id) # Report every 5 minutes 
944         status.add_reporter(reporter)
945
946     app.MainLoop()
947
948     if PHONEHOME:
949         reporter.stop()
950
951     print >>sys.stderr,"Sleeping seconds to let other threads finish"
952     time.sleep(2)
953
954     if not ALLOW_MULTIPLE:
955         del single_instance_checker
956         
957     # Ultimate catchall for hanging popen2's and what not
958     os._exit(0)
959