9 from Queue import Queue
12 from web.wsgiserver import CherryPyWSGIServer
# Point web.py's bundled CherryPy WSGI server at an SSL certificate and its
# private key by setting class-level attributes. Both values are bare file
# names, so they are presumably resolved against the process working
# directory -- TODO confirm against the deployment layout.
20 CherryPyWSGIServer.ssl_certificate = "cacert.pem"
21 CherryPyWSGIServer.ssl_private_key = "privkey.pem"
# Worker thread type for content ingestion jobs; subclasses threading.Thread.
24 class CIWorker(threading.Thread):
26 Content Ingestion Worker. A class which executes content ingestion jobs
29 CIWorker shares a Queue with its master where jobs are submitted.
34 Initialize Content Ingestion Worker.
# Delegate to the base-class initializer, giving the thread the fixed name
# 'CIWorker'.
37 threading.Thread.__init__(self, name='CIWorker')
def transfer_in(self, raw_video):
    """
    Transfers a raw video file from the Web Server.

    A transferer is built from config.FILE_TRANSFERER_CLASS with
    config.RAW_VIDEOS_PATH and config.WS_UPLOAD_PATH, asked to get the
    file and then closed. Any exception raised by the transferer
    propagates to the caller.

    @param raw_video raw video file name
    """

    logger.log_msg('#%s: transfering in...' % self.job_id)

    file_transfer = config.FILE_TRANSFERER_CLASS(
            config.RAW_VIDEOS_PATH, config.WS_UPLOAD_PATH)
    try:
        file_transfer.get([raw_video])
    finally:
        # Release the transferer whether or not the download succeeded,
        # the same way transfer_out closes its transferer after put().
        file_transfer.close()
def transcode(self, input_video, video_name, transcode_configs):
    """
    Transcodes a video in each requested formats.

    Every dictionary in transcode_configs is passed as keyword arguments
    to the transcoder and is updated in place with an 'output_file' entry
    holding the value returned by the transcoder.

    @param input_video input video file name
    @param video_name a video name which must be a valid file name
    @param transcode_configs a list of dictionaries with format settings
    """

    logger.log_msg('#%s: transcoding...' % self.job_id)

    # One transcoder instance serves all formats; results are written
    # under config.MEDIA_PATH.
    transcoder = config.TRANSCODER_CLASS(
        input_file=os.path.join(config.RAW_VIDEOS_PATH, input_video),
        name=video_name,
        prog_bin=config.TRANSCODER_BIN)
    transcoder.dest_path = config.MEDIA_PATH

    # Transcode the raw video in each requested format.
    # TODO report partial errors
    for format_settings in transcode_configs:
        produced = transcoder.transcode(**format_settings)
        format_settings['output_file'] = produced
