X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?p=living-lab-site.git;a=blobdiff_plain;f=cis%2Fcis.py;fp=cis%2Fcisd.py;h=f88aad5cdecbaf710bb5fb264e4b4bbe8fc7cdd7;hp=2f39a6edf55488d10ecd9b4e6c1f75b4579c07b5;hb=9d5e17576e133645963b8e41083baf235c5cceba;hpb=e487ab3d510e231706a1c0973973496beb25d2ff diff --git a/cis/cisd.py b/cis/cis.py similarity index 71% rename from cis/cisd.py rename to cis/cis.py index 2f39a6e..f88aad5 100755 --- a/cis/cisd.py +++ b/cis/cis.py @@ -46,7 +46,7 @@ class CIWorker(threading.Thread): logger.log_msg('#%s: transfering in...' % self.job_id) file_transfer = config.FILE_TRANSFERER_CLASS( \ - config.RAW_VIDEOS_PATH, config.WS_UPLOAD_PATH) + config.RAW_VIDEOS_PATH, config.WS_UPLOAD_PATH) file_transfer.get([raw_video]) file_transfer.close() @@ -120,7 +120,7 @@ class CIWorker(threading.Thread): output_file = output_file[(output_file.rindex('/') + 1):] # * SEED TORRENTS - Server.bit_torrent.start_download( \ + Server.bit_torrent.start_torrent( \ os.path.join(config.TORRENTS_PATH, output_file), config.MEDIA_PATH) @@ -155,16 +155,32 @@ class CIWorker(threading.Thread): self.job_id = job['id'] # * TRANSFER RAW VIDEO IN - self.transfer_in(job['raw_video']) + try: + self.transfer_in(job['raw_video']) + except Exception as e: + logger.log_msg('#%s: error while transferring in: %s' \ + % (job['id'], str(e)), logger.LOG_LEVEL_FATAL) + continue # * TRANSCODE RAW VIDEO - self.transcode(job['raw_video'], job['name'], \ - job['transcode_configs']) + try: + self.transcode(job['raw_video'], job['name'], \ + job['transcode_configs']) + except Exception as e: + logger.log_msg('#%s: error while transcoding: %s' \ + % (job['id'], str(e)), logger.LOG_LEVEL_FATAL) + continue # * EXTRACT THUMBNAIL IMAGES if job['thumbs'] != 0: - self.extract_thumbs(job['raw_video'], job['name'], \ - job['thumbs']) + try: + self.extract_thumbs(job['raw_video'], job['name'], \ + job['thumbs']) + except Exception as e: + logger.log_msg( \ + '#%s: error while extracting thumbnail images: %s' \ + % (job['id'], str(e)), logger.LOG_LEVEL_FATAL) + continue # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS self.seed(job['transcode_configs']) @@ -182,10 +198,15 @@ class CIWorker(threading.Thread): thumb_files = fnmatch.filter(files, job['name'] + "_*") # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT - self.transfer_out(torrent_files, config.TORRENTS_PATH, \ - config.WS_TORRENTS_PATH) - self.transfer_out(thumb_files, config.THUMBS_PATH, \ - config.WS_THUMBS_PATH) + try: + self.transfer_out(torrent_files, config.TORRENTS_PATH, \ + config.WS_TORRENTS_PATH) + self.transfer_out(thumb_files, config.THUMBS_PATH, \ + config.WS_THUMBS_PATH) + except Exception as e: + logger.log_msg('#%s: error while transferring out: %s' \ + % (job['id'], str(e)), logger.LOG_LEVEL_FATAL) + continue # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES self.remove_files([ job['raw_video'] ], config.RAW_VIDEOS_PATH) @@ -215,6 +236,8 @@ class Server: resp = {"load": Server.load} web.header('Content-Type', 'application/json') return json.dumps(resp) + #elif request == 'shutdown': + #sys.exit(0) elif request == 'test': return '' else: @@ -240,26 +263,55 @@ class Server: Server.queue.put(data) return 'Job submitted.' + elif request == 'start_torrents': + # Read JSON parameters. + json_data = web.data() + data = json.loads(json_data) + + # TODO Verify data + Server.start_torrents(data) + elif request == 'stop_torrents': + # Read JSON parameters. + json_data = web.data() + data = json.loads(json_data) + + # TODO Verify data + Server.stop_torrents(data) + elif request == 'remove_torrents': + # Read JSON parameters. + json_data = web.data() + data = json.loads(json_data) + + # TODO Verify data + Server.stop_torrents(data, True) else: web.badrequest() return "" @staticmethod - def start_downloads(): + def start_torrents(torrents=None): + """ + Scans torrent path for files in order to start download for the files + that are not already started. + """ + # All torrent files. - files = [f for f in os.listdir(config.TORRENTS_PATH) \ - if os.path.isfile(os.path.join( \ - config.TORRENTS_PATH, f))] - torrent_files = fnmatch.filter(files, "*.tstream") - - for torrent_file in torrent_files: - Server.bit_torrent.start_download( \ - torrent_file, + if torrents == None: + files = [f for f in os.listdir(config.TORRENTS_PATH) \ + if os.path.isfile(os.path.join( \ + config.TORRENTS_PATH, f))] + torrents = fnmatch.filter(files, "*.tstream") + + for torrent_file in torrents: + Server.bit_torrent.start_torrent( \ + os.path.join(config.TORRENTS_PATH, torrent_file), config.MEDIA_PATH) - - t = threading.Timer(config.START_DOWNLOADS_INTERVAL, \ - Server.start_downloads) - t.start() + + @staticmethod + def stop_torrents(torrents, remove_content=False): + for torrent_file in torrents: + Server.bit_torrent.stop_torrent( \ + torrent_file, remove_content) def authenticate(self, username, password): if not config.SECURITY: @@ -278,7 +330,11 @@ if __name__ == '__main__': Server.queue = Queue() Server.load = 0 - Server.start_downloads() + Server.start_torrents() + t = threading.Timer(config.START_DOWNLOADS_INTERVAL, \ + Server.start_torrents) + t.daemon = True + t.start() # Worker thread. ci_worker = CIWorker()