CIS: front-end added; ingest_content request implemented; get_load is buggy
authorCalin-Andrei Burloiu <calin.burloiu@gmail.com>
Fri, 23 Dec 2011 17:15:10 +0000 (19:15 +0200)
committerCalin-Andrei Burloiu <calin.burloiu@gmail.com>
Fri, 23 Dec 2011 17:15:10 +0000 (19:15 +0200)
cis/api/base.py
cis/cisd.py
cis/job_test.json [new file with mode: 0644]

index 36d490b..0b30bc9 100644 (file)
@@ -85,10 +85,10 @@ class BaseTranscoder:
         if a_codec is None and v_codec is None:
             raise ValueError('No audio or video codec specified.')
 
-        if a_codec is not None and type(a_codec) is not str:
+        if a_codec is not None and type(a_codec) not in [str, unicode]:
             raise TypeError('Audio codec must be string.')
 
-        if v_codec is not None and type(v_codec) is not str:
+        if v_codec is not None and type(v_codec) not in [str, unicode]:
             raise TypeError('Video codec must be string.')
 
         if a_samplingrate is not None and type(a_samplingrate) is not int:
index 483017b..d286ef7 100755 (executable)
@@ -7,6 +7,8 @@ import shutil
 import time
 import threading
 from Queue import Queue
+import web
+import json
 
 import config
 import bt
@@ -25,26 +27,16 @@ class CIWorker(threading.Thread):
     thumbs_dir = 'tmp/thumbs'
     torrents_dir = 'tmp/torrents'
 
-    def __init__(self, queue, bit_torrent):
+    def __init__(self, shared, bit_torrent):
         """
         Initialize Content Ingestion Worker.
 
-        @param queue a list of dictionaries with the following keys:
-        <ul>
-            <li>raw_video</li>
-            <li>name: a video name which must be a valid file name</li>
-            <li>transcode_configs: a list of transcode configuration
-            dictionaries having the keys as the parameters of
-            api.BaseTranscoder.transcode(...)</li>
-            <li>thumbs: string 'random' for extracting a thumbnail
-            image from a random video position or a positive integer which
-            represents the number of summary thumbnails to be extracted</li>
-        </ul>
+        @param shared data shared with the front-end (Service)
         """
 
         threading.Thread.__init__(self, name='CIWorker')
 
-        self.queue = queue
+        self.shared = shared
         self.bit_torrent = bit_torrent
 
     def transfer_in(self, raw_video):
@@ -161,7 +153,7 @@ class CIWorker(threading.Thread):
 
     def run(self):
         while True:
-            job = self.queue.get()
+            job = self.shared.queue.get()
 
             # * TRANSFER RAW VIDEO IN
             self.transfer_in(job['raw_video'])
@@ -182,13 +174,13 @@ class CIWorker(threading.Thread):
             files = [f for f in os.listdir(self.torrents_dir) \
                     if os.path.isfile(os.path.join( \
                             self.torrents_dir, f))]
-            torrent_files = fnmatch.filter(files, name + "_*")
+            torrent_files = fnmatch.filter(files, job['name'] + "_*")
 
             # Thumbnail images files.
             files = [f for f in os.listdir(self.thumbs_dir) \
                     if os.path.isfile(os.path.join( \
                             self.thumbs_dir, f))]
