user gets notified interatively or by email in case of a CIS error
authorCălin-Andrei Burloiu <calin.burloiu@gmail.com>
Mon, 20 Feb 2012 13:08:59 +0000 (15:08 +0200)
committerCălin-Andrei Burloiu <calin.burloiu@gmail.com>
Mon, 20 Feb 2012 13:08:59 +0000 (15:08 +0200)
application/config/constants.php
application/config/content_ingestion.php
application/controllers/video.php
application/language/english/video_lang.php
application/models/videos_model.php
cis/cis.py
cis/cis_lb/config.py
cis/cis_lb/load_balancer/base.py
cis/config.py
cis/logger.py

index 6456597..3fe94e9 100644 (file)
@@ -46,5 +46,19 @@ define('FOPEN_READ_WRITE_CREATE_STRICT',             'x+b');
 define('USER_ROLE_STANDARD',                   0);
 define('USER_ROLE_ADMIN',                              1);
 
+/*
+|--------------------------------------------------------------------------
+| CIS Responses
+|--------------------------------------------------------------------------
+|
+| Reponses of a CIS for the ingest_content request as they appear coded
+| in the DB in `videos_unactivated` table, column `cis_response`.
+|
+*/
+define('CIS_RESP_NONE',                                0);
+define('CIS_RESP_COMPLETION',          1);
+define('CIS_RESP_UNREACHABLE',         2);
+define('CIS_RESP_INTERNAL_ERROR',      3);
+
 /* End of file constants.php */
 /* Location: ./application/config/constants.php */
\ No newline at end of file
index 76a6c69..000a955 100644 (file)
@@ -19,6 +19,7 @@
 |
 */
 $config['cis_url'] = 'http://localhost:31500/';
+//$config['cis_url'] = 'http://p2p-next-03.grid.pub.ro:31500/';
 
 /*
 |--------------------------------------------------------------------------
@@ -88,11 +89,14 @@ $config['thumbs_count'] = 4;
 
 /*
 |--------------------------------------------------------------------------
-| Eliminate Duplicate Resolutions
+| Eliminate Duplicate Resolutions (DISABLED)
 |--------------------------------------------------------------------------
 |
 | Eliminate consecutive formats with the same resolution after processing
 | them.
 |
+| THIS FEATURE HAS BEEN DISABLED BECAUSE IS CAUSES THE CREATION OF A
+| MALFORMED CONTENT INGESTION JSON.
+|
 */
 $config['elim_dupl_res'] = TRUE;
\ No newline at end of file
index 5182b62..320a94c 100644 (file)
@@ -24,11 +24,12 @@ class Video extends CI_Controller {
                //phpinfo();
        }
        
-       public function test()
+       public function test($param)
        {
-               $this->load->helper('video');
+               $this->load->model('videos_model');
                
-               var_dump(get_av_info('data/upload/test.ogv'));
+               echo $this->videos_model->send_upload_error_email($param,
+                               CIS_RESP_UNREACHABLE) ? 's-a trimis' : 'nu s-a trimis';
        }
        
        /**
@@ -69,7 +70,7 @@ class Video extends CI_Controller {
                
                // Video is being processed by CIS.
                if ($data['video']['activation_code']
-                               && !$data['video']['content_ingested'])
+                               && $data['video']['cis_response'] == CIS_RESP_NONE)
                {
                        $this->load->helper('message');
                        show_error_msg_page($this, 
@@ -213,13 +214,31 @@ class Video extends CI_Controller {
                                        $this->av_info['duration'],
                                        $prepared_formats['db_formats'], $category_id, $user_id,
                                        $this->uploaded_file);
+                       if ($activation_code == FALSE)
+                       {
+                               $this->load->helper('message');
+                               show_error_msg_page($this, 
+                                       $this->lang->line('video_msg_add_video_db_error'));
+                               return;
+                       }
                        
                        // Send a content ingestion request to
                        // CIS (Content Ingestion Server).
-                       $this->_send_content_ingestion($activation_code,
+                       $r = $this->videos_model->send_content_ingestion($activation_code,
                                        $this->uploaded_file,
                                        $name, $this->av_info['size'],
                                        $prepared_formats['transcode_configs']);
+                       if ($r == FALSE)
+                       {
+                               $this->videos_model->set_cis_response($activation_code,
+                                               CIS_RESP_UNREACHABLE);
+                               
+                               $this->load->helper('message');
+                               show_error_msg_page($this, 
+                                               $this->lang->line(
+                                                               'video_msg_send_content_ingestion_error'));
+                               return;
+                       }
                        
                        $this->load->helper('message');
                        show_info_msg_page($this, 
@@ -227,18 +246,65 @@ class Video extends CI_Controller {
                }
        }
        
+       /**
+        * URL used by CIS service to announce its content ingestion completion.
+        * 
+        * @param string $activation_code 
+        */
        public function cis_completion($activation_code)
        {
                $this->load->model('videos_model');
                
                if ($this->config->item('require_moderation'))
-                       $this->videos_model->set_content_ingested($activation_code);
+                       $this->videos_model->set_cis_response($activation_code,
+                                       CIS_RESP_COMPLETION);
                else
                        $this->videos_model->activate_video($activation_code);
                
 //             log_message('info', "cis_completion $activation_code");
        }
        
