From e487ab3d510e231706a1c0973973496beb25d2ff Mon Sep 17 00:00:00 2001 From: Calin-Andrei Burloiu Date: Wed, 8 Feb 2012 16:53:17 +0200 Subject: [PATCH] cis: logger created; start_downloads done --- cis/api/ftp_config.py | 2 +- cis/bt.py | 17 ++++++++-- cis/cisd.py | 73 ++++++++++++++++++++++++++----------------- cis/config.py | 18 ++++++++--- cis/job_test.json | 1 + cis/job_test2.json | 16 ---------- cis/logger.py | 37 ++++++++++++++++++++++ 7 files changed, 112 insertions(+), 52 deletions(-) delete mode 100644 cis/job_test2.json create mode 100644 cis/logger.py diff --git a/cis/api/ftp_config.py b/cis/api/ftp_config.py index 6cdc0de..551cd7d 100644 --- a/cis/api/ftp_config.py +++ b/cis/api/ftp_config.py @@ -4,5 +4,5 @@ Configuration file for FTPFileTransferer class. FTP_HOST = "localhost" FTP_USER = "calinburloiu" -FTP_PASSWD = "0k3D3m4T_" +FTP_PASSWD = "Ps1st1h14_" FTP_ACCT = "" diff --git a/cis/bt.py b/cis/bt.py index 2184fa5..9281876 100644 --- a/cis/bt.py +++ b/cis/bt.py @@ -4,6 +4,7 @@ from BaseLib.Core.API import * import tempfile import random import config +import logger def create_torrent(source): """ @@ -58,7 +59,7 @@ class BitTorrent: # setup and start download dscfg = DownloadStartupConfig() - dscfg.set_dest_dir(output_dir); + dscfg.set_dest_dir(output_dir) if torrent.startswith("http") or torrent.startswith(P2PURL_SCHEME): tdef = TorrentDef.load_from_url(torrent) @@ -66,6 +67,16 @@ class BitTorrent: tdef = TorrentDef.load(torrent) if tdef.get_live(): raise ValueError("CIS does not support live torrents") - - d = self.session.start_download(tdef, dscfg) + + new_download = True + try: + d = self.session.start_download(tdef, dscfg) + except DuplicateDownloadException: + new_download = False #d.set_state_callback(state_callback, getpeerlist=False) + + if new_download: + logger.log_msg('download of torrent "%s" started' % torrent) + #else: + #logger.log_msg('download of torrent "%s" already started' \ + #% torrent, logger.LOG_LEVEL_DEBUG) diff --git a/cis/cisd.py b/cis/cisd.py index 8284882..2f39a6e 100755 --- a/cis/cisd.py +++ b/cis/cisd.py @@ -14,6 +14,7 @@ from web.wsgiserver import CherryPyWSGIServer import config import bt import users +import logger if config.SECURITY: CherryPyWSGIServer.ssl_certificate = "cacert.pem" @@ -28,11 +29,6 @@ class CIWorker(threading.Thread): CIWorker shares a Queue with its master where jobs are submitted. """ - raw_videos_dir = 'tmp/raw' - transcoded_videos_dir = 'tmp/media' - thumbs_dir = 'tmp/thumbs' - torrents_dir = config.CIS_TORRENTS_PATH - def __init__(self): """ Initialize Content Ingestion Worker. @@ -47,10 +43,10 @@ class CIWorker(threading.Thread): @param raw_video raw video file name """ - print '** Transfering in...' + logger.log_msg('#%s: transfering in...' % self.job_id) file_transfer = config.FILE_TRANSFERER_CLASS( \ - self.raw_videos_dir, config.WS_UPLOAD_PATH) + config.RAW_VIDEOS_PATH, config.WS_UPLOAD_PATH) file_transfer.get([raw_video]) file_transfer.close() @@ -63,12 +59,12 @@ class CIWorker(threading.Thread): @param transcode_configs a list of dictionaries with format settings """ - print '** Transcoding...' + logger.log_msg('#%s: transcoding...' % self.job_id) transcoder = config.TRANSCODER_CLASS( \ - input_file = os.path.join(self.raw_videos_dir, input_video), \ + input_file = os.path.join(config.RAW_VIDEOS_PATH, input_video), \ name = video_name, prog_bin = config.TRANSCODER_BIN) - transcoder.dest_path = self.transcoded_videos_dir + transcoder.dest_path = config.MEDIA_PATH # Transcode the raw video in each requested format. # TODO report partial errors @@ -87,14 +83,14 @@ class CIWorker(threading.Thread): thumbnail """ - print '** Extracting image thumbnails...' + logger.log_msg('#%s: extracting image thumbnails...' % self.job_id) # TODO report partial errors thumb_extractor = config.THUMB_EXTRACTOR_CLASS( \ - input_file = os.path.join(self.raw_videos_dir, input_video), \ + input_file = os.path.join(config.RAW_VIDEOS_PATH, input_video), \ name = video_name, \ prog_bin = config.THUMB_EXTRACTOR_BIN) - thumb_extractor.dest_path = self.thumbs_dir + thumb_extractor.dest_path = config.THUMBS_PATH if thumbs == 'random': thumb_extractor.extract_random_thumb() elif type(thumbs) is int and thumbs > 0: @@ -107,7 +103,8 @@ class CIWorker(threading.Thread): @param transcode_configs a list of dictionaries with format settings """ - print '** Creating torrents and starting seeding...' + logger.log_msg('#%s: creating torrents and starting seeding...' \ + % self.job_id) for transcode_config in transcode_configs: # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO @@ -117,16 +114,16 @@ class CIWorker(threading.Thread): # The torrent file is created in the same directory with the # source file. Move it to the torrents directory. shutil.move(transcode_config['output_file'] + '.tstream', \ - self.torrents_dir) + config.TORRENTS_PATH) output_file = transcode_config['output_file'] + '.tstream' output_file = output_file[(output_file.rindex('/') + 1):] # * SEED TORRENTS Server.bit_torrent.start_download( \ - os.path.join(self.torrents_dir, output_file), - self.transcoded_videos_dir) - + os.path.join(config.TORRENTS_PATH, output_file), + config.MEDIA_PATH) + def transfer_out(self, local_files, local_path, remote_path): """ Transfers some local files to a remote path of the Web Server. @@ -135,7 +132,7 @@ class CIWorker(threading.Thread): @param remote_path destination path on the Web Server """ - print '** Transfering out...' + logger.log_msg('#%s: transfering out...' % self.job_id) file_transfer = config.FILE_TRANSFERER_CLASS( \ local_path, remote_path) @@ -147,7 +144,7 @@ class CIWorker(threading.Thread): Deletes files from a specified path. """ - print '** Cleaning up...' + logger.log_msg('#%s: cleaning up...' % self.job_id) for f in files: os.unlink(os.path.join(path, f)) @@ -155,6 +152,7 @@ class CIWorker(threading.Thread): def run(self): while True: job = Server.queue.get() + self.job_id = job['id'] # * TRANSFER RAW VIDEO IN self.transfer_in(job['raw_video']) @@ -172,26 +170,26 @@ class CIWorker(threading.Thread): self.seed(job['transcode_configs']) # Torrent files. - files = [f for f in os.listdir(self.torrents_dir) \ + files = [f for f in os.listdir(config.TORRENTS_PATH) \ if os.path.isfile(os.path.join( \ - self.torrents_dir, f))] + config.TORRENTS_PATH, f))] torrent_files = fnmatch.filter(files, job['name'] + "_*") # Thumbnail images files. - files = [f for f in os.listdir(self.thumbs_dir) \ + files = [f for f in os.listdir(config.THUMBS_PATH) \ if os.path.isfile(os.path.join( \ - self.thumbs_dir, f))] + config.THUMBS_PATH, f))] thumb_files = fnmatch.filter(files, job['name'] + "_*") # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT - self.transfer_out(torrent_files, self.torrents_dir, \ + self.transfer_out(torrent_files, config.TORRENTS_PATH, \ config.WS_TORRENTS_PATH) - self.transfer_out(thumb_files, self.thumbs_dir, \ + self.transfer_out(thumb_files, config.THUMBS_PATH, \ config.WS_THUMBS_PATH) # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES - self.remove_files([ job['raw_video'] ], self.raw_videos_dir) - self.remove_files(thumb_files, self.thumbs_dir) + self.remove_files([ job['raw_video'] ], config.RAW_VIDEOS_PATH) + self.remove_files(thumb_files, config.THUMBS_PATH) # * JOB FINISHED Server.queue.task_done() @@ -245,6 +243,23 @@ class Server: else: web.badrequest() return "" + + @staticmethod + def start_downloads(): + # All torrent files. + files = [f for f in os.listdir(config.TORRENTS_PATH) \ + if os.path.isfile(os.path.join( \ + config.TORRENTS_PATH, f))] + torrent_files = fnmatch.filter(files, "*.tstream") + + for torrent_file in torrent_files: + Server.bit_torrent.start_download( \ + torrent_file, + config.MEDIA_PATH) + + t = threading.Timer(config.START_DOWNLOADS_INTERVAL, \ + Server.start_downloads) + t.start() def authenticate(self, username, password): if not config.SECURITY: @@ -263,6 +278,8 @@ if __name__ == '__main__': Server.queue = Queue() Server.load = 0 + Server.start_downloads() + # Worker thread. ci_worker = CIWorker() ci_worker.daemon = True diff --git a/cis/config.py b/cis/config.py index 868253a..c68d05c 100644 --- a/cis/config.py +++ b/cis/config.py @@ -1,10 +1,18 @@ #!/usr/bin/env python +import logger + # Make here all necessary imports required for API classes. from api import avhandling from api import file_transfer +# === GENERAL CONFIGURATIONS === +LOG_LEVEL = logger.LOG_LEVEL_DEBUG +SECURITY = False +START_DOWNLOADS_INTERVAL = 24 * 3600.0 # Once a day + + # === FILE TRANSFER CONFIGURATIONS === # Path from the Web Server where the raw input video file is stored. WS_UPLOAD_PATH = 'tmp/data/upload' @@ -15,13 +23,15 @@ WS_TORRENTS_PATH = 'tmp/data/torrents' WS_THUMBS_PATH = 'tmp/data/thumbs' -SECURITY = False - - # === BITTORRENT CONFIGURATIONS === #BT_TRACKER = "http://p2p-next-10.grid.pub.ro:6969/announce" BT_TRACKER = 'http://localhost:6969/announce' -CIS_TORRENTS_PATH = 'tmp/torrents' + + +RAW_VIDEOS_PATH = 'tmp/raw' +MEDIA_PATH = 'tmp/media' +THUMBS_PATH = 'tmp/thumbs' +TORRENTS_PATH = 'tmp/torrents' # In a distributed file system for multi-CIS # === EXTERNAL PROGRAMS API CLASSES === diff --git a/cis/job_test.json b/cis/job_test.json index d29455f..e63e8b8 100644 --- a/cis/job_test.json +++ b/cis/job_test.json @@ -1,4 +1,5 @@ { + "id": "33", "raw_video": "test.ogv", "name": "test", "weight": 3, diff --git a/cis/job_test2.json b/cis/job_test2.json deleted file mode 100644 index a7c439a..0000000 --- a/cis/job_test2.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "raw_video": "2009-06-28_Beatles-Obladi_600p.ogv", - "name": "beatles", - "weight": 10, - "transcode_configs": [ - { - "container": "webm", - "a_codec": "vorbis", - "a_bitrate": "128k", - "v_codec": "vp8", - "v_bitrate": "480k", - "v_resolution": "640x480" - } - ], - "thumbs": 4 -} diff --git a/cis/logger.py b/cis/logger.py new file mode 100644 index 0000000..dee0e28 --- /dev/null +++ b/cis/logger.py @@ -0,0 +1,37 @@ +import sys + +LOG_LEVEL_ALL = 0 +LOG_LEVEL_DEBUG = 1 +LOG_LEVEL_INFO = 2 +LOG_LEVEL_WARNING = 3 +LOG_LEVEL_ERROR = 4 +LOG_LEVEL_FATAL = 5 +LOG_LEVEL_OFF = 6 + +LOG_LEVEL_NAMES = { \ + LOG_LEVEL_DEBUG: 'DEBUG', \ + LOG_LEVEL_INFO: 'INFO', \ + LOG_LEVEL_WARNING: 'WARNING', \ + LOG_LEVEL_ERROR: 'ERROR', \ + LOG_LEVEL_FATAL: 'FATAL', \ +} + +import config + +def log_msg(msg, level=LOG_LEVEL_INFO): + """ + Prints log messages based on the log level. + """ + + if level == LOG_LEVEL_ALL or level == LOG_LEVEL_OFF: + return + + if level < config.LOG_LEVEL: + return + + if level >= LOG_LEVEL_ERROR: + f = sys.stderr + else: + f = sys.stdout + + f.write('[%s] %s\n' % (LOG_LEVEL_NAMES[level], msg)) \ No newline at end of file -- 2.20.1