upload facility now works in single CIS mode; some simple command-line video moderati...
[living-lab-site.git] / cis / cis.py
1 #!/usr/bin/env python
2
3 import sys
4 import os
5 import fnmatch
6 import shutil
7 import time
8 import threading
9 from Queue import Queue
10 import web
11 import json
12 from web.wsgiserver import CherryPyWSGIServer
13 import urllib
14
15 import config
16 import bt
17 import users
18 import logger
19 import cis_exceptions
20
# With security enabled, the REST interface is served over HTTPS using the
# certificate / private key files named below (relative paths, resolved
# against the working directory).
if config.SECURITY:
    for _ssl_attr, _pem_file in (('ssl_certificate', "cacert.pem"),
                                 ('ssl_private_key', "privkey.pem")):
        setattr(CherryPyWSGIServer, _ssl_attr, _pem_file)
24
25
class CIWorker(threading.Thread):
    """
    Content Ingestion Worker. A class which executes content ingestion jobs
    on a separate thread.

    CIWorker shares a Queue with its master (Server.queue) where jobs are
    submitted. A job is the dictionary received by the 'ingest_content'
    request; the keys used here are 'code', 'weight', 'raw_video', 'name',
    'thumbs' and 'transcode_configs'.

    Every job taken from the queue is released (Server.queue.task_done() is
    called and the job weight is subtracted from Server.load) whether it
    succeeds or fails, and no job failure stops the worker thread.
    """

    def __init__(self):
        """
        Initialize Content Ingestion Worker.
        """

        threading.Thread.__init__(self, name='CIWorker')
        # Code of the job being processed; prefixes every log message.
        self.job_id = None

    def transfer_in(self, raw_video):
        """
        Transfers a raw video file from the Web Server (config.WS_UPLOAD_PATH)
        into config.RAW_VIDEOS_PATH.

        @param raw_video raw video file name
        """

        logger.log_msg('#%s: transfering in...' % self.job_id)

        file_transfer = config.FILE_TRANSFERER_CLASS(
                config.RAW_VIDEOS_PATH, config.WS_UPLOAD_PATH)
        try:
            file_transfer.get([raw_video])
        finally:
            # Release the transferer even when the transfer fails.
            file_transfer.close()

    def transcode(self, input_video, video_name, transcode_configs):
        """
        Transcodes a video in each requested format.

        The input is read from config.RAW_VIDEOS_PATH and the results go to
        config.MEDIA_PATH. The value returned by the transcoder for each
        format is stored back into that format's dictionary under
        'output_file'; seed() reads it from there.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param transcode_configs a list of dictionaries with format settings
        """

        logger.log_msg('#%s: transcoding...' % self.job_id)

        transcoder = config.TRANSCODER_CLASS(
                input_file=os.path.join(config.RAW_VIDEOS_PATH, input_video),
                name=video_name, prog_bin=config.TRANSCODER_BIN)
        transcoder.dest_path = config.MEDIA_PATH

        # Transcode the raw video in each requested format.
        # TODO report partial errors
        for transcode_config in transcode_configs:
            transcode_config['output_file'] = \
                    transcoder.transcode(**transcode_config)

    def extract_thumbs(self, input_video, video_name, thumbs):
        """
        Extracts thumbnail images from a video into config.THUMBS_PATH.

        @param input_video input video file name (in config.RAW_VIDEOS_PATH)
        @param video_name a video name which must be a valid file name
        @param thumbs use 'random' to extract a thumbnail image from a random
        point of the video or use a positive integer n to extract n summary
        thumbnails; any other value extracts nothing
        """

        logger.log_msg('#%s: extracting image thumbnails...' % self.job_id)

        # TODO report partial errors
        thumb_extractor = config.THUMB_EXTRACTOR_CLASS(
                input_file=os.path.join(config.RAW_VIDEOS_PATH, input_video),
                name=video_name,
                prog_bin=config.THUMB_EXTRACTOR_BIN)
        thumb_extractor.dest_path = config.THUMBS_PATH
        if thumbs == 'random':
            thumb_extractor.extract_random_thumb()
        elif type(thumbs) is int and thumbs > 0:
            # type() rather than isinstance(): a JSON true (a bool) is not
            # accepted as a thumbnail count.
            thumb_extractor.extract_summary_thumbs(thumbs)

    def seed(self, transcode_configs):
        """
        Creates torrents from the transcoded videos and then starts seeding
        them.

        For each video a '<output_file>.tstream' torrent file is created next
        to the video, moved to config.TORRENTS_PATH and started with
        config.MEDIA_PATH as its content directory.

        @param transcode_configs a list of dictionaries with format settings,
        each carrying the 'output_file' set by transcode()
        """

        logger.log_msg('#%s: creating torrents and starting seeding...'
                % self.job_id)

        for transcode_config in transcode_configs:
            video_file = transcode_config['output_file']

            # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
            # The torrent file is created in the same directory with the
            # source file. Move it to the torrents directory.
            bt.create_torrent(video_file)
            torrent_file = video_file + '.tstream'
            shutil.move(torrent_file, config.TORRENTS_PATH)

            # * SEED TORRENTS
            Server.bit_torrent.start_torrent(
                    os.path.join(config.TORRENTS_PATH,
                            os.path.basename(torrent_file)),
                    config.MEDIA_PATH)

    def transfer_out(self, local_files, local_path, remote_path):
        """
        Transfers some local files to a remote path of the Web Server.

        @param local_files list of local file names to transfer
        @param local_path directory containing local_files
        @param remote_path destination path on the Web Server
        """

        logger.log_msg('#%s: transfering out...' % self.job_id)

        file_transfer = config.FILE_TRANSFERER_CLASS(local_path, remote_path)
        try:
            file_transfer.put(local_files)
        finally:
            # Release the transferer even when the transfer fails.
            file_transfer.close()

    def remove_files(self, files, path):
        """
        Deletes files from a specified path.

        @param files list of file names to delete
        @param path directory containing the files
        """

        logger.log_msg('#%s: cleaning up...' % self.job_id)

        for f in files:
            os.unlink(os.path.join(path, f))

    def notify_completion(self, code):
        """
        Notifies the Web Server that a job completed by requesting the URL
        config.WS_COMPLETION + '/' + code (no doubled slash).

        @param code the job code
        """

        logger.log_msg('#%s: notifying web server about the job completion...'
                % self.job_id)

        base_url = config.WS_COMPLETION
        if not base_url.endswith('/'):
            base_url += '/'

        f = urllib.urlopen(base_url + code)
        try:
            f.read()
        finally:
            f.close()

    def _run_step(self, step_desc, func, *args):
        """
        Runs one step of the current job and logs its failure, if any.

        @param step_desc step description used in the error log message
        @param func callable implementing the step, called as func(*args)
        @return True if the step succeeded, False if it raised
        """

        try:
            func(*args)
        except cis_exceptions.FileAlreadyExistsException as e:
            logger.log_msg('#%s: %s' % (self.job_id, repr(e)),
                    logger.LOG_LEVEL_ERROR)
            return False
        except Exception as e:
            logger.log_msg('#%s: error while %s: %s'
                    % (self.job_id, step_desc, repr(e)),
                    logger.LOG_LEVEL_FATAL)
            return False
        return True

    @staticmethod
    def _list_files(path, pattern):
        """
        Returns the names of the regular files in path matching the
        fnmatch-style pattern.
        """

        files = [f for f in os.listdir(path)
                if os.path.isfile(os.path.join(path, f))]
        return fnmatch.filter(files, pattern)

    def _process_job(self, job):
        """
        Executes every step of a content ingestion job, stopping at the
        first failed step (already logged by _run_step()).

        @param job job dictionary taken from Server.queue
        @return True if the whole job completed, False otherwise
        """

        # * TRANSFER RAW VIDEO IN
        if not self._run_step('transferring in', self.transfer_in,
                job['raw_video']):
            return False

        # * TRANSCODE RAW VIDEO
        if not self._run_step('transcoding', self.transcode,
                job['raw_video'], job['name'], job['transcode_configs']):
            return False

        # * EXTRACT THUMBNAIL IMAGES
        if job['thumbs'] != 0 and not self._run_step(
                'extracting thumbnail images', self.extract_thumbs,
                job['raw_video'], job['name'], job['thumbs']):
            return False

        # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
        if not self._run_step('creating torrents and seeding', self.seed,
                job['transcode_configs']):
            return False

        # Files produced for this job are prefixed with its name.
        torrent_files = self._list_files(config.TORRENTS_PATH,
                job['name'] + "_*")
        thumb_files = self._list_files(config.THUMBS_PATH,
                job['name'] + "_*")

        # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
        if not self._run_step('transferring out', self.transfer_out,
                torrent_files, config.TORRENTS_PATH, config.WS_TORRENTS_PATH):
            return False
        if not self._run_step('transferring out', self.transfer_out,
                thumb_files, config.THUMBS_PATH, config.WS_THUMBS_PATH):
            return False

        # * NOTIFY WEB SERVER ABOUT CONTENT INGESTION COMPLETION
        # TODO in the future web server should also be notified about errors
        if not self._run_step(
                'notifying web server about the job completion',
                self.notify_completion, job['code']):
            return False

        # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
        # Best effort: attempt both removals even if the first one fails.
        raw_removed = self._run_step('cleaning up', self.remove_files,
                [job['raw_video']], config.RAW_VIDEOS_PATH)
        thumbs_removed = self._run_step('cleaning up', self.remove_files,
                thumb_files, config.THUMBS_PATH)
        return raw_removed and thumbs_removed

    def run(self):
        """
        Worker main loop: takes jobs from Server.queue and processes them one
        at a time, forever.
        """

        while True:
            job = Server.queue.get()
            self.job_id = None
            completed = False
            try:
                self.job_id = job['code']
                completed = self._process_job(job)
            except Exception as e:
                # Unexpected failure outside the individual steps (e.g. a
                # missing job field); log it rather than killing the thread.
                logger.log_msg('#%s: unexpected error: %s'
                        % (self.job_id, repr(e)), logger.LOG_LEVEL_FATAL)
            finally:
                # * JOB FINISHED (successfully or not): always release it so
                # that Server.load does not drift upwards.
                Server.queue.task_done()
                Server.load -= job['weight']

            if completed:
                logger.log_msg('#%s: finished' % self.job_id,
                        logger.LOG_LEVEL_INFO)
