1 # written by Jan David Mol
2 # see LICENSE.txt for license information
4 # Represent a source of video (other than a BitTorrent swarm), which can inject
5 # pieces into the downloading engine.
7 # We assume we are the sole originator of these pieces, i.e. none of the pieces
8 # injected are already obtained from another source or requested from some peer.
11 from threading import RLock,Thread
12 from traceback import print_exc
13 from time import sleep
14 from BaseLib.Core.BitTornado.BT1.PiecePicker import PiecePicker
15 from BaseLib.Core.simpledefs import *
16 from BaseLib.Core.Video.LiveSourceAuth import NullAuthenticator,ECDSAAuthenticator,RSAAuthenticator
17 from BaseLib.Core.Utilities.Crypto import sha
class SimpleThread(Thread):
    """ Wraps a thread around a single function.

    The thread is named "VideoSourceSimple" followed by the default
    name that threading.Thread assigned to it.
    """

    def __init__(self,runfunc):
        """ @param runfunc: callable taking no arguments; executed once by run(). """
        # The base class has to be initialised before getName()/setName()
        # or start() may be used on this instance.
        Thread.__init__(self)
        self.setName("VideoSourceSimple"+self.getName())
        self.runfunc = runfunc

    def run(self):
        """ Thread body: call the wrapped function. """
        self.runfunc()
35 class VideoSourceTransporter:
36 """ Reads data from an external source and turns it into BitTorrent chunks. """
def __init__(self, stream, bt1download, authconfig,restartstatefilename):
    """ Set up the transporter; no data is read yet.

    @param stream: source object; _read() calls stream.read(length) on it.
    @param bt1download: download engine. Its storagewrapper, picker,
        rawserver, connecter, fileselector, videostatus and len_pieces
        attributes are used.
    @param authconfig: provides get_method() and, for the ECDSA and RSA
        methods, get_keypair().
    @param restartstatefilename: file in which create_pieces() stores the
        last source sequence number.
    """
    self.stream = stream
    self.bt1download = bt1download
    self.restartstatefilename = restartstatefilename

    # input_thread() keeps reading until this becomes true
    self.exiting = False

    # shortcuts to the parts we use
    self.storagewrapper = bt1download.storagewrapper
    self.picker = bt1download.picker
    self.rawserver = bt1download.rawserver
    self.connecter = bt1download.connecter
    self.fileselector = bt1download.fileselector

    # generic video information
    self.videostatus = bt1download.videostatus

    # buffer to accumulate video data: list of chunks as read from the
    # stream plus their combined length, both guarded by bufferlock
    self.buffer = []
    self.buflen = 0
    self.bufferlock = RLock()
    self.handling_pieces = False
    self.readlastseqnum = False

    # Pick the authenticator that matches the configured method. Only
    # fall back to NullAuthenticator when neither ECDSA nor RSA is
    # selected, so a configured signing method is never overwritten.
    method = authconfig.get_method()
    if method == LIVE_AUTHMETHOD_ECDSA:
        self.authenticator = ECDSAAuthenticator(self.videostatus.piecelen,self.bt1download.len_pieces,keypair=authconfig.get_keypair())
    elif method == LIVE_AUTHMETHOD_RSA:
        self.authenticator = RSAAuthenticator(self.videostatus.piecelen,self.bt1download.len_pieces,keypair=authconfig.get_keypair())
    else:
        self.authenticator = NullAuthenticator(self.videostatus.piecelen,self.bt1download.len_pieces)
