import sys
import os
+import shutil
import time
import threading
from Queue import Queue
CIWorker shares a Queue with its master where jobs are submitted.
"""
- def __init__(self, queue):
+ raw_videos_dir = 'tmp/raw'
+ transcoded_videos_dir = 'tmp/media'
+ thumbs_dir = 'tmp/thumbs'
+ torrents_dir = 'tmp/torrents'
+
+ def __init__(self, queue, bit_torrent):
+ """
+ Initialize Content Ingestion Worker.
+
+ @param queue a list of dictionaries with the following keys:
+ <ul>
+ <li>raw_video</li>
+ <li>name: a video name which must be a valid file name</li>
+ <li>transcode_configs: a list of transcode configuration
+ dictionaries having the keys as the parameters of
+ api.BaseTranscoder.transcode(...)</li>
+ <li>thumbs: string 'random' for extracting a thumbnail
+ image from a random video position or a positive integer which
+ represents the number of summary thumbnails to be extracted</li>
+ </ul>
+ """
+
threading.Thread.__init__(self)
self.queue = queue
+ self.bit_torrent = bit_torrent
- def run(self):
- while True:
- job = self.queue.get()
+ def transfer_in(self, raw_video):
+ """
+ Transfers a raw video file from the Web Server.
- # * TRANSFER RAW VIDEO IN
- file_transfer = config.FILE_TRANSFERER_CLASS( \
- 'tmp/raw', config.INPUT_PATH)
- file_transfer.get([job.raw_video])
- file_transfer.close()
+ @param raw_video raw video file name
+ """
- # * TRANSCODE RAW VIDEO
- transcoder = config.TRANSCODER_CLASS(input_file = job.raw_video, \
- name = job.name, prog_bin = config.TRANSCODER_BIN)
-
- # Transcode the raw video in each requested format.
- for transcode_config in job.transcode_configs:
- transcode_config['output_file'] = transcoder.transcode( \
- container = transcode_config.container, \
- a_codec = transcode_config.a_codec, \
- a_bitrate = transcode_config.a_bitrate, \
- a_samplingrate = transcode_config.a_samplingrate, \
- a_channels = transcode_config.a_channels, \
- v_codec = transcode_config.v_codec, \
- v_bitrate = transcode_config.v_bitrate, \
- v_framerate = transcode_config.v_framerate, \
- v_resolution = transcode_config.v_resolution, \
- v_dar = transcode_config.dar)
+ file_transfer = config.FILE_TRANSFERER_CLASS( \
+ self.raw_videos_dir, config.INPUT_PATH)
+ file_transfer.get(raw_video)
+ file_transfer.close()
- # * EXTRACT THUMBNAIL IMAGES
- thumb_extractor = config.THUMB_EXTRACTOR_CLASS( \
- input_file = job.raw_video, name = job.name, \
- prog_bin = config.THUMB_EXTRACTOR_BIN)
- # TODO thumbnail extraction type must be got from input
+ print '** Transfering in finished.'
+
+ def transcode(self, input_video, video_name, transcode_configs):
+ """
+ Transcodes a video in each requested formats.
+
+ @param input_video input video file name
+ @param video_name a video name which must be a valid file name
+ @param transcode_configs a list of dictionaries with format settings
+ """
+
+ transcoder = config.TRANSCODER_CLASS( \
+ input_file = video_name, \
+ name = video_name, prog_bin = config.TRANSCODER_BIN)
+ transcoder.dest_path = self.transcoded_videos_dir
+
+ # Transcode the raw video in each requested format.
+ # TODO report partial errors
+ for transcode_config in transcode_configs:
+ transcode_config['output_file'] = transcoder.transcode( \
+ container = transcode_config['container'], \
+ a_codec = transcode_config['a_codec'], \
+ a_bitrate = transcode_config['a_bitrate'], \
+ a_samplingrate = transcode_config['a_samplingrate'], \
+ a_channels = transcode_config['a_channels'], \
+ v_codec = transcode_config['v_codec'], \
+ v_bitrate = transcode_config['v_bitrate'], \
+ v_framerate = transcode_config['v_framerate'], \
+ v_resolution = transcode_config['v_resolution'], \
+ v_dar = transcode_config['dar'])
+
+ print '** Transcoding finished.'
+
+ def extract_thumbs(self, input_video, video_name, thumbs):
+ """
+ Extracts thumbnail images from a video.
+
+ @param input_video input video file name
+ @param video_name a video name which must be a valid file name
+ @param thumbs use 'random' to extract a thumbnail image from a random
+ point of the video or use a positive integer n to extract n summary
+ thumbnail
+ """
+
+ # TODO report partial errors
+ thumb_extractor = config.THUMB_EXTRACTOR_CLASS( \
+ input_file = input_video, name = video_name, \
+ prog_bin = config.THUMB_EXTRACTOR_BIN)
+ thumb_extractor.dest_path = self.thumbs_dir
+ if thumbs == 'random':
thumb_extractor.extract_random_thumb()
- print thumb_extractor.extract_summary_thumbs(5)
+ elif type(thumbs) is int and thumbs > 0:
+ thumb_extractor.extract_summary_thumbs(thumbs)
+ print '** Extracting thumbs finished.'
- queue.task_done()
+ def seed(self, transcode_configs):
+ """
+ Creates torrents from the videos passed and then stats seeding them.
+ @param transcode_configs a list of dictionaries with format settings
+ """
-class TranscodeConfig:
- """
- Structure that contains parameters for a transcoding procedure.
- """
+ for transcode_config in transcode_cofigs:
+ # * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
+ # Create torrent file.
+ bt.create_torrent(transcode_config['output_file'])
+
+ # The torrent file is created in the same directory with the
+ # source file. Move it to the torrents directory.
+ shutil.move(transcode_config['output_file'] + '.tstream', \
+ self.torrents_dir)
- def __init__(self, container, a_codec, v_codec,
- a_bitrate, a_samplingrate, a_channels,
- v_bitrate, v_framerate, v_resolution, v_dar):
+ # * SEED TORRENTS
+ bit_torrent.start_download( \
+ transcode_config['output_file'] + '.tstream',
+ self_transcoded_videos_dir)
- self.container = container
- self.a_codec = a_codec
- self.v_codec = v_codec
- self.a_bitrate = a_bitrate
- self.a_samplingrate = a_samplingrate
- self.a_channels = a_channels
- self.v_bitrate = v_bitrate
- self.v_framerate = v_framerate
- self.v_resolution = v_resolution
- self.v_dar = v_dar
+ print '** Creating torrents and seeding finished.'
+ def transfer_out(self, local_files, local_path, remote_path):
+ """
+ Transfers some local files to a remote path of the Web Server.
+ @param local_files list local files to transfer
+ @param remote_path destination path on the Web Server
+ """
-class Job:
- """
- Structure that contains information about a job.
+ file_transfer = config.FILE_TRANSFERER_CLASS( \
+ local_path, remote_path)
+ file_transfer.put(local_files)
+ file_transfer.close()
- Members are documented in the constructor.
- """
+ print '** Creating torrents and seeding finished.'
- def __init__(self, raw_video, name, transcode_configs):
+ def remove_file(self, files, path):
"""
- @param raw_video the input raw video file name transfered from WS
- @param name video name (must be a valid file name)
- @param transcode_configs a list of TranscodeConfig instances
+ Deletes files from a specified path.
"""
- self.raw_video = raw_video
- self.name = name
- self.transcode_configs
+ for f in files:
+ os.unlink(os.path.join(path, f))
+
+ print '** Cleaning up finished.'
+
+ def run(self):
+ while True:
+ job = self.queue.get()
+
+ # * TRANSFER RAW VIDEO IN
+ self.transfer_in(job['raw_video'])
+
+ # * TRANSCODE RAW VIDEO
+ self.transcode(job['raw_video'], job['name'], \
+ job['transcode_configs'])
+
+ # * EXTRACT THUMBNAIL IMAGES
+ if job['thumbs'] != 0:
+ self.extract_thumbs(job['raw_video'], job['name'], \
+ job['thumbs'])
+
+ # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
+ self.seed(job['transcode_configs'])
+
+ # Torrent files.
+ files = [f for f in os.listdir(self.torrents_dir) \
+ if os.path.isfile(os.path.join( \
+ self.torrents_dir, f))]
+ torrent_files = fnmatch.filter(files, name + "_*")
+
+ # Thumbnail images files.
+ files = [f for f in os.listdir(self.thumbs_dir) \
+ if os.path.isfile(os.path.join( \
+ self.thumbs_dir, f))]
+ thumb_files = fnmatch.filter(files, name + "_*")
+
+ # Raw video files.
+ raw_files = [f for f in os.listdir(self.raw_videos_dir) \
+ if os.path.isfile(os.path.join( \
+ self.raw_videos_dir, f))]
+
+ # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
+ self.transfer_out(torrent_files, self.torrents_dir, \
+ config.OUTPUT_TORRENTS_PATH)
+ self.transfer_out(thumb_files, self.thumbs_dir, \
+ config.OUTPUT_THUMBS_PATH)
+
+ # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
+ self.remove_files(raw_files, self.raw_videos_dir)
+ self.remove_files(thumb_files, self.thumbs_dir)
+
+ # * JOB FINISHED
+ queue.task_done()
if __name__ == '__main__':
# Jobs queue.
queue = Queue()
+ # The BitTorrent object implements a NextShare (Tribler) BitTorrent client
+ # for seeding, downloading etc.
+ bit_torrent = bt.BitTorrent()
+
# Worker thread.
- ci_worker = CIWorker(queue)
+ ci_worker = CIWorker(queue, bit_torrent)
ci_worker.daemon = True
ci_worker.start()
if raw_video == 'x':
break
- job = Job(raw_video)
+ container = 'webm'
+ a_codec = 'vorbis'
+ a_bitrate = '128k'
+ v_codec = 'vp8'
+ v_bitrate = '480k'
+ v_resolution = '640x480'
+
+ name = raw_video[:raw_video.rindex('.')]
+ transcode_config = {
+ 'container': container,
+ 'a_codec': a_codec,
+ 'a_bitrate': a_bitrate,
+ 'v_codec': v_codec,
+ 'v_bitrate': v_bitrate,
+ 'v_resolution': v_resolution
+ }
+ thumbs = 'random'
+
+ job = {
+ 'raw_video': raw_video,
+ 'name': name,
+ 'transcode_configs': [transcode_config],
+ 'thumbs': thumbs
+ }
+
queue.put(job)
queue.join()
-
-
-
-
-# transcoder = config.TRANSCODER_CLASS(sys.argv[1])
-# transcoder.transcode('webm', "vorbis", "vp8", a_bitrate="128k", a_samplingrate=22050, a_channels=2, v_bitrate="256k", v_framerate=15, v_resolution="320x240", v_dar="4:3")
-
-# thumb_extractor = config.THUMB_EXTRACTOR_CLASS(sys.argv[1])
-# #print thumb_extractor.get_video_duration()
-# #thumb_extractor.extract_random_thumb()
-# print thumb_extractor.extract_summary_thumbs(5)
-
-# file_transfer = config.FILE_TRANSFERER_CLASS()
-# file_transfer.get(['vim_config.tar.gz'])
-# #file_transfer.put(['cisd.py'])
-# file_transfer.close()
-
-# create_torrent(sys.argv[1])
-
-# bt_inst = bt.BitTorrent()
-#
-# bt_inst.download(sys.argv[1], '/tmp')
-# bt_inst.download(sys.argv[2], '/tmp')
-#
-# print threading.active_count(), threading.enumerate()
-# time.sleep(30)