cis: logger created; start_downloads done
[living-lab-site.git] / cis / cis.py
1 #!/usr/bin/env python
2
3 import sys
4 import os
5 import fnmatch
6 import shutil
7 import time
8 import threading
9 from Queue import Queue
10 import web
11 import json
12 from web.wsgiserver import CherryPyWSGIServer
13
14 import config
15 import bt
16 import users
17 import logger
18
# When security is enabled, the web service is served over HTTPS using the
# certificate and private key files below (relative paths, resolved by the
# server at runtime).
if config.SECURITY:
    CherryPyWSGIServer.ssl_certificate, CherryPyWSGIServer.ssl_private_key = \
            "cacert.pem", "privkey.pem"
22
23
class CIWorker(threading.Thread):
    """
    Content Ingestion Worker. A class which executes content ingestion jobs
    on a separate thread.

    CIWorker shares a Queue with its master (Server.queue) where jobs are
    submitted. A job is a dictionary from which the worker reads the keys
    'id', 'raw_video', 'name', 'transcode_configs', 'thumbs' and 'weight'.
    """

    def __init__(self):
        """
        Initialize Content Ingestion Worker.
        """

        threading.Thread.__init__(self, name='CIWorker')
        # Id of the job currently processed; used as prefix in log messages.
        self.job_id = None

    def transfer_in(self, raw_video):
        """
        Transfers a raw video file from the Web Server.

        @param raw_video raw video file name
        """

        logger.log_msg('#%s: transfering in...' % self.job_id)

        file_transfer = config.FILE_TRANSFERER_CLASS(
                config.RAW_VIDEOS_PATH, config.WS_UPLOAD_PATH)
        try:
            file_transfer.get([raw_video])
        finally:
            # Release the transfer even if the download failed.
            file_transfer.close()

    def transcode(self, input_video, video_name, transcode_configs):
        """
        Transcodes a video in each requested format.

        The value returned by the transcoder for each format is stored back
        into that format's dictionary under the key 'output_file'; seed()
        later uses it as the path of the transcoded file.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param transcode_configs a list of dictionaries with format settings
        """

        logger.log_msg('#%s: transcoding...' % self.job_id)

        transcoder = config.TRANSCODER_CLASS(
                input_file=os.path.join(config.RAW_VIDEOS_PATH, input_video),
                name=video_name, prog_bin=config.TRANSCODER_BIN)
        transcoder.dest_path = config.MEDIA_PATH

        # Transcode the raw video in each requested format.
        # TODO report partial errors
        for transcode_config in transcode_configs:
            transcode_config['output_file'] = \
                    transcoder.transcode(**transcode_config)

    def extract_thumbs(self, input_video, video_name, thumbs):
        """
        Extracts thumbnail images from a video.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param thumbs use 'random' to extract a thumbnail image from a random
        point of the video or use a positive integer n to extract n summary
        thumbnails; any other value extracts nothing
        """

        logger.log_msg('#%s: extracting image thumbnails...' % self.job_id)

        # TODO report partial errors
        thumb_extractor = config.THUMB_EXTRACTOR_CLASS(
                input_file=os.path.join(config.RAW_VIDEOS_PATH, input_video),
                name=video_name,
                prog_bin=config.THUMB_EXTRACTOR_BIN)
        thumb_extractor.dest_path = config.THUMBS_PATH
        if thumbs == 'random':
            thumb_extractor.extract_random_thumb()
        # Exact type check on purpose: it rejects booleans (bool subclasses int).
        elif type(thumbs) is int and thumbs > 0:
            thumb_extractor.extract_summary_thumbs(thumbs)

    def seed(self, transcode_configs):
        """
        Creates torrents from the videos passed and then starts seeding them.

        @param transcode_configs a list of dictionaries with format settings,
        each having the 'output_file' key set by transcode()
        """

        logger.log_msg('#%s: creating torrents and starting seeding...'
                % self.job_id)

        for transcode_config in transcode_configs:
            video_file = transcode_config['output_file']

            # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
            # The torrent file is created in the same directory with the
            # source file. Move it to the torrents directory.
            bt.create_torrent(video_file)
            torrent_file = video_file + '.tstream'
            shutil.move(torrent_file, config.TORRENTS_PATH)

            # * SEED TORRENTS
            Server.bit_torrent.start_torrent(
                    os.path.join(config.TORRENTS_PATH,
                            os.path.basename(torrent_file)),
                    config.MEDIA_PATH)

    def transfer_out(self, local_files, local_path, remote_path):
        """
        Transfers some local files to a remote path of the Web Server.

        @param local_files list of local file names to transfer
        @param local_path local directory containing local_files
        @param remote_path destination path on the Web Server
        """

        logger.log_msg('#%s: transfering out...' % self.job_id)

        file_transfer = config.FILE_TRANSFERER_CLASS(local_path, remote_path)
        try:
            file_transfer.put(local_files)
        finally:
            # Release the transfer even if the upload failed.
            file_transfer.close()

    def remove_files(self, files, path):
        """
        Deletes files from a specified path.

        @param files list of file names to delete
        @param path directory containing the files
        """

        logger.log_msg('#%s: cleaning up...' % self.job_id)

        for f in files:
            os.unlink(os.path.join(path, f))

    @staticmethod
    def _list_files(path, pattern):
        """
        Returns the names of the regular files from path matching pattern.
        """

        files = [f for f in os.listdir(path)
                if os.path.isfile(os.path.join(path, f))]
        return fnmatch.filter(files, pattern)

    def _log_error(self, stage, error):
        """
        Logs a fatal error which occurred during a stage of the current job.
        """

        logger.log_msg('#%s: error while %s: %s'
                % (self.job_id, stage, str(error)), logger.LOG_LEVEL_FATAL)

    def _process_job(self, job):
        """
        Executes all the steps of an ingestion job, stopping after logging the
        first failing step.

        @param job job dictionary taken from the queue
        """

        # * TRANSFER RAW VIDEO IN
        try:
            self.transfer_in(job['raw_video'])
        except Exception as e:
            self._log_error('transferring in', e)
            return

        # * TRANSCODE RAW VIDEO
        try:
            self.transcode(job['raw_video'], job['name'],
                    job['transcode_configs'])
        except Exception as e:
            self._log_error('transcoding', e)
            return

        # * EXTRACT THUMBNAIL IMAGES
        if job['thumbs'] != 0:
            try:
                self.extract_thumbs(job['raw_video'], job['name'],
                        job['thumbs'])
            except Exception as e:
                self._log_error('extracting thumbnail images', e)
                return

        # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
        try:
            self.seed(job['transcode_configs'])
        except Exception as e:
            self._log_error('seeding', e)
            return

        # Files produced for this job are prefixed with the video name.
        pattern = job['name'] + "_*"
        torrent_files = CIWorker._list_files(config.TORRENTS_PATH, pattern)
        thumb_files = CIWorker._list_files(config.THUMBS_PATH, pattern)

        # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
        try:
            self.transfer_out(torrent_files, config.TORRENTS_PATH,
                    config.WS_TORRENTS_PATH)
            self.transfer_out(thumb_files, config.THUMBS_PATH,
                    config.WS_THUMBS_PATH)
        except Exception as e:
            self._log_error('transferring out', e)
            return

        # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
        try:
            self.remove_files([job['raw_video']], config.RAW_VIDEOS_PATH)
            self.remove_files(thumb_files, config.THUMBS_PATH)
        except Exception as e:
            self._log_error('cleaning up', e)

    def run(self):
        """
        Worker loop: takes jobs from Server.queue and processes them one at a
        time, forever.
        """

        while True:
            job = Server.queue.get()
            self.job_id = job.get('id')
            try:
                self._process_job(job)
            except Exception as e:
                # Never let an unexpected error kill the worker thread.
                logger.log_msg('#%s: unexpected error: %s'
                        % (self.job_id, str(e)), logger.LOG_LEVEL_FATAL)
            finally:
                # * JOB FINISHED (successfully or not)
                # Always release the job; otherwise the load reported by
                # 'get_load' would never decrease after a failed job and
                # Queue.join() would block forever.
                Server.queue.task_done()
                Server.load -= job['weight']
