cis: bug fixes
[living-lab-site.git] / cis / cisd.py
index 483017b..8284882 100755 (executable)
@@ -7,9 +7,17 @@ import shutil
 import time
 import threading
 from Queue import Queue
+import web
+import json
+from web.wsgiserver import CherryPyWSGIServer
 
 import config
 import bt
+import users
+
+if config.SECURITY:
+    CherryPyWSGIServer.ssl_certificate = "cacert.pem"
+    CherryPyWSGIServer.ssl_private_key = "privkey.pem"
 
 
 class CIWorker(threading.Thread):
@@ -23,44 +31,29 @@ class CIWorker(threading.Thread):
     raw_videos_dir = 'tmp/raw'
     transcoded_videos_dir = 'tmp/media'
     thumbs_dir = 'tmp/thumbs'
-    torrents_dir = 'tmp/torrents'
+    torrents_dir = config.CIS_TORRENTS_PATH
 
-    def __init__(self, queue, bit_torrent):
+    def __init__(self):
         """
         Initialize Content Ingestion Worker.
-
-        @param queue a list of dictionaries with the following keys:
-        <ul>
-            <li>raw_video</li>
-            <li>name: a video name which must be a valid file name</li>
-            <li>transcode_configs: a list of transcode configuration
-            dictionaries having the keys as the parameters of
-            api.BaseTranscoder.transcode(...)</li>
-            <li>thumbs: string 'random' for extracting a thumbnail
-            image from a random video position or a positive integer which
-            represents the number of summary thumbnails to be extracted</li>
-        </ul>
         """
 
         threading.Thread.__init__(self, name='CIWorker')
 
-        self.queue = queue
-        self.bit_torrent = bit_torrent
-
     def transfer_in(self, raw_video):
         """
         Transfers a raw video file from the Web Server.
 
         @param raw_video raw video file name
         """
-
+        
+        print '** Transfering in...'
+        
         file_transfer = config.FILE_TRANSFERER_CLASS( \
-                self.raw_videos_dir, config.INPUT_PATH)
+                self.raw_videos_dir, config.WS_UPLOAD_PATH)
         file_transfer.get([raw_video])
         file_transfer.close()
 
-        print '** Transfering in finished.'
-
     def transcode(self, input_video, video_name, transcode_configs):
         """
         Transcodes a video in each requested formats.
@@ -70,6 +63,8 @@ class CIWorker(threading.Thread):
         @param transcode_configs a list of dictionaries with format settings
         """
 
+        print '** Transcoding...'
+        
         transcoder = config.TRANSCODER_CLASS( \
                 input_file = os.path.join(self.raw_videos_dir, input_video), \
                 name = video_name, prog_bin = config.TRANSCODER_BIN)
@@ -81,8 +76,6 @@ class CIWorker(threading.Thread):
             transcode_config['output_file'] = \
                     transcoder.transcode(**transcode_config)
 
-        print '** Transcoding finished.'
-
     def extract_thumbs(self, input_video, video_name, thumbs):
         """
         Extracts thumbnail images from a video.
@@ -94,6 +87,8 @@ class CIWorker(threading.Thread):
         thumbnail
         """
 
+        print '** Extracting image thumbnails...'
+        
         # TODO report partial errors
         thumb_extractor = config.THUMB_EXTRACTOR_CLASS( \
                 input_file = os.path.join(self.raw_videos_dir, input_video), \
@@ -105,14 +100,14 @@ class CIWorker(threading.Thread):
         elif type(thumbs) is int and thumbs > 0:
             thumb_extractor.extract_summary_thumbs(thumbs)
 
-        print '** Extracting thumbs finished.'
-
     def seed(self, transcode_configs):
         """
         Creates torrents from the videos passed and then stats seeding them.
 
         @param transcode_configs a list of dictionaries with format settings
         """
+        
+        print '** Creating torrents and starting seeding...'
 
         for transcode_config in transcode_configs:
             # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
@@ -128,12 +123,10 @@ class CIWorker(threading.Thread):
             output_file = output_file[(output_file.rindex('/') + 1):]
 
             # * SEED TORRENTS
-            bit_torrent.start_download( \
+            Server.bit_torrent.start_download( \
                     os.path.join(self.torrents_dir, output_file),
                     self.transcoded_videos_dir)
 
-        print '** Creating torrents and seeding finished.'
-
     def transfer_out(self, local_files, local_path, remote_path):
         """
         Transfers some local files to a remote path of the Web Server.
@@ -141,27 +134,27 @@ class CIWorker(threading.Thread):
         @param local_files list local files to transfer
         @param remote_path destination path on the Web Server
         """
+        
+        print '** Transfering out...'
 
         file_transfer = config.FILE_TRANSFERER_CLASS( \
                 local_path, remote_path)
         file_transfer.put(local_files)
         file_transfer.close()
 
-        print '** Creating torrents and seeding finished.'
-
     def remove_files(self, files, path):
         """
         Deletes files from a specified path.
         """
+        
+        print '** Cleaning up...'
 
         for f in files:
             os.unlink(os.path.join(path, f))
 
-        print '** Cleaning up finished.'
-
     def run(self):
         while True:
-            job = self.queue.get()
+            job = Server.queue.get()
 
             # * TRANSFER RAW VIDEO IN
             self.transfer_in(job['raw_video'])
@@ -182,76 +175,100 @@ class CIWorker(threading.Thread):
             files = [f for f in os.listdir(self.torrents_dir) \
                     if os.path.isfile(os.path.join( \
                             self.torrents_dir, f))]
-            torrent_files = fnmatch.filter(files, name + "_*")
+            torrent_files = fnmatch.filter(files, job['name'] + "_*")
 
             # Thumbnail images files.
             files = [f for f in os.listdir(self.thumbs_dir) \
                     if os.path.isfile(os.path.join( \
                             self.thumbs_dir, f))]
-            thumb_files = fnmatch.filter(files, name + "_*")
-                
-            # Raw video files.
-            raw_files = [f for f in os.listdir(self.raw_videos_dir) \
-                    if os.path.isfile(os.path.join( \
-                            self.raw_videos_dir, f))]
+            thumb_files = fnmatch.filter(files, job['name'] + "_*")
 
             # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
             self.transfer_out(torrent_files, self.torrents_dir, \
-                    config.OUTPUT_TORRENTS_PATH)
+                    config.WS_TORRENTS_PATH)
             self.transfer_out(thumb_files, self.thumbs_dir, \
-                    config.OUTPUT_THUMBS_PATH)
+                    config.WS_THUMBS_PATH)
             
             # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
