e63b7292abdde3850f502e0fb1cca49ad2ffd70f
[living-lab-site.git] / cis / cis.py
1 #!/usr/bin/env python
2
3 import sys
4 import os
5 import fnmatch
6 import shutil
7 import time
8 import threading
9 from Queue import Queue
10 import web
11 import json
12 from web.wsgiserver import CherryPyWSGIServer
13 import urllib
14
15 import config
16 import bt
17 import users
18 import logger
19 import cis_exceptions
20
if config.SECURITY:
    # Serve the web service over HTTPS: hand the private key and the
    # certificate file names to web.py's bundled CherryPy WSGI server.
    CherryPyWSGIServer.ssl_private_key = "privkey.pem"
    CherryPyWSGIServer.ssl_certificate = "cacert.pem"
24
25
class CIWorker(threading.Thread):
    """
    Content Ingestion Worker. A class which executes content ingestion jobs
    on a separate thread.

    CIWorker shares a Queue with its master where jobs are submitted.

    A job is a dictionary from which the keys 'code', 'raw_video', 'name',
    'transcode_configs', 'thumbs' and 'weight' are read. It goes through
    these steps: transfer in, transcode, extract thumbnails (skipped when
    'thumbs' is 0), seed, transfer out, notify the web server and clean up.
    When a step fails, the failure is logged, the remaining steps of that
    job are skipped and the worker moves on to the next job.
    """

    def __init__(self):
        """
        Initialize Content Ingestion Worker.
        """

        threading.Thread.__init__(self, name='CIWorker')
        # Code of the job being processed; used as prefix of log messages.
        self.job_id = None

    def transfer_in(self, raw_video):
        """
        Transfers a raw video file from the Web Server.

        The file is fetched from config.WS_UPLOAD_PATH into
        config.RAW_VIDEOS_PATH with config.FILE_TRANSFERER_CLASS.

        @param raw_video raw video file name
        """

        logger.log_msg('#%s: transfering in...' % self.job_id)

        file_transfer = config.FILE_TRANSFERER_CLASS(
                config.RAW_VIDEOS_PATH, config.WS_UPLOAD_PATH)
        try:
            file_transfer.get([raw_video])
        finally:
            # Release the transfer session even when the transfer fails.
            file_transfer.close()

    def transcode(self, input_video, video_name, transcode_configs):
        """
        Transcodes a video in each requested format.

        The path of each resulting file is stored in the 'output_file' key of
        the corresponding configuration dictionary (seed() reads it later).
        An exception raised for one format aborts the remaining formats.

        @param input_video input video file name (in config.RAW_VIDEOS_PATH)
        @param video_name a video name which must be a valid file name
        @param transcode_configs a list of dictionaries with format settings,
        passed as keyword arguments to the transcoder's transcode()
        """

        logger.log_msg('#%s: transcoding...' % self.job_id)

        transcoder = config.TRANSCODER_CLASS(
                input_file=os.path.join(config.RAW_VIDEOS_PATH, input_video),
                name=video_name, prog_bin=config.TRANSCODER_BIN)
        transcoder.dest_path = config.MEDIA_PATH

        # Transcode the raw video in each requested format.
        # TODO report partial errors
        for transcode_config in transcode_configs:
            transcode_config['output_file'] = \
                    transcoder.transcode(**transcode_config)

    def extract_thumbs(self, input_video, video_name, thumbs):
        """
        Extracts thumbnail images from a video into config.THUMBS_PATH.

        @param input_video input video file name (in config.RAW_VIDEOS_PATH)
        @param video_name a video name which must be a valid file name
        @param thumbs use 'random' to extract a thumbnail image from a random
        point of the video or use a positive integer n to extract n summary
        thumbnails; any other value extracts nothing
        """

        logger.log_msg('#%s: extracting image thumbnails...' % self.job_id)

        # TODO report partial errors
        thumb_extractor = config.THUMB_EXTRACTOR_CLASS(
                input_file=os.path.join(config.RAW_VIDEOS_PATH, input_video),
                name=video_name,
                prog_bin=config.THUMB_EXTRACTOR_BIN)
        thumb_extractor.dest_path = config.THUMBS_PATH
        if thumbs == 'random':
            thumb_extractor.extract_random_thumb()
        elif type(thumbs) is int and thumbs > 0:
            thumb_extractor.extract_summary_thumbs(thumbs)

    def seed(self, transcode_configs):
        """
        Creates torrents from the transcoded videos and then starts seeding
        them.

        @param transcode_configs a list of dictionaries with format settings;
        each must contain the 'output_file' key set by transcode()
        """

        logger.log_msg('#%s: creating torrents and starting seeding...' \
                % self.job_id)

        for transcode_config in transcode_configs:
            video_file = transcode_config['output_file']

            # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
            # The torrent file is created in the same directory with the
            # source file, as '<video_file>.tstream'.
            bt.create_torrent(video_file)

            torrent_path = video_file + '.tstream'
            # basename also copes with a path that has no directory part
            # (rindex('/') used to raise ValueError there).
            torrent_file = os.path.basename(torrent_path)

            # Move the torrent file to the torrents directory. shutil.move
            # fails e.g. when a file with the same name is already there; the
            # torrent found in the torrents directory is seeded in any case.
            try:
                shutil.move(torrent_path, config.TORRENTS_PATH)
            except (shutil.Error, EnvironmentError) as e:
                logger.log_msg('#%s: could not move torrent file %s: %s' \
                        % (self.job_id, torrent_path, repr(e)))

            # * SEED TORRENTS
            Server.bit_torrent.start_torrent(
                    os.path.join(config.TORRENTS_PATH, torrent_file),
                    config.MEDIA_PATH)

    def transfer_out(self, local_files, local_path, remote_path):
        """
        Transfers some local files to a remote path of the Web Server.

        @param local_files list of local file names to transfer
        @param local_path local directory containing local_files
        @param remote_path destination path on the Web Server
        """

        logger.log_msg('#%s: transfering out...' % self.job_id)

        file_transfer = config.FILE_TRANSFERER_CLASS(local_path, remote_path)
        try:
            file_transfer.put(local_files)
        finally:
            # Release the transfer session even when the transfer fails.
            file_transfer.close()

    def remove_files(self, files, path):
        """
        Deletes files from a specified path.

        @param files file names to delete
        @param path directory containing the files
        """

        logger.log_msg('#%s: cleaning up...' % self.job_id)

        for f in files:
            os.unlink(os.path.join(path, f))

    @staticmethod
    def _completion_url(base_url, code):
        """
        Returns base_url with the job code appended as the last path
        component, adding a '/' separator unless base_url already ends
        with one.
        """
        if base_url.endswith('/'):
            return base_url + code
        return base_url + '/' + code

    def notify_completion(self, code):
        """
        Notifies the Web Server about the completion of a job by requesting
        config.WS_COMPLETION/<code>.

        @param code job code
        """
        logger.log_msg('#%s: notifying web server about the job completion...'\
                % self.job_id)

        f = urllib.urlopen(self._completion_url(config.WS_COMPLETION, code))
        try:
            f.read()
        finally:
            # Close the HTTP response to release its connection.
            f.close()

    @staticmethod
    def _list_job_files(path, video_name):
        """
        Returns the names of the regular files in path whose names match
        '<video_name>_*'.
        """
        files = [f for f in os.listdir(path)
                 if os.path.isfile(os.path.join(path, f))]
        return fnmatch.filter(files, video_name + '_*')

    def _run_step(self, description, step, *args):
        """
        Runs one step of the current job and logs its failure.

        @param description completes the 'error while ...' log message
        @param step callable implementing the step, called with *args
        @return True if step returned normally, False if it raised
        """
        try:
            step(*args)
        except cis_exceptions.FileAlreadyExistsException as e:
            logger.log_msg('#%s: %s' % (self.job_id, repr(e)),
                    logger.LOG_LEVEL_ERROR)
            return False
        except Exception as e:
            logger.log_msg('#%s: error while %s: %s' \
                    % (self.job_id, description, repr(e)),
                    logger.LOG_LEVEL_FATAL)
            return False
        return True

    def _process_job(self, job):
        """
        Executes every step of a content ingestion job, stopping at the first
        failing one.

        @param job job dictionary
        @return True if all steps succeeded, False if a step failed (the
        failure has already been logged)
        """
        # * TRANSFER RAW VIDEO IN
        if not self._run_step('transferring in', self.transfer_in,
                job['raw_video']):
            return False

        # * TRANSCODE RAW VIDEO
        if not self._run_step('transcoding', self.transcode,
                job['raw_video'], job['name'], job['transcode_configs']):
            return False

        # * EXTRACT THUMBNAIL IMAGES
        if job['thumbs'] != 0 and not self._run_step(
                'extracting thumbnail images', self.extract_thumbs,
                job['raw_video'], job['name'], job['thumbs']):
            return False

        # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
        if not self._run_step('seeding', self.seed,
                job['transcode_configs']):
            return False

        # Torrent files and thumbnail images produced for this video.
        torrent_files = self._list_job_files(config.TORRENTS_PATH,
                job['name'])
        thumb_files = self._list_job_files(config.THUMBS_PATH, job['name'])

        # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
        # 'and' short-circuits: thumbnails are not sent if torrents failed.
        if not (self._run_step('transferring out', self.transfer_out,
                    torrent_files, config.TORRENTS_PATH,
                    config.WS_TORRENTS_PATH)
                and self._run_step('transferring out', self.transfer_out,
                    thumb_files, config.THUMBS_PATH, config.WS_THUMBS_PATH)):
            return False

        # * NOTIFY WEB SERVER ABOUT CONTENT INGESTION COMPLETION
        # TODO in the future web server should also be notified about errors
        if not self._run_step(
                'notifying web server about the job completion',
                self.notify_completion, job['code']):
            return False

        # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
        return (self._run_step('cleaning up', self.remove_files,
                    [job['raw_video']], config.RAW_VIDEOS_PATH)
                and self._run_step('cleaning up', self.remove_files,
                    thumb_files, config.THUMBS_PATH))

    def run(self):
        """
        Worker main loop: takes jobs from Server.queue and processes them one
        at a time, forever.

        Whatever the outcome of a job, it is marked as done in the queue and
        its weight is subtracted from Server.load, so failed jobs no longer
        inflate the load reported by the web service. Unexpected errors are
        logged instead of terminating the (only) worker thread.
        """
        while True:
            job = Server.queue.get()
            self.job_id = None
            try:
                self.job_id = job['code']
                if self._process_job(job):
                    logger.log_msg('#%s: finished' % job['code'],
                            logger.LOG_LEVEL_INFO)
            except Exception as e:
                logger.log_msg('#%s: unexpected error: %s' \
                        % (self.job_id, repr(e)), logger.LOG_LEVEL_FATAL)
            finally:
                # * JOB FINISHED (successfully or not)
                Server.queue.task_done()
                # 'weight' is guaranteed: Server.POST reads it before queuing.
                Server.load -= job['weight']