218
219
class Server:
    """
    Implementation of the RESTful web service which constitutes the interface
    with the client (web server).

    Shared state lives in class attributes assigned by the startup code of
    this module: bit_torrent (the BitTorrent client), queue (the job queue
    consumed by the content ingestion worker) and load (job weights are added
    here when a job is submitted).
    """

    # Keys of an 'ingest_content' job which the content ingestion worker
    # reads; jobs missing any of them are rejected before being queued.
    _JOB_KEYS = ('id', 'raw_video', 'name', 'transcode_configs', 'thumbs',
            'weight')

    # POST requests whose body is a JSON list of torrent file names.
    _TORRENT_REQUESTS = ('start_torrents', 'stop_torrents',
            'remove_torrents')

    def GET(self, request):
        """
        Handles GET requests: 'get_load' returns {"load": <load>} as JSON,
        'test' returns an empty body and anything else is a bad request.
        """

        if request == 'get_load':
            web.header('Content-Type', 'application/json')
            return json.dumps({"load": Server.load})
        if request == 'test':
            return ''
        return Server._bad_request()

    def POST(self, request):
        """
        Handles POST requests whose bodies are JSON documents.

        'ingest_content' takes a job object and submits it to the worker
        queue; 'start_torrents', 'stop_torrents' and 'remove_torrents' take a
        list of torrent file names. Unknown requests and malformed bodies are
        answered with a bad request.
        """

        if request == 'ingest_content':
            data = Server._read_json()
            if not isinstance(data, dict):
                return Server._bad_request()

            # Authenticate user.
            if config.SECURITY and not self.authenticate(
                    data.get("username"), data.get("password")):
                return "Authentication failed!"

            # Reject incomplete jobs now instead of failing in the worker.
            for key in Server._JOB_KEYS:
                if key not in data:
                    return Server._bad_request()

            # Add job weight to CIS load.
            Server.load += data["weight"]

            # Submit job.
            Server.queue.put(data)

            return 'Job submitted.'

        if request in Server._TORRENT_REQUESTS:
            data = Server._read_json()
            # TODO Verify data further; these requests are not authenticated.
            # NOTE(review): start_torrents joins each name with TORRENTS_PATH,
            # so absolute names or '..' components can point outside it.
            if not isinstance(data, list):
                return Server._bad_request()

            if request == 'start_torrents':
                Server.start_torrents(data)
            elif request == 'stop_torrents':
                Server.stop_torrents(data)
            else:
                Server.stop_torrents(data, True)
            return ""

        return Server._bad_request()

    @staticmethod
    def _read_json():
        """
        Returns the parsed JSON body of the current request, or None if the
        body is not valid JSON.
        """

        try:
            return json.loads(web.data())
        except ValueError:
            return None

    @staticmethod
    def _bad_request():
        """
        Marks the current response as a bad request and returns an empty body.
        """

        web.badrequest()
        return ""

    @staticmethod
    def start_torrents(torrents=None):
        """
        Starts the given torrents, seeding their content from MEDIA_PATH.

        @param torrents list of torrent file names from TORRENTS_PATH; when
        None, every *.tstream file of TORRENTS_PATH is started
        """

        # All torrent files.
        if torrents is None:
            files = [f for f in os.listdir(config.TORRENTS_PATH)
                    if os.path.isfile(os.path.join(config.TORRENTS_PATH, f))]
            torrents = fnmatch.filter(files, "*.tstream")

        for torrent_file in torrents:
            Server.bit_torrent.start_torrent(
                    os.path.join(config.TORRENTS_PATH, torrent_file),
                    config.MEDIA_PATH)

    @staticmethod
    def stop_torrents(torrents, remove_content=False):
        """
        Stops the given torrents.

        @param torrents list of torrent identifiers passed to the BitTorrent
        client
        @param remove_content forwarded to the BitTorrent client's
        stop_torrent (True for 'remove_torrents' requests)
        """

        for torrent_file in torrents:
            Server.bit_torrent.stop_torrent(torrent_file, remove_content)

    def authenticate(self, username, password):
        """
        Checks user credentials against users.users.

        Returns True when security is disabled or when the password matches
        the one registered for username. Otherwise marks the response as
        forbidden and returns False.
        """

        if not config.SECURITY:
            return True
        try:
            expected = users.users[username]
        except (KeyError, TypeError):
            # Unknown (or unhashable) user name.
            expected = None
        # The None check prevents a missing password from matching an
        # unknown user.
        if expected is not None and expected == password:
            return True
        web.forbidden()
        return False
324
325
if __name__ == '__main__':
    # The BitTorrent object implements a NextShare (Tribler) BitTorrent
    # client for seeding, downloading etc.
    Server.bit_torrent = bt.BitTorrent()
    Server.queue = Queue()
    Server.load = 0

    def rescan_torrents():
        """
        Starts the torrents found in the torrents directory, then schedules
        itself to run again after config.START_DOWNLOADS_INTERVAL seconds.
        """
        try:
            Server.start_torrents()
        except Exception as e:
            # Keep the periodic rescan alive even if one scan fails.
            logger.log_msg('error while starting torrents: %s' % str(e))
        timer = threading.Timer(config.START_DOWNLOADS_INTERVAL,
                rescan_torrents)
        timer.daemon = True
        timer.start()

    # Start the existing torrents now. threading.Timer fires only once, so
    # rescan_torrents reschedules itself to rescan periodically.
    Server.start_torrents()
    t = threading.Timer(config.START_DOWNLOADS_INTERVAL, rescan_torrents)
    t.daemon = True
    t.start()

    # Worker thread.
    ci_worker = CIWorker()
    ci_worker.daemon = True
    ci_worker.start()

    # Web service.
    urls = ('/(.*)', 'Server')
    app = web.application(urls, globals())
    app.run()