cis notified web server of a job completion; upload form interface and validation...
[living-lab-site.git] / cis / cis.py
1 #!/usr/bin/env python
2
3 import sys
4 import os
5 import fnmatch
6 import shutil
7 import time
8 import threading
9 from Queue import Queue
10 import web
11 import json
12 from web.wsgiserver import CherryPyWSGIServer
13 import urllib
14
15 import config
16 import bt
17 import users
18 import logger
19 import cis_exceptions
20
# When security is enabled, hand the certificate and private key file names
# to the CherryPy WSGI server class used by web.py. Both names are relative
# paths, so they are looked up relative to the process working directory.
if config.SECURITY:
    (CherryPyWSGIServer.ssl_certificate,
     CherryPyWSGIServer.ssl_private_key) = ("cacert.pem", "privkey.pem")
24
25
class CIWorker(threading.Thread):
    """
    Content Ingestion Worker. A class which executes content ingestion jobs
    on a separate thread.

    CIWorker shares a Queue with its master (Server.queue) where jobs are
    submitted. A job is a dictionary; this class reads its 'code',
    'raw_video', 'name', 'transcode_configs', 'thumbs' and 'weight' entries.
    """

    def __init__(self):
        """
        Initialize Content Ingestion Worker.
        """

        threading.Thread.__init__(self, name='CIWorker')

        # Code of the job being processed. It is set by run() and used as a
        # prefix for every log message.
        self.job_id = None

    def transfer_in(self, raw_video):
        """
        Transfers a raw video file from the Web Server.

        @param raw_video raw video file name
        """

        logger.log_msg('#%s: transfering in...' % self.job_id)

        file_transfer = config.FILE_TRANSFERER_CLASS(
                config.RAW_VIDEOS_PATH, config.WS_UPLOAD_PATH)
        try:
            file_transfer.get([raw_video])
        finally:
            # Release the transferer even if the transfer failed.
            file_transfer.close()

    def transcode(self, input_video, video_name, transcode_configs):
        """
        Transcodes a video in each requested formats.

        The path returned by the transcoder for each format is stored back
        into the corresponding dictionary under the 'output_file' key, where
        seed() later reads it.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param transcode_configs a list of dictionaries with format settings
        """

        logger.log_msg('#%s: transcoding...' % self.job_id)

        transcoder = config.TRANSCODER_CLASS(
                input_file=os.path.join(config.RAW_VIDEOS_PATH, input_video),
                name=video_name, prog_bin=config.TRANSCODER_BIN)
        transcoder.dest_path = config.MEDIA_PATH

        # Transcode the raw video in each requested format.
        # TODO report partial errors
        for transcode_config in transcode_configs:
            transcode_config['output_file'] = \
                    transcoder.transcode(**transcode_config)

    def extract_thumbs(self, input_video, video_name, thumbs):
        """
        Extracts thumbnail images from a video.

        Any value of thumbs other than 'random' or a positive integer
        extracts nothing.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param thumbs use 'random' to extract a thumbnail image from a random
        point of the video or use a positive integer n to extract n summary
        thumbnail
        """

        logger.log_msg('#%s: extracting image thumbnails...' % self.job_id)

        # TODO report partial errors
        thumb_extractor = config.THUMB_EXTRACTOR_CLASS(
                input_file=os.path.join(config.RAW_VIDEOS_PATH, input_video),
                name=video_name,
                prog_bin=config.THUMB_EXTRACTOR_BIN)
        thumb_extractor.dest_path = config.THUMBS_PATH
        if thumbs == 'random':
            thumb_extractor.extract_random_thumb()
        elif type(thumbs) is int and thumbs > 0:
            thumb_extractor.extract_summary_thumbs(thumbs)

    def seed(self, transcode_configs):
        """
        Creates torrents from the videos passed and then starts seeding them.

        @param transcode_configs a list of dictionaries with format settings;
        each one must carry the 'output_file' entry filled in by transcode()
        """

        logger.log_msg('#%s: creating torrents and starting seeding...' \
                % self.job_id)

        for transcode_config in transcode_configs:
            video_file = transcode_config['output_file']
            torrent_file = video_file + '.tstream'

            # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
            # Create torrent file.
            bt.create_torrent(video_file)

            # The torrent file is created in the same directory with the
            # source file. Move it to the torrents directory.
            shutil.move(torrent_file, config.TORRENTS_PATH)

            # basename() also copes with a path that has no directory part,
            # where searching for '/' would raise ValueError.
            torrent_name = os.path.basename(torrent_file)

            # * SEED TORRENTS
            Server.bit_torrent.start_torrent(
                    os.path.join(config.TORRENTS_PATH, torrent_name),
                    config.MEDIA_PATH)

    def transfer_out(self, local_files, local_path, remote_path):
        """
        Transfers some local files to a remote path of the Web Server.

        @param local_files list local files to transfer
        @param local_path local directory which contains the files
        @param remote_path destination path on the Web Server
        """

        logger.log_msg('#%s: transfering out...' % self.job_id)

        file_transfer = config.FILE_TRANSFERER_CLASS(
                local_path, remote_path)
        try:
            file_transfer.put(local_files)
        finally:
            # Release the transferer even if the transfer failed.
            file_transfer.close()

    def remove_files(self, files, path):
        """
        Deletes files from a specified path.

        @param files list of file names relative to path
        @param path directory which contains the files
        """

        logger.log_msg('#%s: cleaning up...' % self.job_id)

        for f in files:
            os.unlink(os.path.join(path, f))

    def notify_completion(self):
        """
        Notifies the Web Server about the job completion by requesting the
        config.WS_COMPLETION URL. The response body is read and discarded.
        """

        logger.log_msg('#%s: notifying web server about the job completion...'\
                % self.job_id)

        f = urllib.urlopen(config.WS_COMPLETION)
        try:
            f.read()
        finally:
            # Do not leak the connection, whether or not the read succeeded.
            f.close()

    def _run_step(self, description, step, *args):
        """
        Runs one ingestion step and logs its failure, if any.

        @param description what the step does, used in the error message
        @param step callable which implements the step
        @param args positional arguments passed to step
        @return True if the step succeeded, False if it raised an exception
        """

        try:
            step(*args)
        except cis_exceptions.FileAlreadyExistsException as e:
            logger.log_msg('#%s: %s' \
                    % (self.job_id, repr(e)), logger.LOG_LEVEL_ERROR)
            return False
        except Exception as e:
            logger.log_msg('#%s: error while %s: %s' \
                    % (self.job_id, description, repr(e)),
                    logger.LOG_LEVEL_FATAL)
            return False

        return True

    def _job_files(self, path, job_name):
        """
        Lists the regular files from path whose names start with the job
        name followed by an underscore.

        @param path directory to scan
        @param job_name value of the job's 'name' entry
        @return list of matching file names, relative to path
        """

        files = [f for f in os.listdir(path) \
                if os.path.isfile(os.path.join(path, f))]
        return fnmatch.filter(files, job_name + "_*")

    def _process_job(self, job):
        """
        Runs all the ingestion steps of a job in order and stops at the first
        step which fails. Failures are logged, not raised.

        @param job job dictionary taken from the queue
        """

        raw_video = job['raw_video']
        name = job['name']
        transcode_configs = job['transcode_configs']
        thumbs = job['thumbs']

        # * TRANSFER RAW VIDEO IN
        if not self._run_step('transferring in', self.transfer_in, raw_video):
            return

        # * TRANSCODE RAW VIDEO
        if not self._run_step('transcoding', self.transcode, \
                raw_video, name, transcode_configs):
            return

        # * EXTRACT THUMBNAIL IMAGES
        if thumbs != 0 and not self._run_step( \
                'extracting thumbnail images', self.extract_thumbs, \
                raw_video, name, thumbs):
            return

        # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
        if not self._run_step('creating torrents and seeding', self.seed, \
                transcode_configs):
            return

        torrent_files = self._job_files(config.TORRENTS_PATH, name)
        thumb_files = self._job_files(config.THUMBS_PATH, name)

        # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
        if not self._run_step('transferring out', self.transfer_out, \
                torrent_files, config.TORRENTS_PATH, config.WS_TORRENTS_PATH):
            return
        if not self._run_step('transferring out', self.transfer_out, \
                thumb_files, config.THUMBS_PATH, config.WS_THUMBS_PATH):
            return

        # * NOTIFY WEB SERVER ABOUT CONTENT INGESTION COMPLETION
        # TODO in the future web server should also be notified about errors
        if not self._run_step( \
                'notifying web server about the job completion', \
                self.notify_completion):
            return

        # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
        # Both removals are attempted even if the first one fails.
        self._run_step('cleaning up', self.remove_files, \
                [raw_video], config.RAW_VIDEOS_PATH)
        self._run_step('cleaning up', self.remove_files, \
                thumb_files, config.THUMBS_PATH)

    def run(self):
        """
        Takes jobs from Server.queue forever and processes them one by one.

        Whatever the outcome of a job, the queue is told the task is done and
        the job weight is subtracted from Server.load, so that a failed job
        neither inflates the reported load nor blocks Queue.join(). No
        exception raised by a job is allowed to end the worker thread.
        """

        while True:
            job = Server.queue.get()
            self.job_id = None

            try:
                self.job_id = job['code']
                self._process_job(job)
            except Exception as e:
                # Anything not handled per step, e.g. a job with missing
                # entries or an unreadable output directory.
                logger.log_msg('#%s: unexpected error: %s' \
                        % (self.job_id, repr(e)), logger.LOG_LEVEL_FATAL)
            finally:
                # * JOB FINISHED
                Server.queue.task_done()
                Server.load -= job['weight']
