8 from Queue import Queue
14 class CIWorker(threading.Thread):
16 Content Ingestion Worker. A class which executes content ingestion jobs
19 CIWorker shares a Queue with its master where jobs are submitted.
22 raw_videos_dir = 'tmp/raw'
23 transcoded_videos_dir = 'tmp/media'
24 thumbs_dir = 'tmp/thumbs'
25 torrents_dir = 'tmp/torrents'
27 def __init__(self, queue, bit_torrent):
29 Initialize Content Ingestion Worker.
31 @param queue a list of dictionaries with the following keys:
34 <li>name: a video name which must be a valid file name</li>
35 <li>transcode_configs: a list of transcode configuration
36 dictionaries having the keys as the parameters of
37 api.BaseTranscoder.transcode(...)</li>
38 <li>thumbs: string 'random' for extracting a thumbnail
39 image from a random video position or a positive integer which
40 represents the number of summary thumbnails to be extracted</li>
44 threading.Thread.__init__(self)
47 self.bit_torrent = bit_torrent
49 def transfer_in(self, raw_video):
51 Transfers a raw video file from the Web Server.
53 @param raw_video raw video file name
56 file_transfer = config.FILE_TRANSFERER_CLASS( \
57 self.raw_videos_dir, config.INPUT_PATH)
58 file_transfer.get(raw_video)
61 print '** Transfering in finished.'
63 def transcode(self, input_video, video_name, transcode_configs):
65 Transcodes a video in each requested formats.
67 @param input_video input video file name
68 @param video_name a video name which must be a valid file name
69 @param transcode_configs a list of dictionaries with format settings
72 transcoder = config.TRANSCODER_CLASS( \
73 input_file = video_name, \
74 name = video_name, prog_bin = config.TRANSCODER_BIN)
75 transcoder.dest_path = self.transcoded_videos_dir
77 # Transcode the raw video in each requested format.
78 # TODO report partial errors
79 for transcode_config in transcode_configs:
80 transcode_config['output_file'] = transcoder.transcode( \
81 container = transcode_config['container'], \
82 a_codec = transcode_config['a_codec'], \
83 a_bitrate = transcode_config['a_bitrate'], \
84 a_samplingrate = transcode_config['a_samplingrate'], \
85 a_channels = transcode_config['a_channels'], \
86 v_codec = transcode_config['v_codec'], \
87 v_bitrate = transcode_config['v_bitrate'], \
88 v_framerate = transcode_config['v_framerate'], \
89 v_resolution = transcode_config['v_resolution'], \
90 v_dar = transcode_config['dar'])
92 print '** Transcoding finished.'
94 def extract_thumbs(self, input_video, video_name, thumbs):
96 Extracts thumbnail images from a video.
98 @param input_video input video file name
99 @param video_name a video name which must be a valid file name
100 @param thumbs use 'random' to extract a thumbnail image from a random
101 point of the video or use a positive integer n to extract n summary
105 # TODO report partial errors
106 thumb_extractor = config.THUMB_EXTRACTOR_CLASS( \
107 input_file = input_video, name = video_name, \
108 prog_bin = config.THUMB_EXTRACTOR_BIN)
109 thumb_extractor.dest_path = self.thumbs_dir
110 if thumbs == 'random':
111 thumb_extractor.extract_random_thumb()
112 elif type(thumbs) is int and thumbs > 0:
113 thumb_extractor.extract_summary_thumbs(thumbs)
115 print '** Extracting thumbs finished.'
117 def seed(self, transcode_configs):
119 Creates torrents from the videos passed and then stats seeding them.
121 @param transcode_configs a list of dictionaries with format settings
124 for transcode_config in transcode_cofigs:
125 # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
126 # Create torrent file.
127 bt.create_torrent(transcode_config['output_file'])
129 # The torrent file is created in the same directory with the
130 # source file. Move it to the torrents directory.
131 shutil.move(transcode_config['output_file'] + '.tstream', \
135 bit_torrent.start_download( \
136 transcode_config['output_file'] + '.tstream',
137 self_transcoded_videos_dir)
139 print '** Creating torrents and seeding finished.'
141 def transfer_out(self, local_files, local_path, remote_path):
143 Transfers some local files to a remote path of the Web Server.
145 @param local_files list local files to transfer
146 @param remote_path destination path on the Web Server
149 file_transfer = config.FILE_TRANSFERER_CLASS( \
150 local_path, remote_path)
151 file_transfer.put(local_files)
152 file_transfer.close()
154 print '** Creating torrents and seeding finished.'
156 def remove_file(self, files, path):
158 Deletes files from a specified path.
162 os.unlink(os.path.join(path, f))
164 print '** Cleaning up finished.'
168 job = self.queue.get()
170 # * TRANSFER RAW VIDEO IN
171 self.transfer_in(job['raw_video'])
173 # * TRANSCODE RAW VIDEO
174 self.transcode(job['raw_video'], job['name'], \
175 job['transcode_configs'])
177 # * EXTRACT THUMBNAIL IMAGES
178 if job['thumbs'] != 0:
179 self.extract_thumbs(job['raw_video'], job['name'], \
182 # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
183 self.seed(job['transcode_configs'])
186 files = [f for f in os.listdir(self.torrents_dir) \
187 if os.path.isfile(os.path.join( \
188 self.torrents_dir, f))]
189 torrent_files = fnmatch.filter(files, name + "_*")
191 # Thumbnail images files.
192 files = [f for f in os.listdir(self.thumbs_dir) \
193 if os.path.isfile(os.path.join( \
194 self.thumbs_dir, f))]
195 thumb_files = fnmatch.filter(files, name + "_*")
198 raw_files = [f for f in os.listdir(self.raw_videos_dir) \
199 if os.path.isfile(os.path.join( \
200 self.raw_videos_dir, f))]
202 # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
203 self.transfer_out(torrent_files, self.torrents_dir, \
204 config.OUTPUT_TORRENTS_PATH)
205 self.transfer_out(thumb_files, self.thumbs_dir, \
206 config.OUTPUT_THUMBS_PATH)
208 # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
209 self.remove_files(raw_files, self.raw_videos_dir)
210 self.remove_files(thumb_files, self.thumbs_dir)
216 if __name__ == '__main__':
220 # The BitTorrent object implements a NextShare (Tribler) BitTorrent client
221 # for seeding, downloading etc.
222 bit_torrent = bt.BitTorrent()
225 ci_worker = CIWorker(queue, bit_torrent)
226 ci_worker.daemon = True
230 raw_video = sys.stdin.readline().strip()
239 v_resolution = '640x480'
241 name = raw_video[:raw_video.rindex('.')]
243 'container': container,
245 'a_bitrate': a_bitrate,
247 'v_bitrate': v_bitrate,
248 'v_resolution': v_resolution
253 'raw_video': raw_video,
255 'transcode_configs': [transcode_config],