From: p2p Date: Sun, 19 Sep 2010 09:49:58 +0000 (+0300) Subject: autorun/server: add server pre-implementation (v0) X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=commitdiff_plain;h=2f636049bbd248d0ca9c93ba084994d1fbac2200;p=cs-p2p-next.git autorun/server: add server pre-implementation (v0) --- diff --git a/autorun/server/Server_v0.py b/autorun/server/Server_v0.py new file mode 100644 index 0000000..d621a38 --- /dev/null +++ b/autorun/server/Server_v0.py @@ -0,0 +1,395 @@ +#!/usr/bin/env python + +import sys, os, socket +import os.path +import time +import pickle +import signal +import struct +import logging + +from daemon import Daemon +from Util import * +from Util import SERVER_HOST, SERVER_PORT +from BitTorrentClientRun import * +from TransmissionRun import * +from TriblerRun import * +from HrktorrentRun import * +from TriblerStatusParser import * + +class MyDaemon(Daemon): + """ + Server class + 2010, Adriana Draghici, adriana.draghici@cti.pub.ro + """ + + ip = "" + port = 0 + + processes_fd = {} # keeps lists of file descriptors for each process + processes_info = {} # keeps lists of BT clients name, torrent name and status log name for each process + logger = None + + START_ERR1 = -1 # returned when starting a client, if that client is not supported + START_ERR2 = -2 # an exception occured when creating a client process + + def __init__(self, pidfile, ip='', stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): + Daemon.__init__(self, pidfile, stdin, stdout, stderr) + self.ip = ip + self.port = SERVER_PORT + logging.basicConfig(level = logging.DEBUG) + self.logger = logging.getLogger('autorun.Server') + self.logger.setLevel(logging.DEBUG) + + def set_address(self, ip): + self.ip = ip + + def set_port(self, port): + self.port = port + + def run(self): + self.do_Server(self.ip, self.port) + + def recv_pickled_data(self, clientsock): + msg = '' + while 1: + chunk = clientsock.recv(BUFFER_SIZE) + if not chunk: + break + #raise RuntimeError, "socket connection broken" + msg = msg + chunk + if len(chunk) < BUFFER_SIZE: + break + + dd = pickle.loads(msg) + self.logger.debug(" received message: " + str(dd)) + return dd + + def send_pickled_data (self, clientsock, data): + dumped_data = pickle.dumps(data) + totalsent = 0 + while totalsent < len(dumped_data): + sent = clientsock.send(dumped_data[totalsent:]) + if sent == 0: + raise RuntimeError, "socket connection broken" + totalsent = totalsent + sent + self.logger.debug(" sent message: " + str(data)) + + + def save_download_info(self, data): + """ Stores data about log files in a text file. """ + try: + f = open(SESSIONS_FILE, 'a') + #f.write ("# session " + strftime("%d-%m-%Y %H:%M:%S", localtime())+ "\n") + num_lines = len(data.keys()) + if PORT in data: + num_lines = num_lines -1; + if BASE_DIR in data: + num_lines = num_lines -1; + + f.write(str(num_lines)+"\n") + for k in data.keys(): + if k == PORT or k == BASE_DIR: + continue + f.write(str(k)+":"+str(data[k])+"\n") + + f.close() + except Exception, ex: + self.logger(" Exception while saving transfer info: " + str(ex)) + raise + + + def create_archive_file(file_path): + try: + print "Archive file: ", file_path + + if os.path.isfile(file_path): + # archive file + archive_name = file_path + time.strftime("%d%m%Y_%H%M%S", time.localtime()) + ".zip" + command = "zip " + archive_name + " " + file_path + print command + os.system(command) + # remove file + command = "rm " + file_path + os.system(command) + return 0 + return -1 + except Exception,ex: + raise + + + def create_archive_dir_logs(dir_path): + try: + if not os.path.isfile(dir_path): + # archive all log files + archive_name = dir_path + "/" + "hrk_" + time.strftime("%d%m%Y_%H%M%S", time.localtime()) + ".zip" + command = "zip "+ archive_name + " " + dir_path + "/libtorrent*" + " " + dir_path + "/*log" + print command + os.system(command) + + # remove log files + command = "rm " + dir_path + "/libtorrent*" + " " + dir_path + "/*log" + os.system(command) + return 0 + return -1 + except Exception,ex: + raise + + def add_to_output_msg(self, file_list, info_dict): + """ Constructs a list with log files info - paths, client, metafile + @param file_list - list to which the created logs info list is appended. """ + + if CLIENT not in info_dict and TORRENT not in info_dict: + return -1 + info_list = [info_dict[CLIENT],info_dict[TORRENT]] + if LOG_DIR in info_dict: + info_list.append((info_dict[LOG_DIR], LOG_DIR)) + if OUT_DIR in info_dict: + info_list.append((info_dict[OUT_DIR], OUT_DIR)) + if OUT_FILE in info_dict: + info_list.append((info_dict[OUT_FILE], OUT_FILE)) + if LOG_FILE in info_dict: + info_list.append((info_dict[LOG_FILE], LOG_FILE)) + + self.logger.debug(" read transfer log info: " + str(info_list)) + + file_list.append (info_list) + return 0 # success + + def archive_files(self, info_dict, archives_list): + """ Archives status and verbose message log files. """ + err_msg = "" + try: + if LOG_DIR in info_dict: + if info_dict[CLIENT] == HRKTORRENT: + if create_archive_dir_logs(info_dict[DL_DIR]) < 0: + err_msg = "Could not archive files from dir " + file_path + else: archives_list.append(info_dict[DL_DIR] + "/*" ) + else: + if LOG_FILE in info_dict: + file_path = info_dict[LOG_DIR] + "/" + info_dict[LOG_FILE] + if create_archive_file(file_path) < 0: + err_msg = "File does not exist: " + file_path + else: archives_list.append(file_path) + + if OUT_DIR in info_dict and OUT_FILE in info_dict: + file_path = info_dict[OUT_DIR] + "/" + info_dict[OUT_FILE] + if create_archive_file(file_path) < 0: + err_msg = "File does not exist: " + file_path + else: archives_list.append(file_path) + + if err_msg != "": + self.logger.error(err_msg) + + except Exception, ex: + self.logger.error("Exception while archiving files: " + str(ex)) + raise + + def read_download_info(self, file_list, create_archive = False, archives_list = []): + """ Reads all the contents of the file that stores info about + self.logger files and folders. + """ + if not os.path.exists(SESSIONS_FILE): + self.logger.error(" No sessions_file") + return -1 + else: + f = open(SESSIONS_FILE,"r") + line = f.readline() + size = 0 + info_dict = {} + dicts = [] #list with all info_dict dictionaries + try: + while line != '': + if size == 0 : + if info_dict != {} and info_dict not in dicts: + if self.add_to_output_msg(file_list, info_dict) < 0: + raise Exception + dicts.append(info_dict) + if create_archive: + self.archive_files(info_dict,archives_list) + + size = int(line) + info_dict = {} + else: + parts = line.strip().split(':') + if len(parts) != 2: + raise Exception + info_dict[parts[0]] = parts[1] + size = size - 1 + line = f.readline() + + if size == 0 and info_dict != {} and info_dict not in dicts: + if self.add_to_output_msg(file_list, info_dict) < 0: + raise Exception + if create_archive: + self.archive_files(info_dict, archives_list) + + except Exception, e: + self.logger.error(" wrong file type for sessions file\n" + str(e)) + f.close() + return -1 + + f.close() + return 0 + + + def get_client_status(self, transfer_id, line_parts = []): + """ + Runs a parser for the obtaining the last line from the + torrent transfer's status file (output file). + @param transfer_id identifier for the transfer + @param line_parts list with line's components + @return error message if any + """ + + if int(transfer_id) not in self.processes_info: + return "Invalid transfer id: " + str(transfer_id) + status_file_path = self.processes_info[transfer_id][2] + client = self.processes_info[transfer_id][0] + if client == TRIBLER: + parser = TriblerStatusParser(status_file_path) + elif client == HRKTORRENT: + parser = TriblerStatusParser(status_file_path) + else: + return "Functionality not supported for client "+ client + line_parts.append(parser.parse_last_status_line("")) + if line_parts == "": + return "Error occured while reading status file " + status_file_path + return "" + + def set_linger(self,sock, l_onoff, l_linger): + """Sets the SO_LINGER value on a socket.""" + sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, + struct.pack('ii', l_onoff, l_linger)) + + + def do_Server(self, ip, port): + """ Accepts socket connections and receives messages from commander.""" + + try: + self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.logger.debug( " host ip = %s, port = %s"%(ip,SERVER_PORT)) + + self.serversocket.bind((ip,SERVER_PORT)); + self.serversocket.listen(10) #max 10 requests + self.set_linger(self.serversocket,1, 0) + except Exception, e: + self.logger.error("Exception occured while connecting: " + str(e)) + return + while(1): + try: + self.logger.debug(" accepting connections") + (clientsock, address) = self.serversocket.accept(); + self.logger.debug(" accepted connection from " +str(address)) + msg = self.recv_pickled_data(clientsock) + + err_msg = '' + response = '' + + if msg[0] == START_MSG: + bt_client_data = msg[1] + response = self.start_bt_client(bt_client_data) #returns client_pid + if response == self.START_ERR2: + err_msg = "Error occurred while starting client" + elif response == self.START_ERR1: + err_msg = "BitTorrent client " + msg[1][CLIENT] +" not supported." + else: + try: + self.save_download_info(bt_client_data) + except Exception: + err_msg = "Client started but transfer info could not be saved." + + elif msg[0] == STOP_MSG: + client_pid = msg[1] + err_msg = self.stop_bt_client(client_pid) + + elif msg[0] == GET_OUTPUT: + response = [] + if self.read_download_info(response) < 0: + err_msg = "Error encountered while reading file" + + elif msg[0] == GET_CLIENTS: + response = [] + print self.processes_info + try: + for k in self. processes_info.keys(): + response.append([self.processes_info[k][0], k, self.processes_info[k][1]]) # client_name, pid, torrent file + except Exception: + err_msg = "Error: could not retrive running clients list" + + elif msg[0] == ARCHIVE: + response = [] + if len(self.processes_fd) == 0: # ARCHIVE ONLY NO CLIENTS ARE RUNNING + try : + if self.read_download_info([], True, response) < 0: # read file path and create archives + err_msg = "Error: could not archive log files." + else: # erase sessions file + f = open(SESSIONS_FILE, "w") + f.close() + except Exception , ex: + err_msg = "Error: could not archive log files." + else: + err_msg = "Cannot archive logs while clients are running" + + elif msg[0] == GET_STATUS: + line_parts = [] + err_msg = self.get_client_status(msg[1], line_parts) + if len(err_msg) == 0: + response = {} + + print len(line_parts) + if len(line_parts[0]) >= 7: + response[TIMESTAMP] = line_parts[0][0] + response[NUM_PEERS] = line_parts[0][1] + response[DHT] = line_parts[0][2] + response[DL_SPEED] = line_parts[0][3] + response[UP_SPEED] = line_parts[0][4] + response[DL_SIZE] = line_parts[0][5] + response[ETA] = line_parts[0][6] + + else: + err_msg = "Error: wrong message type"; + + if err_msg != '': + self.send_pickled_data(clientsock,(ERROR_MSG,err_msg)) + self.logger.debug(" Sending error message: " + err_msg) + else: + self.send_pickled_data(clientsock, (ACK_MSG, response)) + + clientsock.close() + + except Exception, ex: + self.logger.error("Exception occured: "+ str(ex)) + +if __name__ == "__main__": + + if len(sys.argv) >= 2: + + + if 'start' == sys.argv[1]: + if(len(sys.argv) != 3): + print "usage:\n\t %s start " % sys.argv[0] + sys.exit(2) + + daemon = MyDaemon(DAEMON_PID_PATH, sys.argv[2], stdout = '/home/p2p/out', stderr = '/home/p2p/err') + daemon.start() + elif 'stop' == sys.argv[1]: + daemon = MyDaemon(DAEMON_PID_PATH) + daemon.stop() + elif 'restart' == sys.argv[1]: + daemon = MyDaemon(DAEMON_PID_PATH) + daemon.restart() + elif 'no' == sys.argv[1]: #no daemon + daemon = MyDaemon(DAEMON_PID_PATH, sys.argv[2], stdout = '/home/p2p/out', stderr = '/home/p2p/err') + daemon.do_Server(sys.argv[2],"") + else: + print "Unknown command" + sys.exit(2) + sys.exit(0) + else: + print "usage:\n\t %s start " % sys.argv[0] + print "\t%s stop|restart " % sys.argv[0] + sys.exit(2) + + +