CIS: CIWorker code written, but needs debugging
authorCalin-Andrei Burloiu <calin.burloiu@gmail.com>
Fri, 16 Dec 2011 15:14:37 +0000 (17:14 +0200)
committerCalin-Andrei Burloiu <calin.burloiu@gmail.com>
Fri, 16 Dec 2011 15:14:37 +0000 (17:14 +0200)
cis/api/base.py
cis/api/file_transfer.py
cis/bt.py
cis/cisd.py
cis/config.py

index 6f41275..a2e11d1 100644 (file)
@@ -225,7 +225,7 @@ class BaseThumbExtractor:
         for index in range (0, count):
             thumb_extracted = True
             try:
-                self.extract_thumb(seek_pos, resolution, index)
+                self.extract_thumb(seek_pos, resolution, n_thumbs_extracted)
             except api_exceptions.ThumbExtractionException as e:
                 thumb_extracted = False
 
index 50c2fca..14ef20c 100644 (file)
@@ -7,7 +7,7 @@ They may extend BaseFileTransferer class.
 """
 
 import sys
-from ftplib import FTP_TLS
+import ftplib
 import base
 import ftp_config
 import socket
@@ -25,7 +25,7 @@ class FTPFileTransferer(base.BaseFileTransferer):
     def __init__(self, local_path='', remote_path=''):
         base.BaseFileTransferer.__init__(self, local_path, remote_path)
 
-        self.ftp = FTP_TLS(ftp_config.FTP_HOST, ftp_config.FTP_USER,
+        self.ftp = ftplib.FTP_TLS(ftp_config.FTP_HOST, ftp_config.FTP_USER,
                 ftp_config.FTP_PASSWD, ftp_config.FTP_ACCT)
         self.ftp.set_pasv(True)
 
index b6205b6..2184fa5 100644 (file)
--- a/cis/bt.py
+++ b/cis/bt.py
@@ -3,6 +3,7 @@
 from BaseLib.Core.API import *
 import tempfile
 import random
+import config
 
 def create_torrent(source):
     """
@@ -47,7 +48,7 @@ class BitTorrent:
         
         self.session = Session(sscfg)
 
-    def download(self, torrent, output_dir='.'):
+    def start_download(self, torrent, output_dir='.'):
         """
         Download (leech or seed) a file via BitTorrent.
         The code is adapted from Next-Share's 'BaseLib/Tools/cmdlinedl.py'.
index d4c20de..6a6c310 100755 (executable)
@@ -2,6 +2,7 @@
 
 import sys
 import os
+import shutil
 import time
 import threading
 from Queue import Queue
@@ -18,98 +19,210 @@ class CIWorker(threading.Thread):
     CIWorker shares a Queue with its master where jobs are submitted.
     """
 
-    def __init__(self, queue):
+    raw_videos_dir = 'tmp/raw'
+    transcoded_videos_dir = 'tmp/media'
+    thumbs_dir = 'tmp/thumbs'
+    torrents_dir = 'tmp/torrents'
+
+    def __init__(self, queue, bit_torrent):
+        """
+        Initialize Content Ingestion Worker.
+
+        @param queue a list of dictionaries with the following keys:
+        <ul>
+            <li>raw_video</li>
+            <li>name: a video name which must be a valid file name</li>
+            <li>transcode_configs: a list of transcode configuration
+            dictionaries having the keys as the parameters of
+            api.BaseTranscoder.transcode(...)</li>
+            <li>thumbs: string 'random' for extracting a thumbnail
+            image from a random video position or a positive integer which
+            represents the number of summary thumbnails to be extracted</li>
+        </ul>
+        """
+
         threading.Thread.__init__(self)
 
         self.queue = queue
+        self.bit_torrent = bit_torrent
 
-    def run(self):
-        while True:
-            job = self.queue.get()
+    def transfer_in(self, raw_video):
+        """
+        Transfers a raw video file from the Web Server.
 
-            # * TRANSFER RAW VIDEO IN
-            file_transfer = config.FILE_TRANSFERER_CLASS( \
-                    'tmp/raw', config.INPUT_PATH)
-            file_transfer.get([job.raw_video])
-            file_transfer.close()
+        @param raw_video raw video file name
+        """
 