+       /**
+        * URL used by CIS service to annouce an error which occured while
+        * ingesting content.
+        * 
+        * @param string $activation_code
+        * @param string $error_name 'internal_error' corresponds to constant
+        * CIS_RESP_INTERNAL_ERROR and 'unreachable' corresponds to constant
+        * CIS_RESP_UNREACHABLE
+        */
+       public function cis_error($activation_code, $error_name = 'internal_error')
+       {
+               $this->load->model('videos_model');
+               
+               if ($error_name == 'internal_error')
+               {
+                       $this->videos_model->set_cis_response ($activation_code,
+                                       CIS_RESP_INTERNAL_ERROR);
+                       log_message('error',
+                                       "Internal CIS error for activation code $activation_code");
+                       $this->videos_model->send_upload_error_email($activation_code,
+                                       CIS_RESP_INTERNAL_ERROR);
+               }
+               // Unreachable error is announced by a CIS-LB which was unable to
+               // contact an CIS.
+               else if ($error_name == 'unreachable')
+               {
+                       $this->videos_model->set_cis_response($activation_code,
+                                       CIS_RESP_UNREACHABLE);
+                       log_message('error',
+                                       "CIS-LB could not reach any CIS for activation code $activation_code");
+                       $this->videos_model->send_upload_error_email($activation_code,
+                                       CIS_RESP_UNREACHABLE);
+               }
+               else
+               {
+                       log_message('error',
+                                       "Invalid error name received from CIS / CIS-LB for activation code $activation_code");
+               }
+               
+       }
+       
        /**
        * Increments (dis)likes count for video with the specified id and returns to
        * the client as plain text the number if likes.
@@ -335,50 +401,6 @@ class Video extends CI_Controller {
                        return $output;
        }
        
-       /**
-        * Request content_ingest to the CIS in order to start the content
-        * ingestion process.
-        * 
-        * @param string $activation_code
-        * @param string $raw_video_fn uploaded video file name
-        * @param string $name
-        * @param int $raw_video_size uploaded video file size in bytes
-        * @param array $transcode_configs dictionary which must be included in
-        * the JSON data that needs to be sent to CIS
-        * @return mixed return the HTTP content (body) on success and FALSE
-        * otherwise
-        */
-       protected function _send_content_ingestion($activation_code, $raw_video_fn,
-                       $name, $raw_video_size, $transcode_configs)
-       {
-               $this->config->load('content_ingestion');
-               
-               $url = $this->config->item('cis_url') . 'ingest_content';
-               $data = array(
-                       'code'=>$activation_code,
-                       'raw_video'=>$raw_video_fn,
-                       'name'=>$name,
-                       'weight'=>$raw_video_size,
-                       'transcode_configs'=>$transcode_configs,
-                       'thumbs'=>$this->config->item('thumbs_count')
-               );
-               $json_data = json_encode($data);
-               
-               // Send request to CIS.
-               $r = new HttpRequest($url, HttpRequest::METH_POST);
-               $r->setBody($json_data);
-               try
-               {
-                       $response = $r->send()->getBody();
-               }
-               catch (HttpException $ex) 
-               {
-                       return FALSE;
-               }
-               
-               return $response;
-       }
-       
        public function _is_user_loggedin($param)
        {
                if (! $this->session->userdata('user_id'))
index 77c0225..7663621 100644 (file)
@@ -28,6 +28,23 @@ $lang['video_msg_video_uploaded'] = 'Your video has been uploaded and is now bei
 $lang['video_msg_no_video'] = 'This video does not exist.';
 $lang['video_msg_video_not_ready'] = 'This video is being processed and is not available yet.';
 $lang['video_msg_video_unactivated'] = 'This video requires moderation and must be approved by an administrator.';
+$lang['video_msg_add_video_db_error'] = 'An error occured while adding your video to our database.';
+$lang['video_msg_send_content_ingestion_error'] = 'We are sorry for the inconvenience, but your video cannot be processed for the moment.';
+
+$lang['video_internal_cis_error_email_content'] =
+'Hello,
+
+We are sorry for the inconvenience, but an error occured while processing your video "%s".
+
+Best regards!
+';
+$lang['video_unreachable_cis_error_email_content'] = 
+'Hello,
+
+We are sorry for the inconvenience, but your video "%s" cannot be processed for the moment.
+
+Best regards!
+';
 
 
 /* End of file video_lang.php */
index c09673c..590d1b7 100644 (file)
@@ -80,7 +80,7 @@ class Videos_model extends CI_Model {
                // Show unactivated videos.
                $cond_unactivated = ($unactivated
                                ? '(a.activation_code IS NULL OR a.activation_code IS NOT NULL
-                                       AND a.content_ingested = 1)'
+                                       AND a.cis_response = '. CIS_RESP_COMPLETION. ')'
                                : 'a.activation_code IS NULL');
                
                // Category filtering
