CIS: CIWorker works; now we need communication via web services
[living-lab-site.git] / cis / cisd.py
1 #!/usr/bin/env python
2
3 import sys
4 import os
5 import fnmatch
6 import shutil
7 import time
8 import threading
9 from Queue import Queue
10
11 import config
12 import bt
13
14
class CIWorker(threading.Thread):
    """
    Content Ingestion Worker. A class which executes content ingestion jobs
    on a separate thread.

    CIWorker shares a Queue with its master where jobs are submitted. For
    each job the worker transfers the raw video in, transcodes it, extracts
    thumbnail images, creates torrents and starts seeding them, transfers
    the torrents and thumbnails out and finally cleans up temporary files.
    """

    # Working directories (relative to the current working directory).
    raw_videos_dir = 'tmp/raw'
    transcoded_videos_dir = 'tmp/media'
    thumbs_dir = 'tmp/thumbs'
    torrents_dir = 'tmp/torrents'

    def __init__(self, queue, bit_torrent):
        """
        Initialize Content Ingestion Worker.

        @param queue a Queue object whose items (jobs) are dictionaries
        with the following keys:
        <ul>
            <li>raw_video</li>
            <li>name: a video name which must be a valid file name</li>
            <li>transcode_configs: a list of transcode configuration
            dictionaries having the keys as the parameters of
            api.BaseTranscoder.transcode(...)</li>
            <li>thumbs: string 'random' for extracting a thumbnail
            image from a random video position or a positive integer which
            represents the number of summary thumbnails to be extracted</li>
        </ul>
        @param bit_torrent BitTorrent client object; its
        start_download(torrent_file, destination_dir) method is used for
        seeding
        """

        threading.Thread.__init__(self, name='CIWorker')

        self.queue = queue
        self.bit_torrent = bit_torrent

    def transfer_in(self, raw_video):
        """
        Transfers a raw video file from the Web Server.

        @param raw_video raw video file name
        """

        file_transfer = config.FILE_TRANSFERER_CLASS(
                self.raw_videos_dir, config.INPUT_PATH)
        file_transfer.get([raw_video])
        file_transfer.close()

        print('** Transfering in finished.')

    def transcode(self, input_video, video_name, transcode_configs):
        """
        Transcodes a video in each requested formats.

        As a side effect, every dictionary from transcode_configs receives
        an 'output_file' key holding the value returned by the transcoder.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param transcode_configs a list of dictionaries with format settings
        """

        transcoder = config.TRANSCODER_CLASS(
                input_file=os.path.join(self.raw_videos_dir, input_video),
                name=video_name, prog_bin=config.TRANSCODER_BIN)
        transcoder.dest_path = self.transcoded_videos_dir

        # Transcode the raw video in each requested format.
        # TODO report partial errors
        for transcode_config in transcode_configs:
            transcode_config['output_file'] = \
                    transcoder.transcode(**transcode_config)

        print('** Transcoding finished.')

    def extract_thumbs(self, input_video, video_name, thumbs):
        """
        Extracts thumbnail images from a video.

        Any other value of thumbs than the ones described below is ignored
        and no thumbnail is extracted.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param thumbs use 'random' to extract a thumbnail image from a random
        point of the video or use a positive integer n to extract n summary
        thumbnails
        """

        # TODO report partial errors
        thumb_extractor = config.THUMB_EXTRACTOR_CLASS(
                input_file=os.path.join(self.raw_videos_dir, input_video),
                name=video_name,
                prog_bin=config.THUMB_EXTRACTOR_BIN)
        thumb_extractor.dest_path = self.thumbs_dir
        if thumbs == 'random':
            thumb_extractor.extract_random_thumb()
        elif isinstance(thumbs, int) and thumbs > 0:
            thumb_extractor.extract_summary_thumbs(thumbs)

        print('** Extracting thumbs finished.')

    def seed(self, transcode_configs):
        """
        Creates torrents from the videos passed and then starts seeding them.

        @param transcode_configs a list of dictionaries with format settings;
        each one must have the 'output_file' key set by transcode(...)
        """

        for transcode_config in transcode_configs:
            video_file = transcode_config['output_file']
            torrent_file = video_file + '.tstream'

            # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
            # Create torrent file.
            bt.create_torrent(video_file)

            # The torrent file is created in the same directory with the
            # source file. Move it to the torrents directory.
            shutil.move(torrent_file, self.torrents_dir)

            # basename also copes with paths having no directory component,
            # where searching for '/' would raise ValueError.
            torrent_name = os.path.basename(torrent_file)

            # * SEED TORRENTS
            # Use the client handed to the constructor, not a module global.
            self.bit_torrent.start_download(
                    os.path.join(self.torrents_dir, torrent_name),
                    self.transcoded_videos_dir)

        print('** Creating torrents and seeding finished.')

    def transfer_out(self, local_files, local_path, remote_path):
        """
        Transfers some local files to a remote path of the Web Server.

        @param local_files list local files to transfer
        @param local_path local directory which contains the files
        @param remote_path destination path on the Web Server
        """

        file_transfer = config.FILE_TRANSFERER_CLASS(
                local_path, remote_path)
        file_transfer.put(local_files)
        file_transfer.close()

        print('** Transferring out finished.')

    def remove_files(self, files, path):
        """
        Deletes files from a specified path.

        @param files list of file names relative to path
        @param path directory which contains the files
        """

        for f in files:
            os.unlink(os.path.join(path, f))

        print('** Cleaning up finished.')

    @staticmethod
    def _list_files(directory):
        """
        Returns the names of the regular files located directly in a
        directory.
        """

        return [f for f in os.listdir(directory)
                if os.path.isfile(os.path.join(directory, f))]

    def _process_job(self, job):
        """
        Runs all the content ingestion stages for a single job.

        @param job a job dictionary as described in __init__(...)
        """

        # Take the name from the job itself; a global could already belong
        # to a newer job submitted while this one was waiting in the queue.
        name = job['name']

        # * TRANSFER RAW VIDEO IN
        self.transfer_in(job['raw_video'])

        # * TRANSCODE RAW VIDEO
        self.transcode(job['raw_video'], name, job['transcode_configs'])

        # * EXTRACT THUMBNAIL IMAGES
        if job['thumbs'] != 0:
            self.extract_thumbs(job['raw_video'], name, job['thumbs'])

        # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
        self.seed(job['transcode_configs'])

        # Torrent and thumbnail image files which belong to this video.
        torrent_files = fnmatch.filter(
                self._list_files(self.torrents_dir), name + "_*")
        thumb_files = fnmatch.filter(
                self._list_files(self.thumbs_dir), name + "_*")

        # Raw video files (everything found in the raw videos directory).
        raw_files = self._list_files(self.raw_videos_dir)

        # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
        self.transfer_out(torrent_files, self.torrents_dir,
                config.OUTPUT_TORRENTS_PATH)
        self.transfer_out(thumb_files, self.thumbs_dir,
                config.OUTPUT_THUMBS_PATH)

        # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
        self.remove_files(raw_files, self.raw_videos_dir)
        self.remove_files(thumb_files, self.thumbs_dir)

    def run(self):
        """
        Thread body: takes jobs from the queue forever and processes them
        one at a time.

        A job which fails has its traceback printed and is still marked as
        done, so that a master blocked in queue.join() is released and the
        worker remains available for the following jobs.
        """

        import traceback

        while True:
            job = self.queue.get()

            try:
                self._process_job(job)
            except Exception:
                # Thread top-level boundary: report and go on with next job.
                traceback.print_exc()
            finally:
                # * JOB FINISHED
                self.queue.task_done()