# NOTE(review): the `def` line that opens this method is not present in this
# view; the docstring below suggests it is the "start" entry point - confirm.
70 """ Start transporting data. """
# Wrap self.input_thread in a SimpleThread, keep the handle on self, start it.
72 self.input_thread_handle = SimpleThread(self.input_thread)
73 self.input_thread_handle.start()
75 def _read(self,length):
76 """ Called by input_thread. """
77 return self.stream.read(length)
# Reader loop: requests one content block at a time from self._read() and
# hands each result to self.process_data().
# NOTE(review): several source lines of this method are not present in this
# view (for instance between the read and the "read %d bytes" print), so
# end-of-stream and error handling cannot be confirmed from here.
79 def input_thread(self):
80 """ A thread reading the stream and buffering it. """
82 print >>sys.stderr,"VideoSource: started input thread"
84 # we can't set the playback position from this thread, so
85 # we assume all pieces are vs.piecelen in size.
# The read size is the authenticator's content block size.
87 contentbs = self.authenticator.get_content_blocksize()
# Keep going until self.exiting becomes true.
89 while not self.exiting:
90 data = self._read(contentbs)
95 print >>sys.stderr,"VideoSource: read %d bytes" % len(data)
97 self.process_data(data)
# NOTE(review): the `def` line and most statements of this method are not
# present in this view; only its docstring, one log line and one comment are.
# Presumably this is the shutdown path that closes the stream - confirm.
105 """ Stop transporting data. """
107 print >>sys.stderr,"VideoSource: shutting down"
117 # error on closing, nothing we can do
def process_data(self,data):
    """ Turn data into pieces and queue them for insertion.

    Called by input thread. Appends data to self.buffer, adds its length
    to self.buflen and, unless a create_pieces() run is already pending
    (self.handling_pieces), schedules one through self.rawserver.

    @param data: chunk that was read from the stream.
    """
    self.bufferlock.acquire()
    try:
        # add data to buffer
        self.buffer.append( data )
        self.buflen += len( data )

        if not self.handling_pieces:
            # signal to network thread that data has arrived
            self.rawserver.add_task( self.create_pieces )
            self.handling_pieces = True
    finally:
        # Release even when scheduling raised, so create_pieces() and
        # later process_data() calls cannot block on a stale lock.
        self.bufferlock.release()
