9 from Queue import Queue
12 from web.wsgiserver import CherryPyWSGIServer
22 CherryPyWSGIServer.ssl_certificate = "cacert.pem"
23 CherryPyWSGIServer.ssl_private_key = "privkey.pem"
26 class CIWorker(threading.Thread):
28 Content Ingestion Worker. A class which executes content ingestion jobs
31 CIWorker shares a Queue with its master where jobs are submitted.
36 Initialize Content Ingestion Worker.
39 threading.Thread.__init__(self, name='CIWorker')
41 def transfer_in(self, raw_video):
43 Transfers a raw video file from the Web Server.
45 @param raw_video raw video file name
48 logger.log_msg('#%s: transfering in...' % self.job_id)
50 file_transfer = config.FILE_TRANSFERER_CLASS( \
51 config.RAW_VIDEOS_PATH, config.WS_UPLOAD_PATH)
52 file_transfer.get([raw_video])
55 def transcode(self, input_video, video_name, transcode_configs):
57 Transcodes a video in each requested formats.
59 @param input_video input video file name
60 @param video_name a video name which must be a valid file name
61 @param transcode_configs a list of dictionaries with format settings
64 logger.log_msg('#%s: transcoding...' % self.job_id)
66 transcoder = config.TRANSCODER_CLASS( \
67 input_file = os.path.join(config.RAW_VIDEOS_PATH, input_video), \
68 name = video_name, prog_bin = config.TRANSCODER_BIN)
69 transcoder.dest_path = config.MEDIA_PATH
71 # Transcode the raw video in each requested format.
72 # TODO report partial errors
73 for transcode_config in transcode_configs:
74 transcode_config['output_file'] = \
75 transcoder.transcode(**transcode_config)
77 def extract_thumbs(self, input_video, video_name, thumbs):
79 Extracts thumbnail images from a video.
81 @param input_video input video file name
82 @param video_name a video name which must be a valid file name
83 @param thumbs use 'random' to extract a thumbnail image from a random
84 point of the video or use a positive integer n to extract n summary
88 logger.log_msg('#%s: extracting image thumbnails...' % self.job_id)
90 # TODO report partial errors
91 thumb_extractor = config.THUMB_EXTRACTOR_CLASS( \
92 input_file = os.path.join(config.RAW_VIDEOS_PATH, input_video), \
94 prog_bin = config.THUMB_EXTRACTOR_BIN)
95 thumb_extractor.dest_path = config.THUMBS_PATH
96 if thumbs == 'random':
97 thumb_extractor.extract_random_thumb()
98 elif type(thumbs) is int and thumbs > 0:
99 thumb_extractor.extract_summary_thumbs(thumbs)
101 def seed(self, transcode_configs):
103 Creates torrents from the videos passed and then stats seeding them.
105 @param transcode_configs a list of dictionaries with format settings
108 logger.log_msg('#%s: creating torrents and starting seeding...' \
111 for transcode_config in transcode_configs:
112 # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
113 # Create torrent file.
114 bt.create_torrent(transcode_config['output_file'])
116 output_file = transcode_config['output_file'] + '.tstream'
117 output_file = output_file[(output_file.rindex('/') + 1):]
119 # The torrent file is created in the same directory with the
120 # source file. Move it to the torrents directory.
121 #if not os.path.exists(
122 # os.path.join(output_file, config.TORRENTS_PATH)):
124 shutil.move(transcode_config['output_file'] + '.tstream', \
125 config.TORRENTS_PATH)
130 Server.bit_torrent.start_torrent( \
131 os.path.join(config.TORRENTS_PATH, output_file),
134 def transfer_out(self, local_files, local_path, remote_path):
136 Transfers some local files to a remote path of the Web Server.
138 @param local_files list local files to transfer
139 @param remote_path destination path on the Web Server
142 logger.log_msg('#%s: transfering out...' % self.job_id)
144 file_transfer = config.FILE_TRANSFERER_CLASS( \
145 local_path, remote_path)
146 file_transfer.put(local_files)
147 file_transfer.close()
149 def remove_files(self, files, path):
151 Deletes files from a specified path.
154 logger.log_msg('#%s: cleaning up...' % self.job_id)
157 os.unlink(os.path.join(path, f))
159 def notify_completion(self, code):
160 logger.log_msg('#%s: notifying web server about the job completion...'\
163 if config.WS_COMPLETION[len(config.WS_COMPLETION) - 1] == '/':
164 url = config.WS_COMPLETION + code
166 url = config.WS_COMPLETION + '/' + code
168 f = urllib.urlopen(url)
171 def notify_error(self, code):
172 logger.log_msg('#%s: notifying web server about the error...'\
175 if config.WS_ERROR[len(config.WS_ERROR) - 1] == '/':
176 url = config.WS_ERROR + code
178 url = config.WS_ERROR + '/' + code
179 url = url + '/' + 'internal_error'
181 f = urllib.urlopen(url)
186 job = Server.queue.get()
187 self.job_id = job['code']
189 # * TRANSFER RAW VIDEO IN
191 self.transfer_in(job['raw_video'])
192 except cis_exceptions.FileAlreadyExistsException as e:
193 logger.log_msg('#%s: %s' \
194 % (job['code'], repr(e)), logger.LOG_LEVEL_ERROR)
195 self.notify_error(job['code'])
197 except Exception as e:
198 logger.log_msg('#%s: error while transferring in: %s' \
199 % (job['code'], repr(e)), logger.LOG_LEVEL_FATAL)
200 self.notify_error(job['code'])
203 # * TRANSCODE RAW VIDEO
205 self.transcode(job['raw_video'], job['name'], \
206 job['transcode_configs'])
207 except cis_exceptions.FileAlreadyExistsException as e:
208 logger.log_msg('#%s: %s' \
209 % (job['code'], repr(e)), logger.LOG_LEVEL_ERROR)
210 self.notify_error(job['code'])
212 except Exception as e:
213 logger.log_msg('#%s: error while transcoding: %s' \
214 % (job['code'], repr(e)), logger.LOG_LEVEL_FATAL)
215 self.notify_error(job['code'])
218 # * EXTRACT THUMBNAIL IMAGES
219 if job['thumbs'] != 0:
221 self.extract_thumbs(job['raw_video'], job['name'], \
223 except cis_exceptions.FileAlreadyExistsException as e:
224 logger.log_msg('#%s: %s' \
225 % (job['code'], repr(e)), logger.LOG_LEVEL_ERROR)
226 self.notify_error(job['code'])
228 except Exception as e:
230 '#%s: error while extracting thumbnail images: %s' \
231 % (job['code'], repr(e)), logger.LOG_LEVEL_FATAL)
232 self.notify_error(job['code'])
235 # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
236 self.seed(job['transcode_configs'])
239 files = [f for f in os.listdir(config.TORRENTS_PATH) \
240 if os.path.isfile(os.path.join( \
241 config.TORRENTS_PATH, f))]
242 torrent_files = fnmatch.filter(files, job['name'] + "_*")
244 # Thumbnail images files.
245 files = [f for f in os.listdir(config.THUMBS_PATH) \
246 if os.path.isfile(os.path.join( \
247 config.THUMBS_PATH, f))]
248 thumb_files = fnmatch.filter(files, job['name'] + "_*")
250 # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
252 self.transfer_out(torrent_files, config.TORRENTS_PATH, \
253 config.WS_TORRENTS_PATH)
254 self.transfer_out(thumb_files, config.THUMBS_PATH, \
255 config.WS_THUMBS_PATH)
256 except Exception as e:
257 logger.log_msg('#%s: error while transferring out: %s' \
258 % (job['code'], repr(e)), logger.LOG_LEVEL_FATAL)
259 self.notify_error(job['code'])
262 # * NOTIFY WEB SERVER ABOUT CONTENT INGESTION COMPLETION
264 self.notify_completion(job['code'])
265 except Exception as e:
267 '#%s: error while notifying web server about the job completion: %s' \
268 % (job['code'], repr(e)), logger.LOG_LEVEL_FATAL)
269 self.notify_error(job['code'])
272 # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
273 self.remove_files([ job['raw_video'] ], config.RAW_VIDEOS_PATH)
274 self.remove_files(thumb_files, config.THUMBS_PATH)
277 Server.queue.task_done()
278 Server.load -= job['weight']
279 logger.log_msg('#%s: finished' \
280 % job['code'], logger.LOG_LEVEL_INFO)
285 Implementation of the RESTful web service which constitutes the interface
286 with the client (web server).
295 def GET(self, request):
296 #web.header('Cache-Control', 'no-cache')
298 if request == 'get_load':
299 resp = {"load": Server.load}
300 web.header('Content-Type', 'application/json')
301 return json.dumps(resp)
302 elif request == 'get_torrent_list':
303 resp = Server.bit_torrent.get_torrent_list()
304 web.header('Content-Type', 'application/json')
305 return json.dumps(resp)
306 #elif request == 'shutdown':
308 elif request == 'test':
315 def POST(self, request):
316 if request == 'ingest_content':
317 # Read JSON parameters.
318 json_data = web.data()
319 data = json.loads(json_data)
322 if config.SECURITY and \
323 not self.authenticate(data["username"], data["password"]):
324 return "Authentication failed!"
326 # Add job weight to CIS load.
327 Server.load += data["weight"]
330 Server.queue.put(data)
332 return 'Job submitted.'
333 elif request == 'start_torrents':
334 # Read JSON parameters.
335 json_data = web.data()
336 data = json.loads(json_data)
339 Server.start_torrents(data)
340 elif request == 'stop_torrents':
341 # Read JSON parameters.
342 json_data = web.data()
343 data = json.loads(json_data)
346 Server.stop_torrents(data)
347 elif request == 'remove_torrents':
348 # Read JSON parameters.
349 json_data = web.data()
350 data = json.loads(json_data)
353 Server.stop_torrents(data, True)
359 def start_torrents(torrents=None):
361 Scans torrent path for files in order to start download for the files
362 that are not already started.
367 files = [f for f in os.listdir(config.TORRENTS_PATH) \
368 if os.path.isfile(os.path.join( \
369 config.TORRENTS_PATH, f))]
370 torrents = fnmatch.filter(files, "*.tstream")
372 for torrent_file in torrents:
373 Server.bit_torrent.start_torrent( \
374 os.path.join(config.TORRENTS_PATH, torrent_file),
378 def stop_torrents(torrents, remove_content=False):
379 for torrent_file in torrents:
380 Server.bit_torrent.stop_torrent( \
381 torrent_file, remove_content)
383 def authenticate(self, username, password):
384 if not config.SECURITY:
386 if users.users[username] == password:
393 if __name__ == '__main__':
394 # The BitTorrent object implements a NextShare (Tribler) BitTorrent
395 # client for seeding, downloading etc.
396 Server.bit_torrent = bt.BitTorrent()
397 Server.queue = Queue()
400 Server.start_torrents()
401 t = threading.Timer(config.START_DOWNLOADS_INTERVAL, \
402 Server.start_torrents)
407 ci_worker = CIWorker()
408 ci_worker.daemon = True
412 urls = ('/(.*)', 'Server')
413 app = web.application(urls, globals())