ppf/new: Update code according to PEP 8.
[cs-p2p-next.git] / ppf / new / tests / test_storage.py
index 33fe26b..66c15de 100644 (file)
@@ -51,9 +51,8 @@ def create_numeric_files(path):
     f.close()
 
 class StorageTest(unittest.TestCase):
-    """
-    Test suite for stand alone functions in storage.py.
-    """
+
+    """Test suite for stand alone functions in storage.py."""
 
     # Class specific variables. Initialized in setUp, used throughout tests.
     path = "/tmp/ttfa-test"
@@ -78,7 +77,8 @@ class StorageTest(unittest.TestCase):
         id = storage.find_last_numeric_subfolder(self.path)
         self.assertEqual(id, None)
 
-    def test_find_last_numeric_subfolder_only_numeric_subfolders_sequential(self):
+    def test_find_last_numeric_subfolder_only_numeric_subfolders_sequential(
+            self):
         """
         Test return of correct id when sequential numeric named
         subfolders only are present (1, 2, 3, ...).
@@ -88,7 +88,8 @@ class StorageTest(unittest.TestCase):
         id = storage.find_last_numeric_subfolder(self.path)
         self.assertEqual(id, 5)
 
-    def test_find_last_numeric_subfolder_only_numeric_subfolders_nonsequential(self):
+    def test_find_last_numeric_subfolder_only_numeric_subfolders_nonsequential(
+            self):
         """
         Test return of correct id when nonsequential numeric named
         subfolders only are present (32, 5, 423, ...).
@@ -135,9 +136,8 @@ class StorageTest(unittest.TestCase):
 
 
 class TreeTextFileAccessTest(unittest.TestCase):
-    """
-    Test suite for TreeTextFileAccess class in storage.py.
-    """
+
+    """Test suite for TreeTextFileAccess class in storage.py."""
 
     # Class specific variables. Initialized in setUp, used throughout tests.
     path = "/tmp/ttfa-test"
@@ -154,12 +154,12 @@ class TreeTextFileAccessTest(unittest.TestCase):
         # Create instance of class and add swarm.
         a = storage.TreeTextFileAccess(self.path)
         s = storage.Swarm(torrent_filename="fedora.torrent",
-                data_size=102400)
+                          data_size=102400)
         a.add_swarm(s)
 
         expected_swarm_subfolder = 1
         expected_swarm_path = os.path.join(self.path,
-                str(expected_swarm_subfolder))
+                                           str(expected_swarm_subfolder))
         expected_swarm_config = os.path.join(expected_swarm_path, "swarm.conf")
 
         self.assertEqual(os.path.isdir(expected_swarm_path), True)
@@ -170,24 +170,27 @@ class TreeTextFileAccessTest(unittest.TestCase):
 
         # Add swarm.
         s = storage.Swarm(torrent_filename="fedora.torrent",
-                data_size=102400)
+                          data_size=102400)
         a.add_swarm(s)
 
         # Add client session.
         cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
-                system_os="Linux", system_os_version="2.6.26",
-                system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
-                public_port="50500", ds_limit=300, us_limit=200)
+                                   system_os="Linux",
+                                   system_os_version="2.6.26",
+                                   system_ram=2048, system_cpu=3000,
+                                   public_ip="141.85.224.201",
+                                   public_port="50500", ds_limit=300,
+                                   us_limit=200)
         a.add_client_session(cs)
 
         expected_swarm_subfolder = 1
         expected_session_subfolder = 1
         expected_swarm_path = os.path.join(self.path,
-                str(expected_swarm_subfolder))
+                                           str(expected_swarm_subfolder))
         expected_session_path = os.path.join(expected_swarm_path,
-                str(expected_session_subfolder))
+                                             str(expected_session_subfolder))
         expected_session_config = os.path.join(expected_session_path,
-                "client_session.conf")
+                                               "client_session.conf")
 
         self.assertEqual(os.path.isdir(expected_session_path), True)
         self.assertEqual(os.path.isfile(expected_session_config), True)
