cis: bug fixes
[living-lab-site.git] / cis / cisd.py
1 #!/usr/bin/env python
2
3 import sys
4 import os
5 import fnmatch
6 import shutil
7 import time
8 import threading
9 from Queue import Queue
10 import web
11 import json
12 from web.wsgiserver import CherryPyWSGIServer
13
14 import config
15 import bt
16 import users
17
# When security is switched on in the configuration, point the CherryPy WSGI
# server bundled with web.py at the certificate and private key files.
# Both paths are relative file names.
if config.SECURITY:
    (CherryPyWSGIServer.ssl_certificate,
     CherryPyWSGIServer.ssl_private_key) = ("cacert.pem", "privkey.pem")
21
22
class CIWorker(threading.Thread):
    """
    Content Ingestion Worker. A class which executes content ingestion jobs
    on a separate thread.

    CIWorker shares a Queue with its master where jobs are submitted. Each
    job is a dictionary; the worker reads the keys 'raw_video', 'name',
    'transcode_configs', 'thumbs' and 'weight' from it.
    """

    # Local working directories used while a job is processed.
    raw_videos_dir = 'tmp/raw'
    transcoded_videos_dir = 'tmp/media'
    thumbs_dir = 'tmp/thumbs'
    # Directory where the generated torrent files are moved to.
    torrents_dir = config.CIS_TORRENTS_PATH

    # NOTE: print(...) with a single string argument is used throughout; it
    # produces the same output as the Python 2 print statement.

    def __init__(self):
        """
        Initialize Content Ingestion Worker.
        """

        threading.Thread.__init__(self, name='CIWorker')

    def transfer_in(self, raw_video):
        """
        Transfers a raw video file from the Web Server.

        @param raw_video raw video file name
        """

        print('** Transfering in...')

        file_transfer = config.FILE_TRANSFERER_CLASS(
                self.raw_videos_dir, config.WS_UPLOAD_PATH)
        try:
            file_transfer.get([raw_video])
        finally:
            # Release the transferer even when the transfer fails.
            file_transfer.close()

    def transcode(self, input_video, video_name, transcode_configs):
        """
        Transcodes a video in each requested formats.

        Every dictionary in transcode_configs is passed as keyword arguments
        to the transcoder and then receives an 'output_file' entry holding
        the value returned by the transcoder.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param transcode_configs a list of dictionaries with format settings
        """

        print('** Transcoding...')

        transcoder = config.TRANSCODER_CLASS(
                input_file=os.path.join(self.raw_videos_dir, input_video),
                name=video_name, prog_bin=config.TRANSCODER_BIN)
        transcoder.dest_path = self.transcoded_videos_dir

        # Transcode the raw video in each requested format.
        # TODO report partial errors
        for transcode_config in transcode_configs:
            transcode_config['output_file'] = \
                    transcoder.transcode(**transcode_config)

    def extract_thumbs(self, input_video, video_name, thumbs):
        """
        Extracts thumbnail images from a video.

        Any other value of thumbs (including a non-positive integer) results
        in no thumbnail being extracted.

        @param input_video input video file name
        @param video_name a video name which must be a valid file name
        @param thumbs use 'random' to extract a thumbnail image from a random
        point of the video or use a positive integer n to extract n summary
        thumbnail
        """

        print('** Extracting image thumbnails...')

        # TODO report partial errors
        thumb_extractor = config.THUMB_EXTRACTOR_CLASS(
                input_file=os.path.join(self.raw_videos_dir, input_video),
                name=video_name,
                prog_bin=config.THUMB_EXTRACTOR_BIN)
        thumb_extractor.dest_path = self.thumbs_dir
        if thumbs == 'random':
            thumb_extractor.extract_random_thumb()
        # The exact type check is kept on purpose: it also rejects bool,
        # which isinstance(thumbs, int) would accept.
        elif type(thumbs) is int and thumbs > 0:
            thumb_extractor.extract_summary_thumbs(thumbs)

    def seed(self, transcode_configs):
        """
        Creates torrents from the videos passed and then starts seeding them.

        @param transcode_configs a list of dictionaries with format settings;
        each one must carry the 'output_file' entry set by transcode()
        """

        print('** Creating torrents and starting seeding...')

        for transcode_config in transcode_configs:
            video_file = transcode_config['output_file']
            torrent_file = video_file + '.tstream'

            # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
            # Create torrent file.
            bt.create_torrent(video_file)

            # The torrent file is created in the same directory with the
            # source file. Move it to the torrents directory.
            shutil.move(torrent_file, self.torrents_dir)

            # basename() also copes with a path that has no directory part,
            # where searching for '/' by hand would raise ValueError.
            torrent_name = os.path.basename(torrent_file)

            # * SEED TORRENTS
            Server.bit_torrent.start_download(
                    os.path.join(self.torrents_dir, torrent_name),
                    self.transcoded_videos_dir)

    def transfer_out(self, local_files, local_path, remote_path):
        """
        Transfers some local files to a remote path of the Web Server.

        @param local_files list local files to transfer
        @param local_path local directory which contains the files
        @param remote_path destination path on the Web Server
        """

        print('** Transfering out...')

        file_transfer = config.FILE_TRANSFERER_CLASS(
                local_path, remote_path)
        try:
            file_transfer.put(local_files)
        finally:
            # Release the transferer even when the transfer fails.
            file_transfer.close()

    def remove_files(self, files, path):
        """
        Deletes files from a specified path.

        @param files list of file names relative to path
        @param path directory which contains the files
        """

        print('** Cleaning up...')

        for f in files:
            os.unlink(os.path.join(path, f))

    def _job_files(self, directory, job_name):
        """
        Lists the regular files of a directory which belong to a job, i.e.
        whose names match the pattern '<job_name>_*'.

        @param directory directory to look into
        @param job_name video name of the job
        @return list of matching file names (without the directory part)
        """

        files = [f for f in os.listdir(directory)
                 if os.path.isfile(os.path.join(directory, f))]
        return fnmatch.filter(files, job_name + "_*")

    def _process_job(self, job):
        """
        Runs all the content ingestion steps for a single job.

        @param job job dictionary taken from the queue
        """

        # * TRANSFER RAW VIDEO IN
        self.transfer_in(job['raw_video'])

        # * TRANSCODE RAW VIDEO
        self.transcode(job['raw_video'], job['name'],
                job['transcode_configs'])

        # * EXTRACT THUMBNAIL IMAGES
        if job['thumbs'] != 0:
            self.extract_thumbs(job['raw_video'], job['name'],
                    job['thumbs'])

        # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
        self.seed(job['transcode_configs'])

        torrent_files = self._job_files(self.torrents_dir, job['name'])
        thumb_files = self._job_files(self.thumbs_dir, job['name'])

        # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
        self.transfer_out(torrent_files, self.torrents_dir,
                config.WS_TORRENTS_PATH)
        self.transfer_out(thumb_files, self.thumbs_dir,
                config.WS_THUMBS_PATH)

        # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
        self.remove_files([job['raw_video']], self.raw_videos_dir)
        self.remove_files(thumb_files, self.thumbs_dir)

    def run(self):
        """
        Thread main loop: takes jobs from Server.queue forever and processes
        them one at a time.

        A failing job is reported on standard error and does not stop the
        worker. Whether the job succeeds or fails, it is marked as done on
        the queue and its weight is subtracted from Server.load.
        """

        # Imported here so the block does not depend on a file-level import.
        import traceback

        while True:
            job = Server.queue.get()

            try:
                self._process_job(job)
            except Exception:
                # Without this the first failing job would terminate the
                # only worker thread and every later job would stay queued.
                traceback.print_exc()
            finally:
                # * JOB FINISHED
                Server.queue.task_done()
                Server.load -= job['weight']