@@ -109,7 +109,7 @@ class Videos_model extends CI_Model {
                        $fields = "v.id, name, title, duration, user_id, u.username, views,
                                thumbs_count, default_thumb,
                                (views + likes - dislikes) AS score,
-                               a.activation_code, a.content_ingested";
+                               a.activation_code, a.cis_response";
                
                $query = $this->db->query(
                        "SELECT $fields
@@ -198,7 +198,7 @@ class Videos_model extends CI_Model {
                $this->load->helper('text');
                
                $query = $this->db->query("SELECT v.*, u.username,
-                                       a.activation_code, a.content_ingested
+                                       a.activation_code, a.cis_response
                                FROM `videos` v 
                                        LEFT JOIN `videos_unactivated` a ON (v.id = a.video_id),
                                        `users` u
@@ -320,13 +320,91 @@ class Videos_model extends CI_Model {
                return $activation_code;
        }
        
-       public function set_content_ingested($activation_code)
+       /**
+        * Request content_ingest to the CIS in order to start the content
+        * ingestion process.
+        * 
+        * @param string $activation_code
+        * @param string $raw_video_fn uploaded video file name
+        * @param string $name
+        * @param int $raw_video_size uploaded video file size in bytes
+        * @param array $transcode_configs dictionary which must be included in
+        * the JSON data that needs to be sent to CIS
+        * @return mixed return the HTTP content (body) on success and FALSE
+        * otherwise
+        */
+       public function send_content_ingestion($activation_code, $raw_video_fn,
+                       $name, $raw_video_size, $transcode_configs)
+       {
+               $this->config->load('content_ingestion');
+               
+               $url = $this->config->item('cis_url') . 'ingest_content';
+               $data = array(
+                       'code'=>$activation_code,
+                       'raw_video'=>$raw_video_fn,
+                       'name'=>$name,
+                       'weight'=>$raw_video_size,
+                       'transcode_configs'=>$transcode_configs,
+                       'thumbs'=>$this->config->item('thumbs_count')
+               );
+               $json_data = json_encode($data);
+               
+               // Send request to CIS.
+               $r = new HttpRequest($url, HttpRequest::METH_POST);
+               $r->setBody($json_data);
+               try
+               {
+                       $response = $r->send()->getBody();
+               }
+               catch (HttpException $ex) 
+               {
+                       return FALSE;
+               }
+               
+               return $response;
+       }
+       
+       public function set_cis_response($activation_code,
+                       $response = CIS_RESP_COMPLETION)
        {
                return $this->db->query("UPDATE `videos_unactivated`
-                       SET content_ingested = 1
+                       SET cis_response = $response
                        WHERE activation_code = '$activation_code'");
        }
        
+       public function send_upload_error_email($activation_code,
+                       $cis_response = CIS_RESP_INTERNAL_ERROR)
+       {
+               $query = $this->db->query("SELECT v.title, u.email
+                       FROM `videos_unactivated` a, `videos` v, `users` u
+                       WHERE a.activation_code = '$activation_code'
+                               AND a.video_id = v.id AND v.user_id = u.id");
+               
+               if ($query->num_rows() > 0)
+               {
+                       $title = $query->row()->title;
+                       $email = $query->row()->email;
+               }
+               else
+                       return FALSE;
+               
+               $subject = '['. $this->config->item('site_name')
+                               . '] Upload Error';
+               if ($cis_response == CIS_RESP_INTERNAL_ERROR)
+               {
+                       $msg = sprintf($this->lang->line(
+                                       'video_internal_cis_error_email_content'), $title);
+               }
+               else if ($cis_response == CIS_RESP_UNREACHABLE)
+               {
+                       $msg = sprintf($this->lang->line(
+                                       'video_unreachable_cis_error_email_content'), $title);
+               }
+               $headers = "From: ". $this->config->item('noreply_email');
+               
+               return mail($email, $subject, $msg, $headers);
+       }
+       
        /**
         * Activates a video by deleting its entry from `videos_unactivated`.
         * 
@@ -367,9 +445,10 @@ class Videos_model extends CI_Model {
        
        public function get_unactivated_videos()
        {
-               $query = $this->db->query("SELECT a.video_id, v.name, a.content_ingested
+               $query = $this->db->query("SELECT a.video_id, v.name, a.cis_response
                        FROM `videos_unactivated` a, `videos` v
-                       WHERE a.video_id = v.id AND a.content_ingested = 1");
+                       WHERE a.video_id = v.id AND a.cis_response = "
+                               . CIS_RESP_COMPLETION);
                
                if ($query->num_rows() > 0)
                        return $query->result_array();
index e63b729..8bd6575 100755 (executable)
@@ -167,6 +167,19 @@ class CIWorker(threading.Thread):
         
         f = urllib.urlopen(url)
         f.read()
+        
+    def notify_error(self, code):
+        logger.log_msg('#%s: notifying web server about the error...'\
+                % self.job_id)
+        
+        if config.WS_ERROR[len(config.WS_ERROR) - 1] == '/':
+            url = config.WS_ERROR + code
+        else:
+            url = config.WS_ERROR + '/' + code
+        url = url + '/' + 'internal_error'
+        
+        f = urllib.urlopen(url)
+        f.read()
     
     def run(self):
         while True:
@@ -179,10 +192,12 @@ class CIWorker(threading.Thread):
             except cis_exceptions.FileAlreadyExistsException as e:
                 logger.log_msg('#%s: %s' \
                         % (job['code'], repr(e)), logger.LOG_LEVEL_ERROR)
+                self.notify_error(job['code'])
                 continue
             except Exception as e:
                 logger.log_msg('#%s: error while transferring in: %s' \
-                        % (job['code'], repr(e)), logger.LOG_LEVEL_FATAL) 
+                        % (job['code'], repr(e)), logger.LOG_LEVEL_FATAL)
+                self.notify_error(job['code'])
                 continue
 
             # * TRANSCODE RAW VIDEO
@@ -192,10 +207,12 @@ class CIWorker(threading.Thread):
             except cis_exceptions.FileAlreadyExistsException as e:
                 logger.log_msg('#%s: %s' \
                         % (job['code'], repr(e)), logger.LOG_LEVEL_ERROR)
+                self.notify_error(job['code'])
                 continue
             except Exception as e:
                 logger.log_msg('#%s: error while transcoding: %s' \
-                        % (job['code'], repr(e)), logger.LOG_LEVEL_FATAL) 
+                        % (job['code'], repr(e)), logger.LOG_LEVEL_FATAL)
+                self.notify_error(job['code'])
                 continue
 
             # * EXTRACT THUMBNAIL IMAGES
@@ -206,11 +223,13 @@ class CIWorker(threading.Thread):
                 except cis_exceptions.FileAlreadyExistsException as e:
                     logger.log_msg('#%s: %s' \
                             % (job['code'], repr(e)), logger.LOG_LEVEL_ERROR)
+                    self.notify_error(job['code'])
                     continue
                 except Exception as e:
                     logger.log_msg( \
                             '#%s: error while extracting thumbnail images: %s' \
-                            % (job['code'], repr(e)), logger.LOG_LEVEL_FATAL) 
+                            % (job['code'], repr(e)), logger.LOG_LEVEL_FATAL)
+                    self.notify_error(job['code'])
                     continue
 
             # * CREATE TORRENTS AND START SEEDING OF TRANSCODED VIDEOS
@@ -237,16 +256,17 @@ class CIWorker(threading.Thread):
             except Exception as e:
                 logger.log_msg('#%s: error while transferring out: %s' \
                         % (job['code'], repr(e)), logger.LOG_LEVEL_FATAL) 
+                self.notify_error(job['code'])
                 continue
             
             # * NOTIFY WEB SERVER ABOUT CONTENT INGESTION COMPLETION
-            # TODO in the future web server should also be notified about errors
             try:
                 self.notify_completion(job['code'])
             except Exception as e:
                 logger.log_msg(
                         '#%s: error while notifying web server about the job completion: %s' \
-                        % (job['code'], repr(e)), logger.LOG_LEVEL_FATAL) 
+                        % (job['code'], repr(e)), logger.LOG_LEVEL_FATAL)
+                self.notify_error(job['code'])
                 continue
             
             # * CLEANUP RAW VIDEOS AND THUMBNAIL IMAGES
index 410fe27..208c957 100644 (file)
@@ -12,6 +12,9 @@ CIS_URLS = [ \
     'http://p2p-next-09.grid.pub.ro:31500/', \
     'http://p2p-next-10.grid.pub.ro:31500/' \
 ]
+# Web server's URL for content ingestion errors. P2P-Tube uses
+# http://<site>/video/cis_error .
+WS_ERROR = 'http://p2p-next.cs.pub.ro/devel/video/cis_error'
 
 import load_balancer.random_lb
 LOAD_BALANCER = load_balancer.random_lb.RandomLoadBalancer
index f4e9261..e81baee 100644 (file)
@@ -1,9 +1,11 @@
 import threading
 import urllib
+import json
 
 import config
 import logger
 
+
 class LoadBalancer(threading.Thread):
     
     def __init__(self, id, queue):
@@ -21,6 +23,7 @@ class LoadBalancer(threading.Thread):
         while True:
             (request, data) = self.queue.get()
             urls = config.CIS_URLS[:]
+            code = json.loads(data)['code']
             
             while len(urls) != 0:
                 cis = self.choose(urls)
@@ -29,15 +32,36 @@ class LoadBalancer(threading.Thread):
                 try:
                     urllib.urlopen(cis + request, data)
                 except IOError:
-                    logger.log_msg('Failed to forward request to %s' % cis, \
+                    logger.log_msg('#%s: Failed to forward request to %s' \
+                            % (code, cis), \
                             logger.LOG_LEVEL_ERROR)
                     continue
                 
-                logger.log_msg('Request forwarded to %s' % cis, \
+                logger.log_msg('#%s: Request forwarded to %s' \
+                            % (code, cis), \
                         logger.LOG_LEVEL_INFO)
                 break
             
+            if len(urls) == 0:
+                logger.log_msg('#%s: Failed to forward request to any CIS' \
+                            % code, \
+                            logger.LOG_LEVEL_FATAL)
+                self.notify_error(code)
+            
             self.queue.task_done()
+    
+    def notify_error(self, code):
+        logger.log_msg('#%s: notifying web server about the error...'\
+                % code)
+        
+        if config.WS_ERROR[len(config.WS_ERROR) - 1] == '/':
+            url = config.WS_ERROR + code
+        else:
+            url = config.WS_ERROR + '/' + code
+        url = url + '/' + 'unreachable'
+        
+        f = urllib.urlopen(url)
+        f.read()
         
     def choose(self, urls):
         """
index adb5ed0..ba529d9 100644 (file)
@@ -33,8 +33,10 @@ WS_THUMBS_PATH = 'devel/data/thumbs'
 BT_TRACKER = 'http://p2p-next-10.grid.pub.ro:6969/announce'
 # Web server's URL for content ingestion completion. P2P-Tube uses
 # http://<site>/video/cis_completion .
-#WS_COMPLETION = 'http://p2p-next-03.grid.pub.ro:8081/cis_completion'
 WS_COMPLETION = 'http://p2p-next.cs.pub.ro/devel/video/cis_completion'
+# Web server's URL for content ingestion errors. P2P-Tube uses
+# http://<site>/video/cis_error .
+WS_ERROR = 'http://p2p-next.cs.pub.ro/devel/video/cis_error'
 
 
 # === CIS PATHS ===
index dee0e28..7e9fd31 100644 (file)
@@ -1,4 +1,5 @@
 import sys
+import datetime
 
 LOG_LEVEL_ALL = 0
 LOG_LEVEL_DEBUG = 1
@@ -33,5 +34,8 @@ def log_msg(msg, level=LOG_LEVEL_INFO):
         f = sys.stderr
     else:
         f = sys.stdout
+    
+    now = datetime.datetime.now()
+    date_time = now.strftime('%Y-%m-%d %H:%M:%S')
         
-    f.write('[%s] %s\n' % (LOG_LEVEL_NAMES[level], msg))
\ No newline at end of file
+    f.write('[%s][%s] %s\n' % (LOG_LEVEL_NAMES[level], date_time, msg))
\ No newline at end of file