X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=blobdiff_plain;f=cis%2Fcisd.py;h=828488257b4f5b55c74a07c65c3a408b67761cc7;hb=007060953ce46eb7da637ee5fb6eb44c5812d74f;hp=6a6c310f4f3f7fcec0912f75fc89b0e6fcaf0dac;hpb=17430bb7ba2fdc1ffcd6e08ffd9dc4f027c7d4fc;p=living-lab-site.git diff --git a/cis/cisd.py b/cis/cisd.py index 6a6c310..8284882 100755 --- a/cis/cisd.py +++ b/cis/cisd.py @@ -2,13 +2,22 @@ import sys import os +import fnmatch import shutil import time import threading from Queue import Queue +import web +import json +from web.wsgiserver import CherryPyWSGIServer import config import bt +import users + +if config.SECURITY: + CherryPyWSGIServer.ssl_certificate = "cacert.pem" + CherryPyWSGIServer.ssl_private_key = "privkey.pem" class CIWorker(threading.Thread): @@ -22,29 +31,14 @@ class CIWorker(threading.Thread): raw_videos_dir = 'tmp/raw' transcoded_videos_dir = 'tmp/media' thumbs_dir = 'tmp/thumbs' - torrents_dir = 'tmp/torrents' + torrents_dir = config.CIS_TORRENTS_PATH - def __init__(self, queue, bit_torrent): + def __init__(self): """ Initialize Content Ingestion Worker. - - @param queue a list of dictionaries with the following keys: - """ - threading.Thread.__init__(self) - - self.queue = queue - self.bit_torrent = bit_torrent + threading.Thread.__init__(self, name='CIWorker') def transfer_in(self, raw_video): """ @@ -52,14 +46,14 @@ class CIWorker(threading.Thread): @param raw_video raw video file name """ - + + print '** Transfering in...' + file_transfer = config.FILE_TRANSFERER_CLASS( \ - self.raw_videos_dir, config.INPUT_PATH) - file_transfer.get(raw_video) + self.raw_videos_dir, config.WS_UPLOAD_PATH) + file_transfer.get([raw_video]) file_transfer.close() - print '** Transfering in finished.' - def transcode(self, input_video, video_name, transcode_configs): """ Transcodes a video in each requested formats. @@ -69,27 +63,18 @@ class CIWorker(threading.Thread): @param transcode_configs a list of dictionaries with format settings """ + print '** Transcoding...' + transcoder = config.TRANSCODER_CLASS( \ - input_file = video_name, \ + input_file = os.path.join(self.raw_videos_dir, input_video), \ name = video_name, prog_bin = config.TRANSCODER_BIN) transcoder.dest_path = self.transcoded_videos_dir # Transcode the raw video in each requested format. # TODO report partial errors for transcode_config in transcode_configs: - transcode_config['output_file'] = transcoder.transcode( \ - container = transcode_config['container'], \ - a_codec = transcode_config['a_codec'], \ - a_bitrate = transcode_config['a_bitrate'], \ - a_samplingrate = transcode_config['a_samplingrate'], \ - a_channels = transcode_config['a_channels'], \ - v_codec = transcode_config['v_codec'], \ - v_bitrate = transcode_config['v_bitrate'], \ - v_framerate = transcode_config['v_framerate'], \ - v_resolution = transcode_config['v_resolution'], \ - v_dar = transcode_config['dar']) - - print '** Transcoding finished.' + transcode_config['output_file'] = \ + transcoder.transcode(**transcode_config) def extract_thumbs(self, input_video, video_name, thumbs): """ @@ -102,9 +87,12 @@ class CIWorker(threading.Thread): thumbnail """ + print '** Extracting image thumbnails...' + # TODO report partial errors thumb_extractor = config.THUMB_EXTRACTOR_CLASS( \ - input_file = input_video, name = video_name, \ + input_file = os.path.join(self.raw_videos_dir, input_video), \ + name = video_name, \ prog_bin = config.THUMB_EXTRACTOR_BIN) thumb_extractor.dest_path = self.thumbs_dir if thumbs == 'random': @@ -112,16 +100,16 @@ class CIWorker(threading.Thread): elif type(thumbs) is int and thumbs > 0: thumb_extractor.extract_summary_thumbs(thumbs) - print '** Extracting thumbs finished.' - def seed(self, transcode_configs): """ Creates torrents from the videos passed and then stats seeding them. @param transcode_configs a list of dictionaries with format settings """ + + print '** Creating torrents and starting seeding...' - for transcode_config in transcode_cofigs: + for transcode_config in transcode_configs: # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO # Create torrent file. bt.create_torrent(transcode_config['output_file']) @@ -131,12 +119,13 @@ class CIWorker(threading.Thread): shutil.move(transcode_config['output_file'] + '.tstream', \ self.torrents_dir) - # * SEED TORRENTS - bit_torrent.start_download( \ - transcode_config['output_file'] + '.tstream', - self_transcoded_videos_dir) + output_file = transcode_config['output_file'] + '.tstream' + output_file = output_file[(output_file.rindex('/') + 1):] - print '** Creating torrents and seeding finished.' + # * SEED TORRENTS + Server.bit_torrent.start_download( \ + os.path.join(self.torrents_dir, output_file), + self.transcoded_videos_dir) def transfer_out(self, local_files, local_path, remote_path): """ @@ -145,27 +134,27 @@ class CIWorker(threading.Thread): @param local_files list local files to transfer @param remote_path destination path on the Web Server """ + + print '** Transfering out...' file_transfer = config.FILE_TRANSFERER_CLASS( \ local_path, remote_path) file_transfer.put(local_files) file_transfer.close() - print '** Creating torrents and seeding finished.' - - def remove_file(self, files, path): + def remove_files(self, files, path): """ Deletes files from a specified path. """ + + print '** Cleaning up...' for f in files: os.unlink(os.path.join(path, f)) - print '** Cleaning up finished.' - def run(self): while True: - job = self.queue.get() + job = Server.queue.get() # * TRANSFER RAW VIDEO IN self.transfer_in(job['raw_video']) @@ -186,76 +175,100 @@ class CIWorker(threading.Thread): files = [f for f in os.listdir(self.torrents_dir) \ if os.path.isfile(os.path.join( \ self.torrents_dir, f))] - torrent_files = fnmatch.filter(files, name + "_*") + torrent_files = fnmatch.filter(files, job['name'] + "_*") # Thumbnail images files. files = [f for f in os.listdir(self.thumbs_dir) \ if os.path.isfile(os.path.join( \ self.thumbs_dir, f))] - thumb_files = fnmatch.filter(files, name + "_*") - - # Raw video files. - raw_files = [f for f in os.listdir(self.raw_videos_dir) \ - if os.path.isfile(os.path.join( \ - self.raw_videos_dir, f))] + thumb_files = fnmatch.filter(files, job['name'] + "_*") # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT self.transfer_out(torrent_files, self.torrents_dir, \ - config.OUTPUT_TORRENTS_PATH) + config.WS_TORRENTS_PATH) self.transfer_out(thumb_files, self.thumbs_dir, \ - config.OUTPUT_THUMBS_PATH) + config.WS_THUMBS_PATH) # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES - self.remove_files(raw_files, self.raw_videos_dir) + self.remove_files([ job['raw_video'] ], self.raw_videos_dir) self.remove_files(thumb_files, self.thumbs_dir) # * JOB FINISHED - queue.task_done() + Server.queue.task_done() + Server.load -= job['weight'] -if __name__ == '__main__': - # Jobs queue. - queue = Queue() +class Server: + """ + Implementation of the RESTful web service which constitutes the interface + with the client (web server). + """ + + #def __init__(self): + #pass + + #def __del__(self): + #pass + + def GET(self, request): + #web.header('Cache-Control', 'no-cache') + + if request == 'get_load': + resp = {"load": Server.load} + web.header('Content-Type', 'application/json') + return json.dumps(resp) + elif request == 'test': + return '' + else: + web.badrequest() + return "" + + + def POST(self, request): + if request == 'ingest_content': + # Read JSON parameters. + json_data = web.data() + data = json.loads(json_data) + + # Authenticate user. + if config.SECURITY and \ + not self.authenticate(data["username"], data["password"]): + return "Authentication failed!" - # The BitTorrent object implements a NextShare (Tribler) BitTorrent client - # for seeding, downloading etc. - bit_torrent = bt.BitTorrent() + # Add job weight to CIS load. + Server.load += data["weight"] + # Submit job. + Server.queue.put(data) + + return 'Job submitted.' + else: + web.badrequest() + return "" + + def authenticate(self, username, password): + if not config.SECURITY: + return True + if users.users[username] == password: + return True + else: + web.forbidden() + return False + + +if __name__ == '__main__': + # The BitTorrent object implements a NextShare (Tribler) BitTorrent + # client for seeding, downloading etc. + Server.bit_torrent = bt.BitTorrent() + Server.queue = Queue() + Server.load = 0 + # Worker thread. - ci_worker = CIWorker(queue, bit_torrent) + ci_worker = CIWorker() ci_worker.daemon = True ci_worker.start() - while True: - raw_video = sys.stdin.readline().strip() - if raw_video == 'x': - break - - container = 'webm' - a_codec = 'vorbis' - a_bitrate = '128k' - v_codec = 'vp8' - v_bitrate = '480k' - v_resolution = '640x480' - - name = raw_video[:raw_video.rindex('.')] - transcode_config = { - 'container': container, - 'a_codec': a_codec, - 'a_bitrate': a_bitrate, - 'v_codec': v_codec, - 'v_bitrate': v_bitrate, - 'v_resolution': v_resolution - } - thumbs = 'random' - - job = { - 'raw_video': raw_video, - 'name': name, - 'transcode_configs': [transcode_config], - 'thumbs': thumbs - } - - queue.put(job) - - queue.join() + # Web service. + urls = ('/(.*)', 'Server') + app = web.application(urls, globals()) + app.run()