return swarm
def get_session_entries(self):
- """ Return list of SwarmDescription instances. """
+ """ Return list of SessionEntry instances. """
session_names = self.data.sections()
session_names.remove('swarm')
session_list = []
for session in session_names:
- entry = SwarmDescription()
+ entry = SessionEntry()
entry.data = ConfigParser.ConfigParser()
entry.data.add_section(session)
session_items = dict(self.data.items(session))
session_list.append(entry)
return session_list
- def get_session(self):
- """ Return name of the session.
- If it contains more than one session, return None.
+ def update_session_entry_id(self, session_entry, cs_id):
+ """ Add or modify client session id. """
+ session_name = session_entry.data.sections()[0]
+ self.data.set(session_name, 'client_session_id', str(cs_id))
+ session_entry.data.set(session_name, 'client_session_id', str(cs_id))
- """
- # TODO: May need rethinking. It may be better if entry is a separate
- # class or instance of SwarmDescription.
- session_list = self.data.sections()
- if 'swarm' in session_list:
- return None
- else:
- cs_data = dict(self.data.items(session_list[0]))
- cs = storage.ClientSession(
+
+ def get_file_archives(self):
+ """ Return a list containing all archives from swarm. """
+ archives = []
+ for section in self.data.sections():
+ try:
+ archives.append(self.data.get(section, 'log_file'))
+ except ConfigParser.NoOptionError:
+ pass
+ return archives
+
+class SessionEntry(object):
+ def __init__(self):
+ super(SessionEntry, self).__init__()
+
+
+ def get_session(self):
+ """ Return name of the session. """
+ session = self.data.sections()[0]
+ cs_data = dict(self.data.items(session))
+ cs = storage.ClientSession(
btclient = cs_data['bittorrent_client'],
system_os = cs_data['system_os'],
system_os_version = cs_data['system_os_version'],
description = cs_data['description'])
return cs
- def update_session_entry_id(self, session_entry, cs_id):
- """ Add or modify client session id. """
- session_name = session_entry.data.sections()[0]
- self.data.set(session_name, 'client_session_id', str(cs_id))
- session_entry.data.set(session_name, 'client_session_id', str(cs_id))
-
def get_session_id(self):
- """ Return client session id corresponding to the archive file."""
- section = self.data.sections()[0]
- cs_id = self.data.get(section, 'client_session_id')
- return cs_id
+ """ Return client session id corresponding to the entry.
+ If the session entry hasn't been asigned a session id raise error.
- def get_file_archives(self):
- """ Return a list containing all archives from swarm. """
- archives = []
- for section in self.data.sections():
- try:
- archives.append(self.data.get(section, 'log_file'))
- except ConfigParser.NoOptionError:
- pass
- return archives
+ """
+ section = self.data.sections()[0]
+ try:
+ cs_id = self.data.get(section, 'client_session_id')
+ return cs_id
+ except ConfigParser.NoOptionError:
+ logger.debug("No client session id for entry: %s" %section)
+ return None
class AccessConfig(object):
def __init__(self):
import config
import storage
-class SessionDescriptionTest(unittest.TestCase):
+class SwarmDescriptionTest(unittest.TestCase):
"""
- Test suite for SessionDescription class in config.py.
+ Test suite for SwarmDescription class in config.py.
"""
# Class specific variables. Initialized in setUp, used throughout tests.
self.assertEqual(entry.data.items(section).sort(),
list(self.test_data[section]).sort())
- def test_get_session(self):
- session_entries = self.sd.get_session_entries()
- entry = session_entries[0]
- cs = entry.get_session()
-
- self.assertTrue(type(cs) is storage.ClientSession)
- self.assertEqual(cs.system_os, 'Linux')
-
def test_update_session_entry_id(self):
session_entries = self.sd.get_session_entries()
entry = session_entries[0]
self.assertEqual(archives.sort(),expected_archives.sort())
+class SessionEntryTest(unittest.TestCase):
+ """
+ Test suite for SessionEntry class in config.py.
+ """
+
+ # Class specific variables. Initialized in setUp, used throughout tests.
+ se = None
+ sd = None
+ config_file = "config.sample.ini"
+
+ def setUp(self):
+ config_file_path = os.path.join(os.path.dirname(__file__),
+ self.config_file)
+ self.sd = config.SwarmDescription()
+ self.sd.load(config_file_path)
+ session_entries = self.sd.get_session_entries()
+
+ self.se = config.SessionEntry()
+ self.se = session_entries[2]
+
+ def test_get_session(self):
+ cs = self.se.get_session()
+
+ self.assertTrue(type(cs) is storage.ClientSession)
+ self.assertEqual(cs.system_os, 'Linux')
+ self.assertEqual(cs.system_ram, '2007')
+
+ def test_get_session_id(self):
+ # Check for no session id before update
+ #self.assertFalse(self.se.get_session_id())
+ test_cs_id = 2
+ self.sd.update_session_entry_id(self.se, test_cs_id)
+ cs_id = self.se.get_session_id()
+ self.assertEqual(str(cs_id), str(test_cs_id))
class AccessConfigTest(unittest.TestCase):
"""
- Test suite for SessionDescription class in config.py.
+ Test suite for AccessConfig class in config.py.
"""
# Class specific variables. Initialized in setUp, used throughout tests.
def test_get_swarm_writer(self):
sw = self.ac.get_swarm_writer()
-
- # Add swarm.
- s = storage.Swarm(torrent_filename="fedora.torrent", data_size=102400)
- sw.add_swarm(s)
-
- # Add client session.
- cs = storage.ClientSession(
- swarm_id=1, btclient="Tribler", system_os="Linux",
- system_os_version="2.6.26", system_ram=2048,
- system_cpu=3000, public_ip="141.85.224.201",
- public_port="50500", ds_limit=300, us_limit=200)
- sw.add_client_session(cs)
-
- ts = datetime.datetime.strptime("2010-09-12 08:43:15",
- "%Y-%m-%d %H:%M:%S")
- # Add status message.
- sm = storage.StatusMessage(
- client_session_id=1, timestamp=ts, num_peers=10,
- num_dht_peers=3, download_speed=102, upload_speed=99,
- download_size=10213, upload_size=3301)
- sw.add_status_message(sm)
-
- # Add peer status message.
- psm = storage.PeerStatusMessage(
- client_session_id=1, timestamp=ts,
- peer_ip="141.85.224.202", peer_port="12345",
- download_speed=13, upload_speed=98)
- sw.add_peer_status_message(psm)
-
- # Add verbose message.
- vm = storage.VerboseMessage(
- client_session_id=1, timestamp=ts,
- transfer_direction="send", peer_ip="141.85.224.202",
- peer_port="12345", message_type="CHOKE")
- sw.add_verbose_message(vm)
-
expected_handlers = 2
handlers = len(sw.handlers)
+
self.assertEqual(expected_handlers, handlers)
if __name__ == "__main__":