-            # * TRANSCODE RAW VIDEO
-            transcoder = config.TRANSCODER_CLASS(input_file = job.raw_video, \
-                    name = job.name, prog_bin = config.TRANSCODER_BIN)
-            
-            # Transcode the raw video in each requested format.
-            for transcode_config in job.transcode_configs:
-                transcode_config['output_file'] = transcoder.transcode( \
-                       container = transcode_config.container, \ 
-                       a_codec = transcode_config.a_codec, \
-                       a_bitrate = transcode_config.a_bitrate, \ 
-                       a_samplingrate = transcode_config.a_samplingrate, \ 
-                       a_channels = transcode_config.a_channels, \ 
-                       v_codec = transcode_config.v_codec, \ 
-                       v_bitrate = transcode_config.v_bitrate, \ 
-                       v_framerate = transcode_config.v_framerate, \ 
-                       v_resolution = transcode_config.v_resolution, \ 
-                       v_dar = transcode_config.dar)
+        file_transfer = config.FILE_TRANSFERER_CLASS( \
+                self.raw_videos_dir, config.INPUT_PATH)
+        file_transfer.get(raw_video)
+        file_transfer.close()
 
-            # * EXTRACT THUMBNAIL IMAGES
-            thumb_extractor = config.THUMB_EXTRACTOR_CLASS( \
-                    input_file = job.raw_video, name = job.name, \
-                    prog_bin = config.THUMB_EXTRACTOR_BIN)
-            # TODO thumbnail extraction type must be got from input
+        print '** Transfering in finished.'
+
+    def transcode(self, input_video, video_name, transcode_configs):
+        """
+        Transcodes a video in each requested formats.
+
+        @param input_video input video file name
+        @param video_name a video name which must be a valid file name
+        @param transcode_configs a list of dictionaries with format settings
+        """
+
+        transcoder = config.TRANSCODER_CLASS( \
+                input_file = video_name, \
+                name = video_name, prog_bin = config.TRANSCODER_BIN)
+        transcoder.dest_path = self.transcoded_videos_dir
+        
+        # Transcode the raw video in each requested format.
+        # TODO report partial errors
+        for transcode_config in transcode_configs:
+            transcode_config['output_file'] = transcoder.transcode( \
+                   container = transcode_config['container'], \
+                   a_codec = transcode_config['a_codec'], \
+                   a_bitrate = transcode_config['a_bitrate'], \
+                   a_samplingrate = transcode_config['a_samplingrate'], \
+                   a_channels = transcode_config['a_channels'], \
+                   v_codec = transcode_config['v_codec'], \
+                   v_bitrate = transcode_config['v_bitrate'], \
+                   v_framerate = transcode_config['v_framerate'], \
+                   v_resolution = transcode_config['v_resolution'], \
+                   v_dar = transcode_config['dar'])
+
+        print '** Transcoding finished.'
+
+    def extract_thumbs(self, input_video, video_name, thumbs):
+        """
+        Extracts thumbnail images from a video.
+
+        @param input_video input video file name
+        @param video_name a video name which must be a valid file name
+        @param thumbs use 'random' to extract a thumbnail image from a random
+        point of the video or use a positive integer n to extract n summary
+        thumbnail
+        """
+
+        # TODO report partial errors
+        thumb_extractor = config.THUMB_EXTRACTOR_CLASS( \
+                input_file = input_video, name = video_name, \
+                prog_bin = config.THUMB_EXTRACTOR_BIN)
+        thumb_extractor.dest_path = self.thumbs_dir
+        if thumbs == 'random':
             thumb_extractor.extract_random_thumb()
-            print thumb_extractor.extract_summary_thumbs(5)
+        elif type(thumbs) is int and thumbs > 0:
+            thumb_extractor.extract_summary_thumbs(thumbs)
 
+        print '** Extracting thumbs finished.'
 
-            queue.task_done()
+    def seed(self, transcode_configs):
+        """
+        Creates torrents from the videos passed and then stats seeding them.
 
+        @param transcode_configs a list of dictionaries with format settings
+        """
 
