--- /dev/null
+#!/usr/bin/env python
+
+import sys, os, socket
+import os.path
+import time
+import pickle
+import signal
+import struct
+import logging
+
+from daemon import Daemon
+from Util import *
+from Util import SERVER_HOST, SERVER_PORT
+from BitTorrentClientRun import *
+from TransmissionRun import *
+from TriblerRun import *
+from HrktorrentRun import *
+from TriblerStatusParser import *
+
+class MyDaemon(Daemon):
+ """
+ Server class
+ 2010, Adriana Draghici, adriana.draghici@cti.pub.ro
+ """
+
+ ip = ""
+ port = 0
+
+ processes_fd = {} # keeps lists of file descriptors for each process
+ processes_info = {} # keeps lists of BT clients name, torrent name and status log name for each process
+ logger = None
+
+ START_ERR1 = -1 # returned when starting a client, if that client is not supported
+ START_ERR2 = -2 # an exception occured when creating a client process
+
+ def __init__(self, pidfile, ip='', stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
+ Daemon.__init__(self, pidfile, stdin, stdout, stderr)
+ self.ip = ip
+ self.port = SERVER_PORT
+ logging.basicConfig(level = logging.DEBUG)
+ self.logger = logging.getLogger('autorun.Server')
+ self.logger.setLevel(logging.DEBUG)
+
+ def set_address(self, ip):
+ self.ip = ip
+
+ def set_port(self, port):
+ self.port = port
+
+ def run(self):
+ self.do_Server(self.ip, self.port)
+
+ def recv_pickled_data(self, clientsock):
+ msg = ''
+ while 1:
+ chunk = clientsock.recv(BUFFER_SIZE)
+ if not chunk:
+ break
+ #raise RuntimeError, "socket connection broken"
+ msg = msg + chunk
+ if len(chunk) < BUFFER_SIZE:
+ break
+
+ dd = pickle.loads(msg)
+ self.logger.debug(" received message: " + str(dd))
+ return dd
+
+ def send_pickled_data (self, clientsock, data):
+ dumped_data = pickle.dumps(data)
+ totalsent = 0
+ while totalsent < len(dumped_data):
+ sent = clientsock.send(dumped_data[totalsent:])
+ if sent == 0:
+ raise RuntimeError, "socket connection broken"
+ totalsent = totalsent + sent
+ self.logger.debug(" sent message: " + str(data))
+
+
+ def save_download_info(self, data):
+ """ Stores data about log files in a text file. """
+ try:
+ f = open(SESSIONS_FILE, 'a')
+ #f.write ("# session " + strftime("%d-%m-%Y %H:%M:%S", localtime())+ "\n")
+ num_lines = len(data.keys())
+ if PORT in data:
+ num_lines = num_lines -1;
+ if BASE_DIR in data:
+ num_lines = num_lines -1;
+
+ f.write(str(num_lines)+"\n")
+ for k in data.keys():
+ if k == PORT or k == BASE_DIR:
+ continue
+ f.write(str(k)+":"+str(data[k])+"\n")
+
+ f.close()
+ except Exception, ex:
+ self.logger(" Exception while saving transfer info: " + str(ex))
+ raise
+
+
+ def create_archive_file(file_path):
+ try:
+ print "Archive file: ", file_path
+
+ if os.path.isfile(file_path):
+ # archive file
+ archive_name = file_path + time.strftime("%d%m%Y_%H%M%S", time.localtime()) + ".zip"
+ command = "zip " + archive_name + " " + file_path
+ print command
+ os.system(command)
+ # remove file
+ command = "rm " + file_path
+ os.system(command)
+ return 0
+ return -1
+ except Exception,ex:
+ raise
+
+
+ def create_archive_dir_logs(dir_path):
+ try:
+ if not os.path.isfile(dir_path):
+ # archive all log files
+ archive_name = dir_path + "/" + "hrk_" + time.strftime("%d%m%Y_%H%M%S", time.localtime()) + ".zip"
+ command = "zip "+ archive_name + " " + dir_path + "/libtorrent*" + " " + dir_path + "/*log"
+ print command
+ os.system(command)
+
+ # remove log files
+ command = "rm " + dir_path + "/libtorrent*" + " " + dir_path + "/*log"
+ os.system(command)
+ return 0
+ return -1
+ except Exception,ex:
+ raise
+
+ def add_to_output_msg(self, file_list, info_dict):
+ """ Constructs a list with log files info - paths, client, metafile
+ @param file_list - list to which the created logs info list is appended. """
+
+ if CLIENT not in info_dict and TORRENT not in info_dict:
+ return -1
+ info_list = [info_dict[CLIENT],info_dict[TORRENT]]
+ if LOG_DIR in info_dict:
+ info_list.append((info_dict[LOG_DIR], LOG_DIR))
+ if OUT_DIR in info_dict:
+ info_list.append((info_dict[OUT_DIR], OUT_DIR))
+ if OUT_FILE in info_dict:
+ info_list.append((info_dict[OUT_FILE], OUT_FILE))
+ if LOG_FILE in info_dict:
+ info_list.append((info_dict[LOG_FILE], LOG_FILE))
+
+ self.logger.debug(" read transfer log info: " + str(info_list))
+
+ file_list.append (info_list)
+ return 0 # success
+
+ def archive_files(self, info_dict, archives_list):
+ """ Archives status and verbose message log files. """
+ err_msg = ""
+ try:
+ if LOG_DIR in info_dict:
+ if info_dict[CLIENT] == HRKTORRENT:
+ if create_archive_dir_logs(info_dict[DL_DIR]) < 0:
+ err_msg = "Could not archive files from dir " + file_path
+ else: archives_list.append(info_dict[DL_DIR] + "/*" )
+ else:
+ if LOG_FILE in info_dict:
+ file_path = info_dict[LOG_DIR] + "/" + info_dict[LOG_FILE]
+ if create_archive_file(file_path) < 0:
+ err_msg = "File does not exist: " + file_path
+ else: archives_list.append(file_path)
+
+ if OUT_DIR in info_dict and OUT_FILE in info_dict:
+ file_path = info_dict[OUT_DIR] + "/" + info_dict[OUT_FILE]
+ if create_archive_file(file_path) < 0:
+ err_msg = "File does not exist: " + file_path
+ else: archives_list.append(file_path)
+
+ if err_msg != "":
+ self.logger.error(err_msg)
+
+ except Exception, ex:
+ self.logger.error("Exception while archiving files: " + str(ex))
+ raise
+
+ def read_download_info(self, file_list, create_archive = False, archives_list = []):
+ """ Reads all the contents of the file that stores info about
+ self.logger files and folders.
+ """
+ if not os.path.exists(SESSIONS_FILE):
+ self.logger.error(" No sessions_file")
+ return -1
+ else:
+ f = open(SESSIONS_FILE,"r")
+ line = f.readline()
+ size = 0
+ info_dict = {}
+ dicts = [] #list with all info_dict dictionaries
+ try:
+ while line != '':
+ if size == 0 :
+ if info_dict != {} and info_dict not in dicts:
+ if self.add_to_output_msg(file_list, info_dict) < 0:
+ raise Exception
+ dicts.append(info_dict)
+ if create_archive:
+ self.archive_files(info_dict,archives_list)
+
+ size = int(line)
+ info_dict = {}
+ else:
+ parts = line.strip().split(':')
+ if len(parts) != 2:
+ raise Exception
+ info_dict[parts[0]] = parts[1]
+ size = size - 1
+ line = f.readline()
+
+ if size == 0 and info_dict != {} and info_dict not in dicts:
+ if self.add_to_output_msg(file_list, info_dict) < 0:
+ raise Exception
+ if create_archive:
+ self.archive_files(info_dict, archives_list)
+
+ except Exception, e:
+ self.logger.error(" wrong file type for sessions file\n" + str(e))
+ f.close()
+ return -1
+
+ f.close()
+ return 0
+
+
+ def get_client_status(self, transfer_id, line_parts = []):
+ """
+ Runs a parser for the obtaining the last line from the
+ torrent transfer's status file (output file).
+ @param transfer_id identifier for the transfer
+ @param line_parts list with line's components
+ @return error message if any
+ """
+
+ if int(transfer_id) not in self.processes_info:
+ return "Invalid transfer id: " + str(transfer_id)
+ status_file_path = self.processes_info[transfer_id][2]
+ client = self.processes_info[transfer_id][0]
+ if client == TRIBLER:
+ parser = TriblerStatusParser(status_file_path)
+ elif client == HRKTORRENT:
+ parser = TriblerStatusParser(status_file_path)
+ else:
+ return "Functionality not supported for client "+ client
+ line_parts.append(parser.parse_last_status_line(""))
+ if line_parts == "":
+ return "Error occured while reading status file " + status_file_path
+ return ""
+
+ def set_linger(self,sock, l_onoff, l_linger):
+ """Sets the SO_LINGER value on a socket."""
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+ struct.pack('ii', l_onoff, l_linger))
+
+
+ def do_Server(self, ip, port):
+ """ Accepts socket connections and receives messages from commander."""
+
+ try:
+ self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.logger.debug( " host ip = %s, port = %s"%(ip,SERVER_PORT))
+
+ self.serversocket.bind((ip,SERVER_PORT));
+ self.serversocket.listen(10) #max 10 requests
+ self.set_linger(self.serversocket,1, 0)
+ except Exception, e:
+ self.logger.error("Exception occured while connecting: " + str(e))
+ return
+ while(1):
+ try:
+ self.logger.debug(" accepting connections")
+ (clientsock, address) = self.serversocket.accept();
+ self.logger.debug(" accepted connection from " +str(address))
+ msg = self.recv_pickled_data(clientsock)
+
+ err_msg = ''
+ response = ''
+
+ if msg[0] == START_MSG:
+ bt_client_data = msg[1]
+ response = self.start_bt_client(bt_client_data) #returns client_pid
+ if response == self.START_ERR2:
+ err_msg = "Error occurred while starting client"
+ elif response == self.START_ERR1:
+ err_msg = "BitTorrent client " + msg[1][CLIENT] +" not supported."
+ else:
+ try:
+ self.save_download_info(bt_client_data)
+ except Exception:
+ err_msg = "Client started but transfer info could not be saved."
+
+ elif msg[0] == STOP_MSG:
+ client_pid = msg[1]
+ err_msg = self.stop_bt_client(client_pid)
+
+ elif msg[0] == GET_OUTPUT:
+ response = []
+ if self.read_download_info(response) < 0:
+ err_msg = "Error encountered while reading file"
+
+ elif msg[0] == GET_CLIENTS:
+ response = []
+ print self.processes_info
+ try:
+ for k in self. processes_info.keys():
+ response.append([self.processes_info[k][0], k, self.processes_info[k][1]]) # client_name, pid, torrent file
+ except Exception:
+ err_msg = "Error: could not retrive running clients list"
+
+ elif msg[0] == ARCHIVE:
+ response = []
+ if len(self.processes_fd) == 0: # ARCHIVE ONLY NO CLIENTS ARE RUNNING
+ try :
+ if self.read_download_info([], True, response) < 0: # read file path and create archives
+ err_msg = "Error: could not archive log files."
+ else: # erase sessions file
+ f = open(SESSIONS_FILE, "w")
+ f.close()
+ except Exception , ex:
+ err_msg = "Error: could not archive log files."
+ else:
+ err_msg = "Cannot archive logs while clients are running"
+
+ elif msg[0] == GET_STATUS:
+ line_parts = []
+ err_msg = self.get_client_status(msg[1], line_parts)
+ if len(err_msg) == 0:
+ response = {}
+
+ print len(line_parts)
+ if len(line_parts[0]) >= 7:
+ response[TIMESTAMP] = line_parts[0][0]
+ response[NUM_PEERS] = line_parts[0][1]
+ response[DHT] = line_parts[0][2]
+ response[DL_SPEED] = line_parts[0][3]
+ response[UP_SPEED] = line_parts[0][4]
+ response[DL_SIZE] = line_parts[0][5]
+ response[ETA] = line_parts[0][6]
+
+ else:
+ err_msg = "Error: wrong message type";
+
+ if err_msg != '':
+ self.send_pickled_data(clientsock,(ERROR_MSG,err_msg))
+ self.logger.debug(" Sending error message: " + err_msg)
+ else:
+ self.send_pickled_data(clientsock, (ACK_MSG, response))
+
+ clientsock.close()
+
+ except Exception, ex:
+ self.logger.error("Exception occured: "+ str(ex))
+
+if __name__ == "__main__":
+
+ if len(sys.argv) >= 2:
+
+
+ if 'start' == sys.argv[1]:
+ if(len(sys.argv) != 3):
+ print "usage:\n\t %s start <host_ip> " % sys.argv[0]
+ sys.exit(2)
+
+ daemon = MyDaemon(DAEMON_PID_PATH, sys.argv[2], stdout = '/home/p2p/out', stderr = '/home/p2p/err')
+ daemon.start()
+ elif 'stop' == sys.argv[1]:
+ daemon = MyDaemon(DAEMON_PID_PATH)
+ daemon.stop()
+ elif 'restart' == sys.argv[1]:
+ daemon = MyDaemon(DAEMON_PID_PATH)
+ daemon.restart()
+ elif 'no' == sys.argv[1]: #no daemon
+ daemon = MyDaemon(DAEMON_PID_PATH, sys.argv[2], stdout = '/home/p2p/out', stderr = '/home/p2p/err')
+ daemon.do_Server(sys.argv[2],"")
+ else:
+ print "Unknown command"
+ sys.exit(2)
+ sys.exit(0)
+ else:
+ print "usage:\n\t %s start <host_ip> " % sys.argv[0]
+ print "\t%s stop|restart " % sys.argv[0]
+ sys.exit(2)
+
+
+