d286ef7958b4a379c254473e98d7b71cb3e93847
[living-lab-site.git] / cis / cisd.py
1 #!/usr/bin/env python
2
3 import sys
4 import os
5 import fnmatch
6 import shutil
7 import time
8 import threading
9 from Queue import Queue
10 import web
11 import json
12
13 import config
14 import bt
15
16
class CIWorker(threading.Thread):
    """
    Content Ingestion Worker. A class which executes content ingestion jobs
    on a separate thread.

    CIWorker shares a Queue with its master where jobs are submitted.
    """

    raw_videos_dir = 'tmp/raw'
    transcoded_videos_dir = 'tmp/media'
    thumbs_dir = 'tmp/thumbs'
    torrents_dir = 'tmp/torrents'

    def __init__(self, shared, bit_torrent):
        """
        Initialize Content Ingestion Worker.

        @param shared data shared with the front-end (Service); it must
        expose a `queue` of job dictionaries and a numeric `load`
        @param bit_torrent BitTorrent client used to seed the torrents
        created for the transcoded videos (must provide start_download)
        """

        threading.Thread.__init__(self, name='CIWorker')

        self.shared = shared
        self.bit_torrent = bit_torrent

    def transfer_in(self, raw_video):
        """
        Transfers a raw video file from the Web Server.

        @param raw_video raw video file name
        """

        file_transfer = config.FILE_TRANSFERER_CLASS(
                self.raw_videos_dir, config.INPUT_PATH)
        file_transfer.get([raw_video])
        file_transfer.close()

        print('** Transfering in finished.')

    def transcode(self, input_video, video_name, transcode_configs):
        """
        Transcodes a video in each requested formats.

        Each configuration dictionary is updated in place with an
        'output_file' key holding the value returned by the transcoder;
        seed() relies on that key.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param transcode_configs a list of dictionaries with format settings
        """

        transcoder = config.TRANSCODER_CLASS(
                input_file=os.path.join(self.raw_videos_dir, input_video),
                name=video_name, prog_bin=config.TRANSCODER_BIN)
        transcoder.dest_path = self.transcoded_videos_dir

        # Transcode the raw video in each requested format.
        # TODO report partial errors
        for transcode_config in transcode_configs:
            transcode_config['output_file'] = \
                    transcoder.transcode(**transcode_config)

        print('** Transcoding finished.')

    def extract_thumbs(self, input_video, video_name, thumbs):
        """
        Extracts thumbnail images from a video.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param thumbs use 'random' to extract a thumbnail image from a random
        point of the video or use a positive integer n to extract n summary
        thumbnails; any other value extracts nothing
        """

        # TODO report partial errors
        thumb_extractor = config.THUMB_EXTRACTOR_CLASS(
                input_file=os.path.join(self.raw_videos_dir, input_video),
                name=video_name,
                prog_bin=config.THUMB_EXTRACTOR_BIN)
        thumb_extractor.dest_path = self.thumbs_dir
        if thumbs == 'random':
            thumb_extractor.extract_random_thumb()
        elif type(thumbs) is int and thumbs > 0:
            thumb_extractor.extract_summary_thumbs(thumbs)

        print('** Extracting thumbs finished.')

    def seed(self, transcode_configs):
        """
        Creates torrents from the videos passed and then starts seeding them.

        @param transcode_configs a list of dictionaries with format settings,
        each already holding the 'output_file' key set by transcode()
        """

        for transcode_config in transcode_configs:
            video_file = transcode_config['output_file']
            source_torrent = video_file + '.tstream'

            # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
            # The torrent file is created in the same directory with the
            # source file. Move it to the torrents directory.
            bt.create_torrent(video_file)
            shutil.move(source_torrent, self.torrents_dir)

            # * SEED TORRENTS
            # Use the client injected in __init__ rather than a module global.
            self.bit_torrent.start_download(
                    os.path.join(self.torrents_dir,
                                 os.path.basename(source_torrent)),
                    self.transcoded_videos_dir)

        print('** Creating torrents and seeding finished.')

    def transfer_out(self, local_files, local_path, remote_path):
        """
        Transfers some local files to a remote path of the Web Server.

        @param local_files list of local file names to transfer
        @param local_path local directory containing local_files
        @param remote_path destination path on the Web Server
        """

        file_transfer = config.FILE_TRANSFERER_CLASS(
                local_path, remote_path)
        file_transfer.put(local_files)
        file_transfer.close()

        print('** Transfering out finished.')

    def remove_files(self, files, path):
        """
        Deletes files from a specified path.

        @param files list of file names to delete
        @param path directory containing the files
        """

        for f in files:
            os.unlink(os.path.join(path, f))

        print('** Cleaning up finished.')

    def _list_files(self, path):
        """Returns the names of the regular files directly inside path."""

        return [f for f in os.listdir(path)
                if os.path.isfile(os.path.join(path, f))]

    def _process_job(self, job):
        """Runs every content ingestion step for one job dictionary."""

        # * TRANSFER RAW VIDEO IN
        self.transfer_in(job['raw_video'])

        # * TRANSCODE RAW VIDEO
        self.transcode(job['raw_video'], job['name'],
                job['transcode_configs'])

        # * EXTRACT THUMBNAIL IMAGES
        if job['thumbs'] != 0:
            self.extract_thumbs(job['raw_video'], job['name'],
                    job['thumbs'])

        # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
        self.seed(job['transcode_configs'])

        # Files produced for this job are prefixed with "<name>_".
        job_pattern = job['name'] + "_*"
        torrent_files = fnmatch.filter(
                self._list_files(self.torrents_dir), job_pattern)
        thumb_files = fnmatch.filter(
                self._list_files(self.thumbs_dir), job_pattern)
        raw_files = self._list_files(self.raw_videos_dir)

        # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
        self.transfer_out(torrent_files, self.torrents_dir,
                config.OUTPUT_TORRENTS_PATH)
        self.transfer_out(thumb_files, self.thumbs_dir,
                config.OUTPUT_THUMBS_PATH)

        # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
        # Torrents and transcoded videos are kept because they are seeded.
        self.remove_files(raw_files, self.raw_videos_dir)
        self.remove_files(thumb_files, self.thumbs_dir)

    def run(self):
        """
        Processes jobs from the shared queue forever.

        A job that fails is reported on stderr and does not stop the worker.
        Whatever the outcome, the job's weight is removed from the shared
        load and the job is marked done on the queue.
        """

        import traceback

        while True:
            job = self.shared.queue.get()
            try:
                self._process_job(job)
            except Exception:
                # Without this, one bad job would kill the thread: later
                # jobs would never run and queue.join() would hang forever.
                traceback.print_exc()
            finally:
                # * JOB FINISHED
                print('load in run is %s' % self.shared.load)
                self.shared.load -= job.get('weight', 0)
                self.shared.queue.task_done()
