ppf/new: Add functions and tests for SessionEntry class in config.
authorMariana Mărășoiu <mariana.marasoiu@gmail.com>
Mon, 29 Aug 2011 10:16:41 +0000 (13:16 +0300)
committerMariana Mărășoiu <mariana.marasoiu@gmail.com>
Mon, 29 Aug 2011 10:16:41 +0000 (13:16 +0300)
ppf/new/config.py
ppf/new/tests/test_config.py

index 5c2ee4e..062585f 100644 (file)
@@ -68,12 +68,12 @@ class SwarmDescription(object):
         return swarm
 
     def get_session_entries(self):
-        """ Return list of SwarmDescription instances. """
+        """ Return list of SessionEntry instances. """
         session_names = self.data.sections()
         session_names.remove('swarm')
         session_list = []
         for session in session_names:
-            entry = SwarmDescription()
+            entry = SessionEntry()
             entry.data = ConfigParser.ConfigParser()
             entry.data.add_section(session)
             session_items = dict(self.data.items(session))
@@ -82,19 +82,33 @@ class SwarmDescription(object):
             session_list.append(entry)
         return session_list
 
-    def get_session(self):
-        """ Return name of the session.
-        If it contains more than one session, return None.
+    def update_session_entry_id(self, session_entry, cs_id):
+        """ Add or modify client session id. """
+        session_name = session_entry.data.sections()[0]
+        self.data.set(session_name, 'client_session_id', str(cs_id))
+        session_entry.data.set(session_name, 'client_session_id', str(cs_id))
 
-        """
-        # TODO: May need rethinking. It may be better if entry is a separate
-        # class or instance of SwarmDescription.
-        session_list = self.data.sections()
-        if 'swarm' in session_list:
-            return None
-        else:
-            cs_data = dict(self.data.items(session_list[0]))
-            cs = storage.ClientSession(
+
+    def get_file_archives(self):
+        """ Return a list containing all archives from swarm. """
+        archives = []
+        for section in self.data.sections():
+            try:
+                archives.append(self.data.get(section, 'log_file'))
+            except ConfigParser.NoOptionError:
+                pass
+        return archives
+
+class SessionEntry(object):
+    def __init__(self):
+        super(SessionEntry, self).__init__()
+
+
+    def get_session(self):
+        """ Return name of the session. """
+        session = self.data.sections()[0]
+        cs_data = dict(self.data.items(session))
+        cs = storage.ClientSession(
                             btclient = cs_data['bittorrent_client'],
                             system_os = cs_data['system_os'],
                             system_os_version = cs_data['system_os_version'],
@@ -111,27 +125,18 @@ class SwarmDescription(object):
                             description = cs_data['description'])
         return cs
 
-    def update_session_entry_id(self, session_entry, cs_id):
-        """ Add or modify client session id. """
-        session_name = session_entry.data.sections()[0]
-        self.data.set(session_name, 'client_session_id', str(cs_id))
-        session_entry.data.set(session_name, 'client_session_id', str(cs_id))
-
     def get_session_id(self):
-        """ Return client session id corresponding to the archive file."""
-        section = self.data.sections()[0]
-        cs_id = self.data.get(section, 'client_session_id')
-        return cs_id
+        """ Return client session id corresponding to the entry.
+        If the session entry hasn't been asigned a session id raise error.
 
-    def get_file_archives(self):
-        """ Return a list containing all archives from swarm. """
-        archives = []
-        for section in self.data.sections():
-            try:
-                archives.append(self.data.get(section, 'log_file'))
-            except ConfigParser.NoOptionError:
-                pass
-        return archives
+        """
+        section = self.data.sections()[0]
+        try:
+            cs_id = self.data.get(section, 'client_session_id')
+            return cs_id
+        except ConfigParser.NoOptionError:
+            logger.debug("No client session id for entry: %s" %section)
+            return None
 
 class AccessConfig(object):
     def __init__(self):
index c33de02..390636a 100644 (file)
@@ -13,9 +13,9 @@ import sys
 import config
 import storage
 
-class SessionDescriptionTest(unittest.TestCase):
+class SwarmDescriptionTest(unittest.TestCase):
     """
-    Test suite for SessionDescription class in config.py.
+    Test suite for SwarmDescription class in config.py.
     """
 
     # Class specific variables. Initialized in setUp, used throughout tests.
@@ -138,14 +138,6 @@ class SessionDescriptionTest(unittest.TestCase):
             self.assertEqual(entry.data.items(section).sort(),
                              list(self.test_data[section]).sort())
 
-    def test_get_session(self):
-        session_entries = self.sd.get_session_entries()
-        entry = session_entries[0]
-        cs = entry.get_session()
-
-        self.assertTrue(type(cs) is storage.ClientSession)
-        self.assertEqual(cs.system_os, 'Linux')
-
     def test_update_session_entry_id(self):
         session_entries = self.sd.get_session_entries()
         entry = session_entries[0]
@@ -164,10 +156,44 @@ class SessionDescriptionTest(unittest.TestCase):
 
         self.assertEqual(archives.sort(),expected_archives.sort())
 
+class SessionEntryTest(unittest.TestCase):
+    """
+    Test suite for SessionEntry class in config.py.
+    """
+
+    # Class specific variables. Initialized in setUp, used throughout tests.
+    se = None
+    sd = None
+    config_file = "config.sample.ini"
+
+    def setUp(self):
+        config_file_path = os.path.join(os.path.dirname(__file__),
+                                        self.config_file)
+        self.sd = config.SwarmDescription()
+        self.sd.load(config_file_path)
+        session_entries = self.sd.get_session_entries()
+
+        self.se = config.SessionEntry()
+        self.se = session_entries[2]
+
+    def test_get_session(self):
+        cs = self.se.get_session()
+
+        self.assertTrue(type(cs) is storage.ClientSession)
+        self.assertEqual(cs.system_os, 'Linux')
+        self.assertEqual(cs.system_ram, '2007')
+
+    def test_get_session_id(self):
+        # Check for no session id before update
+        #self.assertFalse(self.se.get_session_id())
+        test_cs_id = 2
+        self.sd.update_session_entry_id(self.se, test_cs_id)
+        cs_id = self.se.get_session_id()
+        self.assertEqual(str(cs_id), str(test_cs_id))
 
 class AccessConfigTest(unittest.TestCase):
     """
-    Test suite for SessionDescription class in config.py.
+    Test suite for AccessConfig class in config.py.
     """
 
     # Class specific variables. Initialized in setUp, used throughout tests.
@@ -212,44 +238,9 @@ class AccessConfigTest(unittest.TestCase):
 
     def test_get_swarm_writer(self):
         sw = self.ac.get_swarm_writer()
-
-        # Add swarm.
-        s = storage.Swarm(torrent_filename="fedora.torrent", data_size=102400)
-        sw.add_swarm(s)
-
-        # Add client session.
-        cs = storage.ClientSession(
-                        swarm_id=1, btclient="Tribler", system_os="Linux",
-                        system_os_version="2.6.26", system_ram=2048,
-                        system_cpu=3000, public_ip="141.85.224.201",
-                        public_port="50500", ds_limit=300, us_limit=200)
-        sw.add_client_session(cs)
-
-        ts = datetime.datetime.strptime("2010-09-12 08:43:15",
-                                        "%Y-%m-%d %H:%M:%S")
-        # Add status message.
-        sm = storage.StatusMessage(
-                        client_session_id=1, timestamp=ts, num_peers=10,
-                        num_dht_peers=3, download_speed=102, upload_speed=99,
-                        download_size=10213, upload_size=3301)
-        sw.add_status_message(sm)
-
-        # Add peer status message.
-        psm = storage.PeerStatusMessage(
-                        client_session_id=1, timestamp=ts,
-                        peer_ip="141.85.224.202", peer_port="12345",
-                        download_speed=13, upload_speed=98)
-        sw.add_peer_status_message(psm)
-
-        # Add verbose message.
-        vm = storage.VerboseMessage(
-                        client_session_id=1, timestamp=ts,
-                        transfer_direction="send", peer_ip="141.85.224.202",
-                        peer_port="12345", message_type="CHOKE")
-        sw.add_verbose_message(vm)
-
         expected_handlers = 2
         handlers = len(sw.handlers)
+
         self.assertEqual(expected_handlers, handlers)
 
 if __name__ == "__main__":