249
250
class Server:
    """
    Implementation of the RESTful web service which constitutes the interface
    with the client (web server).

    The class attributes bit_torrent, queue and load are not defined here;
    they are assigned by the start-up code when the module is run as a
    script, before the web application starts serving requests.
    """

    # Entries of a content ingestion job which the worker thread reads
    # unconditionally. A job lacking any of them is rejected on submission.
    REQUIRED_JOB_FIELDS = ('code', 'raw_video', 'name', 'transcode_configs',
            'thumbs', 'weight')

    # Requests accepted by POST.
    POST_REQUESTS = ('ingest_content', 'start_torrents', 'stop_torrents',
            'remove_torrents')

    def GET(self, request):
        """
        Handles GET requests.

        @param request the request path matched by the URL mapping:
        'get_load' returns the current load as JSON, 'get_torrent_list'
        returns the torrent list as JSON, 'test' returns an empty body and
        anything else is answered as a bad request
        @return response body
        """

        if request == 'get_load':
            resp = {"load": Server.load}
            web.header('Content-Type', 'application/json')
            return json.dumps(resp)
        elif request == 'get_torrent_list':
            resp = Server.bit_torrent.get_torrent_list()
            web.header('Content-Type', 'application/json')
            return json.dumps(resp)
        elif request == 'test':
            return ''
        else:
            web.badrequest()
            return ''

    def POST(self, request):
        """
        Handles POST requests. The request body must be a JSON document.

        @param request the request path matched by the URL mapping:
        'ingest_content' submits a job to the worker queue, while
        'start_torrents', 'stop_torrents' and 'remove_torrents' take a list
        of torrent files; anything else is answered as a bad request
        @return response body; the torrent requests return nothing
        """

        if request not in Server.POST_REQUESTS:
            web.badrequest()
            return ""

        # Read JSON parameters. A body which is not valid JSON is a client
        # error, not a server failure.
        try:
            data = json.loads(web.data())
        except ValueError:
            web.badrequest()
            return ""

        if request == 'ingest_content':
            if not isinstance(data, dict):
                web.badrequest()
                return ""

            # Authenticate user.
            if config.SECURITY and not self.authenticate( \
                    data.get("username"), data.get("password")):
                return "Authentication failed!"

            # Refuse jobs the worker would not be able to process.
            for field in Server.REQUIRED_JOB_FIELDS:
                if field not in data:
                    web.badrequest()
                    return ""

            # Add job weight to CIS load.
            Server.load += data["weight"]

            # Submit job.
            Server.queue.put(data)

            return 'Job submitted.'
        elif request == 'start_torrents':
            # TODO Verify data
            Server.start_torrents(data)
        elif request == 'stop_torrents':
            # TODO Verify data
            Server.stop_torrents(data)
        else:
            # 'remove_torrents': stop the torrents and remove their content.
            # TODO Verify data
            Server.stop_torrents(data, True)

    @staticmethod
    def start_torrents(torrents=None):
        """
        Scans torrent path for files in order to start download for the files
        that are not already started.

        @param torrents list of torrent file names relative to
        config.TORRENTS_PATH; when None, every '*.tstream' file found in that
        directory is used
        """

        # All torrent files.
        if torrents is None:
            files = [f for f in os.listdir(config.TORRENTS_PATH) \
                    if os.path.isfile(os.path.join( \
                            config.TORRENTS_PATH, f))]
            torrents = fnmatch.filter(files, "*.tstream")

        for torrent_file in torrents:
            Server.bit_torrent.start_torrent( \
                    os.path.join(config.TORRENTS_PATH, torrent_file),
                    config.MEDIA_PATH)

    @staticmethod
    def stop_torrents(torrents, remove_content=False):
        """
        Stops the torrents passed.

        @param torrents list of torrent files, passed as they are to the
        BitTorrent client
        @param remove_content passed to the BitTorrent client for each
        torrent; the 'remove_torrents' request sets it to True
        """

        for torrent_file in torrents:
            Server.bit_torrent.stop_torrent( \
                    torrent_file, remove_content)

    def authenticate(self, username, password):
        """
        Checks the credentials against the users module.

        When security is disabled every request is accepted. On failure the
        response status is set to forbidden.

        @param username user name sent by the client
        @param password password sent by the client
        @return True if the client is allowed to proceed, False otherwise
        """

        if not config.SECURITY:
            return True

        # An unknown (KeyError) or malformed (TypeError, e.g. a JSON list)
        # user name is refused like a wrong password instead of escaping as
        # an unhandled exception.
        try:
            authenticated = users.users[username] == password
        except (KeyError, TypeError):
            authenticated = False

        if authenticated:
            return True

        web.forbidden()
        return False