204
class Shared:
    """
    State exchanged between the Service front-end and the CIWorker back-end.

    @member queue a Queue of job dictionaries, each with the keys:
        <ul>
            <li>raw_video: raw video file name</li>
            <li>name: a video name which must be a valid file name</li>
            <li>transcode_configs: a list of transcode configuration
            dictionaries having the keys as the parameters of
            api.BaseTranscoder.transcode(...)</li>
            <li>thumbs: string 'random' for extracting a thumbnail
            image from a random video position, a positive integer which
            represents the number of summary thumbnails to be extracted,
            or 0 for no thumbnails</li>
            <li>weight: number added to load when the job is submitted
            and subtracted when it finishes</li>
        </ul>
    @member load total weight of the jobs submitted and not yet finished
    """

    def __init__(self):
        # The server starts idle: zero load and an empty job queue.
        self.load = 0
        self.queue = Queue()
229
230
class Service:
    """
    Implementation of the RESTful web service which constitutes the interface
    with the client (web server).

    web.py creates a new Service instance for every HTTP request, so the
    shared data (job queue and load) and the CIWorker thread are kept at
    class level and created only once, by the first request.
    """

    # State shared by every per-request Service instance.
    _shared = None
    _init_lock = threading.Lock()

    def __init__(self):
        # The lock prevents two concurrent first requests from each
        # starting a worker.
        with Service._init_lock:
            if Service._shared is None:
                shared = Shared()

                # Worker thread; daemon so it does not keep the process
                # alive on exit. bit_torrent is the module-level client.
                ci_worker = CIWorker(shared, bit_torrent)
                ci_worker.daemon = True
                ci_worker.start()

                # Publish only after the worker started successfully.
                Service._shared = shared

        # Shared data with back-end (CIWorker).
        self.shared = Service._shared

    # There is deliberately no __del__ joining the queue. Instances are
    # discarded after every request, so joining the shared queue would
    # block that request until all queued jobs had finished.

    def GET(self, request):
        """
        Handles GET /<request>. Only 'get_load' is supported: it returns the
        current load as JSON ({"load": <load>}); anything else is a 400.
        """

        if request != 'get_load':
            web.badrequest()
            return ""

        resp = {"load": self.shared.load}
        print('load in GET is %s' % self.shared.load)
        web.header('Content-Type', 'application/json')
        return json.dumps(resp)

    def POST(self, request):
        """
        Handles POST /<request>. Only 'ingest_content' is supported: the
        JSON body is a job dictionary (see Shared) which is queued for the
        worker after its 'weight' is added to the load. Malformed JSON or a
        missing 'weight' is answered with a 400.
        """

        if request != 'ingest_content':
            web.badrequest()
            return ""

        # Read JSON parameters.
        try:
            data = json.loads(web.data())
            weight = data["weight"]
        except (ValueError, KeyError, TypeError):
            # Client error: bad JSON, no 'weight', or not a JSON object.
            web.badrequest()
            return ""

        # Add job weight to CIS load.
        # NOTE(review): the worker thread decrements load without a lock;
        # concurrent read-modify-write updates could be lost.
        self.shared.load += weight
        print('load in POST is %s' % self.shared.load)

        # Submit job.
        self.shared.queue.put(data)

        return 'Job submitted.'
281
282
if __name__ == '__main__':
    # The BitTorrent object implements a NextShare (Tribler) BitTorrent
    # client for seeding, downloading etc. Assigned at module level (no
    # `global` needed here), it is read by Service when it starts the
    # worker thread.
    bit_torrent = bt.BitTorrent()

    # Web service: every path is dispatched to the Service class, with the
    # path (without the leading '/') passed as its `request` argument.
    urls = ('/(.*)', 'Service')
    service = web.application(urls, globals())
    service.run()