X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?p=living-lab-site.git;a=blobdiff_plain;f=cis%2Fcisd.py;fp=cis%2Fcisd.py;h=828488257b4f5b55c74a07c65c3a408b67761cc7;hp=d286ef7958b4a379c254473e98d7b71cb3e93847;hb=c7b70f31aa7d34d184c20ac6258fec2174ea8d67;hpb=9d8c32e99434169d17b02774156a32833b1801c7 diff --git a/cis/cisd.py b/cis/cisd.py index d286ef7..8284882 100755 --- a/cis/cisd.py +++ b/cis/cisd.py @@ -9,9 +9,15 @@ import threading from Queue import Queue import web import json +from web.wsgiserver import CherryPyWSGIServer import config import bt +import users + +if config.SECURITY: + CherryPyWSGIServer.ssl_certificate = "cacert.pem" + CherryPyWSGIServer.ssl_private_key = "privkey.pem" class CIWorker(threading.Thread): @@ -25,34 +31,29 @@ class CIWorker(threading.Thread): raw_videos_dir = 'tmp/raw' transcoded_videos_dir = 'tmp/media' thumbs_dir = 'tmp/thumbs' - torrents_dir = 'tmp/torrents' + torrents_dir = config.CIS_TORRENTS_PATH - def __init__(self, shared, bit_torrent): + def __init__(self): """ Initialize Content Ingestion Worker. - - @param shared data shared with the front-end (Service) """ threading.Thread.__init__(self, name='CIWorker') - self.shared = shared - self.bit_torrent = bit_torrent - def transfer_in(self, raw_video): """ Transfers a raw video file from the Web Server. @param raw_video raw video file name """ - + + print '** Transfering in...' + file_transfer = config.FILE_TRANSFERER_CLASS( \ - self.raw_videos_dir, config.INPUT_PATH) + self.raw_videos_dir, config.WS_UPLOAD_PATH) file_transfer.get([raw_video]) file_transfer.close() - print '** Transfering in finished.' - def transcode(self, input_video, video_name, transcode_configs): """ Transcodes a video in each requested formats. @@ -62,6 +63,8 @@ class CIWorker(threading.Thread): @param transcode_configs a list of dictionaries with format settings """ + print '** Transcoding...' + transcoder = config.TRANSCODER_CLASS( \ input_file = os.path.join(self.raw_videos_dir, input_video), \ name = video_name, prog_bin = config.TRANSCODER_BIN) @@ -73,8 +76,6 @@ class CIWorker(threading.Thread): transcode_config['output_file'] = \ transcoder.transcode(**transcode_config) - print '** Transcoding finished.' - def extract_thumbs(self, input_video, video_name, thumbs): """ Extracts thumbnail images from a video. @@ -86,6 +87,8 @@ class CIWorker(threading.Thread): thumbnail """ + print '** Extracting image thumbnails...' + # TODO report partial errors thumb_extractor = config.THUMB_EXTRACTOR_CLASS( \ input_file = os.path.join(self.raw_videos_dir, input_video), \ @@ -97,14 +100,14 @@ class CIWorker(threading.Thread): elif type(thumbs) is int and thumbs > 0: thumb_extractor.extract_summary_thumbs(thumbs) - print '** Extracting thumbs finished.' - def seed(self, transcode_configs): """ Creates torrents from the videos passed and then stats seeding them. @param transcode_configs a list of dictionaries with format settings """ + + print '** Creating torrents and starting seeding...' for transcode_config in transcode_configs: # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO @@ -120,12 +123,10 @@ class CIWorker(threading.Thread): output_file = output_file[(output_file.rindex('/') + 1):] # * SEED TORRENTS - bit_torrent.start_download( \ + Server.bit_torrent.start_download( \ os.path.join(self.torrents_dir, output_file), self.transcoded_videos_dir) - print '** Creating torrents and seeding finished.' - def transfer_out(self, local_files, local_path, remote_path): """ Transfers some local files to a remote path of the Web Server. @@ -133,27 +134,27 @@ class CIWorker(threading.Thread): @param local_files list local files to transfer @param remote_path destination path on the Web Server """ + + print '** Transfering out...' file_transfer = config.FILE_TRANSFERER_CLASS( \ local_path, remote_path) file_transfer.put(local_files) file_transfer.close() - print '** Creating torrents and seeding finished.' - def remove_files(self, files, path): """ Deletes files from a specified path. """ + + print '** Cleaning up...' for f in files: os.unlink(os.path.join(path, f)) - print '** Cleaning up finished.' - def run(self): while True: - job = self.shared.queue.get() + job = Server.queue.get() # * TRANSFER RAW VIDEO IN self.transfer_in(job['raw_video']) @@ -181,81 +182,43 @@ class CIWorker(threading.Thread): if os.path.isfile(os.path.join( \ self.thumbs_dir, f))] thumb_files = fnmatch.filter(files, job['name'] + "_*") - - # Raw video files. - raw_files = [f for f in os.listdir(self.raw_videos_dir) \ - if os.path.isfile(os.path.join( \ - self.raw_videos_dir, f))] # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT self.transfer_out(torrent_files, self.torrents_dir, \ - config.OUTPUT_TORRENTS_PATH) + config.WS_TORRENTS_PATH) self.transfer_out(thumb_files, self.thumbs_dir, \ - config.OUTPUT_THUMBS_PATH) + config.WS_THUMBS_PATH) # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES - self.remove_files(raw_files, self.raw_videos_dir) + self.remove_files([ job['raw_video'] ], self.raw_videos_dir) self.remove_files(thumb_files, self.thumbs_dir) # * JOB FINISHED - self.shared.queue.task_done() - print 'load in run is', self.shared.load - self.shared.load -= job['weight'] - -class Shared: - """ - Shared data between Service (front-end) and CIWorker (back-end). - - @member queue a list of dictionaries with the following keys: - - @member load total weight of the jobs from the queue - """ - - def __init__(self): - # Jobs queue. - self.queue = Queue() + Server.queue.task_done() + Server.load -= job['weight'] - # Sever load. - self.load = 0 - -class Service: +class Server: """ Implementation of the RESTful web service which constitutes the interface with the client (web server). """ - def __init__(self): - # Shared data with back-end (CIWorker). - self.shared = Shared() - - global bit_torrent - - # Worker thread. - ci_worker = CIWorker(self.shared, bit_torrent) - ci_worker.daemon = True - ci_worker.start() + #def __init__(self): + #pass - def __del__(self): - self.shared.queue.join() + #def __del__(self): + #pass def GET(self, request): #web.header('Cache-Control', 'no-cache') if request == 'get_load': - resp = {"load": self.shared.load} - print 'load in GET is', self.shared.load + resp = {"load": Server.load} web.header('Content-Type', 'application/json') return json.dumps(resp) + elif request == 'test': + return '' else: web.badrequest() return "" @@ -267,26 +230,45 @@ class Service: json_data = web.data() data = json.loads(json_data) + # Authenticate user. + if config.SECURITY and \ + not self.authenticate(data["username"], data["password"]): + return "Authentication failed!" + # Add job weight to CIS load. - self.shared.load += data["weight"] - print 'load in POST is', self.shared.load + Server.load += data["weight"] # Submit job. - self.shared.queue.put(data) + Server.queue.put(data) return 'Job submitted.' else: web.badrequest() return "" + def authenticate(self, username, password): + if not config.SECURITY: + return True + if users.users[username] == password: + return True + else: + web.forbidden() + return False + if __name__ == '__main__': # The BitTorrent object implements a NextShare (Tribler) BitTorrent # client for seeding, downloading etc. - global bit_torrent - bit_torrent = bt.BitTorrent() + Server.bit_torrent = bt.BitTorrent() + Server.queue = Queue() + Server.load = 0 + + # Worker thread. + ci_worker = CIWorker() + ci_worker.daemon = True + ci_worker.start() # Web service. - urls = ('/(.*)', 'Service') - service = web.application(urls, globals()) - service.run() + urls = ('/(.*)', 'Server') + app = web.application(urls, globals()) + app.run()