199
200
class Server:
    """
    Implementation of the RESTful web service which constitutes the interface
    with the client (web server).

    The class attributes Server.load, Server.queue and Server.bit_torrent are
    not defined here; they are assigned by the script entry point before the
    service starts.
    """

    def GET(self, request):
        """
        Handles GET requests.

        @param request the request path captured by the URL mapping
        @return for 'get_load' a JSON object {"load": <current load>}; for
        'test' an empty string; for anything else an empty string after
        signalling a bad request
        """

        if request == 'get_load':
            web.header('Content-Type', 'application/json')
            return json.dumps({"load": Server.load})
        elif request == 'test':
            return ''
        else:
            web.badrequest()
            return ""

    def POST(self, request):
        """
        Handles POST requests. The only supported request is
        'ingest_content', whose body must be a JSON object describing a
        content ingestion job.

        @param request the request path captured by the URL mapping
        @return 'Job submitted.' on success, 'Authentication failed!' when
        security is enabled and the credentials are rejected, or an empty
        string after signalling a bad request
        """

        if request != 'ingest_content':
            web.badrequest()
            return ""

        # Read JSON parameters. A malformed body is a client error, not a
        # server failure.
        try:
            data = json.loads(web.data())
        except ValueError:
            web.badrequest()
            return ""
        if not isinstance(data, dict):
            web.badrequest()
            return ""

        # Authenticate user. Missing credentials are passed on as None and
        # rejected by authenticate().
        if config.SECURITY and not self.authenticate(
                data.get("username"), data.get("password")):
            return "Authentication failed!"

        # The weight is needed both here and by the worker when the job
        # finishes, so refuse jobs which do not carry it.
        if "weight" not in data:
            web.badrequest()
            return ""

        # Add job weight to CIS load.
        # NOTE(review): the worker thread decrements Server.load while
        # request handlers increment it, with no lock -- confirm whether an
        # occasional lost update is acceptable.
        Server.load += data["weight"]

        # Submit job.
        Server.queue.put(data)

        return 'Job submitted.'

    def authenticate(self, username, password):
        """
        Checks user credentials against users.users.

        @param username user name
        @param password password
        @return True if security is disabled or the password matches the one
        stored for the user; False (after signalling forbidden) otherwise,
        including for an unknown user
        """

        if not config.SECURITY:
            return True

        # presumably users.users maps user names to passwords -- verify
        # against the users module. KeyError covers an unknown user,
        # TypeError an unhashable user name coming from the JSON body.
        try:
            expected_password = users.users[username]
        except (KeyError, TypeError):
            web.forbidden()
            return False

        if expected_password == password:
            return True

        web.forbidden()
        return False
257
258
if __name__ == '__main__':
    # State shared between the request handlers and the worker thread is
    # kept as attributes of the Server class.
    # The BitTorrent object implements a NextShare (Tribler) BitTorrent
    # client for seeding, downloading etc.
    Server.bit_torrent = bt.BitTorrent()
    Server.queue, Server.load = Queue(), 0

    # Worker thread. It is a daemon thread, so it does not keep the process
    # alive once the web service stops.
    ci_worker = CIWorker()
    ci_worker.setDaemon(True)
    ci_worker.start()

    # Web service. Every path is routed to the Server class, which web.py
    # looks up by name in this module's globals.
    urls = ('/(.*)', 'Server')
    app = web.application(urls, globals())
    app.run()