X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=blobdiff_plain;f=cis%2Fcisd.py;h=d286ef7958b4a379c254473e98d7b71cb3e93847;hb=2aecadba1f242a0dcf68614a6aa602282f020b41;hp=e1acefd1974fd686d79c64501eb694daf3a6efc9;hpb=1ab12b6c509b6a6be95f79dc50089649a32872da;p=living-lab-site.git diff --git a/cis/cisd.py b/cis/cisd.py index e1acefd..d286ef7 100755 --- a/cis/cisd.py +++ b/cis/cisd.py @@ -1,41 +1,292 @@ #!/usr/bin/env python import sys +import os +import fnmatch +import shutil +import time +import threading +from Queue import Queue +import web +import json + import config +import bt -# -# !! Imports required for create_torrent -# -from BaseLib.Core.API import * -# -# -# -def create_torrent(input_): - tdef = TorrentDef() - tdef.add_content(input_, config.AVINFO_CLASS.get_video_duration(input_)) - tdef.set_tracker(config.BT_TRACKER) +class CIWorker(threading.Thread): + """ + Content Ingestion Worker. A class which executes content ingestion jobs + on a separate thread. - tdef.set_piece_length(32768) + CIWorker shares a Queue with its master where jobs are submitted. + """ - tdef.finalize() - tdef.save(input_ + ".tstream") + raw_videos_dir = 'tmp/raw' + transcoded_videos_dir = 'tmp/media' + thumbs_dir = 'tmp/thumbs' + torrents_dir = 'tmp/torrents' - print 'READY!', config.BT_TRACKER, config.AVINFO_CLASS.get_video_duration(input_) + def __init__(self, shared, bit_torrent): + """ + Initialize Content Ingestion Worker. -if __name__ == '__main__': - pass -# transcoder = config.TRANSCODER_CLASS(sys.argv[1]) -# transcoder.transcode('webm', "vorbis", "vp8", a_bitrate="128k", a_samplingrate=22050, a_channels=2, v_bitrate="256k", v_framerate=15, v_resolution="320x240", v_dar="4:3") + @param shared data shared with the front-end (Service) + """ + + threading.Thread.__init__(self, name='CIWorker') + + self.shared = shared + self.bit_torrent = bit_torrent + + def transfer_in(self, raw_video): + """ + Transfers a raw video file from the Web Server. + + @param raw_video raw video file name + """ + + file_transfer = config.FILE_TRANSFERER_CLASS( \ + self.raw_videos_dir, config.INPUT_PATH) + file_transfer.get([raw_video]) + file_transfer.close() + + print '** Transfering in finished.' + + def transcode(self, input_video, video_name, transcode_configs): + """ + Transcodes a video in each requested formats. + + @param input_video input video file name + @param video_name a video name which must be a valid file name + @param transcode_configs a list of dictionaries with format settings + """ + + transcoder = config.TRANSCODER_CLASS( \ + input_file = os.path.join(self.raw_videos_dir, input_video), \ + name = video_name, prog_bin = config.TRANSCODER_BIN) + transcoder.dest_path = self.transcoded_videos_dir + + # Transcode the raw video in each requested format. + # TODO report partial errors + for transcode_config in transcode_configs: + transcode_config['output_file'] = \ + transcoder.transcode(**transcode_config) + + print '** Transcoding finished.' + + def extract_thumbs(self, input_video, video_name, thumbs): + """ + Extracts thumbnail images from a video. + + @param input_video input video file name + @param video_name a video name which must be a valid file name + @param thumbs use 'random' to extract a thumbnail image from a random + point of the video or use a positive integer n to extract n summary + thumbnail + """ + + # TODO report partial errors + thumb_extractor = config.THUMB_EXTRACTOR_CLASS( \ + input_file = os.path.join(self.raw_videos_dir, input_video), \ + name = video_name, \ + prog_bin = config.THUMB_EXTRACTOR_BIN) + thumb_extractor.dest_path = self.thumbs_dir + if thumbs == 'random': + thumb_extractor.extract_random_thumb() + elif type(thumbs) is int and thumbs > 0: + thumb_extractor.extract_summary_thumbs(thumbs) + + print '** Extracting thumbs finished.' + + def seed(self, transcode_configs): + """ + Creates torrents from the videos passed and then stats seeding them. + + @param transcode_configs a list of dictionaries with format settings + """ + + for transcode_config in transcode_configs: + # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO + # Create torrent file. + bt.create_torrent(transcode_config['output_file']) + + # The torrent file is created in the same directory with the + # source file. Move it to the torrents directory. + shutil.move(transcode_config['output_file'] + '.tstream', \ + self.torrents_dir) + + output_file = transcode_config['output_file'] + '.tstream' + output_file = output_file[(output_file.rindex('/') + 1):] + + # * SEED TORRENTS + bit_torrent.start_download( \ + os.path.join(self.torrents_dir, output_file), + self.transcoded_videos_dir) + + print '** Creating torrents and seeding finished.' + + def transfer_out(self, local_files, local_path, remote_path): + """ + Transfers some local files to a remote path of the Web Server. + + @param local_files list local files to transfer + @param remote_path destination path on the Web Server + """ + + file_transfer = config.FILE_TRANSFERER_CLASS( \ + local_path, remote_path) + file_transfer.put(local_files) + file_transfer.close() + + print '** Creating torrents and seeding finished.' + + def remove_files(self, files, path): + """ + Deletes files from a specified path. + """ + + for f in files: + os.unlink(os.path.join(path, f)) + + print '** Cleaning up finished.' + + def run(self): + while True: + job = self.shared.queue.get() + + # * TRANSFER RAW VIDEO IN + self.transfer_in(job['raw_video']) + + # * TRANSCODE RAW VIDEO + self.transcode(job['raw_video'], job['name'], \ + job['transcode_configs']) + + # * EXTRACT THUMBNAIL IMAGES + if job['thumbs'] != 0: + self.extract_thumbs(job['raw_video'], job['name'], \ + job['thumbs']) + + # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS + self.seed(job['transcode_configs']) + + # Torrent files. + files = [f for f in os.listdir(self.torrents_dir) \ + if os.path.isfile(os.path.join( \ + self.torrents_dir, f))] + torrent_files = fnmatch.filter(files, job['name'] + "_*") + + # Thumbnail images files. + files = [f for f in os.listdir(self.thumbs_dir) \ + if os.path.isfile(os.path.join( \ + self.thumbs_dir, f))] + thumb_files = fnmatch.filter(files, job['name'] + "_*") + + # Raw video files. + raw_files = [f for f in os.listdir(self.raw_videos_dir) \ + if os.path.isfile(os.path.join( \ + self.raw_videos_dir, f))] + + # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT + self.transfer_out(torrent_files, self.torrents_dir, \ + config.OUTPUT_TORRENTS_PATH) + self.transfer_out(thumb_files, self.thumbs_dir, \ + config.OUTPUT_THUMBS_PATH) + + # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES + self.remove_files(raw_files, self.raw_videos_dir) + self.remove_files(thumb_files, self.thumbs_dir) + + # * JOB FINISHED + self.shared.queue.task_done() + print 'load in run is', self.shared.load + self.shared.load -= job['weight'] + +class Shared: + """ + Shared data between Service (front-end) and CIWorker (back-end). + + @member queue a list of dictionaries with the following keys: + + @member load total weight of the jobs from the queue + """ + + def __init__(self): + # Jobs queue. + self.queue = Queue() + + # Sever load. + self.load = 0 + + +class Service: + """ + Implementation of the RESTful web service which constitutes the interface + with the client (web server). + """ + + def __init__(self): + # Shared data with back-end (CIWorker). + self.shared = Shared() + + global bit_torrent + + # Worker thread. + ci_worker = CIWorker(self.shared, bit_torrent) + ci_worker.daemon = True + ci_worker.start() + + def __del__(self): + self.shared.queue.join() -# thumb_extractor = config.THUMB_EXTRACTOR_CLASS(sys.argv[1]) -# #print thumb_extractor.get_video_duration() -# #thumb_extractor.extract_random_thumb() -# print thumb_extractor.extract_summary_thumbs(5) + def GET(self, request): + #web.header('Cache-Control', 'no-cache') + + if request == 'get_load': + resp = {"load": self.shared.load} + print 'load in GET is', self.shared.load + web.header('Content-Type', 'application/json') + return json.dumps(resp) + else: + web.badrequest() + return "" + -# file_transfer = config.FILE_TRANSFERER_CLASS() -# file_transfer.get(['vim_config.tar.gz']) -# #file_transfer.put(['cisd.py']) -# file_transfer.close() + def POST(self, request): + if request == 'ingest_content': + # Read JSON parameters. + json_data = web.data() + data = json.loads(json_data) + + # Add job weight to CIS load. + self.shared.load += data["weight"] + print 'load in POST is', self.shared.load + + # Submit job. + self.shared.queue.put(data) + + return 'Job submitted.' + else: + web.badrequest() + return "" + + +if __name__ == '__main__': + # The BitTorrent object implements a NextShare (Tribler) BitTorrent + # client for seeding, downloading etc. + global bit_torrent + bit_torrent = bt.BitTorrent() - create_torrent(sys.argv[1]) + # Web service. + urls = ('/(.*)', 'Service') + service = web.application(urls, globals()) + service.run()