instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / Video / VideoSource.py
1 # written by Jan David Mol
2 # see LICENSE.txt for license information
3 #
4 # Represent a source of video (other than a BitTorrent swarm), which can inject
5 # pieces into the downloading engine.
6
7 # We assume we are the sole originator of these pieces, i.e. none of the pieces
8 # injected are already obtained from another source or requested from some peer.
9
10 import sys
11 from threading import RLock,Thread
12 from traceback import print_exc
13 from time import sleep
14 from BaseLib.Core.BitTornado.BT1.PiecePicker import PiecePicker
15 from BaseLib.Core.simpledefs import *
16 from BaseLib.Core.Video.LiveSourceAuth import NullAuthenticator,ECDSAAuthenticator,RSAAuthenticator
17 from BaseLib.Core.Utilities.Crypto import sha
18
19
# Module-wide switch: when true, the "if DEBUG:" branches in this module
# print extra diagnostics to stderr.
DEBUG = True
21
class SimpleThread(Thread):
    """ Daemon thread whose only job is to call one function. """

    def __init__(self,runfunc):
        """ @param runfunc: callable without arguments, invoked by run(). """
        Thread.__init__(self)
        self.runfunc = runfunc
        # Mark as daemon so this thread does not keep the process alive.
        self.setDaemon(True)
        # Prefix the name Thread picked by default, so the thread can be
        # recognised in listings.
        default_name = self.getName()
        self.setName("VideoSourceSimple"+default_name)

    def run(self):
        """ Thread body: call the wrapped function. """
        self.runfunc()