@@ -197,30 +200,34 @@ class TreeTextFileAccessTest(unittest.TestCase):
 
         # Add swarm.
         s = storage.Swarm(torrent_filename="fedora.torrent",
-                data_size=102400)
+                          data_size=102400)
         a.add_swarm(s)
 
         # Add client session.
         cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
-                system_os="Linux", system_os_version="2.6.26",
-                system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
-                public_port="50500", ds_limit=300, us_limit=200)
+                                   system_os="Linux",
+                                   system_os_version="2.6.26",
+                                   system_ram=2048, system_cpu=3000,
+                                   public_ip="141.85.224.201",
+                                   public_port="50500", ds_limit=300,
+                                   us_limit=200)
         a.add_client_session(cs)
 
         # Add peer status message.
         msg = storage.PeerStatusMessage(swarm_id=1, client_session_id=1,
-                peer_ip="141.85.224.202", peer_port="12345",
-                download_speed=13, upload_speed=98)
+                                        peer_ip="141.85.224.202",
+                                        peer_port="12345", download_speed=13,
+                                        upload_speed=98)
         a.add_peer_status_message(msg)
 
         expected_swarm_subfolder = 1
         expected_session_subfolder = 1
         expected_swarm_path = os.path.join(self.path,
-                str(expected_swarm_subfolder))
+                                           str(expected_swarm_subfolder))
         expected_session_path = os.path.join(expected_swarm_path,
-                str(expected_session_subfolder))
+                                             str(expected_session_subfolder))
         expected_message_file = os.path.join(expected_session_path,
-                "peer_status.txt")
+                                             "peer_status.txt")
 
         self.assertEqual(os.path.isfile(expected_message_file), True)
 
@@ -232,30 +239,34 @@ class TreeTextFileAccessTest(unittest.TestCase):
 
         # Add swarm.
         s = storage.Swarm(torrent_filename="fedora.torrent",
-                data_size=102400)
+                          data_size=102400)
         a.add_swarm(s)
 
         # Add client session.
         cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
-                system_os="Linux", system_os_version="2.6.26",
-                system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
-                public_port="50500", ds_limit=300, us_limit=200)
+                                   system_os="Linux",
+                                   system_os_version="2.6.26",
+                                   system_ram=2048, system_cpu=3000,
+                                   public_ip="141.85.224.201",
+                                   public_port="50500", ds_limit=300,
+                                   us_limit=200)
         a.add_client_session(cs)
 
         # Add status message.
         msg = storage.StatusMessage(swarm_id=1, client_session_id=1,
-                num_peers=10, num_dht_peers=3, download_speed=102,
-                upload_speed=99, download_size=10213, upload_size=3301)
+                                    num_peers=10, num_dht_peers=3,
+                                    download_speed=102, upload_speed=99,
+                                    download_size=10213, upload_size=3301)
         a.add_status_message(msg)
 
         expected_swarm_subfolder = 1
         expected_session_subfolder = 1
         expected_swarm_path = os.path.join(self.path,
-                str(expected_swarm_subfolder))
+                                           str(expected_swarm_subfolder))
         expected_session_path = os.path.join(expected_swarm_path,
-                str(expected_session_subfolder))
+                                             str(expected_session_subfolder))
         expected_message_file = os.path.join(expected_session_path,
-                "status.txt")
+                                             "status.txt")
 
         self.assertEqual(os.path.isfile(expected_message_file), True)
 
@@ -267,40 +278,44 @@ class TreeTextFileAccessTest(unittest.TestCase):
 
         # Add swarm.
         s = storage.Swarm(torrent_filename="fedora.torrent",
-                data_size=102400)
+                          data_size=102400)
         a.add_swarm(s)
 
         # Add client session.
         cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
