autorun: changes to support hrktorrent
authorAdriana Draghici <adriana008@gmail.com>
Thu, 6 May 2010 20:18:01 +0000 (20:18 +0000)
committerAdriana Draghici <adriana008@gmail.com>
Thu, 6 May 2010 20:18:10 +0000 (20:18 +0000)
autorun/PROTOCOL
autorun/Util.py
autorun/server/BitTorrentClientRun.py
autorun/server/Client.py
autorun/server/HrktorrentRun.py
autorun/server/Server.py

index 287c984..54d3873 100644 (file)
@@ -56,9 +56,10 @@ Serverul pastreaza un fisier cu info despre toate fisierele downloadate:
                DL_LIMIT:"256", 
                PORT:"9999", 
                DL_DIR:"/this/dir", 
-               LOG_DIR:"/this/dir", 
-               OUT_FILE: "output_file",  ??? nu stim exact scopul lui
-               LOG_FILE:"log_file", 
+               LOG_DIR:"/this/dir", !!! "" dc nu trebuie specificat 
+               LOG_FILE:"log_file", !!! "" dc nu trebuie specificat (eg. pt hrktorrent)
+               OUT_DIR:"/this/dir", 
+               OUT_FILE: "output_file",
                TORRENT: "torrent file"
                }
  <----- tuplu (ACK, pid_client) / (ERROR, error_msg)
index 279afce..a27ced2 100644 (file)
@@ -4,9 +4,8 @@ DAEMON_PID_PATH = "/var/tmp/autorun-server.pid"
 SERVER_DIR = "/home/p2p/cs-p2p-next/autorun/server/"
 SERVER_FILE = "Server.py"
 SERVER_TYPE = "python"
-SERVER_PORT = 10004 
+SERVER_PORT = 10004
 SERVER_HOST = "172.16.20.3"
-
 import os
 def get_sessions_file_path():
     dirpath = os.environ['HOME']+"/"+".autorun/"
@@ -60,19 +59,41 @@ ETA = "eta"
 DHT = "dht"
 NUM_PEERS = "num_peers"
 
+
 import os.path
 import time
 def create_archive_file(file_path):
-    print "Archive file: ", file_path
-    if os.path.isfile(file_path):
-        # archive file
-        archive_name = file_path + time.strftime("%d%m%Y_%H%M%S", time.localtime()) + ".zip"
-        command = "zip " + archive_name + " " + file_path
-        print command
-        os.system(command)   
-        # remove file
-        command = "rm " + file_path
-        os.system(command)
-        return 0
-    return -1
+    try:
+        print "Archive file: ", file_path
+
+        if os.path.isfile(file_path):
+            # archive file
+            archive_name = file_path + time.strftime("%d%m%Y_%H%M%S", time.localtime()) + ".zip"
+            command = "zip " + archive_name + " " + file_path
+            print command
+            os.system(command)   
+            # remove file
+            command = "rm " + file_path
+            os.system(command)
+            return 0
+        return -1
+    except Exception,ex:
+        raise 
+    
+
+def create_archive_dir_logs(dir_path):
+    try:
+        if not os.path.isfile(dir_path):
+            # archive all log files 
+            archive_name = dir_path + "/" + "hrk_" + time.strftime("%d%m%Y_%H%M%S", time.localtime()) + ".zip"
+            command = "zip "+ archive_name + " " + dir_path + "/libtorrent*" + " " + dir_path + "/*log"
+            print command
+            os.system(command)
 
+            # remove log files
+            command = "rm " + dir_path + "libtorrent *" + " " + dir_path + "/*log" 
+            os.system(command)
+            return 0
+        return -1
+    except Exception,ex:
+        raise 
index c336e03..7115372 100644 (file)
@@ -64,7 +64,6 @@ class BitTorrentClientRun:
         try:
             # split command
             args = shlex.split(command)