256
257
class Server:
    """
    Implementation of the RESTful web service which constitutes the interface
    with the client (web server).

    The class attributes bit_torrent (the BitTorrent client), queue (the job
    Queue shared with CIWorker) and load (the current CIS load) are assigned
    at start-up in the __main__ section.
    """

    @staticmethod
    def _json_response(obj):
        """
        Sets the JSON content type and returns obj serialized as JSON.
        """

        web.header('Content-Type', 'application/json')
        return json.dumps(obj)

    def GET(self, request):
        """
        Handles GET requests.

        - get_load: returns {"load": <current load>} as JSON;
        - get_torrent_list: returns the BitTorrent client's torrent list as
          JSON;
        - test: returns an empty body.
        Any other request gets a 400 Bad Request response.
        """

        if request == 'get_load':
            return Server._json_response({"load": Server.load})
        if request == 'get_torrent_list':
            return Server._json_response(
                    Server.bit_torrent.get_torrent_list())
        if request == 'test':
            return ''

        web.badrequest()
        return ''

    def POST(self, request):
        """
        Handles POST requests whose body is a JSON document.

        - ingest_content: submits the JSON object as a content ingestion job
          to the CIWorker queue and adds its "weight" to the CIS load;
          requires authentication when config.SECURITY is enabled;
        - start_torrents / stop_torrents / remove_torrents: starts, stops, or
          stops and removes the content of, the torrents whose file names
          are given as a JSON list.
        Unknown requests and malformed JSON get a 400 Bad Request response.
        """

        if request not in ('ingest_content', 'start_torrents',
                'stop_torrents', 'remove_torrents'):
            web.badrequest()
            return ''

        # Read JSON parameters.
        try:
            data = json.loads(web.data())
        except ValueError:
            web.badrequest()
            return 'Invalid JSON data.'

        if request == 'ingest_content':
            # Authenticate user. Missing credentials simply fail.
            if config.SECURITY and not self.authenticate(
                    data.get("username"), data.get("password")):
                return "Authentication failed!"

            # Add job weight to CIS load.
            Server.load += data["weight"]

            # Submit job.
            Server.queue.put(data)

            return 'Job submitted.'

        # NOTE(review): unlike ingest_content, the torrent commands below are
        # not authenticated even when config.SECURITY is enabled.
        # TODO Verify data
        if request == 'start_torrents':
            Server.start_torrents(data)
        elif request == 'stop_torrents':
            Server.stop_torrents(data)
        else:  # remove_torrents
            Server.stop_torrents(data, True)
        return ''

    @staticmethod
    def start_torrents(torrents=None):
        """
        Starts torrents, with config.MEDIA_PATH as content directory.

        @param torrents list of torrent file names in config.TORRENTS_PATH;
        when None, every '*.tstream' file of config.TORRENTS_PATH is used.
        Skipping torrents that are already started is left to
        bit_torrent.start_torrent().
        """

        # All torrent files.
        if torrents is None:
            files = [f for f in os.listdir(config.TORRENTS_PATH)
                    if os.path.isfile(os.path.join(config.TORRENTS_PATH, f))]
            torrents = fnmatch.filter(files, "*.tstream")

        for torrent_file in torrents:
            Server.bit_torrent.start_torrent(
                    os.path.join(config.TORRENTS_PATH, torrent_file),
                    config.MEDIA_PATH)

    @staticmethod
    def stop_torrents(torrents, remove_content=False):
        """
        Stops the given torrents.

        @param torrents list of torrent file names
        @param remove_content passed to bit_torrent.stop_torrent(); True for
        the 'remove_torrents' request
        """

        for torrent_file in torrents:
            Server.bit_torrent.stop_torrent(torrent_file, remove_content)

    def authenticate(self, username, password):
        """
        Checks the credentials against users.users (username -> password).

        Sets a 403 Forbidden status on failure, including for unknown or
        missing users.

        @return True when security is disabled or the credentials match
        """

        if not config.SECURITY:
            return True
        if username in users.users and users.users[username] == password:
            return True

        web.forbidden()
        return False