261
262
class Server:
    """
    Implementation of the RESTful web service which constitutes the interface
    with the client (web server).

    web.py routes every URL path to this class and passes the path as the
    'request' argument of GET / POST.

    Shared state lives in class attributes assigned at start-up:
    Server.bit_torrent (the BitTorrent client), Server.queue (the job queue
    consumed by CIWorker) and Server.load (incremented here by the weight of
    each submitted job and decremented by CIWorker).
    """

    # Job fields read by CIWorker while processing an ingestion job; a job
    # lacking any of them is rejected with 400 Bad Request.
    REQUIRED_JOB_FIELDS = ('code', 'raw_video', 'name', 'transcode_configs',
            'thumbs', 'weight')

    def GET(self, request):
        """
        Handles GET requests:
        * get_load - returns {"load": Server.load} as JSON;
        * get_torrent_list - returns Server.bit_torrent.get_torrent_list()
          as JSON;
        * test - returns an empty body.
        Any other request yields 400 Bad Request.
        """
        if request == 'get_load':
            resp = {"load": Server.load}
        elif request == 'get_torrent_list':
            resp = Server.bit_torrent.get_torrent_list()
        elif request == 'test':
            return ''
        else:
            web.badrequest()
            return ''

        web.header('Content-Type', 'application/json')
        return json.dumps(resp)

    def POST(self, request):
        """
        Handles POST requests; the body must be JSON:
        * ingest_content - a job object, queued for CIWorker (see
          _submit_job);
        * start_torrents - a list of torrent files to start;
        * stop_torrents - a list of torrents to stop;
        * remove_torrents - a list of torrents to stop, removing content.
        Unknown requests and bodies that are not valid JSON yield 400 Bad
        Request (malformed JSON used to surface as 500).
        """
        # Requests whose JSON body is a list of torrents.
        torrent_actions = {
            'start_torrents': Server.start_torrents,
            'stop_torrents': Server.stop_torrents,
            'remove_torrents':
                    lambda torrents: Server.stop_torrents(torrents, True),
        }

        if request != 'ingest_content' and request not in torrent_actions:
            web.badrequest()
            return ""

        # Read JSON parameters.
        try:
            data = json.loads(web.data())
        except ValueError:
            web.badrequest()
            return "Invalid JSON."

        if request in torrent_actions:
            # TODO Verify data
            # NOTE(review): unlike ingest_content, these requests are not
            # authenticated even when config.SECURITY is enabled.
            return torrent_actions[request](data)

        return self._submit_job(data)

    def _submit_job(self, data):
        """
        Authenticates, validates and queues a content ingestion job.

        @param data decoded JSON body of an 'ingest_content' request
        @return response body
        """
        if not isinstance(data, dict):
            web.badrequest()
            return "Job must be a JSON object."

        # Authenticate user (missing credentials simply fail).
        if config.SECURITY and \
                not self.authenticate(data.get("username"),
                        data.get("password")):
            return "Authentication failed!"

        # Reject jobs the worker could not process instead of queuing them.
        missing = self._missing_job_fields(data)
        if missing:
            web.badrequest()
            return "Missing job fields: %s" % ', '.join(missing)

        # Add job weight to CIS load.
        # NOTE(review): web requests and the CIWorker thread update
        # Server.load concurrently without a lock.
        Server.load += data["weight"]

        # Submit job.
        Server.queue.put(data)

        return 'Job submitted.'

    @staticmethod
    def _missing_job_fields(job):
        """
        Returns the names from REQUIRED_JOB_FIELDS absent from job, in order.
        """
        return [field for field in Server.REQUIRED_JOB_FIELDS
                if field not in job]

    @staticmethod
    def start_torrents(torrents=None):
        """
        Starts the given torrent files with Server.bit_torrent, using
        config.MEDIA_PATH as content directory.

        @param torrents torrent file names relative to config.TORRENTS_PATH;
        if None, every '*.tstream' regular file in config.TORRENTS_PATH is
        started (whether already running torrents are skipped depends on
        bt.BitTorrent.start_torrent)
        """
        if torrents is None:
            files = [f for f in os.listdir(config.TORRENTS_PATH)
                     if os.path.isfile(os.path.join(config.TORRENTS_PATH, f))]
            torrents = fnmatch.filter(files, "*.tstream")

        for torrent_file in torrents:
            # NOTE(review): torrent_file may come from a client request;
            # os.path.join drops TORRENTS_PATH when it is an absolute path,
            # so consider restricting it to a plain file name.
            Server.bit_torrent.start_torrent(
                    os.path.join(config.TORRENTS_PATH, torrent_file),
                    config.MEDIA_PATH)

    @staticmethod
    def stop_torrents(torrents, remove_content=False):
        """
        Stops the given torrents.

        @param torrents torrent identifiers, passed as is to
        Server.bit_torrent.stop_torrent
        @param remove_content forwarded to stop_torrent; the remove_torrents
        request sets it to True
        """
        for torrent_file in torrents:
            Server.bit_torrent.stop_torrent(torrent_file, remove_content)

    def authenticate(self, username, password):
        """
        Checks a username / password pair against users.users.

        @return True if config.SECURITY is disabled or the credentials match;
        otherwise sets 403 Forbidden and returns False
        """
        if not config.SECURITY:
            return True
        try:
            expected = users.users[username]
        except (KeyError, TypeError):
            # Unknown (or unhashable) user name: fail authentication
            # instead of raising, which web.py would turn into a 500.
            expected = None
        # The 'is not None' guard keeps a missing user from matching a
        # missing password.
        if expected is not None and expected == password:
            return True
        web.forbidden()
        return False