-          
             # remove redirectation parameters
             for i in range(0, len(args)):
                 if args[i].find(">") > -1 :
@@ -72,18 +71,18 @@ class BitTorrentClientRun:
                         args.pop(i)
                     break;
             #self.my_logger.debug(" BitTorrentClientRun: command =" +  str(args))
-            
-            log_redirect = open(self.log_dir+"/"+self.log_file,"a")
-            output_redirect = open(self.output_dir+"/"+self.output_file,"a")
-            
+            output_redirect = open(self.output_dir+"/"+self.output_file,"w")
             self.my_logger.debug(" output redirect to file " +  self.output_dir+"/"+self.output_file)
-            self.my_logger.debug(" log redirect to file " + self.log_dir+"/"+self.log_file)
-            
+            log_redirect = open("/tmp/err","w") # because hrktorrent does not have logs at stderr output
+            if self.log_file != "": 
+                log_redirect = open(self.log_dir+"/"+self.log_file,"w")
+                self.my_logger.debug(" log redirect to file " + self.log_dir+"/"+self.log_file)
             p=subprocess.Popen(args, shell=False, #does not create sh process
-                                stdout=output_redirect,
-                                stderr=log_redirect)
+                                    stdout=output_redirect,
+                                    stderr=log_redirect)
+
             pid = p.pid
-            self.my_logger.debug(" BitTorrentClientRun: pid =" +  str(pid))
+            self.my_logger.debug(" BitTorrentClientRun: pid = " +  str(pid))
             return [pid, log_redirect, output_redirect]
        
         except Exception, ex:
index e139ba4..a4d6c1a 100644 (file)
@@ -110,26 +110,26 @@ def test_all_commands(torrent_file):
             BASE_DIR:"/home/p2p/p2p-clients/hrktorrent",
             TORRENT: "/home/p2p/p2p-meta/" + torrent_name+".torrent",
             DL_DIR: "/home/p2p/p2p-dld/hrktorrent",
-            LOG_DIR: "/home/p2p/p2p-log/hrktorrent",
+            LOG_DIR: "/home/p2p/p2p-dld/hrktorrent",
             OUT_DIR: "/home/p2p/p2p-log/hrktorrent",
             OUT_FILE: torrent_name + ".out", #status messages
-            LOG_FILE: "hrktorrent-" + torrent_name + ".log", #verbose messages
+            LOG_FILE: "", #verbose messages
 
 
             }
     #print s.send_command(GET_OUTPUT, "")
    
-    response = s.send_command(START_MSG, start_data_tribler)
+    """response = s.send_command(START_MSG, start_data_tribler)
     print response
     pid1 = response[1]
     print pid1
-    
-    time.sleep(7)
+    """
+    """time.sleep(7)
     s = MySocket()
     s.connect(SERVER_HOST, SERVER_PORT)
-    response = s.send_command(GET_STATUS, pid1)
+    response = s.send_command(ARCHIVE,"")
     print response
-
+    """
     """response = s.send_command(START_MSG, start_data_transmission)
     print response
     pid2 = response[1]
@@ -140,18 +140,19 @@ def test_all_commands(torrent_file):
     response = s.send_command(GET_CLIENTS,"")
     print response
     """
-    time.sleep(5)
+    """time.sleep(3)"""
     """
     s = MySocket()
     s.connect(SERVER_HOST, SERVER_PORT)
     response = s.send_command(STOP_MSG, pid2)
     print response
     """
+    """
     s = MySocket()
     s.connect(SERVER_HOST, SERVER_PORT)
     response = s.send_command(STOP_MSG, pid1)
     print response
-    
+    """
 def test_send_recv():
     # test 1
 
