cis: logger created; start_downloads done
[living-lab-site.git] / cis / cisd.py
1 #!/usr/bin/env python
2
3 import sys
4 import os
5 import fnmatch
6 import shutil
7 import time
8 import threading
9 from Queue import Queue
10 import web
11 import json
12 from web.wsgiserver import CherryPyWSGIServer
13
14 import config
15 import bt
16 import users
17 import logger
18
# Serve the web service over HTTPS when security is enabled. Both paths are
# relative, so they resolve against the directory the service is started from.
if config.SECURITY:
    CherryPyWSGIServer.ssl_private_key = "privkey.pem"
    CherryPyWSGIServer.ssl_certificate = "cacert.pem"
22
23
class CIWorker(threading.Thread):
    """
    Content Ingestion Worker. A class which executes content ingestion jobs
    on a separate thread.

    CIWorker shares a Queue (Server.queue) with its master where jobs are
    submitted. Each job is a dictionary; the keys read by this class are
    'id', 'raw_video', 'name', 'transcode_configs', 'thumbs' and 'weight'.
    """

    def __init__(self):
        """
        Initialize Content Ingestion Worker.
        """

        threading.Thread.__init__(self, name='CIWorker')

        # Id of the job currently being processed (used in log messages).
        self.job_id = None

    def transfer_in(self, raw_video):
        """
        Transfers a raw video file from the Web Server.

        The file is fetched from config.WS_UPLOAD_PATH into
        config.RAW_VIDEOS_PATH with config.FILE_TRANSFERER_CLASS.

        @param raw_video raw video file name
        """

        logger.log_msg('#%s: transfering in...' % self.job_id)

        file_transfer = config.FILE_TRANSFERER_CLASS(
                config.RAW_VIDEOS_PATH, config.WS_UPLOAD_PATH)
        try:
            file_transfer.get([raw_video])
        finally:
            # Release the transferer even when the copy fails.
            file_transfer.close()

    def transcode(self, input_video, video_name, transcode_configs):
        """
        Transcodes a video in each requested format.

        The value returned by the transcoder for each format is stored back
        into the corresponding dictionary under 'output_file'; seed() reads
        it from there.

        @param input_video input video file name (inside
        config.RAW_VIDEOS_PATH)
        @param video_name a video name which must be a valid file name
        @param transcode_configs a list of dictionaries with format settings;
        each one is passed as keyword arguments to the transcoder
        """

        logger.log_msg('#%s: transcoding...' % self.job_id)

        transcoder = config.TRANSCODER_CLASS(
                input_file = os.path.join(config.RAW_VIDEOS_PATH, input_video),
                name = video_name, prog_bin = config.TRANSCODER_BIN)
        transcoder.dest_path = config.MEDIA_PATH

        # Transcode the raw video in each requested format.
        # TODO report partial errors
        for transcode_config in transcode_configs:
            transcode_config['output_file'] = \
                    transcoder.transcode(**transcode_config)

    def extract_thumbs(self, input_video, video_name, thumbs):
        """
        Extracts thumbnail images from a video into config.THUMBS_PATH.

        @param input_video input video file name (inside
        config.RAW_VIDEOS_PATH)
        @param video_name a video name which must be a valid file name
        @param thumbs use 'random' to extract a thumbnail image from a random
        point of the video or use a positive integer n to extract n summary
        thumbnails; any other value extracts nothing
        """

        logger.log_msg('#%s: extracting image thumbnails...' % self.job_id)

        # TODO report partial errors
        thumb_extractor = config.THUMB_EXTRACTOR_CLASS(
                input_file = os.path.join(config.RAW_VIDEOS_PATH, input_video),
                name = video_name,
                prog_bin = config.THUMB_EXTRACTOR_BIN)
        thumb_extractor.dest_path = config.THUMBS_PATH
        if thumbs == 'random':
            thumb_extractor.extract_random_thumb()
        elif type(thumbs) is int and thumbs > 0:
            thumb_extractor.extract_summary_thumbs(thumbs)

    def seed(self, transcode_configs):
        """
        Creates torrents from the transcoded videos and then starts seeding
        them.

        @param transcode_configs a list of dictionaries with format settings,
        each already holding the 'output_file' set by transcode()
        """

        logger.log_msg('#%s: creating torrents and starting seeding...' \
                % self.job_id)

        for transcode_config in transcode_configs:
            video_file = transcode_config['output_file']

            # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
            # The torrent file is created in the same directory with the
            # source file. Move it to the torrents directory.
            bt.create_torrent(video_file)
            torrent_src = video_file + '.tstream'
            torrent_dst = os.path.join(config.TORRENTS_PATH,
                    os.path.basename(torrent_src))
            # Move to an explicit file path rather than into the directory:
            # shutil.move refuses to move into a directory that already holds
            # a file of that name, which broke re-ingesting a video.
            shutil.move(torrent_src, torrent_dst)

            # * SEED TORRENTS
            Server.bit_torrent.start_download(torrent_dst, config.MEDIA_PATH)

    def transfer_out(self, local_files, local_path, remote_path):
        """
        Transfers some local files to a remote path of the Web Server.

        @param local_files list of local file names to transfer
        @param local_path directory holding local_files
        @param remote_path destination path on the Web Server
        """

        logger.log_msg('#%s: transfering out...' % self.job_id)

        file_transfer = config.FILE_TRANSFERER_CLASS(local_path, remote_path)
        try:
            file_transfer.put(local_files)
        finally:
            # Release the transferer even when the copy fails.
            file_transfer.close()

    def remove_files(self, files, path):
        """
        Deletes files from a specified path.

        @param files list of file names to delete
        @param path directory holding the files
        """

        logger.log_msg('#%s: cleaning up...' % self.job_id)

        for f in files:
            os.unlink(os.path.join(path, f))

    @staticmethod
    def _files_with_prefix(path, prefix):
        """
        Returns the names of the regular files in `path` whose names start
        with `prefix`, in os.listdir() order.

        A literal prefix test is used instead of an fnmatch pattern so that
        video names containing glob characters ('*', '?', '[') are matched
        exactly.
        """

        return [f for f in os.listdir(path)
                if f.startswith(prefix)
                and os.path.isfile(os.path.join(path, f))]

    def _process_job(self, job):
        """
        Runs every ingestion step for one job, in order.

        @param job job dictionary taken from Server.queue
        """

        # * TRANSFER RAW VIDEO IN
        self.transfer_in(job['raw_video'])

        # * TRANSCODE RAW VIDEO
        self.transcode(job['raw_video'], job['name'],
                job['transcode_configs'])

        # * EXTRACT THUMBNAIL IMAGES
        if job['thumbs'] != 0:
            self.extract_thumbs(job['raw_video'], job['name'], job['thumbs'])

        # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
        self.seed(job['transcode_configs'])

        # Torrent and thumbnail files of this video are selected by the
        # '<name>_' prefix.
        prefix = job['name'] + '_'
        torrent_files = self._files_with_prefix(config.TORRENTS_PATH, prefix)
        thumb_files = self._files_with_prefix(config.THUMBS_PATH, prefix)

        # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
        self.transfer_out(torrent_files, config.TORRENTS_PATH,
                config.WS_TORRENTS_PATH)
        self.transfer_out(thumb_files, config.THUMBS_PATH,
                config.WS_THUMBS_PATH)

        # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
        self.remove_files([job['raw_video']], config.RAW_VIDEOS_PATH)
        self.remove_files(thumb_files, config.THUMBS_PATH)

    def run(self):
        """
        Worker loop: takes jobs from Server.queue and processes them one at
        a time, forever.

        A failing job is logged and skipped. Whatever happens, the job is
        marked done on the queue and its weight is removed from Server.load,
        so one bad job can neither kill this thread nor leave the reported
        load permanently inflated.
        """

        import traceback

        while True:
            job = Server.queue.get()
            # Reset so a job without an id is not logged under the last one.
            self.job_id = None
            try:
                self.job_id = job['id']
                self._process_job(job)
            except Exception:
                logger.log_msg('#%s: job failed:\n%s' \
                        % (self.job_id, traceback.format_exc()))
            finally:
                # * JOB FINISHED
                Server.queue.task_done()
                Server.load -= job['weight']