-                system_os="Linux", system_os_version="2.6.26",
-                system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
-                public_port="50500", ds_limit=300, us_limit=200)
+                                   system_os="Linux",
+                                   system_os_version="2.6.26",
+                                   system_ram=2048, system_cpu=3000,
+                                   public_ip="141.85.224.201",
+                                   public_port="50500", ds_limit=300,
+                                   us_limit=200)
         a.add_client_session(cs)
 
         # Add verbose message.
         msg = storage.VerboseMessage(swarm_id=1, client_session_id=1,
-            transfer_direction="send", peer_ip="141.85.224.202",
-            peer_port="12345", message_type="CHOKE")
+                                     transfer_direction="send",
+                                     peer_ip="141.85.224.202",
+                                     peer_port="12345", message_type="CHOKE")
         a.add_verbose_message(msg)
 
         expected_swarm_subfolder = 1
         expected_session_subfolder = 1
         expected_swarm_path = os.path.join(self.path,
-                str(expected_swarm_subfolder))
+                                           str(expected_swarm_subfolder))
         expected_session_path = os.path.join(expected_swarm_path,
-                str(expected_session_subfolder))
+                                             str(expected_session_subfolder))
         expected_message_file = os.path.join(expected_session_path,
-                "verbose.txt")
+                                             "verbose.txt")
 
         self.assertEqual(os.path.isfile(expected_message_file), True)
 
     def test_add_verbose_message_number_of_lines(self):
         self.assertEqual(True, True)
 
+
 class SQLiteDatabaseAccessTest(unittest.TestCase):
-    """
-    Test suite for SQLiteDatabaseAccess class in storage.py.
-    """
+
+    """Test suite for SQLiteDatabaseAccess class in storage.py."""
 
     # Class specific variables. Initialized in setUp, used throughout tests.
     database = "test.db"
@@ -313,7 +328,7 @@ class SQLiteDatabaseAccessTest(unittest.TestCase):
     expected_verbmsg_count = 0
 
     def get_swarm_count(self):
-        """ Retrieve number of entries in `swarms` table. """
+        """Retrieve number of entries in `swarms` table."""
         conn = sqlite3.connect(self.database)
         cursor = conn.cursor()
 
@@ -326,7 +341,7 @@ class SQLiteDatabaseAccessTest(unittest.TestCase):
         return count
 
     def get_session_count(self):
-        """ Retrieve number of entries in `client_sessions` table. """
+        """Retrieve number of entries in `client_sessions` table."""
         conn = sqlite3.connect(self.database)
         cursor = conn.cursor()
 
@@ -339,7 +354,7 @@ class SQLiteDatabaseAccessTest(unittest.TestCase):
         return count
 
     def get_statmsg_count(self):
-        """ Retrieve number of entries in `status_messages` table. """
+        """Retrieve number of entries in `status_messages` table."""
         conn = sqlite3.connect(self.database)
         cursor = conn.cursor()
 
@@ -352,7 +367,7 @@ class SQLiteDatabaseAccessTest(unittest.TestCase):
         return count
 
     def get_pstatmsg_count(self):
-        """ Retrieve number of entries in `peer_status_messages` table. """
+        """Retrieve number of entries in `peer_status_messages` table."""
         conn = sqlite3.connect(self.database)
         cursor = conn.cursor()
 
@@ -365,7 +380,7 @@ class SQLiteDatabaseAccessTest(unittest.TestCase):
         return count
 
     def get_verbmsg_count(self):
-        """ Retrieve number of entries in `verbose_messages` table. """
+        """Retrieve number of entries in `verbose_messages` table."""
         conn = sqlite3.connect(self.database)
         cursor = conn.cursor()
 
@@ -378,11 +393,11 @@ class SQLiteDatabaseAccessTest(unittest.TestCase):
         return count
 
     def setUp(self):