366
367
if __name__ == '__main__':
    # The BitTorrent object implements a NextShare (Tribler) BitTorrent
    # client for seeding, downloading etc.
    Server.bit_torrent = bt.BitTorrent()
    # Jobs submitted through the web service, consumed by CIWorker.
    Server.queue = Queue()
    # CIS load: POST 'ingest_content' adds each job's weight and CIWorker
    # subtracts it when the job is done.
    Server.load = 0

    def rescan_torrents():
        """
        Every config.START_DOWNLOADS_INTERVAL seconds, calls
        Server.start_torrents() to start the torrent files found in
        config.TORRENTS_PATH.
        """
        while True:
            time.sleep(config.START_DOWNLOADS_INTERVAL)
            try:
                Server.start_torrents()
            except Exception as e:
                # Keep rescanning even if one scan fails.
                logger.log_msg('error while starting torrents: %s' % repr(e),
                        logger.LOG_LEVEL_ERROR)

    Server.start_torrents()
    # threading.Timer would fire only once; a daemon thread rescans the
    # torrents directory periodically.
    rescanner = threading.Thread(target=rescan_torrents,
            name='TorrentRescanner')
    rescanner.daemon = True
    rescanner.start()

    # Worker thread.
    ci_worker = CIWorker()
    ci_worker.daemon = True
    ci_worker.start()

    # Web service.
    urls = ('/(.*)', 'Server')
    app = web.application(urls, globals())
    app.run()