cis: bugs fixed; start / stop / remove torrents commands implemented
[living-lab-site.git] / cis / cis.py
similarity index 71%
rename from cis/cisd.py
rename to cis/cis.py
index 2f39a6e..f88aad5 100755 (executable)
@@ -46,7 +46,7 @@ class CIWorker(threading.Thread):
         logger.log_msg('#%s: transfering in...' % self.job_id)
         
         file_transfer = config.FILE_TRANSFERER_CLASS( \
-                config.RAW_VIDEOS_PATH, config.WS_UPLOAD_PATH)
+                config.RAW_VIDEOS_PATH, config.WS_UPLOAD_PATH)            
         file_transfer.get([raw_video])
         file_transfer.close()
 
@@ -120,7 +120,7 @@ class CIWorker(threading.Thread):
             output_file = output_file[(output_file.rindex('/') + 1):]
 
             # * SEED TORRENTS
-            Server.bit_torrent.start_download( \
+            Server.bit_torrent.start_torrent( \
                     os.path.join(config.TORRENTS_PATH, output_file),
                     config.MEDIA_PATH)
                     
@@ -155,16 +155,32 @@ class CIWorker(threading.Thread):
             self.job_id = job['id']
 
             # * TRANSFER RAW VIDEO IN
-            self.transfer_in(job['raw_video'])
+            try:
+                self.transfer_in(job['raw_video'])
+            except Exception as e:
+                logger.log_msg('#%s: error while transferring in: %s' \
+                        % (job['id'], str(e)), logger.LOG_LEVEL_FATAL) 
+                continue
 
             # * TRANSCODE RAW VIDEO
-            self.transcode(job['raw_video'], job['name'], \
-                    job['transcode_configs'])
+            try:
+                self.transcode(job['raw_video'], job['name'], \
+                        job['transcode_configs'])
+            except Exception as e:
+                logger.log_msg('#%s: error while transcoding: %s' \
+                        % (job['id'], str(e)), logger.LOG_LEVEL_FATAL) 
+                continue
 
             # * EXTRACT THUMBNAIL IMAGES
             if job['thumbs'] != 0:
-                self.extract_thumbs(job['raw_video'], job['name'], \
-                        job['thumbs'])
+                try:
+                    self.extract_thumbs(job['raw_video'], job['name'], \
+                            job['thumbs'])
+                except Exception as e:
+                    logger.log_msg( \
+                            '#%s: error while extracting thumbnail images: %s' \
+                            % (job['id'], str(e)), logger.LOG_LEVEL_FATAL) 
+                    continue
 
             # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
             self.seed(job['transcode_configs'])
@@ -182,10 +198,15 @@ class CIWorker(threading.Thread):
             thumb_files = fnmatch.filter(files, job['name'] + "_*")
 
             # * TRANSFER TORRENTS AND THUMBNAIL IMAGES OUT
-            self.transfer_out(torrent_files, config.TORRENTS_PATH, \
-                    config.WS_TORRENTS_PATH)
-            self.transfer_out(thumb_files, config.THUMBS_PATH, \
-                    config.WS_THUMBS_PATH)
+            try:
+                self.transfer_out(torrent_files, config.TORRENTS_PATH, \
+                        config.WS_TORRENTS_PATH)
+                self.transfer_out(thumb_files, config.THUMBS_PATH, \
+                        config.WS_THUMBS_PATH)
+            except Exception as e:
+                logger.log_msg('#%s: error while transferring out: %s' \
+                        % (job['id'], str(e)), logger.LOG_LEVEL_FATAL) 
+                continue
             
             # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
             self.remove_files([ job['raw_video'] ], config.RAW_VIDEOS_PATH)
@@ -215,6 +236,8 @@ class Server:
             resp = {"load": Server.load}
             web.header('Content-Type', 'application/json')
             return json.dumps(resp)
+        #elif request == 'shutdown':
+            #sys.exit(0)
         elif request == 'test':
             return ''
         else:
@@ -240,26 +263,55 @@ class Server:
             Server.queue.put(data)
 
             return 'Job submitted.'
+        elif request == 'start_torrents':
+            # Read JSON parameters.
+            json_data = web.data()
+            data = json.loads(json_data)
+            
+            # TODO Verify data
+            Server.start_torrents(data)
+        elif request == 'stop_torrents':
+            # Read JSON parameters.
+            json_data = web.data()
+            data = json.loads(json_data)
+            
+            # TODO Verify data
+            Server.stop_torrents(data)
+        elif request == 'remove_torrents':
+            # Read JSON parameters.
+            json_data = web.data()
+            data = json.loads(json_data)
+            
+            # TODO Verify data
+            Server.stop_torrents(data, True)
         else:
             web.badrequest()
             return ""
     
     @staticmethod
-    def start_downloads():
+    def start_torrents(torrents=None):
+        """
+        Scans torrent path for files in order to start download for the files
+        that are not already started.
+        """
+        
         # All torrent files.
-        files = [f for f in os.listdir(config.TORRENTS_PATH) \
-                if os.path.isfile(os.path.join( \
-                        config.TORRENTS_PATH, f))]
-        torrent_files = fnmatch.filter(files, "*.tstream")
-
-        for torrent_file in torrent_files:
-            Server.bit_torrent.start_download( \
-                    torrent_file,
+        if torrents == None:
+            files = [f for f in os.listdir(config.TORRENTS_PATH) \
+                    if os.path.isfile(os.path.join( \
+                            config.TORRENTS_PATH, f))]
+            torrents = fnmatch.filter(files, "*.tstream")
+
+        for torrent_file in torrents:
+            Server.bit_torrent.start_torrent( \
+                    os.path.join(config.TORRENTS_PATH, torrent_file),
                     config.MEDIA_PATH)
-        
-        t = threading.Timer(config.START_DOWNLOADS_INTERVAL, \
-                Server.start_downloads)
-        t.start()
+    
+    @staticmethod
+    def stop_torrents(torrents, remove_content=False):
+        for torrent_file in torrents:
+            Server.bit_torrent.stop_torrent( \
+                    torrent_file, remove_content)
 
     def authenticate(self, username, password):
         if not config.SECURITY:
@@ -278,7 +330,11 @@ if __name__ == '__main__':
     Server.queue = Queue()
     Server.load = 0
     
-    Server.start_downloads()
+    Server.start_torrents()
+    t = threading.Timer(config.START_DOWNLOADS_INTERVAL, \
+            Server.start_torrents)
+    t.daemon = True
+    t.start()
     
     # Worker thread.
     ci_worker = CIWorker()