-            self.remove_files(raw_files, self.raw_videos_dir)
+            self.remove_files([ job['raw_video'] ], self.raw_videos_dir)
             self.remove_files(thumb_files, self.thumbs_dir)
 
             # * JOB FINISHED
-            queue.task_done()
+            Server.queue.task_done()
+            Server.load -= job['weight']
 
 
-if __name__ == '__main__':
-    # Jobs queue.
-    queue = Queue()
+class Server:
+    """
+    Implementation of the RESTful web service which constitutes the interface
+    with the client (web server).
+    """
+
+    #def __init__(self):
+        #pass
+        
+    #def __del__(self):
+        #pass
+    
+    def GET(self, request):
+        #web.header('Cache-Control', 'no-cache')
+
+        if request == 'get_load':
+            resp = {"load": Server.load}
+            web.header('Content-Type', 'application/json')
+            return json.dumps(resp)
+        elif request == 'test':
+            return ''
+        else:
+            web.badrequest()
+            return ""
+        
+
+    def POST(self, request):
+        if request == 'ingest_content':
+            # Read JSON parameters.
+            json_data = web.data()
+            data = json.loads(json_data)
+
+            # Authenticate user.
+            if config.SECURITY and \
+                    not self.authenticate(data["username"], data["password"]):
+                return "Authentication failed!"
 
-    # The BitTorrent object implements a NextShare (Tribler) BitTorrent client
-    # for seeding, downloading etc.
-    bit_torrent = bt.BitTorrent()
+            # Add job weight to CIS load.
+            Server.load += data["weight"]
 
+            # Submit job.
+            Server.queue.put(data)
+
+            return 'Job submitted.'
+        else:
+            web.badrequest()
+            return ""
+
+    def authenticate(self, username, password):
+        if not config.SECURITY:
+            return True
+        if users.users[username] == password:
+            return True
+        else:
+            web.forbidden()
+            return False
+
+
+if __name__ == '__main__':
+    # The BitTorrent object implements a NextShare (Tribler) BitTorrent
+    # client for seeding, downloading etc.
+    Server.bit_torrent = bt.BitTorrent()
+    Server.queue = Queue()
+    Server.load = 0
+    
     # Worker thread.
-    ci_worker = CIWorker(queue, bit_torrent)
+    ci_worker = CIWorker()
     ci_worker.daemon = True
     ci_worker.start()
 
-    while True:
-        raw_video = sys.stdin.readline().strip()
-        if raw_video == 'x':
-            break
-
-        container = 'webm'
-        a_codec = 'vorbis'
-        a_bitrate = '128k'
-        v_codec = 'vp8'
-        v_bitrate = '480k'
-        v_resolution = '640x480'
-        
-        name = raw_video[:raw_video.rindex('.')]
-        transcode_config = {
-            'container': container,
-            'a_codec': a_codec,
-            'a_bitrate': a_bitrate,
-            'v_codec': v_codec,
-            'v_bitrate': v_bitrate,
-            'v_resolution': v_resolution
-        }
-        thumbs = 4
-
-        job = {
-            'raw_video': raw_video,
-            'name': name,
-            'transcode_configs': [transcode_config],
-            'thumbs': thumbs
-        }
-        
-        queue.put(job)
-
-    queue.join()
+    # Web service.
+    urls = ('/(.*)', 'Server')
+    app = web.application(urls, globals())
+    app.run()