autorun/server: add server pre-implementation (v0)
authorp2p <p2p@p2p-next-01.grid.pub.ro>
Sun, 19 Sep 2010 09:49:58 +0000 (12:49 +0300)
committerp2p <p2p@p2p-next-01.grid.pub.ro>
Sun, 19 Sep 2010 09:54:22 +0000 (12:54 +0300)
autorun/server/Server_v0.py [new file with mode: 0644]

diff --git a/autorun/server/Server_v0.py b/autorun/server/Server_v0.py
new file mode 100644 (file)
index 0000000..d621a38
--- /dev/null
@@ -0,0 +1,395 @@
+#!/usr/bin/env python
+
+import sys, os, socket 
+import os.path
+import time
+import pickle
+import signal
+import struct
+import logging
+
+from daemon import Daemon
+from Util import *
+from Util import SERVER_HOST, SERVER_PORT
+from BitTorrentClientRun import *
+from TransmissionRun import *
+from TriblerRun import *
+from HrktorrentRun import *
+from TriblerStatusParser import *
+
+class MyDaemon(Daemon):
+    """
+     Server class
+     2010, Adriana Draghici, adriana.draghici@cti.pub.ro
+    """
+
+    ip = ""
+    port = 0
+
+    processes_fd = {}   # keeps lists of file descriptors for each process
+    processes_info = {} # keeps lists of BT clients name, torrent name and status log name for each process
+    logger = None 
+    
+    START_ERR1 = -1 # returned when starting a client, if that client is not supported
+    START_ERR2 = -2 # an exception occured when creating a client process
+
+    def __init__(self, pidfile, ip='', stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
+        Daemon.__init__(self, pidfile, stdin, stdout, stderr)
+        self.ip = ip
+        self.port = SERVER_PORT
+        logging.basicConfig(level = logging.DEBUG)
+        self.logger = logging.getLogger('autorun.Server')
+        self.logger.setLevel(logging.DEBUG) 
+
+    def set_address(self, ip):
+        self.ip = ip
+    
+    def set_port(self, port):
+        self.port = port
+    
+    def run(self):
+        self.do_Server(self.ip, self.port)
+    
+    def recv_pickled_data(self, clientsock):
+        msg = ''
+        while 1:
+            chunk = clientsock.recv(BUFFER_SIZE)
+            if not chunk:
+                break
+                #raise RuntimeError, "socket connection broken"
+            msg = msg + chunk
+            if len(chunk) < BUFFER_SIZE:
+                break
+        
+        dd = pickle.loads(msg) 
+        self.logger.debug("  received message: " + str(dd))
+        return dd
+
+    def send_pickled_data (self, clientsock, data):
+        dumped_data = pickle.dumps(data)       
+        totalsent = 0
+        while totalsent < len(dumped_data):
+            sent = clientsock.send(dumped_data[totalsent:])
+            if sent == 0:
+                raise RuntimeError,    "socket connection broken"
+            totalsent = totalsent + sent
+        self.logger.debug("  sent message: " + str(data))
+
+
+    def save_download_info(self, data):
+        """ Stores  data about log files in a text file. """
+        try:
+            f = open(SESSIONS_FILE, 'a')
+            #f.write ("# session " + strftime("%d-%m-%Y %H:%M:%S", localtime())+ "\n")
+            num_lines = len(data.keys())
+            if PORT in data:
+                num_lines = num_lines -1;
+            if BASE_DIR in data:
+                num_lines = num_lines -1;
+
+            f.write(str(num_lines)+"\n")
+            for k in data.keys():
+                if k == PORT or k ==  BASE_DIR:
+                    continue
+                f.write(str(k)+":"+str(data[k])+"\n")
+
+            f.close()
+        except Exception, ex:
+            self.logger(" Exception while saving transfer info: " + str(ex))
+            raise
+
+
+    def create_archive_file(file_path):
+        try:
+            print "Archive file: ", file_path
+
+            if os.path.isfile(file_path):
+                # archive file
+                archive_name = file_path + time.strftime("%d%m%Y_%H%M%S", time.localtime()) + ".zip"
+                command = "zip " + archive_name + " " + file_path
+                print command
+                os.system(command)   
+                # remove file
+                command = "rm " + file_path
+                os.system(command)
+                return 0
+            return -1
+        except Exception,ex:
+            raise
+
+
+    def create_archive_dir_logs(dir_path):
+        try:
+            if not os.path.isfile(dir_path):
+                # archive all log files 
+                archive_name = dir_path + "/" + "hrk_" + time.strftime("%d%m%Y_%H%M%S", time.localtime()) + ".zip"
+                command = "zip "+ archive_name + " " + dir_path + "/libtorrent*" + " " + dir_path + "/*log"
+                print command
+                os.system(command)
+
+                # remove log files
+                command = "rm " + dir_path + "/libtorrent*" + " " + dir_path + "/*log" 
+                os.system(command)
+                return 0
+            return -1
+        except Exception,ex:
+            raise
+
+    def add_to_output_msg(self, file_list, info_dict):
+        """ Constructs a list with log files info - paths, client, metafile
+        @param file_list - list to which the created logs info list is appended. """
+
+        if CLIENT not in info_dict and TORRENT not in info_dict:
+            return -1
+        info_list = [info_dict[CLIENT],info_dict[TORRENT]]
+        if LOG_DIR in info_dict:
+            info_list.append((info_dict[LOG_DIR], LOG_DIR))
+        if OUT_DIR in info_dict:
+            info_list.append((info_dict[OUT_DIR], OUT_DIR))
+        if OUT_FILE in info_dict:
+            info_list.append((info_dict[OUT_FILE], OUT_FILE))
+        if LOG_FILE in info_dict:
+            info_list.append((info_dict[LOG_FILE], LOG_FILE))
+        
+        self.logger.debug("  read transfer log info: " + str(info_list))
+         
+        file_list.append (info_list)
+        return 0 # success
+    
+    def archive_files(self, info_dict, archives_list):
+        """ Archives status and verbose message log files. """
+        err_msg = ""
+        try:
+            if LOG_DIR in info_dict:
+                if info_dict[CLIENT] == HRKTORRENT:
+                    if create_archive_dir_logs(info_dict[DL_DIR]) < 0:
+                        err_msg = "Could not archive files from dir " + file_path
+                    else: archives_list.append(info_dict[DL_DIR] + "/*" )
+                else:   
+                    if LOG_FILE in info_dict:
+                        file_path = info_dict[LOG_DIR] + "/" + info_dict[LOG_FILE]
+                        if create_archive_file(file_path) < 0:
+                            err_msg = "File does not exist: " + file_path
+                        else: archives_list.append(file_path)
+
+            if OUT_DIR in info_dict and OUT_FILE in info_dict:
+                file_path = info_dict[OUT_DIR] + "/" + info_dict[OUT_FILE]
+                if create_archive_file(file_path) < 0:
+                    err_msg = "File does not exist: " + file_path
+                else: archives_list.append(file_path)
+            
+            if err_msg != "": 
+                self.logger.error(err_msg)
+                 
+        except Exception, ex:
+            self.logger.error("Exception while archiving files: " + str(ex))
+            raise 
+
+    def read_download_info(self, file_list, create_archive = False, archives_list = []):
+        """ Reads all the contents of the file that stores info about 
+            self.logger files and folders.
+        """
+        if not os.path.exists(SESSIONS_FILE):
+            self.logger.error("  No sessions_file")
+            return -1
+        else:    
+            f = open(SESSIONS_FILE,"r")
+            line = f.readline()
+            size = 0
+            info_dict = {}
+            dicts = [] #list with all info_dict dictionaries
+            try:
+                while line != '':
+                    if size == 0 :
+                        if info_dict != {} and info_dict not in dicts:
+                            if self.add_to_output_msg(file_list, info_dict) < 0:
+                                raise Exception
+                            dicts.append(info_dict)
+                            if create_archive:
+                                self.archive_files(info_dict,archives_list)
+
+                        size = int(line)
+                        info_dict = {}
+                    else:
+                        parts = line.strip().split(':')
+                        if len(parts) != 2:
+                            raise Exception
+                        info_dict[parts[0]] = parts[1]
+                        size = size - 1
+                    line = f.readline()
+               
+                if size == 0 and info_dict != {} and info_dict not in dicts:
+                    if self.add_to_output_msg(file_list, info_dict) < 0:
+                        raise Exception
+                    if create_archive:
+                        self.archive_files(info_dict, archives_list)
+                
+            except Exception, e:
+                self.logger.error("  wrong file type for sessions file\n" + str(e))
+                f.close()
+                return -1
+
+            f.close()
+            return 0
+
+
+    def get_client_status(self, transfer_id, line_parts = []):
+        """
+            Runs a parser for the obtaining the last line from the 
+            torrent transfer's status file (output file).
+            @param transfer_id identifier for the transfer
+            @param line_parts list with line's components
+            @return error message if any
+        """
+
+        if int(transfer_id) not in self.processes_info:
+            return "Invalid transfer id: " + str(transfer_id)
+        status_file_path = self.processes_info[transfer_id][2]
+        client = self.processes_info[transfer_id][0]
+        if client == TRIBLER:
+            parser = TriblerStatusParser(status_file_path)
+        elif client == HRKTORRENT:
+            parser = TriblerStatusParser(status_file_path)
+        else:
+            return "Functionality not supported for client "+ client
+        line_parts.append(parser.parse_last_status_line(""))
+        if line_parts == "":
+            return "Error occured while reading status file " + status_file_path
+        return ""
+
+    def set_linger(self,sock, l_onoff, l_linger):
+        """Sets the SO_LINGER value on a socket."""
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+                struct.pack('ii', l_onoff, l_linger))
+
+
+    def do_Server(self, ip, port):
+        """ Accepts socket connections and receives messages from commander."""
+
+        try:
+            self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self.logger.debug( "  host ip = %s, port = %s"%(ip,SERVER_PORT))
+            
+            self.serversocket.bind((ip,SERVER_PORT));
+            self.serversocket.listen(10) #max 10 requests
+            self.set_linger(self.serversocket,1, 0)
+        except Exception, e:
+            self.logger.error("Exception occured while connecting: " + str(e))
+            return
+        while(1):
+            try:
+                self.logger.debug("  accepting connections")
+                (clientsock, address) = self.serversocket.accept();
+                self.logger.debug("  accepted connection from " +str(address))
+                msg = self.recv_pickled_data(clientsock)
+                
+                err_msg = ''
+                response = ''
+                
+                if msg[0] == START_MSG:
+                    bt_client_data = msg[1]
+                    response = self.start_bt_client(bt_client_data) #returns client_pid
+                    if response == self.START_ERR2:
+                        err_msg = "Error occurred while starting client"
+                    elif response == self.START_ERR1:
+                        err_msg = "BitTorrent client " + msg[1][CLIENT] +" not supported."
+                    else:
+                        try:
+                            self.save_download_info(bt_client_data)
+                        except Exception:
+                            err_msg = "Client started but transfer info could not be saved."
+
+                elif msg[0] == STOP_MSG:
+                    client_pid = msg[1]
+                    err_msg = self.stop_bt_client(client_pid)
+            
+                elif msg[0] == GET_OUTPUT:
+                    response = []
+                    if self.read_download_info(response) < 0:
+                        err_msg = "Error encountered while reading file"
+                
+                elif msg[0] == GET_CLIENTS:
+                    response = []
+                    print self.processes_info
+                    try:
+                        for k in self. processes_info.keys():
+                            response.append([self.processes_info[k][0], k, self.processes_info[k][1]]) # client_name, pid, torrent file
+                    except Exception:
+                         err_msg = "Error: could not retrive running clients list"    
+        
+                elif msg[0] == ARCHIVE:
+                    response = []
+                    if len(self.processes_fd) == 0: # ARCHIVE ONLY NO CLIENTS ARE RUNNING
+                        try :
+                            if self.read_download_info([], True, response) < 0: # read file path and create archives
+                                err_msg = "Error: could not archive log files."
+                            else: # erase sessions file
+                                f = open(SESSIONS_FILE, "w")
+                                f.close()
+                        except Exception , ex:
+                            err_msg = "Error: could not archive log files."
+                    else:
+                        err_msg = "Cannot archive logs while clients are running"
+                
+                elif msg[0] == GET_STATUS:
+                    line_parts = []
+                    err_msg = self.get_client_status(msg[1], line_parts)
+                    if len(err_msg) == 0:
+                        response = {}
+                        
+                        print len(line_parts) 
+                        if len(line_parts[0]) >= 7:
+                            response[TIMESTAMP] = line_parts[0][0]
+                            response[NUM_PEERS] = line_parts[0][1]
+                            response[DHT] = line_parts[0][2]
+                            response[DL_SPEED] = line_parts[0][3]
+                            response[UP_SPEED] = line_parts[0][4]
+                            response[DL_SIZE] = line_parts[0][5]
+                            response[ETA] = line_parts[0][6]
+                
+                else:
+                    err_msg = "Error: wrong message type";
+
+                if err_msg != '':
+                    self.send_pickled_data(clientsock,(ERROR_MSG,err_msg))
+                    self.logger.debug(" Sending error message: " + err_msg)
+                else:
+                    self.send_pickled_data(clientsock, (ACK_MSG, response))
+
+                clientsock.close()
+
+            except Exception, ex:
+                self.logger.error("Exception occured: "+ str(ex))
+
+if __name__ == "__main__":
+
+    if len(sys.argv) >= 2:
+       
+        
+        if 'start' == sys.argv[1]:
+            if(len(sys.argv) != 3):
+                print "usage:\n\t %s start <host_ip> " % sys.argv[0]
+                sys.exit(2)
+
+            daemon = MyDaemon(DAEMON_PID_PATH, sys.argv[2], stdout = '/home/p2p/out', stderr = '/home/p2p/err') 
+            daemon.start()
+        elif 'stop' == sys.argv[1]:
+            daemon = MyDaemon(DAEMON_PID_PATH)
+            daemon.stop()
+        elif 'restart' == sys.argv[1]:
+            daemon = MyDaemon(DAEMON_PID_PATH) 
+            daemon.restart()
+        elif 'no' == sys.argv[1]: #no daemon
+            daemon = MyDaemon(DAEMON_PID_PATH, sys.argv[2], stdout = '/home/p2p/out', stderr = '/home/p2p/err') 
+            daemon.do_Server(sys.argv[2],"")
+        else:
+            print "Unknown command"
+            sys.exit(2)
+        sys.exit(0)
+    else:
+        print "usage:\n\t %s start <host_ip> " % sys.argv[0]
+        print "\t%s stop|restart "  % sys.argv[0]
+        sys.exit(2)
+
+
+