index 43e79b7..540f494 100644 (file)
@@ -15,8 +15,8 @@ DEBUG = False
 class HrktorrentRun(BitTorrentClientRun):
     def __init__(self, base_path):
         BitTorrentClientRun.__init__(self, base_path,
-                "$base_path/hrktorrent --download_dir$download_dir --minport$port --maxport$port $torrent_file &> $output_dir/$output_file",
-                "$base_path/hrktorrent --download_dir$download_dir --minport$port --maxport$port $torrent_file &> $output_dir/$output_file")
+                "$base_path/hrktorrent --downloaddir$download_dir --minport$port --maxport$port $torrent_file &> $output_dir/$output_file",
+                "$base_path/hrktorrent --downloaddir$download_dir --minport$port --maxport$port $torrent_file &> $output_dir/$output_file")
 
 
 def main():
index 9da44d2..bd00d64 100644 (file)
@@ -97,7 +97,7 @@ class MyDaemon(Daemon):
             f.close()
         except Exception, ex:
             self.logger(" Exception while saving transfer info: " + str(ex))
-            raise Exception
+            raise
 
     def add_to_output_msg(self, file_list, info_dict):
         """ Constructs a list with self.logger file info - paths, client, metafile
@@ -122,34 +122,32 @@ class MyDaemon(Daemon):
     
     def archive_files(self, info_dict, archives_list):
         """ Archives status and verbose message log files. """
-        log_path = ""
-        out_path = ""
         err_msg = ""
         try:
             if LOG_DIR in info_dict:
-                log_path = info_dict[LOG_DIR]
-                if LOG_FILE in info_dict:
-                    file_path = log_path + "/" + info_dict[LOG_FILE]
-                    if create_archive_file(file_path) < -1:
-                        err_msg = "File does not exist: ", file_path
-                    else: archives_list.append(file_path)
-
-            if OUT_DIR in info_dict:
-                out_path = info_dict[OUT_DIR]
-            else:
-                out_path = log_path
-            
-            if OUT_FILE in info_dict:
-                file_path = out_path + "/" + info_dict[OUT_FILE]
-                if create_archive_file(file_path) < -1:
-                    err_msg = "File does not exist: ", file_path
+                if info_dict[CLIENT] == HRKTORRENT:
+                    if create_archive_dir_logs(info_dict[DL_DIR]) < 0:
+                        err_msg = "Could not archive files from dir " + file_path
+                    else: archives_list.append(info_dict[DL_DIR] + "/*" )
+                else:   
+                    if LOG_FILE in info_dict:
+                        file_path = info_dict[LOG_DIR] + "/" + info_dict[LOG_FILE]
+                        if create_archive_file(file_path) < 0:
+                            err_msg = "File does not exist: " + file_path
+                        else: archives_list.append(file_path)
+
+            if OUT_DIR in info_dict and OUT_FILE in info_dict:
+                file_path = info_dict[OUT_DIR] + "/" + info_dict[OUT_FILE]
+                if create_archive_file(file_path) < 0:
+                    err_msg = "File does not exist: " + file_path
                 else: archives_list.append(file_path)
+            
             if err_msg != "": 
-                self.logger.error(" " + err_msg)
+                self.logger.error(err_msg)
+                 
         except Exception, ex:
             self.logger.error("Exception while archiving files: " + str(ex))
-            raise Exception
-
+            raise 
 
     def read_download_info(self, file_list, create_archive = False, archives_list = []):
         """ Reads all the contents of the file that stores info about 
@@ -166,7 +164,6 @@ class MyDaemon(Daemon):
             dicts = [] #list with all info_dict dictionaries
             try:
                 while line != '':
-                    print line
                     if size == 0 :
                         if info_dict != {} and info_dict not in dicts:
                             if self.add_to_output_msg(file_list, info_dict) < 0:
@@ -261,20 +258,24 @@ class MyDaemon(Daemon):
             
     def stop_bt_client(self, pid):
         """ Stops a BT client by killing it."""
