user gets notified interactively or by email in case of a CIS error
[living-lab-site.git] / cis / cis.py
1 #!/usr/bin/env python
2
3 import sys
4 import os
5 import fnmatch
6 import shutil
7 import time
8 import threading
9 from Queue import Queue
10 import web
11 import json
12 from web.wsgiserver import CherryPyWSGIServer
13 import urllib
14
15 import config
16 import bt
17 import users
18 import logger
19 import cis_exceptions
20
# With security switched on in the configuration, point the CherryPy WSGI
# server bundled with web.py at the certificate / private key pair. Both file
# names are relative paths.
# NOTE(review): presumably this makes the service answer over HTTPS and the
# PEM files are expected in the working directory -- confirm with deployment.
if config.SECURITY:
    (CherryPyWSGIServer.ssl_certificate,
     CherryPyWSGIServer.ssl_private_key) = ("cacert.pem", "privkey.pem")
24
25
class CIWorker(threading.Thread):
    """
    Content Ingestion Worker. A class which executes content ingestion jobs
    on a separate thread.

    CIWorker shares a Queue with its master (Server.queue) where jobs are
    submitted. A job is a dictionary; this class reads the keys 'code',
    'raw_video', 'name', 'transcode_configs', 'thumbs' and 'weight'.
    """

    def __init__(self):
        """
        Initialize Content Ingestion Worker.
        """

        threading.Thread.__init__(self, name='CIWorker')

        # Code of the job being processed; used as prefix for log messages.
        self.job_id = None

    @staticmethod
    def _build_url(base, *segments):
        """
        Appends path segments to a base URL, inserting a single '/' between
        the base and the segments when the base does not already end in one.

        @param base base URL (may be empty)
        @param segments path segments (strings) to append
        @return the resulting URL
        """

        if not base.endswith('/'):
            base += '/'
        return base + '/'.join(segments)

    @staticmethod
    def _ping(url):
        """
        Requests a URL, discards the response and closes the connection.

        @param url URL to request
        """

        response = urllib.urlopen(url)
        try:
            response.read()
        finally:
            response.close()

    @staticmethod
    def _list_job_files(path, video_name):
        """
        Lists the regular files from a directory whose names start with
        the video name followed by an underscore.

        @param path directory to scan
        @param video_name video name used as file name prefix
        @return list of matching file names (without the directory)
        """

        files = [f for f in os.listdir(path)
                 if os.path.isfile(os.path.join(path, f))]
        return fnmatch.filter(files, video_name + "_*")

    def transfer_in(self, raw_video):
        """
        Transfers a raw video file from the Web Server.

        @param raw_video raw video file name
        """

        logger.log_msg('#%s: transfering in...' % self.job_id)

        file_transfer = config.FILE_TRANSFERER_CLASS(
                config.RAW_VIDEOS_PATH, config.WS_UPLOAD_PATH)
        try:
            file_transfer.get([raw_video])
        finally:
            # Release the transferer even when the transfer fails.
            file_transfer.close()

    def transcode(self, input_video, video_name, transcode_configs):
        """
        Transcodes a video in each requested formats.

        Each dictionary from transcode_configs gets an 'output_file' entry
        holding the value returned by the transcoder for that format.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param transcode_configs a list of dictionaries with format settings
        """

        logger.log_msg('#%s: transcoding...' % self.job_id)

        transcoder = config.TRANSCODER_CLASS(
                input_file=os.path.join(config.RAW_VIDEOS_PATH, input_video),
                name=video_name, prog_bin=config.TRANSCODER_BIN)
        transcoder.dest_path = config.MEDIA_PATH

        # Transcode the raw video in each requested format.
        # TODO report partial errors
        for transcode_config in transcode_configs:
            transcode_config['output_file'] = \
                    transcoder.transcode(**transcode_config)

    def extract_thumbs(self, input_video, video_name, thumbs):
        """
        Extracts thumbnail images from a video.

        Any other value of thumbs than the ones described below is silently
        ignored.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param thumbs use 'random' to extract a thumbnail image from a random
        point of the video or use a positive integer n to extract n summary
        thumbnail
        """

        logger.log_msg('#%s: extracting image thumbnails...' % self.job_id)

        # TODO report partial errors
        thumb_extractor = config.THUMB_EXTRACTOR_CLASS(
                input_file=os.path.join(config.RAW_VIDEOS_PATH, input_video),
                name=video_name,
                prog_bin=config.THUMB_EXTRACTOR_BIN)
        thumb_extractor.dest_path = config.THUMBS_PATH
        if thumbs == 'random':
            thumb_extractor.extract_random_thumb()
        elif type(thumbs) is int and thumbs > 0:
            thumb_extractor.extract_summary_thumbs(thumbs)

    def seed(self, transcode_configs):
        """
        Creates torrents from the videos passed and then starts seeding them.

        @param transcode_configs a list of dictionaries with format settings;
        each one must have the 'output_file' entry set by transcode()
        """

        logger.log_msg('#%s: creating torrents and starting seeding...'
                % self.job_id)

        for transcode_config in transcode_configs:
            media_file = transcode_config['output_file']

            # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
            # Create torrent file.
            bt.create_torrent(media_file)

            # The torrent file is created in the same directory with the
            # source file. Move it to the torrents directory.
            torrent_src = media_file + '.tstream'
            torrent_name = os.path.basename(torrent_src)
            try:
                shutil.move(torrent_src, config.TORRENTS_PATH)
            except Exception as e:
                # Best effort, as before (e.g. the torrent may already be in
                # the torrents directory), but leave a trace in the log.
                logger.log_msg('#%s: could not move torrent file %s: %s'
                        % (self.job_id, torrent_src, repr(e)),
                        logger.LOG_LEVEL_ERROR)

            # * SEED TORRENTS
            Server.bit_torrent.start_torrent(
                    os.path.join(config.TORRENTS_PATH, torrent_name),
                    config.MEDIA_PATH)

    def transfer_out(self, local_files, local_path, remote_path):
        """
        Transfers some local files to a remote path of the Web Server.

        @param local_files list local files to transfer
        @param local_path local directory which contains the files
        @param remote_path destination path on the Web Server
        """

        logger.log_msg('#%s: transfering out...' % self.job_id)

        file_transfer = config.FILE_TRANSFERER_CLASS(
                local_path, remote_path)
        try:
            file_transfer.put(local_files)
        finally:
            # Release the transferer even when the transfer fails.
            file_transfer.close()

    def remove_files(self, files, path):
        """
        Deletes files from a specified path.

        @param files list of file names
        @param path directory which contains the files
        """

        logger.log_msg('#%s: cleaning up...' % self.job_id)

        for f in files:
            os.unlink(os.path.join(path, f))

    def notify_completion(self, code):
        """
        Notifies the Web Server that a job was completed by requesting
        the URL config.WS_COMPLETION/<code>.

        @param code job code
        """

        logger.log_msg('#%s: notifying web server about the job completion...'
                % self.job_id)

        self._ping(self._build_url(config.WS_COMPLETION, code))

    def notify_error(self, code):
        """
        Notifies the Web Server that a job failed by requesting the URL
        config.WS_ERROR/<code>/internal_error.

        @param code job code
        """

        logger.log_msg('#%s: notifying web server about the error...'
                % self.job_id)

        self._ping(self._build_url(config.WS_ERROR, code, 'internal_error'))

    def _report_error(self):
        """
        Notifies the Web Server about the failure of the current job without
        letting a notification problem propagate to the caller.
        """

        try:
            self.notify_error(self.job_id)
        except Exception as e:
            logger.log_msg(
                    '#%s: error while notifying web server about the error: %s'
                    % (self.job_id, repr(e)), logger.LOG_LEVEL_FATAL)

    def _run_step(self, description, step, *args):
        """
        Runs one step of a job. On failure the error is logged and the Web
        Server is notified.

        @param description what the step does, used in the log message
        @param step callable which implements the step
        @param args positional arguments passed to the step
        @return True if the step succeeded, False otherwise
        """

        try:
            step(*args)
        except cis_exceptions.FileAlreadyExistsException as e:
            logger.log_msg('#%s: %s'
                    % (self.job_id, repr(e)), logger.LOG_LEVEL_ERROR)
        except Exception as e:
            logger.log_msg('#%s: error while %s: %s'
                    % (self.job_id, description, repr(e)),
                    logger.LOG_LEVEL_FATAL)
        else:
            return True

        self._report_error()
        return False

    def _ingest(self, job):
        """
        Executes all the steps of a content ingestion job, stopping at the
        first one which fails.

        @param job job dictionary
        @return True if the job was completed, False if a step failed
        """

        # * TRANSFER RAW VIDEO IN
        if not self._run_step('transferring in', self.transfer_in,
                job['raw_video']):
            return False

        # * TRANSCODE RAW VIDEO
        if not self._run_step('transcoding', self.transcode,
                job['raw_video'], job['name'], job['transcode_configs']):
            return False

        # * EXTRACT THUMBNAIL IMAGES
        if job['thumbs'] != 0:
            if not self._run_step('extracting thumbnail images',
                    self.extract_thumbs,
                    job['raw_video'], job['name'], job['thumbs']):
                return False

        # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
        if not self._run_step('creating torrents and starting seeding',
                self.seed, job['transcode_configs']):
            return False

        # Torrent files and thumbnail images files of this job.
        torrent_files = self._list_job_files(config.TORRENTS_PATH,
                job['name'])
        thumb_files = self._list_job_files(config.THUMBS_PATH, job['name'])

        # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
        if not self._run_step('transferring out', self.transfer_out,
                torrent_files, config.TORRENTS_PATH,
                config.WS_TORRENTS_PATH):
            return False
        if not self._run_step('transferring out', self.transfer_out,
                thumb_files, config.THUMBS_PATH, config.WS_THUMBS_PATH):
            return False

        # * NOTIFY WEB SERVER ABOUT CONTENT INGESTION COMPLETION
        if not self._run_step(
                'notifying web server about the job completion',
                self.notify_completion, job['code']):
            return False

        # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
        # Completion was already reported, so a cleanup problem is only
        # logged; it does not turn the job into a failed one.
        try:
            self.remove_files([job['raw_video']], config.RAW_VIDEOS_PATH)
            self.remove_files(thumb_files, config.THUMBS_PATH)
        except OSError as e:
            logger.log_msg('#%s: error while cleaning up: %s'
                    % (self.job_id, repr(e)), logger.LOG_LEVEL_ERROR)

        return True

    def run(self):
        """
        Thread main loop: takes jobs from Server.queue forever and executes
        them one by one.

        Whatever the outcome of a job, it is marked as done in the queue and
        its weight is subtracted from Server.load, so failed jobs do not
        leave the reported load inflated. No exception raised while
        processing a job is allowed to terminate the thread.
        """

        while True:
            job = Server.queue.get()
            self.job_id = job.get('code')

            try:
                if self._ingest(job):
                    # * JOB FINISHED
                    logger.log_msg('#%s: finished'
                            % self.job_id, logger.LOG_LEVEL_INFO)
            except Exception as e:
                # Anything not handled per step (e.g. a job with missing
                # fields or an unreadable directory).
                logger.log_msg('#%s: unexpected error: %s'
                        % (self.job_id, repr(e)), logger.LOG_LEVEL_FATAL)
                self._report_error()
            finally:
                Server.queue.task_done()
                Server.load -= job.get('weight', 0)