-class TranscodeConfig:
-    """
-    Structure that contains parameters for a transcoding procedure.
-    """
+        for transcode_config in transcode_cofigs:
+            # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
+            # Create torrent file.
+            bt.create_torrent(transcode_config['output_file'])
+            
+            # The torrent file is created in the same directory with the
+            # source file. Move it to the torrents directory.
+            shutil.move(transcode_config['output_file'] + '.tstream', \
+                    self.torrents_dir)
 
-    def __init__(self, container, a_codec, v_codec,
-            a_bitrate, a_samplingrate, a_channels,
-            v_bitrate, v_framerate, v_resolution, v_dar):
+            # * SEED TORRENTS
+            bit_torrent.start_download( \
+                    transcode_config['output_file'] + '.tstream',
+                    self_transcoded_videos_dir)
 
-        self.container = container
-        self.a_codec = a_codec
-        self.v_codec = v_codec
-        self.a_bitrate = a_bitrate
-        self.a_samplingrate = a_samplingrate
-        self.a_channels = a_channels
-        self.v_bitrate = v_bitrate
-        self.v_framerate = v_framerate
-        self.v_resolution = v_resolution
-        self.v_dar = v_dar
+        print '** Creating torrents and seeding finished.'
 
+    def transfer_out(self, local_files, local_path, remote_path):
+        """
+        Transfers some local files to a remote path of the Web Server.
 
+        @param local_files list local files to transfer
+        @param remote_path destination path on the Web Server
+        """
 
-class Job:
-    """
-    Structure that contains information about a job.
+        file_transfer = config.FILE_TRANSFERER_CLASS( \
+                local_path, remote_path)
+        file_transfer.put(local_files)
+        file_transfer.close()
 
-    Members are documented in the constructor.
-    """
+        print '** Creating torrents and seeding finished.'
 
-    def __init__(self, raw_video, name, transcode_configs):
+    def remove_file(self, files, path):
         """
-        @param raw_video the input raw video file name transfered from WS
-        @param name video name (must be a valid file name)
-        @param transcode_configs a list of TranscodeConfig instances
+        Deletes files from a specified path.
         """
 
-        self.raw_video = raw_video
-        self.name = name
-        self.transcode_configs
+        for f in files:
+            os.unlink(os.path.join(path, f))
+
+        print '** Cleaning up finished.'
+
+    def run(self):
+        while True:
+            job = self.queue.get()
+
+            # * TRANSFER RAW VIDEO IN
+            self.transfer_in(job['raw_video'])
+
+            # * TRANSCODE RAW VIDEO
+            self.transcode(job['raw_video'], job['name'], \
+                    job['transcode_configs'])
+
+            # * EXTRACT THUMBNAIL IMAGES
+            if job['thumbs'] != 0:
+                self.extract_thumbs(job['raw_video'], job['name'], \
+                        job['thumbs'])
+
+            # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
+            self.seed(job['transcode_configs'])
+
+            # Torrent files.
+            files = [f for f in os.listdir(self.torrents_dir) \
+                    if os.path.isfile(os.path.join( \
+                            self.torrents_dir, f))]
+            torrent_files = fnmatch.filter(files, name + "_*")
+
+            # Thumbnail images files.
+            files = [f for f in os.listdir(self.thumbs_dir) \
+                    if os.path.isfile(os.path.join( \
+                            self.thumbs_dir, f))]
+            thumb_files = fnmatch.filter(files, name + "_*")
+                
+            # Raw video files.
+            raw_files = [f for f in os.listdir(self.raw_videos_dir) \
+                    if os.path.isfile(os.path.join( \
+                            self.raw_videos_dir, f))]
+
+            # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
+            self.transfer_out(torrent_files, self.torrents_dir, \
+                    config.OUTPUT_TORRENTS_PATH)
+            self.transfer_out(thumb_files, self.thumbs_dir, \
+                    config.OUTPUT_THUMBS_PATH)
+            
+            # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
+            self.remove_files(raw_files, self.raw_videos_dir)
+            self.remove_files(thumb_files, self.thumbs_dir)
+
+            # * JOB FINISHED
+            queue.task_done()
 
 
 if __name__ == '__main__':
     # Jobs queue.
     queue = Queue()
 