-        
-        int_pid = int(pid)
-        if int_pid == -1 or int_pid not in self.processes_fd.keys():
-            return "Invalid pid " + str(pid)
-        os.kill(int_pid, signal.SIGKILL) # kill generates zombies
-        os.wait()
-        self.processes_fd[int_pid][0].close() 
-        self.processes_fd[int_pid][1].close()
-        
-        del self.processes_fd[int_pid]
-        del self.processes_info[int_pid]
+        try:        
+            int_pid = int(pid)
+            if int_pid == -1 or int_pid not in self.processes_fd.keys():
+                return "Invalid pid " + str(pid)
+            os.kill(int_pid, signal.SIGKILL) # kill generates zombies
+            os.wait()
+            self.processes_fd[int_pid][0].close() 
+            self.processes_fd[int_pid][1].close()
+            
+            del self.processes_fd[int_pid]
+            del self.processes_info[int_pid]
 
-        self.logger.debug("  killed process with pid = " + str(pid))
-        return ""
+            self.logger.debug("  killed process with pid = " + str(pid))
+            return ""
+        
+        except Exception ,ex:
+            self.logger.error(" Exception occured: " + str(ex))
+            return "Could not kill process with pid " + str(pid)
 
     def set_linger(self,sock, l_onoff, l_linger):
         """Sets the SO_LINGER value on a socket."""
@@ -296,85 +297,89 @@ class MyDaemon(Daemon):
             self.logger.error("Exception occured while connecting: " + str(e))
             return
         while(1):
-            self.logger.debug("  accepting connections")
-            (clientsock, address) = self.serversocket.accept();
-            self.logger.debug("  accepted connection from " +str(address))
-            
-            msg = self.recv_pickled_data(clientsock)
+            try:
+                self.logger.debug("  accepting connections")
+                (clientsock, address) = self.serversocket.accept();
+                self.logger.debug("  accepted connection from " +str(address))
+                
+                msg = self.recv_pickled_data(clientsock)
+                
+                err_msg = ''
+                response = ''
+                
+                if msg[0] == START_MSG:
+                    bt_client_data = msg[1]
+                    response = self.start_bt_client(bt_client_data) #returns client_pid
+                    if response == self.START_ERR2:
+                        err_msg = "Error occurred while starting client"
+                    elif response == self.START_ERR1:
+                        err_msg = "BitTorrent client " + msg[1][CLIENT] +" not supported."
+                    else:
+                        try:
+                            self.save_download_info(bt_client_data)
+                        except Exception:
+                            err_msg = "Client started but transfer info could not be saved."
+
+                elif msg[0] == STOP_MSG:
+                    client_pid = msg[1]
+                    err_msg = self.stop_bt_client(client_pid)
             
-            err_msg = ''
-            response = ''
-            if msg[0] == START_MSG:
-                bt_client_data = msg[1]
-                response = self.start_bt_client(bt_client_data) #returns client_pid
-                if response == self.START_ERR1:
-                    err_msg = "Error occurred while starting client"
-                elif response == self.START_ERR2:
-                    err_msg = "BitTorrent client " + msg[1][CLIENT] +" not supported."
-                else:
+                elif msg[0] == GET_OUTPUT:
+                    response = []
+                    if self.read_download_info(response) < 0:
+                        err_msg = "Error encountered while reading file"
+                
+                elif msg[0] == GET_CLIENTS:
+                    response = []
+                    print self.processes_info
                     try:
-                        self.save_download_info(bt_client_data)
+                        for k in self. processes_info.keys():
+                            response.append([self.processes_info[k][0], k, self.processes_info[k][1]]) # client_name, pid, torrent file
                     except Exception:
-                        err_msg = "Client started but transfer info could not be saved."
-
-            elif msg[0] == STOP_MSG:
-                client_pid = msg[1]
-                err_msg = self.stop_bt_client(client_pid)
+                         err_msg = "Error: could not retrive running clients list"    
         
