instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / Statistics / DatabaseCrawler.py
1 # Written by Boudewijn Schoon
2 # see LICENSE.txt for license information
3
4 import sys
5 import cPickle
6 from time import strftime
7
8 from BaseLib.Core.Overlay.SecureOverlay import OLPROTO_VER_SEVENTH, OLPROTO_VER_EIGHTH, OLPROTO_VER_ELEVENTH
9 # OLPROTO_VER_SEVENTH --> Sixth public release, >= 4.5.0, supports CRAWLER_REQUEST and CRAWLER_REPLY messages
10 # OLPROTO_VER_EIGHTH  --> Seventh public release, >= 5.0, supporting BuddyCast with clicklog info.
11
12 from BaseLib.Core.BitTornado.BT1.MessageID import CRAWLER_DATABASE_QUERY
13 from BaseLib.Core.CacheDB.sqlitecachedb import SQLiteCacheDB
14 from BaseLib.Core.Utilities.utilities import show_permid, show_permid_short
15 from BaseLib.Core.Statistics.Crawler import Crawler
16
17 DEBUG = False
18
19 class DatabaseCrawler:
20     __single = None
21
22     @classmethod
23     def get_instance(cls, *args, **kargs):
24         if not cls.__single:
25             cls.__single = cls(*args, **kargs)
26         return cls.__single
27
28     def __init__(self):
29         self._sqlite_cache_db = SQLiteCacheDB.getInstance()
30
31         crawler = Crawler.get_instance()
32         if crawler.am_crawler():
33             self._file = open("databasecrawler.txt", "a")
34             self._file.write("".join(("# ", "*" * 80, "\n# ", strftime("%Y/%m/%d %H:%M:%S"), " Crawler started\n")))
35             self._file.flush()
36         else:
37             self._file = None
38
39     def query_initiator(self, permid, selversion, request_callback):
40         """
41         Established a new connection. Send a CRAWLER_DATABASE_QUERY request.
42         @param permid The Tribler peer permid
43         @param selversion The oberlay protocol version
44         @param request_callback Call this function one or more times to send the requests: request_callback(message_id, payload)
45         """
46         if DEBUG: print >>sys.stderr, "databasecrawler: query_initiator", show_permid_short(permid)
47         sql = []
48         if selversion >= OLPROTO_VER_SEVENTH:
49             sql.extend(("SELECT 'peer_count', count(*) FROM Peer",
50                         "SELECT 'torrent_count', count(*) FROM Torrent"))
51
52         if selversion >= OLPROTO_VER_ELEVENTH:
53             sql.extend(("SELECT 'my_subscriptions', count(*) FROM VoteCast where voter_id='" + show_permid(permid) + "' and vote=2",
54                         "SELECT 'my_negative_votes', count(*) FROM VoteCast where voter_id='" + show_permid(permid) + "' and vote=-1",
55                         "SELECT 'my_channel_files', count(*) FROM ChannelCast where publisher_id='" + show_permid(permid) + "'",
56                         "SELECT 'all_subscriptions', count(*) FROM VoteCast where vote=2",
57                         "SELECT 'all_negative_votes', count(*) FROM VoteCast where vote=-1"))
58
59         # if OLPROTO_VER_EIGHTH <= selversion <= 11:
60         #     sql.extend(("SELECT 'moderations_count', count(*) FROM ModerationCast"))
61
62         # if selversion >= OLPROTO_VER_EIGHTH:
63         #     sql.extend(("SELECT 'positive_votes_count', count(*) FROM Moderators where status=1",
64         #                 "SELECT 'negative_votes_count', count(*) FROM Moderators where status=-1"))
65
66         request_callback(CRAWLER_DATABASE_QUERY, ";".join(sql), callback=self._after_request_callback)
67
68     def _after_request_callback(self, exc, permid):
69         """
70         Called by the Crawler with the result of the request_callback
71         call in the query_initiator method.
72         """
73         if not exc:
74             if DEBUG: print >>sys.stderr, "databasecrawler: request send to", show_permid_short(permid)
75             self._file.write("; ".join((strftime("%Y/%m/%d %H:%M:%S"), "REQUEST", show_permid(permid), "\n")))
76             self._file.flush()
77
78     def handle_crawler_request(self, permid, selversion, channel_id, message, reply_callback):
79         """
80         Received a CRAWLER_DATABASE_QUERY request.
81         @param permid The Crawler permid
82         @param selversion The overlay protocol version
83         @param channel_id Identifies a CRAWLER_REQUEST/CRAWLER_REPLY pair
84         @param message The message payload
85         @param reply_callback Call this function once to send the reply: reply_callback(payload [, error=123])
86         """
87         if DEBUG:
88             print >> sys.stderr, "databasecrawler: handle_crawler_request", show_permid_short(permid), message
89
90         # execute the sql
91         try:
92             cursor = self._sqlite_cache_db.execute_read(message)
93
94         except Exception, e:
95             reply_callback(str(e), error=1)
96         else:
97             if cursor:
98                 reply_callback(cPickle.dumps(list(cursor), 2))
99             else:
100                 reply_callback("error", error=2)
101
102     def handle_crawler_reply(self, permid, selversion, channel_id, channel_data, error, message, request_callback):
103         """
104         Received a CRAWLER_DATABASE_QUERY reply.
105         @param permid The Crawler permid
106         @param selversion The overlay protocol version
107         @param channel_id Identifies a CRAWLER_REQUEST/CRAWLER_REPLY pair
108         @param error The error value. 0 indicates success.
109         @param message The message payload
110         @param request_callback Call this function one or more times to send the requests: request_callback(message_id, payload)
111         """
112         if error:
113             if DEBUG:
114                 print >> sys.stderr, "databasecrawler: handle_crawler_reply", error, message
115
116             self._file.write("; ".join((strftime("%Y/%m/%d %H:%M:%S"), "  REPLY", show_permid(permid), str(error), message, "\n")))
117             self._file.flush()
118
119         else:
120             if DEBUG:
121                 print >> sys.stderr, "databasecrawler: handle_crawler_reply", show_permid_short(permid), cPickle.loads(message)
122
123             self._file.write("; ".join((strftime("%Y/%m/%d %H:%M:%S"), "  REPLY", show_permid(permid), str(error), str(cPickle.loads(message)), "\n")))
124             self._file.flush()
125