-        """ Create database file and instantiate objects. """
+        """Create database file and instantiate objects."""
         # Create database file. Create tables and indices.
         # TODO: Check exceptions.
         sql_create_script_path = os.path.join(os.path.dirname(__file__),
-                self.sql_create_script)
+                                              self.sql_create_script)
         f = open(sql_create_script_path, 'r')
         p = subprocess.Popen(["sqlite3", self.database], stdin=f)
         ret = os.waitpid(p.pid, 0)[1]
@@ -391,7 +406,7 @@ class SQLiteDatabaseAccessTest(unittest.TestCase):
         # Populate database.
         # TODO: Check exceptions.
         sql_init_script_path = os.path.join(os.path.dirname(__file__),
-                self.sql_init_script)
+                                            self.sql_init_script)
         f = open(sql_init_script_path, 'r')
         p = subprocess.Popen(["sqlite3", self.database], stdin=f)
         ret = os.waitpid(p.pid, 0)[1]
@@ -405,13 +420,13 @@ class SQLiteDatabaseAccessTest(unittest.TestCase):
         self.expected_verbmsg_count = 2
 
     def tearDown(self):
-        """ Close connection and remove database file. """
+        """Close connection and remove database file."""
         os.remove(self.database)
 
     def test_add_swarm_new_entry_in_table(self):
         # Add new swarm.
         s = storage.Swarm(torrent_filename="fedora.torrent",
-                data_size=102400)
+                          data_size=102400)
         a = storage.SQLiteDatabaseAccess(self.database)
         a.connect()
         a.add_swarm(s)
@@ -426,9 +441,12 @@ class SQLiteDatabaseAccessTest(unittest.TestCase):
     def test_add_client_session_new_entry_in_table(self):
         # Add new client session.
         cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
-                system_os="Linux", system_os_version="2.6.26",
-                system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
-                public_port="50500", ds_limit=300, us_limit=200)
+                                   system_os="Linux",
+                                   system_os_version="2.6.26",
+                                   system_ram=2048, system_cpu=3000,
+                                   public_ip="141.85.224.201",
+                                   public_port="50500", ds_limit=300,
+                                   us_limit=200)
         a = storage.SQLiteDatabaseAccess(self.database)
         a.connect()
         a.add_client_session(cs)
@@ -443,10 +461,11 @@ class SQLiteDatabaseAccessTest(unittest.TestCase):
     def test_add_status_message_new_entry_in_table(self):
         # Add new status message.
         ts = datetime.datetime.strptime("2010-09-12 08:43:15",
-                "%Y-%m-%d %H:%M:%S")
+                                        "%Y-%m-%d %H:%M:%S")
         msg = storage.StatusMessage(client_session_id=1, timestamp=ts,
-                num_peers=10, num_dht_peers=3, download_speed=102,
-                upload_speed=99, download_size=10213, upload_size=3301)
+                                    num_peers=10, num_dht_peers=3,
+                                    download_speed=102, upload_speed=99,
+                                    download_size=10213, upload_size=3301)
         a = storage.SQLiteDatabaseAccess(self.database)
         a.connect()
         a.add_status_message(msg)
@@ -461,10 +480,11 @@ class SQLiteDatabaseAccessTest(unittest.TestCase):
     def test_add_peer_status_message_new_entry_in_table(self):
         # Add new peer status message.
         ts = datetime.datetime.strptime("2010-09-12 13:43:25",
-                "%Y-%m-%d %H:%M:%S")
+                                        "%Y-%m-%d %H:%M:%S")
         msg = storage.PeerStatusMessage(client_session_id=1, timestamp=ts,
-                peer_ip="141.85.224.202", peer_port="12345",
-                download_speed=13, upload_speed=98)
+                                        peer_ip="141.85.224.202",
+                                        peer_port="12345", download_speed=13,
+                                        upload_speed=98)
         a = storage.SQLiteDatabaseAccess(self.database)
         a.connect()
         a.add_peer_status_message(msg)