-            elif msg[0] == GET_OUTPUT:
-                response = []
-                if self.read_download_info(response) < 0:
-                    err_msg = "Error encountered while reading file"
-            
-            elif msg[0] == GET_CLIENTS:
-                response = []
-                print self.processes_info
-                try:
-                    for k in self. processes_info.keys():
-                        response.append([self.processes_info[k][0], k, self.processes_info[k][1]]) # client_name, pid, torrent file
-                except Exception:
-                     err_msg = "Error: could not retrive running clients list"    
-    
-            elif msg[0] == ARCHIVE:
-                response = []
-                if len(self.processes_fd) == 0: # ARCHIVE ONLY NO CLIENTS ARE RUNNING
-                    try :
-                        if self.read_download_info([], True, response) < 0: # read file path and create archives
+                elif msg[0] == ARCHIVE:
+                    response = []
+                    if len(self.processes_fd) == 0: # ARCHIVE ONLY NO CLIENTS ARE RUNNING
+                        try :
+                            if self.read_download_info([], True, response) < 0: # read file path and create archives
+                                err_msg = "Error: could not archive log files."
+                            else: # erase sessions file
+                                f = open(SESSIONS_FILE, "w")
+                                f.close()
+                        except Exception , ex:
                             err_msg = "Error: could not archive log files."
-                        else: # erase sessions file
-                            f = open(SESSIONS_FILE, "w")
-                            f.close()
-                    except Exception , ex:
-                        err_msg = "Error: could not archive log files."
+                    else:
+                        err_msg = "Cannot archive logs while clients are running"
+                
+                elif msg[0] == GET_STATUS:
+                    line_parts = []
+                    err_msg = self.get_client_status(msg[1], line_parts)
+                    if len(err_msg) == 0:
+                        response = {}
+                        
+                        print len(line_parts) 
+                        if len(line_parts[0]) >= 7:
+                            response[TIMESTAMP] = line_parts[0][0]
+                            response[NUM_PEERS] = line_parts[0][1]
+                            response[DHT] = line_parts[0][2]
+                            response[DL_SPEED] = line_parts[0][3]
+                            response[UP_SPEED] = line_parts[0][4]
+                            response[DL_SIZE] = line_parts[0][5]
+                            response[ETA] = line_parts[0][6]
+                
                 else:
-                    err_msg = "Cannot archive logs while clients are running"
-            elif msg[0] == GET_STATUS:
-                line_parts = []
-                err_msg = self.get_client_status(msg[1], line_parts)
-                if len(err_msg) == 0:
-                    response = {}
-                    
-                    print len(line_parts) 
-                    if len(line_parts[0]) >= 7:
-                        response[TIMESTAMP] = line_parts[0][0]
-                        response[NUM_PEERS] = line_parts[0][1]
-                        response[DHT] = line_parts[0][2]
-                        response[DL_SPEED] = line_parts[0][3]
-                        response[UP_SPEED] = line_parts[0][4]
-                        response[DL_SIZE] = line_parts[0][5]
-                        response[ETA] = line_parts[0][6]
-            
-            else:
-                err_msg = "Error: wrong message type";
+                    err_msg = "Error: wrong message type";
 
-            if err_msg != '':
-                self.send_pickled_data(clientsock,(ERROR_MSG,err_msg))
-                self.logger.debug(" Sending error message: " + err_msg)
-            else:
-                self.send_pickled_data(clientsock, (ACK_MSG, response))
-
-            clientsock.close()
+                if err_msg != '':
+                    self.send_pickled_data(clientsock,(ERROR_MSG,err_msg))
+                    self.logger.debug(" Sending error message: " + err_msg)
+                else:
+                    self.send_pickled_data(clientsock, (ACK_MSG, response))
 
+                clientsock.close()
+            except Exception, ex:
+                self.logger.error("Exception occured: "+ str(ex))
 
 if __name__ == "__main__":
 
@@ -407,3 +412,4 @@ if __name__ == "__main__":
         sys.exit(2)
 
 
+