+    # The BitTorrent object implements a NextShare (Tribler) BitTorrent client
+    # for seeding, downloading etc.
+    bit_torrent = bt.BitTorrent()
+
     # Worker thread.
-    ci_worker = CIWorker(queue)
+    ci_worker = CIWorker(queue, bit_torrent)
     ci_worker.daemon = True
     ci_worker.start()
 
@@ -118,33 +231,31 @@ if __name__ == '__main__':
         if raw_video == 'x':
             break
 
-        job = Job(raw_video)
+        container = 'webm'
+        a_codec = 'vorbis'
+        a_bitrate = '128k'
+        v_codec = 'vp8'
+        v_bitrate = '480k'
+        v_resolution = '640x480'
+        
+        name = raw_video[:raw_video.rindex('.')]
+        transcode_config = {
+            'container': container,
+            'a_codec': a_codec,
+            'a_bitrate': a_bitrate,
+            'v_codec': v_codec,
+            'v_bitrate': v_bitrate,
+            'v_resolution': v_resolution
+        }
+        thumbs = 'random'
+
+        job = {
+            'raw_video': raw_video,
+            'name': name,
+            'transcode_configs': [transcode_config],
+            'thumbs': thumbs
+        }
+            
         queue.put(job)
 
     queue.join()
-
-
-
-
-#    transcoder = config.TRANSCODER_CLASS(sys.argv[1])
-#    transcoder.transcode('webm', "vorbis", "vp8", a_bitrate="128k", a_samplingrate=22050, a_channels=2, v_bitrate="256k", v_framerate=15, v_resolution="320x240", v_dar="4:3")
-    
-#    thumb_extractor = config.THUMB_EXTRACTOR_CLASS(sys.argv[1])
-#    #print thumb_extractor.get_video_duration()
-#    #thumb_extractor.extract_random_thumb()
-#    print thumb_extractor.extract_summary_thumbs(5)
-
-#    file_transfer = config.FILE_TRANSFERER_CLASS()
-#    file_transfer.get(['vim_config.tar.gz'])
-#    #file_transfer.put(['cisd.py'])
-#    file_transfer.close()
-
-#    create_torrent(sys.argv[1])
-
-#    bt_inst = bt.BitTorrent()
-#
-#    bt_inst.download(sys.argv[1], '/tmp')
-#    bt_inst.download(sys.argv[2], '/tmp')
-#
-#    print threading.active_count(), threading.enumerate()
-#    time.sleep(30)
index 3e9ff69..990f746 100644 (file)
@@ -7,16 +7,17 @@ from api import file_transfer
 
 # === FILE TRANSFER CONFIGURATIONS ===
 # Path from the Web Server where the raw input video file is stored.
-INPUT_PATH = 'upload'
+INPUT_PATH = 'Downloads'
 # Path from the Web Server where the output torrent files will be stored.
-OUTPUT_TORRENTS_PATH = 'torrents'
+OUTPUT_TORRENTS_PATH = 'Downloads/tmp1'
 # Path from the Web Server where the output thumbnail image files will be
 # stored.
-OUTPUT_THUMBS_PATH = 'thumbs'
+OUTPUT_THUMBS_PATH = 'Downloads/tmp2'
 
 
 # === BITTORRENT CONFIGURATIONS ===
-BT_TRACKER = "http://p2p-next-10.grid.pub.ro:6969/announce"
+#BT_TRACKER = "http://p2p-next-10.grid.pub.ro:6969/announce"
+BT_TRACKER = "http://localhost:6969/announce"
 
 
 # === EXTERNAL PROGRAMS API CLASSES ===
@@ -30,7 +31,7 @@ THUMB_EXTRACTOR_CLASS = avhandling.FFmpegThumbExtractor
 FILE_TRANSFERER_CLASS = file_transfer.FTPFileTransferer
 
 
-# === EXTERNAL PROGRAMS BINARY FILES
+# === EXTERNAL PROGRAMS BINARY FILES ===
 # Set this values to None if you want default values provided by the API
 # class to be used.
 # Binary of a prgram which retrives audio/video information, like duration.