# Builds a thumbnail extractor from config.THUMB_EXTRACTOR_CLASS for the raw
# video located under config.RAW_VIDEOS_PATH, sets its destination to
# config.THUMBS_PATH and runs the extraction mode selected by `thumbs`.
75 def extract_thumbs(self, input_video, video_name, thumbs):
77 Extracts thumbnail images from a video.
79 @param input_video input video file name
80 @param video_name a video name which must be a valid file name
81 @param thumbs use 'random' to extract a thumbnail image from a random
82 point of the video or use a positive integer n to extract n summary
86 logger.log_msg('#%s: extracting image thumbnails...' % self.job_id)
88 # TODO report partial errors
89 thumb_extractor = config.THUMB_EXTRACTOR_CLASS( \
90 input_file = os.path.join(config.RAW_VIDEOS_PATH, input_video), \
92 prog_bin = config.THUMB_EXTRACTOR_BIN)
93 thumb_extractor.dest_path = config.THUMBS_PATH
# Mode dispatch: the string 'random' requests a single random thumbnail; a
# strictly positive int requests that many summary thumbnails. The check
# `type(thumbs) is int` is an exact-type test, so bool values and other int
# subclasses do not match it.
# NOTE(review): a value matching neither branch shown here (0, a negative
# number, another string) extracts nothing and reports nothing -- confirm
# that this silent no-op is intended.
94 if thumbs == 'random':
95 thumb_extractor.extract_random_thumb()
96 elif type(thumbs) is int and thumbs > 0:
97 thumb_extractor.extract_summary_thumbs(thumbs)
# For every transcoded output: create a torrent next to the output file, move
# the resulting '<output_file>.tstream' into config.TORRENTS_PATH and hand the
# moved torrent to the shared BitTorrent client (Server.bit_torrent).
# Each dictionary must already carry the 'output_file' key.
# NOTE(review): the docstring below says "stats seeding"; it presumably means
# "starts seeding".
99 def seed(self, transcode_configs):
101 Creates torrents from the videos passed and then stats seeding them.
103 @param transcode_configs a list of dictionaries with format settings
106 logger.log_msg('#%s: creating torrents and starting seeding...' \
109 for transcode_config in transcode_configs:
110 # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
111 # Create torrent file.
112 bt.create_torrent(transcode_config['output_file'])
114 # The torrent file is created in the same directory with the
115 # source file. Move it to the torrents directory.
116 shutil.move(transcode_config['output_file'] + '.tstream', \
117 config.TORRENTS_PATH)
# Reduce the torrent path to its bare file name: everything after the last
# '/'. str.rindex raises ValueError when the path contains no '/', so the
# output path is assumed to always include a directory part -- TODO confirm.
119 output_file = transcode_config['output_file'] + '.tstream'
120 output_file = output_file[(output_file.rindex('/') + 1):]
# Start the torrent from its new location inside config.TORRENTS_PATH.
123 Server.bit_torrent.start_torrent( \
124 os.path.join(config.TORRENTS_PATH, output_file),
def transfer_out(self, local_files, local_path, remote_path):
    """
    Transfers some local files to a remote path of the Web Server.

    A transferer is built from config.FILE_TRANSFERER_CLASS with the two
    paths, asked to put the files and then closed. Any exception raised
    by the transferer propagates to the caller.

    @param local_files list local files to transfer
    @param local_path local path passed to the transferer as the source
    location of local_files
    @param remote_path destination path on the Web Server
    """

    logger.log_msg('#%s: transfering out...' % self.job_id)

    file_transfer = config.FILE_TRANSFERER_CLASS(
            local_path, remote_path)
    try:
        file_transfer.put(local_files)
    finally:
        # Always release the transferer; previously a failing put() skipped
        # close() and left the transferer open.
        file_transfer.close()
# Logs a clean-up message for the current job and unlinks files located under
# `path`; each name `f` is joined onto `path` before removal (presumably one
# unlink per entry of `files` -- TODO confirm against the full method).
# os.unlink raises OSError when the target cannot be removed (for example when
# it does not exist); nothing in the lines shown here catches it.
142 def remove_files(self, files, path):
144 Deletes files from a specified path.
147 logger.log_msg('#%s: cleaning up...' % self.job_id)
150 os.unlink(os.path.join(path, f))
# Job-processing steps of the worker: take one job dictionary from the shared
# Server.queue and push it through the ingestion stages in order (transfer in,
# transcode, thumbnails, seeding, transfer out, clean-up). The job keys read
# here are 'id', 'raw_video', 'name', 'transcode_configs', 'thumbs' and
# 'weight'. Stage failures are logged with logger.LOG_LEVEL_FATAL.
# Queue.get() with default arguments blocks until a job is available.
154 job = Server.queue.get()
155 self.job_id = job['id']
157 # * TRANSFER RAW VIDEO IN
159 self.transfer_in(job['raw_video'])
160 except Exception as e:
161 logger.log_msg('#%s: error while transferring in: %s' \
162 % (job['id'], str(e)), logger.LOG_LEVEL_FATAL)
165 # * TRANSCODE RAW VIDEO
167 self.transcode(job['raw_video'], job['name'], \
168 job['transcode_configs'])
169 except Exception as e:
170 logger.log_msg('#%s: error while transcoding: %s' \
171 % (job['id'], str(e)), logger.LOG_LEVEL_FATAL)
174 # * EXTRACT THUMBNAIL IMAGES
175 if job['thumbs'] != 0:
177 self.extract_thumbs(job['raw_video'], job['name'], \
179 except Exception as e:
181 '#%s: error while extracting thumbnail images: %s' \
182 % (job['id'], str(e)), logger.LOG_LEVEL_FATAL)
185 # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
186 self.seed(job['transcode_configs'])
# Torrent files: regular files directly inside config.TORRENTS_PATH whose
# names match the fnmatch pattern '<job name>_*'.
# NOTE(review): the job name is used as part of a glob pattern, so characters
# such as '*', '?' or '[' in it would be interpreted as wildcards.
189 files = [f for f in os.listdir(config.TORRENTS_PATH) \
190 if os.path.isfile(os.path.join( \
191 config.TORRENTS_PATH, f))]
192 torrent_files = fnmatch.filter(files, job['name'] + "_*")
194 # Thumbnail images files.
195 files = [f for f in os.listdir(config.THUMBS_PATH) \
196 if os.path.isfile(os.path.join( \
197 config.THUMBS_PATH, f))]
198 thumb_files = fnmatch.filter(files, job['name'] + "_*")
200 # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
202 self.transfer_out(torrent_files, config.TORRENTS_PATH, \
203 config.WS_TORRENTS_PATH)
204 self.transfer_out(thumb_files, config.THUMBS_PATH, \
205 config.WS_THUMBS_PATH)
206 except Exception as e:
207 logger.log_msg('#%s: error while transferring out: %s' \
208 % (job['id'], str(e)), logger.LOG_LEVEL_FATAL)
211 # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
212 self.remove_files([ job['raw_video'] ], config.RAW_VIDEOS_PATH)
213 self.remove_files(thumb_files, config.THUMBS_PATH)
# Mark the job as finished on the queue and subtract its weight from the
# shared load counter that the POST handler increased on submission.
# NOTE(review): Server.load is updated from both the web handler and this
# worker with no lock visible around either update -- confirm whether that
# is acceptable.
216 Server.queue.task_done()
217 Server.load -= job['weight']
222 Implementation of the RESTful web service which constitutes the interface
223 with the client (web server).
# HTTP GET handler. `request` is the URL path captured by the '/(.*)' route
# declared in the start-up code.
# 'get_load' answers with the current Server.load serialized as the JSON
# object {"load": <value>} and sets the Content-Type header to
# application/json. A 'test' request is also recognized.
232 def GET(self, request):
233 #web.header('Cache-Control', 'no-cache')
235 if request == 'get_load':
236 resp = {"load": Server.load}
237 web.header('Content-Type', 'application/json')
238 return json.dumps(resp)
239 #elif request == 'shutdown':
241 elif request == 'test':
# HTTP POST handler. `request` is the URL path captured by the '/(.*)' route;
# every recognized request reads its parameters as a JSON document from the
# request body (web.data()).
#   ingest_content  - queue an ingestion job (the decoded JSON object itself)
#   start_torrents  - start the listed torrents
#   stop_torrents   - stop the listed torrents
#   remove_torrents - stop the listed torrents with remove_content=True
# NOTE(review): json.loads raises on a malformed body and the key lookups
# below raise KeyError on missing fields; no handling is visible here.
248 def POST(self, request):
249 if request == 'ingest_content':
250 # Read JSON parameters.
251 json_data = web.data()
252 data = json.loads(json_data)
# Credentials are taken from the JSON body and only checked when
# config.SECURITY is enabled.
# NOTE(review): on failure a plain text body is returned; no HTTP error
# status is set in the lines shown -- confirm callers can detect failure.
255 if config.SECURITY and \
256 not self.authenticate(data["username"], data["password"]):
257 return "Authentication failed!"
259 # Add job weight to CIS load.
260 Server.load += data["weight"]
# The whole decoded request object is the job handed to the worker.
263 Server.queue.put(data)
265 return 'Job submitted.'
266 elif request == 'start_torrents':
267 # Read JSON parameters.
268 json_data = web.data()
269 data = json.loads(json_data)
272 Server.start_torrents(data)
273 elif request == 'stop_torrents':
274 # Read JSON parameters.
275 json_data = web.data()
276 data = json.loads(json_data)
279 Server.stop_torrents(data)
280 elif request == 'remove_torrents':
281 # Read JSON parameters.
282 json_data = web.data()
283 data = json.loads(json_data)
# Removal reuses stop_torrents, passing True as its remove_content argument.
286 Server.stop_torrents(data, True)
# Starts torrents through the shared BitTorrent client (Server.bit_torrent).
# `torrents` defaults to None; it is called both with an explicit list (from
# the POST handler) and with no argument (at start-up).
292 def start_torrents(torrents=None):
294 Scans torrent path for files in order to start download for the files
295 that are not already started.
# Build the list from the regular files directly inside config.TORRENTS_PATH
# whose names end in '.tstream' (presumably only when no list was passed in --
# TODO confirm against the full function).
300 files = [f for f in os.listdir(config.TORRENTS_PATH) \
301 if os.path.isfile(os.path.join( \
302 config.TORRENTS_PATH, f))]
303 torrents = fnmatch.filter(files, "*.tstream")
# Each entry is treated as a file name relative to config.TORRENTS_PATH.
305 for torrent_file in torrents:
306 Server.bit_torrent.start_torrent( \
307 os.path.join(config.TORRENTS_PATH, torrent_file),
def stop_torrents(torrents, remove_content=False):
    """
    Stops every torrent in the given collection through the shared
    BitTorrent client (Server.bit_torrent).

    @param torrents iterable of torrent identifiers; each one is forwarded
    unchanged to Server.bit_torrent.stop_torrent
    @param remove_content flag forwarded unchanged to stop_torrent for
    every torrent (False by default)
    """

    for entry in torrents:
        Server.bit_torrent.stop_torrent(entry, remove_content)
# Checks a username/password pair. Consults config.SECURITY first, then
# compares `password` for equality with the entry stored under `username` in
# users.users.
# NOTE(review): if users.users is a plain dict, an unknown username raises
# KeyError here instead of yielding a failed authentication -- confirm.
# NOTE(review): the comparison is a direct `==` against the stored value, so
# the stored values are presumably plaintext passwords -- confirm.
316 def authenticate(self, username, password):
317 if not config.SECURITY:
319 if users.users[username] == password:
# Script entry point: wires up the shared state on the Server class, prepares
# the periodic torrent start, creates the ingestion worker and builds the
# web.py application.
326 if __name__ == '__main__':
327 # The BitTorrent object implements a NextShare (Tribler) BitTorrent
328 # client for seeding, downloading etc.
329 Server.bit_torrent = bt.BitTorrent()
# Job queue shared between the POST handler (which puts jobs) and the
# CIWorker thread (which gets them).
330 Server.queue = Queue()
# Start the torrents already present on disk, then prepare a timer that calls
# Server.start_torrents again after config.START_DOWNLOADS_INTERVAL seconds.
# A threading.Timer fires once, not repeatedly.
333 Server.start_torrents()
334 t = threading.Timer(config.START_DOWNLOADS_INTERVAL, \
335 Server.start_torrents)
# Daemon thread: it does not keep the process alive once the main thread
# exits.
340 ci_worker = CIWorker()
341 ci_worker.daemon = True
# Route every path to the Server handler class; the text captured by '(.*)'
# is what GET/POST receive as `request`.
345 urls = ('/(.*)', 'Server')
346 app = web.application(urls, globals())