197
198
class Server:
    """
    Implementation of the RESTful web service which constitutes the interface
    with the client (web server).

    Shared state lives in class attributes assigned by the script entry point:
    bit_torrent (the BitTorrent client), queue (jobs for the CIWorker thread)
    and load (total weight of submitted, unfinished jobs).
    """

    def GET(self, request):
        """
        Handles GET /<request>.

        'get_load' returns {"load": <current load>} as JSON; 'test' returns an
        empty body (presumably a liveness check); anything else is answered
        with Bad Request.
        """

        if request == 'get_load':
            web.header('Content-Type', 'application/json')
            return json.dumps({"load": Server.load})
        if request == 'test':
            return ''
        web.badrequest()
        return ""

    def POST(self, request):
        """
        Handles POST /<request>.

        'ingest_content' expects a JSON object describing a job with the keys
        listed in _missing_job_keys(). When config.SECURITY is on, the object
        must also carry 'username' and 'password'. The job's weight is added
        to Server.load and the job is queued for the worker.

        Malformed JSON, non-object bodies and incomplete jobs are rejected
        with Bad Request. Previously they caused a server error, or were
        queued and then failed inside the worker.
        """

        if request != 'ingest_content':
            web.badrequest()
            return ""

        # Read JSON parameters.
        try:
            data = json.loads(web.data())
        except ValueError:
            web.badrequest()
            return ""
        if not isinstance(data, dict):
            web.badrequest()
            return ""

        # Authenticate user. Missing credentials fail authentication
        # instead of raising KeyError.
        if config.SECURITY and \
                not self.authenticate(data.get("username"),
                        data.get("password")):
            return "Authentication failed!"

        if Server._missing_job_keys(data):
            web.badrequest()
            return ""

        # Add job weight to CIS load.
        Server.load += data["weight"]

        # Submit job.
        Server.queue.put(data)

        return 'Job submitted.'

    @staticmethod
    def _missing_job_keys(data):
        """
        Returns, in a fixed order, the job keys read by the worker that
        `data` lacks (an empty list when the job is complete).
        """

        required = ('id', 'raw_video', 'name', 'transcode_configs', 'thumbs',
                'weight')
        return [key for key in required if key not in data]

    @staticmethod
    def start_downloads():
        """
        Starts downloading/seeding every '*.tstream' file found in
        config.TORRENTS_PATH, then schedules itself to run again after
        config.START_DOWNLOADS_INTERVAL seconds.

        The rescheduling happens even when this pass fails. It uses a daemon
        timer, so the endless periodic re-check does not keep the process
        alive after the web service stops.
        """

        try:
            files = [f for f in os.listdir(config.TORRENTS_PATH)
                    if os.path.isfile(os.path.join(config.TORRENTS_PATH, f))]
            for torrent_file in fnmatch.filter(files, "*.tstream"):
                # Pass the full path, as newly created torrents are passed
                # when seeding starts; a bare name is only valid when the
                # current directory happens to be TORRENTS_PATH.
                Server.bit_torrent.start_download(
                        os.path.join(config.TORRENTS_PATH, torrent_file),
                        config.MEDIA_PATH)
        finally:
            t = threading.Timer(config.START_DOWNLOADS_INTERVAL,
                    Server.start_downloads)
            t.daemon = True
            t.start()

    def authenticate(self, username, password):
        """
        Checks a user's credentials against users.users.

        Always succeeds when config.SECURITY is off. On failure the response
        status is set to Forbidden and False is returned. Unknown or invalid
        user names fail authentication instead of raising.
        """

        if not config.SECURITY:
            return True
        try:
            valid = users.users[username] == password
        except (KeyError, TypeError):
            # Unknown user, or an unhashable user name from the JSON body.
            valid = False
        if valid:
            return True
        web.forbidden()
        return False
272
273
if __name__ == '__main__':
    # Shared state is stored on the Server class so that the request
    # handlers and the worker thread reach the same objects.
    Server.load = 0              # total weight of jobs not yet finished
    Server.queue = Queue()       # jobs waiting for the worker
    # NextShare (Tribler) BitTorrent client for seeding, downloading etc.
    Server.bit_torrent = bt.BitTorrent()

    # Start on the torrents already present; this also schedules the
    # periodic re-check.
    Server.start_downloads()

    # Background worker for content ingestion jobs; as a daemon thread it
    # does not keep the process alive on its own.
    worker = CIWorker()
    worker.daemon = True
    worker.start()

    # RESTful web service: every path is routed to Server. web.application
    # receives globals() to resolve 'Server' and the mapping, so `urls`
    # stays a module-level name.
    urls = ('/(.*)', 'Server')
    web.application(urls, globals()).run()