33
34
35 class VideoSourceTransporter:
36     """ Reads data from an external source and turns it into BitTorrent chunks. """
37
38     def __init__(self, stream, bt1download, authconfig,restartstatefilename):
39         self.stream = stream
40         self.bt1download = bt1download
41         self.restartstatefilename = restartstatefilename
42         self.exiting = False
43
44         # shortcuts to the parts we use
45         self.storagewrapper = bt1download.storagewrapper
46         self.picker = bt1download.picker
47         self.rawserver = bt1download.rawserver
48         self.connecter = bt1download.connecter
49         self.fileselector = bt1download.fileselector
50
51         # generic video information
52         self.videostatus = bt1download.videostatus
53
54         # buffer to accumulate video data
55         self.buffer = []
56         self.buflen = 0
57         self.bufferlock = RLock()
58         self.handling_pieces = False
59         self.readlastseqnum = False
60
61         # LIVESOURCEAUTH
62         if authconfig.get_method() == LIVE_AUTHMETHOD_ECDSA:
63             self.authenticator = ECDSAAuthenticator(self.videostatus.piecelen,self.bt1download.len_pieces,keypair=authconfig.get_keypair())
64         elif authconfig.get_method() == LIVE_AUTHMETHOD_RSA:
65             self.authenticator = RSAAuthenticator(self.videostatus.piecelen,self.bt1download.len_pieces,keypair=authconfig.get_keypair())
66         else:
67             self.authenticator = NullAuthenticator(self.videostatus.piecelen,self.bt1download.len_pieces)
68
69     def start(self):
70         """ Start transporting data. """
71
72         self.input_thread_handle = SimpleThread(self.input_thread)
73         self.input_thread_handle.start()
74
75     def _read(self,length):
76         """ Called by input_thread. """
77         return self.stream.read(length)
78
79     def input_thread(self):
80         """ A thread reading the stream and buffering it. """
81
82         print >>sys.stderr,"VideoSource: started input thread"
83
84         # we can't set the playback position from this thread, so
85         # we assume all pieces are vs.piecelen in size.
86
87         contentbs = self.authenticator.get_content_blocksize()
88         try:
89             while not self.exiting:
90                 data = self._read(contentbs)
91                 if not data:
92                     break
93
94                 if DEBUG:
95                     print >>sys.stderr,"VideoSource: read %d bytes" % len(data)
96
97                 self.process_data(data)
98         except IOError:
99             if DEBUG:
100                 print_exc()
101
102         self.shutdown()
103
104     def shutdown(self):
105         """ Stop transporting data. """
106
107         print >>sys.stderr,"VideoSource: shutting down"
108
109         if self.exiting:
110             return
111
112         self.exiting = True
113
114         try:
115             self.stream.close()
116         except IOError:
117             # error on closing, nothing we can do
118             pass
119
120     def process_data(self,data):
121         """ Turn data into pieces and queue them for insertion. """
122         """ Called by input thread. """
123
124         vs = self.videostatus
125
126         self.bufferlock.acquire()
127         try:
128             # add data to buffer
129             self.buffer.append( data )
130             self.buflen += len( data )
131
132             if not self.handling_pieces:
133                 # signal to network thread that data has arrived
134                 self.rawserver.add_task( self.create_pieces )
135                 self.handling_pieces = True
136         finally:
137             self.bufferlock.release()
138
139     def create_pieces(self):
140         """ Process the buffer and create pieces when possible.
141         Called by network thread """
142
143         def handle_one_piece():
144             vs = self.videostatus
145
146             # LIVESOURCEAUTH
147             # Arno: make room for source auth info
148             contentbs = self.authenticator.get_content_blocksize()
149             
150             if self.buflen < contentbs:
151                 return False
152
153             if len(self.buffer[0]) == contentbs:
154                 content = self.buffer[0]
155                 del self.buffer[0]
156             else:
157                 if DEBUG:
158                     print >>sys.stderr,"VideoSource: JOIN ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
159                 buffer = "".join(self.buffer)
160                 self.buffer = [buffer[contentbs:]]
161                 content = buffer[:contentbs]
162             self.buflen -= contentbs
163             
164             datas = self.authenticator.sign(content)
165
166             piece = "".join(datas)
167             
168             # add new piece
169             self.add_piece(vs.playback_pos,piece)
170
171             # invalidate old piece
172             self.del_piece( vs.live_piece_to_invalidate() )
173
174             try:
175                 lastseqnum = self.authenticator.get_source_seqnum()
176                 f = open(self.restartstatefilename,"wb")
177                 f.write(str(lastseqnum))
178                 f.close()
179             except:
180                 print_exc()
181
182             # advance pointer
183             vs.inc_playback_pos()
184             
185             return True
186
187         if not self.readlastseqnum:
188             self.readlastseqnum = True
189             try:
190                 f = open(self.restartstatefilename,"rb")
191                 data = f.read()
192                 f.close()
193                 lastseqnum = int(data)
194                 
195                 print >>sys.stderr,"VideoSource: Restarting stream at abs.piece",lastseqnum
196                 
197                 # Set playback pos of source and absolute piece nr.
198                 lastpiecenum = lastseqnum % self.authenticator.get_npieces()
199                 self.authenticator.set_source_seqnum(lastseqnum+1L)
200                 
201                 self.videostatus.set_live_startpos(lastpiecenum)
202                 self.videostatus.inc_playback_pos()
203             except:
204                 print_exc()
205             
206         self.bufferlock.acquire()
207         try:
208             while handle_one_piece():
209                 pass
210
211             self.handling_pieces = False
212         finally:
213             self.bufferlock.release()
214
215     def add_piece(self,index,piece):
216         """ Push one piece into the BitTorrent system. """
217
218         # Modelled after BitTornado.BT1.Downloader.got_piece
219         # We don't need most of that function, since this piece
220         # was never requested from another peer.
221
222         if DEBUG:
223             print >>sys.stderr,"VideoSource: created piece #%d" % index
224             # ECDSA
225             #print >>sys.stderr,"VideoSource: sig",`piece[-64:]`
226             #print >>sys.stderr,"VideoSource: dig",sha(piece[:-64]).hexdigest()
227             # RSA, 768 bits
228             #print >>sys.stderr,"VideoSource: sig",`piece[-96:]`
229             #print >>sys.stderr,"VideoSource: dig",sha(piece[:-112]).hexdigest()
230
231
232         # act as if the piece was requested and just came in
233         # do this in chunks, as StorageWrapper expects to handle
234         # a request for each chunk
235         chunk_size = self.storagewrapper.request_size
236         length = min( len(piece), self.storagewrapper._piecelen(index) )
237         x = 0
238         while x < length:
239             self.storagewrapper.new_request( index )
240             self.storagewrapper.piece_came_in( index, x, [], piece[x:x+chunk_size], min(chunk_size,length-x) )
241             x += chunk_size
242
243         # also notify the piecepicker
244         self.picker.complete( index )
245
246         # notify our neighbours
247         self.connecter.got_piece( index )
248
249     def del_piece(self,piece):
250         if DEBUG:
251             print >>sys.stderr,"VideoSource: del_piece",piece
252         # See Tribler/Core/Video/VideoOnDemand.py, live_invalidate_piece_globally
253         self.picker.invalidate_piece(piece)
254         self.picker.downloader.live_invalidate(piece)
255
256
class RateLimitedVideoSourceTransporter(VideoSourceTransporter):
    """ A VideoSourceTransporter that throttles reading from the stream.

        Useful for creating live streams from file. """

    def __init__( self, ratelimit, *args, **kwargs ):
        """@param ratelimit: maximum read rate, converted with int().
           _read() sleeps length/ratelimit seconds per read, so the rate is
           expressed in units of `length` per second.
           All other arguments are passed on to VideoSourceTransporter."""
        VideoSourceTransporter.__init__( self, *args, **kwargs )

        self.ratelimit = int(ratelimit)

    def _read(self,length):
        """ Wait as long as `length` takes at the configured rate, then
        read from the stream like the base class does. """
        # Reading and processing the data is assumed to take no time at
        # all, which is why the delay depends on the length only.
        delay = float(length) / self.ratelimit
        sleep(delay)
        return VideoSourceTransporter._read(self,length)
273
274
class PiecePickerSource(PiecePicker):
    """ Piece picker for use at the source. It never picks a piece,
        which prevents the injection of corrupted pieces at the
        source. """

    def next(self,*args,**kwargs):
        """ Accept any arguments, ignore them and always return None. """
        # the source has nothing to pick
        return None
283
284