9 from Queue import Queue
17 class CIWorker(threading.Thread):
19 Content Ingestion Worker. A class which executes content ingestion jobs
22 CIWorker shares a Queue with its master where jobs are submitted.
25 raw_videos_dir = 'tmp/raw'
26 transcoded_videos_dir = 'tmp/media'
27 thumbs_dir = 'tmp/thumbs'
28 torrents_dir = 'tmp/torrents'
30 def __init__(self, shared, bit_torrent):
32 Initialize Content Ingestion Worker.
34 @param shared data shared with the front-end (Service)
37 threading.Thread.__init__(self, name='CIWorker')
40 self.bit_torrent = bit_torrent
42 def transfer_in(self, raw_video):
44 Transfers a raw video file from the Web Server.
46 @param raw_video raw video file name
49 file_transfer = config.FILE_TRANSFERER_CLASS( \
50 self.raw_videos_dir, config.INPUT_PATH)
51 file_transfer.get([raw_video])
54 print '** Transfering in finished.'
56 def transcode(self, input_video, video_name, transcode_configs):
58 Transcodes a video in each requested formats.
60 @param input_video input video file name
61 @param video_name a video name which must be a valid file name
62 @param transcode_configs a list of dictionaries with format settings
65 transcoder = config.TRANSCODER_CLASS( \
66 input_file = os.path.join(self.raw_videos_dir, input_video), \
67 name = video_name, prog_bin = config.TRANSCODER_BIN)
68 transcoder.dest_path = self.transcoded_videos_dir
70 # Transcode the raw video in each requested format.
71 # TODO report partial errors
72 for transcode_config in transcode_configs:
73 transcode_config['output_file'] = \
74 transcoder.transcode(**transcode_config)
76 print '** Transcoding finished.'
78 def extract_thumbs(self, input_video, video_name, thumbs):
80 Extracts thumbnail images from a video.
82 @param input_video input video file name
83 @param video_name a video name which must be a valid file name
84 @param thumbs use 'random' to extract a thumbnail image from a random
85 point of the video or use a positive integer n to extract n summary
89 # TODO report partial errors
90 thumb_extractor = config.THUMB_EXTRACTOR_CLASS( \
91 input_file = os.path.join(self.raw_videos_dir, input_video), \
93 prog_bin = config.THUMB_EXTRACTOR_BIN)
94 thumb_extractor.dest_path = self.thumbs_dir
95 if thumbs == 'random':
96 thumb_extractor.extract_random_thumb()
97 elif type(thumbs) is int and thumbs > 0:
98 thumb_extractor.extract_summary_thumbs(thumbs)
100 print '** Extracting thumbs finished.'
102 def seed(self, transcode_configs):
104 Creates torrents from the videos passed and then stats seeding them.
106 @param transcode_configs a list of dictionaries with format settings
109 for transcode_config in transcode_configs:
110 # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
111 # Create torrent file.
112 bt.create_torrent(transcode_config['output_file'])
114 # The torrent file is created in the same directory with the
115 # source file. Move it to the torrents directory.
116 shutil.move(transcode_config['output_file'] + '.tstream', \
119 output_file = transcode_config['output_file'] + '.tstream'
120 output_file = output_file[(output_file.rindex('/') + 1):]
123 bit_torrent.start_download( \
124 os.path.join(self.torrents_dir, output_file),
125 self.transcoded_videos_dir)
127 print '** Creating torrents and seeding finished.'
129 def transfer_out(self, local_files, local_path, remote_path):
131 Transfers some local files to a remote path of the Web Server.
133 @param local_files list local files to transfer
134 @param remote_path destination path on the Web Server
137 file_transfer = config.FILE_TRANSFERER_CLASS( \
138 local_path, remote_path)
139 file_transfer.put(local_files)
140 file_transfer.close()
142 print '** Creating torrents and seeding finished.'
144 def remove_files(self, files, path):
146 Deletes files from a specified path.
150 os.unlink(os.path.join(path, f))
152 print '** Cleaning up finished.'
156 job = self.shared.queue.get()
158 # * TRANSFER RAW VIDEO IN
159 self.transfer_in(job['raw_video'])
161 # * TRANSCODE RAW VIDEO
162 self.transcode(job['raw_video'], job['name'], \
163 job['transcode_configs'])
165 # * EXTRACT THUMBNAIL IMAGES
166 if job['thumbs'] != 0:
167 self.extract_thumbs(job['raw_video'], job['name'], \
170 # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
171 self.seed(job['transcode_configs'])
174 files = [f for f in os.listdir(self.torrents_dir) \
175 if os.path.isfile(os.path.join( \
176 self.torrents_dir, f))]
177 torrent_files = fnmatch.filter(files, job['name'] + "_*")
179 # Thumbnail images files.
180 files = [f for f in os.listdir(self.thumbs_dir) \
181 if os.path.isfile(os.path.join( \
182 self.thumbs_dir, f))]
183 thumb_files = fnmatch.filter(files, job['name'] + "_*")
186 raw_files = [f for f in os.listdir(self.raw_videos_dir) \
187 if os.path.isfile(os.path.join( \
188 self.raw_videos_dir, f))]
190 # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
191 self.transfer_out(torrent_files, self.torrents_dir, \
192 config.OUTPUT_TORRENTS_PATH)
193 self.transfer_out(thumb_files, self.thumbs_dir, \
194 config.OUTPUT_THUMBS_PATH)
196 # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
197 self.remove_files(raw_files, self.raw_videos_dir)
198 self.remove_files(thumb_files, self.thumbs_dir)
201 self.shared.queue.task_done()
202 print 'load in run is', self.shared.load
203 self.shared.load -= job['weight']
207 Shared data between Service (front-end) and CIWorker (back-end).
209 @member queue a list of dictionaries with the following keys:
212 <li>name: a video name which must be a valid file name</li>
213 <li>transcode_configs: a list of transcode configuration
214 dictionaries having the keys as the parameters of
215 api.BaseTranscoder.transcode(...)</li>
216 <li>thumbs: string 'random' for extracting a thumbnail
217 image from a random video position or a positive integer which
218 represents the number of summary thumbnails to be extracted</li>
220 @member load total weight of the jobs from the queue
233 Implementation of the RESTful web service which constitutes the interface
234 with the client (web server).
238 # Shared data with back-end (CIWorker).
239 self.shared = Shared()
244 ci_worker = CIWorker(self.shared, bit_torrent)
245 ci_worker.daemon = True
249 self.shared.queue.join()
251 def GET(self, request):
252 #web.header('Cache-Control', 'no-cache')
254 if request == 'get_load':
255 resp = {"load": self.shared.load}
256 print 'load in GET is', self.shared.load
257 web.header('Content-Type', 'application/json')
258 return json.dumps(resp)
264 def POST(self, request):
265 if request == 'ingest_content':
266 # Read JSON parameters.
267 json_data = web.data()
268 data = json.loads(json_data)
270 # Add job weight to CIS load.
271 self.shared.load += data["weight"]
272 print 'load in POST is', self.shared.load
275 self.shared.queue.put(data)
277 return 'Job submitted.'
283 if __name__ == '__main__':
284 # The BitTorrent object implements a NextShare (Tribler) BitTorrent
285 # client for seeding, downloading etc.
287 bit_torrent = bt.BitTorrent()
290 urls = ('/(.*)', 'Service')
291 service = web.application(urls, globals())