359
360
if __name__ == '__main__':
    # The BitTorrent object implements a NextShare (Tribler) BitTorrent
    # client for seeding, downloading etc.
    Server.bit_torrent = bt.BitTorrent()
    Server.queue = Queue()
    Server.load = 0

    def rescan_torrents():
        """
        Starts the torrents found in the torrents directory every
        config.START_DOWNLOADS_INTERVAL seconds.

        A failed scan is logged and retried at the next interval.
        """
        while True:
            time.sleep(config.START_DOWNLOADS_INTERVAL)
            try:
                Server.start_torrents()
            except Exception as e:
                logger.log_msg('error while starting torrents: %s' \
                        % repr(e), logger.LOG_LEVEL_ERROR)

    # Start the torrents already present, then keep rescanning. A
    # threading.Timer would run the scan only once after the interval.
    # NOTE(review): periodic rescanning is assumed from the name
    # START_DOWNLOADS_INTERVAL -- confirm this is the intended behaviour.
    Server.start_torrents()
    scanner = threading.Thread(target=rescan_torrents, name='TorrentScanner')
    scanner.daemon = True
    scanner.start()

    # Worker thread.
    ci_worker = CIWorker()
    ci_worker.daemon = True
    ci_worker.start()

    # Web service. Every path is routed to the Server class, which receives
    # the path without the leading slash as the request argument.
    urls = ('/(.*)', 'Server')
    app = web.application(urls, globals())
    app.run()