@@ -476,13 +496,15 @@ class SQLiteDatabaseAccessTest(unittest.TestCase):
 
         self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)
 
-    def test_add_verbose_message_new_entry_in_databas(self):
+    def test_add_verbose_message_new_entry_in_database(self):
         # Add new verbose message.
         ts = datetime.datetime.strptime("2010-09-12 13:43:24",
-                "%Y-%m-%d %H:%M:%S")
+                                        "%Y-%m-%d %H:%M:%S")
         msg = storage.VerboseMessage(client_session_id=1, timestamp=ts,
-            transfer_direction="send", peer_ip="141.85.224.202",
-            peer_port="12345", message_type="CHOKE")
+                                     transfer_direction="send",
+                                     peer_ip="141.85.224.202",
+                                     peer_port="12345",
+                                     message_type="CHOKE")
         a = storage.SQLiteDatabaseAccess(self.database)
         a.connect()
         a.add_verbose_message(msg)
@@ -496,9 +518,8 @@ class SQLiteDatabaseAccessTest(unittest.TestCase):
 
 
 class MySQLDatabaseAccessTest(unittest.TestCase):
-    """
-    Test suite for MySQLDatabaseAccess class in storage.py.
-    """
+
+    """Test suite for MySQLDatabaseAccess class in storage.py."""
 
     # Class specific variables. Initialized in setUp, used throughout tests.
     database = "p2p_test"
@@ -515,43 +536,42 @@ class MySQLDatabaseAccessTest(unittest.TestCase):
     expected_verbmsg_count = 0
 
     def get_swarm_count(self):
-        """ Retrieve number of entries in `swarms` table. """
-
+        """Retrieve number of entries in `swarms` table."""
         self.cursor.execute('SELECT COUNT(id) FROM swarms')
         count = self.cursor.fetchone()[0]
 
         return count
 
     def get_session_count(self):
-        """ Retrieve number of entries in `client_sessions` table. """
+        """Retrieve number of entries in `client_sessions` table."""
         self.cursor.execute('SELECT COUNT(id) FROM client_sessions')
         count = self.cursor.fetchone()[0]
 
         return count
 
     def get_statmsg_count(self):
-        """ Retrieve number of entries in `status_messages` table. """
+        """Retrieve number of entries in `status_messages` table."""
         self.cursor.execute('SELECT COUNT(id) FROM status_messages')
         count = self.cursor.fetchone()[0]
 
         return count
 
     def get_pstatmsg_count(self):
-        """ Retrieve number of entries in `peer_status_messages` table. """
+        """Retrieve number of entries in `peer_status_messages` table."""
         self.cursor.execute('SELECT COUNT(id) FROM peer_status_messages')
         count = self.cursor.fetchone()[0]
 
         return count
 
     def get_verbmsg_count(self):
-        """ Retrieve number of entries in `verbose_messages` table. """
+        """Retrieve number of entries in `verbose_messages` table."""
         self.cursor.execute('SELECT COUNT(id) FROM verbose_messages')
         count = self.cursor.fetchone()[0]
 
         return count
 
     def setUp(self):
-        """ Create database and instantiate objects. """
+        """Create database and instantiate objects."""
         # Create database. Create tables and indices.
         # TODO: Check exceptions.
 
@@ -569,11 +589,11 @@ class MySQLDatabaseAccessTest(unittest.TestCase):
                              "--password=%s" %self.password], stdin=f)
         ret = os.waitpid(p.pid, 0)[1]
         f.close()