281
282
class Server:
    """
    Implementation of the RESTful web service which constitutes the interface
    with the client (web server).

    The class attributes bit_torrent, queue and load are not defined here;
    they are assigned by the start-up code before the service is started.
    """

    # POST requests which carry a JSON list of torrents.
    TORRENT_REQUESTS = ('start_torrents', 'stop_torrents', 'remove_torrents')

    def GET(self, request):
        """
        Handles GET requests.

        'get_load' and 'get_torrent_list' answer with a JSON document, 'test'
        answers with an empty body and anything else is a bad request.

        @param request the request path (without the leading slash)
        @return response body
        """

        if request == 'get_load':
            resp = {"load": Server.load}
        elif request == 'get_torrent_list':
            resp = Server.bit_torrent.get_torrent_list()
        elif request == 'test':
            return ''
        else:
            web.badrequest()
            return ''

        web.header('Content-Type', 'application/json')
        return json.dumps(resp)

    def POST(self, request):
        """
        Handles POST requests. The body of the request must be a JSON
        document.

        'ingest_content' submits a content ingestion job (a JSON object which
        must contain at least 'weight'; 'username' and 'password' are checked
        when security is enabled). 'start_torrents', 'stop_torrents' and
        'remove_torrents' take a list of torrents. Anything else, as well as a
        malformed body, is a bad request.

        @param request the request path (without the leading slash)
        @return response body
        """

        if request != 'ingest_content' \
                and request not in Server.TORRENT_REQUESTS:
            web.badrequest()
            return ""

        # Read JSON parameters.
        try:
            data = json.loads(web.data())
        except ValueError:
            web.badrequest()
            return ""

        if request == 'ingest_content':
            if not isinstance(data, dict) or 'weight' not in data:
                web.badrequest()
                return ""

            # Authenticate user. Missing credentials are treated as wrong
            # credentials.
            if config.SECURITY and not self.authenticate(
                    data.get("username"), data.get("password")):
                return "Authentication failed!"

            # Add job weight to CIS load.
            Server.load += data["weight"]

            # Submit job.
            Server.queue.put(data)

            return 'Job submitted.'

        # TODO Verify data
        if request == 'start_torrents':
            Server.start_torrents(data)
        elif request == 'stop_torrents':
            Server.stop_torrents(data)
        else:
            # remove_torrents: stop the torrents and remove their content.
            Server.stop_torrents(data, True)

    @staticmethod
    def start_torrents(torrents=None):
        """
        Scans torrent path for files in order to start download for the files
        that are not already started.

        @param torrents list of torrent file names relative to the torrents
        path; if None, all the '*.tstream' files from the torrents path are
        used
        """

        # All torrent files.
        if torrents is None:
            files = [f for f in os.listdir(config.TORRENTS_PATH)
                     if os.path.isfile(os.path.join(
                             config.TORRENTS_PATH, f))]
            torrents = fnmatch.filter(files, "*.tstream")

        for torrent_file in torrents:
            Server.bit_torrent.start_torrent(
                    os.path.join(config.TORRENTS_PATH, torrent_file),
                    config.MEDIA_PATH)

    @staticmethod
    def stop_torrents(torrents, remove_content=False):
        """
        Stops torrents.

        @param torrents list of torrents, passed one by one to the BitTorrent
        client
        @param remove_content flag passed to the BitTorrent client together
        with each torrent
        """

        for torrent_file in torrents:
            Server.bit_torrent.stop_torrent(torrent_file, remove_content)

    @staticmethod
    def _credentials_match(user_db, username, password):
        """
        Checks a user name / password pair against a dictionary which maps
        user names to passwords.

        @param user_db dictionary of user name -> password
        @param username user name to check
        @param password password to check
        @return True only if the user exists and the password is equal to the
        stored one
        """

        try:
            return username in user_db and user_db[username] == password
        except TypeError:
            # Unhashable user name (e.g. a JSON list or object).
            return False

    def authenticate(self, username, password):
        """
        Authenticates a user. When security is disabled any user is accepted.
        On failure the response is set to 'forbidden'.

        @param username user name
        @param password password
        @return True if the user is authenticated, False otherwise
        """

        if not config.SECURITY:
            return True
        if Server._credentials_match(users.users, username, password):
            return True

        web.forbidden()
        return False
