9 from Queue import Queue
class CIWorker(threading.Thread):
    """
    Content Ingestion Worker. A thread which executes content ingestion jobs.

    CIWorker shares a Queue with its master where jobs are submitted.
    """

    # Working directories used by the ingestion steps (relative paths).
    raw_videos_dir = 'tmp/raw'
    transcoded_videos_dir = 'tmp/media'
    thumbs_dir = 'tmp/thumbs'
    torrents_dir = 'tmp/torrents'

    def __init__(self, queue, bit_torrent):
        """
        Initialize Content Ingestion Worker.

        @param queue a queue of job dictionaries with the following keys:
        <ul>
            <li>raw_video: the raw video file name</li>
            <li>name: a video name which must be a valid file name</li>
            <li>transcode_configs: a list of transcode configuration
            dictionaries having the keys as the parameters of
            api.BaseTranscoder.transcode(...)</li>
            <li>thumbs: string 'random' for extracting a thumbnail
            image from a random video position or a positive integer which
            represents the number of summary thumbnails to be extracted</li>
        </ul>
        @param bit_torrent the BitTorrent client object used for seeding
        """
        threading.Thread.__init__(self, name='CIWorker')

        # run() reads jobs from self.queue.
        self.queue = queue
        self.bit_torrent = bit_torrent

    def transfer_in(self, raw_video):
        """
        Transfers a raw video file from the Web Server.

        @param raw_video raw video file name
        """
        file_transfer = config.FILE_TRANSFERER_CLASS(
            self.raw_videos_dir, config.INPUT_PATH)
        try:
            file_transfer.get([raw_video])
        finally:
            # Release the transferer even when the transfer fails, as
            # transfer_out() closes its transferer after use.
            file_transfer.close()

        print('** Transfering in finished.')

    def transcode(self, input_video, video_name, transcode_configs):
        """
        Transcodes a video in each requested format.

        Each configuration dictionary gets an 'output_file' key holding the
        value returned by the transcoder for that format.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param transcode_configs a list of dictionaries with format settings
        """
        transcoder = config.TRANSCODER_CLASS(
            input_file=os.path.join(self.raw_videos_dir, input_video),
            name=video_name, prog_bin=config.TRANSCODER_BIN)
        transcoder.dest_path = self.transcoded_videos_dir

        # Transcode the raw video in each requested format.
        # TODO report partial errors
        for transcode_config in transcode_configs:
            transcode_config['output_file'] = \
                transcoder.transcode(**transcode_config)

        print('** Transcoding finished.')

    def extract_thumbs(self, input_video, video_name, thumbs):
        """
        Extracts thumbnail images from a video.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param thumbs use 'random' to extract a thumbnail image from a random
        point of the video or use a positive integer n to extract n summary
        thumbnails; any other value extracts nothing
        """
        # TODO report partial errors
        # NOTE(review): name= mirrors the TRANSCODER_CLASS call in
        # transcode(); confirm against the thumbnail extractor API.
        thumb_extractor = config.THUMB_EXTRACTOR_CLASS(
            input_file=os.path.join(self.raw_videos_dir, input_video),
            name=video_name,
            prog_bin=config.THUMB_EXTRACTOR_BIN)
        thumb_extractor.dest_path = self.thumbs_dir
        if thumbs == 'random':
            thumb_extractor.extract_random_thumb()
        # Exact type check: bool values are not accepted as a count.
        elif type(thumbs) is int and thumbs > 0:
            thumb_extractor.extract_summary_thumbs(thumbs)

        print('** Extracting thumbs finished.')

    def seed(self, transcode_configs):
        """
        Creates torrents from the videos passed and then starts seeding them.

        @param transcode_configs a list of dictionaries with format settings;
        each one must carry the 'output_file' key set by transcode()
        """
        for transcode_config in transcode_configs:
            # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
            # Create torrent file.
            video_file = transcode_config['output_file']
            bt.create_torrent(video_file)

            # The torrent file is created in the same directory with the
            # source file. Move it to the torrents directory.
            # NOTE(review): destination taken from the comment above and
            # from the path used for seeding below; confirm.
            torrent_path = video_file + '.tstream'
            shutil.move(torrent_path, self.torrents_dir)

            # basename() also copes with a path that has no directory part.
            torrent_name = os.path.basename(torrent_path)

            # * SEED TORRENTS
            # Use the client handed to this worker, not a module global.
            self.bit_torrent.start_download(
                os.path.join(self.torrents_dir, torrent_name),
                self.transcoded_videos_dir)

        print('** Creating torrents and seeding finished.')

    def transfer_out(self, local_files, local_path, remote_path):
        """
        Transfers some local files to a remote path of the Web Server.

        @param local_files list local files to transfer
        @param local_path local directory which contains the files
        @param remote_path destination path on the Web Server
        """
        file_transfer = config.FILE_TRANSFERER_CLASS(
            local_path, remote_path)
        try:
            file_transfer.put(local_files)
        finally:
            file_transfer.close()

        print('** Transfering out finished.')

    def remove_files(self, files, path):
        """
        Deletes files from a specified path.

        @param files list of file names to delete
        @param path directory which contains the files
        """
        for f in files:
            os.unlink(os.path.join(path, f))

        print('** Cleaning up finished.')

    def _list_files(self, directory):
        """
        Lists the regular files found directly inside a directory.

        @param directory path of the directory to scan
        @return list of file names (not paths); sub-directories are skipped
        """
        return [f for f in os.listdir(directory)
                if os.path.isfile(os.path.join(directory, f))]

    def run(self):
        """
        Thread body. Takes jobs from the queue and, for each one, transfers
        the raw video in, transcodes it, extracts thumbnails (unless the
        job's 'thumbs' value is 0), creates and seeds torrents, transfers
        torrents and thumbnails out and removes raw videos and thumbnails.
        """
        # NOTE(review): endless loop assumed (the worker is started as a
        # daemon thread by the script section); confirm.
        while True:
            job = self.queue.get()

            # * TRANSFER RAW VIDEO IN
            self.transfer_in(job['raw_video'])

            # * TRANSCODE RAW VIDEO
            self.transcode(job['raw_video'], job['name'],
                           job['transcode_configs'])

            # * EXTRACT THUMBNAIL IMAGES
            if job['thumbs'] != 0:
                self.extract_thumbs(job['raw_video'], job['name'],
                                    job['thumbs'])

            # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
            self.seed(job['transcode_configs'])

            # Select the files whose names start with "<job name>_". The
            # name comes from the job itself, not from a module global.
            pattern = job['name'] + "_*"

            # Torrent files.
            torrent_files = fnmatch.filter(
                self._list_files(self.torrents_dir), pattern)

            # Thumbnail images files.
            thumb_files = fnmatch.filter(
                self._list_files(self.thumbs_dir), pattern)

            # Raw video files: every file in the raw videos directory.
            raw_files = self._list_files(self.raw_videos_dir)

            # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
            self.transfer_out(torrent_files, self.torrents_dir,
                              config.OUTPUT_TORRENTS_PATH)
            self.transfer_out(thumb_files, self.thumbs_dir,
                              config.OUTPUT_THUMBS_PATH)

            # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
            self.remove_files(raw_files, self.raw_videos_dir)
            self.remove_files(thumb_files, self.thumbs_dir)

            # NOTE(review): lets a master blocked in queue.join() continue;
            # confirm that the submitting side relies on it.
            self.queue.task_done()
212 if __name__ == '__main__':
216 # The BitTorrent object implements a NextShare (Tribler) BitTorrent client
217 # for seeding, downloading etc.
218 bit_torrent = bt.BitTorrent()
221 ci_worker = CIWorker(queue, bit_torrent)
222 ci_worker.daemon = True
226 raw_video = sys.stdin.readline().strip()
235 v_resolution = '640x480'
237 name = raw_video[:raw_video.rindex('.')]
239 'container': container,
241 'a_bitrate': a_bitrate,
243 'v_bitrate': v_bitrate,
244 'v_resolution': v_resolution
249 'raw_video': raw_video,
251 'transcode_configs': [transcode_config],