-        
+
         # Populate database.
         # TODO: Check exceptions.
         sql_init_script_path = os.path.join(os.path.dirname(__file__),
-                self.sql_init_script)
+                                            self.sql_init_script)
 
         f = open(sql_init_script_path, 'r')
         p = subprocess.Popen(["mysql", self.database, "--user=%s" %self.user,
@@ -597,7 +617,7 @@ class MySQLDatabaseAccessTest(unittest.TestCase):
     def test_add_swarm_new_entry_in_table(self):
         # Add new swarm.
         s = storage.Swarm(torrent_filename="fedora.torrent",
-                data_size=102400)
+                          data_size=102400)
         a = storage.MySQLDatabaseAccess(self.database)
         a.connect()
         a.add_swarm(s)
@@ -612,9 +632,12 @@ class MySQLDatabaseAccessTest(unittest.TestCase):
     def test_add_client_session_new_entry_in_table(self):
         # Add new client session.
         cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
-                system_os="Linux", system_os_version="2.6.26",
-                system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
-                public_port="50500", ds_limit=300, us_limit=200)
+                                   system_os="Linux",
+                                   system_os_version="2.6.26",
+                                   system_ram=2048, system_cpu=3000,
+                                   public_ip="141.85.224.201",
+                                   public_port="50500", ds_limit=300,
+                                   us_limit=200)
         a = storage.MySQLDatabaseAccess(self.database)
         a.connect()
         a.add_client_session(cs)
@@ -629,10 +652,11 @@ class MySQLDatabaseAccessTest(unittest.TestCase):
     def test_add_status_message_new_entry_in_table(self):
         # Add new status message.
         ts = datetime.datetime.strptime("2010-09-12 08:43:15",
-                "%Y-%m-%d %H:%M:%S")
+                                        "%Y-%m-%d %H:%M:%S")
         msg = storage.StatusMessage(client_session_id=1, timestamp=ts,
-                num_peers=10, num_dht_peers=3, download_speed=102,
-                upload_speed=99, download_size=10213, upload_size=3301)
+                                    num_peers=10, num_dht_peers=3,
+                                    download_speed=102, upload_speed=99,
+                                    download_size=10213, upload_size=3301)
         a = storage.MySQLDatabaseAccess(self.database)
         a.connect()
         a.add_status_message(msg)
@@ -647,10 +671,11 @@ class MySQLDatabaseAccessTest(unittest.TestCase):
     def test_add_peer_status_message_new_entry_in_table(self):
         # Add new peer status message.
         ts = datetime.datetime.strptime("2010-09-12 13:43:25",
-                "%Y-%m-%d %H:%M:%S")
+                                        "%Y-%m-%d %H:%M:%S")
         msg = storage.PeerStatusMessage(client_session_id=1, timestamp=ts,
-                peer_ip="141.85.224.202", peer_port="12345",
-                download_speed=13, upload_speed=98)
+                                        peer_ip="141.85.224.202",
+                                        peer_port="12345", download_speed=13,
+                                        upload_speed=98)
         a = storage.MySQLDatabaseAccess(self.database)
         a.connect()
         a.add_peer_status_message(msg)
@@ -662,13 +687,15 @@ class MySQLDatabaseAccessTest(unittest.TestCase):
 
         self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)
 
-    def test_add_verbose_message_new_entry_in_databas(self):
+    def test_add_verbose_message_new_entry_in_database(self):
         # Add new verbose message.
         ts = datetime.datetime.strptime("2010-09-12 13:43:24",
-                "%Y-%m-%d %H:%M:%S")
+                                        "%Y-%m-%d %H:%M:%S")
         msg = storage.VerboseMessage(client_session_id=1, timestamp=ts,
-            transfer_direction="send", peer_ip="141.85.224.202",
-            peer_port="12345", message_type="CHOKE")
+                                     transfer_direction="send",
+                                     peer_ip="141.85.224.202",
+                                     peer_port="12345",
+                                     message_type="CHOKE")
         a = storage.MySQLDatabaseAccess(self.database)
         a.connect()
         a.add_verbose_message(msg)
@@ -680,5 +707,6 @@ class MySQLDatabaseAccessTest(unittest.TestCase):
 
         self.assertEqual(verbmsg_count, self.expected_verbmsg_count)
 
+
 if __name__ == "__main__":
     unittest.main()