X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=blobdiff_plain;f=cis%2Fcisd.py;h=d286ef7958b4a379c254473e98d7b71cb3e93847;hb=2aecadba1f242a0dcf68614a6aa602282f020b41;hp=b042817e6ef94d8a3c99b298512b422693343b15;hpb=820d45742df3bedbb0477e06cb651f5d328e5ab2;p=living-lab-site.git
diff --git a/cis/cisd.py b/cis/cisd.py
index b042817..d286ef7 100755
--- a/cis/cisd.py
+++ b/cis/cisd.py
@@ -2,10 +2,13 @@
import sys
import os
+import fnmatch
import shutil
import time
import threading
from Queue import Queue
+import web
+import json
import config
import bt
@@ -24,26 +27,16 @@ class CIWorker(threading.Thread):
thumbs_dir = 'tmp/thumbs'
torrents_dir = 'tmp/torrents'
- def __init__(self, queue, bit_torrent):
+ def __init__(self, shared, bit_torrent):
"""
Initialize Content Ingestion Worker.
- @param queue a list of dictionaries with the following keys:
-
- - raw_video
- - name: a video name which must be a valid file name
- - transcode_configs: a list of transcode configuration
- dictionaries having the keys as the parameters of
- api.BaseTranscoder.transcode(...)
- - thumbs: string 'random' for extracting a thumbnail
- image from a random video position or a positive integer which
- represents the number of summary thumbnails to be extracted
-
+ @param shared data shared with the front-end (Service)
"""
threading.Thread.__init__(self, name='CIWorker')
- self.queue = queue
+ self.shared = shared
self.bit_torrent = bit_torrent
def transfer_in(self, raw_video):
@@ -113,7 +106,7 @@ class CIWorker(threading.Thread):
@param transcode_configs a list of dictionaries with format settings
"""
- for transcode_config in transcode_cofigs:
+ for transcode_config in transcode_configs:
# * CREATE TORRENTS FOR EACH TRANSCODED VIDEO
# Create torrent file.
bt.create_torrent(transcode_config['output_file'])
@@ -123,10 +116,13 @@ class CIWorker(threading.Thread):
shutil.move(transcode_config['output_file'] + '.tstream', \
self.torrents_dir)
+ output_file = transcode_config['output_file'] + '.tstream'
+ output_file = output_file[(output_file.rindex('/') + 1):]
+
# * SEED TORRENTS
bit_torrent.start_download( \
- transcode_config['output_file'] + '.tstream',
- self_transcoded_videos_dir)
+ os.path.join(self.torrents_dir, output_file),
+ self.transcoded_videos_dir)
print '** Creating torrents and seeding finished.'
@@ -145,7 +141,7 @@ class CIWorker(threading.Thread):
print '** Creating torrents and seeding finished.'
- def remove_file(self, files, path):
+ def remove_files(self, files, path):
"""
Deletes files from a specified path.
"""
@@ -157,7 +153,7 @@ class CIWorker(threading.Thread):
def run(self):
while True:
- job = self.queue.get()
+ job = self.shared.queue.get()
# * TRANSFER RAW VIDEO IN
self.transfer_in(job['raw_video'])
@@ -171,83 +167,126 @@ class CIWorker(threading.Thread):
self.extract_thumbs(job['raw_video'], job['name'], \
job['thumbs'])
-# # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
-# self.seed(job['transcode_configs'])
-#
-# # Torrent files.
-# files = [f for f in os.listdir(self.torrents_dir) \
-# if os.path.isfile(os.path.join( \
-# self.torrents_dir, f))]
-# torrent_files = fnmatch.filter(files, name + "_*")
-#
-# # Thumbnail images files.
-# files = [f for f in os.listdir(self.thumbs_dir) \
-# if os.path.isfile(os.path.join( \
-# self.thumbs_dir, f))]
-# thumb_files = fnmatch.filter(files, name + "_*")
-#
-# # Raw video files.
-# raw_files = [f for f in os.listdir(self.raw_videos_dir) \
-# if os.path.isfile(os.path.join( \
-# self.raw_videos_dir, f))]
-#
-# # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
-# self.transfer_out(torrent_files, self.torrents_dir, \
-# config.OUTPUT_TORRENTS_PATH)
-# self.transfer_out(thumb_files, self.thumbs_dir, \
-# config.OUTPUT_THUMBS_PATH)
-#
-# # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
-# self.remove_files(raw_files, self.raw_videos_dir)
-# self.remove_files(thumb_files, self.thumbs_dir)
+ # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
+ self.seed(job['transcode_configs'])
+
+ # Torrent files.
+ files = [f for f in os.listdir(self.torrents_dir) \
+ if os.path.isfile(os.path.join( \
+ self.torrents_dir, f))]
+ torrent_files = fnmatch.filter(files, job['name'] + "_*")
+
+ # Thumbnail images files.
+ files = [f for f in os.listdir(self.thumbs_dir) \
+ if os.path.isfile(os.path.join( \
+ self.thumbs_dir, f))]
+ thumb_files = fnmatch.filter(files, job['name'] + "_*")
+
+ # Raw video files.
+ raw_files = [f for f in os.listdir(self.raw_videos_dir) \
+ if os.path.isfile(os.path.join( \
+ self.raw_videos_dir, f))]
+
+ # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
+ self.transfer_out(torrent_files, self.torrents_dir, \
+ config.OUTPUT_TORRENTS_PATH)
+ self.transfer_out(thumb_files, self.thumbs_dir, \
+ config.OUTPUT_THUMBS_PATH)
+
+ # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
+ self.remove_files(raw_files, self.raw_videos_dir)
+ self.remove_files(thumb_files, self.thumbs_dir)
# * JOB FINISHED
- queue.task_done()
+ self.shared.queue.task_done()
+ print 'load in run is', self.shared.load
+ self.shared.load -= job['weight']
+class Shared:
+ """
+ Shared data between Service (front-end) and CIWorker (back-end).
-if __name__ == '__main__':
- # Jobs queue.
- queue = Queue()
+ @member queue a list of dictionaries with the following keys:
+
+ - raw_video
+ - name: a video name which must be a valid file name
+ - transcode_configs: a list of transcode configuration
+ dictionaries having the keys as the parameters of
+ api.BaseTranscoder.transcode(...)
+ - thumbs: string 'random' for extracting a thumbnail
+ image from a random video position or a positive integer which
+ represents the number of summary thumbnails to be extracted
+
+ @member load total weight of the jobs from the queue
+ """
- # The BitTorrent object implements a NextShare (Tribler) BitTorrent client
- # for seeding, downloading etc.
- bit_torrent = bt.BitTorrent()
+ def __init__(self):
+ # Jobs queue.
+ self.queue = Queue()
+
+ # Sever load.
+ self.load = 0
+
+
+class Service:
+ """
+ Implementation of the RESTful web service which constitutes the interface
+ with the client (web server).
+ """
- # Worker thread.
- ci_worker = CIWorker(queue, bit_torrent)
- ci_worker.daemon = True
- ci_worker.start()
-
- while True:
- raw_video = sys.stdin.readline().strip()
- if raw_video == 'x':
- break
-
- container = 'webm'
- a_codec = 'vorbis'
- a_bitrate = '128k'
- v_codec = 'vp8'
- v_bitrate = '480k'
- v_resolution = '640x480'
+ def __init__(self):
+ # Shared data with back-end (CIWorker).
+ self.shared = Shared()
+
+ global bit_torrent
+
+ # Worker thread.
+ ci_worker = CIWorker(self.shared, bit_torrent)
+ ci_worker.daemon = True
+ ci_worker.start()
- name = raw_video[:raw_video.rindex('.')]
- transcode_config = {
- 'container': container,
- 'a_codec': a_codec,
- 'a_bitrate': a_bitrate,
- 'v_codec': v_codec,
- 'v_bitrate': v_bitrate,
- 'v_resolution': v_resolution
- }
- thumbs = 4
-
- job = {
- 'raw_video': raw_video,
- 'name': name,
- 'transcode_configs': [transcode_config],
- 'thumbs': thumbs
- }
+ def __del__(self):
+ self.shared.queue.join()
+
+ def GET(self, request):
+ #web.header('Cache-Control', 'no-cache')
+
+ if request == 'get_load':
+ resp = {"load": self.shared.load}
+ print 'load in GET is', self.shared.load
+ web.header('Content-Type', 'application/json')
+ return json.dumps(resp)
+ else:
+ web.badrequest()
+ return ""
- queue.put(job)
- queue.join()
+ def POST(self, request):
+ if request == 'ingest_content':
+ # Read JSON parameters.
+ json_data = web.data()
+ data = json.loads(json_data)
+
+ # Add job weight to CIS load.
+ self.shared.load += data["weight"]
+ print 'load in POST is', self.shared.load
+
+ # Submit job.
+ self.shared.queue.put(data)
+
+ return 'Job submitted.'
+ else:
+ web.badrequest()
+ return ""
+
+
+if __name__ == '__main__':
+ # The BitTorrent object implements a NextShare (Tribler) BitTorrent
+ # client for seeding, downloading etc.
+ global bit_torrent
+ bit_torrent = bt.BitTorrent()
+
+ # Web service.
+ urls = ('/(.*)', 'Service')
+ service = web.application(urls, globals())
+ service.run()