391
392
if __name__ == '__main__':
    # The BitTorrent object implements a NextShare (Tribler) BitTorrent
    # client for seeding, downloading etc.
    Server.bit_torrent = bt.BitTorrent()
    Server.queue = Queue()
    Server.load = 0

    def schedule_torrents_rescan():
        """
        Arms a daemon timer which rescans the torrents path after
        config.START_DOWNLOADS_INTERVAL and then re-arms itself.
        """

        def rescan():
            try:
                Server.start_torrents()
            except Exception as e:
                # A failed rescan must not stop the following ones.
                logger.log_msg('error while starting torrents: %s' % repr(e),
                        logger.LOG_LEVEL_ERROR)
            finally:
                schedule_torrents_rescan()

        # threading.Timer fires only once, so it is re-created after each
        # rescan in order to obtain a periodic scan.
        t = threading.Timer(config.START_DOWNLOADS_INTERVAL, rescan)
        t.daemon = True
        t.start()

    # Start the torrents already present, then keep rescanning periodically.
    Server.start_torrents()
    schedule_torrents_rescan()

    # Worker thread.
    ci_worker = CIWorker()
    ci_worker.daemon = True
    ci_worker.start()

    # Web service.
    urls = ('/(.*)', 'Server')
    app = web.application(urls, globals())
    app.run()