371
372
if __name__ == '__main__':
    # The BitTorrent object implements a NextShare (Tribler) BitTorrent
    # client for seeding, downloading etc.
    Server.bit_torrent = bt.BitTorrent()
    # Jobs submitted by Server.POST and consumed by the CIWorker thread.
    Server.queue = Queue()
    # Total weight of submitted jobs (updated by Server.POST and CIWorker).
    Server.load = 0

    def _rescan_torrents():
        """
        Calls Server.start_torrents() every config.START_DOWNLOADS_INTERVAL
        seconds, forever. (threading.Timer, used previously, runs its
        function only once.)
        """
        while True:
            time.sleep(config.START_DOWNLOADS_INTERVAL)
            try:
                Server.start_torrents()
            except Exception as e:
                # Keep rescanning even if one scan fails.
                logger.log_msg('error while starting torrents: %s' % repr(e),
                        logger.LOG_LEVEL_ERROR)

    # Start the torrents already in the torrents directory, then keep picking
    # up new ones periodically.
    Server.start_torrents()
    t = threading.Thread(target=_rescan_torrents, name='TorrentRescanner')
    t.daemon = True
    t.start()

    # Worker thread.
    ci_worker = CIWorker()
    ci_worker.daemon = True
    ci_worker.start()

    # Web service: every URL path is handled by Server.
    urls = ('/(.*)', 'Server')
    app = web.application(urls, globals())
    app.run()