-            thumb_files = fnmatch.filter(files, name + "_*")
+            thumb_files = fnmatch.filter(files, job['name'] + "_*")
                 
             # Raw video files.
             raw_files = [f for f in os.listdir(self.raw_videos_dir) \
@@ -206,52 +198,95 @@ class CIWorker(threading.Thread):
             self.remove_files(thumb_files, self.thumbs_dir)
 
             # * JOB FINISHED
-            queue.task_done()
+            self.shared.queue.task_done()
+            print 'load in run is', self.shared.load
+            self.shared.load -= job['weight']
 
+class Shared:
+    """
+    Shared data between Service (front-end) and CIWorker (back-end).
 
-if __name__ == '__main__':
-    # Jobs queue.
-    queue = Queue()
+    @member queue a list of dictionaries with the following keys:
+        <ul>
+            <li>raw_video</li>
+            <li>name: a video name which must be a valid file name</li>
+            <li>transcode_configs: a list of transcode configuration
+            dictionaries having the keys as the parameters of
+            api.BaseTranscoder.transcode(...)</li>
+            <li>thumbs: string 'random' for extracting a thumbnail
+            image from a random video position or a positive integer which
+            represents the number of summary thumbnails to be extracted</li>
+        </ul>
+    @member load total weight of the jobs from the queue 
+    """
 
-    # The BitTorrent object implements a NextShare (Tribler) BitTorrent client
-    # for seeding, downloading etc.
-    bit_torrent = bt.BitTorrent()
+    def __init__(self):
+        # Jobs queue.
+        self.queue = Queue()
+
+        # Sever load.
+        self.load = 0
+
+
+class Service:
+    """
+    Implementation of the RESTful web service which constitutes the interface
+    with the client (web server).
+    """
+
+    def __init__(self):
+        # Shared data with back-end (CIWorker).
+        self.shared = Shared()
 
-    # Worker thread.
-    ci_worker = CIWorker(queue, bit_torrent)
-    ci_worker.daemon = True
-    ci_worker.start()
-
-    while True:
-        raw_video = sys.stdin.readline().strip()
-        if raw_video == 'x':
-            break
-
-        container = 'webm'
-        a_codec = 'vorbis'
-        a_bitrate = '128k'
-        v_codec = 'vp8'
-        v_bitrate = '480k'
-        v_resolution = '640x480'
+        global bit_torrent
+
+        # Worker thread.
+        ci_worker = CIWorker(self.shared, bit_torrent)
+        ci_worker.daemon = True
+        ci_worker.start()
         
-        name = raw_video[:raw_video.rindex('.')]
-        transcode_config = {
-            'container': container,
-            'a_codec': a_codec,
-            'a_bitrate': a_bitrate,
-            'v_codec': v_codec,
-            'v_bitrate': v_bitrate,
-            'v_resolution': v_resolution
-        }
-        thumbs = 4
-
-        job = {
-            'raw_video': raw_video,
-            'name': name,
-            'transcode_configs': [transcode_config],
-            'thumbs': thumbs
-        }
+    def __del__(self):
+        self.shared.queue.join()
+    
+    def GET(self, request):
+        #web.header('Cache-Control', 'no-cache')
+
+        if request == 'get_load':
+            resp = {"load": self.shared.load}
+            print 'load in GET is', self.shared.load
+            web.header('Content-Type', 'application/json')
+            return json.dumps(resp)
+        else:
+            web.badrequest()
+            return ""
         
-        queue.put(job)
 
-    queue.join()
+    def POST(self, request):
+        if request == 'ingest_content':
+            # Read JSON parameters.
+            json_data = web.data()
+            data = json.loads(json_data)
+
+            # Add job weight to CIS load.
+            self.shared.load += data["weight"]
+            print 'load in POST is', self.shared.load
+
+            # Submit job.
+            self.shared.queue.put(data)
+
+            return 'Job submitted.'
+        else:
+            web.badrequest()
+            return ""
+
+
+if __name__ == '__main__':
+    # The BitTorrent object implements a NextShare (Tribler) BitTorrent
+    # client for seeding, downloading etc.
+    global bit_torrent
+    bit_torrent = bt.BitTorrent()
+
+    # Web service.
+    urls = ('/(.*)', 'Service')
+    service = web.application(urls, globals())
+    service.run()
diff --git a/cis/job_test.json b/cis/job_test.json
new file mode 100644 (file)
index 0000000..d29455f
--- /dev/null
@@ -0,0 +1,16 @@
+{
+    "raw_video": "test.ogv",
+    "name": "test",
+    "weight": 3,
+    "transcode_configs": [
+        {
+            "container": "webm",
+            "a_codec": "vorbis",
+            "a_bitrate": "128k",
+            "v_codec": "vp8",
+            "v_bitrate": "480k",
+            "v_resolution": "640x480"
+        }
+    ],
+    "thumbs": 4
+}