# Drains self.buffer into pieces. The nested handle_one_piece() takes one
# content block from the buffer, has the authenticator sign it, injects the
# result with add_piece() and invalidates an old piece with del_piece().
# NOTE(review): many source lines of this method are not present in this view
# (branch keywords, return statements, the body of the buflen test, the file
# read and any close() calls), so the control flow is only partly visible.
139 def create_pieces(self):
140 """ Process the buffer and create pieces when possible.
141 Called by network thread """
143 def handle_one_piece():
144 vs = self.videostatus
147 # Arno: make room for source auth info
148 contentbs = self.authenticator.get_content_blocksize()
# Not enough buffered for one content block; what happens next is not
# visible here - presumably the helper reports "nothing done"; confirm.
150 if self.buflen < contentbs:
# Case 1: the first buffered chunk is exactly one content block.
153 if len(self.buffer[0]) == contentbs:
154 content = self.buffer[0]
# Case 2 (presumably an else branch - not visible): concatenate all chunks,
# keep the remainder as the single buffered chunk, take the first block.
158 print >>sys.stderr,"VideoSource: JOIN ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
159 buffer = "".join(self.buffer)
160 self.buffer = [buffer[contentbs:]]
161 content = buffer[:contentbs]
162 self.buflen -= contentbs
# sign() returns a sequence of strings which are joined into the piece.
164 datas = self.authenticator.sign(content)
166 piece = "".join(datas)
# Inject the piece at the current playback position.
169 self.add_piece(vs.playback_pos,piece)
171 # invalidate old piece
172 self.del_piece( vs.live_piece_to_invalidate() )
# Write the authenticator's current sequence number, as text, to the
# restart state file.
# NOTE(review): no close() of this handle is visible here - confirm.
175 lastseqnum = self.authenticator.get_source_seqnum()
176 f = open(self.restartstatefilename,"wb")
177 f.write(str(lastseqnum))
183 vs.inc_playback_pos()
# Guarded by self.readlastseqnum so it runs once: read back the stored
# sequence number. The line that fills `data` is not visible in this view.
187 if not self.readlastseqnum:
188 self.readlastseqnum = True
190 f = open(self.restartstatefilename,"rb")
193 lastseqnum = int(data)
195 print >>sys.stderr,"VideoSource: Restarting stream at abs.piece",lastseqnum
197 # Set playback pos of source and absolute piece nr.
# Piece index is the stored sequence number modulo the authenticator's
# piece count; the authenticator continues at the next sequence number.
# (1L is a Python 2 long literal.)
198 lastpiecenum = lastseqnum % self.authenticator.get_npieces()
199 self.authenticator.set_source_seqnum(lastseqnum+1L)
201 self.videostatus.set_live_startpos(lastpiecenum)
202 self.videostatus.inc_playback_pos()
# With bufferlock held, call handle_one_piece() for as long as it returns a
# true value, then clear handling_pieces so that process_data() schedules
# this method again when more data arrives.
206 self.bufferlock.acquire()
208 while handle_one_piece():
211 self.handling_pieces = False
213 self.bufferlock.release()
# Injects a locally produced piece: feeds it to the storage wrapper as if it
# had been requested and had just arrived, then informs the piece picker and
# the connecter.
# NOTE(review): some source lines of this method are not present in this
# view; in particular nothing visible here defines `x`, which the chunk
# slicing below uses as the offset - presumably a loop over chunk offsets;
# confirm.
215 def add_piece(self,index,piece):
216 """ Push one piece into the BitTorrent system. """
218 # Modelled after BitTornado.BT1.Downloader.got_piece
219 # We don't need most of that function, since this piece
220 # was never requested from another peer.
223 print >>sys.stderr,"VideoSource: created piece #%d" % index
225 #print >>sys.stderr,"VideoSource: sig",`piece[-64:]`
226 #print >>sys.stderr,"VideoSource: dig",sha(piece[:-64]).hexdigest()
228 #print >>sys.stderr,"VideoSource: sig",`piece[-96:]`
229 #print >>sys.stderr,"VideoSource: dig",sha(piece[:-112]).hexdigest()
232 # act as if the piece was requested and just came in
233 # do this in chunks, as StorageWrapper expects to handle
234 # a request for each chunk
235 chunk_size = self.storagewrapper.request_size
# Use the shorter of the piece data and the storage wrapper's length for
# this piece index.
236 length = min( len(piece), self.storagewrapper._piecelen(index) )
239 self.storagewrapper.new_request( index )
# Arguments: piece index, offset x, an empty list, the chunk data and the
# chunk length (limited by `length` for the last chunk).
240 self.storagewrapper.piece_came_in( index, x, [], piece[x:x+chunk_size], min(chunk_size,length-x) )
243 # also notify the piecepicker
244 self.picker.complete( index )
246 # notify our neighbours
247 self.connecter.got_piece( index )
# Drops a piece: invalidates it in the piece picker and in the picker's
# downloader.
# NOTE(review): one source line between the `def` and the print is not
# present in this view - confirm whether the print is conditional.
249 def del_piece(self,piece):
251 print >>sys.stderr,"VideoSource: del_piece",piece
252 # See Tribler/Core/Video/VideoOnDemand.py, live_invalidate_piece_globally
253 self.picker.invalidate_piece(piece)
254 self.picker.downloader.live_invalidate(piece)
class RateLimitedVideoSourceTransporter(VideoSourceTransporter):
    """ Reads from the stream at a certain byte rate.

    Useful for creating live streams from file. """

    def __init__( self, ratelimit, *args, **kwargs ):
        """ @param ratelimit: maximum rate in bytes per second (the read
        length in bytes is divided by it to get the delay). It is
        truncated to an int and must then be at least 1.
        The remaining arguments go to VideoSourceTransporter.__init__.
        @raise ValueError: if the truncated ratelimit is not positive. """
        # Validate first: a rate below 1 would otherwise only surface as a
        # ZeroDivisionError (or a negative sleep) on the input thread.
        ratelimit = int(ratelimit)
        if ratelimit <= 0:
            raise ValueError("ratelimit must be at least 1 byte per second, got %r" % (ratelimit,))

        VideoSourceTransporter.__init__( self, *args, **kwargs )

        self.ratelimit = ratelimit

    def _read(self,length):
        """ Sleep for the time `length` bytes take at the configured rate,
        then read them through VideoSourceTransporter._read. """
        # assumes reads and processing data is instant, so
        # we know how long to sleep
        sleep(float(length) / self.ratelimit)
        return VideoSourceTransporter._read(self,length)
275 class PiecePickerSource(PiecePicker):
276 """ A special piece picker for the source, which never
277 picks any pieces. Used to prevent the injection
278 of corrupted pieces at the source. """
280 def next(self,*args,**kwargs):
281 # never pick any pieces