self.dba.disconnect()
os.remove(self.database)
+ def get_cursor_num_rows(self):
+ """
+ Return number of rows selected/affected by last cursor statement.
+ """
+ num_rows = 0
+ for row in self.cursor:
+ num_rows = num_rows+1
+
+ return num_rows
+
def test_connect(self):
""" If connection is successful, no exception is raised. """
try:
def test_select_swarms(self):
# TODO: An entry should be added in the startup script.
self.dba.select_swarms(False)
-
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 0)
def test_insert_swarms_row(self):
self.dba.insert_swarms_row(["fedora.torrent", 10000, "btex", "isohunt"])
- self.dba.select_swarms(False)
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ self.dba.select_swarms(False)
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 1)
def test_insert_swarms(self):
self.dba.insert_swarms("fedora.torrent", 10000, "btex", "isohunt")
- self.dba.select_swarms(False)
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ self.dba.select_swarms(False)
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 1)
def test_delete_swarms(self):
self.dba.insert_swarms("fedora.torrent", 10000, "btex", "isohunt")
self.dba.delete_swarms()
- self.dba.select_swarms(False)
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ self.dba.select_swarms(False)
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 0)
def test_select_btclients(self):
self.dba.select_btclients(False)
-
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 6)
def test_select_btclients_by_name(self):
self.dba.select_btclients_by_name("Tribler", False)
-
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 1)
def test_insert_btclients_row(self):
self.dba.insert_btclients_row(['NextShare', 'Python', 1, 0]);
- self.dba.select_btclients(False)
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ self.dba.select_btclients(False)
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 7)
def test_insert_btclients(self):
self.dba.insert_btclients('NextShare', 'Python', 1, 0);
- self.dba.select_btclients(False)
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ self.dba.select_btclients(False)
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 7)
def test_select_client_sessions_by_id(self):
# TODO: An entry should be added in the startup script.
self.dba.select_client_sessions_by_id(False)
-
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 0)
def test_select_client_sessions_by_swarm(self):
self.dba.select_client_sessions_by_swarm(False)
-
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 0)
def test_insert_client_sessions_row(self):
self.dba.insert_swarms("fedora.torrent", 10000, "btex", "isohunt")
self.dba.insert_client_sessions_row(['1', '2', 'Linux', '2.6.26', '512', '1500', '141.85.224.205', '50500', '512', '64', '2455125.02409722'])
- self.dba.select_client_sessions_by_id(False)
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ self.dba.select_client_sessions_by_id(False)
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 1)
self.dba.insert_swarms("fedora.torrent", 10000, "btex", "isohunt")
self.dba.insert_client_sessions_row(['1', '2', 'Linux', '2.6.26', '512', '1500', '141.85.224.205', '50500', '512', '64', '2455125.02409722'])
self.dba.delete_client_sessions_by_id(1)
- self.dba.select_client_sessions_by_id(False)
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ self.dba.select_client_sessions_by_id(False)
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 0)
self.dba.insert_swarms("fedora.torrent", 10000, "btex", "isohunt")
self.dba.insert_client_sessions_row(['1', '2', 'Linux', '2.6.26', '512', '1500', '141.85.224.205', '50500', '512', '64', '2455125.02409722'])
self.dba.delete_client_sessions_by_swarm(1)
- self.dba.select_client_sessions_by_id(False)
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ self.dba.select_client_sessions_by_id(False)
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 0)
def test_select_status_messages(self):
# TODO: An entry should be added in the startup script.
self.dba.select_status_messages(False)
-
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 0)
self.dba.insert_status_messages_row(['1', '2455128.10', '222', '0', '213', '56', '200', '300', '121.324'])
self.dba.select_status_messages(False)
-
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 1)
self.dba.insert_status_messages('1', '2455128.10', '222', '0', '213', '56', '200', '300', '121.324')
self.dba.select_status_messages(False)
-
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 1)
self.dba.delete_status_messages()
self.dba.select_status_messages(False)
-
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 0)
# TODO: An entry should be added in the startup script.
self.dba.select_verbose_messages(False)
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 0)
self.dba.select_verbose_messages(False)
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 1)
self.dba.select_verbose_messages(False)
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 1)
self.dba.select_verbose_messages(False)
- num_rows = 0
- for row in self.cursor:
- num_rows = num_rows+1
+ num_rows = self.get_cursor_num_rows()
self.assertEqual(num_rows, 0)