210
211
if __name__ == '__main__':
    # Jobs queue.
    queue = Queue()

    # The BitTorrent object implements a NextShare (Tribler) BitTorrent client
    # for seeding, downloading etc.
    bit_torrent = bt.BitTorrent()

    # Worker thread. It is a daemon so it does not keep the process alive
    # once the main thread finishes.
    ci_worker = CIWorker(queue, bit_torrent)
    ci_worker.daemon = True
    ci_worker.start()

    # Read raw video file names from standard input, one per line, and
    # submit a job for each of them. A line containing only 'x' (or end of
    # input) stops the submission.
    while True:
        line = sys.stdin.readline()
        if not line:
            # End of input: readline() returns '' only at EOF.
            break

        raw_video = line.strip()
        if raw_video == 'x':
            break
        if not raw_video:
            # Ignore blank lines instead of submitting a nameless job.
            continue

        # Video name = file name without its last extension. splitext also
        # handles names having no extension at all.
        name = os.path.splitext(raw_video)[0]

        # A new dictionary is built for every job because the worker stores
        # the 'output_file' key into it while transcoding.
        transcode_config = {
            'container': 'webm',
            'a_codec': 'vorbis',
            'a_bitrate': '128k',
            'v_codec': 'vp8',
            'v_bitrate': '480k',
            'v_resolution': '640x480'
        }
        thumbs = 4

        job = {
            'raw_video': raw_video,
            'name': name,
            'transcode_configs': [transcode_config],
            'thumbs': thumbs
        }

        queue.put(job)

    # Wait until every submitted job has been marked as done.
    queue.join()