From 9d8c32e99434169d17b02774156a32833b1801c7 Mon Sep 17 00:00:00 2001 From: Calin-Andrei Burloiu Date: Fri, 23 Dec 2011 19:15:10 +0200 Subject: [PATCH] CIS: front-end added; ingest_content request implemented; get_load is buggy --- cis/api/base.py | 4 +- cis/cisd.py | 151 ++++++++++++++++++++++++++++------------------ cis/job_test.json | 16 +++++ 3 files changed, 111 insertions(+), 60 deletions(-) create mode 100644 cis/job_test.json diff --git a/cis/api/base.py b/cis/api/base.py index 36d490b..0b30bc9 100644 --- a/cis/api/base.py +++ b/cis/api/base.py @@ -85,10 +85,10 @@ class BaseTranscoder: if a_codec is None and v_codec is None: raise ValueError('No audio or video codec specified.') - if a_codec is not None and type(a_codec) is not str: + if a_codec is not None and type(a_codec) not in [str, unicode]: raise TypeError('Audio codec must be string.') - if v_codec is not None and type(v_codec) is not str: + if v_codec is not None and type(v_codec) not in [str, unicode]: raise TypeError('Video codec must be string.') if a_samplingrate is not None and type(a_samplingrate) is not int: diff --git a/cis/cisd.py b/cis/cisd.py index 483017b..d286ef7 100755 --- a/cis/cisd.py +++ b/cis/cisd.py @@ -7,6 +7,8 @@ import shutil import time import threading from Queue import Queue +import web +import json import config import bt @@ -25,26 +27,16 @@ class CIWorker(threading.Thread): thumbs_dir = 'tmp/thumbs' torrents_dir = 'tmp/torrents' - def __init__(self, queue, bit_torrent): + def __init__(self, shared, bit_torrent): """ Initialize Content Ingestion Worker. - @param queue a list of dictionaries with the following keys: - + @param shared data shared with the front-end (Service) """ threading.Thread.__init__(self, name='CIWorker') - self.queue = queue + self.shared = shared self.bit_torrent = bit_torrent def transfer_in(self, raw_video): @@ -161,7 +153,7 @@ class CIWorker(threading.Thread): def run(self): while True: - job = self.queue.get() + job = self.shared.queue.get() # * TRANSFER RAW VIDEO IN self.transfer_in(job['raw_video']) @@ -182,13 +174,13 @@ class CIWorker(threading.Thread): files = [f for f in os.listdir(self.torrents_dir) \ if os.path.isfile(os.path.join( \ self.torrents_dir, f))] - torrent_files = fnmatch.filter(files, name + "_*") + torrent_files = fnmatch.filter(files, job['name'] + "_*") # Thumbnail images files. files = [f for f in os.listdir(self.thumbs_dir) \ if os.path.isfile(os.path.join( \ self.thumbs_dir, f))] - thumb_files = fnmatch.filter(files, name + "_*") + thumb_files = fnmatch.filter(files, job['name'] + "_*") # Raw video files. raw_files = [f for f in os.listdir(self.raw_videos_dir) \ @@ -206,52 +198,95 @@ class CIWorker(threading.Thread): self.remove_files(thumb_files, self.thumbs_dir) # * JOB FINISHED - queue.task_done() + self.shared.queue.task_done() + print 'load in run is', self.shared.load + self.shared.load -= job['weight'] +class Shared: + """ + Shared data between Service (front-end) and CIWorker (back-end). -if __name__ == '__main__': - # Jobs queue. - queue = Queue() + @member queue a list of dictionaries with the following keys: + + @member load total weight of the jobs from the queue + """ - # The BitTorrent object implements a NextShare (Tribler) BitTorrent client - # for seeding, downloading etc. - bit_torrent = bt.BitTorrent() + def __init__(self): + # Jobs queue. + self.queue = Queue() + + # Sever load. + self.load = 0 + + +class Service: + """ + Implementation of the RESTful web service which constitutes the interface + with the client (web server). + """ + + def __init__(self): + # Shared data with back-end (CIWorker). + self.shared = Shared() - # Worker thread. - ci_worker = CIWorker(queue, bit_torrent) - ci_worker.daemon = True - ci_worker.start() - - while True: - raw_video = sys.stdin.readline().strip() - if raw_video == 'x': - break - - container = 'webm' - a_codec = 'vorbis' - a_bitrate = '128k' - v_codec = 'vp8' - v_bitrate = '480k' - v_resolution = '640x480' + global bit_torrent + + # Worker thread. + ci_worker = CIWorker(self.shared, bit_torrent) + ci_worker.daemon = True + ci_worker.start() - name = raw_video[:raw_video.rindex('.')] - transcode_config = { - 'container': container, - 'a_codec': a_codec, - 'a_bitrate': a_bitrate, - 'v_codec': v_codec, - 'v_bitrate': v_bitrate, - 'v_resolution': v_resolution - } - thumbs = 4 - - job = { - 'raw_video': raw_video, - 'name': name, - 'transcode_configs': [transcode_config], - 'thumbs': thumbs - } + def __del__(self): + self.shared.queue.join() + + def GET(self, request): + #web.header('Cache-Control', 'no-cache') + + if request == 'get_load': + resp = {"load": self.shared.load} + print 'load in GET is', self.shared.load + web.header('Content-Type', 'application/json') + return json.dumps(resp) + else: + web.badrequest() + return "" - queue.put(job) - queue.join() + def POST(self, request): + if request == 'ingest_content': + # Read JSON parameters. + json_data = web.data() + data = json.loads(json_data) + + # Add job weight to CIS load. + self.shared.load += data["weight"] + print 'load in POST is', self.shared.load + + # Submit job. + self.shared.queue.put(data) + + return 'Job submitted.' + else: + web.badrequest() + return "" + + +if __name__ == '__main__': + # The BitTorrent object implements a NextShare (Tribler) BitTorrent + # client for seeding, downloading etc. + global bit_torrent + bit_torrent = bt.BitTorrent() + + # Web service. + urls = ('/(.*)', 'Service') + service = web.application(urls, globals()) + service.run() diff --git a/cis/job_test.json b/cis/job_test.json new file mode 100644 index 0000000..d29455f --- /dev/null +++ b/cis/job_test.json @@ -0,0 +1,16 @@ +{ + "raw_video": "test.ogv", + "name": "test", + "weight": 3, + "transcode_configs": [ + { + "container": "webm", + "a_codec": "vorbis", + "a_bitrate": "128k", + "v_codec": "vp8", + "v_bitrate": "480k", + "v_resolution": "640x480" + } + ], + "thumbs": 4 +} -- 2.20.1