From: Mariana Mărășoiu Date: Mon, 29 Aug 2011 10:16:41 +0000 (+0300) Subject: ppf/new: Add functions and tests for SessionEntry class in config. X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=commitdiff_plain;h=d169e3d9a424e5a5f69e13e93db9e097c53947eb;p=cs-p2p-next.git ppf/new: Add functions and tests for SessionEntry class in config. --- diff --git a/ppf/new/config.py b/ppf/new/config.py index 5c2ee4e..062585f 100644 --- a/ppf/new/config.py +++ b/ppf/new/config.py @@ -68,12 +68,12 @@ class SwarmDescription(object): return swarm def get_session_entries(self): - """ Return list of SwarmDescription instances. """ + """ Return list of SessionEntry instances. """ session_names = self.data.sections() session_names.remove('swarm') session_list = [] for session in session_names: - entry = SwarmDescription() + entry = SessionEntry() entry.data = ConfigParser.ConfigParser() entry.data.add_section(session) session_items = dict(self.data.items(session)) @@ -82,19 +82,33 @@ class SwarmDescription(object): session_list.append(entry) return session_list - def get_session(self): - """ Return name of the session. - If it contains more than one session, return None. + def update_session_entry_id(self, session_entry, cs_id): + """ Add or modify client session id. """ + session_name = session_entry.data.sections()[0] + self.data.set(session_name, 'client_session_id', str(cs_id)) + session_entry.data.set(session_name, 'client_session_id', str(cs_id)) - """ - # TODO: May need rethinking. It may be better if entry is a separate - # class or instance of SwarmDescription. - session_list = self.data.sections() - if 'swarm' in session_list: - return None - else: - cs_data = dict(self.data.items(session_list[0])) - cs = storage.ClientSession( + + def get_file_archives(self): + """ Return a list containing all archives from swarm. """ + archives = [] + for section in self.data.sections(): + try: + archives.append(self.data.get(section, 'log_file')) + except ConfigParser.NoOptionError: + pass + return archives + +class SessionEntry(object): + def __init__(self): + super(SessionEntry, self).__init__() + + + def get_session(self): + """ Return name of the session. """ + session = self.data.sections()[0] + cs_data = dict(self.data.items(session)) + cs = storage.ClientSession( btclient = cs_data['bittorrent_client'], system_os = cs_data['system_os'], system_os_version = cs_data['system_os_version'], @@ -111,27 +125,18 @@ class SwarmDescription(object): description = cs_data['description']) return cs - def update_session_entry_id(self, session_entry, cs_id): - """ Add or modify client session id. """ - session_name = session_entry.data.sections()[0] - self.data.set(session_name, 'client_session_id', str(cs_id)) - session_entry.data.set(session_name, 'client_session_id', str(cs_id)) - def get_session_id(self): - """ Return client session id corresponding to the archive file.""" - section = self.data.sections()[0] - cs_id = self.data.get(section, 'client_session_id') - return cs_id + """ Return client session id corresponding to the entry. + If the session entry hasn't been asigned a session id raise error. - def get_file_archives(self): - """ Return a list containing all archives from swarm. """ - archives = [] - for section in self.data.sections(): - try: - archives.append(self.data.get(section, 'log_file')) - except ConfigParser.NoOptionError: - pass - return archives + """ + section = self.data.sections()[0] + try: + cs_id = self.data.get(section, 'client_session_id') + return cs_id + except ConfigParser.NoOptionError: + logger.debug("No client session id for entry: %s" %section) + return None class AccessConfig(object): def __init__(self): diff --git a/ppf/new/tests/test_config.py b/ppf/new/tests/test_config.py index c33de02..390636a 100644 --- a/ppf/new/tests/test_config.py +++ b/ppf/new/tests/test_config.py @@ -13,9 +13,9 @@ import sys import config import storage -class SessionDescriptionTest(unittest.TestCase): +class SwarmDescriptionTest(unittest.TestCase): """ - Test suite for SessionDescription class in config.py. + Test suite for SwarmDescription class in config.py. """ # Class specific variables. Initialized in setUp, used throughout tests. @@ -138,14 +138,6 @@ class SessionDescriptionTest(unittest.TestCase): self.assertEqual(entry.data.items(section).sort(), list(self.test_data[section]).sort()) - def test_get_session(self): - session_entries = self.sd.get_session_entries() - entry = session_entries[0] - cs = entry.get_session() - - self.assertTrue(type(cs) is storage.ClientSession) - self.assertEqual(cs.system_os, 'Linux') - def test_update_session_entry_id(self): session_entries = self.sd.get_session_entries() entry = session_entries[0] @@ -164,10 +156,44 @@ class SessionDescriptionTest(unittest.TestCase): self.assertEqual(archives.sort(),expected_archives.sort()) +class SessionEntryTest(unittest.TestCase): + """ + Test suite for SessionEntry class in config.py. + """ + + # Class specific variables. Initialized in setUp, used throughout tests. + se = None + sd = None + config_file = "config.sample.ini" + + def setUp(self): + config_file_path = os.path.join(os.path.dirname(__file__), + self.config_file) + self.sd = config.SwarmDescription() + self.sd.load(config_file_path) + session_entries = self.sd.get_session_entries() + + self.se = config.SessionEntry() + self.se = session_entries[2] + + def test_get_session(self): + cs = self.se.get_session() + + self.assertTrue(type(cs) is storage.ClientSession) + self.assertEqual(cs.system_os, 'Linux') + self.assertEqual(cs.system_ram, '2007') + + def test_get_session_id(self): + # Check for no session id before update + #self.assertFalse(self.se.get_session_id()) + test_cs_id = 2 + self.sd.update_session_entry_id(self.se, test_cs_id) + cs_id = self.se.get_session_id() + self.assertEqual(str(cs_id), str(test_cs_id)) class AccessConfigTest(unittest.TestCase): """ - Test suite for SessionDescription class in config.py. + Test suite for AccessConfig class in config.py. """ # Class specific variables. Initialized in setUp, used throughout tests. @@ -212,44 +238,9 @@ class AccessConfigTest(unittest.TestCase): def test_get_swarm_writer(self): sw = self.ac.get_swarm_writer() - - # Add swarm. - s = storage.Swarm(torrent_filename="fedora.torrent", data_size=102400) - sw.add_swarm(s) - - # Add client session. - cs = storage.ClientSession( - swarm_id=1, btclient="Tribler", system_os="Linux", - system_os_version="2.6.26", system_ram=2048, - system_cpu=3000, public_ip="141.85.224.201", - public_port="50500", ds_limit=300, us_limit=200) - sw.add_client_session(cs) - - ts = datetime.datetime.strptime("2010-09-12 08:43:15", - "%Y-%m-%d %H:%M:%S") - # Add status message. - sm = storage.StatusMessage( - client_session_id=1, timestamp=ts, num_peers=10, - num_dht_peers=3, download_speed=102, upload_speed=99, - download_size=10213, upload_size=3301) - sw.add_status_message(sm) - - # Add peer status message. - psm = storage.PeerStatusMessage( - client_session_id=1, timestamp=ts, - peer_ip="141.85.224.202", peer_port="12345", - download_speed=13, upload_speed=98) - sw.add_peer_status_message(psm) - - # Add verbose message. - vm = storage.VerboseMessage( - client_session_id=1, timestamp=ts, - transfer_direction="send", peer_ip="141.85.224.202", - peer_port="12345", message_type="CHOKE") - sw.add_verbose_message(vm) - expected_handlers = 2 handlers = len(sw.handlers) + self.assertEqual(expected_handlers, handlers) if __name__ == "__main__":